From 52843123d49d5b8a7903a9c6f95ae584f7e87a8c Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 5 Dec 2024 10:03:05 +0100 Subject: [PATCH] Clean up and remove the non-sorted merge_caches function --- crates/milli/src/update/new/extract/cache.rs | 103 +------------------ crates/milli/src/update/new/extract/mod.rs | 3 +- crates/milli/src/update/new/merger.rs | 8 +- 3 files changed, 8 insertions(+), 106 deletions(-) diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index 325a72280..658a3127c 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -466,110 +466,13 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>( Ok(bucket_caches) } -/// Merges the caches that must be all associated to the same bucket. +/// Merges the caches that must be all associated to the same bucket +/// but make sure to sort the different buckets before performing the merges. /// /// # Panics /// /// - If the bucket IDs in these frozen caches are not exactly the same. -pub fn merge_caches<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()> -where - F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, -{ - let mut maps = Vec::new(); - let mut readers = Vec::new(); - let mut current_bucket = None; - for FrozenCache { bucket, cache, ref mut spilled } in frozen { - assert_eq!(*current_bucket.get_or_insert(bucket), bucket); - maps.push(cache); - readers.append(spilled); - } - - // First manage the spilled entries by looking into the HashMaps, - // merge them and mark them as dummy. 
- let mut heap = BinaryHeap::new(); - for (source_index, source) in readers.into_iter().enumerate() { - let mut cursor = source.into_cursor()?; - if cursor.move_on_next()?.is_some() { - heap.push(Entry { cursor, source_index }); - } - } - - loop { - let mut first_entry = match heap.pop() { - Some(entry) => entry, - None => break, - }; - - let (first_key, first_value) = match first_entry.cursor.current() { - Some((key, value)) => (key, value), - None => break, - }; - - let mut output = DelAddRoaringBitmap::from_bytes(first_value)?; - while let Some(mut entry) = heap.peek_mut() { - if let Some((key, _value)) = entry.cursor.current() { - if first_key == key { - let new = DelAddRoaringBitmap::from_bytes(first_value)?; - output = output.merge(new); - // When we are done we the current value of this entry move make - // it move forward and let the heap reorganize itself (on drop) - if entry.cursor.move_on_next()?.is_none() { - PeekMut::pop(entry); - } - } else { - break; - } - } - } - - // Once we merged all of the spilled bitmaps we must also - // fetch the entries from the non-spilled entries (the HashMaps). - for (map_index, map) in maps.iter_mut().enumerate() { - if first_entry.source_index != map_index { - if let Some(new) = map.get_mut(first_key) { - output.union_and_clear_bbbul(new); - } - } - } - - // We send the merged entry outside. - (f)(first_key, output)?; - - // Don't forget to put the first entry back into the heap. - if first_entry.cursor.move_on_next()?.is_some() { - heap.push(first_entry) - } - } - - // Then manage the content on the HashMap entries that weren't taken (mem::take). 
- while let Some(mut map) = maps.pop() { - for (key, bbbul) in map.iter_mut() { - // Make sure we don't try to work with entries already managed by the spilled - if bbbul.is_empty() { - continue; - } - - let mut output = DelAddRoaringBitmap::empty(); - output.union_and_clear_bbbul(bbbul); - - for rhs in maps.iter_mut() { - if let Some(new) = rhs.get_mut(key) { - output.union_and_clear_bbbul(new); - } - } - - // We send the merged entry outside. - (f)(key, output)?; - } - } - - Ok(()) -} - -/// Merges the caches that must be all associated to the same bucket. -/// -/// It merges entries like the `merge_caches` function -pub fn merge_caches_alt<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()> +pub fn merge_caches_sorted<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()> where F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, { diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index 3601dd9c6..0bdf31635 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -7,8 +7,7 @@ mod vectors; use bumpalo::Bump; pub use cache::{ - merge_caches, merge_caches_alt, transpose_and_freeze_caches, BalancedCaches, - DelAddRoaringBitmap, + merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, }; pub use documents::*; pub use faceted::*; diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 9f2aae5a8..85f5a70f7 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -9,8 +9,8 @@ use roaring::RoaringBitmap; use super::channel::*; use super::extract::{ - merge_caches, merge_caches_alt, transpose_and_freeze_caches, BalancedCaches, - DelAddRoaringBitmap, FacetKind, GeoExtractorData, + merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, + FacetKind, GeoExtractorData, }; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; 
@@ -78,7 +78,7 @@ where if must_stop_processing() { return Err(InternalError::AbortedIndexation.into()); } - merge_caches_alt(frozen, |key, DelAddRoaringBitmap { del, add }| { + merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { @@ -107,7 +107,7 @@ pub fn merge_and_send_facet_docids<'extractor>( .map(|frozen| { let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); let rtxn = index.read_txn()?; - merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { + merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => {