Make sure we correctly mix different document operations

This commit is contained in:
Kerollmops 2025-01-28 12:03:04 +01:00
parent d018346f18
commit 8e6893ddbe
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 33 additions and 20 deletions

View File

@ -54,7 +54,8 @@ pub(crate) enum Batch {
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum DocumentOperation { pub(crate) enum DocumentOperation {
Add(Uuid), Replace(Uuid),
Update(Uuid),
Delete(Vec<String>), Delete(Vec<String>),
} }
@ -253,7 +254,7 @@ impl IndexScheduler {
_ => unreachable!(), _ => unreachable!(),
} }
} }
BatchKind::DocumentOperation { method, operation_ids, .. } => { BatchKind::DocumentOperation { operation_ids, .. } => {
let tasks = self.queue.get_existing_tasks_for_processing_batch( let tasks = self.queue.get_existing_tasks_for_processing_batch(
rtxn, rtxn,
current_batch, current_batch,
@ -275,9 +276,17 @@ impl IndexScheduler {
for task in tasks.iter() { for task in tasks.iter() {
match task.kind { match task.kind {
KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => { KindWithContent::DocumentAdditionOrUpdate {
operations.push(DocumentOperation::Add(content_file)); content_file, method, ..
} } => match method {
IndexDocumentsMethod::ReplaceDocuments => {
operations.push(DocumentOperation::Replace(content_file))
}
IndexDocumentsMethod::UpdateDocuments => {
operations.push(DocumentOperation::Update(content_file))
}
_ => unreachable!("Unknown document merging method"),
},
KindWithContent::DocumentDeletion { ref documents_ids, .. } => { KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
operations.push(DocumentOperation::Delete(documents_ids.clone())); operations.push(DocumentOperation::Delete(documents_ids.clone()));
} }
@ -289,7 +298,6 @@ impl IndexScheduler {
op: IndexOperation::DocumentOperation { op: IndexOperation::DocumentOperation {
index_uid, index_uid,
primary_key, primary_key,
method,
operations, operations,
tasks, tasks,
}, },

View File

@ -62,23 +62,21 @@ impl IndexScheduler {
Ok(tasks) Ok(tasks)
} }
IndexOperation::DocumentOperation { IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => {
index_uid,
primary_key,
method,
operations,
mut tasks,
} => {
progress.update_progress(DocumentOperationProgress::RetrievingConfig); progress.update_progress(DocumentOperationProgress::RetrievingConfig);
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it // this is made difficult by the fact we're doing private clones of the index scheduler and sending it
// to a fresh thread. // to a fresh thread.
let mut content_files = Vec::new(); let mut content_files = Vec::new();
for operation in &operations { for operation in &operations {
if let DocumentOperation::Add(content_uuid) = operation { match operation {
let content_file = self.queue.file_store.get_update(*content_uuid)?; DocumentOperation::Replace(content_uuid)
let mmap = unsafe { memmap2::Mmap::map(&content_file)? }; | DocumentOperation::Update(content_uuid) => {
content_files.push(mmap); let content_file = self.queue.file_store.get_update(*content_uuid)?;
let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
content_files.push(mmap);
}
_ => (),
} }
} }
@ -87,17 +85,23 @@ impl IndexScheduler {
let mut new_fields_ids_map = db_fields_ids_map.clone(); let mut new_fields_ids_map = db_fields_ids_map.clone();
let mut content_files_iter = content_files.iter(); let mut content_files_iter = content_files.iter();
let mut indexer = indexer::DocumentOperation::new(method); let mut indexer = indexer::DocumentOperation::new();
let embedders = index let embedders = index
.embedding_configs(index_wtxn) .embedding_configs(index_wtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
let embedders = self.embedders(index_uid.clone(), embedders)?; let embedders = self.embedders(index_uid.clone(), embedders)?;
for operation in operations { for operation in operations {
match operation { match operation {
DocumentOperation::Add(_content_uuid) => { DocumentOperation::Replace(_content_uuid) => {
let mmap = content_files_iter.next().unwrap(); let mmap = content_files_iter.next().unwrap();
indexer indexer
.add_documents(mmap) .replace_documents(mmap)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
}
DocumentOperation::Update(_content_uuid) => {
let mmap = content_files_iter.next().unwrap();
indexer
.update_documents(mmap)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
} }
DocumentOperation::Delete(document_ids) => { DocumentOperation::Delete(document_ids) => {

View File

@ -23,6 +23,7 @@ use crate::update::new::{Deletion, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::update::{AvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
#[derive(Default)]
pub struct DocumentOperation<'pl> { pub struct DocumentOperation<'pl> {
operations: Vec<Payload<'pl>>, operations: Vec<Payload<'pl>>,
method: MergeMethod, method: MergeMethod,