Introduce typed channels and the merger loop

This commit is contained in:
Clément Renault 2024-08-29 17:51:42 +02:00
parent 874c1ac538
commit 45c060831e
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
5 changed files with 285 additions and 16 deletions

View File

@ -30,6 +30,24 @@ impl Key for DelAdd {
} }
} }
// TODO remove this implementation
impl obkv2::Key for DelAdd {
    const BYTES_SIZE: usize = std::mem::size_of::<DelAdd>();
    type BYTES = [u8; <Self as obkv2::Key>::BYTES_SIZE];

    /// Encodes the variant as its `u8` discriminant in big-endian bytes.
    fn to_be_bytes(&self) -> Self::BYTES {
        u8::to_be_bytes(*self as u8)
    }

    /// Decodes the big-endian discriminant back into the matching variant.
    // NOTE(review): assumes Deletion = 0 and Addition = 1; the `as u8` cast in
    // `to_be_bytes` must agree with these literals — confirm against the enum
    // definition.
    fn from_be_bytes(array: Self::BYTES) -> Self {
        match u8::from_be_bytes(array) {
            0 => Self::Deletion,
            1 => Self::Addition,
            otherwise => unreachable!("DelAdd has only 2 variants, unknown variant: {}", otherwise),
        }
    }
}
/// Creates a Kv<K, Kv<DelAdd, value>> from Kv<K, value>
///
/// Deletion: put all the values under DelAdd::Deletion

View File

@ -1,3 +1,5 @@
use std::fs::File;
use crossbeam_channel::{Receiver, RecvError, SendError, Sender}; use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
use heed::types::Bytes; use heed::types::Bytes;
@ -6,31 +8,73 @@ use super::StdResult;
use crate::{DocumentId, Index}; use crate::{DocumentId, Index};
/// The capacity of the channel is currently in number of messages. /// The capacity of the channel is currently in number of messages.
pub fn merge_writer_channel(cap: usize) -> WriterChannels { pub fn merger_writer_channels(cap: usize) -> MergerWriterChannels {
let (sender, receiver) = crossbeam_channel::bounded(cap); let (sender, receiver) = crossbeam_channel::bounded(cap);
WriterChannels { MergerWriterChannels {
writer_receiver: WriterReceiver(receiver), writer_receiver: WriterReceiver(receiver),
merger_sender: MergerSender(sender.clone()), merger_sender: MergerSender(sender.clone()),
document_sender: DocumentSender(sender), document_sender: DocumentSender(sender),
} }
} }
pub struct WriterChannels { pub struct MergerWriterChannels {
pub writer_receiver: WriterReceiver, pub writer_receiver: WriterReceiver,
pub merger_sender: MergerSender, pub merger_sender: MergerSender,
pub document_sender: DocumentSender, pub document_sender: DocumentSender,
} }
/// The capacity of the channel is currently in number of messages.
pub fn extractors_merger_channels(cap: usize) -> ExtractorsMergerChannels {
    let (sender, receiver) = crossbeam_channel::bounded(cap);
    let merger_receiver = MergerReceiver(receiver);
    let deladd_cbo_roaring_bitmap_sender = DeladdCboRoaringBitmapSender(sender);
    ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender }
}
/// Bundles the two endpoints of the channel connecting the extractors to the
/// merger loop.
pub struct ExtractorsMergerChannels {
    // Consumed by the merger loop to receive operations from the extractors.
    pub merger_receiver: MergerReceiver,
    // Clonable sender handed to the extractors to submit DelAdd
    // CboRoaringBitmap merge operations.
    pub deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender,
}
pub struct KeyValueEntry { pub struct KeyValueEntry {
pub key_length: u16, key_length: usize,
pub data: Box<[u8]>, data: Box<[u8]>,
} }
impl KeyValueEntry { impl KeyValueEntry {
pub fn entry(&self) -> (&[u8], &[u8]) { pub fn from_key_value(key: &[u8], value: &[u8]) -> Self {
self.data.split_at(self.key_length as usize) let mut data = Vec::with_capacity(key.len() + value.len());
data.extend_from_slice(key);
data.extend_from_slice(value);
KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
} }
pub fn entry(&self) -> (&[u8], &[u8]) {
self.data.split_at(self.key_length)
}
}
/// An owned database key, used for deletion operations.
pub struct KeyEntry {
    data: Box<[u8]>,
}

impl KeyEntry {
    /// Copies the given key bytes into an owned boxed slice.
    pub fn from_key(key: &[u8]) -> Self {
        KeyEntry { data: Box::from(key) }
    }

