diff --git a/milli/src/lib.rs b/milli/src/lib.rs index 45418c074..48b03b6cc 100644 --- a/milli/src/lib.rs +++ b/milli/src/lib.rs @@ -88,6 +88,7 @@ pub type Object = serde_json::Map; pub type Position = u32; pub type RelativePosition = u16; pub type SmallString32 = smallstr::SmallString<[u8; 32]>; +pub type Prefix = smallstr::SmallString<[u8; 16]>; pub type SmallVec16 = smallvec::SmallVec<[T; 16]>; pub type SmallVec32 = smallvec::SmallVec<[T; 32]>; pub type SmallVec8 = smallvec::SmallVec<[T; 8]>; diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index d9823096e..10c0a706b 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -8,7 +8,7 @@ use memmap2::Mmap; use super::extract::FacetKind; use super::StdResult; -use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY}; +use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::update::new::KvReaderFieldId; use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{DocumentId, Index}; @@ -257,6 +257,17 @@ impl MainSender<'_> { } } + pub fn write_words_prefixes_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> { + let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value( + WORDS_PREFIXES_FST_KEY.as_bytes(), + value, + )); + match self.0.send(WriterOperation { database: Database::Main, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Delete(KeyEntry::from_key(key)); match self.0.send(WriterOperation { database: Database::Main, entry }) { diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 57821c51a..e30333b3a 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -14,6 +14,10 @@ use super::channel::*; use super::document_change::DocumentChange; use super::extract::*; use super::merger::merge_grenad_entries; +use super::word_fst_builder::PrefixDelta; +use super::words_prefix_docids::{ + compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, +}; use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; @@ -174,7 +178,7 @@ where // TODO manage the errors correctly let current_span = tracing::Span::current(); - let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || { + let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || { let span = tracing::trace_span!(target: "indexing::documents", parent: ¤t_span, "merge"); let _entered = span.enter(); @@ -202,7 +206,20 @@ where /// TODO handle the panicking threads handle.join().unwrap()?; - handle2.join().unwrap()?; + let merger_result = merger_thread.join().unwrap()?; + + if let Some(prefix_delta) = merger_result.prefix_delta { + let span = tracing::trace_span!(target: "indexing", "prefix"); + let _entered = span.enter(); + + let PrefixDelta { modified, deleted } = prefix_delta; + // Compute word prefix docids + compute_word_prefix_docids(wtxn, index, &modified, &deleted)?; + // Compute word prefix fid docids + compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?; + // Compute word prefix position docids + compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)?; + } Ok(()) as Result<_> })?; diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 7e1a80888..0d80f75ec 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -6,15 +6,17 @@ use grenad::Merger; use heed::types::Bytes; use heed::{Database, RoTxn}; use roaring::RoaringBitmap; +use std::collections::HashSet; use super::channel::*; use super::extract::FacetKind; +use super::word_fst_builder::{PrefixData, PrefixDelta, PrefixSettings}; use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update}; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::new::word_fst_builder::WordFstBuilder; use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Result}; +use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Prefix, Result}; /// TODO We must return some infos/stats #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] @@ -24,10 +26,11 @@ pub fn merge_grenad_entries( rtxn: &RoTxn, index: &Index, mut global_fields_ids_map: GlobalFieldsIdsMap<'_>, -) -> Result<()> { +) -> Result { let mut buffer: Vec = Vec::new(); let mut documents_ids = index.documents_ids(rtxn)?; let mut geo_extractor = GeoExtractor::new(rtxn, index)?; + let mut merger_result = MergerResult::default(); for merger_operation in receiver { match merger_operation { @@ -59,7 +62,15 @@ pub fn merge_grenad_entries( } MergerOperation::WordDocidsMerger(merger) => { let words_fst = index.words_fst(rtxn)?; - let mut word_fst_builder = WordFstBuilder::new(&words_fst, 4)?; + let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; + /// TODO make this configurable + let prefix_settings = PrefixSettings { + compute_prefixes: true, + max_prefix_length: 4, + prefix_count_threshold: 100, + }; + word_fst_builder.with_prefix_settings(prefix_settings); + { let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); @@ -80,8 +91,12 @@ pub fn merge_grenad_entries( tracing::trace_span!(target: "indexing::documents::merge", "words_fst"); let _entered = span.enter(); - let (word_fst_mmap, prefix_fst_mmap) = word_fst_builder.build()?; + let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, rtxn)?; sender.main().write_words_fst(word_fst_mmap).unwrap(); + if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { + sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap(); + merger_result.prefix_delta = Some(prefix_delta); + } } } MergerOperation::WordFidDocidsMerger(merger) => { @@ -185,7 +200,13 @@ pub fn merge_grenad_entries( // ... - Ok(()) + Ok(merger_result) +} + +#[derive(Default, Debug)] +pub struct MergerResult { + /// The delta of the prefixes + pub prefix_delta: Option, } pub struct GeoExtractor { diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index dedd89497..98b60378f 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -13,6 +13,7 @@ mod items_pool; mod merger; mod top_level_map; mod word_fst_builder; +mod words_prefix_docids; /// TODO move them elsewhere pub type StdResult = std::result::Result; diff --git a/milli/src/update/new/word_fst_builder.rs b/milli/src/update/new/word_fst_builder.rs index 1e02bdc3b..6c415c17e 100644 --- a/milli/src/update/new/word_fst_builder.rs +++ b/milli/src/update/new/word_fst_builder.rs @@ -2,50 +2,37 @@ use std::{fs::File, io::BufWriter}; use fst::{Set, SetBuilder, Streamer}; use memmap2::Mmap; +use std::collections::HashSet; use tempfile::tempfile; -use crate::{update::del_add::DelAdd, Result, SmallString32}; +use crate::{update::del_add::DelAdd, Prefix, Result}; pub struct WordFstBuilder<'a> { stream: Option>, word_fst_builder: SetBuilder>, - /// TODO: Replace the full memory allocation - prefix_fst_builders: Vec>>, - max_prefix_length: usize, last_word: Option>, - current_prefix: Vec, - current_prefix_count: Vec, - prefix_count_threshold: u64, + prefix_fst_builder: Option, inserted_words: usize, registered_words: usize, - base_set_length: usize, } impl<'a> WordFstBuilder<'a> { - pub fn new( - words_fst: &'a Set>, - max_prefix_length: usize, - ) -> Result { - let mut prefix_fst_builders = Vec::new(); - for _ in 0..max_prefix_length { - prefix_fst_builders.push(SetBuilder::memory()); - } - + pub fn new(words_fst: &'a Set>) -> Result { Ok(Self { stream: Some(words_fst.stream()), word_fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?, - prefix_fst_builders, - max_prefix_length, + prefix_fst_builder: None, last_word: None, - current_prefix: vec![SmallString32::new(); max_prefix_length], - current_prefix_count: vec![0; max_prefix_length], - prefix_count_threshold: 100, inserted_words: 0, registered_words: 0, - base_set_length: words_fst.len(), }) } + pub fn with_prefix_settings(&mut self, prefix_settings: PrefixSettings) -> &Self { + self.prefix_fst_builder = PrefixFstBuilder::new(prefix_settings); + self + } + pub fn register_word(&mut self, deladd: DelAdd, right: &[u8]) -> Result<()> { if deladd == DelAdd::Addition { self.registered_words += 1; @@ -85,7 +72,7 @@ impl<'a> WordFstBuilder<'a> { // If we reach this point, it means that the stream is empty // and we need to insert the incoming word - self.insert_word(right)?; + self.insert_word(right, deladd, true)?; self.stream = Some(stream); } @@ -104,26 +91,18 @@ impl<'a> WordFstBuilder<'a> { match left.cmp(right) { std::cmp::Ordering::Less => { // We need to insert the last word from the current fst - self.insert_word(left)?; + self.insert_word(left, DelAdd::Addition, false)?; left_inserted = true; } std::cmp::Ordering::Equal => { - // Addition: We insert the word - // Deletion: We delete the word by not inserting it - if deladd == DelAdd::Addition { - self.insert_word(right)?; - } + self.insert_word(right, deladd, true)?; left_inserted = true; right_inserted = true; } std::cmp::Ordering::Greater => { - // Addition: We insert the word and keep the last word - // Deletion: We keep the current word until the left word to delete is greater or equal - if deladd == DelAdd::Addition { - self.insert_word(right)?; - } + self.insert_word(right, deladd, true)?; right_inserted = true; } @@ -132,14 +111,111 @@ impl<'a> WordFstBuilder<'a> { Ok((left_inserted, right_inserted)) } - fn insert_word(&mut self, bytes: &[u8]) -> Result<()> { - self.inserted_words += 1; - self.word_fst_builder.insert(bytes)?; + fn insert_word(&mut self, bytes: &[u8], deladd: DelAdd, is_modified: bool) -> Result<()> { + // Addition: We insert the word + // Deletion: We delete the word by not inserting it + if deladd == DelAdd::Addition { + self.inserted_words += 1; + self.word_fst_builder.insert(bytes)?; + } + if let Some(prefix_fst_builder) = self.prefix_fst_builder.as_mut() { + prefix_fst_builder.insert_word(bytes, deladd, is_modified)?; + } + + Ok(()) + } + + fn drain_stream(&mut self) -> Result<()> { + if let Some(mut stream) = self.stream.take() { + while let Some(current) = stream.next() { + self.insert_word(current, DelAdd::Addition, false)?; + } + } + + Ok(()) + } + + pub fn build( + mut self, + index: &crate::Index, + rtxn: &heed::RoTxn, + ) -> Result<(Mmap, Option)> { + self.drain_stream()?; + + /// TODO: ugly unwrap + let words_fst_file = self.word_fst_builder.into_inner()?.into_inner().unwrap(); + let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; + + let prefix_data = self + .prefix_fst_builder + .map(|prefix_fst_builder| prefix_fst_builder.build(index, rtxn)) + .transpose()?; + + Ok((words_fst_mmap, prefix_data)) + } +} + +#[derive(Debug)] +pub struct PrefixSettings { + pub prefix_count_threshold: u64, + pub max_prefix_length: usize, + pub compute_prefixes: bool, +} + +pub struct PrefixData { + pub prefixes_fst_mmap: Mmap, + pub prefix_delta: PrefixDelta, +} + +#[derive(Debug)] +pub struct PrefixDelta { + pub modified: HashSet, + pub deleted: HashSet, +} + +struct PrefixFstBuilder { + prefix_count_threshold: u64, + max_prefix_length: usize, + /// TODO: Replace the full memory allocation + prefix_fst_builders: Vec>>, + current_prefix: Vec, + current_prefix_count: Vec, + modified_prefixes: HashSet, + current_prefix_is_modified: Vec, +} + +impl PrefixFstBuilder { + pub fn new(prefix_settings: PrefixSettings) -> Option { + let PrefixSettings { prefix_count_threshold, max_prefix_length, compute_prefixes } = + prefix_settings; + + if !compute_prefixes { + return None; + } + + let mut prefix_fst_builders = Vec::new(); + for _ in 0..max_prefix_length { + prefix_fst_builders.push(SetBuilder::memory()); + } + + Some(Self { + prefix_count_threshold, + max_prefix_length, + prefix_fst_builders, + current_prefix: vec![Prefix::new(); max_prefix_length], + current_prefix_count: vec![0; max_prefix_length], + modified_prefixes: HashSet::new(), + current_prefix_is_modified: vec![false; max_prefix_length], + }) + } + + fn insert_word(&mut self, bytes: &[u8], deladd: DelAdd, is_modified: bool) -> Result<()> { for n in 0..self.max_prefix_length { let current_prefix = &mut self.current_prefix[n]; let current_prefix_count = &mut self.current_prefix_count[n]; let builder = &mut self.prefix_fst_builders[n]; + let current_prefix_is_modified = &mut self.current_prefix_is_modified[n]; // We try to get the first n bytes out of this string but we only want // to split at valid characters bounds. If we try to split in the middle of @@ -153,43 +229,36 @@ impl<'a> WordFstBuilder<'a> { // This is the first iteration of the loop, // or the current word doesn't starts with the current prefix. if *current_prefix_count == 0 || prefix != current_prefix.as_str() { - *current_prefix = SmallString32::from(prefix); + *current_prefix = Prefix::from(prefix); *current_prefix_count = 0; + *current_prefix_is_modified = false; } - *current_prefix_count += 1; + *current_prefix_is_modified |= is_modified; + + if deladd == DelAdd::Addition { + *current_prefix_count += 1; + } // There is enough words corresponding to this prefix to add it to the cache. - /// TODO: (LEGACY) Replace this by `==` to avoid inserting several times the same prefix? - if *current_prefix_count >= self.prefix_count_threshold { + if *current_prefix_count == self.prefix_count_threshold { builder.insert(prefix)?; + + if *current_prefix_is_modified { + self.modified_prefixes.insert(current_prefix.clone()); + } } } Ok(()) } - fn drain_stream(&mut self) -> Result<()> { - if let Some(mut stream) = self.stream.take() { - while let Some(current) = stream.next() { - self.insert_word(current)?; - } - } - - Ok(()) - } - - pub fn build(mut self) -> Result<(Mmap, Mmap)> { - self.drain_stream()?; - - /// TODO: ugly unwrap - let words_fst_file = self.word_fst_builder.into_inner()?.into_inner().unwrap(); - let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; - + fn build(self, index: &crate::Index, rtxn: &heed::RoTxn) -> Result { // We merge all of the previously computed prefixes into on final set. let mut prefix_fsts = Vec::new(); - for builder in self.prefix_fst_builders { - prefix_fsts.push(builder.into_set()); + for builder in self.prefix_fst_builders.into_iter() { + let prefix_fst = builder.into_set(); + prefix_fsts.push(prefix_fst); } let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter()); let mut builder = SetBuilder::new(BufWriter::new(tempfile()?))?; @@ -197,14 +266,22 @@ impl<'a> WordFstBuilder<'a> { /// TODO: ugly unwrap let prefix_fst_file = builder.into_inner()?.into_inner().unwrap(); let prefix_fst_mmap = unsafe { Mmap::map(&prefix_fst_file)? }; + let new_prefix_fst = Set::new(&prefix_fst_mmap)?; + let old_prefix_fst = index.words_prefixes_fst(rtxn)?; + let mut deleted_prefixes = HashSet::new(); + { + let mut deleted_prefixes_stream = old_prefix_fst.op().add(&new_prefix_fst).difference(); + while let Some(prefix) = deleted_prefixes_stream.next() { + deleted_prefixes.insert(Prefix::from(std::str::from_utf8(prefix)?)); + } + } - eprintln!("================================================"); - eprintln!( - "inserted words: {}, registered words: {}, base set len: {}", - self.inserted_words, self.registered_words, self.base_set_length - ); - eprintln!("================================================"); - - Ok((words_fst_mmap, prefix_fst_mmap)) + Ok(PrefixData { + prefixes_fst_mmap: prefix_fst_mmap, + prefix_delta: PrefixDelta { + modified: self.modified_prefixes, + deleted: deleted_prefixes, + }, + }) } } diff --git a/milli/src/update/new/words_prefix_docids.rs b/milli/src/update/new/words_prefix_docids.rs new file mode 100644 index 000000000..32a22ba73 --- /dev/null +++ b/milli/src/update/new/words_prefix_docids.rs @@ -0,0 +1,108 @@ +use std::collections::HashSet; + +use heed::Database; +use heed::{types::Bytes, RwTxn}; +use roaring::RoaringBitmap; + +use crate::{CboRoaringBitmapCodec, Index, Prefix, Result}; + +struct WordPrefixDocids { + database: Database, + prefix_database: Database, +} + +impl WordPrefixDocids { + fn new( + database: Database, + prefix_database: Database, + ) -> WordPrefixDocids { + WordPrefixDocids { database, prefix_database } + } + + fn execute( + self, + wtxn: &mut heed::RwTxn, + prefix_to_compute: &HashSet, + prefix_to_delete: &HashSet, + ) -> Result<()> { + self.delete_prefixes(wtxn, prefix_to_delete)?; + self.recompute_modified_prefixes(wtxn, prefix_to_compute) + } + + #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] + fn delete_prefixes(&self, wtxn: &mut heed::RwTxn, prefixes: &HashSet) -> Result<()> { + // We remove all the entries that are no more required in this word prefix docids database. + for prefix in prefixes { + let prefix = prefix.as_bytes(); + if !self.prefix_database.delete(wtxn, prefix)? { + unreachable!("We tried to delete an unknown key") + } + } + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] + fn recompute_modified_prefixes( + &self, + wtxn: &mut RwTxn, + prefixes: &HashSet, + ) -> Result<()> { + // We fetch the docids associated to the newly added word prefix fst only. + let mut docids = RoaringBitmap::new(); + for prefix in prefixes { + docids.clear(); + let prefix = prefix.as_bytes(); + for result in self.database.prefix_iter(wtxn, prefix)? { + let (_word, data) = result?; + docids |= &data; + } + + self.prefix_database.put(wtxn, prefix, &docids)?; + } + + Ok(()) + } +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] +pub fn compute_word_prefix_docids( + wtxn: &mut RwTxn, + index: &Index, + prefix_to_compute: &HashSet, + prefix_to_delete: &HashSet, +) -> Result<()> { + WordPrefixDocids::new( + index.word_docids.remap_key_type(), + index.word_prefix_docids.remap_key_type(), + ) + .execute(wtxn, prefix_to_compute, prefix_to_delete) +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] +pub fn compute_word_prefix_fid_docids( + wtxn: &mut RwTxn, + index: &Index, + prefix_to_compute: &HashSet, + prefix_to_delete: &HashSet, +) -> Result<()> { + WordPrefixDocids::new( + index.word_fid_docids.remap_key_type(), + index.word_prefix_fid_docids.remap_key_type(), + ) + .execute(wtxn, prefix_to_compute, prefix_to_delete) +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] +pub fn compute_word_prefix_position_docids( + wtxn: &mut RwTxn, + index: &Index, + prefix_to_compute: &HashSet, + prefix_to_delete: &HashSet, +) -> Result<()> { + WordPrefixDocids::new( + index.word_position_docids.remap_key_type(), + index.word_prefix_position_docids.remap_key_type(), + ) + .execute(wtxn, prefix_to_compute, prefix_to_delete) +}