Allow library users to specify the rayon ThreadPool for UpdateBuilder

This commit is contained in:
Clément Renault 2020-11-02 19:11:22 +01:00
parent 87902de010
commit 7e120fc441
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 37 additions and 20 deletions

View File

@ -15,6 +15,8 @@ use futures::{FutureExt, StreamExt};
use grenad::CompressionType; use grenad::CompressionType;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use indexmap::IndexMap; use indexmap::IndexMap;
use once_cell::sync::OnceCell;
use rayon::ThreadPool;
use serde::{Serialize, Deserialize, Deserializer}; use serde::{Serialize, Deserialize, Deserializer};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::fs::File as TFile; use tokio::fs::File as TFile;
@ -27,6 +29,8 @@ use crate::tokenizer::{simple_tokenizer, TokenType};
use crate::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat}; use crate::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat};
use crate::{Index, UpdateStore, SearchResult}; use crate::{Index, UpdateStore, SearchResult};
static GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
#[derive(Debug, StructOpt)] #[derive(Debug, StructOpt)]
/// The HTTP main server of the milli project. /// The HTTP main server of the milli project.
pub struct Opt { pub struct Opt {
@ -201,6 +205,11 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(opt.database_size); options.map_size(opt.database_size);
// Setup the global thread pool
let jobs = opt.indexer.indexing_jobs.unwrap_or(0);
let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;
GLOBAL_THREAD_POOL.set(pool).unwrap();
// Open the LMDB database. // Open the LMDB database.
let index = Index::new(options, &opt.database)?; let index = Index::new(options, &opt.database)?;
@ -227,9 +236,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level { if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level {
update_builder.chunk_compression_level(chunk_compression_level); update_builder.chunk_compression_level(chunk_compression_level);
} }
if let Some(indexing_jobs) = indexer_opt_cloned.indexing_jobs { update_builder.thread_pool(GLOBAL_THREAD_POOL.get().unwrap());
update_builder.indexing_jobs(indexing_jobs);
}
update_builder.log_every_n(indexer_opt_cloned.log_every_n); update_builder.log_every_n(indexer_opt_cloned.log_every_n);
update_builder.max_memory(indexer_opt_cloned.max_memory); update_builder.max_memory(indexer_opt_cloned.max_memory);
update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size); update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size);

View File