    /// Returns the raw key bytes.
    pub fn entry(&self) -> &[u8] {
        &self.data
    }
}
enum EntryOperation {
Delete(KeyEntry),
Write(KeyValueEntry),
} }
pub struct DocumentEntry { pub struct DocumentEntry {
@ -54,14 +98,14 @@ impl DocumentEntry {
} }
pub enum WriterOperation { pub enum WriterOperation {
WordDocIds(KeyValueEntry), WordDocids(EntryOperation),
Document(DocumentEntry), Document(DocumentEntry),
} }
impl WriterOperation { impl WriterOperation {
pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> { pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
match self { match self {
WriterOperation::WordDocIds(_) => index.word_docids.remap_types(), WriterOperation::WordDocids(_) => index.word_docids.remap_types(),
WriterOperation::Document(_) => index.documents.remap_types(), WriterOperation::Document(_) => index.documents.remap_types(),
} }
} }
@ -77,17 +121,58 @@ impl WriterReceiver {
pub struct MergerSender(Sender<WriterOperation>); pub struct MergerSender(Sender<WriterOperation>);
impl MergerSender {
    /// Returns a typed sender dedicated to the word docids database.
    pub fn word_docids(&self) -> WordDocidsSender<'_> {
        WordDocidsSender(&self.0)
    }
}
/// A borrowed sender that routes word docids entry operations to the writer.
pub struct WordDocidsSender<'a>(&'a Sender<WriterOperation>);

impl WordDocidsSender<'_> {
    /// Sends a key/value pair to be written into the word docids database.
    pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
        let entry = KeyValueEntry::from_key_value(key, value);
        self.0
            .send(WriterOperation::WordDocids(EntryOperation::Write(entry)))
            .map_err(|_| SendError(()))
    }

    /// Sends a key to be deleted from the word docids database.
    pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
        let entry = KeyEntry::from_key(key);
        self.0
            .send(WriterOperation::WordDocids(EntryOperation::Delete(entry)))
            .map_err(|_| SendError(()))
    }
}
#[derive(Clone)] #[derive(Clone)]
pub struct DocumentSender(Sender<WriterOperation>); pub struct DocumentSender(Sender<WriterOperation>);
impl DocumentSender { impl DocumentSender {
pub fn send(&self, document: DocumentEntry) -> StdResult<(), SendError<DocumentEntry>> { pub fn send(&self, document: DocumentEntry) -> StdResult<(), SendError<()>> {
match self.0.send(WriterOperation::Document(document)) { match self.0.send(WriterOperation::Document(document)) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(SendError(wop)) => match wop { Err(SendError(_)) => Err(SendError(())),
WriterOperation::Document(entry) => Err(SendError(entry)),
_ => unreachable!(),
},
} }
} }
} }
/// An operation sent by the extractors to the merger loop.
pub enum MergerOperation {
    // Grenad cursors over DelAdd obkv entries destined for the word docids
    // database.
    WordDocidsCursors(Vec<grenad2::ReaderCursor<File>>),
}
/// The receiving end of the extractors-to-merger channel.
pub struct MergerReceiver(Receiver<MergerOperation>);

impl IntoIterator for MergerReceiver {
    type Item = MergerOperation;
    type IntoIter = crossbeam_channel::IntoIter<Self::Item>;

    // Iterates over incoming operations, blocking until every sender has been
    // dropped and the channel disconnects.
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}
/// A clonable sender used by the extractors to hand DelAdd CboRoaringBitmap
/// merge work over to the merger loop.
#[derive(Clone)]
pub struct DeladdCboRoaringBitmapSender(Sender<MergerOperation>);

