From 637a9c8bdd1c8cf5325998f900feaf3993ead9fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 29 Aug 2024 12:06:44 +0200 Subject: [PATCH] Implement the document merge function for the update method --- milli/src/documents/reader.rs | 13 ++++++ milli/src/update/new/document_change.rs | 19 +++++++- milli/src/update/new/mod.rs | 58 ++++++++++++++++--------- 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/milli/src/documents/reader.rs b/milli/src/documents/reader.rs index c7c125c80..ebdc514fd 100644 --- a/milli/src/documents/reader.rs +++ b/milli/src/documents/reader.rs @@ -72,6 +72,19 @@ impl DocumentsBatchCursor { } impl DocumentsBatchCursor { + /// Returns a single document from the database. + pub fn get( + &mut self, + offset: u32, + ) -> Result>, DocumentsBatchCursorError> { + match self.cursor.move_on_key_equal_to(offset.to_be_bytes())? { + Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => { + Ok(Some(KvReader::new(value))) + } + _otherwise => Ok(None), + } + } + /// Returns the next document, starting from the first one. Subsequent calls to /// `next_document` advance the document reader until all the documents have been read. pub fn next_document( diff --git a/milli/src/update/new/document_change.rs b/milli/src/update/new/document_change.rs index e7c8bf012..311e22404 100644 --- a/milli/src/update/new/document_change.rs +++ b/milli/src/update/new/document_change.rs @@ -40,7 +40,11 @@ impl DocumentChange { } impl Deletion { - pub fn new(docid: DocumentId, external_docid: String, current: Box) -> Self { + pub fn create( + docid: DocumentId, + external_docid: String, + current: Box, + ) -> Self { Self { docid, external_docid, current } } @@ -54,6 +58,10 @@ impl Deletion { } impl Insertion { + pub fn create(docid: DocumentId, external_docid: String, new: Box) -> Self { + Insertion { docid, external_docid, new } + } + fn docid(&self) -> DocumentId { self.docid } @@ -64,6 +72,15 @@ impl Insertion { } impl Update { + pub fn create( + docid: DocumentId, + external_docid: String, + current: Box, + new: Box, + ) -> Self { + Update { docid, external_docid, current, new } + } + fn docid(&self) -> DocumentId { self.docid } diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 41b04219f..e5d376534 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -5,19 +5,21 @@ mod items_pool; mod global_fields_ids_map; mod indexer { + use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; use std::fs::File; use std::io::Cursor; use std::os::unix::fs::MetadataExt; use std::sync::Arc; + use heed::types::Bytes; use heed::RoTxn; use memmap2::Mmap; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use roaring::RoaringBitmap; use serde_json::Value; - use super::document_change::{self, DocumentChange}; + use super::document_change::{self, DocumentChange, Insertion, Update}; use super::items_pool::ItemsPool; use crate::documents::{ obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, @@ -28,7 +30,7 @@ mod indexer { }; pub type KvReaderFieldId = obkv2::KvReader; - pub type KvWriterFieldId = obkv2::KvWriter; + pub type KvWriterFieldId = obkv2::KvWriter; pub struct DocumentOperationIndexer { operations: Vec, @@ -293,9 +295,13 @@ mod indexer { ) .into()), }?; - Ok(DocumentChange::Deletion(document_change::Deletion::new( + + /// TODO create a function for this + let document = document.as_bytes().to_vec().into_boxed_slice().into(); + Ok(DocumentChange::Deletion(document_change::Deletion::create( docid, external_docid, + document, ))) }) })) @@ -358,14 +364,13 @@ mod indexer { external_docid: String, operations: &[DocumentOperation], ) -> Result> { - let mut document = BTreeMap::new(); - let original_obkv = - index.documents.remap_data_type::().get(rtxn, &docid)?; - let original_obkv: Option<&KvReaderFieldId> = original_obkv.map(Into::into); + let mut document = BTreeMap::<_, Cow<_>>::new(); + let original = index.documents.remap_data_type::().get(rtxn, &docid)?; + let original: Option<&KvReaderFieldId> = original.map(Into::into); - if let Some(original_obkv) = original_obkv { - original_obkv.into_iter().for_each(|(k, v)| { - document.insert(k, v.to_vec()); + if let Some(original) = original { + original.into_iter().for_each(|(k, v)| { + document.insert(k, v.into()); }); } @@ -376,10 +381,10 @@ mod indexer { let operations = &operations[last_deletion.map_or(0, |i| i + 1)..]; if operations.is_empty() { - match original_obkv { + match original { Some(original_obkv) => { let current = original_obkv.as_bytes().to_vec().into_boxed_slice().into(); - return Ok(Some(DocumentChange::Deletion(document_change::Deletion::new( + return Ok(Some(DocumentChange::Deletion(document_change::Deletion::create( docid, external_docid, current, @@ -390,25 +395,38 @@ mod indexer { } for operation in operations { - let DocumentOffset { content, offset } = ; + let DocumentOffset { content, offset } = match operation { + DocumentOperation::Addition(offset) => offset, + DocumentOperation::Deletion => unreachable!("Deletion in document operations"), + }; let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?; let (mut cursor, batch_index) = reader.into_cursor_and_fields_index(); - let obkv = cursor.get(*offset)?.expect("must exists"); + let update = cursor.get(*offset)?.expect("must exists"); - obkv.into_iter().for_each(|(k, v)| { + update.into_iter().for_each(|(k, v)| { let field_name = batch_index.name(k).unwrap(); let id = fields_ids_map.id(field_name).unwrap(); - document.insert(id, v.to_vec()); + document.insert(id, v.to_vec().into()); }); } let mut writer = KvWriterFieldId::memory(); document.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap()); - let boxed = writer.into_inner().unwrap().into_boxed_slice(); + /// TODO create a function for this conversion + let new = writer.into_inner().unwrap().into_boxed_slice().into(); - // Box - - Ok(boxed.into()) + match original { + Some(original) => { + /// TODO create a function for this conversion + let current = original.as_bytes().to_vec().into_boxed_slice().into(); + let update = Update::create(docid, external_docid, current, new); + Ok(Some(DocumentChange::Update(update))) + } + None => { + let insertion = Insertion::create(docid, external_docid, new); + Ok(Some(DocumentChange::Insertion(insertion))) + } + } } }