From 14261f8f045fdaa026526fe54e4d830e9301d988 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Tue, 1 Oct 2024 16:13:08 +0200 Subject: [PATCH] Integrate facet level bulk update Only the facet bulk update has been added so far, the incremental must be completely rewritten Factorize facet merging Fix facet level extraction --- milli/src/update/new/channel.rs | 43 ++++--- .../new/extract/faceted/extract_facets.rs | 4 +- milli/src/update/new/extract/faceted/mod.rs | 8 ++ milli/src/update/new/indexer/mod.rs | 73 ++++++++--- milli/src/update/new/merger.rs | 120 +++++++++++++----- 5 files changed, 181 insertions(+), 67 deletions(-) diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index bd06b5123..bcac0fa03 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -120,6 +120,7 @@ pub struct WriterOperation { entry: EntryOperation, } +#[derive(Debug)] pub enum Database { Documents, ExternalDocumentsIds, @@ -158,6 +159,18 @@ impl Database { } } +impl From for Database { + fn from(value: FacetKind) -> Self { + match value { + FacetKind::Number => Database::FacetIdF64NumberDocids, + FacetKind::String => Database::FacetIdStringDocids, + FacetKind::Null => Database::FacetIdIsNullDocids, + FacetKind::Empty => Database::FacetIdIsEmptyDocids, + FacetKind::Exists => Database::FacetIdExistsDocids, + } + } +} + impl WriterOperation { pub fn database(&self, index: &Index) -> heed::Database { self.database.database(index) @@ -395,8 +408,18 @@ pub struct FacetDocidsSender<'a> { impl DocidsSender for FacetDocidsSender<'_> { fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { - let (database, key) = self.extract_database(key); - let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); + let (facet_kind, key) = FacetKind::extract_from_key(key); + let database = Database::from(facet_kind); + // let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); + let entry = match facet_kind { + // skip level group size + FacetKind::String | FacetKind::Number => { + // add facet group size + let value = [&[1], value].concat(); + EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &value)) + } + _ => EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)), + }; match self.sender.send(WriterOperation { database, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), @@ -404,7 +427,8 @@ impl DocidsSender for FacetDocidsSender<'_> { } fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { - let (database, key) = self.extract_database(key); + let (facet_kind, key) = FacetKind::extract_from_key(key); + let database = Database::from(facet_kind); let entry = EntryOperation::Delete(KeyEntry::from_key(key)); match self.sender.send(WriterOperation { database, entry }) { Ok(()) => Ok(()), @@ -413,19 +437,6 @@ impl DocidsSender for FacetDocidsSender<'_> { } } -impl FacetDocidsSender<'_> { - fn extract_database<'a>(&self, key: &'a [u8]) -> (Database, &'a [u8]) { - let database = match FacetKind::from(key[0]) { - FacetKind::Number => Database::FacetIdF64NumberDocids, - FacetKind::String => Database::FacetIdStringDocids, - FacetKind::Null => Database::FacetIdIsNullDocids, - FacetKind::Empty => Database::FacetIdIsEmptyDocids, - FacetKind::Exists => Database::FacetIdExistsDocids, - }; - (database, &key[1..]) - } -} - pub struct DocumentsSender<'a>(&'a MergerSender); impl DocumentsSender<'_> { diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index e4e6f7010..8ffec68f3 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -129,7 +129,7 @@ impl FacetedDocidsExtractor { buffer.clear(); buffer.push(FacetKind::Number as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - buffer.push(1); // level 0 + buffer.push(0); // level 0 buffer.extend_from_slice(&ordered); buffer.extend_from_slice(&n.to_be_bytes()); @@ -145,7 +145,7 @@ impl FacetedDocidsExtractor { buffer.clear(); buffer.push(FacetKind::String as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - buffer.push(1); // level 0 + buffer.push(0); // level 0 buffer.extend_from_slice(truncated.as_bytes()); cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) } diff --git a/milli/src/update/new/extract/faceted/mod.rs b/milli/src/update/new/extract/faceted/mod.rs index a59c64d9a..65e90cdf4 100644 --- a/milli/src/update/new/extract/faceted/mod.rs +++ b/milli/src/update/new/extract/faceted/mod.rs @@ -4,6 +4,7 @@ mod facet_document; pub use extract_facets::FacetedDocidsExtractor; #[repr(u8)] +#[derive(Debug, Clone, Copy)] pub enum FacetKind { Number = 0, String = 1, @@ -24,3 +25,10 @@ impl From for FacetKind { } } } + +impl FacetKind { + pub fn extract_from_key<'k>(key: &'k [u8]) -> (FacetKind, &'k [u8]) { + debug_assert!(key.len() > 3); + (FacetKind::from(key[0]), &key[1..]) + } +} diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 17de2b310..4d89a839e 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -13,18 +13,19 @@ pub use update_by_function::UpdateByFunction; use super::channel::*; use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::extract::*; -use super::merger::merge_grenad_entries; +use super::merger::{merge_grenad_entries, FacetFieldIdsDelta}; use super::word_fst_builder::PrefixDelta; use super::words_prefix_docids::{ compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, }; use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; +use crate::facet::FacetType; use crate::update::new::channel::ExtractorSender; use crate::update::settings::InnerIndexSettings; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; -use crate::update::GrenadParameters; use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +use crate::update::{FacetsUpdateBulk, GrenadParameters}; mod document_deletion; mod document_operation; @@ -71,11 +72,11 @@ where let global_fields_ids_map_clone = global_fields_ids_map.clone(); thread::scope(|s| { + let indexer_span = tracing::Span::current(); // TODO manage the errors correctly - let current_span = tracing::Span::current(); let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { pool.in_place_scope(|_s| { - let span = tracing::trace_span!(target: "indexing::documents", parent: ¤t_span, "extract"); + let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); let _entered = span.enter(); let document_changes = document_changes.into_par_iter(); @@ -179,11 +180,11 @@ where }) })?; + let indexer_span = tracing::Span::current(); // TODO manage the errors correctly - let current_span = tracing::Span::current(); let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || { let span = - tracing::trace_span!(target: "indexing::documents", parent: ¤t_span, "merge"); + tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "merge"); let _entered = span.enter(); let rtxn = index.read_txn().unwrap(); merge_grenad_entries( @@ -211,17 +212,12 @@ where handle.join().unwrap()?; let merger_result = merger_thread.join().unwrap()?; - if let Some(prefix_delta) = merger_result.prefix_delta { - let span = tracing::trace_span!(target: "indexing", "prefix"); - let _entered = span.enter(); + if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta { + compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; + } - let PrefixDelta { modified, deleted } = prefix_delta; - // Compute word prefix docids - compute_word_prefix_docids(wtxn, index, &modified, &deleted)?; - // Compute word prefix fid docids - compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?; - // Compute word prefix position docids - compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)?; + if let Some(prefix_delta) = merger_result.prefix_delta { + compute_prefix_database(index, wtxn, prefix_delta)?; } Ok(()) as Result<_> @@ -238,6 +234,51 @@ where Ok(()) } +#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] +fn compute_prefix_database( + index: &Index, + wtxn: &mut RwTxn, + prefix_delta: PrefixDelta, +) -> Result<()> { + let PrefixDelta { modified, deleted } = prefix_delta; + // Compute word prefix docids + compute_word_prefix_docids(wtxn, index, &modified, &deleted)?; + // Compute word prefix fid docids + compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?; + // Compute word prefix position docids + compute_word_prefix_position_docids(wtxn, index, &modified, &deleted) +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] +fn compute_facet_level_database( + index: &Index, + wtxn: &mut RwTxn, + facet_field_ids_delta: FacetFieldIdsDelta, +) -> Result<()> { + if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { + let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); + let _entered = span.enter(); + FacetsUpdateBulk::new_not_updating_level_0( + index, + modified_facet_string_ids, + FacetType::String, + ) + .execute(wtxn)?; + } + if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { + let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); + let _entered = span.enter(); + FacetsUpdateBulk::new_not_updating_level_0( + index, + modified_facet_number_ids, + FacetType::Number, + ) + .execute(wtxn)?; + } + + Ok(()) +} + /// TODO: GrenadParameters::default() should be removed in favor a passed parameter /// TODO: manage the errors correctly /// TODO: we must have a single trait that also gives the extractor type diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index c010a5d83..9751be66c 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -16,7 +16,9 @@ use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::new::word_fst_builder::WordFstBuilder; use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Prefix, Result}; +use crate::{ + CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Prefix, Result, +}; /// TODO We must return some infos/stats #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] @@ -188,13 +190,17 @@ pub fn merge_grenad_entries( let span = tracing::trace_span!(target: "indexing::documents::merge", "facet_docids"); let _entered = span.enter(); + let mut facet_field_ids_delta = FacetFieldIdsDelta::new(); merge_and_send_facet_docids( merger, FacetDatabases::new(index), rtxn, &mut buffer, sender.facet_docids(), + &mut facet_field_ids_delta, )?; + + merger_result.facet_field_ids_delta = Some(facet_field_ids_delta); } } } @@ -218,6 +224,8 @@ pub fn merge_grenad_entries( pub struct MergerResult { /// The delta of the prefixes pub prefix_delta: Option, + /// The field ids that have been modified + pub facet_field_ids_delta: Option, } pub struct GeoExtractor { @@ -308,20 +316,23 @@ fn merge_and_send_facet_docids( rtxn: &RoTxn<'_>, buffer: &mut Vec, docids_sender: impl DocidsSender, + facet_field_ids_delta: &mut FacetFieldIdsDelta, ) -> Result<()> { let mut merger_iter = merger.into_stream_merger_iter().unwrap(); while let Some((key, deladd)) = merger_iter.next().unwrap() { - let current = database.get(rtxn, key)?; + let current = database.get_cbo_roaring_bytes_value(rtxn, key)?; let deladd: &KvReaderDelAdd = deladd.into(); let del = deladd.get(DelAdd::Deletion); let add = deladd.get(DelAdd::Addition); match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { + facet_field_ids_delta.register_from_key(key); let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); docids_sender.write(key, value).unwrap(); } Operation::Delete => { + facet_field_ids_delta.register_from_key(key); docids_sender.delete(key).unwrap(); } Operation::Ignore => (), @@ -331,43 +342,84 @@ fn merge_and_send_facet_docids( Ok(()) } -struct FacetDatabases { - /// Maps the facet field id and the docids for which this field exists - facet_id_exists_docids: Database, - /// Maps the facet field id and the docids for which this field is set as null - facet_id_is_null_docids: Database, - /// Maps the facet field id and the docids for which this field is considered empty - facet_id_is_empty_docids: Database, - /// Maps the facet field id and ranges of numbers with the docids that corresponds to them. - facet_id_f64_docids: Database, - /// Maps the facet field id and ranges of strings with the docids that corresponds to them. - facet_id_string_docids: Database, +struct FacetDatabases<'a> { + index: &'a Index, } -impl FacetDatabases { - fn new(index: &Index) -> Self { - Self { - facet_id_exists_docids: index.facet_id_exists_docids.remap_types(), - facet_id_is_null_docids: index.facet_id_is_null_docids.remap_types(), - facet_id_is_empty_docids: index.facet_id_is_empty_docids.remap_types(), - facet_id_f64_docids: index.facet_id_f64_docids.remap_types(), - facet_id_string_docids: index.facet_id_string_docids.remap_types(), - } +impl<'a> FacetDatabases<'a> { + fn new(index: &'a Index) -> Self { + Self { index } } - fn get<'a>(&self, rtxn: &'a RoTxn<'_>, key: &[u8]) -> heed::Result> { - let (facet_kind, key) = self.extract_facet_kind(key); + fn get_cbo_roaring_bytes_value<'t>( + &self, + rtxn: &'t RoTxn<'_>, + key: &[u8], + ) -> heed::Result> { + let (facet_kind, key) = FacetKind::extract_from_key(key); + + let value = + super::channel::Database::from(facet_kind).database(self.index).get(rtxn, key)?; match facet_kind { - FacetKind::Exists => self.facet_id_exists_docids.get(rtxn, key), - FacetKind::Null => self.facet_id_is_null_docids.get(rtxn, key), - FacetKind::Empty => self.facet_id_is_empty_docids.get(rtxn, key), - FacetKind::Number => self.facet_id_f64_docids.get(rtxn, key), - FacetKind::String => self.facet_id_string_docids.get(rtxn, key), + // skip level group size + FacetKind::String | FacetKind::Number => Ok(value.map(|v| &v[1..])), + _ => Ok(value), + } + } +} + +#[derive(Debug)] +pub struct FacetFieldIdsDelta { + /// The field ids that have been modified + modified_facet_string_ids: HashSet, + modified_facet_number_ids: HashSet, +} + +impl FacetFieldIdsDelta { + fn new() -> Self { + Self { + modified_facet_string_ids: HashSet::new(), + modified_facet_number_ids: HashSet::new(), } } - fn extract_facet_kind<'a>(&self, key: &'a [u8]) -> (FacetKind, &'a [u8]) { - (FacetKind::from(key[0]), &key[1..]) + fn register_facet_string_id(&mut self, field_id: FieldId) { + self.modified_facet_string_ids.insert(field_id); + } + + fn register_facet_number_id(&mut self, field_id: FieldId) { + self.modified_facet_number_ids.insert(field_id); + } + + fn register_from_key(&mut self, key: &[u8]) { + let (facet_kind, field_id) = self.extract_key_data(key); + match facet_kind { + FacetKind::Number => self.register_facet_number_id(field_id), + FacetKind::String => self.register_facet_string_id(field_id), + _ => (), + } + } + + fn extract_key_data<'a>(&self, key: &'a [u8]) -> (FacetKind, FieldId) { + let facet_kind = FacetKind::from(key[0]); + let field_id = FieldId::from_be_bytes([key[1], key[2]]); + (facet_kind, field_id) + } + + pub fn modified_facet_string_ids(&self) -> Option> { + if self.modified_facet_string_ids.is_empty() { + None + } else { + Some(self.modified_facet_string_ids.iter().copied().collect()) + } + } + + pub fn modified_facet_number_ids(&self) -> Option> { + if self.modified_facet_number_ids.is_empty() { + None + } else { + Some(self.modified_facet_number_ids.iter().copied().collect()) + } } } @@ -396,11 +448,13 @@ fn merge_cbo_bitmaps( (Some(current), None, Some(add)) => Ok(Operation::Write(current | add)), (Some(current), Some(del), add) => { let output = match add { - Some(add) => (current - del) | add, - None => current - del, + Some(add) => (¤t - del) | add, + None => ¤t - del, }; if output.is_empty() { Ok(Operation::Delete) + } else if current == output { + Ok(Operation::Ignore) } else { Ok(Operation::Write(output)) }