diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index be077d142..658a3127c 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -466,12 +466,13 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>( Ok(bucket_caches) } -/// Merges the caches that must be all associated to the same bucket. +/// Merges the caches that must be all associated to the same bucket +/// but make sure to sort the different buckets before performing the merges. /// /// # Panics /// /// - If the bucket IDs in these frozen caches are not exactly the same. -pub fn merge_caches(frozen: Vec, mut f: F) -> Result<()> +pub fn merge_caches_sorted(frozen: Vec, mut f: F) -> Result<()> where F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, { @@ -543,12 +544,12 @@ where // Then manage the content on the HashMap entries that weren't taken (mem::take). while let Some(mut map) = maps.pop() { - for (key, bbbul) in map.iter_mut() { - // Make sure we don't try to work with entries already managed by the spilled - if bbbul.is_empty() { - continue; - } + // Make sure we don't try to work with entries already managed by the spilled + let mut ordered_entries: Vec<_> = + map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect(); + ordered_entries.sort_unstable_by_key(|(key, _)| *key); + for (key, bbbul) in ordered_entries { let mut output = DelAddRoaringBitmap::empty(); output.union_and_clear_bbbul(bbbul); diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index e67f70db1..0bdf31635 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -6,7 +6,9 @@ mod searchable; mod vectors; use bumpalo::Bump; -pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap}; +pub use cache::{ + merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, +}; pub use documents::*; pub use faceted::*; pub use geo::*; diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 512e094fb..9728f99d6 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -9,8 +9,8 @@ use roaring::RoaringBitmap; use super::channel::*; use super::extract::{ - merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, - GeoExtractorData, + merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, + FacetKind, GeoExtractorData, }; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; @@ -78,7 +78,7 @@ where if must_stop_processing() { return Err(InternalError::AbortedIndexation.into()); } - merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { + merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { @@ -107,7 +107,7 @@ pub fn merge_and_send_facet_docids<'extractor>( .map(|frozen| { let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); let rtxn = index.read_txn()?; - merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { + merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { diff --git a/crates/milli/src/update/new/word_fst_builder.rs b/crates/milli/src/update/new/word_fst_builder.rs index 6bc72d91d..a9a5222be 100644 --- a/crates/milli/src/update/new/word_fst_builder.rs +++ b/crates/milli/src/update/new/word_fst_builder.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::BTreeSet; use std::io::BufWriter; use fst::{Set, SetBuilder, Streamer}; @@ -75,8 +75,8 @@ pub struct PrefixData { #[derive(Debug)] pub struct PrefixDelta { - pub modified: HashSet, - pub deleted: HashSet, + pub modified: BTreeSet, + pub deleted: BTreeSet, } struct PrefixFstBuilder { @@ -86,7 +86,7 @@ struct PrefixFstBuilder { prefix_fst_builders: Vec>>, current_prefix: Vec, current_prefix_count: Vec, - modified_prefixes: HashSet, + modified_prefixes: BTreeSet, current_prefix_is_modified: Vec, } @@ -110,7 +110,7 @@ impl PrefixFstBuilder { prefix_fst_builders, current_prefix: vec![Prefix::new(); max_prefix_length], current_prefix_count: vec![0; max_prefix_length], - modified_prefixes: HashSet::new(), + modified_prefixes: BTreeSet::new(), current_prefix_is_modified: vec![false; max_prefix_length], }) } @@ -180,7 +180,7 @@ impl PrefixFstBuilder { let prefix_fst_mmap = unsafe { Mmap::map(&prefix_fst_file)? }; let new_prefix_fst = Set::new(&prefix_fst_mmap)?; let old_prefix_fst = index.words_prefixes_fst(rtxn)?; - let mut deleted_prefixes = HashSet::new(); + let mut deleted_prefixes = BTreeSet::new(); { let mut deleted_prefixes_stream = old_prefix_fst.op().add(&new_prefix_fst).difference(); while let Some(prefix) = deleted_prefixes_stream.next() { diff --git a/crates/milli/src/update/new/words_prefix_docids.rs b/crates/milli/src/update/new/words_prefix_docids.rs index 7e56beeae..bf64049c3 100644 --- a/crates/milli/src/update/new/words_prefix_docids.rs +++ b/crates/milli/src/update/new/words_prefix_docids.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::collections::HashSet; +use std::collections::BTreeSet; use std::io::{BufReader, BufWriter, Read, Seek, Write}; use hashbrown::HashMap; @@ -37,8 +37,8 @@ impl WordPrefixDocids { fn execute( self, wtxn: &mut heed::RwTxn, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, ) -> Result<()> { delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?; self.recompute_modified_prefixes(wtxn, prefix_to_compute) @@ -48,7 +48,7 @@ impl WordPrefixDocids { fn recompute_modified_prefixes( &self, wtxn: &mut RwTxn, - prefixes: &HashSet, + prefixes: &BTreeSet, ) -> Result<()> { // We fetch the docids associated to the newly added word prefix fst only. // And collect the CboRoaringBitmaps pointers in an HashMap. @@ -127,7 +127,7 @@ impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> { pub fn from_prefixes( database: Database, rtxn: &'rtxn RoTxn, - prefixes: &'a HashSet, + prefixes: &'a BTreeSet, ) -> heed::Result { let database = database.remap_data_type::(); @@ -173,8 +173,8 @@ impl WordPrefixIntegerDocids { fn execute( self, wtxn: &mut heed::RwTxn, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, ) -> Result<()> { delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?; self.recompute_modified_prefixes(wtxn, prefix_to_compute) @@ -184,7 +184,7 @@ impl WordPrefixIntegerDocids { fn recompute_modified_prefixes( &self, wtxn: &mut RwTxn, - prefixes: &HashSet, + prefixes: &BTreeSet, ) -> Result<()> { // We fetch the docids associated to the newly added word prefix fst only. // And collect the CboRoaringBitmaps pointers in an HashMap. @@ -262,7 +262,7 @@ impl<'a, 'rtxn> FrozenPrefixIntegerBitmaps<'a, 'rtxn> { pub fn from_prefixes( database: Database, rtxn: &'rtxn RoTxn, - prefixes: &'a HashSet, + prefixes: &'a BTreeSet, ) -> heed::Result { let database = database.remap_data_type::(); @@ -291,7 +291,7 @@ unsafe impl<'a, 'rtxn> Sync for FrozenPrefixIntegerBitmaps<'a, 'rtxn> {} fn delete_prefixes( wtxn: &mut RwTxn, prefix_database: &Database, - prefixes: &HashSet, + prefixes: &BTreeSet, ) -> Result<()> { // We remove all the entries that are no more required in this word prefix docids database. for prefix in prefixes { @@ -309,8 +309,8 @@ fn delete_prefixes( pub fn compute_word_prefix_docids( wtxn: &mut RwTxn, index: &Index, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixDocids::new( @@ -325,8 +325,8 @@ pub fn compute_word_prefix_docids( pub fn compute_exact_word_prefix_docids( wtxn: &mut RwTxn, index: &Index, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixDocids::new( @@ -341,8 +341,8 @@ pub fn compute_exact_word_prefix_docids( pub fn compute_word_prefix_fid_docids( wtxn: &mut RwTxn, index: &Index, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixIntegerDocids::new( @@ -357,8 +357,8 @@ pub fn compute_word_prefix_fid_docids( pub fn compute_word_prefix_position_docids( wtxn: &mut RwTxn, index: &Index, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixIntegerDocids::new(