habemus field distribution

Louis Dureuil 2024-10-30 10:06:46 +01:00
parent 4ebedf4dc8
commit 0f6a1dbce7

@@ -56,13 +56,13 @@ struct DocumentExtractor<'a> {
 }
 
 impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
-    type Data = FullySend<()>;
+    type Data = FullySend<RefCell<HashMap<String, i64>>>;
 
     fn init_data(
         &self,
         _extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
     ) -> Result<Self::Data> {
-        Ok(FullySend(()))
+        Ok(FullySend(Default::default()))
     }
 
     fn process<'doc>(
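
Context for the change above: `Data` is the extractor's per-thread state, created once per thread by `init_data`. Swapping `FullySend<()>` for `FullySend<RefCell<HashMap<String, i64>>>` gives every extractor thread its own map of signed field-count deltas, so counting needs no cross-thread locking, and a single map can record both removals (-1) and additions (+1). A minimal standalone sketch of that per-thread accumulation, with made-up inputs and std types rather than the actual meilisearch ones (the diff's `entry_ref` looks like hashbrown's borrowed-key entry API; std's `entry` takes an owned key):

    use std::cell::RefCell;
    use std::collections::HashMap;
    use std::thread;

    fn main() {
        // Hypothetical batches of (field, ±1) events, one per worker thread.
        let batches: Vec<Vec<(&str, i64)>> = vec![
            vec![("title", 1), ("genres", 1)],
            vec![("title", -1), ("overview", 1)],
        ];

        let deltas: Vec<HashMap<String, i64>> = thread::scope(|s| {
            let handles: Vec<_> = batches
                .iter()
                .map(|batch| {
                    s.spawn(move || {
                        // Thread-local state: a RefCell is enough because only
                        // this thread ever touches this map.
                        let data = RefCell::new(HashMap::<String, i64>::new());
                        for (field, delta) in batch {
                            *data.borrow_mut().entry(field.to_string()).or_default() += delta;
                        }
                        data.into_inner()
                    })
                })
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });

        // `deltas` is merged into the on-disk distribution later (see below).
        assert_eq!(deltas.len(), 2);
    }
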
@@ -71,6 +71,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
         context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
         let mut document_buffer = Vec::new();
+        let mut field_distribution_delta = context.data.0.borrow_mut();
 
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
@@ -82,10 +83,34 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
         match change {
             DocumentChange::Deletion(deletion) => {
                 let docid = deletion.docid();
+                let content = deletion.current(
+                    &context.txn,
+                    context.index,
+                    &context.db_fields_ids_map,
+                )?;
+                for res in content.iter_top_level_fields() {
+                    let (f, _) = res?;
+                    let entry = field_distribution_delta.entry_ref(f).or_default();
+                    *entry -= 1;
+                }
                 self.document_sender.delete(docid, external_docid).unwrap();
             }
             DocumentChange::Update(update) => {
                 let docid = update.docid();
+                let content =
+                    update.current(&context.txn, context.index, &context.db_fields_ids_map)?;
+                for res in content.iter_top_level_fields() {
+                    let (f, _) = res?;
+                    let entry = field_distribution_delta.entry_ref(f).or_default();
+                    *entry -= 1;
+                }
+                let content = update.updated();
+                for res in content.iter_top_level_fields() {
+                    let (f, _) = res?;
+                    let entry = field_distribution_delta.entry_ref(f).or_default();
+                    *entry += 1;
+                }
+
                 let content =
                     update.merged(&context.txn, context.index, &context.db_fields_ids_map)?;
                 let vector_content = update.merged_vectors(
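
The update arm above makes two passes: -1 for every top-level field of the stored (current) version, then +1 for every field of the incoming (updated) version, so a field present in both versions nets out to zero. A tiny hypothetical example of what the two passes leave in the delta map:

    use std::collections::HashMap;

    fn main() {
        // current version of the document has { "title", "overview" },
        // the updated version has { "title", "genres" }.
        let current = ["title", "overview"];
        let updated = ["title", "genres"];

        let mut delta: HashMap<&str, i64> = HashMap::new();
        for f in current {
            *delta.entry(f).or_default() -= 1; // the -1 pass
        }
        for f in updated {
            *delta.entry(f).or_default() += 1; // the +1 pass
        }

        // "title" cancels out; only the net change survives.
        assert_eq!(delta["title"], 0);
        assert_eq!(delta["overview"], -1);
        assert_eq!(delta["genres"], 1);
    }
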
@@ -105,6 +130,11 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
             DocumentChange::Insertion(insertion) => {
                 let docid = insertion.docid();
                 let content = insertion.inserted();
+                for res in content.iter_top_level_fields() {
+                    let (f, _) = res?;
+                    let entry = field_distribution_delta.entry_ref(f).or_default();
+                    *entry += 1;
+                }
                 let inserted_vectors = insertion.inserted_vectors(&context.doc_alloc)?;
                 let content = write_to_obkv(
                     &content,
@@ -163,9 +193,13 @@ where
         fields_ids_map_store: &fields_ids_map_store,
     };
 
+    let mut field_distribution = index.field_distribution(wtxn)?;
+
     thread::scope(|s| -> Result<()> {
         let indexer_span = tracing::Span::current();
         let embedders = &embedders;
+        // prevent moving the field_distribution in the inner closure...
+        let field_distribution = &mut field_distribution;
        // TODO manage the errors correctly
        let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
            pool.in_place_scope(|_s| {
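
The reborrow `let field_distribution = &mut field_distribution;` is the usual trick for keeping a `move` closure from taking ownership of the map: only the mutable reference moves into the closure, and the map itself stays owned outside `thread::scope`, where it is still needed afterwards. A standalone illustration of the same pattern:

    use std::collections::HashMap;
    use std::thread;

    fn main() {
        let mut field_distribution: HashMap<String, u64> = HashMap::new();

        thread::scope(|s| {
            // Without this reborrow, `move` would transfer the map itself
            // into the spawned closure and it could not be used after the scope.
            let field_distribution = &mut field_distribution;
            s.spawn(move || {
                field_distribution.insert("title".to_string(), 1);
            });
        });

        // Still owned here: the closure only consumed the &mut borrow.
        assert_eq!(field_distribution["title"], 1);
    }
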
@@ -178,6 +212,17 @@ where
                 let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
 
                 for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?;
+
+                for field_distribution_delta in datastore {
+                    let field_distribution_delta = field_distribution_delta.0.into_inner();
+                    for (field, delta) in field_distribution_delta {
+                        let current = field_distribution.entry(field).or_default();
+                        // adding the delta should never cause a negative result,
+                        // as we only subtract occurrences of fields that previously existed
+                        *current = current.saturating_add_signed(delta);
+                    }
+                }
+                field_distribution.retain(|_, v| *v != 0);
 
                 document_sender.finish().unwrap();
                 const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
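
Once the extractor threads are done, their thread-local deltas are drained from the `ThreadLocal` datastore and folded into the `u64` counts previously loaded from the index: `u64::saturating_add_signed` applies a signed `i64` delta to an unsigned count in one call, and the final `retain` drops fields whose count fell to zero. A rough illustration with made-up numbers:

    use std::collections::HashMap;

    fn main() {
        let mut field_distribution = HashMap::from([
            ("title".to_string(), 100u64),
            ("overview".to_string(), 1u64),
        ]);
        let delta = [("title", 25i64), ("overview", -1i64), ("genres", 3i64)];

        for (field, d) in delta {
            let current = field_distribution.entry(field.to_string()).or_default();
            // A negative delta can only come from fields that were counted
            // before, so saturating at 0 is a safety net, not an expected path.
            *current = current.saturating_add_signed(d);
        }
        // Fields whose count dropped to zero no longer exist in any document.
        field_distribution.retain(|_, v| *v != 0);

        assert_eq!(field_distribution["title"], 125);
        assert_eq!(field_distribution["genres"], 3);
        assert!(!field_distribution.contains_key("overview"));
    }
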
@@ -479,7 +524,7 @@ where
     let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?;
     inner_index_settings.recompute_facets(wtxn, index)?;
     inner_index_settings.recompute_searchables(wtxn, index)?;
+    index.put_field_distribution(wtxn, &field_distribution)?;
     index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
 
     Ok(())