Use thread pool in process index op

This commit is contained in:
Louis Dureuil 2025-03-01 23:46:37 +01:00
parent b7d5576347
commit 93ba4b924a
2 changed files with 45 additions and 15 deletions

View File

@ -3,6 +3,7 @@ use bumpalo::Bump;
use meilisearch_types::heed::RwTxn; use meilisearch_types::heed::RwTxn;
use meilisearch_types::milli::documents::PrimaryKey; use meilisearch_types::milli::documents::PrimaryKey;
use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::update::new::indexer::document_changes::CHUNK_SIZE;
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
use meilisearch_types::milli::update::DocumentAdditionResult; use meilisearch_types::milli::update::DocumentAdditionResult;
use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder}; use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder};
@ -112,17 +113,24 @@ impl IndexScheduler {
let local_pool; let local_pool;
let indexer_config = self.index_mapper.indexer_config(); let indexer_config = self.index_mapper.indexer_config();
let pool = match &indexer_config.thread_pool { let pool = match &indexer_config.rayon_thread_pool {
Some(pool) => pool, Some(pool) => pool,
None => { None => {
local_pool = ThreadPoolNoAbortBuilder::new() local_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|i| format!("indexing-thread-{i}")) .thread_name(|i| format!("rayon-{i}"))
.build() .build()
.unwrap(); .unwrap();
&local_pool &local_pool
} }
}; };
let thread_pool = match &indexer_config.thread_pool {
Some(thread_pool) => thread_pool,
None => {
&scoped_thread_pool::ThreadPool::with_available_parallelism("index".into())
}
};
progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges); progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges);
let (document_changes, operation_stats, primary_key) = indexer let (document_changes, operation_stats, primary_key) = indexer
.into_changes( .into_changes(
@ -133,6 +141,8 @@ impl IndexScheduler {
&mut new_fields_ids_map, &mut new_fields_ids_map,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
progress.clone(), progress.clone(),
thread_pool,
CHUNK_SIZE,
) )
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
@ -173,6 +183,7 @@ impl IndexScheduler {
indexer::index( indexer::index(
index_wtxn, index_wtxn,
index, index,
thread_pool,
pool, pool,
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
@ -261,7 +272,7 @@ impl IndexScheduler {
if task.error.is_none() { if task.error.is_none() {
let local_pool; let local_pool;
let indexer_config = self.index_mapper.indexer_config(); let indexer_config = self.index_mapper.indexer_config();
let pool = match &indexer_config.thread_pool { let pool = match &indexer_config.rayon_thread_pool {
Some(pool) => pool, Some(pool) => pool,
None => { None => {
local_pool = ThreadPoolNoAbortBuilder::new() local_pool = ThreadPoolNoAbortBuilder::new()
@ -272,16 +283,19 @@ impl IndexScheduler {
} }
}; };
let thread_pool = match &indexer_config.thread_pool {
Some(thread_pool) => thread_pool,
None => &scoped_thread_pool::ThreadPool::with_available_parallelism(
"index".into(),
),
};
let candidates_count = candidates.len(); let candidates_count = candidates.len();
progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges); progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges);
let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone()); let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
let document_changes = pool let document_changes = indexer
.install(|| { .into_changes(&primary_key, &indexer_alloc, thread_pool, CHUNK_SIZE)
indexer .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
.into_changes(&primary_key)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))
})
.unwrap()?;
let embedders = index let embedders = index
.embedding_configs(index_wtxn) .embedding_configs(index_wtxn)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
@ -291,6 +305,7 @@ impl IndexScheduler {
indexer::index( indexer::index(
index_wtxn, index_wtxn,
index, index,
thread_pool,
pool, pool,
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
@ -421,7 +436,7 @@ impl IndexScheduler {
if !tasks.iter().all(|res| res.error.is_some()) { if !tasks.iter().all(|res| res.error.is_some()) {
let local_pool; let local_pool;
let indexer_config = self.index_mapper.indexer_config(); let indexer_config = self.index_mapper.indexer_config();
let pool = match &indexer_config.thread_pool { let pool = match &indexer_config.rayon_thread_pool {
Some(pool) => pool, Some(pool) => pool,
None => { None => {
local_pool = ThreadPoolNoAbortBuilder::new() local_pool = ThreadPoolNoAbortBuilder::new()
@ -432,11 +447,19 @@ impl IndexScheduler {
} }
}; };
let thread_pool = match &indexer_config.thread_pool {
Some(thread_pool) => thread_pool,
None => &scoped_thread_pool::ThreadPool::with_available_parallelism(
"index".into(),
),
};
progress.update_progress(DocumentDeletionProgress::DeleteDocuments); progress.update_progress(DocumentDeletionProgress::DeleteDocuments);
let mut indexer = indexer::DocumentDeletion::new(); let mut indexer = indexer::DocumentDeletion::new();
let candidates_count = to_delete.len(); let candidates_count = to_delete.len();
indexer.delete_documents_by_docids(to_delete); indexer.delete_documents_by_docids(to_delete);
let document_changes = indexer.into_changes(&indexer_alloc, primary_key); let document_changes =
indexer.into_changes(&indexer_alloc, primary_key, thread_pool, CHUNK_SIZE);
let embedders = index let embedders = index
.embedding_configs(index_wtxn) .embedding_configs(index_wtxn)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
@ -446,6 +469,7 @@ impl IndexScheduler {
indexer::index( indexer::index(
index_wtxn, index_wtxn,
index, index,
thread_pool,
pool, pool,
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,

View File

@ -743,15 +743,21 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> { fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
let thread_pool = ThreadPoolNoAbortBuilder::new() let rayon_thread_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|index| format!("indexing-thread:{index}")) .thread_name(|index| format!("rayon-{index}"))
.num_threads(*other.max_indexing_threads) .num_threads(*other.max_indexing_threads)
.build()?; .build()?;
let thread_pool = Some(scoped_thread_pool::ThreadPool::new(
NonZeroUsize::new(*other.max_indexing_threads).unwrap_or(NonZeroUsize::new(1).unwrap()),
"index".to_string(),
));
Ok(Self { Ok(Self {
log_every_n: Some(DEFAULT_LOG_EVERY_N), log_every_n: Some(DEFAULT_LOG_EVERY_N),
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize), max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
thread_pool: Some(thread_pool), rayon_thread_pool: Some(rayon_thread_pool),
thread_pool,
max_positions_per_attributes: None, max_positions_per_attributes: None,
skip_index_budget: other.skip_index_budget, skip_index_budget: other.skip_index_budget,
..Default::default() ..Default::default()