diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs
index 8226046e6..d63180ba1 100644
--- a/milli/src/update/new/channel.rs
+++ b/milli/src/update/new/channel.rs
@@ -144,6 +144,8 @@ pub enum Database {
     FacetIdExistsDocids,
     FacetIdF64NumberDocids,
     FacetIdStringDocids,
+    FacetIdNormalizedStringStrings,
+    FacetIdStringFst,
 }
 
 impl Database {
@@ -163,6 +165,10 @@ impl Database {
             Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
             Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
             Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
+            Database::FacetIdNormalizedStringStrings => {
+                index.facet_id_normalized_string_strings.remap_types()
+            }
+            Database::FacetIdStringFst => index.facet_id_string_fst.remap_types(),
         }
     }
 }
@@ -240,6 +246,10 @@ impl MergerSender {
         DocumentsSender(self)
     }
 
+    pub fn facet_searchable(&self) -> FacetSearchableSender<'_> {
+        FacetSearchableSender { sender: self }
+    }
+
     pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> {
         let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap(
             DOCUMENTS_IDS_KEY.as_bytes(),
@@ -445,6 +455,50 @@ impl DocidsSender for FacetDocidsSender<'_> {
     }
 }
 
+pub struct FacetSearchableSender<'a> {
+    sender: &'a MergerSender,
+}
+
+impl FacetSearchableSender<'_> {
+    pub fn write_facet(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
+        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
+        match self
+            .sender
+            .send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry })
+        {
+            Ok(()) => Ok(()),
+            Err(SendError(_)) => Err(SendError(())),
+        }
+    }
+
+    pub fn delete_facet(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
+        let entry = EntryOperation::Delete(KeyEntry::from_key(key));
+        match self
+            .sender
+            .send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry })
+        {
+            Ok(()) => Ok(()),
+            Err(SendError(_)) => Err(SendError(())),
+        }
+    }
+
+    pub fn write_fst(&self, key: &[u8], value: Mmap) -> StdResult<(), SendError<()>> {
+        let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(key, value));
+        match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) {
+            Ok(()) => Ok(()),
+            Err(SendError(_)) => Err(SendError(())),
+        }
+    }
+
+    pub fn delete_fst(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
+        let entry = EntryOperation::Delete(KeyEntry::from_key(key));
+        match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) {
+            Ok(()) => Ok(()),
+            Err(SendError(_)) => Err(SendError(())),
+        }
+    }
+}
+
 pub struct DocumentsSender<'a>(&'a MergerSender);
 
 impl DocumentsSender<'_> {
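Side note on the four sender methods above: each repeats the same `match` on `send` only to erase the unsent payload from the error. A minimal, self-contained sketch of the equivalent `map_err` form, using `std::sync::mpsc` as a stand-in for the channel type the PR actually uses (the `WriterOperation` here is a placeholder, not the real type):

```rust
use std::sync::mpsc::{channel, SendError, Sender};

// Stand-in for the real `WriterOperation` message type.
struct WriterOperation;

// Equivalent to the repeated `match` above: on failure, drop the unsent
// payload so the caller only sees `SendError(())`.
fn send_erased(
    sender: &Sender<WriterOperation>,
    op: WriterOperation,
) -> Result<(), SendError<()>> {
    sender.send(op).map_err(|SendError(_)| SendError(()))
}

fn main() {
    let (sender, receiver) = channel();
    send_erased(&sender, WriterOperation).unwrap();
    assert!(receiver.recv().is_ok());
}
```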
diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs
index c76ab49d0..a5cbd3700 100644
--- a/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -1,9 +1,11 @@
 use std::cell::RefCell;
 use std::collections::HashMap;
 use std::fs::File;
+use std::mem::size_of;
 use std::num::NonZero;
 use std::ops::DerefMut as _;
 
+use bumpalo::collections::vec::Vec as BumpVec;
 use bumpalo::Bump;
 use grenad::{Merger, MergerBuilder};
 use heed::RoTxn;
@@ -118,30 +120,33 @@ impl WordDocidsCachedSorters {
         word: &str,
         exact: bool,
         docid: u32,
-        buffer: &mut Vec<u8>,
+        bump: &Bump,
     ) -> Result<()> {
-        let key = word.as_bytes();
+        let word_bytes = word.as_bytes();
         if exact {
-            self.exact_word_docids.insert_add_u32(key, docid)?;
+            self.exact_word_docids.insert_add_u32(word_bytes, docid)?;
         } else {
-            self.word_docids.insert_add_u32(key, docid)?;
+            self.word_docids.insert_add_u32(word_bytes, docid)?;
         }
 
+        let buffer_size = word_bytes.len() + 1 + size_of::<FieldId>();
+        let mut buffer = BumpVec::with_capacity_in(buffer_size, bump);
+
         buffer.clear();
-        buffer.extend_from_slice(word.as_bytes());
+        buffer.extend_from_slice(word_bytes);
         buffer.push(0);
         buffer.extend_from_slice(&field_id.to_be_bytes());
-        self.word_fid_docids.insert_add_u32(buffer, docid)?;
+        self.word_fid_docids.insert_add_u32(&buffer, docid)?;
 
         let position = bucketed_position(position);
         buffer.clear();
-        buffer.extend_from_slice(word.as_bytes());
+        buffer.extend_from_slice(word_bytes);
         buffer.push(0);
         buffer.extend_from_slice(&position.to_be_bytes());
-        self.word_position_docids.insert_add_u32(buffer, docid)?;
+        self.word_position_docids.insert_add_u32(&buffer, docid)?;
 
         if self.current_docid.map_or(false, |id| docid != id) {
-            self.flush_fid_word_count(buffer)?;
+            self.flush_fid_word_count(&mut buffer)?;
         }
 
         self.fid_word_count
@@ -160,30 +165,33 @@ impl WordDocidsCachedSorters {
         word: &str,
         exact: bool,
         docid: u32,
-        buffer: &mut Vec<u8>,
+        bump: &Bump,
     ) -> Result<()> {
-        let key = word.as_bytes();
+        let word_bytes = word.as_bytes();
         if exact {
-            self.exact_word_docids.insert_del_u32(key, docid)?;
+            self.exact_word_docids.insert_del_u32(word_bytes, docid)?;
         } else {
-            self.word_docids.insert_del_u32(key, docid)?;
+            self.word_docids.insert_del_u32(word_bytes, docid)?;
         }
 
+        let buffer_size = word_bytes.len() + 1 + size_of::<FieldId>();
+        let mut buffer = BumpVec::with_capacity_in(buffer_size, bump);
+
         buffer.clear();
-        buffer.extend_from_slice(word.as_bytes());
+        buffer.extend_from_slice(word_bytes);
         buffer.push(0);
         buffer.extend_from_slice(&field_id.to_be_bytes());
-        self.word_fid_docids.insert_del_u32(buffer, docid)?;
+        self.word_fid_docids.insert_del_u32(&buffer, docid)?;
 
         let position = bucketed_position(position);
         buffer.clear();
-        buffer.extend_from_slice(word.as_bytes());
+        buffer.extend_from_slice(word_bytes);
         buffer.push(0);
         buffer.extend_from_slice(&position.to_be_bytes());
-        self.word_position_docids.insert_del_u32(buffer, docid)?;
+        self.word_position_docids.insert_del_u32(&buffer, docid)?;
 
         if self.current_docid.map_or(false, |id| docid != id) {
-            self.flush_fid_word_count(buffer)?;
+            self.flush_fid_word_count(&mut buffer)?;
         }
 
         self.fid_word_count
@@ -195,7 +203,7 @@ impl WordDocidsCachedSorters {
         Ok(())
     }
 
-    fn flush_fid_word_count(&mut self, buffer: &mut Vec<u8>) -> Result<()> {
+    fn flush_fid_word_count(&mut self, buffer: &mut BumpVec<u8>) -> Result<()> {
         for (fid, (current_count, new_count)) in self.fid_word_count.drain() {
             if current_count != new_count {
                 if current_count <= MAX_COUNTED_WORDS {
@@ -420,11 +428,11 @@ impl WordDocidsExtractors {
         let cached_sorter = cached_sorter.deref_mut();
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
         let new_fields_ids_map = new_fields_ids_map.deref_mut();
+        let doc_alloc = &context.doc_alloc;
 
         let exact_attributes = index.exact_attributes(rtxn)?;
         let is_exact_attribute =
             |fname: &str| exact_attributes.iter().any(|attr| contained_in(fname, attr));
-        let mut buffer = Vec::new();
         match document_change {
             DocumentChange::Deletion(inner) => {
                 let mut token_fn = |fname: &str, fid, pos, word: &str| {
@@ -435,7 +443,7 @@ impl WordDocidsExtractors {
                         word,
                         is_exact_attribute(fname),
                         inner.docid(),
-                        &mut buffer,
+                        doc_alloc,
                     )
                     .map_err(crate::Error::from)
                 };
@@ -454,7 +462,7 @@ impl WordDocidsExtractors {
                         word,
                         is_exact_attribute(fname),
                         inner.docid(),
-                        &mut buffer,
+                        doc_alloc,
                     )
                     .map_err(crate::Error::from)
                 };
@@ -472,7 +480,7 @@ impl WordDocidsExtractors {
                         word,
                         is_exact_attribute(fname),
                         inner.docid(),
-                        &mut buffer,
+                        doc_alloc,
                     )
                     .map_err(crate::Error::from)
                 };
@@ -491,7 +499,7 @@ impl WordDocidsExtractors {
                         word,
                         is_exact_attribute(fname),
                         inner.docid(),
-                        &mut buffer,
+                        doc_alloc,
                     )
                     .map_err(crate::Error::from)
                 };
@@ -503,6 +511,8 @@ impl WordDocidsExtractors {
             }
         }
 
+        let buffer_size = size_of::<FieldId>();
+        let mut buffer = BumpVec::with_capacity_in(buffer_size, &context.doc_alloc);
         cached_sorter.flush_fid_word_count(&mut buffer)
     }
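For context, the keys built above follow a `word\0suffix` layout, where the suffix is a big-endian `u16` (a field id or a bucketed position), and the buffer is sized up front so the bump allocator performs a single allocation per key. A small sketch of the same layout (the helper name is illustrative, not the PR's):

```rust
use std::mem::size_of;

use bumpalo::collections::vec::Vec as BumpVec;
use bumpalo::Bump;

// Builds a `word\0suffix` key: the word's bytes, a NUL separator, then a
// big-endian u16 suffix. Pre-sizing keeps it to one bump allocation.
fn word_suffix_key<'b>(bump: &'b Bump, word: &str, suffix: u16) -> BumpVec<'b, u8> {
    let word_bytes = word.as_bytes();
    let mut key = BumpVec::with_capacity_in(word_bytes.len() + 1 + size_of::<u16>(), bump);
    key.extend_from_slice(word_bytes);
    key.push(0); // NUL separator between the word and the suffix
    key.extend_from_slice(&suffix.to_be_bytes());
    key
}

fn main() {
    let bump = Bump::new();
    let key = word_suffix_key(&bump, "hello", 42);
    assert_eq!(&key[..], &b"hello\0\0\x2a"[..]);
}
```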
diff --git a/milli/src/update/new/facet_search_builder.rs b/milli/src/update/new/facet_search_builder.rs
new file mode 100644
index 000000000..4602b5a30
--- /dev/null
+++ b/milli/src/update/new/facet_search_builder.rs
@@ -0,0 +1,275 @@
+use std::collections::{BTreeSet, HashMap};
+
+use charabia::{normalizer::NormalizerOption, Language, Normalize, StrDetection, Token};
+use grenad::Sorter;
+use heed::{
+    types::{Bytes, SerdeJson},
+    BytesDecode, BytesEncode, RoTxn,
+};
+
+use crate::{
+    heed_codec::{
+        facet::{FacetGroupKey, FacetGroupKeyCodec},
+        StrRefCodec,
+    },
+    update::{
+        create_sorter,
+        del_add::{DelAdd, KvWriterDelAdd},
+        MergeDeladdBtreesetString,
+    },
+    BEU16StrCodec, FieldId, GlobalFieldsIdsMap, Index, LocalizedAttributesRule, Result,
+    MAX_FACET_VALUE_LENGTH,
+};
+
+use super::{
+    channel::FacetSearchableSender, extract::FacetKind, fst_merger_builder::FstMergerBuilder,
+    KvReaderDelAdd,
+};
+
+pub struct FacetSearchBuilder<'indexer> {
+    registered_facets: HashMap<FieldId, usize>,
+    normalized_facet_string_docids_sorter: Sorter<MergeDeladdBtreesetString>,
+    global_fields_ids_map: GlobalFieldsIdsMap<'indexer>,
+    localized_attributes_rules: Vec<LocalizedAttributesRule>,
+    // Buffered data below
+    buffer: Vec<u8>,
+    localized_field_ids: HashMap<FieldId, Option<Vec<Language>>>,
+}
+
+impl<'indexer> FacetSearchBuilder<'indexer> {
+    pub fn new(
+        global_fields_ids_map: GlobalFieldsIdsMap<'indexer>,
+        localized_attributes_rules: Vec<LocalizedAttributesRule>,
+    ) -> Self {
+        let registered_facets = HashMap::new();
+        let normalized_facet_string_docids_sorter = create_sorter(
+            grenad::SortAlgorithm::Stable,
+            MergeDeladdBtreesetString,
+            grenad::CompressionType::None,
+            None,
+            None,
+            Some(0),
+        );
+
+        Self {
+            registered_facets,
+            normalized_facet_string_docids_sorter,
+            buffer: Vec::new(),
+            global_fields_ids_map,
+            localized_attributes_rules,
+            localized_field_ids: HashMap::new(),
+        }
+    }
+
+    fn extract_key_data<'k>(&self, key: &'k [u8]) -> Result<Option<FacetGroupKey<&'k str>>> {
+        match FacetKind::from(key[0]) {
+            // Only strings are searchable
+            FacetKind::String => Ok(Some(
+                FacetGroupKeyCodec::<StrRefCodec>::bytes_decode(&key[1..])
+                    .map_err(heed::Error::Encoding)?,
+            )),
+            _ => Ok(None),
+        }
+    }
+
+    pub fn register_from_key(&mut self, deladd: DelAdd, facet_key: &[u8]) -> Result<()> {
+        let Some(FacetGroupKey { field_id, level: _level, left_bound }) =
+            self.extract_key_data(facet_key)?
+        else {
+            return Ok(());
+        };
+
+        if deladd == DelAdd::Addition {
+            self.registered_facets.entry(field_id).and_modify(|count| *count += 1).or_insert(1);
+        }
+
+        let locales = self.locales(field_id);
+        let hyper_normalized_value = normalize_facet_string(left_bound, locales);
+
+        let set = BTreeSet::from_iter(std::iter::once(left_bound));
+
+        // As the facet string is the same, we can put the deletion and addition in the same obkv.
+        self.buffer.clear();
+        let mut obkv = KvWriterDelAdd::new(&mut self.buffer);
+        let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?;
+        obkv.insert(deladd, val)?;
+        obkv.finish()?;
+
+        let key: (u16, &str) = (field_id, hyper_normalized_value.as_ref());
+        let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
+        self.normalized_facet_string_docids_sorter.insert(key_bytes, &self.buffer)?;
+
+        Ok(())
+    }
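The sorter key above leans on `BEU16StrCodec` grouping entries per field: two big-endian bytes of field id followed by the normalized string, so grenad's lexicographic order sorts by field first. A hedged sketch of that assumed layout, written without heed so it stands alone:

```rust
// Assumed `BEU16StrCodec` layout: big-endian u16 field id, then the string.
fn encode_key(field_id: u16, normalized: &str) -> Vec<u8> {
    let mut key = Vec::with_capacity(2 + normalized.len());
    key.extend_from_slice(&field_id.to_be_bytes());
    key.extend_from_slice(normalized.as_bytes());
    key
}

fn decode_key(key: &[u8]) -> Option<(u16, &str)> {
    let (field_id, rest) = key.split_first_chunk::<2>()?;
    Some((u16::from_be_bytes(*field_id), std::str::from_utf8(rest).ok()?))
}

fn main() {
    let key = encode_key(3, "tomate");
    assert_eq!(decode_key(&key), Some((3, "tomate")));
}
```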
+
+    fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> {
+        if self.localized_field_ids.get(&field_id).is_none() {
+            let Some(field_name) = self.global_fields_ids_map.name(field_id) else {
+                unreachable!("Field id {} not found in the global fields ids map", field_id);
+            };
+
+            let locales = self
+                .localized_attributes_rules
+                .iter()
+                .find(|rule| rule.match_str(field_name))
+                .map(|rule| rule.locales.clone());
+
+            self.localized_field_ids.insert(field_id, locales);
+        }
+
+        self.localized_field_ids.get(&field_id).unwrap().as_deref()
+    }
+
+    #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_fst")]
+    pub fn merge_and_send(
+        self,
+        index: &Index,
+        rtxn: &RoTxn<'_>,
+        sender: FacetSearchableSender,
+    ) -> Result<()> {
+        let reader = self.normalized_facet_string_docids_sorter.into_reader_cursors()?;
+        let mut builder = grenad::MergerBuilder::new(MergeDeladdBtreesetString);
+        builder.extend(reader);
+
+        let database = index.facet_id_normalized_string_strings.remap_types::<Bytes, Bytes>();
+
+        let mut merger_iter = builder.build().into_stream_merger_iter()?;
+        let mut current_field_id = None;
+        let mut fst;
+        let mut fst_merger_builder: Option<FstMergerBuilder> = None;
+        while let Some((key, deladd)) = merger_iter.next()? {
+            let (field_id, normalized_facet_string) =
+                BEU16StrCodec::bytes_decode(key).map_err(heed::Error::Encoding)?;
+
+            if current_field_id != Some(field_id) {
+                if let (Some(previous_field_id), Some(fst_merger_builder)) =
+                    (current_field_id, fst_merger_builder)
+                {
+                    // send the previous field's fst to the channel,
+                    // keyed by the field it was built for
+                    let mmap = fst_merger_builder.build(&mut callback)?;
+                    sender.write_fst(&previous_field_id.to_be_bytes(), mmap).unwrap();
+                }
+
+                fst = index.facet_id_string_fst.get(rtxn, &field_id)?;
+                fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?);
+                current_field_id = Some(field_id);
+            }
+
+            let current = database.get(rtxn, key)?;
+            let deladd: &KvReaderDelAdd = deladd.into();
+            let del = deladd.get(DelAdd::Deletion);
+            let add = deladd.get(DelAdd::Addition);
+
+            match merge_btreesets(current, del, add)? {
+                Operation::Write(value) => {
+                    match fst_merger_builder.as_mut() {
+                        Some(fst_merger_builder) => {
+                            fst_merger_builder.register(
+                                DelAdd::Addition,
+                                normalized_facet_string.as_bytes(),
+                                &mut callback,
+                            )?;
+                        }
+                        None => unreachable!(),
+                    }
+                    let key = (field_id, normalized_facet_string);
+                    let key_bytes =
+                        BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
+                    sender.write_facet(&key_bytes, &value).unwrap();
+                }
+                Operation::Delete => {
+                    match fst_merger_builder.as_mut() {
+                        Some(fst_merger_builder) => {
+                            fst_merger_builder.register(
+                                DelAdd::Deletion,
+                                normalized_facet_string.as_bytes(),
+                                &mut callback,
+                            )?;
+                        }
+                        None => unreachable!(),
+                    }
+                    let key = (field_id, normalized_facet_string);
+                    let key_bytes =
+                        BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
+                    sender.delete_facet(&key_bytes).unwrap();
+                }
+                Operation::Ignore => (),
+            }
+        }
+
+        if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
+            let mmap = fst_merger_builder.build(&mut callback)?;
+            sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
+        }
+
+        Ok(())
+    }
+}
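The loop above flushes one FST per field: the merged stream is sorted by field id, so a change of id triggers a build-and-send of the previous field's FST, plus one final flush after the loop for the trailing field. A generic, simplified sketch of that pattern (not the PR's types); note that the flush must use the *previous* id, not the id that triggered the change:

```rust
// Flush-on-group-change over a stream sorted by field id.
fn group_by_field(entries: &[(u16, &str)], mut flush: impl FnMut(u16, &[&str])) {
    let mut current_field_id: Option<u16> = None;
    let mut group: Vec<&str> = Vec::new();
    for &(field_id, value) in entries {
        if current_field_id != Some(field_id) {
            if let Some(previous_field_id) = current_field_id {
                // Flush the group we just finished, under its own id.
                flush(previous_field_id, &group);
                group.clear();
            }
            current_field_id = Some(field_id);
        }
        group.push(value);
    }
    // The last group has no "next id" to trigger it; flush it explicitly.
    if let Some(last_field_id) = current_field_id {
        flush(last_field_id, &group);
    }
}

fn main() {
    let entries = [(1, "blue"), (1, "red"), (2, "tomate")];
    group_by_field(&entries, |field_id, values| println!("field {field_id}: {values:?}"));
}
```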
+
+fn callback(_bytes: &[u8], _deladd: DelAdd, _is_modified: bool) -> Result<()> {
+    Ok(())
+}
+
+fn merge_btreesets(
+    current: Option<&[u8]>,
+    del: Option<&[u8]>,
+    add: Option<&[u8]>,
+) -> Result<Operation> {
+    let mut result: BTreeSet<String> = match current {
+        Some(current) => SerdeJson::bytes_decode(current).map_err(heed::Error::Encoding)?,
+        None => BTreeSet::new(),
+    };
+    if let Some(del) = del {
+        let del: BTreeSet<String> = SerdeJson::bytes_decode(del).map_err(heed::Error::Encoding)?;
+        result = result.difference(&del).cloned().collect();
+    }
+    if let Some(add) = add {
+        let add: BTreeSet<String> = SerdeJson::bytes_decode(add).map_err(heed::Error::Encoding)?;
+        result.extend(add);
+    }
+
+    // TODO remove allocation
+    let result = SerdeJson::bytes_encode(&result).map_err(heed::Error::Encoding)?.into_owned();
+    if Some(result.as_ref()) == current {
+        Ok(Operation::Ignore)
+    } else if result.is_empty() {
+        Ok(Operation::Delete)
+    } else {
+        Ok(Operation::Write(result))
+    }
+}
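A worked example of the set algebra in `merge_btreesets`: deletions are subtracted from the current set before additions are applied, so a value that is both deleted and re-added in the same batch survives.

```rust
use std::collections::BTreeSet;

fn main() {
    let current = BTreeSet::from(["Tomate", "tomate"].map(String::from));
    let del = BTreeSet::from(["Tomate"].map(String::from));
    let add = BTreeSet::from(["TOMATE"].map(String::from));

    // Same order as `merge_btreesets`: subtract deletions, then apply additions.
    let mut result: BTreeSet<String> = current.difference(&del).cloned().collect();
    result.extend(add);

    assert_eq!(result, BTreeSet::from(["TOMATE", "tomate"].map(String::from)));
}
```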
+
+/// Normalizes the facet string and truncates it to the max length.
+fn normalize_facet_string(facet_string: &str, locales: Option<&[Language]>) -> String {
+    let options: NormalizerOption = NormalizerOption { lossy: true, ..Default::default() };
+    let mut detection = StrDetection::new(facet_string, locales);
+
+    let script = detection.script();
+    // Detect the language of the facet string only if several locales are explicitly provided.
+    let language = match locales {
+        Some(&[language]) => Some(language),
+        Some(multiple_locales) if multiple_locales.len() > 1 => detection.language(),
+        _ => None,
+    };
+
+    let token = Token {
+        lemma: std::borrow::Cow::Borrowed(facet_string),
+        script,
+        language,
+        ..Default::default()
+    };
+
+    // truncate the facet string to the max length
+    token
+        .normalize(&options)
+        .lemma
+        .char_indices()
+        .take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
+        .map(|(_, c)| c)
+        .collect()
+}
+
+enum Operation {
+    Write(Vec<u8>),
+    Delete,
+    Ignore,
+}
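About the truncation above: `char_indices` yields the starting byte offset of each character, so cutting on `idx < MAX_FACET_VALUE_LENGTH` can never split a UTF-8 code point. A multi-byte character that *starts* just below the limit is still kept whole, so the result may exceed the byte budget by up to three bytes. A minimal sketch, assuming an 8-byte budget in place of the real constant:

```rust
// Assumed budget of 8 bytes for the demo; milli's real constant is
// MAX_FACET_VALUE_LENGTH.
const MAX: usize = 8;

fn truncate_on_char_boundary(s: &str) -> String {
    s.char_indices().take_while(|(idx, _)| *idx < MAX).map(|(_, c)| c).collect()
}

fn main() {
    assert_eq!(truncate_on_char_boundary("tomates cerises"), "tomates ");
    // 'é' is two bytes: the cut lands on a character boundary, never inside one.
    assert_eq!(truncate_on_char_boundary("éléphant"), "élépha");
}
```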
diff --git a/milli/src/update/new/fst_merger_builder.rs b/milli/src/update/new/fst_merger_builder.rs
new file mode 100644
index 000000000..9fd259ce6
--- /dev/null
+++ b/milli/src/update/new/fst_merger_builder.rs
@@ -0,0 +1,155 @@
+use std::{fs::File, io::BufWriter};
+
+use fst::{Set, SetBuilder, Streamer};
+use memmap2::Mmap;
+use tempfile::tempfile;
+
+use crate::{update::del_add::DelAdd, InternalError, Result};
+
+pub struct FstMergerBuilder<'a> {
+    stream: Option<fst::set::Stream<'a>>,
+    fst_builder: SetBuilder<BufWriter<File>>,
+    last: Option<Vec<u8>>,
+    inserted_words: usize,
+}
+
+impl<'a> FstMergerBuilder<'a> {
+    pub fn new<D: AsRef<[u8]>>(fst: Option<&'a Set<D>>) -> Result<Self> {
+        Ok(Self {
+            stream: fst.map(|fst| fst.stream()),
+            fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
+            last: None,
+            inserted_words: 0,
+        })
+    }
+
+    pub fn register(
+        &mut self,
+        deladd: DelAdd,
+        right: &[u8],
+        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
+    ) -> Result<()> {
+        if let Some(left) = self.last.take() {
+            let (left_inserted, right_inserted) =
+                self.compare_and_insert(deladd, left.as_slice(), right, insertion_callback)?;
+
+            // left was not inserted, so we keep it for the next iteration
+            if !left_inserted {
+                self.last = Some(left);
+            }
+
+            // right was inserted, so we can stop
+            if right_inserted {
+                return Ok(());
+            }
+        }
+
+        if let Some(mut stream) = self.stream.take() {
+            while let Some(left) = stream.next() {
+                let (left_inserted, right_inserted) =
+                    self.compare_and_insert(deladd, left, right, insertion_callback)?;
+
+                // left was not inserted, so we keep it for the next iteration
+                if !left_inserted {
+                    self.last = Some(left.to_vec());
+                }
+
+                // right was inserted, so we can stop
+                if right_inserted {
+                    self.stream = Some(stream);
+                    return Ok(());
+                }
+            }
+        }
+
+        // If we reach this point, it means that the stream is empty
+        // and we need to insert the incoming word
+        self.insert(right, deladd, true, insertion_callback)?;
+
+        Ok(())
+    }
+
+    fn compare_and_insert(
+        &mut self,
+        deladd: DelAdd,
+        left: &[u8],
+        right: &[u8],
+        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
+    ) -> Result<(bool, bool)> {
+        let mut left_inserted = false;
+        let mut right_inserted = false;
+        match left.cmp(right) {
+            std::cmp::Ordering::Less => {
+                // We need to insert the last word from the current fst
+                self.insert(left, DelAdd::Addition, false, insertion_callback)?;
+
+                left_inserted = true;
+            }
+            std::cmp::Ordering::Equal => {
+                self.insert(right, deladd, true, insertion_callback)?;
+
+                left_inserted = true;
+                right_inserted = true;
+            }
+            std::cmp::Ordering::Greater => {
+                self.insert(right, deladd, true, insertion_callback)?;
+
+                right_inserted = true;
+            }
+        }
+
+        Ok((left_inserted, right_inserted))
+    }
+
+    fn insert(
+        &mut self,
+        bytes: &[u8],
+        deladd: DelAdd,
+        is_modified: bool,
+        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
+    ) -> Result<()> {
+        // Addition: We insert the word
+        // Deletion: We delete the word by not inserting it
+        if deladd == DelAdd::Addition {
+            self.inserted_words += 1;
+            self.fst_builder.insert(bytes)?;
+        }
+
+        insertion_callback(bytes, deladd, is_modified)?;
+
+        Ok(())
+    }
+
+    fn drain_stream(
+        &mut self,
+        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
+    ) -> Result<()> {
+        if let Some(last) = self.last.take() {
+            self.insert(last.as_slice(), DelAdd::Addition, false, insertion_callback)?;
+        }
+
+        if let Some(mut stream) = self.stream.take() {
+            while let Some(current) = stream.next() {
+                self.insert(current, DelAdd::Addition, false, insertion_callback)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn build(
+        mut self,
+        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
+    ) -> Result<Mmap> {
+        self.drain_stream(insertion_callback)?;
+
+        let fst_file = self
+            .fst_builder
+            .into_inner()?
+            .into_inner()
+            .map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?;
+        let fst_mmap = unsafe { Mmap::map(&fst_file)? };
+
+        Ok(fst_mmap)
+    }
+}
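`FstMergerBuilder` is a two-way merge between the existing FST's sorted stream and a sorted stream of incoming edits, where a deletion is applied simply by not re-inserting the matching key. A simplified, self-contained sketch of that strategy with the `fst` crate's in-memory builder (illustrative only, not the PR's code):

```rust
use fst::{Set, SetBuilder, Streamer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let existing = Set::from_iter(["apple", "banana", "cherry"])?;
    // One sorted batch of edits: true = addition, false = deletion.
    let edits: &[(&str, bool)] = &[("banana", false), ("blueberry", true)];

    let mut builder = SetBuilder::memory();
    let mut stream = existing.stream();
    let mut last: Option<Vec<u8>> = stream.next().map(|k| k.to_vec());

    for (incoming, is_addition) in edits {
        let incoming = incoming.as_bytes();
        // Flush existing keys that sort before the incoming one.
        while let Some(left) = last.take() {
            if left.as_slice() < incoming {
                builder.insert(&left)?;
                last = stream.next().map(|k| k.to_vec());
            } else {
                last = Some(left);
                break;
            }
        }
        // A deletion is applied by skipping the matching existing key.
        if last.as_deref() == Some(incoming) {
            last = stream.next().map(|k| k.to_vec());
        }
        if *is_addition {
            builder.insert(incoming)?;
        }
    }
    // Drain whatever is left of the existing set.
    while let Some(left) = last.take() {
        builder.insert(&left)?;
        last = stream.next().map(|k| k.to_vec());
    }

    let merged = builder.into_set();
    assert_eq!(merged.stream().into_strs()?, ["apple", "blueberry", "cherry"]);
    Ok(())
}
```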
diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs
index 6183beb63..740b215e2 100644
--- a/milli/src/update/new/merger.rs
+++ b/milli/src/update/new/merger.rs
@@ -10,13 +10,17 @@ use roaring::RoaringBitmap;
 
 use super::channel::*;
 use super::extract::FacetKind;
+use super::facet_search_builder::FacetSearchBuilder;
 use super::word_fst_builder::{PrefixData, PrefixDelta};
 use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId};
 use crate::update::del_add::DelAdd;
 use crate::update::new::channel::MergerOperation;
 use crate::update::new::word_fst_builder::WordFstBuilder;
 use crate::update::MergeDeladdCboRoaringBitmaps;
-use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result};
+use crate::{
+    localized_attributes_rules, CboRoaringBitmapCodec, Error, FieldId, GeoPoint,
+    GlobalFieldsIdsMap, Index, Result,
+};
 
 /// TODO We must return some infos/stats
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
@@ -170,6 +174,12 @@ pub fn merge_grenad_entries(
                     tracing::trace_span!(target: "indexing::documents::merge", "facet_docids");
                 let _entered = span.enter();
                 let mut facet_field_ids_delta = FacetFieldIdsDelta::new();
+                let localized_attributes_rules =
+                    index.localized_attributes_rules(rtxn)?.unwrap_or_default();
+                let mut facet_search_builder = FacetSearchBuilder::new(
+                    global_fields_ids_map.clone(),
+                    localized_attributes_rules,
+                );
                 merge_and_send_facet_docids(
                     merger,
                     FacetDatabases::new(index),
@@ -177,9 +187,12 @@ pub fn merge_grenad_entries(
                     &mut buffer,
                     sender.facet_docids(),
                     &mut facet_field_ids_delta,
+                    &mut facet_search_builder,
                 )?;
 
                 merger_result.facet_field_ids_delta = Some(facet_field_ids_delta);
+
+                // merge and send the facet fst and the searchable facet values
+                facet_search_builder.merge_and_send(index, rtxn, sender.facet_searchable())?;
             }
         }
     }
@@ -294,6 +307,7 @@ fn merge_and_send_facet_docids(
     buffer: &mut Vec<u8>,
     docids_sender: impl DocidsSender,
     facet_field_ids_delta: &mut FacetFieldIdsDelta,
+    facet_search_builder: &mut FacetSearchBuilder,
 ) -> Result<()> {
     let mut merger_iter = merger.into_stream_merger_iter().unwrap();
     while let Some((key, deladd)) = merger_iter.next().unwrap() {
@@ -305,11 +319,13 @@ fn merge_and_send_facet_docids(
         match merge_cbo_bitmaps(current, del, add)? {
             Operation::Write(bitmap) => {
                 facet_field_ids_delta.register_from_key(key);
+                facet_search_builder.register_from_key(DelAdd::Addition, key)?;
                 let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
                 docids_sender.write(key, value).unwrap();
             }
             Operation::Delete => {
                 facet_field_ids_delta.register_from_key(key);
+                facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
                 docids_sender.delete(key).unwrap();
             }
             Operation::Ignore => (),
diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs
index 37ccc75cd..16a6dd092 100644
--- a/milli/src/update/new/mod.rs
+++ b/milli/src/update/new/mod.rs
@@ -8,6 +8,8 @@
 mod channel;
 pub mod document;
 mod document_change;
 mod extract;
+mod facet_search_builder;
+mod fst_merger_builder;
 pub mod indexer;
 mod merger;
 mod parallel_iterator_ext;
diff --git a/milli/src/update/new/word_fst_builder.rs b/milli/src/update/new/word_fst_builder.rs
index 867d3e86d..834266045 100644
--- a/milli/src/update/new/word_fst_builder.rs
+++ b/milli/src/update/new/word_fst_builder.rs
@@ -1,4 +1,4 @@
-use std::{fs::File, io::BufWriter};
+use std::io::BufWriter;
 
 use fst::{Set, SetBuilder, Streamer};
 use memmap2::Mmap;
@@ -7,23 +7,19 @@ use tempfile::tempfile;
 
 use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result};
 
+use super::fst_merger_builder::FstMergerBuilder;
+
 pub struct WordFstBuilder<'a> {
-    stream: Option<fst::set::Stream<'a>>,
-    word_fst_builder: SetBuilder<BufWriter<File>>,
-    last_word: Option<Vec<u8>>,
+    word_fst_builder: FstMergerBuilder<'a>,
     prefix_fst_builder: Option<PrefixFstBuilder>,
-    inserted_words: usize,
     registered_words: usize,
 }
 
 impl<'a> WordFstBuilder<'a> {
     pub fn new(words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>) -> Result<Self> {
         Ok(Self {
-            stream: Some(words_fst.stream()),
-            word_fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
+            word_fst_builder: FstMergerBuilder::new(Some(words_fst))?,
             prefix_fst_builder: None,
-            last_word: None,
-            inserted_words: 0,
             registered_words: 0,
         })
     }
@@ -38,100 +34,13 @@ impl<'a> WordFstBuilder<'a> {
             self.registered_words += 1;
         }
 
-        if let Some(left) = self.last_word.take() {
-            let (left_inserted, right_inserted) =
-                self.compare_and_insert(deladd, left.as_slice(), right)?;
-
-            // left was not inserted, so we keep it for the next iteration
-            if !left_inserted {
-                self.last_word = Some(left);
+        self.word_fst_builder.register(deladd, right, &mut |bytes, deladd, is_modified| {
+            if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
+                prefix_fst_builder.insert_word(bytes, deladd, is_modified)
+            } else {
+                Ok(())
             }
-
-            // right was inserted, so we can stop
-            if right_inserted {
-                return Ok(());
-            }
-        }
-
-        if let Some(mut stream) = self.stream.take() {
-            while let Some(left) = stream.next() {
-                let (left_inserted, right_inserted) =
-                    self.compare_and_insert(deladd, left, right)?;
-
-                // left was not inserted, so we keep it for the next iteration
-                if !left_inserted {
-                    self.last_word = Some(left.to_vec());
-                }
-
-                // right was inserted, so we can stop
-                if right_inserted {
-                    self.stream = Some(stream);
-                    return Ok(());
-                }
-            }
-
-            // If we reach this point, it means that the stream is empty
-            // and we need to insert the incoming word
-            self.insert_word(right, deladd, true)?;
-
-            self.stream = Some(stream);
-        }
-
-        Ok(())
-    }
-
-    pub fn compare_and_insert(
-        &mut self,
-        deladd: DelAdd,
-        left: &[u8],
-        right: &[u8],
-    ) -> Result<(bool, bool)> {
-        let mut left_inserted = false;
-        let mut right_inserted = false;
-        match left.cmp(right) {
-            std::cmp::Ordering::Less => {
-                // We need to insert the last word from the current fst
-                self.insert_word(left, DelAdd::Addition, false)?;
-
-                left_inserted = true;
-            }
-            std::cmp::Ordering::Equal => {
-                self.insert_word(right, deladd, true)?;
-
-                left_inserted = true;
-                right_inserted = true;
-            }
-            std::cmp::Ordering::Greater => {
-                self.insert_word(right, deladd, true)?;
-
-                right_inserted = true;
-            }
-        }
-
-        Ok((left_inserted, right_inserted))
-    }
-
-    fn insert_word(&mut self, bytes: &[u8], deladd: DelAdd, is_modified: bool) -> Result<()> {
-        // Addition: We insert the word
-        // Deletion: We delete the word by not inserting it
-        if deladd == DelAdd::Addition {
-            self.inserted_words += 1;
-            self.word_fst_builder.insert(bytes)?;
-        }
-
-        if let Some(prefix_fst_builder) = self.prefix_fst_builder.as_mut() {
-            prefix_fst_builder.insert_word(bytes, deladd, is_modified)?;
-        }
-
-        Ok(())
-    }
-
-    fn drain_stream(&mut self) -> Result<()> {
-        if let Some(mut stream) = self.stream.take() {
-            while let Some(current) = stream.next() {
-                self.insert_word(current, DelAdd::Addition, false)?;
-            }
-        }
+        })?;
 
         Ok(())
     }
@@ -141,13 +50,13 @@ impl<'a> WordFstBuilder<'a> {
         index: &crate::Index,
         rtxn: &heed::RoTxn,
     ) -> Result<(Mmap, Option<PrefixDelta>)> {
-        self.drain_stream()?;
-
-        let words_fst_file =
-            self.word_fst_builder.into_inner()?.into_inner().map_err(|_| {
-                InternalError::IndexingMergingKeys { process: "building-words-fst" }
-            })?;
-        let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? };
+        let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| {
+            if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
+                prefix_fst_builder.insert_word(bytes, deladd, is_modified)
+            } else {
+                Ok(())
+            }
+        })?;
 
         let prefix_data = self
             .prefix_fst_builder