Fix the cache to serialize entries correctly

This commit is contained in:
Clément Renault 2024-09-04 10:55:06 +02:00
parent 781a186f75
commit 3b82d8b5b9
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1,13 +1,13 @@
use std::borrow::Cow;
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::{io, mem}; use std::mem;
use grenad::{MergeFunction, Sorter}; use grenad::{MergeFunction, Sorter};
use lru::LruCache; use lru::LruCache;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use smallvec::SmallVec; use smallvec::SmallVec;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::CboRoaringBitmapCodec;
#[derive(Debug)] #[derive(Debug)]
pub struct CachedSorter<MF> { pub struct CachedSorter<MF> {
@ -123,26 +123,27 @@ impl<MF: MergeFunction> CachedSorter<MF> {
key: A, key: A,
deladd: DelAddRoaringBitmap, deladd: DelAddRoaringBitmap,
) -> grenad::Result<(), MF::Error> { ) -> grenad::Result<(), MF::Error> {
/// TODO we must create a serialization trait to correctly serialize bitmaps
self.deladd_buffer.clear(); self.deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer); let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer);
match deladd { match deladd {
DelAddRoaringBitmap { del: Some(del), add: None } => { DelAddRoaringBitmap { del: Some(del), add: None } => {
self.cbo_buffer.clear(); self.cbo_buffer.clear();
RoaringBitmap::serialize_into(&del, &mut self.cbo_buffer)?; CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?; value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
} }
DelAddRoaringBitmap { del: None, add: Some(add) } => { DelAddRoaringBitmap { del: None, add: Some(add) } => {
self.cbo_buffer.clear(); self.cbo_buffer.clear();
RoaringBitmap::serialize_into(&add, &mut self.cbo_buffer)?; CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?; value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
} }
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => { DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
self.cbo_buffer.clear(); self.cbo_buffer.clear();
RoaringBitmap::serialize_into(&del, &mut self.cbo_buffer)?; CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?; value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
self.cbo_buffer.clear(); self.cbo_buffer.clear();
RoaringBitmap::serialize_into(&add, &mut self.cbo_buffer)?; CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?; value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
} }
DelAddRoaringBitmap { del: None, add: None } => return Ok(()), DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
@ -193,56 +194,4 @@ impl DelAddRoaringBitmap {
fn new_add_u32(n: u32) -> Self { fn new_add_u32(n: u32) -> Self {
DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) } DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
} }
} }
/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv
/// separately and outputs a new DelAdd with both unions.
pub struct DelAddRoaringBitmapMerger;
impl MergeFunction for DelAddRoaringBitmapMerger {
type Error = io::Error;
fn merge<'a>(
&self,
_key: &[u8],
values: &[Cow<'a, [u8]>],
) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
if values.len() == 1 {
Ok(values[0].clone())
} else {
// Retrieve the bitmaps from both sides
let mut del_bitmaps_bytes = Vec::new();
let mut add_bitmaps_bytes = Vec::new();
for value in values {
let obkv: &KvReaderDelAdd = value.as_ref().into();
if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) {
del_bitmaps_bytes.push(bitmap_bytes);
}
if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) {
add_bitmaps_bytes.push(bitmap_bytes);
}
}
let mut output_deladd_obkv = KvWriterDelAdd::memory();
// Deletion
let mut buffer = Vec::new();
let mut merged = RoaringBitmap::new();
for bytes in del_bitmaps_bytes {
merged |= RoaringBitmap::deserialize_unchecked_from(bytes)?;
}
merged.serialize_into(&mut buffer)?;
output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?;
// Addition
buffer.clear();
merged.clear();
for bytes in add_bitmaps_bytes {
merged |= RoaringBitmap::deserialize_unchecked_from(bytes)?;
}
output_deladd_obkv.insert(DelAdd::Addition, &buffer)?;
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
}
}
}