Fix the stats of the documents deletion by filter

The issue was that the operation « DocumentDeletionByFilter » was not
declared as an index operation. That means the indexes stats were not
reprocessed after the application of the operation.
This commit is contained in:
Tamo 2023-09-07 18:22:42 +02:00 committed by curquiza
parent 9889390d13
commit 036b846e4d
2 changed files with 98 additions and 60 deletions

View File

@ -67,10 +67,6 @@ pub(crate) enum Batch {
op: IndexOperation, op: IndexOperation,
must_create_index: bool, must_create_index: bool,
}, },
IndexDocumentDeletionByFilter {
index_uid: String,
task: Task,
},
IndexCreation { IndexCreation {
index_uid: String, index_uid: String,
primary_key: Option<String>, primary_key: Option<String>,
@ -114,6 +110,10 @@ pub(crate) enum IndexOperation {
documents: Vec<Vec<String>>, documents: Vec<Vec<String>>,
tasks: Vec<Task>, tasks: Vec<Task>,
}, },
IndexDocumentDeletionByFilter {
index_uid: String,
task: Task,
},
DocumentClear { DocumentClear {
index_uid: String, index_uid: String,
tasks: Vec<Task>, tasks: Vec<Task>,
@ -155,7 +155,6 @@ impl Batch {
| Batch::TaskDeletion(task) | Batch::TaskDeletion(task)
| Batch::Dump(task) | Batch::Dump(task)
| Batch::IndexCreation { task, .. } | Batch::IndexCreation { task, .. }
| Batch::IndexDocumentDeletionByFilter { task, .. }
| Batch::IndexUpdate { task, .. } => vec![task.uid], | Batch::IndexUpdate { task, .. } => vec![task.uid],
Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => { Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect() tasks.iter().map(|task| task.uid).collect()
@ -167,6 +166,7 @@ impl Batch {
| IndexOperation::DocumentClear { tasks, .. } => { | IndexOperation::DocumentClear { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect() tasks.iter().map(|task| task.uid).collect()
} }
IndexOperation::IndexDocumentDeletionByFilter { task, .. } => vec![task.uid],
IndexOperation::SettingsAndDocumentOperation { IndexOperation::SettingsAndDocumentOperation {
document_import_tasks: tasks, document_import_tasks: tasks,
settings_tasks: other, settings_tasks: other,
@ -194,8 +194,7 @@ impl Batch {
IndexOperation { op, .. } => Some(op.index_uid()), IndexOperation { op, .. } => Some(op.index_uid()),
IndexCreation { index_uid, .. } IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. } | IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid, .. } | IndexDeletion { index_uid, .. } => Some(index_uid),
| IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid),
} }
} }
} }
@ -205,6 +204,7 @@ impl IndexOperation {
match self { match self {
IndexOperation::DocumentOperation { index_uid, .. } IndexOperation::DocumentOperation { index_uid, .. }
| IndexOperation::DocumentDeletion { index_uid, .. } | IndexOperation::DocumentDeletion { index_uid, .. }
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
| IndexOperation::DocumentClear { index_uid, .. } | IndexOperation::DocumentClear { index_uid, .. }
| IndexOperation::Settings { index_uid, .. } | IndexOperation::Settings { index_uid, .. }
| IndexOperation::DocumentClearAndSetting { index_uid, .. } | IndexOperation::DocumentClearAndSetting { index_uid, .. }
@ -239,9 +239,12 @@ impl IndexScheduler {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
match &task.kind { match &task.kind {
KindWithContent::DocumentDeletionByFilter { index_uid, .. } => { KindWithContent::DocumentDeletionByFilter { index_uid, .. } => {
Ok(Some(Batch::IndexDocumentDeletionByFilter { Ok(Some(Batch::IndexOperation {
op: IndexOperation::IndexDocumentDeletionByFilter {
index_uid: index_uid.clone(), index_uid: index_uid.clone(),
task, task,
},
must_create_index: false,
})) }))
} }
_ => unreachable!(), _ => unreachable!(),
@ -891,51 +894,6 @@ impl IndexScheduler {
Ok(tasks) Ok(tasks)
} }
Batch::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let (index_uid, filter) =
if let KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr } =
&task.kind
{
(index_uid, filter_expr)
} else {
unreachable!()
};
let index = {
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, index_uid)?
};
let deleted_documents = delete_document_by_filter(filter, index);
let original_filter = if let Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: _,
}) = task.details
{
original_filter
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
unreachable!();
};
match deleted_documents {
Ok(deleted_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(deleted_documents),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(0),
});
task.error = Some(e.into());
}
}
Ok(vec![task])
}
Batch::IndexCreation { index_uid, primary_key, task } => { Batch::IndexCreation { index_uid, primary_key, task } => {
let wtxn = self.env.write_txn()?; let wtxn = self.env.write_txn()?;
if self.index_mapper.exists(&wtxn, &index_uid)? { if self.index_mapper.exists(&wtxn, &index_uid)? {
@ -1292,6 +1250,47 @@ impl IndexScheduler {
Ok(tasks) Ok(tasks)
} }
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =
&task.kind
{
filter_expr
} else {
unreachable!()
};
let deleted_documents = delete_document_by_filter(index_wtxn, filter, index);
let original_filter = if let Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: _,
}) = task.details
{
original_filter
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
unreachable!();
};
match deleted_documents {
Ok(deleted_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(deleted_documents),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(0),
});
task.error = Some(e.into());
}
}
Ok(vec![task])
}
IndexOperation::Settings { index_uid: _, settings, mut tasks } => { IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
let indexer_config = self.index_mapper.indexer_config(); let indexer_config = self.index_mapper.indexer_config();
let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config);
@ -1491,22 +1490,24 @@ impl IndexScheduler {
} }
} }
fn delete_document_by_filter(filter: &serde_json::Value, index: Index) -> Result<u64> { fn delete_document_by_filter<'a>(
wtxn: &mut RwTxn<'a, '_>,
filter: &serde_json::Value,
index: &'a Index,
) -> Result<u64> {
let filter = Filter::from_json(filter)?; let filter = Filter::from_json(filter)?;
Ok(if let Some(filter) = filter { Ok(if let Some(filter) = filter {
let mut wtxn = index.write_txn()?;
let candidates = filter.evaluate(&wtxn, &index).map_err(|err| match err { let candidates = filter.evaluate(&wtxn, &index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
} }
e => e.into(), e => e.into(),
})?; })?;
let mut delete_operation = DeleteDocuments::new(&mut wtxn, &index)?; let mut delete_operation = DeleteDocuments::new(wtxn, index)?;
delete_operation.delete_documents(&candidates); delete_operation.delete_documents(&candidates);
let deleted_documents = let deleted_documents =
delete_operation.execute().map(|result| result.deleted_documents)?; delete_operation.execute().map(|result| result.deleted_documents)?;
wtxn.commit()?;
deleted_documents deleted_documents
} else { } else {
0 0

View File

@ -154,6 +154,19 @@ async fn delete_document_by_filter() {
) )
.await; .await;
index.wait_task(1).await; index.wait_task(1).await;
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
{
"numberOfDocuments": 4,
"isIndexing": false,
"fieldDistribution": {
"color": 3,
"id": 4
}
}
"###);
let (response, code) = let (response, code) =
index.delete_document_by_filter(json!({ "filter": "color = blue"})).await; index.delete_document_by_filter(json!({ "filter": "color = blue"})).await;
snapshot!(code, @"202 Accepted"); snapshot!(code, @"202 Accepted");
@ -188,6 +201,18 @@ async fn delete_document_by_filter() {
} }
"###); "###);
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
{
"numberOfDocuments": 2,
"isIndexing": false,
"fieldDistribution": {
"color": 1,
"id": 2
}
}
"###);
let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await; let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!(json_string!(documents), @r###" snapshot!(json_string!(documents), @r###"
@ -241,6 +266,18 @@ async fn delete_document_by_filter() {
} }
"###); "###);
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
{
"numberOfDocuments": 1,
"isIndexing": false,
"fieldDistribution": {
"color": 1,
"id": 1
}
}
"###);
let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await; let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!(json_string!(documents), @r###" snapshot!(json_string!(documents), @r###"