diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index 66f4d904d..10f480d12 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -54,7 +54,8 @@ pub(crate) enum Batch { #[derive(Debug)] pub(crate) enum DocumentOperation { - Add(Uuid), + Replace(Uuid), + Update(Uuid), Delete(Vec), } @@ -253,7 +254,7 @@ impl IndexScheduler { _ => unreachable!(), } } - BatchKind::DocumentOperation { method, operation_ids, .. } => { + BatchKind::DocumentOperation { operation_ids, .. } => { let tasks = self.queue.get_existing_tasks_for_processing_batch( rtxn, current_batch, @@ -275,9 +276,17 @@ impl IndexScheduler { for task in tasks.iter() { match task.kind { - KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => { - operations.push(DocumentOperation::Add(content_file)); - } + KindWithContent::DocumentAdditionOrUpdate { + content_file, method, .. + } => match method { + IndexDocumentsMethod::ReplaceDocuments => { + operations.push(DocumentOperation::Replace(content_file)) + } + IndexDocumentsMethod::UpdateDocuments => { + operations.push(DocumentOperation::Update(content_file)) + } + _ => unreachable!("Unknown document merging method"), + }, KindWithContent::DocumentDeletion { ref documents_ids, .. } => { operations.push(DocumentOperation::Delete(documents_ids.clone())); } @@ -289,7 +298,6 @@ impl IndexScheduler { op: IndexOperation::DocumentOperation { index_uid, primary_key, - method, operations, tasks, }, diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index eff3740a0..630ab62e4 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -62,23 +62,21 @@ impl IndexScheduler { Ok(tasks) } - IndexOperation::DocumentOperation { - index_uid, - primary_key, - method, - operations, - mut tasks, - } => { + IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => { progress.update_progress(DocumentOperationProgress::RetrievingConfig); // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. // this is made difficult by the fact we're doing private clones of the index scheduler and sending it // to a fresh thread. let mut content_files = Vec::new(); for operation in &operations { - if let DocumentOperation::Add(content_uuid) = operation { - let content_file = self.queue.file_store.get_update(*content_uuid)?; - let mmap = unsafe { memmap2::Mmap::map(&content_file)? }; - content_files.push(mmap); + match operation { + DocumentOperation::Replace(content_uuid) + | DocumentOperation::Update(content_uuid) => { + let content_file = self.queue.file_store.get_update(*content_uuid)?; + let mmap = unsafe { memmap2::Mmap::map(&content_file)? }; + content_files.push(mmap); + } + _ => (), } } @@ -87,17 +85,23 @@ impl IndexScheduler { let mut new_fields_ids_map = db_fields_ids_map.clone(); let mut content_files_iter = content_files.iter(); - let mut indexer = indexer::DocumentOperation::new(method); + let mut indexer = indexer::DocumentOperation::new(); let embedders = index .embedding_configs(index_wtxn) .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; let embedders = self.embedders(index_uid.clone(), embedders)?; for operation in operations { match operation { - DocumentOperation::Add(_content_uuid) => { + DocumentOperation::Replace(_content_uuid) => { let mmap = content_files_iter.next().unwrap(); indexer - .add_documents(mmap) + .replace_documents(mmap) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; + } + DocumentOperation::Update(_content_uuid) => { + let mmap = content_files_iter.next().unwrap(); + indexer + .update_documents(mmap) .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; } DocumentOperation::Delete(document_ids) => { diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 8216742ec..6bf0432c4 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -23,6 +23,7 @@ use crate::update::new::{Deletion, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; +#[derive(Default)] pub struct DocumentOperation<'pl> { operations: Vec>, method: MergeMethod,