diff --git a/milli/src/update/del_add.rs b/milli/src/update/del_add.rs index 570d292ef..790cdd028 100644 --- a/milli/src/update/del_add.rs +++ b/milli/src/update/del_add.rs @@ -30,6 +30,24 @@ impl Key for DelAdd { } } +// TODO remove this implementation +impl obkv2::Key for DelAdd { + const BYTES_SIZE: usize = std::mem::size_of::(); + type BYTES = [u8; ::BYTES_SIZE]; + + fn to_be_bytes(&self) -> Self::BYTES { + u8::to_be_bytes(*self as u8) + } + + fn from_be_bytes(array: Self::BYTES) -> Self { + match u8::from_be_bytes(array) { + 0 => Self::Deletion, + 1 => Self::Addition, + otherwise => unreachable!("DelAdd has only 2 variants, unknown variant: {}", otherwise), + } + } +} + /// Creates a Kv> from Kv /// /// Deletion: put all the values under DelAdd::Deletion diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 0dd2d9935..4123e568c 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -1,3 +1,5 @@ +use std::fs::File; + use crossbeam_channel::{Receiver, RecvError, SendError, Sender}; use heed::types::Bytes; @@ -6,31 +8,73 @@ use super::StdResult; use crate::{DocumentId, Index}; /// The capacity of the channel is currently in number of messages. -pub fn merge_writer_channel(cap: usize) -> WriterChannels { +pub fn merger_writer_channels(cap: usize) -> MergerWriterChannels { let (sender, receiver) = crossbeam_channel::bounded(cap); - WriterChannels { + MergerWriterChannels { writer_receiver: WriterReceiver(receiver), merger_sender: MergerSender(sender.clone()), document_sender: DocumentSender(sender), } } -pub struct WriterChannels { +pub struct MergerWriterChannels { pub writer_receiver: WriterReceiver, pub merger_sender: MergerSender, pub document_sender: DocumentSender, } +/// The capacity of the channel is currently in number of messages. +pub fn extractors_merger_channels(cap: usize) -> ExtractorsMergerChannels { + let (sender, receiver) = crossbeam_channel::bounded(cap); + + ExtractorsMergerChannels { + merger_receiver: MergerReceiver(receiver), + deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender(sender.clone()), + } +} + +pub struct ExtractorsMergerChannels { + pub merger_receiver: MergerReceiver, + pub deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender, +} + pub struct KeyValueEntry { - pub key_length: u16, - pub data: Box<[u8]>, + key_length: usize, + data: Box<[u8]>, } impl KeyValueEntry { - pub fn entry(&self) -> (&[u8], &[u8]) { - self.data.split_at(self.key_length as usize) + pub fn from_key_value(key: &[u8], value: &[u8]) -> Self { + let mut data = Vec::with_capacity(key.len() + value.len()); + data.extend_from_slice(key); + data.extend_from_slice(value); + + KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() } } + + pub fn entry(&self) -> (&[u8], &[u8]) { + self.data.split_at(self.key_length) + } +} + +pub struct KeyEntry { + data: Box<[u8]>, +} + +impl KeyEntry { + pub fn from_key(key: &[u8]) -> Self { + KeyEntry { data: key.to_vec().into_boxed_slice() } + } + + pub fn entry(&self) -> &[u8] { + self.data.as_ref() + } +} + +enum EntryOperation { + Delete(KeyEntry), + Write(KeyValueEntry), } pub struct DocumentEntry { @@ -54,14 +98,14 @@ impl DocumentEntry { } pub enum WriterOperation { - WordDocIds(KeyValueEntry), + WordDocids(EntryOperation), Document(DocumentEntry), } impl WriterOperation { pub fn database(&self, index: &Index) -> heed::Database { match self { - WriterOperation::WordDocIds(_) => index.word_docids.remap_types(), + WriterOperation::WordDocids(_) => index.word_docids.remap_types(), WriterOperation::Document(_) => index.documents.remap_types(), } } @@ -77,17 +121,58 @@ impl WriterReceiver { pub struct MergerSender(Sender); +impl MergerSender { + pub fn word_docids(&self) -> WordDocidsSender<'_> { + WordDocidsSender(&self.0) + } +} + +pub struct WordDocidsSender<'a>(&'a Sender); + +impl WordDocidsSender<'_> { + pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { + let operation = EntryOperation::Write(KeyValueEntry::from_key_value(key, value)); + match self.0.send(WriterOperation::WordDocids(operation)) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { + let operation = EntryOperation::Delete(KeyEntry::from_key(key)); + match self.0.send(WriterOperation::WordDocids(operation)) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } +} + #[derive(Clone)] pub struct DocumentSender(Sender); impl DocumentSender { - pub fn send(&self, document: DocumentEntry) -> StdResult<(), SendError> { + pub fn send(&self, document: DocumentEntry) -> StdResult<(), SendError<()>> { match self.0.send(WriterOperation::Document(document)) { Ok(()) => Ok(()), - Err(SendError(wop)) => match wop { - WriterOperation::Document(entry) => Err(SendError(entry)), - _ => unreachable!(), - }, + Err(SendError(_)) => Err(SendError(())), } } } + +pub enum MergerOperation { + WordDocidsCursors(Vec>), +} + +pub struct MergerReceiver(Receiver); + +impl IntoIterator for MergerReceiver { + type Item = MergerOperation; + type IntoIter = crossbeam_channel::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +#[derive(Clone)] +pub struct DeladdCboRoaringBitmapSender(Sender); diff --git a/milli/src/update/new/merge/del_add_roaring_bitmap_merger.rs b/milli/src/update/new/merge/del_add_roaring_bitmap_merger.rs new file mode 100644 index 000000000..5e6310170 --- /dev/null +++ b/milli/src/update/new/merge/del_add_roaring_bitmap_merger.rs @@ -0,0 +1,61 @@ +use std::borrow::Cow; +use std::io; + +use grenad2::MergeFunction; +use roaring::RoaringBitmap; + +use crate::update::del_add::DelAdd; +use crate::update::new::indexer::{KvReaderDelAdd, KvWriterDelAdd}; + +/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv +/// separately and outputs a new DelAdd with both unions. +pub struct DelAddRoaringBitmapMerger; + +impl MergeFunction for DelAddRoaringBitmapMerger { + type Error = io::Error; + + fn merge<'a>( + &self, + _key: &[u8], + values: &[Cow<'a, [u8]>], + ) -> std::result::Result, Self::Error> { + if values.len() == 1 { + Ok(values[0].clone()) + } else { + // Retrieve the bitmaps from both sides + let mut del_bitmaps_bytes = Vec::new(); + let mut add_bitmaps_bytes = Vec::new(); + for value in values { + let obkv: &KvReaderDelAdd = value.as_ref().into(); + if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) { + del_bitmaps_bytes.push(bitmap_bytes); + } + if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) { + add_bitmaps_bytes.push(bitmap_bytes); + } + } + + let mut output_deladd_obkv = KvWriterDelAdd::memory(); + + // Deletion + let mut buffer = Vec::new(); + let mut merged = RoaringBitmap::new(); + for bytes in del_bitmaps_bytes { + merged |= RoaringBitmap::deserialize_unchecked_from(bytes)?; + } + merged.serialize_into(&mut buffer)?; + output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?; + + // Addition + buffer.clear(); + merged.clear(); + for bytes in add_bitmaps_bytes { + merged |= RoaringBitmap::deserialize_unchecked_from(bytes)?; + } + merged.serialize_into(&mut buffer)?; + output_deladd_obkv.insert(DelAdd::Addition, &buffer)?; + + output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) + } + } +} diff --git a/milli/src/update/new/merge/mod.rs b/milli/src/update/new/merge/mod.rs new file mode 100644 index 000000000..6057b8d89 --- /dev/null +++ b/milli/src/update/new/merge/mod.rs @@ -0,0 +1,3 @@ +mod del_add_roaring_bitmap_merger; + +pub use del_add_roaring_bitmap_merger::DelAddRoaringBitmapMerger; diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 726153c53..24e9c95db 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -2,6 +2,7 @@ mod document_change; // mod extract; mod channel; mod items_pool; +mod merge; mod global_fields_ids_map; @@ -22,18 +23,25 @@ mod indexer { use roaring::RoaringBitmap; use serde_json::Value; + use super::channel::{MergerReceiver, MergerSender}; use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::items_pool::ItemsPool; + use super::merge; use crate::documents::{ obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, }; + use crate::update::del_add::DelAdd; + use crate::update::new::channel::MergerOperation; use crate::update::{AvailableDocumentsIds, IndexDocumentsMethod}; use crate::{ - DocumentId, Error, FieldId, FieldsIdsMap, Index, InternalError, Result, UserError, + CboRoaringBitmapCodec, DocumentId, Error, FieldId, FieldsIdsMap, Index, InternalError, + Result, UserError, }; pub type KvReaderFieldId = obkv2::KvReader; + pub type KvReaderDelAdd = obkv2::KvReader; pub type KvWriterFieldId = obkv2::KvWriter; + pub type KvWriterDelAdd = obkv2::KvWriter; pub struct DocumentOperationIndexer { operations: Vec, @@ -355,7 +363,101 @@ mod indexer { pub struct UpdateByFunctionIndexer; - // fn + enum Operation { + Write(RoaringBitmap), + Delete, + Ignore, + } + + /// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap. + fn merge_cbo_bitmaps( + current: Option<&[u8]>, + del: Option<&[u8]>, + add: Option<&[u8]>, + ) -> Result { + let bitmap = match current { + Some(current_bitmap_bytes) => { + let bitmap_without_del = match del { + Some(del_bytes) => { + let del_bitmap = CboRoaringBitmapCodec::deserialize_from(del_bytes)?; + CboRoaringBitmapCodec::intersection_with_serialized( + current_bitmap_bytes, + &del_bitmap, + )? + } + None => CboRoaringBitmapCodec::deserialize_from(current_bitmap_bytes)?, + }; + + match add { + Some(add_bytes) => { + let add = CboRoaringBitmapCodec::deserialize_from(add_bytes)?; + bitmap_without_del | add + } + None => bitmap_without_del, + } + } + None => match add { + Some(add_bytes) => CboRoaringBitmapCodec::deserialize_from(add_bytes)?, + None => return Ok(Operation::Ignore), + }, + }; + + if bitmap.is_empty() { + Ok(Operation::Delete) + } else { + Ok(Operation::Write(bitmap)) + } + } + + /// Return the slice directly from the serialize_into method + fn cbo_serialize_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec) -> &'b [u8] { + buffer.clear(); + CboRoaringBitmapCodec::serialize_into(bitmap, buffer); + buffer.as_slice() + } + + /// TODO We must return some infos/stats + fn merge_grenad_entries( + receiver: MergerReceiver, + sender: MergerSender, + rtxn: &RoTxn, + index: &Index, + ) -> Result<()> { + let mut buffer = Vec::new(); + + for merger_operation in receiver { + match merger_operation { + MergerOperation::WordDocidsCursors(cursors) => { + let sender = sender.word_docids(); + let database = index.word_docids.remap_types::(); + + let mut builder = grenad2::MergerBuilder::new(merge::DelAddRoaringBitmapMerger); + builder.extend(cursors); + /// TODO manage the error correctly + let mut merger_iter = builder.build().into_stream_merger_iter().unwrap(); + + // TODO manage the error correctly + while let Some((key, deladd)) = merger_iter.next().unwrap() { + let current = database.get(rtxn, key)?; + let deladd: &KvReaderDelAdd = deladd.into(); + let del = deladd.get(DelAdd::Deletion); + let add = deladd.get(DelAdd::Addition); + + match merge_cbo_bitmaps(current, del, add)? { + Operation::Write(bitmap) => { + let value = cbo_serialize_into_vec(&bitmap, &mut buffer); + sender.write(key, value).unwrap(); + } + Operation::Delete => sender.delete(key).unwrap(), + Operation::Ignore => (), + } + } + } + } + } + + Ok(()) + } /// Reads the previous version of a document from the database, the new versions /// in the grenad update files and merges them to generate a new boxed obkv.