From d79f75f63045586aa7b027c24e8157be3f7bcac1 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Wed, 2 Oct 2024 11:32:19 +0200 Subject: [PATCH] Compute and Write external-documents-ids database --- milli/src/update/new/channel.rs | 31 ++++++++++--- milli/src/update/new/document_change.rs | 44 ++++++++++++++----- .../update/new/indexer/document_deletion.rs | 20 +++++++-- .../update/new/indexer/document_operation.rs | 14 +++--- milli/src/update/new/indexer/mod.rs | 19 +++----- milli/src/update/new/indexer/partial_dump.rs | 2 +- milli/src/update/new/merger.rs | 29 ++++++++---- 7 files changed, 111 insertions(+), 48 deletions(-) diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 10c0a706b..bd06b5123 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -122,6 +122,7 @@ pub struct WriterOperation { pub enum Database { Documents, + ExternalDocumentsIds, ExactWordDocids, FidWordCountDocids, Main, @@ -140,6 +141,7 @@ impl Database { pub fn database(&self, index: &Index) -> heed::Database { match self { Database::Documents => index.documents.remap_types(), + Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(), Database::ExactWordDocids => index.exact_word_docids.remap_types(), Database::Main => index.main.remap_types(), Database::WordDocids => index.word_docids.remap_types(), @@ -431,6 +433,7 @@ impl DocumentsSender<'_> { pub fn uncompressed( &self, docid: DocumentId, + external_id: String, document: &KvReaderFieldId, ) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value( @@ -440,14 +443,29 @@ impl DocumentsSender<'_> { match self.0.send(WriterOperation { database: Database::Documents, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), + }?; + + let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value( + external_id.as_bytes(), + &docid.to_be_bytes(), + )); + match self.0.send(WriterOperation { database: Database::ExternalDocumentsIds, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), } } - pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { + pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Delete(KeyEntry::from_key(&docid.to_be_bytes())); match self.0.send(WriterOperation { database: Database::Documents, entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), + }?; + + let entry = EntryOperation::Delete(KeyEntry::from_key(external_id.as_bytes())); + match self.0.send(WriterOperation { database: Database::ExternalDocumentsIds, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), } } } @@ -460,8 +478,8 @@ pub enum MergerOperation { WordPairProximityDocidsMerger(Merger), WordPositionDocidsMerger(Merger), FacetDocidsMerger(Merger), - DeleteDocument { docid: DocumentId }, - InsertDocument { docid: DocumentId, document: Box }, + DeleteDocument { docid: DocumentId, external_id: String }, + InsertDocument { docid: DocumentId, external_id: String, document: Box }, FinishedDocument, } @@ -500,18 +518,19 @@ impl DocumentSender<'_> { pub fn insert( &self, docid: DocumentId, + external_id: String, document: Box, ) -> StdResult<(), SendError<()>> { let sender = self.0.unwrap(); - match sender.send(MergerOperation::InsertDocument { docid, document }) { + match sender.send(MergerOperation::InsertDocument { docid, external_id, document }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } } - pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { + pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> { let sender = self.0.unwrap(); - match sender.send(MergerOperation::DeleteDocument { docid }) { + match sender.send(MergerOperation::DeleteDocument { docid, external_id }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } diff --git a/milli/src/update/new/document_change.rs b/milli/src/update/new/document_change.rs index 3e6473e77..7be8d1958 100644 --- a/milli/src/update/new/document_change.rs +++ b/milli/src/update/new/document_change.rs @@ -11,19 +11,22 @@ pub enum DocumentChange { } pub struct Deletion { - docid: DocumentId, + pub docid: DocumentId, + pub external_document_id: String, current: Box, } pub struct Update { - docid: DocumentId, + pub docid: DocumentId, + pub external_document_id: String, current: Box, - new: Box, + pub new: Box, } pub struct Insertion { - docid: DocumentId, - new: Box, + pub docid: DocumentId, + pub external_document_id: String, + pub new: Box, } impl DocumentChange { @@ -37,14 +40,22 @@ impl DocumentChange { } impl Deletion { - pub fn create(docid: DocumentId, current: Box) -> Self { - Self { docid, current } + pub fn create( + docid: DocumentId, + external_document_id: String, + current: Box, + ) -> Self { + Self { docid, external_document_id, current } } pub fn docid(&self) -> DocumentId { self.docid } + pub fn external_document_id(&self) -> &str { + &self.external_document_id + } + // TODO shouldn't we use the one in self? pub fn current<'a>( &self, @@ -56,14 +67,22 @@ impl Deletion { } impl Insertion { - pub fn create(docid: DocumentId, new: Box) -> Self { - Insertion { docid, new } + pub fn create( + docid: DocumentId, + external_document_id: String, + new: Box, + ) -> Self { + Insertion { docid, external_document_id, new } } pub fn docid(&self) -> DocumentId { self.docid } + pub fn external_document_id(&self) -> &str { + &self.external_document_id + } + pub fn new(&self) -> &KvReader { self.new.as_ref() } @@ -72,16 +91,21 @@ impl Insertion { impl Update { pub fn create( docid: DocumentId, + external_document_id: String, current: Box, new: Box, ) -> Self { - Update { docid, current, new } + Update { docid, external_document_id, current, new } } pub fn docid(&self) -> DocumentId { self.docid } + pub fn external_document_id(&self) -> &str { + &self.external_document_id + } + pub fn current<'a>( &self, rtxn: &'a RoTxn, diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index 400b51af6..21d7635c9 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -4,9 +4,11 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; use roaring::RoaringBitmap; use super::DocumentChanges; +use crate::documents::PrimaryKey; +use crate::index::db_name::EXTERNAL_DOCUMENTS_IDS; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::{Deletion, DocumentChange}; -use crate::{Error, FieldsIdsMap, Index, Result}; +use crate::{Error, FieldsIdsMap, Index, InternalError, Result}; pub struct DocumentDeletion { pub to_delete: RoaringBitmap, @@ -23,7 +25,7 @@ impl DocumentDeletion { } impl<'p> DocumentChanges<'p> for DocumentDeletion { - type Parameter = &'p Index; + type Parameter = (&'p Index, &'p FieldsIdsMap, &'p PrimaryKey<'p>); fn document_changes( self, @@ -34,13 +36,23 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion { + Clone + 'p, > { - let index = param; + let (index, fields_ids_map, primary_key) = param; let to_delete: Vec<_> = self.to_delete.into_iter().collect(); Ok(to_delete.into_par_iter().try_map_try_init( || index.read_txn().map_err(crate::Error::from), |rtxn, docid| { let current = index.document(rtxn, docid)?; - Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))) + let external_document_id = primary_key + .document_id(¤t, fields_ids_map)? + .map_err(|_| InternalError::DatabaseMissingEntry { + db_name: EXTERNAL_DOCUMENTS_IDS, + key: None, + })?; + Ok(DocumentChange::Deletion(Deletion::create( + docid, + external_document_id, + current.boxed(), + ))) }, )) } diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index f9e1bb8f3..7341f4e5c 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -288,15 +288,17 @@ impl MergeChanges for MergeDocumentForReplacement { match current { Some(current) => { - let update = Update::create(docid, current.boxed(), new); + let update = Update::create(docid, external_docid, current.boxed(), new); Ok(DocumentChange::Update(update)) } - None => Ok(DocumentChange::Insertion(Insertion::create(docid, new))), + None => { + Ok(DocumentChange::Insertion(Insertion::create(docid, external_docid, new))) + } } } Some(InnerDocOp::Deletion) => { let deletion = match current { - Some(current) => Deletion::create(docid, current.boxed()), + Some(current) => Deletion::create(docid, external_docid, current.boxed()), None => todo!("Do that with Louis"), }; Ok(DocumentChange::Deletion(deletion)) @@ -355,7 +357,7 @@ impl MergeChanges for MergeDocumentForUpdates { if operations.is_empty() { let deletion = match current { - Some(current) => Deletion::create(docid, current.boxed()), + Some(current) => Deletion::create(docid, external_docid, current.boxed()), None => todo!("Do that with Louis"), }; return Ok(DocumentChange::Deletion(deletion)); @@ -382,11 +384,11 @@ impl MergeChanges for MergeDocumentForUpdates { match current { Some(current) => { - let update = Update::create(docid, current.boxed(), new); + let update = Update::create(docid, external_docid, current.boxed(), new); Ok(DocumentChange::Update(update)) } None => { - let insertion = Insertion::create(docid, new); + let insertion = Insertion::create(docid, external_docid, new); Ok(DocumentChange::Insertion(insertion)) } } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 28165c3a8..17de2b310 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -11,7 +11,7 @@ use rayon::ThreadPool; pub use update_by_function::UpdateByFunction; use super::channel::*; -use super::document_change::DocumentChange; +use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::extract::*; use super::merger::merge_grenad_entries; use super::word_fst_builder::PrefixDelta; @@ -84,19 +84,14 @@ where document_changes.clone().into_par_iter().try_arc_for_each::<_, Error>( |result| { match result? { - DocumentChange::Deletion(deletion) => { - let docid = deletion.docid(); - document_sender.delete(docid).unwrap(); + DocumentChange::Deletion(Deletion { docid, external_document_id, ..}) => { + document_sender.delete(docid, external_document_id).unwrap(); } - DocumentChange::Update(update) => { - let docid = update.docid(); - let content = update.new(); - document_sender.insert(docid, content.boxed()).unwrap(); + DocumentChange::Update(Update { docid, external_document_id, new, ..}) => { + document_sender.insert(docid, external_document_id, new).unwrap(); } - DocumentChange::Insertion(insertion) => { - let docid = insertion.docid(); - let content = insertion.new(); - document_sender.insert(docid, content.boxed()).unwrap(); + DocumentChange::Insertion(Insertion { docid, external_document_id, new, ..}) => { + document_sender.insert(docid, external_document_id, new).unwrap(); // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 325e13cc4..08b97b931 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -77,7 +77,7 @@ where } }?; - let insertion = Insertion::create(docid, document); + let insertion = Insertion::create(docid, external_docid, document); Ok(DocumentChange::Insertion(insertion)) }, )) diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 0d80f75ec..c010a5d83 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -136,37 +136,48 @@ pub fn merge_grenad_entries( |_, _key| Ok(()), )?; } - MergerOperation::InsertDocument { docid, document } => { + MergerOperation::InsertDocument { docid, external_id, document } => { let span = tracing::trace_span!(target: "indexing::documents::merge", "insert_document"); let _entered = span.enter(); documents_ids.insert(docid); - sender.documents().uncompressed(docid, &document).unwrap(); + sender.documents().uncompressed(docid, external_id.clone(), &document).unwrap(); if let Some(geo_extractor) = geo_extractor.as_mut() { let current = index.documents.remap_data_type::().get(rtxn, &docid)?; let current: Option<&KvReaderFieldId> = current.map(Into::into); let change = match current { - Some(current) => { - DocumentChange::Update(Update::create(docid, current.boxed(), document)) - } - None => DocumentChange::Insertion(Insertion::create(docid, document)), + Some(current) => DocumentChange::Update(Update::create( + docid, + external_id, + current.boxed(), + document, + )), + None => DocumentChange::Insertion(Insertion::create( + docid, + external_id, + document, + )), }; geo_extractor.manage_change(&mut global_fields_ids_map, &change)?; } } - MergerOperation::DeleteDocument { docid } => { + MergerOperation::DeleteDocument { docid, external_id } => { let span = tracing::trace_span!(target: "indexing::documents::merge", "delete_document"); let _entered = span.enter(); if !documents_ids.remove(docid) { unreachable!("Tried deleting a document that we do not know about"); } - sender.documents().delete(docid).unwrap(); + sender.documents().delete(docid, external_id.clone()).unwrap(); if let Some(geo_extractor) = geo_extractor.as_mut() { let current = index.document(rtxn, docid)?; - let change = DocumentChange::Deletion(Deletion::create(docid, current.boxed())); + let change = DocumentChange::Deletion(Deletion::create( + docid, + external_id, + current.boxed(), + )); geo_extractor.manage_change(&mut global_fields_ids_map, &change)?; } }