diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index ae8e28b33..77b761e6e 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -285,6 +285,7 @@ where
         let index_is_empty = index_documents_ids.len() == 0;
         let mut final_documents_ids = RoaringBitmap::new();
         let mut word_pair_proximity_docids = Vec::new();
+        let mut word_docids = Vec::new();

         let mut databases_seen = 0;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@@ -294,6 +295,19 @@ where

         for result in lmdb_writer_rx {
             let typed_chunk = match result? {
+                TypedChunk::WordDocids(chunk) => {
+                    // We extract and mmap our chunk file to be able to get it for next processes.
+                    let mut file = chunk.into_inner();
+                    let mmap = unsafe { memmap2::Mmap::map(&file)? };
+                    let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap));
+                    let chunk = grenad::Reader::new(cursor_mmap)?;
+                    word_docids.push(chunk);
+
+                    // We reconstruct our typed-chunk back.
+                    file.rewind()?;
+                    let chunk = grenad::Reader::new(file)?;
+                    TypedChunk::WordDocids(chunk)
+                }
                 TypedChunk::WordPairProximityDocids(chunk) => {
                     // We extract and mmap our chunk file to be able to get it for next processes.
                     let mut file = chunk.into_inner();
@@ -345,7 +359,7 @@ where
         let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids;
         self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;

-        self.execute_prefix_databases(word_pair_proximity_docids)?;
+        self.execute_prefix_databases(word_docids, word_pair_proximity_docids)?;

         Ok(all_documents_ids.len())
     }
@@ -353,6 +367,7 @@ where
     #[logging_timer::time("IndexDocuments::{}")]
     pub fn execute_prefix_databases(
         self,
+        word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
         word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
     ) -> Result<()>
     where
@@ -404,7 +419,7 @@ where
         builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
         builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
         builder.max_memory = self.indexer_config.max_memory;
-        builder.execute()?;
+        builder.execute(word_docids, &previous_words_prefixes_fst)?;

         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs
index 30dabf1ae..0703707f0 100644
--- a/milli/src/update/word_prefix_docids.rs
+++ b/milli/src/update/word_prefix_docids.rs
@@ -1,11 +1,12 @@
-use std::str;
+use std::collections::HashMap;

-use fst::Streamer;
-use grenad::CompressionType;
-use heed::types::ByteSlice;
+use fst::IntoStreamer;
+use grenad::{CompressionType, MergerBuilder};
+use slice_group_by::GroupBy;

 use crate::update::index_documents::{
-    create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, WriteMethod,
+    create_sorter, fst_stream_into_hashset, merge_roaring_bitmaps, sorter_into_lmdb_database,
+    CursorClonableMmap, MergeFn, WriteMethod,
 };
 use crate::{Index, Result};

@@ -34,11 +35,18 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
     }

     #[logging_timer::time("WordPrefixDocids::{}")]
-    pub fn execute(self) -> Result<()> {
-        // Clear the word prefix docids database.
-        self.index.word_prefix_docids.clear(self.wtxn)?;
-
+    pub fn execute<A: AsRef<[u8]>>(
+        self,
+        new_word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        old_prefix_fst: &fst::Set<A>,
+    ) -> Result<()> {
         let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
+        let prefix_fst_keys = prefix_fst.into_stream().into_strs()?;
+        let prefix_fst_keys: Vec<_> =
+            prefix_fst_keys.as_slice().linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect();
+
+        // We compute the set of prefixes that are no more part of the prefix fst.
+        let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference());

         // It is forbidden to keep a mutable reference into the database
         // and write into it at the same time, therefore we write into another file.
@@ -50,28 +58,70 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
             self.max_memory,
         );

-        // We iterate over all the prefixes and retrieve the corresponding docids.
-        let mut prefix_stream = prefix_fst.stream();
-        while let Some(bytes) = prefix_stream.next() {
-            let prefix = str::from_utf8(bytes)?;
-            let db = self.index.word_docids.remap_data_type::<ByteSlice>();
-            for result in db.prefix_iter(self.wtxn, prefix)? {
-                let (_word, data) = result?;
-                prefix_docids_sorter.insert(prefix, data)?;
+        let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps);
+        word_docids_merger.extend(new_word_docids);
+        let mut word_docids_iter = word_docids_merger.build().into_merger_iter()?;
+
+        let mut current_prefixes: Option<&&[String]> = None;
+        let mut prefixes_cache = HashMap::new();
+        while let Some((word, data)) = word_docids_iter.next()? {
+            current_prefixes = match current_prefixes.take() {
+                Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes),
+                _otherwise => {
+                    write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
+                    prefix_fst_keys
+                        .iter()
+                        .find(|prefixes| word.starts_with(&prefixes[0].as_bytes()))
+                }
+            };
+
+            if let Some(prefixes) = current_prefixes {
+                for prefix in prefixes.iter() {
+                    if word.starts_with(prefix.as_bytes()) {
+                        match prefixes_cache.get_mut(prefix.as_bytes()) {
+                            Some(value) => value.push(data.to_owned()),
+                            None => {
+                                prefixes_cache.insert(prefix.clone().into(), vec![data.to_owned()]);
+                            }
+                        }
+                    }
+                }
             }
         }

-        drop(prefix_fst);
+        write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
+
+        // We remove all the entries that are no more required in this word prefix docids database.
+        let mut iter = self.index.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data();
+        while let Some((prefix, _)) = iter.next().transpose()? {
+            if suppr_pw.contains(prefix.as_bytes()) {
+                unsafe { iter.del_current()? };
+            }
+        }
+
+        drop(iter);

         // We finally write the word prefix docids into the LMDB database.
         sorter_into_lmdb_database(
             self.wtxn,
             *self.index.word_prefix_docids.as_polymorph(),
             prefix_docids_sorter,
             merge_roaring_bitmaps,
-            WriteMethod::Append,
+            WriteMethod::GetMergePut,
         )?;

         Ok(())
     }
 }
+
+fn write_prefixes_in_sorter(
+    prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
+    sorter: &mut grenad::Sorter<MergeFn>,
+) -> Result<()> {
+    for (key, data_slices) in prefixes.drain() {
+        for data in data_slices {
+            sorter.insert(&key, data)?;
+        }
+    }
+
+    Ok(())
+}
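
A quick standalone sketch of the core trick the new `WordPrefixDocids::execute` loop relies on: both the prefix list and the word stream are sorted, so the prefixes can be grouped by their first character (via `slice_group_by`) and each incoming word only has to be checked against the one group it can possibly extend, instead of being looked up against the whole prefix set. The snippet below is illustrative only and not part of the patch; the data is made up, and it assumes the `slice_group_by` crate as a dependency, just like the patched file does.

    use slice_group_by::GroupBy;

    fn main() {
        // Sorted prefixes, standing in for what words_prefixes_fst() yields.
        let prefixes =
            vec!["a".to_string(), "ab".to_string(), "b".to_string(), "ba".to_string()];
        // Group them by first character: [["a", "ab"], ["b", "ba"]].
        let groups: Vec<&[String]> =
            prefixes.linear_group_by_key(|p| p.chars().next().unwrap()).collect();

        // Sorted words, standing in for the grenad merger output.
        let mut current: Option<&&[String]> = None;
        for word in ["abc", "bad", "bed"] {
            // Keep the current group while the word still matches its leading
            // prefix, otherwise look the next group up, like the patched loop.
            current = match current.take() {
                Some(group) if word.starts_with(group[0].as_str()) => Some(group),
                _otherwise => groups.iter().find(|g| word.starts_with(g[0].as_str())),
            };

            if let Some(group) = current {
                for prefix in group.iter().filter(|p| word.starts_with(p.as_str())) {
                    println!("{} extends the prefix {}", word, prefix);
                }
            }
        }
    }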