diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 9ba81fb11..1c7f04974 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::io::{self, BufWriter}; use bincode::ErrorKind; -use fst::{Set, SetBuilder}; +use fst::{Set, SetBuilder, Streamer}; use grenad::Merger; use heed::types::Bytes; use heed::{BoxedError, Database, RoTxn}; @@ -11,8 +11,8 @@ use roaring::RoaringBitmap; use tempfile::tempfile; use super::channel::*; -use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update}; use super::extract::FacetKind; +use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update}; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::MergeDeladdCboRoaringBitmaps; @@ -29,7 +29,7 @@ pub fn merge_grenad_entries( index: &Index, mut global_fields_ids_map: GlobalFieldsIdsMap<'_>, ) -> Result<()> { - let mut buffer = Vec::new(); + let mut buffer: Vec = Vec::new(); let mut documents_ids = index.documents_ids(rtxn)?; let mut geo_extractor = GeoExtractor::new(rtxn, index)?; @@ -46,8 +46,7 @@ pub fn merge_grenad_entries( rtxn, &mut buffer, sender.docids::(), - |_key| Ok(()), - |_key| Ok(()), + |_, _key| Ok(()), )?; } MergerOperation::FidWordCountDocidsMerger(merger) => { @@ -59,13 +58,12 @@ pub fn merge_grenad_entries( rtxn, &mut buffer, sender.docids::(), - |_key| Ok(()), - |_key| Ok(()), + |_, _key| Ok(()), )?; } MergerOperation::WordDocidsMerger(merger) => { - let mut add_words_fst = SetBuilder::new(BufWriter::new(tempfile()?))?; - let mut del_words_fst = SetBuilder::new(BufWriter::new(tempfile()?))?; + let words_fst = index.words_fst(rtxn)?; + let mut word_fst_builder = WordFstBuilder::new(&words_fst, 4)?; { let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); @@ -77,8 +75,7 @@ pub fn merge_grenad_entries( rtxn, &mut buffer, sender.docids::(), - |key| add_words_fst.insert(key), - |key| del_words_fst.insert(key), + |deladd, key| word_fst_builder.register_word(deladd, key), )?; } @@ -86,9 +83,8 @@ pub fn merge_grenad_entries( let span = tracing::trace_span!(target: "indexing::documents::merge", "words_fst"); let _entered = span.enter(); - // Move that into a dedicated function - let words_fst = index.words_fst(rtxn)?; - let mmap = compute_new_words_fst(add_words_fst, del_words_fst, words_fst)?; + + let mmap = word_fst_builder.build()?; sender.main().write_words_fst(mmap).unwrap(); } } @@ -102,8 +98,7 @@ pub fn merge_grenad_entries( rtxn, &mut buffer, sender.docids::(), - |_key| Ok(()), - |_key| Ok(()), + |_, _key| Ok(()), )?; } MergerOperation::WordPairProximityDocidsMerger(merger) => { @@ -115,8 +110,7 @@ pub fn merge_grenad_entries( rtxn, &mut buffer, sender.docids::(), - |_key| Ok(()), - |_key| Ok(()), + |_, _key| Ok(()), )?; } MergerOperation::WordPositionDocidsMerger(merger) => { @@ -128,8 +122,7 @@ pub fn merge_grenad_entries( rtxn, &mut buffer, sender.docids::(), - |_key| Ok(()), - |_key| Ok(()), + |_, _key| Ok(()), )?; } MergerOperation::InsertDocument { docid, document } => { @@ -199,6 +192,142 @@ pub fn merge_grenad_entries( Ok(()) } +struct WordFstBuilder<'a> { + stream: fst::set::Stream<'a>, + word_fst_builder: SetBuilder>, + prefix_fst_builders: Vec>>, + max_prefix_length: usize, + last_word: Vec, +} + +impl<'a> WordFstBuilder<'a> { + pub fn new( + words_fst: &'a Set>, + max_prefix_length: usize, + ) -> Result { + let mut prefix_fst_builders = Vec::new(); + for _ in 0..max_prefix_length { + prefix_fst_builders.push(SetBuilder::new(BufWriter::new(tempfile()?))?); + } + + Ok(Self { + stream: words_fst.stream(), + word_fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?, + prefix_fst_builders, + max_prefix_length, + last_word: Vec::new(), + }) + } + + pub fn register_word(&mut self, deladd: DelAdd, key: &[u8]) -> Result<()> { + match deladd { + DelAdd::Addition => self.add_word(key), + DelAdd::Deletion => self.del_word(key), + } + } + + pub fn add_word(&mut self, word: &[u8]) -> Result<()> { + if !self.last_word.is_empty() { + let next = self.last_word.as_slice(); + match next.cmp(word) { + std::cmp::Ordering::Less => { + // We need to insert the last word from the current fst + self.word_fst_builder.insert(next)?; + self.last_word.clear(); + } + std::cmp::Ordering::Equal => { + // We insert the word and drop the last word + self.word_fst_builder.insert(next)?; + self.last_word.clear(); + return Ok(()); + } + std::cmp::Ordering::Greater => { + // We insert the word and keep the last word + self.word_fst_builder.insert(word)?; + + return Ok(()); + } + } + } + + while let Some(next) = self.stream.next() { + match next.cmp(word) { + std::cmp::Ordering::Less => { + // We need to insert the last word from the current fst + self.word_fst_builder.insert(next)?; + } + std::cmp::Ordering::Equal => { + // We insert the word + self.word_fst_builder.insert(next)?; + + return Ok(()); + } + std::cmp::Ordering::Greater => { + // We insert the word and keep the last word + self.word_fst_builder.insert(word)?; + self.last_word.clear(); + self.last_word.extend_from_slice(next); + + return Ok(()); + } + } + } + + Ok(()) + } + + pub fn del_word(&mut self, word: &[u8]) -> Result<()> { + if !self.last_word.is_empty() { + let next = self.last_word.as_slice(); + match next.cmp(word) { + std::cmp::Ordering::Less => { + // We insert the word from the current fst because the next word to delete is greater + self.word_fst_builder.insert(next)?; + self.last_word.clear(); + } + std::cmp::Ordering::Equal => { + // We delete the word by not inserting it in the new fst and drop the last word + self.last_word.clear(); + return Ok(()); + } + std::cmp::Ordering::Greater => { + // keep the current word until the next word to delete is greater or equal + return Ok(()); + } + } + } + + while let Some(next) = self.stream.next() { + match next.cmp(word) { + std::cmp::Ordering::Less => { + // We insert the word from the current fst because the next word to delete is greater + self.word_fst_builder.insert(next)?; + } + std::cmp::Ordering::Equal => { + // We delete the word by not inserting it in the new fst and drop the last word + return Ok(()); + } + std::cmp::Ordering::Greater => { + // keep the current word until the next word to delete is greater or equal + self.last_word.clear(); + self.last_word.extend_from_slice(next); + + return Ok(()); + } + } + } + + Ok(()) + } + + pub fn build(mut self) -> Result { + let words_fst_file = self.word_fst_builder.into_inner()?.into_inner().unwrap(); + let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; + + Ok(words_fst_mmap) + } +} + pub struct GeoExtractor { rtree: Option>, } @@ -247,30 +376,6 @@ impl GeoExtractor { } } -fn compute_new_words_fst( - add_words_fst: SetBuilder>, - del_words_fst: SetBuilder>, - words_fst: Set>, -) -> Result { - let add_words_fst_file = add_words_fst.into_inner()?; - let add_words_fst_mmap = unsafe { Mmap::map(&add_words_fst_file.into_inner().unwrap())? }; - let add_words_fst = Set::new(&add_words_fst_mmap)?; - - let del_words_fst_file = del_words_fst.into_inner()?; - let del_words_fst_mmap = unsafe { Mmap::map(&del_words_fst_file.into_inner().unwrap())? }; - let del_words_fst = Set::new(&del_words_fst_mmap)?; - - let diff = words_fst.op().add(&del_words_fst).difference(); - let stream = add_words_fst.op().add(diff).union(); - - let mut words_fst = SetBuilder::new(tempfile()?)?; - words_fst.extend_stream(stream)?; - let words_fst_file = words_fst.into_inner()?; - let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; - - Ok(words_fst_mmap) -} - #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] fn merge_and_send_docids( merger: Merger, @@ -278,8 +383,7 @@ fn merge_and_send_docids( rtxn: &RoTxn<'_>, buffer: &mut Vec, docids_sender: impl DocidsSender, - mut add_key: impl FnMut(&[u8]) -> fst::Result<()>, - mut del_key: impl FnMut(&[u8]) -> fst::Result<()>, + mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>, ) -> Result<()> { let mut merger_iter = merger.into_stream_merger_iter().unwrap(); while let Some((key, deladd)) = merger_iter.next().unwrap() { @@ -292,11 +396,11 @@ fn merge_and_send_docids( Operation::Write(bitmap) => { let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); docids_sender.write(key, value).unwrap(); - add_key(key)?; + register_key(DelAdd::Addition, key)?; } Operation::Delete => { docids_sender.delete(key).unwrap(); - del_key(key)?; + register_key(DelAdd::Deletion, key)?; } Operation::Ignore => (), }