2024-09-04 10:55:06 +02:00
|
|
|
use std::mem;
|
2024-09-04 17:03:09 +02:00
|
|
|
use std::num::NonZeroUsize;
|
2024-08-28 18:45:16 +02:00
|
|
|
|
2024-09-03 11:02:39 +02:00
|
|
|
use grenad::{MergeFunction, Sorter};
|
2024-08-28 18:45:16 +02:00
|
|
|
use lru::LruCache;
|
|
|
|
use roaring::RoaringBitmap;
|
|
|
|
use smallvec::SmallVec;
|
|
|
|
|
2024-09-04 10:55:06 +02:00
|
|
|
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
|
|
|
use crate::CboRoaringBitmapCodec;
|
2024-08-28 18:45:16 +02:00
|
|
|
|
|
|
|
#[derive(Debug)]
|
2024-09-04 17:03:09 +02:00
|
|
|
pub struct CboCachedSorter<MF> {
|
2024-08-28 18:45:16 +02:00
|
|
|
cache: lru::LruCache<SmallVec<[u8; 20]>, DelAddRoaringBitmap>,
|
|
|
|
sorter: Sorter<MF>,
|
|
|
|
deladd_buffer: Vec<u8>,
|
|
|
|
cbo_buffer: Vec<u8>,
|
|
|
|
}
|
|
|
|
|
2024-09-04 17:03:09 +02:00
|
|
|
impl<MF> CboCachedSorter<MF> {
|
2024-08-28 18:45:16 +02:00
|
|
|
pub fn new(cap: NonZeroUsize, sorter: Sorter<MF>) -> Self {
|
2024-09-04 17:03:09 +02:00
|
|
|
CboCachedSorter {
|
2024-08-28 18:45:16 +02:00
|
|
|
cache: lru::LruCache::new(cap),
|
|
|
|
sorter,
|
|
|
|
deladd_buffer: Vec::new(),
|
|
|
|
cbo_buffer: Vec::new(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-09-04 17:03:09 +02:00
|
|
|
impl<MF: MergeFunction> CboCachedSorter<MF> {
|
2024-08-28 18:45:16 +02:00
|
|
|
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> {
|
|
|
|
match self.cache.get_mut(key) {
|
|
|
|
Some(DelAddRoaringBitmap { del, add: _ }) => {
|
2024-09-23 16:38:21 +02:00
|
|
|
del.get_or_insert_with(PushOptimizedBitmap::default).insert(n);
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
let value = DelAddRoaringBitmap::new_del_u32(n);
|
|
|
|
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
|
|
|
self.write_entry(key, deladd)?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn insert_del(
|
|
|
|
&mut self,
|
|
|
|
key: &[u8],
|
|
|
|
bitmap: RoaringBitmap,
|
|
|
|
) -> grenad::Result<(), MF::Error> {
|
|
|
|
match self.cache.get_mut(key) {
|
|
|
|
Some(DelAddRoaringBitmap { del, add: _ }) => {
|
2024-09-23 16:38:21 +02:00
|
|
|
del.get_or_insert_with(PushOptimizedBitmap::default).union_with_bitmap(bitmap);
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
let value = DelAddRoaringBitmap::new_del(bitmap);
|
|
|
|
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
|
|
|
self.write_entry(key, deladd)?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> {
|
|
|
|
match self.cache.get_mut(key) {
|
|
|
|
Some(DelAddRoaringBitmap { del: _, add }) => {
|
2024-09-23 16:38:21 +02:00
|
|
|
add.get_or_insert_with(PushOptimizedBitmap::default).insert(n);
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
let value = DelAddRoaringBitmap::new_add_u32(n);
|
|
|
|
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
|
|
|
self.write_entry(key, deladd)?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn insert_add(
|
|
|
|
&mut self,
|
|
|
|
key: &[u8],
|
|
|
|
bitmap: RoaringBitmap,
|
|
|
|
) -> grenad::Result<(), MF::Error> {
|
|
|
|
match self.cache.get_mut(key) {
|
|
|
|
Some(DelAddRoaringBitmap { del: _, add }) => {
|
2024-09-23 16:38:21 +02:00
|
|
|
add.get_or_insert_with(PushOptimizedBitmap::default).union_with_bitmap(bitmap);
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
let value = DelAddRoaringBitmap::new_add(bitmap);
|
|
|
|
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
|
|
|
self.write_entry(key, deladd)?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> {
|
|
|
|
match self.cache.get_mut(key) {
|
|
|
|
Some(DelAddRoaringBitmap { del, add }) => {
|
2024-09-23 16:38:21 +02:00
|
|
|
del.get_or_insert_with(PushOptimizedBitmap::default).insert(n);
|
|
|
|
add.get_or_insert_with(PushOptimizedBitmap::default).insert(n);
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
let value = DelAddRoaringBitmap::new_del_add_u32(n);
|
|
|
|
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
|
|
|
self.write_entry(key, deladd)?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn write_entry<A: AsRef<[u8]>>(
|
|
|
|
&mut self,
|
|
|
|
key: A,
|
|
|
|
deladd: DelAddRoaringBitmap,
|
|
|
|
) -> grenad::Result<(), MF::Error> {
|
2024-09-04 10:55:06 +02:00
|
|
|
/// TODO we must create a serialization trait to correctly serialize bitmaps
|
2024-08-28 18:45:16 +02:00
|
|
|
self.deladd_buffer.clear();
|
|
|
|
let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer);
|
|
|
|
match deladd {
|
|
|
|
DelAddRoaringBitmap { del: Some(del), add: None } => {
|
|
|
|
self.cbo_buffer.clear();
|
2024-09-23 16:38:21 +02:00
|
|
|
CboRoaringBitmapCodec::serialize_into(&del.bitmap, &mut self.cbo_buffer);
|
2024-08-28 18:45:16 +02:00
|
|
|
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
|
|
|
|
}
|
|
|
|
DelAddRoaringBitmap { del: None, add: Some(add) } => {
|
|
|
|
self.cbo_buffer.clear();
|
2024-09-23 16:38:21 +02:00
|
|
|
CboRoaringBitmapCodec::serialize_into(&add.bitmap, &mut self.cbo_buffer);
|
2024-08-28 18:45:16 +02:00
|
|
|
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
|
|
|
|
}
|
|
|
|
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
|
|
|
|
self.cbo_buffer.clear();
|
2024-09-23 16:38:21 +02:00
|
|
|
CboRoaringBitmapCodec::serialize_into(&del.bitmap, &mut self.cbo_buffer);
|
2024-08-28 18:45:16 +02:00
|
|
|
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
|
|
|
|
|
|
|
|
self.cbo_buffer.clear();
|
2024-09-23 16:38:21 +02:00
|
|
|
CboRoaringBitmapCodec::serialize_into(&add.bitmap, &mut self.cbo_buffer);
|
2024-08-28 18:45:16 +02:00
|
|
|
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
|
|
|
|
}
|
|
|
|
DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
|
|
|
|
}
|
|
|
|
let bytes = value_writer.into_inner().unwrap();
|
|
|
|
self.sorter.insert(key, bytes)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> grenad::Result<(), MF::Error> {
|
|
|
|
self.sorter.insert(key, val)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn into_sorter(mut self) -> grenad::Result<Sorter<MF>, MF::Error> {
|
|
|
|
let default_arc = LruCache::new(NonZeroUsize::MIN);
|
|
|
|
for (key, deladd) in mem::replace(&mut self.cache, default_arc) {
|
|
|
|
self.write_entry(key, deladd)?;
|
|
|
|
}
|
|
|
|
Ok(self.sorter)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
pub struct DelAddRoaringBitmap {
|
2024-09-23 16:38:21 +02:00
|
|
|
pub del: Option<PushOptimizedBitmap>,
|
|
|
|
pub add: Option<PushOptimizedBitmap>,
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl DelAddRoaringBitmap {
|
|
|
|
fn new_del_add_u32(n: u32) -> Self {
|
|
|
|
DelAddRoaringBitmap {
|
2024-09-23 16:38:21 +02:00
|
|
|
del: Some(PushOptimizedBitmap::from_single(n)),
|
|
|
|
add: Some(PushOptimizedBitmap::from_single(n)),
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn new_del(bitmap: RoaringBitmap) -> Self {
|
2024-09-23 16:38:21 +02:00
|
|
|
DelAddRoaringBitmap { del: Some(PushOptimizedBitmap::from_bitmap(bitmap)), add: None }
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
fn new_del_u32(n: u32) -> Self {
|
2024-09-23 16:38:21 +02:00
|
|
|
DelAddRoaringBitmap { del: Some(PushOptimizedBitmap::from_single(n)), add: None }
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
fn new_add(bitmap: RoaringBitmap) -> Self {
|
2024-09-23 16:38:21 +02:00
|
|
|
DelAddRoaringBitmap { del: None, add: Some(PushOptimizedBitmap::from_bitmap(bitmap)) }
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
fn new_add_u32(n: u32) -> Self {
|
2024-09-23 16:38:21 +02:00
|
|
|
DelAddRoaringBitmap { del: None, add: Some(PushOptimizedBitmap::from_single(n)) }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Default)]
|
|
|
|
struct PushOptimizedBitmap {
|
|
|
|
max: Option<u32>,
|
|
|
|
bitmap: RoaringBitmap,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl PushOptimizedBitmap {
|
|
|
|
fn from_bitmap(bitmap: RoaringBitmap) -> PushOptimizedBitmap {
|
|
|
|
PushOptimizedBitmap { max: bitmap.max(), bitmap }
|
|
|
|
}
|
|
|
|
|
|
|
|
fn from_single(single: u32) -> PushOptimizedBitmap {
|
|
|
|
PushOptimizedBitmap { max: Some(single), bitmap: RoaringBitmap::from([single]) }
|
|
|
|
}
|
|
|
|
|
|
|
|
fn insert(&mut self, n: u32) {
|
|
|
|
if self.max.map_or(true, |max| n > max) {
|
|
|
|
self.max = Some(n);
|
|
|
|
self.bitmap.push(n);
|
|
|
|
} else {
|
|
|
|
self.bitmap.insert(n);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn union_with_bitmap(&mut self, bitmap: RoaringBitmap) {
|
|
|
|
self.bitmap |= bitmap;
|
|
|
|
self.max = self.bitmap.max();
|
2024-08-28 18:45:16 +02:00
|
|
|
}
|
2024-09-04 17:03:09 +02:00
|
|
|
}
|