From c534a1b68764005018fceb767ec737a4dcc21784 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 25 Oct 2023 13:41:11 +0200 Subject: [PATCH] Stop using delete documents pipeline in batch runner --- index-scheduler/src/batch.rs | 68 ++++++++++++++----------- milli/src/update/index_documents/mod.rs | 2 + 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 3e2cc4281..a4b7e5c45 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -30,8 +30,7 @@ use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::update::{ - DeleteDocuments, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, - Settings as MilliSettings, + IndexDocumentsConfig, IndexDocumentsMethod, Settings as MilliSettings, }; use meilisearch_types::milli::{self, Filter, BEU32}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; @@ -1238,7 +1237,8 @@ impl IndexScheduler { let (new_builder, user_result) = builder.remove_documents(document_ids)?; builder = new_builder; - + // Uses Invariant: remove documents actually always returns Ok for the inner result + let count = user_result.unwrap(); let provided_ids = if let Some(Details::DocumentDeletion { provided_ids, .. }) = task.details @@ -1249,23 +1249,11 @@ impl IndexScheduler { unreachable!(); }; - match user_result { - Ok(count) => { - task.status = Status::Succeeded; - task.details = Some(Details::DocumentDeletion { - provided_ids, - deleted_documents: Some(count), - }); - } - Err(e) => { - task.status = Status::Failed; - task.details = Some(Details::DocumentDeletion { - provided_ids, - deleted_documents: Some(0), - }); - task.error = Some(milli::Error::from(e).into()); - } - } + task.status = Status::Succeeded; + task.details = Some(Details::DocumentDeletion { + provided_ids, + deleted_documents: Some(count), + }); } } } @@ -1288,21 +1276,42 @@ impl IndexScheduler { Ok(tasks) } IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => { - let mut builder = milli::update::DeleteDocuments::new(index_wtxn, index)?; - documents.iter().flatten().for_each(|id| { - builder.delete_external_id(id); - }); + let indexer_config = self.index_mapper.indexer_config(); + let config = IndexDocumentsConfig { + update_method: IndexDocumentsMethod::ReplaceDocuments, + ..Default::default() + }; + let must_stop_processing = self.must_stop_processing.clone(); - let DocumentDeletionResult { deleted_documents, .. } = builder.execute()?; + let mut builder = milli::update::IndexDocuments::new( + index_wtxn, + index, + indexer_config, + config, + |indexing_step| debug!("update: {:?}", indexing_step), + || must_stop_processing.get(), + )?; + + let document_ids = documents.iter().cloned().flatten().collect(); + + let (new_builder, user_result) = builder.remove_documents(document_ids)?; + builder = new_builder; + // Uses Invariant: remove documents actually always returns Ok for the inner result + let count = user_result.unwrap(); for (task, documents) in tasks.iter_mut().zip(documents) { task.status = Status::Succeeded; task.details = Some(Details::DocumentDeletion { provided_ids: documents.len(), - deleted_documents: Some(deleted_documents.min(documents.len() as u64)), + deleted_documents: Some(count.min(documents.len() as u64)), }); } + if !tasks.iter().all(|res| res.error.is_some()) { + let addition = builder.execute()?; + info!("document deletion done: {:?}", addition); + } + Ok(tasks) } IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => { @@ -1558,9 +1567,10 @@ fn delete_document_by_filter<'a>( } e => e.into(), })?; - let mut delete_operation = DeleteDocuments::new(wtxn, index)?; - delete_operation.delete_documents(&candidates); - delete_operation.execute().map(|result| result.deleted_documents)? + todo!("need a way to get back the external ids from the internal ids"); + // let mut delete_operation = DeleteDocuments::new(wtxn, index)?; + // delete_operation.delete_documents(&candidates); + // delete_operation.execute().map(|result| result.deleted_documents)? } else { 0 }) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 0b000da06..c8481bd48 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -180,6 +180,7 @@ where // Early return when there is no document to add if to_delete.is_empty() { + // Maintains Invariant: remove documents actually always returns Ok for the inner result return Ok((self, Ok(0))); } @@ -192,6 +193,7 @@ where self.deleted_documents += deleted_documents; + // Maintains Invariant: remove documents actually always returns Ok for the inner result Ok((self, Ok(deleted_documents))) }