diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 6e36f4c5d..1b2ee48de 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -15,6 +15,8 @@ use futures::{FutureExt, StreamExt}; use grenad::CompressionType; use heed::EnvOpenOptions; use indexmap::IndexMap; +use once_cell::sync::OnceCell; +use rayon::ThreadPool; use serde::{Serialize, Deserialize, Deserializer}; use structopt::StructOpt; use tokio::fs::File as TFile; @@ -27,6 +29,8 @@ use crate::tokenizer::{simple_tokenizer, TokenType}; use crate::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat}; use crate::{Index, UpdateStore, SearchResult}; +static GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new(); + #[derive(Debug, StructOpt)] /// The HTTP main server of the milli project. pub struct Opt { @@ -201,6 +205,11 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let mut options = EnvOpenOptions::new(); options.map_size(opt.database_size); + // Setup the global thread pool + let jobs = opt.indexer.indexing_jobs.unwrap_or(0); + let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; + GLOBAL_THREAD_POOL.set(pool).unwrap(); + // Open the LMDB database. 
let index = Index::new(options, &opt.database)?; @@ -227,9 +236,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level { update_builder.chunk_compression_level(chunk_compression_level); } - if let Some(indexing_jobs) = indexer_opt_cloned.indexing_jobs { - update_builder.indexing_jobs(indexing_jobs); - } + update_builder.thread_pool(GLOBAL_THREAD_POOL.get().unwrap()); update_builder.log_every_n(indexer_opt_cloned.log_every_n); update_builder.max_memory(indexer_opt_cloned.max_memory); update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size); diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index 8860c1a93..1606ecd32 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -10,6 +10,7 @@ use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; use heed::types::ByteSlice; use log::{debug, info, error}; use rayon::prelude::*; +use rayon::ThreadPool; use crate::index::Index; use self::store::Store; use self::merge_function::{ @@ -191,7 +192,7 @@ pub enum UpdateFormat { JsonStream, } -pub struct IndexDocuments<'t, 'u, 'i> { +pub struct IndexDocuments<'t, 'u, 'i, 'a> { wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, pub(crate) log_every_n: Option<usize>, @@ -201,14 +202,14 @@ pub struct IndexDocuments<'t, 'u, 'i> { pub(crate) chunk_compression_type: CompressionType, pub(crate) chunk_compression_level: Option<u32>, pub(crate) chunk_fusing_shrink_size: Option<u64>, - pub(crate) indexing_jobs: Option<usize>, + pub(crate) thread_pool: Option<&'a ThreadPool>, update_method: IndexDocumentsMethod, update_format: UpdateFormat, autogenerate_docids: bool, } -impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i> { +impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { + pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> 
IndexDocuments<'t, 'u, 'i, 'a> { IndexDocuments { wtxn, index, @@ -219,7 +220,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { chunk_compression_type: CompressionType::None, chunk_compression_level: None, chunk_fusing_shrink_size: None, - indexing_jobs: None, + thread_pool: None, update_method: IndexDocumentsMethod::ReplaceDocuments, update_format: UpdateFormat::Json, autogenerate_docids: true, @@ -288,7 +289,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { chunk_compression_type: self.chunk_compression_type, chunk_compression_level: self.chunk_compression_level, chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, - indexing_jobs: self.indexing_jobs, + thread_pool: self.thread_pool, }; let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?; deletion_builder.delete_documents(&replaced_documents_ids); @@ -323,8 +324,16 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { let log_every_n = self.log_every_n; let chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; - let jobs = self.indexing_jobs.unwrap_or(0); - let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; + let backup_pool; + let pool = match self.thread_pool { + Some(pool) => pool, + None => { + // We initialize a backup pool with the default + // settings if none have already been set. 
+ backup_pool = rayon::ThreadPoolBuilder::new().build()?; + &backup_pool + }, + }; let (receiver, docid_word_positions_readers, documents_readers) = pool.install(|| { let num_threads = rayon::current_num_threads(); diff --git a/src/update/update_builder.rs b/src/update/update_builder.rs index da7ee287b..a4a70be91 100644 --- a/src/update/update_builder.rs +++ b/src/update/update_builder.rs @@ -1,9 +1,10 @@ use grenad::CompressionType; +use rayon::ThreadPool; use crate::Index; use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings}; -pub struct UpdateBuilder { +pub struct UpdateBuilder<'a> { pub(crate) log_every_n: Option<usize>, pub(crate) max_nb_chunks: Option<usize>, pub(crate) max_memory: Option<usize>, @@ -11,11 +12,11 @@ pub struct UpdateBuilder { pub(crate) chunk_compression_type: CompressionType, pub(crate) chunk_compression_level: Option<u32>, pub(crate) chunk_fusing_shrink_size: Option<u64>, - pub(crate) indexing_jobs: Option<usize>, + pub(crate) thread_pool: Option<&'a ThreadPool>, } -impl UpdateBuilder { - pub fn new() -> UpdateBuilder { +impl<'a> UpdateBuilder<'a> { + pub fn new() -> UpdateBuilder<'a> { UpdateBuilder { log_every_n: None, max_nb_chunks: None, @@ -24,7 +25,7 @@ impl UpdateBuilder { chunk_compression_type: CompressionType::None, chunk_compression_level: None, chunk_fusing_shrink_size: None, - indexing_jobs: None, + thread_pool: None, } } @@ -56,8 +57,8 @@ impl UpdateBuilder { self.chunk_fusing_shrink_size = Some(chunk_fusing_shrink_size); } - pub fn indexing_jobs(&mut self, indexing_jobs: usize) { - self.indexing_jobs = Some(indexing_jobs); + pub fn thread_pool(&mut self, thread_pool: &'a ThreadPool) { + self.thread_pool = Some(thread_pool); } pub fn clear_documents<'t, 'u, 'i>( @@ -82,7 +83,7 @@ impl UpdateBuilder { self, wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, - ) -> IndexDocuments<'t, 'u, 'i> + ) -> IndexDocuments<'t, 'u, 'i, 'a> { let mut builder = IndexDocuments::new(wtxn, index); @@ -93,7 +94,7 @@ impl UpdateBuilder { 
builder.chunk_compression_type = self.chunk_compression_type; builder.chunk_compression_level = self.chunk_compression_level; builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; - builder.indexing_jobs = self.indexing_jobs; + builder.thread_pool = self.thread_pool; builder }