diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 7fba0ffa4..dd0ff781d 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -56,13 +56,13 @@ struct DocumentExtractor<'a> { } impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { - type Data = FullySend<()>; + type Data = FullySend>>; fn init_data( &self, _extractor_alloc: raw_collections::alloc::RefBump<'extractor>, ) -> Result { - Ok(FullySend(())) + Ok(FullySend(Default::default())) } fn process<'doc>( @@ -71,6 +71,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { context: &DocumentChangeContext, ) -> Result<()> { let mut document_buffer = Vec::new(); + let mut field_distribution_delta = context.data.0.borrow_mut(); let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); @@ -82,10 +83,34 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { match change { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); + let content = deletion.current( + &context.txn, + context.index, + &context.db_fields_ids_map, + )?; + for res in content.iter_top_level_fields() { + let (f, _) = res?; + let entry = field_distribution_delta.entry_ref(f).or_default(); + *entry -= 1; + } self.document_sender.delete(docid, external_docid).unwrap(); } DocumentChange::Update(update) => { let docid = update.docid(); + let content = + update.current(&context.txn, context.index, &context.db_fields_ids_map)?; + for res in content.iter_top_level_fields() { + let (f, _) = res?; + let entry = field_distribution_delta.entry_ref(f).or_default(); + *entry -= 1; + } + let content = update.updated(); + for res in content.iter_top_level_fields() { + let (f, _) = res?; + let entry = field_distribution_delta.entry_ref(f).or_default(); + *entry += 1; + } + let content = update.merged(&context.txn, context.index, &context.db_fields_ids_map)?; let vector_content = update.merged_vectors( @@ -105,6 +130,11 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { DocumentChange::Insertion(insertion) => { let docid = insertion.docid(); let content = insertion.inserted(); + for res in content.iter_top_level_fields() { + let (f, _) = res?; + let entry = field_distribution_delta.entry_ref(f).or_default(); + *entry += 1; + } let inserted_vectors = insertion.inserted_vectors(&context.doc_alloc)?; let content = write_to_obkv( &content, @@ -163,9 +193,13 @@ where fields_ids_map_store: &fields_ids_map_store, }; + let mut field_distribution = index.field_distribution(wtxn)?; + thread::scope(|s| -> Result<()> { let indexer_span = tracing::Span::current(); let embedders = &embedders; + // prevent moving the field_distribution in the inner closure... + let field_distribution = &mut field_distribution; // TODO manage the errors correctly let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { pool.in_place_scope(|_s| { @@ -178,6 +212,17 @@ where let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?; + for field_distribution_delta in datastore { + let field_distribution_delta = field_distribution_delta.0.into_inner(); + for (field, delta) in field_distribution_delta { + let current = field_distribution.entry(field).or_default(); + // adding the delta should never cause a negative result, as we are removing fields that previously existed. + *current = current.saturating_add_signed(delta); + } + } + + field_distribution.retain(|_, v| *v == 0); + document_sender.finish().unwrap(); const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; @@ -479,7 +524,7 @@ where let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?; inner_index_settings.recompute_facets(wtxn, index)?; inner_index_settings.recompute_searchables(wtxn, index)?; - + index.put_field_distribution(wtxn, &field_distribution)?; index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; Ok(())