diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 91d108c72..0e6e59e10 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -4,11 +4,13 @@ mod transform; mod typed_chunk; use std::collections::HashSet; -use std::io::{Read, Seek}; +use std::io::{Cursor, Read, Seek}; use std::iter::FromIterator; use std::num::{NonZeroU32, NonZeroUsize}; use crossbeam_channel::{Receiver, Sender}; +use heed::types::Str; +use heed::Database; use log::debug; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; @@ -20,7 +22,7 @@ pub use self::helpers::{ fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn, }; -use self::helpers::{grenad_obkv_into_chunks, merge_nothing, GrenadParameters}; +use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; use crate::documents::DocumentBatchReader; pub use crate::update::index_documents::helpers::CursorClonableMmap; @@ -28,7 +30,7 @@ use crate::update::{ self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixPairProximityDocids, WordPrefixPositionDocids, WordsPrefixesFst, }; -use crate::{Index, Result}; +use crate::{Index, Result, RoaringBitmapCodec}; static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 5; @@ -433,25 +435,25 @@ where }); if let Some(word_docids) = word_docids { - let mut word_docids_builder = grenad::MergerBuilder::new(merge_nothing as MergeFn); - word_docids_builder.push(word_docids.into_cursor()?); - if let Some(exact_word_docids) = exact_word_docids { - word_docids_builder.push(exact_word_docids.into_cursor()?); - } - - let word_docids_iter = word_docids_builder.build().into_stream_merger_iter()?; - // Run the word prefix docids update operation. - let mut builder = WordPrefixDocids::new( + execute_word_prefix_docids( self.wtxn, + word_docids, self.index.word_docids.clone(), self.index.word_prefix_docids.clone(), - ); - builder.chunk_compression_type = self.indexer_config.chunk_compression_type; - builder.chunk_compression_level = self.indexer_config.chunk_compression_level; - builder.max_nb_chunks = self.indexer_config.max_nb_chunks; - builder.max_memory = self.indexer_config.max_memory; - builder.execute( - word_docids_iter, + &self.indexer_config, + &new_prefix_fst_words, + &common_prefix_fst_words, + &del_prefix_fst_words, + )?; + } + + if let Some(exact_word_docids) = exact_word_docids { + execute_word_prefix_docids( + self.wtxn, + exact_word_docids, + self.index.exact_word_docids.clone(), + self.index.exact_word_prefix_docids.clone(), + &self.indexer_config, &new_prefix_fst_words, &common_prefix_fst_words, &del_prefix_fst_words, @@ -516,6 +518,32 @@ where } } +/// Run the word prefix docids update operation. +fn execute_word_prefix_docids( + txn: &mut heed::RwTxn, + reader: grenad::Reader>, + word_docids_db: Database, + word_prefix_docids_db: Database, + indexer_config: &IndexerConfig, + new_prefix_fst_words: &[String], + common_prefix_fst_words: &[&[String]], + del_prefix_fst_words: &HashSet>, +) -> Result<()> { + let cursor = reader.into_cursor()?; + let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db); + builder.chunk_compression_type = indexer_config.chunk_compression_type; + builder.chunk_compression_level = indexer_config.chunk_compression_level; + builder.max_nb_chunks = indexer_config.max_nb_chunks; + builder.max_memory = indexer_config.max_memory; + builder.execute( + cursor, + &new_prefix_fst_words, + &common_prefix_fst_words, + &del_prefix_fst_words, + )?; + Ok(()) +} + #[cfg(test)] mod tests { use std::io::Cursor; diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index b166812a5..2887b5583 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -23,12 +23,12 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { pub fn new( wtxn: &'t mut heed::RwTxn<'i, 'u>, word_docids: Database, - word_prefixes_docids: Database, + word_prefix_docids: Database, ) -> WordPrefixDocids<'t, 'u, 'i> { WordPrefixDocids { wtxn, word_docids, - word_prefix_docids: word_prefixes_docids, + word_prefix_docids, chunk_compression_type: CompressionType::None, chunk_compression_level: None, max_nb_chunks: None, @@ -39,7 +39,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { #[logging_timer::time("WordPrefixDocids::{}")] pub fn execute( self, - mut new_word_docids_iter: grenad::MergerIter, + mut new_word_docids_iter: grenad::ReaderCursor, new_prefix_fst_words: &[String], common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, @@ -57,7 +57,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { if !common_prefix_fst_words.is_empty() { let mut current_prefixes: Option<&&[String]> = None; let mut prefixes_cache = HashMap::new(); - while let Some((word, data)) = new_word_docids_iter.next()? { + while let Some((word, data)) = new_word_docids_iter.move_on_next()? { current_prefixes = match current_prefixes.take() { Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes), _otherwise => {