diff --git a/milli/src/update/index_documents/cache.rs b/milli/src/update/index_documents/cache.rs index 51adf4ee5..338c564e6 100644 --- a/milli/src/update/index_documents/cache.rs +++ b/milli/src/update/index_documents/cache.rs @@ -1,8 +1,10 @@ -use std::borrow::Cow; +use std::borrow::{Borrow, Cow}; +use std::hash::Hash; +use std::iter::Chain; use std::mem; use std::num::NonZeroUsize; -use lru::LruCache; +use lru::{IntoIter, LruCache}; use roaring::RoaringBitmap; use smallvec::SmallVec; @@ -10,7 +12,7 @@ use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::CboRoaringBitmapCodec; pub struct SorterCacheDelAddCboRoaringBitmap { - cache: LruCache, DelAddRoaringBitmap>, + cache: ArcCache, DelAddRoaringBitmap>, prefix: &'static [u8; 3], sorter: grenad::Sorter, deladd_buffer: Vec, @@ -26,7 +28,7 @@ impl SorterCacheDelAddCboRoaringBitmap { conn: redis::Connection, ) -> Self { SorterCacheDelAddCboRoaringBitmap { - cache: LruCache::new(cap), + cache: ArcCache::new(cap), prefix, sorter, deladd_buffer: Vec::new(), @@ -41,15 +43,22 @@ where MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result, U>, { pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error> { - match self.cache.get_mut(key) { + let (cache, evicted) = self.cache.get_mut(key); + match cache { Some(DelAddRoaringBitmap { del, add: _ }) => { del.get_or_insert_with(RoaringBitmap::new).insert(n); - Ok(()) } - None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_u32(n)) { - Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), - None => Ok(()), - }, + None => { + let value = DelAddRoaringBitmap::new_del_u32(n); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry_to_sorter(key, deladd)?; + } + } + } + + match evicted { + Some((key, value)) => self.write_entry_to_sorter(key, value), + None => Ok(()), } } @@ -58,28 +67,42 @@ where key: &[u8], bitmap: RoaringBitmap, ) -> Result<(), grenad::Error> { - match self.cache.get_mut(key) { + let (cache, evicted) = self.cache.get_mut(key); + match cache { Some(DelAddRoaringBitmap { del, add: _ }) => { *del.get_or_insert_with(RoaringBitmap::new) |= bitmap; - Ok(()) } - None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del(bitmap)) { - Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), - None => Ok(()), - }, + None => { + let value = DelAddRoaringBitmap::new_del(bitmap); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry_to_sorter(key, deladd)?; + } + } + } + + match evicted { + Some((key, value)) => self.write_entry_to_sorter(key, value), + None => Ok(()), } } pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error> { - match self.cache.get_mut(key) { + let (cache, evicted) = self.cache.get_mut(key); + match cache { Some(DelAddRoaringBitmap { del: _, add }) => { add.get_or_insert_with(RoaringBitmap::new).insert(n); - Ok(()) } - None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add_u32(n)) { - Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), - None => Ok(()), - }, + None => { + let value = DelAddRoaringBitmap::new_add_u32(n); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry_to_sorter(key, deladd)?; + } + } + } + + match evicted { + Some((key, value)) => self.write_entry_to_sorter(key, value), + None => Ok(()), } } @@ -88,29 +111,43 @@ where key: &[u8], bitmap: RoaringBitmap, ) -> Result<(), grenad::Error> { - match self.cache.get_mut(key) { + let (cache, evicted) = self.cache.get_mut(key); + match cache { Some(DelAddRoaringBitmap { del: _, add }) => { *add.get_or_insert_with(RoaringBitmap::new) |= bitmap; - Ok(()) } - None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add(bitmap)) { - Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), - None => Ok(()), - }, + None => { + let value = DelAddRoaringBitmap::new_add(bitmap); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry_to_sorter(key, deladd)?; + } + } + } + + match evicted { + Some((key, value)) => self.write_entry_to_sorter(key, value), + None => Ok(()), } } pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error> { - match self.cache.get_mut(key) { + let (cache, evicted) = self.cache.get_mut(key); + match cache { Some(DelAddRoaringBitmap { del, add }) => { del.get_or_insert_with(RoaringBitmap::new).insert(n); add.get_or_insert_with(RoaringBitmap::new).insert(n); - Ok(()) } - None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_add_u32(n)) { - Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), - None => Ok(()), - }, + None => { + let value = DelAddRoaringBitmap::new_del_add_u32(n); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry_to_sorter(key, deladd)?; + } + } + } + + match evicted { + Some((key, value)) => self.write_entry_to_sorter(key, value), + None => Ok(()), } } @@ -155,8 +192,8 @@ where } pub fn into_sorter(mut self) -> Result, grenad::Error> { - let default_lru = LruCache::new(NonZeroUsize::MIN); - for (key, deladd) in mem::replace(&mut self.cache, default_lru) { + let default_arc = ArcCache::new(NonZeroUsize::MIN); + for (key, deladd) in mem::replace(&mut self.cache, default_arc) { self.write_entry_to_sorter(key, deladd)?; } Ok(self.sorter) @@ -192,3 +229,67 @@ impl DelAddRoaringBitmap { DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) } } } + +// TODO support custom State (3rd param S of LruCache) +pub struct ArcCache { + recent_set: LruCache, + // recent_evicted: LruCache, + frequent_set: LruCache, + // frequent_evicted: LruCache, + // capacity: NonZeroUsize, + // negative means shrinking recent and increasing frequent + // positive means shrinking frequent and increasing recent + // target: isize, +} + +impl ArcCache { + pub fn new(cap: NonZeroUsize) -> Self { + ArcCache { + recent_set: LruCache::new(cap), + // recent_evicted: LruCache::new(cap), + frequent_set: LruCache::new(cap), + // frequent_evicted: LruCache::new(cap), + // capacity: cap, + // target: 0, + } + } +} + +impl ArcCache { + pub fn get_mut<'a, Q>(&'a mut self, k: &Q) -> (Option<&'a mut V>, Option<(K, V)>) + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + // Rust is too dumb to let me get_mut directly... + if self.frequent_set.contains(k) { + return (self.frequent_set.get_mut(k), None); + } + + if let Some((key, value)) = self.recent_set.pop_entry(k) { + let evicted = self.frequent_set.push(key, value); + let inserted = self.frequent_set.get_mut(k).unwrap(); + // if let Some((evicted_key, _)) = evicted.as_ref() { + // self.frequent_evicted.push(evicted_key.clone(), ()); + // } + return (Some(inserted), evicted); + } + + // TODO implement live resize of LRUs + + (None, None) + } + + pub fn push(&mut self, k: K, v: V) -> Option<(K, V)> { + self.frequent_set.push(k, v) + } +} + +impl IntoIterator for ArcCache { + type Item = (K, V); + type IntoIter = Chain, IntoIter>; + + fn into_iter(self) -> Self::IntoIter { + self.recent_set.into_iter().chain(self.frequent_set) + } +}