@ -10,6 +10,7 @@ use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
use heed::types::ByteSlice; use heed::types::ByteSlice;
use log::{debug, info, error}; use log::{debug, info, error};
use rayon::prelude::*; use rayon::prelude::*;
use rayon::ThreadPool;
use crate::index::Index; use crate::index::Index;
use self::store::Store; use self::store::Store;
use self::merge_function::{ use self::merge_function::{
@ -191,7 +192,7 @@ pub enum UpdateFormat {
JsonStream, JsonStream,
} }
pub struct IndexDocuments<'t, 'u, 'i> { pub struct IndexDocuments<'t, 'u, 'i, 'a> {
wtxn: &'t mut heed::RwTxn<'i, 'u>, wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index, index: &'i Index,
pub(crate) log_every_n: Option<usize>, pub(crate) log_every_n: Option<usize>,
@ -201,14 +202,14 @@ pub struct IndexDocuments<'t, 'u, 'i> {
pub(crate) chunk_compression_type: CompressionType, pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>, pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>, pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) indexing_jobs: Option<usize>, pub(crate) thread_pool: Option<&'a ThreadPool>,
update_method: IndexDocumentsMethod, update_method: IndexDocumentsMethod,
update_format: UpdateFormat, update_format: UpdateFormat,
autogenerate_docids: bool, autogenerate_docids: bool,
} }
impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i> { pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i, 'a> {
IndexDocuments { IndexDocuments {
wtxn, wtxn,
index, index,
@ -219,7 +220,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
chunk_compression_type: CompressionType::None, chunk_compression_type: CompressionType::None,
chunk_compression_level: None, chunk_compression_level: None,
chunk_fusing_shrink_size: None, chunk_fusing_shrink_size: None,
indexing_jobs: None, thread_pool: None,
update_method: IndexDocumentsMethod::ReplaceDocuments, update_method: IndexDocumentsMethod::ReplaceDocuments,
update_format: UpdateFormat::Json, update_format: UpdateFormat::Json,
autogenerate_docids: true, autogenerate_docids: true,
@ -288,7 +289,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
chunk_compression_type: self.chunk_compression_type, chunk_compression_type: self.chunk_compression_type,
chunk_compression_level: self.chunk_compression_level, chunk_compression_level: self.chunk_compression_level,
chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, chunk_fusing_shrink_size: self.chunk_fusing_shrink_size,
indexing_jobs: self.indexing_jobs, thread_pool: self.thread_pool,
}; };
let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?; let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?;
deletion_builder.delete_documents(&replaced_documents_ids); deletion_builder.delete_documents(&replaced_documents_ids);
@ -323,8 +324,16 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
let log_every_n = self.log_every_n; let log_every_n = self.log_every_n;
let chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; let chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
let jobs = self.indexing_jobs.unwrap_or(0); let backup_pool;
let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; let pool = match self.thread_pool {
Some(pool) => pool,
None => {
// We initialize a backup pool with the default
// settings if none have already been set.
backup_pool = rayon::ThreadPoolBuilder::new().build()?;
&backup_pool
},
};
let (receiver, docid_word_positions_readers, documents_readers) = pool.install(|| { let (receiver, docid_word_positions_readers, documents_readers) = pool.install(|| {
let num_threads = rayon::current_num_threads(); let num_threads = rayon::current_num_threads();

View File

@ -1,9 +1,10 @@
use grenad::CompressionType; use grenad::CompressionType;
use rayon::ThreadPool;
use crate::Index; use crate::Index;
use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings}; use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings};
pub struct UpdateBuilder { pub struct UpdateBuilder<'a> {
pub(crate) log_every_n: Option<usize>, pub(crate) log_every_n: Option<usize>,
pub(crate) max_nb_chunks: Option<usize>, pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>, pub(crate) max_memory: Option<usize>,
@ -11,11 +12,11 @@ pub struct UpdateBuilder {
pub(crate) chunk_compression_type: CompressionType, pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>, pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>, pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) indexing_jobs: Option<usize>, pub(crate) thread_pool: Option<&'a ThreadPool>,
} }
impl UpdateBuilder { impl<'a> UpdateBuilder<'a> {
pub fn new() -> UpdateBuilder { pub fn new() -> UpdateBuilder<'a> {
UpdateBuilder { UpdateBuilder {
log_every_n: None, log_every_n: None,
max_nb_chunks: None, max_nb_chunks: None,
@ -24,7 +25,7 @@ impl UpdateBuilder {
chunk_compression_type: CompressionType::None, chunk_compression_type: CompressionType::None,
chunk_compression_level: None, chunk_compression_level: None,
chunk_fusing_shrink_size: None, chunk_fusing_shrink_size: None,
indexing_jobs: None, thread_pool: None,
} }
} }
@ -56,8 +57,8 @@ impl UpdateBuilder {
self.chunk_fusing_shrink_size = Some(chunk_fusing_shrink_size); self.chunk_fusing_shrink_size = Some(chunk_fusing_shrink_size);
} }
pub fn indexing_jobs(&mut self, indexing_jobs: usize) { pub fn thread_pool(&mut self, thread_pool: &'a ThreadPool) {
self.indexing_jobs = Some(indexing_jobs); self.thread_pool = Some(thread_pool);
} }
pub fn clear_documents<'t, 'u, 'i>( pub fn clear_documents<'t, 'u, 'i>(
@ -82,7 +83,7 @@ impl UpdateBuilder {
self, self,
wtxn: &'t mut heed::RwTxn<'i, 'u>, wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index, index: &'i Index,
) -> IndexDocuments<'t, 'u, 'i> ) -> IndexDocuments<'t, 'u, 'i, 'a>
{ {
let mut builder = IndexDocuments::new(wtxn, index); let mut builder = IndexDocuments::new(wtxn, index);
@ -93,7 +94,7 @@ impl UpdateBuilder {
builder.chunk_compression_type = self.chunk_compression_type; builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level; builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
builder.indexing_jobs = self.indexing_jobs; builder.thread_pool = self.thread_pool;
builder builder
} }