From 60cc09abec465f63aab239ca0edb2a4e1b5799db Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Mon, 21 Oct 2024 09:28:49 +0200 Subject: [PATCH] Implement facet search extraction --- milli/src/update/new/channel.rs | 54 ++++ milli/src/update/new/facet_search_builder.rs | 275 +++++++++++++++++++ milli/src/update/new/fst_merger_builder.rs | 155 +++++++++++ milli/src/update/new/merger.rs | 18 +- milli/src/update/new/mod.rs | 2 + milli/src/update/new/word_fst_builder.rs | 127 ++------- 6 files changed, 521 insertions(+), 110 deletions(-) create mode 100644 milli/src/update/new/facet_search_builder.rs create mode 100644 milli/src/update/new/fst_merger_builder.rs diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 8226046e6..d63180ba1 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -144,6 +144,8 @@ pub enum Database { FacetIdExistsDocids, FacetIdF64NumberDocids, FacetIdStringDocids, + FacetIdNormalizedStringStrings, + FacetIdStringFst, } impl Database { @@ -163,6 +165,10 @@ impl Database { Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(), Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(), Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(), + Database::FacetIdNormalizedStringStrings => { + index.facet_id_normalized_string_strings.remap_types() + } + Database::FacetIdStringFst => index.facet_id_string_fst.remap_types(), } } } @@ -240,6 +246,10 @@ impl MergerSender { DocumentsSender(self) } + pub fn facet_searchable(&self) -> FacetSearchableSender<'_> { + FacetSearchableSender { sender: self } + } + pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap( DOCUMENTS_IDS_KEY.as_bytes(), @@ -445,6 +455,50 @@ impl DocidsSender for FacetDocidsSender<'_> { } } +pub struct FacetSearchableSender<'a> { + sender: &'a 
MergerSender, +} + +impl FacetSearchableSender<'_> { + pub fn write_facet(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); + match self + .sender + .send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry }) + { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn delete_facet(&self, key: &[u8]) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Delete(KeyEntry::from_key(key)); + match self + .sender + .send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry }) + { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn write_fst(&self, key: &[u8], value: Mmap) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(key, value)); + match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn delete_fst(&self, key: &[u8]) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Delete(KeyEntry::from_key(key)); + match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } +} + pub struct DocumentsSender<'a>(&'a MergerSender); impl DocumentsSender<'_> { diff --git a/milli/src/update/new/facet_search_builder.rs b/milli/src/update/new/facet_search_builder.rs new file mode 100644 index 000000000..4602b5a30 --- /dev/null +++ b/milli/src/update/new/facet_search_builder.rs @@ -0,0 +1,275 @@ +use std::collections::{BTreeSet, HashMap}; + +use charabia::{normalizer::NormalizerOption, Language, Normalize, StrDetection, Token}; +use grenad::Sorter; +use heed::{ + types::{Bytes, SerdeJson}, + BytesDecode, BytesEncode, RoTxn, +}; + +use crate::{ + heed_codec::{ + facet::{FacetGroupKey, 
FacetGroupKeyCodec}, + StrRefCodec, + }, + update::{ + create_sorter, + del_add::{DelAdd, KvWriterDelAdd}, + MergeDeladdBtreesetString, + }, + BEU16StrCodec, FieldId, GlobalFieldsIdsMap, Index, LocalizedAttributesRule, Result, + MAX_FACET_VALUE_LENGTH, +}; + +use super::{ + channel::FacetSearchableSender, extract::FacetKind, fst_merger_builder::FstMergerBuilder, + KvReaderDelAdd, +}; + +pub struct FacetSearchBuilder<'indexer> { + registered_facets: HashMap, + normalized_facet_string_docids_sorter: Sorter, + global_fields_ids_map: GlobalFieldsIdsMap<'indexer>, + localized_attributes_rules: Vec, + // Buffered data below + buffer: Vec, + localized_field_ids: HashMap>>, +} + +impl<'indexer> FacetSearchBuilder<'indexer> { + pub fn new( + global_fields_ids_map: GlobalFieldsIdsMap<'indexer>, + localized_attributes_rules: Vec, + ) -> Self { + let registered_facets = HashMap::new(); + let normalized_facet_string_docids_sorter = create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdBtreesetString, + grenad::CompressionType::None, + None, + None, + Some(0), + ); + + Self { + registered_facets, + normalized_facet_string_docids_sorter, + buffer: Vec::new(), + global_fields_ids_map, + localized_attributes_rules, + localized_field_ids: HashMap::new(), + } + } + + fn extract_key_data<'k>(&self, key: &'k [u8]) -> Result>> { + match FacetKind::from(key[0]) { + // Only strings are searchable + FacetKind::String => Ok(Some( + FacetGroupKeyCodec::::bytes_decode(&key[1..]) + .map_err(heed::Error::Encoding)?, + )), + _ => Ok(None), + } + } + + pub fn register_from_key(&mut self, deladd: DelAdd, facet_key: &[u8]) -> Result<()> { + let Some(FacetGroupKey { field_id, level: _level, left_bound }) = + self.extract_key_data(facet_key)? 
+ else { + return Ok(()); + }; + + if deladd == DelAdd::Addition { + self.registered_facets.entry(field_id).and_modify(|count| *count += 1).or_insert(1); + } + + let locales = self.locales(field_id); + let hyper_normalized_value = normalize_facet_string(left_bound, locales.as_deref()); + + let set = BTreeSet::from_iter(std::iter::once(left_bound)); + + // as the facet string is the same, we can put the deletion and addition in the same obkv. + self.buffer.clear(); + let mut obkv = KvWriterDelAdd::new(&mut self.buffer); + let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?; + obkv.insert(deladd, val)?; + obkv.finish()?; + + let key: (u16, &str) = (field_id, hyper_normalized_value.as_ref()); + let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; + self.normalized_facet_string_docids_sorter.insert(key_bytes, &self.buffer)?; + + Ok(()) + } + + fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> { + if self.localized_field_ids.get(&field_id).is_none() { + let Some(field_name) = self.global_fields_ids_map.name(field_id) else { + unreachable!("Field id {} not found in the global fields ids map", field_id); + }; + + let locales = self + .localized_attributes_rules + .iter() + .find(|rule| rule.match_str(field_name)) + .map(|rule| rule.locales.clone()); + + self.localized_field_ids.insert(field_id, locales); + } + + self.localized_field_ids.get(&field_id).unwrap().as_deref() + } + + #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_fst")] + pub fn merge_and_send( + self, + index: &Index, + rtxn: &RoTxn<'_>, + sender: FacetSearchableSender, + ) -> Result<()> { + let reader = self.normalized_facet_string_docids_sorter.into_reader_cursors()?; + let mut builder = grenad::MergerBuilder::new(MergeDeladdBtreesetString); + builder.extend(reader); + + let database = index.facet_id_normalized_string_strings.remap_types::(); + + let mut merger_iter = builder.build().into_stream_merger_iter()?; + let 
mut current_field_id = None; + let mut fst; + let mut fst_merger_builder: Option = None; + while let Some((key, deladd)) = merger_iter.next()? { + let (field_id, normalized_facet_string) = + BEU16StrCodec::bytes_decode(&key).map_err(heed::Error::Encoding)?; + + if current_field_id != Some(field_id) { + if let Some(fst_merger_builder) = fst_merger_builder { + // send the previous fst to the channel + let mmap = fst_merger_builder.build(&mut callback)?; + sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap(); + } + + println!("getting fst for field_id: {}", field_id); + fst = index.facet_id_string_fst.get(rtxn, &field_id)?; + fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?); + current_field_id = Some(field_id); + } + + let current = database.get(rtxn, key)?; + let deladd: &KvReaderDelAdd = deladd.into(); + let del = deladd.get(DelAdd::Deletion); + let add = deladd.get(DelAdd::Addition); + + match merge_btreesets(current, del, add)? { + Operation::Write(value) => { + match fst_merger_builder.as_mut() { + Some(fst_merger_builder) => { + fst_merger_builder.register( + DelAdd::Addition, + normalized_facet_string.as_bytes(), + &mut callback, + )?; + } + None => unreachable!(), + } + let key = (field_id, normalized_facet_string); + let key_bytes = + BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; + sender.write_facet(&key_bytes, &value).unwrap(); + } + Operation::Delete => { + match fst_merger_builder.as_mut() { + Some(fst_merger_builder) => { + fst_merger_builder.register( + DelAdd::Deletion, + normalized_facet_string.as_bytes(), + &mut callback, + )?; + } + None => unreachable!(), + } + let key = (field_id, normalized_facet_string); + let key_bytes = + BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; + sender.delete_facet(&key_bytes).unwrap(); + } + Operation::Ignore => (), + } + } + + if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) { + let mmap = 
fst_merger_builder.build(&mut callback)?; + sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap(); + } + + Ok(()) + } +} + +fn callback(_bytes: &[u8], _deladd: DelAdd, _is_modified: bool) -> Result<()> { + Ok(()) +} + +fn merge_btreesets<'a>( + current: Option<&[u8]>, + del: Option<&[u8]>, + add: Option<&[u8]>, +) -> Result { + let mut result: BTreeSet = match current { + Some(current) => SerdeJson::bytes_decode(current).map_err(heed::Error::Encoding)?, + None => BTreeSet::new(), + }; + if let Some(del) = del { + let del: BTreeSet = SerdeJson::bytes_decode(del).map_err(heed::Error::Encoding)?; + result = result.difference(&del).cloned().collect(); + } + if let Some(add) = add { + let add: BTreeSet = SerdeJson::bytes_decode(add).map_err(heed::Error::Encoding)?; + result.extend(add); + } + + /// TODO remove allocation + let result = SerdeJson::bytes_encode(&result).map_err(heed::Error::Encoding)?.into_owned(); + if Some(result.as_ref()) == current { + Ok(Operation::Ignore) + } else if result.is_empty() { + Ok(Operation::Delete) + } else { + Ok(Operation::Write(result)) + } +} + +/// Normalizes the facet string and truncates it to the max length. +fn normalize_facet_string(facet_string: &str, locales: Option<&[Language]>) -> String { + let options: NormalizerOption = NormalizerOption { lossy: true, ..Default::default() }; + let mut detection = StrDetection::new(facet_string, locales); + + let script = detection.script(); + // Detect the language of the facet string only if several locales are explicitly provided. 
+ let language = match locales { + Some(&[language]) => Some(language), + Some(multiple_locales) if multiple_locales.len() > 1 => detection.language(), + _ => None, + }; + + let token = Token { + lemma: std::borrow::Cow::Borrowed(facet_string), + script, + language, + ..Default::default() + }; + + // truncate the facet string to the max length + token + .normalize(&options) + .lemma + .char_indices() + .take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH) + .map(|(_, c)| c) + .collect() +} + +enum Operation { + Write(Vec), + Delete, + Ignore, +} diff --git a/milli/src/update/new/fst_merger_builder.rs b/milli/src/update/new/fst_merger_builder.rs new file mode 100644 index 000000000..9fd259ce6 --- /dev/null +++ b/milli/src/update/new/fst_merger_builder.rs @@ -0,0 +1,155 @@ +use std::{fs::File, io::BufWriter}; + +use fst::{Set, SetBuilder, Streamer}; +use memmap2::Mmap; +use tempfile::tempfile; + +use crate::{update::del_add::DelAdd, InternalError, Result}; + +pub struct FstMergerBuilder<'a> { + stream: Option>, + fst_builder: SetBuilder>, + last: Option>, + inserted_words: usize, +} + +impl<'a> FstMergerBuilder<'a> { + pub fn new>(fst: Option<&'a Set>) -> Result { + Ok(Self { + stream: fst.map(|fst| fst.stream()), + fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?, + last: None, + inserted_words: 0, + }) + } + + pub fn register( + &mut self, + deladd: DelAdd, + right: &[u8], + insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, + ) -> Result<()> { + if let Some(left) = self.last.take() { + let (left_inserted, right_inserted) = + self.compare_and_insert(deladd, left.as_slice(), right, insertion_callback)?; + + // left was not inserted, so we keep it for the next iteration + if !left_inserted { + self.last = Some(left); + } + + // right was inserted, so we can stop + if right_inserted { + return Ok(()); + } + } + + if let Some(mut stream) = self.stream.take() { + while let Some(left) = stream.next() { + let (left_inserted, right_inserted) 
= + self.compare_and_insert(deladd, left, right, insertion_callback)?; + + // left was not inserted, so we keep it for the next iteration + if !left_inserted { + self.last = Some(left.to_vec()); + } + + // right was inserted, so we can stop + if right_inserted { + self.stream = Some(stream); + return Ok(()); + } + } + } + + // If we reach this point, it means that the stream is empty + // and we need to insert the incoming word + self.insert(right, deladd, true, insertion_callback)?; + + Ok(()) + } + + fn compare_and_insert( + &mut self, + deladd: DelAdd, + left: &[u8], + right: &[u8], + insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, + ) -> Result<(bool, bool)> { + let mut left_inserted = false; + let mut right_inserted = false; + match left.cmp(right) { + std::cmp::Ordering::Less => { + // We need to insert the last word from the current fst + self.insert(left, DelAdd::Addition, false, insertion_callback)?; + + left_inserted = true; + } + std::cmp::Ordering::Equal => { + self.insert(right, deladd, true, insertion_callback)?; + + left_inserted = true; + right_inserted = true; + } + std::cmp::Ordering::Greater => { + self.insert(right, deladd, true, insertion_callback)?; + + right_inserted = true; + } + } + + Ok((left_inserted, right_inserted)) + } + + fn insert( + &mut self, + bytes: &[u8], + deladd: DelAdd, + is_modified: bool, + insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, + ) -> Result<()> { + // Addition: We insert the word + // Deletion: We delete the word by not inserting it + if deladd == DelAdd::Addition { + self.inserted_words += 1; + self.fst_builder.insert(bytes)?; + } + + insertion_callback(bytes, deladd, is_modified)?; + + Ok(()) + } + + fn drain_stream( + &mut self, + insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, + ) -> Result<()> { + if let Some(last) = self.last.take() { + self.insert(last.as_slice(), DelAdd::Addition, false, insertion_callback)?; + } + + if let Some(mut 
stream) = self.stream.take() { + while let Some(current) = stream.next() { + self.insert(current, DelAdd::Addition, false, insertion_callback)?; + } + } + + Ok(()) + } + + pub fn build( + mut self, + insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, + ) -> Result { + self.drain_stream(insertion_callback)?; + + let fst_file = self + .fst_builder + .into_inner()? + .into_inner() + .map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?; + let fst_mmap = unsafe { Mmap::map(&fst_file)? }; + + Ok(fst_mmap) + } +} diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 6183beb63..740b215e2 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -10,13 +10,17 @@ use roaring::RoaringBitmap; use super::channel::*; use super::extract::FacetKind; +use super::facet_search_builder::FacetSearchBuilder; use super::word_fst_builder::{PrefixData, PrefixDelta}; use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId}; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::new::word_fst_builder::WordFstBuilder; use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result}; +use crate::{ + localized_attributes_rules, CboRoaringBitmapCodec, Error, FieldId, GeoPoint, + GlobalFieldsIdsMap, Index, Result, +}; /// TODO We must return some infos/stats #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] @@ -170,6 +174,12 @@ pub fn merge_grenad_entries( tracing::trace_span!(target: "indexing::documents::merge", "facet_docids"); let _entered = span.enter(); let mut facet_field_ids_delta = FacetFieldIdsDelta::new(); + let localized_attributes_rules = + index.localized_attributes_rules(rtxn)?.unwrap_or_default(); + let mut facet_search_builder = FacetSearchBuilder::new( + global_fields_ids_map.clone(), + 
localized_attributes_rules, + ); merge_and_send_facet_docids( merger, FacetDatabases::new(index), @@ -177,9 +187,12 @@ pub fn merge_grenad_entries( &mut buffer, sender.facet_docids(), &mut facet_field_ids_delta, + &mut facet_search_builder, )?; merger_result.facet_field_ids_delta = Some(facet_field_ids_delta); + // merge and send the facet fst and the searchable facet values + facet_search_builder.merge_and_send(index, rtxn, sender.facet_searchable())?; } } } @@ -294,6 +307,7 @@ fn merge_and_send_facet_docids( buffer: &mut Vec, docids_sender: impl DocidsSender, facet_field_ids_delta: &mut FacetFieldIdsDelta, + facet_search_builder: &mut FacetSearchBuilder, ) -> Result<()> { let mut merger_iter = merger.into_stream_merger_iter().unwrap(); while let Some((key, deladd)) = merger_iter.next().unwrap() { @@ -305,11 +319,13 @@ fn merge_and_send_facet_docids( match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { facet_field_ids_delta.register_from_key(key); + facet_search_builder.register_from_key(DelAdd::Addition, key)?; let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); docids_sender.write(key, value).unwrap(); } Operation::Delete => { facet_field_ids_delta.register_from_key(key); + facet_search_builder.register_from_key(DelAdd::Deletion, key)?; docids_sender.delete(key).unwrap(); } Operation::Ignore => (), diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 37ccc75cd..16a6dd092 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -8,6 +8,8 @@ mod channel; pub mod document; mod document_change; mod extract; +mod facet_search_builder; +mod fst_merger_builder; pub mod indexer; mod merger; mod parallel_iterator_ext; diff --git a/milli/src/update/new/word_fst_builder.rs b/milli/src/update/new/word_fst_builder.rs index 867d3e86d..834266045 100644 --- a/milli/src/update/new/word_fst_builder.rs +++ b/milli/src/update/new/word_fst_builder.rs @@ -1,4 +1,4 @@ -use std::{fs::File, io::BufWriter}; +use 
std::io::BufWriter; use fst::{Set, SetBuilder, Streamer}; use memmap2::Mmap; @@ -7,23 +7,19 @@ use tempfile::tempfile; use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result}; +use super::fst_merger_builder::FstMergerBuilder; + pub struct WordFstBuilder<'a> { - stream: Option>, - word_fst_builder: SetBuilder>, - last_word: Option>, + word_fst_builder: FstMergerBuilder<'a>, prefix_fst_builder: Option, - inserted_words: usize, registered_words: usize, } impl<'a> WordFstBuilder<'a> { pub fn new(words_fst: &'a Set>) -> Result { Ok(Self { - stream: Some(words_fst.stream()), - word_fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?, + word_fst_builder: FstMergerBuilder::new(Some(words_fst))?, prefix_fst_builder: None, - last_word: None, - inserted_words: 0, registered_words: 0, }) } @@ -38,100 +34,13 @@ impl<'a> WordFstBuilder<'a> { self.registered_words += 1; } - if let Some(left) = self.last_word.take() { - let (left_inserted, right_inserted) = - self.compare_and_insert(deladd, left.as_slice(), right)?; - - // left was not inserted, so we keep it for the next iteration - if !left_inserted { - self.last_word = Some(left); + self.word_fst_builder.register(deladd, right, &mut |bytes, deladd, is_modified| { + if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder { + prefix_fst_builder.insert_word(bytes, deladd, is_modified) + } else { + Ok(()) } - - // right was inserted, so we can stop - if right_inserted { - return Ok(()); - } - } - - if let Some(mut stream) = self.stream.take() { - while let Some(left) = stream.next() { - let (left_inserted, right_inserted) = - self.compare_and_insert(deladd, left, right)?; - - // left was not inserted, so we keep it for the next iteration - if !left_inserted { - self.last_word = Some(left.to_vec()); - } - - // right was inserted, so we can stop - if right_inserted { - self.stream = Some(stream); - return Ok(()); - } - } - - // If we reach this point, it means that the stream is empty 
- // and we need to insert the incoming word - self.insert_word(right, deladd, true)?; - - self.stream = Some(stream); - } - - Ok(()) - } - - pub fn compare_and_insert( - &mut self, - deladd: DelAdd, - left: &[u8], - right: &[u8], - ) -> Result<(bool, bool)> { - let mut left_inserted = false; - let mut right_inserted = false; - match left.cmp(right) { - std::cmp::Ordering::Less => { - // We need to insert the last word from the current fst - self.insert_word(left, DelAdd::Addition, false)?; - - left_inserted = true; - } - std::cmp::Ordering::Equal => { - self.insert_word(right, deladd, true)?; - - left_inserted = true; - right_inserted = true; - } - std::cmp::Ordering::Greater => { - self.insert_word(right, deladd, true)?; - - right_inserted = true; - } - } - - Ok((left_inserted, right_inserted)) - } - - fn insert_word(&mut self, bytes: &[u8], deladd: DelAdd, is_modified: bool) -> Result<()> { - // Addition: We insert the word - // Deletion: We delete the word by not inserting it - if deladd == DelAdd::Addition { - self.inserted_words += 1; - self.word_fst_builder.insert(bytes)?; - } - - if let Some(prefix_fst_builder) = self.prefix_fst_builder.as_mut() { - prefix_fst_builder.insert_word(bytes, deladd, is_modified)?; - } - - Ok(()) - } - - fn drain_stream(&mut self) -> Result<()> { - if let Some(mut stream) = self.stream.take() { - while let Some(current) = stream.next() { - self.insert_word(current, DelAdd::Addition, false)?; - } - } + })?; Ok(()) } @@ -141,13 +50,13 @@ impl<'a> WordFstBuilder<'a> { index: &crate::Index, rtxn: &heed::RoTxn, ) -> Result<(Mmap, Option)> { - self.drain_stream()?; - - let words_fst_file = - self.word_fst_builder.into_inner()?.into_inner().map_err(|_| { - InternalError::IndexingMergingKeys { process: "building-words-fst" } - })?; - let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? 
}; + let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| { + if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder { + prefix_fst_builder.insert_word(bytes, deladd, is_modified) + } else { + Ok(()) + } + })?; let prefix_data = self .prefix_fst_builder