diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 0ebd2d120..1ad25b422 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -1290,7 +1290,6 @@ impl IndexScheduler { let db_fields_ids_map = index.fields_ids_map(&rtxn)?; let mut new_fields_ids_map = db_fields_ids_map.clone(); - let indexer_config = self.index_mapper.indexer_config(); let mut content_files_iter = content_files.iter(); let mut indexer = indexer::DocumentOperation::new(method); let embedders = index.embedding_configs(index_wtxn)?; @@ -1313,6 +1312,7 @@ impl IndexScheduler { } let local_pool; + let indexer_config = self.index_mapper.indexer_config(); let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { @@ -1366,6 +1366,7 @@ impl IndexScheduler { indexer::index( index_wtxn, index, + indexer_config.grenad_parameters(), &db_fields_ids_map, new_fields_ids_map, primary_key, @@ -1456,7 +1457,8 @@ impl IndexScheduler { if task.error.is_none() { let local_pool; - let pool = match &self.index_mapper.indexer_config().thread_pool { + let indexer_config = self.index_mapper.indexer_config(); + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -1474,6 +1476,7 @@ impl IndexScheduler { indexer::index( index_wtxn, index, + indexer_config.grenad_parameters(), &db_fields_ids_map, new_fields_ids_map, None, // cannot change primary key in DocumentEdition @@ -1606,7 +1609,8 @@ impl IndexScheduler { if !tasks.iter().all(|res| res.error.is_some()) { let local_pool; - let pool = match &self.index_mapper.indexer_config().thread_pool { + let indexer_config = self.index_mapper.indexer_config(); + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -1624,6 +1628,7 @@ impl IndexScheduler { indexer::index( index_wtxn, index, + indexer_config.grenad_parameters(), &db_fields_ids_map, new_fields_ids_map, None, // document deletion never changes primary key diff --git a/crates/milli/src/update/index_documents/helpers/grenad_helpers.rs b/crates/milli/src/update/index_documents/helpers/grenad_helpers.rs index b7da39878..62dc40edc 100644 --- a/crates/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/crates/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -119,12 +119,8 @@ impl GrenadParameters { /// /// This should be called inside of a rayon thread pool, /// otherwise, it will take the global number of threads. - /// - /// The max memory cannot exceed a given reasonable value. pub fn max_memory_by_thread(&self) -> Option { - self.max_memory.map(|max_memory| { - (max_memory / rayon::current_num_threads()).min(MAX_GRENAD_SORTER_USAGE) - }) + self.max_memory.map(|max_memory| (max_memory / rayon::current_num_threads())) } } diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index 115059a1d..6fb33ad78 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -1,5 +1,6 @@ use grenad::CompressionType; +use super::GrenadParameters; use crate::thread_pool_no_abort::ThreadPoolNoAbort; #[derive(Debug)] @@ -15,6 +16,17 @@ pub struct IndexerConfig { pub skip_index_budget: bool, } +impl IndexerConfig { + pub fn grenad_parameters(&self) -> GrenadParameters { + GrenadParameters { + chunk_compression_type: self.chunk_compression_type, + chunk_compression_level: self.chunk_compression_level, + max_memory: self.max_memory, + max_nb_chunks: self.max_nb_chunks, + } + } +} + impl Default for IndexerConfig { fn default() -> Self { Self { diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index 19e908612..d30a50c52 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -36,7 +36,7 @@ impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> { fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { Ok(RefCell::new(BalancedCaches::new_in( self.buckets, - self.grenad_parameters.max_memory, + self.grenad_parameters.max_memory_by_thread(), extractor_alloc, ))) } diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index e26a7dc6c..e883a04cc 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -150,7 +150,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { ) -> Result<()> { let rtxn = &context.rtxn; let index = context.index; - let max_memory = self.grenad_parameters.max_memory; + let max_memory = self.grenad_parameters.max_memory_by_thread(); let db_fields_ids_map = context.db_fields_ids_map; let mut data_ref = context.data.borrow_mut_or_yield(); diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs index c67fc347a..dfb55853f 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -214,7 +214,7 @@ impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> { fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { Ok(RefCell::new(Some(WordDocidsBalancedCaches::new_in( self.buckets, - self.grenad_parameters.max_memory, + self.grenad_parameters.max_memory_by_thread(), extractor_alloc, )))) } diff --git a/crates/milli/src/update/new/extract/searchable/mod.rs b/crates/milli/src/update/new/extract/searchable/mod.rs index b75a01cd2..46a05be4e 100644 --- a/crates/milli/src/update/new/extract/searchable/mod.rs +++ b/crates/milli/src/update/new/extract/searchable/mod.rs @@ -36,7 +36,7 @@ impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { Ok(RefCell::new(BalancedCaches::new_in( self.buckets, - self.grenad_parameters.max_memory, + self.grenad_parameters.max_memory_by_thread(), extractor_alloc, ))) } diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 00cc2d2c1..71fcdd204 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -132,6 +132,7 @@ mod steps { pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>( wtxn: &mut RwTxn, index: &'index Index, + grenad_parameters: GrenadParameters, db_fields_ids_map: &'indexer FieldsIdsMap, new_fields_ids_map: FieldsIdsMap, new_primary_key: Option>, @@ -209,16 +210,6 @@ where field_distribution.retain(|_, v| *v != 0); - const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; - let current_num_threads = rayon::current_num_threads(); - let max_memory = TEN_GIB / current_num_threads; - eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads"); - - let grenad_parameters = GrenadParameters { - max_memory: Some(max_memory), - ..GrenadParameters::default() - }; - let facet_field_ids_delta; { @@ -228,7 +219,8 @@ where let (finished_steps, step_name) = steps::extract_facets(); facet_field_ids_delta = merge_and_send_facet_docids( - FacetedDocidsExtractor::run_extraction(grenad_parameters, + FacetedDocidsExtractor::run_extraction( + grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, @@ -344,7 +336,8 @@ where let (finished_steps, step_name) = steps::extract_word_proximity(); - let caches = ::run_extraction(grenad_parameters, + let caches = ::run_extraction( + grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, @@ -398,7 +391,8 @@ where }; let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); let (finished_steps, step_name) = steps::extract_geo_points(); - extract(document_changes, + extract( + document_changes, &extractor, indexing_context, &mut extractor_allocs,