diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs
index eff3740a0..79ef4cdc8 100644
--- a/crates/index-scheduler/src/scheduler/process_index_operation.rs
+++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs
@@ -3,6 +3,7 @@ use bumpalo::Bump;
 use meilisearch_types::heed::RwTxn;
 use meilisearch_types::milli::documents::PrimaryKey;
 use meilisearch_types::milli::progress::Progress;
+use meilisearch_types::milli::update::new::indexer::document_changes::CHUNK_SIZE;
 use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
 use meilisearch_types::milli::update::DocumentAdditionResult;
 use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder};
@@ -112,17 +113,24 @@ impl IndexScheduler {
 
                 let local_pool;
                 let indexer_config = self.index_mapper.indexer_config();
-                let pool = match &indexer_config.thread_pool {
+                let pool = match &indexer_config.rayon_thread_pool {
                     Some(pool) => pool,
                     None => {
                         local_pool = ThreadPoolNoAbortBuilder::new()
-                            .thread_name(|i| format!("indexing-thread-{i}"))
+                            .thread_name(|i| format!("rayon-{i}"))
                             .build()
                             .unwrap();
                         &local_pool
                     }
                 };
 
+                let thread_pool = match &indexer_config.thread_pool {
+                    Some(thread_pool) => thread_pool,
+                    None => {
+                        &scoped_thread_pool::ThreadPool::with_available_parallelism("index".into())
+                    }
+                };
+
                 progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges);
                 let (document_changes, operation_stats, primary_key) = indexer
                     .into_changes(
@@ -133,6 +141,8 @@ impl IndexScheduler {
                         &mut new_fields_ids_map,
                         &|| must_stop_processing.get(),
                         progress.clone(),
+                        thread_pool,
+                        CHUNK_SIZE,
                     )
                     .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
 
@@ -173,6 +183,7 @@ impl IndexScheduler {
                    indexer::index(
                        index_wtxn,
                        index,
+                        thread_pool,
                        pool,
                        indexer_config.grenad_parameters(),
                        &db_fields_ids_map,
@@ -261,7 +272,7 @@ impl IndexScheduler {
                 if task.error.is_none() {
                     let local_pool;
                     let indexer_config = self.index_mapper.indexer_config();
-                    let pool = match &indexer_config.thread_pool {
+                    let pool = match &indexer_config.rayon_thread_pool {
                         Some(pool) => pool,
                         None => {
                             local_pool = ThreadPoolNoAbortBuilder::new()
@@ -272,16 +283,19 @@ impl IndexScheduler {
                         }
                     };
 
+                    let thread_pool = match &indexer_config.thread_pool {
+                        Some(thread_pool) => thread_pool,
+                        None => &scoped_thread_pool::ThreadPool::with_available_parallelism(
+                            "index".into(),
+                        ),
+                    };
+
                     let candidates_count = candidates.len();
                     progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges);
                     let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
-                    let document_changes = pool
-                        .install(|| {
-                            indexer
-                                .into_changes(&primary_key)
-                                .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))
-                        })
-                        .unwrap()?;
+                    let document_changes = indexer
+                        .into_changes(&primary_key, &indexer_alloc, thread_pool, CHUNK_SIZE)
+                        .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
                     let embedders = index
                         .embedding_configs(index_wtxn)
                         .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
@@ -291,6 +305,7 @@ impl IndexScheduler {
                    indexer::index(
                        index_wtxn,
                        index,
+                        thread_pool,
                        pool,
                        indexer_config.grenad_parameters(),
                        &db_fields_ids_map,
@@ -421,7 +436,7 @@ impl IndexScheduler {
                 if !tasks.iter().all(|res| res.error.is_some()) {
                     let local_pool;
                     let indexer_config = self.index_mapper.indexer_config();
-                    let pool = match &indexer_config.thread_pool {
+                    let pool = match &indexer_config.rayon_thread_pool {
                         Some(pool) => pool,
                         None => {
                             local_pool = ThreadPoolNoAbortBuilder::new()
@@ -432,11 +447,19 @@ impl IndexScheduler {
                         }
                     };
 
+                    let thread_pool = match &indexer_config.thread_pool {
+                        Some(thread_pool) => thread_pool,
+                        None => &scoped_thread_pool::ThreadPool::with_available_parallelism(
+                            "index".into(),
+                        ),
+                    };
+
                     progress.update_progress(DocumentDeletionProgress::DeleteDocuments);
                     let mut indexer = indexer::DocumentDeletion::new();
                     let candidates_count = to_delete.len();
                     indexer.delete_documents_by_docids(to_delete);
-                    let document_changes = indexer.into_changes(&indexer_alloc, primary_key);
+                    let document_changes =
+                        indexer.into_changes(&indexer_alloc, primary_key, thread_pool, CHUNK_SIZE);
                     let embedders = index
                         .embedding_configs(index_wtxn)
                         .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
@@ -446,6 +469,7 @@ impl IndexScheduler {
                    indexer::index(
                        index_wtxn,
                        index,
+                        thread_pool,
                        pool,
                        indexer_config.grenad_parameters(),
                        &db_fields_ids_map,
diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs
index acf4393d3..03e6d302f 100644
--- a/crates/meilisearch/src/option.rs
+++ b/crates/meilisearch/src/option.rs
@@ -743,15 +743,21 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
     type Error = anyhow::Error;
 
     fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
-        let thread_pool = ThreadPoolNoAbortBuilder::new()
-            .thread_name(|index| format!("indexing-thread:{index}"))
+        let rayon_thread_pool = ThreadPoolNoAbortBuilder::new()
+            .thread_name(|index| format!("rayon-{index}"))
             .num_threads(*other.max_indexing_threads)
             .build()?;
 
+        let thread_pool = Some(scoped_thread_pool::ThreadPool::new(
+            NonZeroUsize::new(*other.max_indexing_threads).unwrap_or(NonZeroUsize::new(1).unwrap()),
+            "index".to_string(),
+        ));
+
         Ok(Self {
             log_every_n: Some(DEFAULT_LOG_EVERY_N),
             max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
-            thread_pool: Some(thread_pool),
+            rayon_thread_pool: Some(rayon_thread_pool),
+            thread_pool,
             max_positions_per_attributes: None,
             skip_index_budget: other.skip_index_budget,
             ..Default::default()