From a9ecbf0b64e7a11ea836f241c22d7d6840b1b3a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 6 Nov 2024 12:12:48 +0100 Subject: [PATCH] Use the Bbbul crate in the cache to better control memory --- Cargo.lock | 12 +- milli/src/update/new/extract/cache.rs | 194 +++++++++++++++++++++----- 2 files changed, 167 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 633fdca8f..b49000574 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -572,6 +572,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bitpacking" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c1d3e2bfd8d06048a179f7b17afc3188effa10385e7b00dc65af6aae732ea92" +dependencies = [ + "crunchy", +] + [[package]] name = "bitvec" version = "1.0.1" @@ -4430,9 +4439,10 @@ dependencies = [ [[package]] name = "raw-collections" version = "0.1.0" -source = "git+https://github.com/dureuill/raw-collections.git#4ab9619207632c20f4e0c2e126d9d909cc58ef65" +source = "git+https://github.com/dureuill/raw-collections.git#48801130cb16d758ba9c610b0fe25747e4d85534" dependencies = [ "allocator-api2", + "bitpacking", "bumpalo", "hashbrown 0.15.0", "serde", diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index a366435d8..63590db69 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -72,7 +72,9 @@ use bumpalo::Bump; use grenad::ReaderCursor; use hashbrown::hash_map::RawEntryMut; use hashbrown::HashMap; +use raw_collections::bbbul::{BitPacker, BitPacker4x}; use raw_collections::map::FrozenMap; +use raw_collections::{Bbbul, FrozenBbbul}; use roaring::RoaringBitmap; use rustc_hash::FxBuildHasher; @@ -130,7 +132,7 @@ impl<'extractor> BalancedCaches<'extractor> { Ok(()) } InnerCaches::Spilling(spilling) => { - spilling.insert_del_u32(&self.hasher, buckets, key, n) + spilling.insert_del_u32(&self.hasher, self.alloc, buckets, key, n) } } } @@ -147,7 +149,7 @@ impl<'extractor> BalancedCaches<'extractor> { Ok(()) } InnerCaches::Spilling(spilling) => { - spilling.insert_add_u32(&self.hasher, buckets, key, n) + spilling.insert_add_u32(&self.hasher, self.alloc, buckets, key, n) } } } @@ -165,7 +167,7 @@ impl<'extractor> BalancedCaches<'extractor> { ); let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum(); - eprintln!("The last allocated HasMap took {allocated} bytes"); + eprintln!("The last allocated HashMap took {allocated} bytes"); let dummy = NormalCaches { caches: Vec::new() }; let NormalCaches { caches: cache_maps } = mem::replace(normal_caches, dummy); @@ -181,6 +183,24 @@ impl<'extractor> BalancedCaches<'extractor> { .iter_mut() .enumerate() .map(|(bucket, map)| { + // safety: we are transmuting the Bbbul into a FrozenBbbul + // that are the same size. + let map = unsafe { + std::mem::transmute::< + &mut HashMap< + &[u8], + DelAddBbbul, // from this + FxBuildHasher, + &Bump, + >, + &mut HashMap< + &[u8], + FrozenDelAddBbbul, // to that + FxBuildHasher, + &Bump, + >, + >(map) + }; Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() }) }) .collect(), @@ -196,6 +216,24 @@ impl<'extractor> BalancedCaches<'extractor> { .map(BufReader::new) .map(|bufreader| grenad::Reader::new(bufreader).map_err(Into::into)) .collect::>()?; + // safety: we are transmuting the Bbbul into a FrozenBbbul + // that are the same size. + let map = unsafe { + std::mem::transmute::< + &mut HashMap< + &[u8], + DelAddBbbul, // from this + FxBuildHasher, + &Bump, + >, + &mut HashMap< + &[u8], + FrozenDelAddBbbul, // to that + FxBuildHasher, + &Bump, + >, + >(map) + }; Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled }) }) .collect(), @@ -206,7 +244,14 @@ impl<'extractor> BalancedCaches<'extractor> { unsafe impl MostlySend for BalancedCaches<'_> {} struct NormalCaches<'extractor> { - caches: Vec>, + caches: Vec< + HashMap< + &'extractor [u8], + DelAddBbbul<'extractor, BitPacker4x>, + FxBuildHasher, + &'extractor Bump, + >, + >, } impl<'extractor> NormalCaches<'extractor> { @@ -223,13 +268,13 @@ impl<'extractor> NormalCaches<'extractor> { match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) { RawEntryMut::Occupied(mut entry) => { - entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n); + entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n); } RawEntryMut::Vacant(entry) => { entry.insert_hashed_nocheck( hash, alloc.alloc_slice_copy(key), - DelAddRoaringBitmap::new_del_u32(n), + DelAddBbbul::new_del_u32_in(n, alloc), ); } } @@ -247,13 +292,13 @@ impl<'extractor> NormalCaches<'extractor> { let bucket = compute_bucket_from_hash(buckets, hash); match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) { RawEntryMut::Occupied(mut entry) => { - entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n); + entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n); } RawEntryMut::Vacant(entry) => { entry.insert_hashed_nocheck( hash, alloc.alloc_slice_copy(key), - DelAddRoaringBitmap::new_add_u32(n), + DelAddBbbul::new_add_u32_in(n, alloc), ); } } @@ -261,7 +306,14 @@ impl<'extractor> NormalCaches<'extractor> { } struct SpillingCaches<'extractor> { - caches: Vec>, + caches: Vec< + HashMap< + &'extractor [u8], + DelAddBbbul<'extractor, BitPacker4x>, + FxBuildHasher, + &'extractor Bump, + >, + >, spilled_entries: Vec>, deladd_buffer: Vec, cbo_buffer: Vec, @@ -270,7 +322,12 @@ struct SpillingCaches<'extractor> { impl<'extractor> SpillingCaches<'extractor> { fn from_cache_maps( caches: Vec< - HashMap<&'extractor [u8], DelAddRoaringBitmap, FxBuildHasher, &'extractor Bump>, + HashMap< + &'extractor [u8], + DelAddBbbul<'extractor, BitPacker4x>, + FxBuildHasher, + &'extractor Bump, + >, >, ) -> SpillingCaches<'extractor> { SpillingCaches { @@ -291,6 +348,7 @@ impl<'extractor> SpillingCaches<'extractor> { pub fn insert_del_u32( &mut self, hasher: &FxBuildHasher, + alloc: &'extractor Bump, buckets: usize, key: &[u8], n: u32, @@ -299,25 +357,23 @@ impl<'extractor> SpillingCaches<'extractor> { let bucket = compute_bucket_from_hash(buckets, hash); match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) { RawEntryMut::Occupied(mut entry) => { - entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n); + entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n); Ok(()) } - RawEntryMut::Vacant(_entry) => { - let deladd = DelAddRoaringBitmap::new_del_u32(n); - spill_entry_to_sorter( - &mut self.spilled_entries[bucket], - &mut self.deladd_buffer, - &mut self.cbo_buffer, - key, - deladd, - ) - } + RawEntryMut::Vacant(_entry) => spill_entry_to_sorter( + &mut self.spilled_entries[bucket], + &mut self.deladd_buffer, + &mut self.cbo_buffer, + key, + DelAddRoaringBitmap::new_del_u32(n), + ), } } pub fn insert_add_u32( &mut self, hasher: &FxBuildHasher, + alloc: &'extractor Bump, buckets: usize, key: &[u8], n: u32, @@ -326,19 +382,16 @@ impl<'extractor> SpillingCaches<'extractor> { let bucket = compute_bucket_from_hash(buckets, hash); match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) { RawEntryMut::Occupied(mut entry) => { - entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n); + entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n); Ok(()) } - RawEntryMut::Vacant(_entry) => { - let deladd = DelAddRoaringBitmap::new_add_u32(n); - spill_entry_to_sorter( - &mut self.spilled_entries[bucket], - &mut self.deladd_buffer, - &mut self.cbo_buffer, - key, - deladd, - ) - } + RawEntryMut::Vacant(_entry) => spill_entry_to_sorter( + &mut self.spilled_entries[bucket], + &mut self.deladd_buffer, + &mut self.cbo_buffer, + key, + DelAddRoaringBitmap::new_add_u32(n), + ), } } } @@ -387,7 +440,13 @@ fn spill_entry_to_sorter( pub struct FrozenCache<'a, 'extractor> { bucket: usize, - cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, FxBuildHasher>, + cache: FrozenMap< + 'a, + 'extractor, + &'extractor [u8], + FrozenDelAddBbbul<'extractor, BitPacker4x>, + FxBuildHasher, + >, spilled: Vec>>, } @@ -467,7 +526,7 @@ where for (map_index, map) in maps.iter_mut().enumerate() { if first_entry.source_index != map_index { if let Some(new) = map.get_mut(first_key) { - output = output.merge(mem::take(new)); + output.append_and_clear_bbbul(new); } } } @@ -483,14 +542,15 @@ where // Then manage the content on the HashMap entries that weren't taken (mem::take). while let Some(mut map) = maps.pop() { - for (key, output) in map.iter_mut() { - let mut output = mem::take(output); + for (key, bbbul) in map.iter_mut() { + let mut output = DelAddRoaringBitmap::empty(); + output.append_and_clear_bbbul(bbbul); // Make sure we don't try to work with entries already managed by the spilled - if !output.is_empty() { + if !bbbul.is_empty() { for rhs in maps.iter_mut() { if let Some(new) = rhs.get_mut(key) { - output = output.merge(mem::take(new)); + output.append_and_clear_bbbul(new); } } @@ -530,6 +590,44 @@ impl PartialOrd for Entry { } } +pub struct DelAddBbbul<'bump, B> { + pub del: Option>, + pub add: Option>, +} + +impl<'bump, B: BitPacker> DelAddBbbul<'bump, B> { + pub fn insert_del_u32_in(&mut self, n: u32, bump: &'bump Bump) { + self.del.get_or_insert_with(|| Bbbul::new_in(bump)).insert(n); + } + + pub fn insert_add_u32_in(&mut self, n: u32, bump: &'bump Bump) { + self.add.get_or_insert_with(|| Bbbul::new_in(bump)).insert(n); + } + + pub fn new_del_u32_in(n: u32, bump: &'bump Bump) -> Self { + let mut bbbul = Bbbul::new_in(bump); + bbbul.insert(n); + DelAddBbbul { del: Some(bbbul), add: None } + } + + pub fn new_add_u32_in(n: u32, bump: &'bump Bump) -> Self { + let mut bbbul = Bbbul::new_in(bump); + bbbul.insert(n); + DelAddBbbul { del: None, add: Some(bbbul) } + } +} + +pub struct FrozenDelAddBbbul<'bump, B> { + pub del: Option>, + pub add: Option>, +} + +impl<'bump, B> FrozenDelAddBbbul<'bump, B> { + fn is_empty(&self) -> bool { + self.del.is_none() && self.add.is_none() + } +} + #[derive(Debug, Default, Clone)] pub struct DelAddRoaringBitmap { pub del: Option, @@ -578,6 +676,26 @@ impl DelAddRoaringBitmap { DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) } } + pub fn append_and_clear_bbbul(&mut self, bbbul: &mut FrozenDelAddBbbul<'_, B>) { + let FrozenDelAddBbbul { del, add } = bbbul; + + if let Some(ref mut bbbul) = del.take() { + let del = self.del.get_or_insert_with(RoaringBitmap::new); + let mut iter = bbbul.iter_and_clear(); + while let Some(block) = iter.next_block() { + del.append(block.iter().copied()); + } + } + + if let Some(ref mut bbbul) = add.take() { + let add = self.add.get_or_insert_with(RoaringBitmap::new); + let mut iter = bbbul.iter_and_clear(); + while let Some(block) = iter.next_block() { + add.append(block.iter().copied()); + } + } + } + pub fn merge(self, rhs: DelAddRoaringBitmap) -> DelAddRoaringBitmap { let DelAddRoaringBitmap { del, add } = self; let DelAddRoaringBitmap { del: ndel, add: nadd } = rhs;