From 500ddc76b549fb9f1af54b2dd6abfa15960381bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 21 May 2024 16:16:36 +0200 Subject: [PATCH] Make the flattened sorter optional --- milli/src/update/index_documents/mod.rs | 36 +++++++++------- milli/src/update/index_documents/transform.rs | 43 ++++++++++++------- milli/src/update/settings.rs | 1 + 3 files changed, 49 insertions(+), 31 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index cceb25338..dccfbe795 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -360,7 +360,10 @@ where let min_chunk_size = 1024 * 512; // 512KiB // compute the chunk size from the number of available threads and the inputed data size. - let total_size = flattened_documents.metadata().map(|m| m.len()); + let total_size = match flattened_documents.as_ref() { + Some(flattened_documents) => flattened_documents.metadata().map(|m| m.len()), + None => Ok(default_chunk_size as u64), + }; let current_num_threads = pool.current_num_threads(); // if we have more than 2 thread, create a number of chunk equal to 3/4 threads count let chunk_count = if current_num_threads > 2 { @@ -374,11 +377,14 @@ where } }; - let flattened_documents = grenad::Reader::new(flattened_documents)?; let original_documents = match original_documents { Some(original_documents) => Some(grenad::Reader::new(original_documents)?), None => None, }; + let flattened_documents = match flattened_documents { + Some(flattened_documents) => Some(grenad::Reader::new(flattened_documents)?), + None => None, + }; let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes; @@ -400,22 +406,20 @@ where let _enter = child_span.enter(); puffin::profile_scope!("extract_and_send_grenad_chunks"); // split obkv file into several chunks - let original_chunk_iter = - match original_documents { - Some(original_documents) => { - grenad_obkv_into_chunks( - original_documents, - pool_params, - documents_chunk_size - ) - .map(either::Either::Left) - }, - None => Ok(either::Right(iter::empty())), - }; + let original_chunk_iter = match original_documents { + Some(original_documents) => { + grenad_obkv_into_chunks(original_documents,pool_params,documents_chunk_size).map(either::Left) + }, + None => Ok(either::Right(iter::empty())), + }; // split obkv file into several chunks - let flattened_chunk_iter = - grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size); + let flattened_chunk_iter = match flattened_documents { + Some(flattened_documents) => { + grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size).map(either::Left) + }, + None => Ok(either::Right(iter::empty())), + }; let result = original_chunk_iter.and_then(|original_chunk| { let flattened_chunk = flattened_chunk_iter?; diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index f7e3d79fd..8bedd778e 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -34,7 +34,7 @@ pub struct TransformOutput { pub field_distribution: FieldDistribution, pub documents_count: usize, pub original_documents: Option, - pub flattened_documents: File, + pub flattened_documents: Option, } /// Extract the external ids, deduplicate and compute the new internal documents ids @@ -825,9 +825,9 @@ impl<'a, 'i> Transform<'a, 'i> { original_documents: Some( original_documents.into_inner().map_err(|err| err.into_error())?, ), - flattened_documents: flattened_documents - .into_inner() - .map_err(|err| err.into_error())?, + flattened_documents: Some( + flattened_documents.into_inner().map_err(|err| err.into_error())?, + ), }) } @@ -840,6 +840,9 @@ impl<'a, 'i> Transform<'a, 'i> { original_obkv_buffer: &mut Vec, flattened_obkv_buffer: &mut Vec, ) -> Result<()> { + /// TODO do a XOR of the faceted fields + /// TODO if reindex_searchable returns true store all searchables else none + /// TODO no longer useful after Tamo's PR let mut old_fields_ids_map = settings_diff.old.fields_ids_map.clone(); let mut new_fields_ids_map = settings_diff.new.fields_ids_map.clone(); let mut obkv_writer = KvWriter::<_, FieldId>::memory(); @@ -907,14 +910,19 @@ impl<'a, 'i> Transform<'a, 'i> { }; // We initialize the sorter with the user indexing settings. - let mut flattened_sorter = create_sorter( - grenad::SortAlgorithm::Stable, - keep_first, - self.indexer_settings.chunk_compression_type, - self.indexer_settings.chunk_compression_level, - self.indexer_settings.max_nb_chunks, - self.indexer_settings.max_memory.map(|mem| mem / 2), - ); + let mut flattened_sorter = + if settings_diff.reindex_searchable() || settings_diff.reindex_facets() { + Some(create_sorter( + grenad::SortAlgorithm::Stable, + keep_first, + self.indexer_settings.chunk_compression_type, + self.indexer_settings.chunk_compression_level, + self.indexer_settings.max_nb_chunks, + self.indexer_settings.max_memory.map(|mem| mem / 2), + )) + } else { + None + }; let mut original_obkv_buffer = Vec::new(); let mut flattened_obkv_buffer = Vec::new(); @@ -938,7 +946,9 @@ impl<'a, 'i> Transform<'a, 'i> { if let Some(original_sorter) = original_sorter.as_mut() { original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?; } - flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?; + if let Some(flattened_sorter) = flattened_sorter.as_mut() { + flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?; + } } let grenad_params = GrenadParameters { @@ -949,7 +959,10 @@ impl<'a, 'i> Transform<'a, 'i> { }; // Once we have written all the documents, we merge everything into a Reader. - let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?; + let flattened_documents = match flattened_sorter { + Some(flattened_sorter) => Some(sorter_into_reader(flattened_sorter, grenad_params)?), + None => None, + }; let original_documents = match original_sorter { Some(original_sorter) => Some(sorter_into_reader(original_sorter, grenad_params)?), None => None, @@ -961,7 +974,7 @@ impl<'a, 'i> Transform<'a, 'i> { settings_diff, documents_count, original_documents: original_documents.map(|od| od.into_inner().into_inner()), - flattened_documents: flattened_documents.into_inner().into_inner(), + flattened_documents: flattened_documents.map(|fd| fd.into_inner().into_inner()), }) } } diff --git a/milli/src/update/settings.rs b/milli/src/update/settings.rs index 0599bb9d8..c7d6ff0fd 100644 --- a/milli/src/update/settings.rs +++ b/milli/src/update/settings.rs @@ -1099,6 +1099,7 @@ impl InnerIndexSettingsDiff { } pub fn reindex_searchable(&self) -> bool { + // TODO no longer useful after Tamo's PR self.old .fields_ids_map .iter()