use bumpalo::collections::CollectIn;
use bumpalo::Bump;
use hashbrown::hash_map::Entry;
use heed::RoTxn;
use memmap2::Mmap;
use raw_collections::RawMap;
use rayon::slice::ParallelSlice;
use serde_json::value::RawValue;
use serde_json::Deserializer;

use super::super::document_change::DocumentChange;
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
use super::retrieve_or_guess_primary_key;
use crate::documents::PrimaryKey;
use crate::update::new::document::Versions;
use crate::update::new::{Deletion, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};

pub struct DocumentOperation<'pl> {
    operations: Vec<Payload<'pl>>,
    method: MergeMethod,
}

impl<'pl> DocumentOperation<'pl> {
    pub fn new(method: IndexDocumentsMethod) -> Self {
        Self { operations: Default::default(), method: MergeMethod::from(method) }
    }

    /// TODO please give me a type
    /// The payload is expected to be in the grenad format
    pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
        payload.advise(memmap2::Advice::Sequential)?;
        self.operations.push(Payload::Addition(&payload[..]));
        Ok(())
    }

    pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) {
        self.operations.push(Payload::Deletion(to_delete))
    }

    pub fn into_changes(
        self,
        indexer: &'pl Bump,
        index: &Index,
        rtxn: &'pl RoTxn<'pl>,
        primary_key_from_op: Option<&'pl str>,
        new_fields_ids_map: &mut FieldsIdsMap,
    ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)> {
        let Self { operations, method } = self;

        let documents_ids = index.documents_ids(rtxn)?;
        let mut operations_stats = Vec::new();
        let mut available_docids = AvailableIds::new(&documents_ids);
        let mut docids_version_offsets = hashbrown::HashMap::new();
        let mut primary_key = None;

        for operation in operations {
            let (bytes, document_count, result) = match operation {
                Payload::Addition(payload) => extract_addition_payload_changes(
                    indexer,
                    index,
                    rtxn,
                    primary_key_from_op,
                    &mut primary_key,
                    new_fields_ids_map,
                    &mut available_docids,
                    &docids_version_offsets,
                    method,
                    payload,
                ),
                Payload::Deletion(to_delete) => extract_deletion_payload_changes(
                    index,
                    rtxn,
                    &mut available_docids,
                    &docids_version_offsets,
                    method,
                    to_delete,
                ),
            };

            let error = match result {
                Ok(new_docids_version_offsets) => {
                    // If we don't have any error then we can merge the content of this payload
                    // into the main payload. Otherwise we just drop this payload extraction.
                    merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets);
                    None
                }
                Err(Error::UserError(user_error)) => Some(user_error),
                Err(e) => return Err(e),
            };

            operations_stats.push(PayloadStats { document_count, bytes, error });
        }

        // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
        let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
            docids_version_offsets.drain().collect_in(indexer);

        // Reorder the offsets to make sure we iterate on the file sequentially
        // And finally sort them
        docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations));

        let docids_version_offsets = docids_version_offsets.into_bump_slice();

        Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
    }
}

fn extract_addition_payload_changes<'r, 'pl: 'r>(
    indexer: &'pl Bump,
    index: &Index,
    rtxn: &'r RoTxn<'r>,
    primary_key_from_op: Option<&'r str>,
    primary_key: &mut Option<PrimaryKey<'r>>,
    new_fields_ids_map: &mut FieldsIdsMap,
    available_docids: &mut AvailableIds,
    main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
    method: MergeMethod,
    payload: &'pl [u8],
) -> (u64, u64, Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>>) {
    let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();

    // TODO manage the error
    let mut previous_offset = 0;
    let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>();
    loop {
        let optdoc = match iter.next().transpose() {
            Ok(optdoc) => optdoc,
            Err(e) => {
                return (
                    payload.len() as u64,
                    new_docids_version_offsets.len() as u64,
                    Err(InternalError::SerdeJson(e).into()),
                )
            }
        };

        // Only guess the primary key if it is the first document
        let retrieved_primary_key = if previous_offset == 0 {
            let optdoc = match optdoc {
                Some(doc) => match RawMap::from_raw_value(doc, indexer) {
                    Ok(docmap) => Some(docmap),
                    Err(error) => {
                        return (
                            payload.len() as u64,
                            new_docids_version_offsets.len() as u64,
                            Err(Error::UserError(UserError::SerdeJson(error))),
                        )
                    }
                },
                None => None,
            };

            let result = retrieve_or_guess_primary_key(
                rtxn,
                index,
                new_fields_ids_map,
                primary_key_from_op,
                optdoc,
            );

            let (pk, _has_been_changed) = match result {
                Ok(Ok(pk)) => pk,
                Ok(Err(user_error)) => {
                    return (
                        payload.len() as u64,
                        new_docids_version_offsets.len() as u64,
                        Err(Error::UserError(user_error)),
                    )
                }
                Err(error) => {
                    return (
                        payload.len() as u64,
                        new_docids_version_offsets.len() as u64,
                        Err(error),
                    )
                }
            };

            primary_key.get_or_insert(pk)
        } else {
            // The primary key was resolved when handling the first document of this payload.
            primary_key.as_ref().unwrap()
        };

        let doc = match optdoc {
            Some(doc) => doc,
            None => break,
        };

        let external_id = match retrieved_primary_key.extract_fields_and_docid(
            doc,
            new_fields_ids_map,
            indexer,
        ) {
            Ok(edi) => edi,
            Err(e) => {
                return (payload.len() as u64, new_docids_version_offsets.len() as u64, Err(e))
            }
        };

        let external_id = external_id.to_de();
        let current_offset = iter.byte_offset();
        let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] };

        match main_docids_version_offsets.get(external_id) {
            None => {
                let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) {
                    Ok(Some(docid)) => (docid, false),
                    Ok(None) => (
                        match available_docids.next() {
                            Some(docid) => docid,
                            None => {
                                return (
                                    payload.len() as u64,
                                    new_docids_version_offsets.len() as u64,
                                    Err(UserError::DocumentLimitReached.into()),
                                )
                            }
                        },
                        true,
                    ),
                    Err(e) => {
                        return (
                            payload.len() as u64,
                            new_docids_version_offsets.len() as u64,
                            Err(e.into()),
                        )
                    }
                };

                match new_docids_version_offsets.entry(external_id) {
                    Entry::Occupied(mut entry) =>
                        entry.get_mut().push_addition(document_offset),
                    Entry::Vacant(entry) => {
                        entry.insert(PayloadOperations::new_addition(
                            method,
                            docid,
                            is_new,
                            document_offset,
                        ));
                    }
                }
            }
            Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
                Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset),
                Entry::Vacant(entry) => {
                    entry.insert(PayloadOperations::new_addition(
                        method,
                        payload_operations.docid,
                        payload_operations.is_new,
                        document_offset,
                    ));
                }
            },
        }

        previous_offset = iter.byte_offset();
    }

    (payload.len() as u64, new_docids_version_offsets.len() as u64, Ok(new_docids_version_offsets))
}

fn extract_deletion_payload_changes<'s, 'pl: 's>(
    index: &Index,
    rtxn: &RoTxn,
    available_docids: &mut AvailableIds,
    main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
    method: MergeMethod,
    to_delete: &'pl [&'pl str],
) -> (u64, u64, Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>>) {
    let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();

    let mut document_count = 0;
    for &external_id in to_delete {
        match main_docids_version_offsets.get(external_id) {
            None => {
                let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) {
                    Ok(Some(docid)) => (docid, false),
                    Ok(None) => (
                        match available_docids.next() {
                            Some(docid) => docid,
                            None => {
                                return (
                                    0,
                                    new_docids_version_offsets.len() as u64,
                                    Err(UserError::DocumentLimitReached.into()),
                                )
                            }
                        },
                        true,
                    ),
                    Err(e) => return (0, new_docids_version_offsets.len() as u64, Err(e.into())),
                };

                match new_docids_version_offsets.entry(external_id) {
                    Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
                    Entry::Vacant(entry) => {
                        entry.insert(PayloadOperations::new_deletion(method, docid, is_new));
                    }
                }
            }
            Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
                Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
                Entry::Vacant(entry) => {
                    entry.insert(PayloadOperations::new_deletion(
                        method,
                        payload_operations.docid,
                        payload_operations.is_new,
                    ));
                }
            },
        }
        document_count += 1;
    }

    (0, document_count, Ok(new_docids_version_offsets))
}

fn merge_version_offsets<'s, 'pl>(
    main: &mut hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
    new: hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
) {
    // We cannot simply swap the two maps because document
    // operations must be kept in the right order.
    if main.is_empty() {
        *main = new;
        return;
    }

    for (key, new_payload) in new {
        match main.entry(key) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().append_operations(new_payload.operations)
            }
            Entry::Vacant(entry) => {
                entry.insert(new_payload);
            }
        }
    }
}

impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
    type Item = (&'pl str, PayloadOperations<'pl>);

    fn iter(
        &self,
        chunk_size: usize,
    ) -> impl rayon::prelude::IndexedParallelIterator<Item = &'pl [Self::Item]> {
        self.docids_version_offsets.par_chunks(chunk_size)
    }

    fn item_to_document_change<'doc, T: MostlySend + 'doc>(
        &'doc self,
        context: &'doc DocumentChangeContext<T>,
        item: &'doc Self::Item,
    ) -> Result<Option<DocumentChange<'doc>>>
    where
        'pl: 'doc,
    {
        let (external_doc, payload_operations) = item;
        payload_operations.merge_method.merge(
            payload_operations.docid,
            external_doc,
            payload_operations.is_new,
            &context.doc_alloc,
            &payload_operations.operations[..],
        )
    }

    fn len(&self) -> usize {
        self.docids_version_offsets.len()
    }
}

pub struct DocumentOperationChanges<'pl> {
    docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
}

pub enum Payload<'pl> {
    Addition(&'pl [u8]),
    Deletion(&'pl [&'pl str]),
}

pub struct PayloadStats {
    pub bytes: u64,
    pub document_count: u64,
    pub error: Option<UserError>,
}

pub struct PayloadOperations<'pl> {
    /// The internal document id of the document.
    pub docid: DocumentId,
    /// Whether this document is not already in the current database (as seen by the rtxn).
    pub is_new: bool,
    /// The operations to perform, in order, on this document.
    pub operations: Vec<InnerDocOp<'pl>>,
    /// The merge method we are using to merge payloads and documents.
    merge_method: MergeMethod,
}

impl<'pl> PayloadOperations<'pl> {
    fn new_deletion(merge_method: MergeMethod, docid: DocumentId, is_new: bool) -> Self {
        Self { docid, is_new, operations: vec![InnerDocOp::Deletion], merge_method }
    }

    fn new_addition(
        merge_method: MergeMethod,
        docid: DocumentId,
        is_new: bool,
        offset: DocumentOffset<'pl>,
    ) -> Self {
        Self { docid, is_new, operations: vec![InnerDocOp::Addition(offset)], merge_method }
    }
}

impl<'pl> PayloadOperations<'pl> {
    fn push_addition(&mut self, offset: DocumentOffset<'pl>) {
        if self.merge_method.useless_previous_changes() {
            self.operations.clear();
        }
        self.operations.push(InnerDocOp::Addition(offset))
    }

    fn push_deletion(&mut self) {
        self.operations.clear();
        self.operations.push(InnerDocOp::Deletion);
    }

    fn append_operations(&mut self, mut operations: Vec<InnerDocOp<'pl>>) {
        debug_assert!(!operations.is_empty());
        if self.merge_method.useless_previous_changes() {
            self.operations.clear();
        }
        self.operations.append(&mut operations);
    }
}

#[derive(Clone)]
pub enum InnerDocOp<'pl> {
    Addition(DocumentOffset<'pl>),
    Deletion,
}

/// Represents an offset where a document lives
/// in an mmapped grenad reader file.
#[derive(Clone)]
pub struct DocumentOffset<'pl> {
    /// The mmapped payload files.
    pub content: &'pl [u8],
}

trait MergeChanges {
    /// Whether the previous operations recorded for a document are useless,
    /// i.e. can be dropped when a new operation arrives.
    fn useless_previous_changes(&self) -> bool;

    /// Returns a key that is used to order the payloads the right way.
    fn sort_key(&self, docops: &[InnerDocOp]) -> usize;

    fn merge<'doc>(
        &self,
        docid: DocumentId,
        external_docid: &'doc str,
        is_new: bool,
        doc_alloc: &'doc Bump,
        operations: &'doc [InnerDocOp],
    ) -> Result<Option<DocumentChange<'doc>>>;
}

#[derive(Debug, Clone, Copy)]
enum MergeMethod {
    ForReplacement(MergeDocumentForReplacement),
    ForUpdates(MergeDocumentForUpdates),
}

impl MergeChanges for MergeMethod {
    fn useless_previous_changes(&self) -> bool {
        match self {
            MergeMethod::ForReplacement(merge) => merge.useless_previous_changes(),
            MergeMethod::ForUpdates(merge) => merge.useless_previous_changes(),
        }
    }

    fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
        match self {
            MergeMethod::ForReplacement(merge) => merge.sort_key(docops),
            MergeMethod::ForUpdates(merge) => merge.sort_key(docops),
        }
    }

    fn merge<'doc>(
        &self,
        docid: DocumentId,
        external_docid: &'doc str,
        is_new: bool,
        doc_alloc: &'doc Bump,
        operations: &'doc [InnerDocOp],
    ) -> Result<Option<DocumentChange<'doc>>> {
        match self {
            MergeMethod::ForReplacement(merge) => {
                merge.merge(docid, external_docid, is_new, doc_alloc, operations)
            }
            MergeMethod::ForUpdates(merge) => {
                merge.merge(docid, external_docid, is_new, doc_alloc, operations)
            }
        }
    }
}

impl From<IndexDocumentsMethod> for MergeMethod {
    fn from(method: IndexDocumentsMethod) -> Self {
        match method {
            IndexDocumentsMethod::ReplaceDocuments => {
                MergeMethod::ForReplacement(MergeDocumentForReplacement)
            }
            IndexDocumentsMethod::UpdateDocuments => {
                MergeMethod::ForUpdates(MergeDocumentForUpdates)
            }
        }
    }
}

#[derive(Debug, Clone, Copy)]
struct MergeDocumentForReplacement;

impl MergeChanges for MergeDocumentForReplacement {
    fn useless_previous_changes(&self) -> bool {
        true
    }

    /// Reorders to read only the last change.
    fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
        let f = |ido: &_| match ido {
            InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
            InnerDocOp::Deletion => None,
        };
        docops.iter().rev().find_map(f).unwrap_or(0)
    }

    /// Returns only the most recent version of a document based on the updates from the payloads.
    ///
    /// This function is only meant to be used when doing a replacement and not an update.
    fn merge<'doc>(
        &self,
        docid: DocumentId,
        external_doc: &'doc str,
        is_new: bool,
        doc_alloc: &'doc Bump,
        operations: &'doc [InnerDocOp],
    ) -> Result<Option<DocumentChange<'doc>>> {
        match operations.last() {
            Some(InnerDocOp::Addition(DocumentOffset { content })) => {
                let document = serde_json::from_slice(content).unwrap();
                let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
                    .map_err(UserError::SerdeJson)?;

                if is_new {
                    Ok(Some(DocumentChange::Insertion(Insertion::create(
                        docid,
                        external_doc,
                        Versions::single(document),
                    ))))
                } else {
                    Ok(Some(DocumentChange::Update(Update::create(
                        docid,
                        external_doc,
                        Versions::single(document),
                        true,
                    ))))
                }
            }
            Some(InnerDocOp::Deletion) => {
                return if is_new {
                    Ok(None)
                } else {
                    let deletion = Deletion::create(docid, external_doc);
                    Ok(Some(DocumentChange::Deletion(deletion)))
                };
            }
            None => unreachable!("We must not have an empty set of operations on a document"),
        }
    }
}

#[derive(Debug, Clone, Copy)]
struct MergeDocumentForUpdates;

impl MergeChanges for MergeDocumentForUpdates {
    fn useless_previous_changes(&self) -> bool {
        false
    }

    /// Reorders the operations so that the first change is read first,
    /// which makes it faster to read that one and then the rest.
    fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
        let f = |ido: &_| match ido {
            InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
            InnerDocOp::Deletion => None,
        };
        docops.iter().find_map(f).unwrap_or(0)
    }

    /// Reads the previous version of a document from the database and the new versions
    /// from the grenad update files, and merges them to generate a new boxed obkv.
    ///
    /// This function is only meant to be used when doing an update and not a replacement.
    fn merge<'doc>(
        &self,
        docid: DocumentId,
        external_docid: &'doc str,
        is_new: bool,
        doc_alloc: &'doc Bump,
        operations: &'doc [InnerDocOp],
    ) -> Result<Option<DocumentChange<'doc>>> {
        if operations.is_empty() {
            unreachable!("We must not have an empty set of operations on a document");
        }

        let last_deletion = operations.iter().rposition(|op| matches!(op, InnerDocOp::Deletion));
        let operations = &operations[last_deletion.map_or(0, |i| i + 1)..];
        let has_deletion = last_deletion.is_some();

        if operations.is_empty() {
            return if is_new {
                Ok(None)
            } else {
                let deletion = Deletion::create(docid, external_docid);
                Ok(Some(DocumentChange::Deletion(deletion)))
            };
        }

        let versions = match operations {
            [single] => {
                let DocumentOffset { content } = match single {
                    InnerDocOp::Addition(offset) => offset,
                    InnerDocOp::Deletion => {
                        unreachable!("Deletion in document operations")
                    }
                };
                let document = serde_json::from_slice(content).unwrap();
                let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
                    .map_err(UserError::SerdeJson)?;

                Some(Versions::single(document))
            }
            operations => {
                let versions = operations.iter().map(|operation| {
                    let DocumentOffset { content } = match operation {
                        InnerDocOp::Addition(offset) => offset,
                        InnerDocOp::Deletion => {
                            unreachable!("Deletion in document operations")
                        }
                    };
                    let document = serde_json::from_slice(content).unwrap();
                    let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
                        .map_err(UserError::SerdeJson)?;
                    Ok(document)
                });
                Versions::multiple(versions)?
            }
        };

        let Some(versions) = versions else { return Ok(None) };

        if is_new {
            Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, versions))))
        } else {
            Ok(Some(DocumentChange::Update(Update::create(
                docid,
                external_docid,
                versions,
                has_deletion,
            ))))
        }
    }
}
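
// Usage sketch (illustrative only, never compiled): how a caller could drive
// `DocumentOperation` end to end using the signatures defined in this module.
// The `indexer` bump allocator, the `Index`, the read transaction, the fields ids
// map and the memory-mapped JSON payload are assumed to be provided by the
// surrounding indexing code; how they are built is out of scope here.
#[cfg(any())] // `any()` with no arguments is always false, so this never builds.
fn _document_operation_usage_sketch<'pl>(
    indexer: &'pl Bump,
    index: &Index,
    rtxn: &'pl RoTxn<'pl>,
    new_fields_ids_map: &mut FieldsIdsMap,
    json_payload: &'pl Mmap,
) -> Result<()> {
    // Collect the payloads: additions are raw JSON mmaps, deletions are external ids.
    let mut operation = DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
    operation.add_documents(json_payload)?;
    operation.delete_documents(&["an-external-document-id"]);

    // Turn the payloads into per-document operation lists, sorted for sequential reads.
    let (_changes, operations_stats, _primary_key) =
        operation.into_changes(indexer, index, rtxn, None, new_fields_ids_map)?;

    // Each payload reports its own user error instead of aborting the whole batch.
    for stats in operations_stats {
        if let Some(error) = stats.error {
            return Err(Error::UserError(error));
        }
    }

    Ok(())
}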