View File

@ -0,0 +1,61 @@
use std::borrow::Cow;
use std::io;
use grenad2::MergeFunction;
use roaring::RoaringBitmap;
use crate::update::del_add::DelAdd;
use crate::update::new::indexer::{KvReaderDelAdd, KvWriterDelAdd};
/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv
/// separately and outputs a new DelAdd with both unions.
pub struct DelAddRoaringBitmapMerger;
impl MergeFunction for DelAddRoaringBitmapMerger {
    type Error = io::Error;

    /// Merges several DelAdd obkv values for the same key by unioning the
    /// deletion bitmaps together and the addition bitmaps together.
    fn merge<'a>(
        &self,
        _key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
        if values.len() == 1 {
            // A single value needs no merging.
            Ok(values[0].clone())
        } else {
            // Retrieve the bitmap bytes from both sides of every DelAdd obkv.
            let mut del_bitmaps_bytes = Vec::new();
            let mut add_bitmaps_bytes = Vec::new();
            for value in values {
                let obkv: &KvReaderDelAdd = value.as_ref().into();
                if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) {
                    del_bitmaps_bytes.push(bitmap_bytes);
                }
                if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) {
                    add_bitmaps_bytes.push(bitmap_bytes);
                }
            }

            // NOTE(review): the unions are written with
            // `RoaringBitmap::serialize_into` while the merger loop reads the
            // sides back through `CboRoaringBitmapCodec` — confirm the two
            // on-disk formats agree for small bitmaps.
            let mut output_deladd_obkv = KvWriterDelAdd::memory();
            let mut buffer = Vec::new();

            union_and_serialize(&del_bitmaps_bytes, &mut buffer)?;
            output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?;

            union_and_serialize(&add_bitmaps_bytes, &mut buffer)?;
            output_deladd_obkv.insert(DelAdd::Addition, &buffer)?;

            output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
        }
    }
}

/// Unions the given serialized bitmaps and serializes the result into
/// `buffer`, clearing it first.
fn union_and_serialize(bitmaps_bytes: &[&[u8]], buffer: &mut Vec<u8>) -> io::Result<()> {
    buffer.clear();
    let mut merged = RoaringBitmap::new();
    for bytes in bitmaps_bytes {
        // NOTE(review): the unchecked deserializer skips validation — assumes
        // the bytes come from a trusted grenad writer.
        merged |= RoaringBitmap::deserialize_unchecked_from(*bytes)?;
    }
    merged.serialize_into(buffer)
}

View File

@ -0,0 +1,3 @@
mod del_add_roaring_bitmap_merger;
pub use del_add_roaring_bitmap_merger::DelAddRoaringBitmapMerger;

View File

