diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 9b1c73b36..93b86617c 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -279,9 +279,9 @@ where let index_documents_ids = self.index.documents_ids(self.wtxn)?; let index_is_empty = index_documents_ids.len() == 0; let mut final_documents_ids = RoaringBitmap::new(); - let mut word_pair_proximity_docids = Vec::new(); - let mut word_position_docids = Vec::new(); - let mut word_docids = Vec::new(); + let mut word_pair_proximity_docids = None; + let mut word_position_docids = None; + let mut word_docids = None; let mut databases_seen = 0; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { @@ -293,17 +293,17 @@ where let typed_chunk = match result? { TypedChunk::WordDocids(chunk) => { let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? }; - word_docids.push(cloneable_chunk); + word_docids = Some(cloneable_chunk); TypedChunk::WordDocids(chunk) } TypedChunk::WordPairProximityDocids(chunk) => { let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? }; - word_pair_proximity_docids.push(cloneable_chunk); + word_pair_proximity_docids = Some(cloneable_chunk); TypedChunk::WordPairProximityDocids(chunk) } TypedChunk::WordPositionDocids(chunk) => { let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? }; - word_position_docids.push(cloneable_chunk); + word_position_docids = Some(cloneable_chunk); TypedChunk::WordPositionDocids(chunk) } otherwise => otherwise, @@ -345,9 +345,9 @@ where self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; self.execute_prefix_databases( - word_docids, - word_pair_proximity_docids, - word_position_docids, + word_docids.unwrap(), + word_pair_proximity_docids.unwrap(), + word_position_docids.unwrap(), )?; Ok(all_documents_ids.len()) @@ -356,9 +356,9 @@ where #[logging_timer::time("IndexDocuments::{}")] pub fn execute_prefix_databases( self, - word_docids: Vec>, - word_pair_proximity_docids: Vec>, - word_position_docids: Vec>, + word_docids: grenad::Reader, + word_pair_proximity_docids: grenad::Reader, + word_position_docids: grenad::Reader, ) -> Result<()> where F: Fn(UpdateIndexingStep) + Sync, diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 0bb5edb9a..2baaf2f19 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -1,6 +1,6 @@ use std::collections::{HashMap, HashSet}; -use grenad::{CompressionType, MergerBuilder}; +use grenad::CompressionType; use heed::types::ByteSlice; use crate::update::index_documents::{ @@ -35,7 +35,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { #[logging_timer::time("WordPrefixDocids::{}")] pub fn execute( self, - new_word_docids: Vec>, + new_word_docids: grenad::Reader, new_prefix_fst_words: &[String], common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, @@ -50,15 +50,10 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { self.max_memory, ); - let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps); - for reader in new_word_docids { - word_docids_merger.push(reader.into_cursor()?); - } - let mut word_docids_iter = word_docids_merger.build().into_stream_merger_iter()?; - + let mut new_word_docids_iter = new_word_docids.into_cursor()?; let mut current_prefixes: Option<&&[String]> = None; let mut prefixes_cache = HashMap::new(); - while let Some((word, data)) = word_docids_iter.next()? { + while let Some((word, data)) = new_word_docids_iter.move_on_next()? { current_prefixes = match current_prefixes.take() { Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes), _otherwise => { diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index b498d5850..692dd1568 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -1,6 +1,6 @@ use std::collections::{HashMap, HashSet}; -use grenad::{CompressionType, MergerBuilder}; +use grenad::CompressionType; use heed::types::ByteSlice; use heed::BytesDecode; use log::debug; @@ -64,7 +64,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { #[logging_timer::time("WordPrefixPairProximityDocids::{}")] pub fn execute( self, - new_word_pair_proximity_docids: Vec>, + new_word_pair_proximity_docids: grenad::Reader, new_prefix_fst_words: &[String], common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, @@ -74,14 +74,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { let new_prefix_fst_words: Vec<_> = new_prefix_fst_words.linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); - // We retrieve and merge the created word pair proximities docids entries - // for the newly added documents. - let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps); - for reader in new_word_pair_proximity_docids { - wppd_merger.push(reader.into_cursor()?); - } - let mut wppd_iter = wppd_merger.build().into_stream_merger_iter()?; - + let mut new_wppd_iter = new_word_pair_proximity_docids.into_cursor()?; let mut word_prefix_pair_proximity_docids_sorter = create_sorter( merge_cbo_roaring_bitmaps, self.chunk_compression_type, @@ -95,7 +88,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { let mut buffer = Vec::new(); let mut current_prefixes: Option<&&[String]> = None; let mut prefixes_cache = HashMap::new(); - while let Some((key, data)) = wppd_iter.next()? { + while let Some((key, data)) = new_wppd_iter.move_on_next()? { let (w1, w2, prox) = StrStrU8Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?; if prox > self.max_proximity { continue; diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs index 9e15f4d6c..324516325 100644 --- a/milli/src/update/words_prefix_position_docids.rs +++ b/milli/src/update/words_prefix_position_docids.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use std::num::NonZeroU32; use std::{cmp, str}; -use grenad::{CompressionType, MergerBuilder}; +use grenad::CompressionType; use heed::types::ByteSlice; use heed::{BytesDecode, BytesEncode}; use log::debug; @@ -57,7 +57,7 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { #[logging_timer::time("WordPrefixPositionDocids::{}")] pub fn execute( self, - new_word_position_docids: Vec>, + new_word_position_docids: grenad::Reader, new_prefix_fst_words: &[String], common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, @@ -72,18 +72,13 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { self.max_memory, ); - let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps); - for reader in new_word_position_docids { - word_position_docids_merger.push(reader.into_cursor()?); - } - let mut word_position_docids_iter = - word_position_docids_merger.build().into_stream_merger_iter()?; + let mut new_word_position_docids_iter = new_word_position_docids.into_cursor()?; // We fetch all the new common prefixes between the previous and new prefix fst. let mut buffer = Vec::new(); let mut current_prefixes: Option<&&[String]> = None; let mut prefixes_cache = HashMap::new(); - while let Some((key, data)) = word_position_docids_iter.next()? { + while let Some((key, data)) = new_word_position_docids_iter.move_on_next()? { let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?; current_prefixes = match current_prefixes.take() {