From 8b260de5a068cb356d6f5fc1c829dcc8c1c87ed8 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Tue, 5 Nov 2024 16:46:43 +0100 Subject: [PATCH] Reimplement facet search and facetr level and put them in dedidcated functions --- milli/src/update/new/channel.rs | 38 ----- milli/src/update/new/facet_search_builder.rs | 30 ++-- milli/src/update/new/indexer/mod.rs | 170 ++++++++++++------- milli/src/update/new/merger.rs | 27 +-- 4 files changed, 125 insertions(+), 140 deletions(-) diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index af6e2215c..d6f2837b6 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -94,8 +94,6 @@ pub enum Database { FacetIdExistsDocids, FacetIdF64NumberDocids, FacetIdStringDocids, - FacetIdNormalizedStringStrings, - FacetIdStringFst, } impl Database { @@ -115,10 +113,6 @@ impl Database { Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(), Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(), Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(), - Database::FacetIdNormalizedStringStrings => { - index.facet_id_normalized_string_strings.remap_types() - } - Database::FacetIdStringFst => index.facet_id_string_fst.remap_types(), } } } @@ -194,10 +188,6 @@ impl ExtractorSender { DocumentsSender(self) } - pub fn facet_searchable(&self) -> FacetSearchableSender<'_> { - FacetSearchableSender { sender: self } - } - pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap( DOCUMENTS_IDS_KEY.as_bytes(), @@ -322,34 +312,6 @@ impl DocidsSender for FacetDocidsSender<'_> { } } -pub struct FacetSearchableSender<'a> { - sender: &'a ExtractorSender, -} - -impl FacetSearchableSender<'_> { - pub fn write_facet(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { - let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); - match self - .sender - .send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry }) - { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } - - pub fn delete_facet(&self, key: &[u8]) -> StdResult<(), SendError<()>> { - let entry = EntryOperation::Delete(KeyEntry::from_key(key)); - match self - .sender - .send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry }) - { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } -} - pub struct DocumentsSender<'a>(&'a ExtractorSender); impl DocumentsSender<'_> { diff --git a/milli/src/update/new/facet_search_builder.rs b/milli/src/update/new/facet_search_builder.rs index b9db80afb..839120540 100644 --- a/milli/src/update/new/facet_search_builder.rs +++ b/milli/src/update/new/facet_search_builder.rs @@ -6,7 +6,6 @@ use grenad::Sorter; use heed::types::{Bytes, SerdeJson}; use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn}; -use super::channel::FacetSearchableSender; use super::extract::FacetKind; use super::fst_merger_builder::FstMergerBuilder; use super::KvReaderDelAdd; @@ -42,7 +41,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> { None, None, Some(0), - false, + true, ); Self { @@ -115,13 +114,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> { } #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_fst")] - pub fn merge_and_send( - self, - index: &Index, - wtxn: &mut RwTxn, - rtxn: &RoTxn, - sender: FacetSearchableSender, - ) -> Result<()> { + pub fn merge_and_write(self, index: &Index, wtxn: &mut RwTxn, rtxn: &RoTxn) -> Result<()> { let reader = self.normalized_facet_string_docids_sorter.into_reader_cursors()?; let mut builder = grenad::MergerBuilder::new(MergeDeladdBtreesetString); builder.extend(reader); @@ -138,24 +131,24 @@ impl<'indexer> FacetSearchBuilder<'indexer> { if current_field_id != Some(field_id) { if let Some(fst_merger_builder) = fst_merger_builder { - // send the previous fst to the channel let mmap = fst_merger_builder.build(&mut callback)?; - // sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap(); - todo!("What to do"); + index + .facet_id_string_fst + .remap_data_type::() + .put(wtxn, &field_id, &mmap)?; } - println!("getting fst for field_id: {}", field_id); fst = index.facet_id_string_fst.get(rtxn, &field_id)?; fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?); current_field_id = Some(field_id); } - let current = database.get(rtxn, key)?; + let previous = database.get(rtxn, key)?; let deladd: &KvReaderDelAdd = deladd.into(); let del = deladd.get(DelAdd::Deletion); let add = deladd.get(DelAdd::Addition); - match merge_btreesets(current, del, add)? { + match merge_btreesets(previous, del, add)? { Operation::Write(value) => { match fst_merger_builder.as_mut() { Some(fst_merger_builder) => { @@ -170,7 +163,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> { let key = (field_id, normalized_facet_string); let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; - sender.write_facet(&key_bytes, &value).unwrap(); + database.put(wtxn, &key_bytes, &value)?; } Operation::Delete => { match fst_merger_builder.as_mut() { @@ -186,7 +179,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> { let key = (field_id, normalized_facet_string); let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; - sender.delete_facet(&key_bytes).unwrap(); + database.delete(wtxn, &key_bytes)?; } Operation::Ignore => (), } @@ -194,8 +187,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> { if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) { let mmap = fst_merger_builder.build(&mut callback)?; - // sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap(); - todo!("What to do"); + index.facet_id_string_fst.remap_data_type::().put(wtxn, &field_id, &mmap)?; } Ok(()) diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index c2ca08c55..430313fbd 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -18,6 +18,7 @@ pub use update_by_function::UpdateByFunction; use super::channel::*; use super::extract::*; +use super::facet_search_builder::FacetSearchBuilder; use super::merger::{FacetDatabases, FacetFieldIdsDelta}; use super::word_fst_builder::PrefixDelta; use super::words_prefix_docids::{ @@ -114,7 +115,6 @@ where let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); let _entered = span.enter(); facet_field_ids_delta = merge_and_send_facet_docids( - global_fields_ids_map, FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?, FacetDatabases::new(index), index, @@ -240,9 +240,6 @@ where }) })?; - let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); - let indexer_span = tracing::Span::current(); - for operation in writer_receiver { let database = operation.database(index); match operation.entry() { @@ -258,65 +255,14 @@ where /// TODO handle the panicking threads let facet_field_ids_delta = extractor_handle.join().unwrap()?; - let prefix_delta = { - let rtxn = index.read_txn()?; - let words_fst = index.words_fst(&rtxn)?; - let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; - let prefix_settings = index.prefix_settings(&rtxn)?; - word_fst_builder.with_prefix_settings(prefix_settings); - - let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::(); - let current_words = index.word_docids.iter(wtxn)?.remap_data_type::(); - for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) { - (Ok((l, _)), Ok((r, _))) => l.cmp(r), - (Err(_), _) | (_, Err(_)) => Ordering::Equal, - }) { - match eob { - EitherOrBoth::Both(lhs, rhs) => { - let (word, lhs_bytes) = lhs?; - let (_, rhs_bytes) = rhs?; - if lhs_bytes != rhs_bytes { - word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; - } - } - EitherOrBoth::Left(result) => { - let (word, _) = result?; - word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?; - } - EitherOrBoth::Right(result) => { - let (word, _) = result?; - word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; - } - } - } - - let span = tracing::trace_span!(target: "indexing::documents::merge", "words_fst"); - let _entered = span.enter(); - - let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; - // extractor_sender.main().write_words_fst(word_fst_mmap).unwrap(); - index.main.remap_types::().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?; - if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { - // extractor_sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap(); - index.main.remap_types::().put( - wtxn, - WORDS_PREFIXES_FST_KEY, - &prefixes_fst_mmap, - )?; - Some(prefix_delta) - } else { - None - } - }; - - // if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta { - // compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; - // } - - if let Some(prefix_delta) = prefix_delta { + if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { compute_prefix_database(index, wtxn, prefix_delta)?; } + compute_facet_search_database(index, wtxn, global_fields_ids_map)?; + + compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; + Result::Ok(()) })?; @@ -358,6 +304,110 @@ fn compute_prefix_database( compute_word_prefix_position_docids(wtxn, index, &modified, &deleted) } +#[tracing::instrument(level = "trace", skip_all, target = "indexing")] +fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result> { + let rtxn = index.read_txn()?; + let words_fst = index.words_fst(&rtxn)?; + let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; + let prefix_settings = index.prefix_settings(&rtxn)?; + word_fst_builder.with_prefix_settings(prefix_settings); + + let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::(); + let current_words = index.word_docids.iter(wtxn)?.remap_data_type::(); + for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) { + (Ok((l, _)), Ok((r, _))) => l.cmp(r), + (Err(_), _) | (_, Err(_)) => Ordering::Equal, + }) { + match eob { + EitherOrBoth::Both(lhs, rhs) => { + let (word, lhs_bytes) = lhs?; + let (_, rhs_bytes) = rhs?; + if lhs_bytes != rhs_bytes { + word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; + } + } + EitherOrBoth::Left(result) => { + let (word, _) = result?; + word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?; + } + EitherOrBoth::Right(result) => { + let (word, _) = result?; + word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; + } + } + } + + let span = tracing::trace_span!(target: "indexing::documents::merge", "words_fst"); + let _entered = span.enter(); + + let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; + // extractor_sender.main().write_words_fst(word_fst_mmap).unwrap(); + index.main.remap_types::().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?; + if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { + // extractor_sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap(); + index.main.remap_types::().put( + wtxn, + WORDS_PREFIXES_FST_KEY, + &prefixes_fst_mmap, + )?; + Ok(Some(prefix_delta)) + } else { + Ok(None) + } +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")] +fn compute_facet_search_database( + index: &Index, + wtxn: &mut RwTxn, + global_fields_ids_map: GlobalFieldsIdsMap, +) -> Result<()> { + let rtxn = index.read_txn()?; + let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?; + let mut facet_search_builder = FacetSearchBuilder::new( + global_fields_ids_map, + localized_attributes_rules.unwrap_or_default(), + ); + + let previous_facet_id_string_docids = index + .facet_id_string_docids + .iter(&rtxn)? + .remap_data_type::() + .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); + let current_facet_id_string_docids = index + .facet_id_string_docids + .iter(wtxn)? + .remap_data_type::() + .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); + for eob in merge_join_by( + previous_facet_id_string_docids, + current_facet_id_string_docids, + |lhs, rhs| match (lhs, rhs) { + (Ok((l, _)), Ok((r, _))) => l.cmp(r), + (Err(_), _) | (_, Err(_)) => Ordering::Equal, + }, + ) { + match eob { + EitherOrBoth::Both(lhs, rhs) => { + let (_, _) = lhs?; + let (_, _) = rhs?; + } + EitherOrBoth::Left(result) => { + let (key, _) = result?; + facet_search_builder + .register_from_key(DelAdd::Deletion, key.left_bound.as_ref())?; + } + EitherOrBoth::Right(result) => { + let (key, _) = result?; + facet_search_builder + .register_from_key(DelAdd::Addition, key.left_bound.as_ref())?; + } + } + } + + facet_search_builder.merge_and_write(index, wtxn, &rtxn) +} + #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] fn compute_facet_level_database( index: &Index, diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 7b3dd85aa..b1c5c5fd9 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -11,9 +11,7 @@ use super::channel::*; use super::extract::{ merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, }; -use super::facet_search_builder::FacetSearchBuilder; use super::DocumentChange; -use crate::update::del_add::DelAdd; use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result}; pub struct GeoExtractor { @@ -93,37 +91,29 @@ pub fn merge_and_send_docids<'extractor>( } #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] -pub fn merge_and_send_facet_docids<'indexer, 'extractor>( - global_fields_ids_map: GlobalFieldsIdsMap<'indexer>, +pub fn merge_and_send_facet_docids<'extractor>( mut caches: Vec>, database: FacetDatabases, index: &Index, docids_sender: impl DocidsSender + Sync, -) -> Result<(FacetFieldIdsDelta, FacetSearchBuilder<'indexer>)> { +) -> Result { transpose_and_freeze_caches(&mut caches)? .into_par_iter() .map(|frozen| { let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); let rtxn = index.read_txn()?; - let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?; - let mut facet_search_builder = FacetSearchBuilder::new( - global_fields_ids_map.clone(), - localized_attributes_rules.unwrap_or_default(), - ); let mut buffer = Vec::new(); merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { facet_field_ids_delta.register_from_key(key); - facet_search_builder.register_from_key(DelAdd::Addition, key)?; let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); docids_sender.write(key, value).unwrap(); Ok(()) } Operation::Delete => { facet_field_ids_delta.register_from_key(key); - facet_search_builder.register_from_key(DelAdd::Deletion, key)?; docids_sender.delete(key).unwrap(); Ok(()) } @@ -131,18 +121,9 @@ pub fn merge_and_send_facet_docids<'indexer, 'extractor>( } })?; - Ok((facet_field_ids_delta, facet_search_builder)) + Ok(facet_field_ids_delta) }) - .reduce( - || Ok((FacetFieldIdsDelta::default(), todo!())), - |lhs, rhs| { - let (lhs_ffid, lhs_fsb) = lhs?; - let (rhs_ffid, rhs_fsb) = rhs?; - let ffid_merged = lhs_ffid.merge(rhs_ffid); - let fsb_merged = todo!(); - Ok((ffid_merged, fsb_merged)) - }, - ) + .reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?))) } pub struct FacetDatabases<'a> {