Compute and Write external-documents-ids database

This commit is contained in:
ManyTheFish 2024-10-02 11:32:19 +02:00
parent ccf01c2471
commit d79f75f630
7 changed files with 111 additions and 48 deletions

View File

@ -122,6 +122,7 @@ pub struct WriterOperation {
pub enum Database {
Documents,
ExternalDocumentsIds,
ExactWordDocids,
FidWordCountDocids,
Main,
@ -140,6 +141,7 @@ impl Database {
pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
match self {
Database::Documents => index.documents.remap_types(),
Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(),
Database::ExactWordDocids => index.exact_word_docids.remap_types(),
Database::Main => index.main.remap_types(),
Database::WordDocids => index.word_docids.remap_types(),
@ -431,6 +433,7 @@ impl DocumentsSender<'_> {
pub fn uncompressed(
&self,
docid: DocumentId,
external_id: String,
document: &KvReaderFieldId,
) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(
@ -440,14 +443,29 @@ impl DocumentsSender<'_> {
match self.0.send(WriterOperation { database: Database::Documents, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}?;
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(
external_id.as_bytes(),
&docid.to_be_bytes(),
));
match self.0.send(WriterOperation { database: Database::ExternalDocumentsIds, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Delete(KeyEntry::from_key(&docid.to_be_bytes()));
match self.0.send(WriterOperation { database: Database::Documents, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}?;
let entry = EntryOperation::Delete(KeyEntry::from_key(external_id.as_bytes()));
match self.0.send(WriterOperation { database: Database::ExternalDocumentsIds, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
}
@ -460,8 +478,8 @@ pub enum MergerOperation {
WordPairProximityDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordPositionDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
FacetDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
DeleteDocument { docid: DocumentId },
InsertDocument { docid: DocumentId, document: Box<KvReaderFieldId> },
DeleteDocument { docid: DocumentId, external_id: String },
InsertDocument { docid: DocumentId, external_id: String, document: Box<KvReaderFieldId> },
FinishedDocument,
}
@ -500,18 +518,19 @@ impl DocumentSender<'_> {
pub fn insert(
&self,
docid: DocumentId,
external_id: String,
document: Box<KvReaderFieldId>,
) -> StdResult<(), SendError<()>> {
let sender = self.0.unwrap();
match sender.send(MergerOperation::InsertDocument { docid, document }) {
match sender.send(MergerOperation::InsertDocument { docid, external_id, document }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> {
let sender = self.0.unwrap();
match sender.send(MergerOperation::DeleteDocument { docid }) {
match sender.send(MergerOperation::DeleteDocument { docid, external_id }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}

View File

@ -11,19 +11,22 @@ pub enum DocumentChange {
}
pub struct Deletion {
docid: DocumentId,
pub docid: DocumentId,
pub external_document_id: String,
current: Box<KvReaderFieldId>,
}
pub struct Update {
docid: DocumentId,
pub docid: DocumentId,
pub external_document_id: String,
current: Box<KvReaderFieldId>,
new: Box<KvReaderFieldId>,
pub new: Box<KvReaderFieldId>,
}
pub struct Insertion {
docid: DocumentId,
new: Box<KvReaderFieldId>,
pub docid: DocumentId,
pub external_document_id: String,
pub new: Box<KvReaderFieldId>,
}
impl DocumentChange {
@ -37,14 +40,22 @@ impl DocumentChange {
}
impl Deletion {
pub fn create(docid: DocumentId, current: Box<KvReaderFieldId>) -> Self {
Self { docid, current }
pub fn create(
docid: DocumentId,
external_document_id: String,
current: Box<KvReaderFieldId>,
) -> Self {
Self { docid, external_document_id, current }
}
pub fn docid(&self) -> DocumentId {
self.docid
}
pub fn external_document_id(&self) -> &str {
&self.external_document_id
}
// TODO shouldn't we use the one in self?
pub fn current<'a>(
&self,
@ -56,14 +67,22 @@ impl Deletion {
}
impl Insertion {
pub fn create(docid: DocumentId, new: Box<KvReaderFieldId>) -> Self {
Insertion { docid, new }
pub fn create(
docid: DocumentId,
external_document_id: String,
new: Box<KvReaderFieldId>,
) -> Self {
Insertion { docid, external_document_id, new }
}
pub fn docid(&self) -> DocumentId {
self.docid
}
pub fn external_document_id(&self) -> &str {
&self.external_document_id
}
pub fn new(&self) -> &KvReader<FieldId> {
self.new.as_ref()
}
@ -72,16 +91,21 @@ impl Insertion {
impl Update {
pub fn create(
docid: DocumentId,
external_document_id: String,
current: Box<KvReaderFieldId>,
new: Box<KvReaderFieldId>,
) -> Self {
Update { docid, current, new }
Update { docid, external_document_id, current, new }
}
pub fn docid(&self) -> DocumentId {
self.docid
}
pub fn external_document_id(&self) -> &str {
&self.external_document_id
}
pub fn current<'a>(
&self,
rtxn: &'a RoTxn,

View File

@ -4,9 +4,11 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelIterator};
use roaring::RoaringBitmap;
use super::DocumentChanges;
use crate::documents::PrimaryKey;
use crate::index::db_name::EXTERNAL_DOCUMENTS_IDS;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::update::new::{Deletion, DocumentChange};
use crate::{Error, FieldsIdsMap, Index, Result};
use crate::{Error, FieldsIdsMap, Index, InternalError, Result};
pub struct DocumentDeletion {
pub to_delete: RoaringBitmap,
@ -23,7 +25,7 @@ impl DocumentDeletion {
}
impl<'p> DocumentChanges<'p> for DocumentDeletion {
type Parameter = &'p Index;
type Parameter = (&'p Index, &'p FieldsIdsMap, &'p PrimaryKey<'p>);
fn document_changes(
self,
@ -34,13 +36,23 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion {
+ Clone
+ 'p,
> {
let index = param;
let (index, fields_ids_map, primary_key) = param;
let to_delete: Vec<_> = self.to_delete.into_iter().collect();
Ok(to_delete.into_par_iter().try_map_try_init(
|| index.read_txn().map_err(crate::Error::from),
|rtxn, docid| {
let current = index.document(rtxn, docid)?;
Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed())))
let external_document_id = primary_key
.document_id(&current, fields_ids_map)?
.map_err(|_| InternalError::DatabaseMissingEntry {
db_name: EXTERNAL_DOCUMENTS_IDS,
key: None,
})?;
Ok(DocumentChange::Deletion(Deletion::create(
docid,
external_document_id,
current.boxed(),
)))
},
))
}

View File

@ -288,15 +288,17 @@ impl MergeChanges for MergeDocumentForReplacement {
match current {
Some(current) => {
let update = Update::create(docid, current.boxed(), new);
let update = Update::create(docid, external_docid, current.boxed(), new);
Ok(DocumentChange::Update(update))
}
None => Ok(DocumentChange::Insertion(Insertion::create(docid, new))),
None => {
Ok(DocumentChange::Insertion(Insertion::create(docid, external_docid, new)))
}
}
}
Some(InnerDocOp::Deletion) => {
let deletion = match current {
Some(current) => Deletion::create(docid, current.boxed()),
Some(current) => Deletion::create(docid, external_docid, current.boxed()),
None => todo!("Do that with Louis"),
};
Ok(DocumentChange::Deletion(deletion))
@ -355,7 +357,7 @@ impl MergeChanges for MergeDocumentForUpdates {
if operations.is_empty() {
let deletion = match current {
Some(current) => Deletion::create(docid, current.boxed()),
Some(current) => Deletion::create(docid, external_docid, current.boxed()),
None => todo!("Do that with Louis"),
};
return Ok(DocumentChange::Deletion(deletion));
@ -382,11 +384,11 @@ impl MergeChanges for MergeDocumentForUpdates {
match current {
Some(current) => {
let update = Update::create(docid, current.boxed(), new);
let update = Update::create(docid, external_docid, current.boxed(), new);
Ok(DocumentChange::Update(update))
}
None => {
let insertion = Insertion::create(docid, new);
let insertion = Insertion::create(docid, external_docid, new);
Ok(DocumentChange::Insertion(insertion))
}
}

View File

@ -11,7 +11,7 @@ use rayon::ThreadPool;
pub use update_by_function::UpdateByFunction;
use super::channel::*;
use super::document_change::DocumentChange;
use super::document_change::{Deletion, DocumentChange, Insertion, Update};
use super::extract::*;
use super::merger::merge_grenad_entries;
use super::word_fst_builder::PrefixDelta;
@ -84,19 +84,14 @@ where
document_changes.clone().into_par_iter().try_arc_for_each::<_, Error>(
|result| {
match result? {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
document_sender.delete(docid).unwrap();
DocumentChange::Deletion(Deletion { docid, external_document_id, ..}) => {
document_sender.delete(docid, external_document_id).unwrap();
}
DocumentChange::Update(update) => {
let docid = update.docid();
let content = update.new();
document_sender.insert(docid, content.boxed()).unwrap();
DocumentChange::Update(Update { docid, external_document_id, new, ..}) => {
document_sender.insert(docid, external_document_id, new).unwrap();
}
DocumentChange::Insertion(insertion) => {
let docid = insertion.docid();
let content = insertion.new();
document_sender.insert(docid, content.boxed()).unwrap();
DocumentChange::Insertion(Insertion { docid, external_document_id, new, ..}) => {
document_sender.insert(docid, external_document_id, new).unwrap();
// extracted_dictionary_sender.send(self, dictionary: &[u8]);
}
}

View File

@ -77,7 +77,7 @@ where
}
}?;
let insertion = Insertion::create(docid, document);
let insertion = Insertion::create(docid, external_docid, document);
Ok(DocumentChange::Insertion(insertion))
},
))

View File

@ -136,37 +136,48 @@ pub fn merge_grenad_entries(
|_, _key| Ok(()),
)?;
}
MergerOperation::InsertDocument { docid, document } => {
MergerOperation::InsertDocument { docid, external_id, document } => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "insert_document");
let _entered = span.enter();
documents_ids.insert(docid);
sender.documents().uncompressed(docid, &document).unwrap();
sender.documents().uncompressed(docid, external_id.clone(), &document).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
let current: Option<&KvReaderFieldId> = current.map(Into::into);
let change = match current {
Some(current) => {
DocumentChange::Update(Update::create(docid, current.boxed(), document))
}
None => DocumentChange::Insertion(Insertion::create(docid, document)),
Some(current) => DocumentChange::Update(Update::create(
docid,
external_id,
current.boxed(),
document,
)),
None => DocumentChange::Insertion(Insertion::create(
docid,
external_id,
document,
)),
};
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
}
}
MergerOperation::DeleteDocument { docid } => {
MergerOperation::DeleteDocument { docid, external_id } => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "delete_document");
let _entered = span.enter();
if !documents_ids.remove(docid) {
unreachable!("Tried deleting a document that we do not know about");
}
sender.documents().delete(docid).unwrap();
sender.documents().delete(docid, external_id.clone()).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
let current = index.document(rtxn, docid)?;
let change = DocumentChange::Deletion(Deletion::create(docid, current.boxed()));
let change = DocumentChange::Deletion(Deletion::create(
docid,
external_id,
current.boxed(),
));
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
}
}