@ -2,6 +2,7 @@ mod document_change;
// mod extract; // mod extract;
mod channel; mod channel;
mod items_pool; mod items_pool;
mod merge;
mod global_fields_ids_map; mod global_fields_ids_map;
@ -22,18 +23,25 @@ mod indexer {
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::Value; use serde_json::Value;
use super::channel::{MergerReceiver, MergerSender};
use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::document_change::{Deletion, DocumentChange, Insertion, Update};
use super::items_pool::ItemsPool; use super::items_pool::ItemsPool;
use super::merge;
use crate::documents::{ use crate::documents::{
obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey,
}; };
use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::{AvailableDocumentsIds, IndexDocumentsMethod}; use crate::update::{AvailableDocumentsIds, IndexDocumentsMethod};
use crate::{ use crate::{
DocumentId, Error, FieldId, FieldsIdsMap, Index, InternalError, Result, UserError, CboRoaringBitmapCodec, DocumentId, Error, FieldId, FieldsIdsMap, Index, InternalError,
Result, UserError,
}; };
pub type KvReaderFieldId = obkv2::KvReader<FieldId>; pub type KvReaderFieldId = obkv2::KvReader<FieldId>;
pub type KvReaderDelAdd = obkv2::KvReader<DelAdd>;
pub type KvWriterFieldId<W> = obkv2::KvWriter<W, FieldId>; pub type KvWriterFieldId<W> = obkv2::KvWriter<W, FieldId>;
pub type KvWriterDelAdd<W> = obkv2::KvWriter<W, DelAdd>;
pub struct DocumentOperationIndexer { pub struct DocumentOperationIndexer {
operations: Vec<Payload>, operations: Vec<Payload>,
@ -355,7 +363,101 @@ mod indexer {
pub struct UpdateByFunctionIndexer; pub struct UpdateByFunctionIndexer;
// fn enum Operation {
Write(RoaringBitmap),
Delete,
Ignore,
}
/// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap.
///
/// Returns whether the resulting entry must be written, deleted, or left
/// untouched in the database.
fn merge_cbo_bitmaps(
    current: Option<&[u8]>,
    del: Option<&[u8]>,
    add: Option<&[u8]>,
) -> Result<Operation> {
    let bitmap = match current {
        Some(current_bitmap_bytes) => {
            let bitmap_without_del = match del {
                Some(del_bytes) => {
                    let del_bitmap = CboRoaringBitmapCodec::deserialize_from(del_bytes)?;
                    // NOTE(review): this intersects the current bitmap with the
                    // deletion bitmap; applying a deletion usually means the set
                    // difference (current - del) — confirm the intended
                    // semantics of the Deletion side here.
                    CboRoaringBitmapCodec::intersection_with_serialized(
                        current_bitmap_bytes,
                        &del_bitmap,
                    )?
                }
                None => CboRoaringBitmapCodec::deserialize_from(current_bitmap_bytes)?,
            };

            match add {
                Some(add_bytes) => {
                    let add = CboRoaringBitmapCodec::deserialize_from(add_bytes)?;
                    bitmap_without_del | add
                }
                None => bitmap_without_del,
            }
        }
        // No previous entry: only the additions matter; a deletion applied to
        // a missing entry is a no-op.
        None => match add {
            Some(add_bytes) => CboRoaringBitmapCodec::deserialize_from(add_bytes)?,
            None => return Ok(Operation::Ignore),
        },
    };

    // An empty merged bitmap means the key must disappear from the database.
    if bitmap.is_empty() {
        Ok(Operation::Delete)
    } else {
        Ok(Operation::Write(bitmap))
    }
}
/// Serializes the bitmap into the provided buffer (cleared first) and returns
/// the written bytes as a slice, so callers can reuse one allocation across
/// many serializations.
fn cbo_serialize_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec<u8>) -> &'b [u8] {
    buffer.clear();
    CboRoaringBitmapCodec::serialize_into(bitmap, buffer);
    buffer.as_slice()
}
/// TODO We must return some infos/stats
fn merge_grenad_entries(
receiver: MergerReceiver,
sender: MergerSender,
rtxn: &RoTxn,
index: &Index,
) -> Result<()> {
let mut buffer = Vec::new();
for merger_operation in receiver {
match merger_operation {
MergerOperation::WordDocidsCursors(cursors) => {
let sender = sender.word_docids();
let database = index.word_docids.remap_types::<Bytes, Bytes>();
let mut builder = grenad2::MergerBuilder::new(merge::DelAddRoaringBitmapMerger);
builder.extend(cursors);
/// TODO manage the error correctly
let mut merger_iter = builder.build().into_stream_merger_iter().unwrap();
// TODO manage the error correctly
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
let value = cbo_serialize_into_vec(&bitmap, &mut buffer);
sender.write(key, value).unwrap();
}
Operation::Delete => sender.delete(key).unwrap(),
Operation::Ignore => (),
}
}
}
}
}
Ok(())
}
/// Reads the previous version of a document from the database, the new versions /// Reads the previous version of a document from the database, the new versions
/// in the grenad update files and merges them to generate a new boxed obkv. /// in the grenad update files and merges them to generate a new boxed obkv.