diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 2948e7506..34f8ca135 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -67,10 +67,6 @@ pub(crate) enum Batch { op: IndexOperation, must_create_index: bool, }, - IndexDocumentDeletionByFilter { - index_uid: String, - task: Task, - }, IndexCreation { index_uid: String, primary_key: Option, @@ -114,6 +110,10 @@ pub(crate) enum IndexOperation { documents: Vec>, tasks: Vec, }, + IndexDocumentDeletionByFilter { + index_uid: String, + task: Task, + }, DocumentClear { index_uid: String, tasks: Vec, @@ -155,7 +155,6 @@ impl Batch { | Batch::TaskDeletion(task) | Batch::Dump(task) | Batch::IndexCreation { task, .. } - | Batch::IndexDocumentDeletionByFilter { task, .. } | Batch::IndexUpdate { task, .. } => vec![task.uid], Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => { tasks.iter().map(|task| task.uid).collect() @@ -167,6 +166,7 @@ impl Batch { | IndexOperation::DocumentClear { tasks, .. } => { tasks.iter().map(|task| task.uid).collect() } + IndexOperation::IndexDocumentDeletionByFilter { task, .. } => vec![task.uid], IndexOperation::SettingsAndDocumentOperation { document_import_tasks: tasks, settings_tasks: other, @@ -194,8 +194,7 @@ impl Batch { IndexOperation { op, .. } => Some(op.index_uid()), IndexCreation { index_uid, .. } | IndexUpdate { index_uid, .. } - | IndexDeletion { index_uid, .. } - | IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid), + | IndexDeletion { index_uid, .. } => Some(index_uid), } } } @@ -205,6 +204,7 @@ impl IndexOperation { match self { IndexOperation::DocumentOperation { index_uid, .. } | IndexOperation::DocumentDeletion { index_uid, .. } + | IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. } | IndexOperation::DocumentClear { index_uid, .. } | IndexOperation::Settings { index_uid, .. } | IndexOperation::DocumentClearAndSetting { index_uid, .. } @@ -239,9 +239,12 @@ impl IndexScheduler { let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; match &task.kind { KindWithContent::DocumentDeletionByFilter { index_uid, .. } => { - Ok(Some(Batch::IndexDocumentDeletionByFilter { - index_uid: index_uid.clone(), - task, + Ok(Some(Batch::IndexOperation { + op: IndexOperation::IndexDocumentDeletionByFilter { + index_uid: index_uid.clone(), + task, + }, + must_create_index: false, })) } _ => unreachable!(), @@ -891,51 +894,6 @@ impl IndexScheduler { Ok(tasks) } - Batch::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => { - let (index_uid, filter) = - if let KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr } = - &task.kind - { - (index_uid, filter_expr) - } else { - unreachable!() - }; - let index = { - let rtxn = self.env.read_txn()?; - self.index_mapper.index(&rtxn, index_uid)? - }; - let deleted_documents = delete_document_by_filter(filter, index); - let original_filter = if let Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: _, - }) = task.details - { - original_filter - } else { - // In the case of a `documentDeleteByFilter` the details MUST be set - unreachable!(); - }; - - match deleted_documents { - Ok(deleted_documents) => { - task.status = Status::Succeeded; - task.details = Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: Some(deleted_documents), - }); - } - Err(e) => { - task.status = Status::Failed; - task.details = Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: Some(0), - }); - task.error = Some(e.into()); - } - } - - Ok(vec![task]) - } Batch::IndexCreation { index_uid, primary_key, task } => { let wtxn = self.env.write_txn()?; if self.index_mapper.exists(&wtxn, &index_uid)? { @@ -1292,6 +1250,47 @@ impl IndexScheduler { Ok(tasks) } + IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => { + let filter = + if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } = + &task.kind + { + filter_expr + } else { + unreachable!() + }; + let deleted_documents = delete_document_by_filter(index_wtxn, filter, index); + let original_filter = if let Some(Details::DocumentDeletionByFilter { + original_filter, + deleted_documents: _, + }) = task.details + { + original_filter + } else { + // In the case of a `documentDeleteByFilter` the details MUST be set + unreachable!(); + }; + + match deleted_documents { + Ok(deleted_documents) => { + task.status = Status::Succeeded; + task.details = Some(Details::DocumentDeletionByFilter { + original_filter, + deleted_documents: Some(deleted_documents), + }); + } + Err(e) => { + task.status = Status::Failed; + task.details = Some(Details::DocumentDeletionByFilter { + original_filter, + deleted_documents: Some(0), + }); + task.error = Some(e.into()); + } + } + + Ok(vec![task]) + } IndexOperation::Settings { index_uid: _, settings, mut tasks } => { let indexer_config = self.index_mapper.indexer_config(); let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); @@ -1491,23 +1490,22 @@ impl IndexScheduler { } } -fn delete_document_by_filter(filter: &serde_json::Value, index: Index) -> Result { +fn delete_document_by_filter<'a>( + wtxn: &mut RwTxn<'a, '_>, + filter: &serde_json::Value, + index: &'a Index, +) -> Result { let filter = Filter::from_json(filter)?; Ok(if let Some(filter) = filter { - let mut wtxn = index.write_txn()?; - - let candidates = filter.evaluate(&wtxn, &index).map_err(|err| match err { + let candidates = filter.evaluate(wtxn, index).map_err(|err| match err { milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) } e => e.into(), })?; - let mut delete_operation = DeleteDocuments::new(&mut wtxn, &index)?; + let mut delete_operation = DeleteDocuments::new(wtxn, index)?; delete_operation.delete_documents(&candidates); - let deleted_documents = - delete_operation.execute().map(|result| result.deleted_documents)?; - wtxn.commit()?; - deleted_documents + delete_operation.execute().map(|result| result.deleted_documents)? } else { 0 }) diff --git a/meilisearch/tests/documents/delete_documents.rs b/meilisearch/tests/documents/delete_documents.rs index 8f6ae1985..c2704dd1a 100644 --- a/meilisearch/tests/documents/delete_documents.rs +++ b/meilisearch/tests/documents/delete_documents.rs @@ -154,6 +154,19 @@ async fn delete_document_by_filter() { ) .await; index.wait_task(1).await; + + let (stats, _) = index.stats().await; + snapshot!(json_string!(stats), @r###" + { + "numberOfDocuments": 4, + "isIndexing": false, + "fieldDistribution": { + "color": 3, + "id": 4 + } + } + "###); + let (response, code) = index.delete_document_by_filter(json!({ "filter": "color = blue"})).await; snapshot!(code, @"202 Accepted"); @@ -188,6 +201,18 @@ async fn delete_document_by_filter() { } "###); + let (stats, _) = index.stats().await; + snapshot!(json_string!(stats), @r###" + { + "numberOfDocuments": 2, + "isIndexing": false, + "fieldDistribution": { + "color": 1, + "id": 2 + } + } + "###); + let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await; snapshot!(code, @"200 OK"); snapshot!(json_string!(documents), @r###" @@ -241,6 +266,18 @@ async fn delete_document_by_filter() { } "###); + let (stats, _) = index.stats().await; + snapshot!(json_string!(stats), @r###" + { + "numberOfDocuments": 1, + "isIndexing": false, + "fieldDistribution": { + "color": 1, + "id": 1 + } + } + "###); + let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await; snapshot!(code, @"200 OK"); snapshot!(json_string!(documents), @r###"