mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 03:55:07 +08:00
Merge #4057
4057: Fix stats delete by filter for v1.3.4 r=irevoire a=curquiza Fixes https://github.com/meilisearch/meilisearch/issues/4018 for v1.3.4 Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
8822ca234e
@ -67,10 +67,6 @@ pub(crate) enum Batch {
|
|||||||
op: IndexOperation,
|
op: IndexOperation,
|
||||||
must_create_index: bool,
|
must_create_index: bool,
|
||||||
},
|
},
|
||||||
IndexDocumentDeletionByFilter {
|
|
||||||
index_uid: String,
|
|
||||||
task: Task,
|
|
||||||
},
|
|
||||||
IndexCreation {
|
IndexCreation {
|
||||||
index_uid: String,
|
index_uid: String,
|
||||||
primary_key: Option<String>,
|
primary_key: Option<String>,
|
||||||
@ -114,6 +110,10 @@ pub(crate) enum IndexOperation {
|
|||||||
documents: Vec<Vec<String>>,
|
documents: Vec<Vec<String>>,
|
||||||
tasks: Vec<Task>,
|
tasks: Vec<Task>,
|
||||||
},
|
},
|
||||||
|
IndexDocumentDeletionByFilter {
|
||||||
|
index_uid: String,
|
||||||
|
task: Task,
|
||||||
|
},
|
||||||
DocumentClear {
|
DocumentClear {
|
||||||
index_uid: String,
|
index_uid: String,
|
||||||
tasks: Vec<Task>,
|
tasks: Vec<Task>,
|
||||||
@ -155,7 +155,6 @@ impl Batch {
|
|||||||
| Batch::TaskDeletion(task)
|
| Batch::TaskDeletion(task)
|
||||||
| Batch::Dump(task)
|
| Batch::Dump(task)
|
||||||
| Batch::IndexCreation { task, .. }
|
| Batch::IndexCreation { task, .. }
|
||||||
| Batch::IndexDocumentDeletionByFilter { task, .. }
|
|
||||||
| Batch::IndexUpdate { task, .. } => vec![task.uid],
|
| Batch::IndexUpdate { task, .. } => vec![task.uid],
|
||||||
Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => {
|
Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => {
|
||||||
tasks.iter().map(|task| task.uid).collect()
|
tasks.iter().map(|task| task.uid).collect()
|
||||||
@ -167,6 +166,7 @@ impl Batch {
|
|||||||
| IndexOperation::DocumentClear { tasks, .. } => {
|
| IndexOperation::DocumentClear { tasks, .. } => {
|
||||||
tasks.iter().map(|task| task.uid).collect()
|
tasks.iter().map(|task| task.uid).collect()
|
||||||
}
|
}
|
||||||
|
IndexOperation::IndexDocumentDeletionByFilter { task, .. } => vec![task.uid],
|
||||||
IndexOperation::SettingsAndDocumentOperation {
|
IndexOperation::SettingsAndDocumentOperation {
|
||||||
document_import_tasks: tasks,
|
document_import_tasks: tasks,
|
||||||
settings_tasks: other,
|
settings_tasks: other,
|
||||||
@ -194,8 +194,7 @@ impl Batch {
|
|||||||
IndexOperation { op, .. } => Some(op.index_uid()),
|
IndexOperation { op, .. } => Some(op.index_uid()),
|
||||||
IndexCreation { index_uid, .. }
|
IndexCreation { index_uid, .. }
|
||||||
| IndexUpdate { index_uid, .. }
|
| IndexUpdate { index_uid, .. }
|
||||||
| IndexDeletion { index_uid, .. }
|
| IndexDeletion { index_uid, .. } => Some(index_uid),
|
||||||
| IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -205,6 +204,7 @@ impl IndexOperation {
|
|||||||
match self {
|
match self {
|
||||||
IndexOperation::DocumentOperation { index_uid, .. }
|
IndexOperation::DocumentOperation { index_uid, .. }
|
||||||
| IndexOperation::DocumentDeletion { index_uid, .. }
|
| IndexOperation::DocumentDeletion { index_uid, .. }
|
||||||
|
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
|
||||||
| IndexOperation::DocumentClear { index_uid, .. }
|
| IndexOperation::DocumentClear { index_uid, .. }
|
||||||
| IndexOperation::Settings { index_uid, .. }
|
| IndexOperation::Settings { index_uid, .. }
|
||||||
| IndexOperation::DocumentClearAndSetting { index_uid, .. }
|
| IndexOperation::DocumentClearAndSetting { index_uid, .. }
|
||||||
@ -239,9 +239,12 @@ impl IndexScheduler {
|
|||||||
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
|
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||||
match &task.kind {
|
match &task.kind {
|
||||||
KindWithContent::DocumentDeletionByFilter { index_uid, .. } => {
|
KindWithContent::DocumentDeletionByFilter { index_uid, .. } => {
|
||||||
Ok(Some(Batch::IndexDocumentDeletionByFilter {
|
Ok(Some(Batch::IndexOperation {
|
||||||
|
op: IndexOperation::IndexDocumentDeletionByFilter {
|
||||||
index_uid: index_uid.clone(),
|
index_uid: index_uid.clone(),
|
||||||
task,
|
task,
|
||||||
|
},
|
||||||
|
must_create_index: false,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
@ -891,51 +894,6 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
Ok(tasks)
|
Ok(tasks)
|
||||||
}
|
}
|
||||||
Batch::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
|
|
||||||
let (index_uid, filter) =
|
|
||||||
if let KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr } =
|
|
||||||
&task.kind
|
|
||||||
{
|
|
||||||
(index_uid, filter_expr)
|
|
||||||
} else {
|
|
||||||
unreachable!()
|
|
||||||
};
|
|
||||||
let index = {
|
|
||||||
let rtxn = self.env.read_txn()?;
|
|
||||||
self.index_mapper.index(&rtxn, index_uid)?
|
|
||||||
};
|
|
||||||
let deleted_documents = delete_document_by_filter(filter, index);
|
|
||||||
let original_filter = if let Some(Details::DocumentDeletionByFilter {
|
|
||||||
original_filter,
|
|
||||||
deleted_documents: _,
|
|
||||||
}) = task.details
|
|
||||||
{
|
|
||||||
original_filter
|
|
||||||
} else {
|
|
||||||
// In the case of a `documentDeleteByFilter` the details MUST be set
|
|
||||||
unreachable!();
|
|
||||||
};
|
|
||||||
|
|
||||||
match deleted_documents {
|
|
||||||
Ok(deleted_documents) => {
|
|
||||||
task.status = Status::Succeeded;
|
|
||||||
task.details = Some(Details::DocumentDeletionByFilter {
|
|
||||||
original_filter,
|
|
||||||
deleted_documents: Some(deleted_documents),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
task.status = Status::Failed;
|
|
||||||
task.details = Some(Details::DocumentDeletionByFilter {
|
|
||||||
original_filter,
|
|
||||||
deleted_documents: Some(0),
|
|
||||||
});
|
|
||||||
task.error = Some(e.into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(vec![task])
|
|
||||||
}
|
|
||||||
Batch::IndexCreation { index_uid, primary_key, task } => {
|
Batch::IndexCreation { index_uid, primary_key, task } => {
|
||||||
let wtxn = self.env.write_txn()?;
|
let wtxn = self.env.write_txn()?;
|
||||||
if self.index_mapper.exists(&wtxn, &index_uid)? {
|
if self.index_mapper.exists(&wtxn, &index_uid)? {
|
||||||
@ -1292,6 +1250,47 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
Ok(tasks)
|
Ok(tasks)
|
||||||
}
|
}
|
||||||
|
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
|
||||||
|
let filter =
|
||||||
|
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =
|
||||||
|
&task.kind
|
||||||
|
{
|
||||||
|
filter_expr
|
||||||
|
} else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
let deleted_documents = delete_document_by_filter(index_wtxn, filter, index);
|
||||||
|
let original_filter = if let Some(Details::DocumentDeletionByFilter {
|
||||||
|
original_filter,
|
||||||
|
deleted_documents: _,
|
||||||
|
}) = task.details
|
||||||
|
{
|
||||||
|
original_filter
|
||||||
|
} else {
|
||||||
|
// In the case of a `documentDeleteByFilter` the details MUST be set
|
||||||
|
unreachable!();
|
||||||
|
};
|
||||||
|
|
||||||
|
match deleted_documents {
|
||||||
|
Ok(deleted_documents) => {
|
||||||
|
task.status = Status::Succeeded;
|
||||||
|
task.details = Some(Details::DocumentDeletionByFilter {
|
||||||
|
original_filter,
|
||||||
|
deleted_documents: Some(deleted_documents),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
task.status = Status::Failed;
|
||||||
|
task.details = Some(Details::DocumentDeletionByFilter {
|
||||||
|
original_filter,
|
||||||
|
deleted_documents: Some(0),
|
||||||
|
});
|
||||||
|
task.error = Some(e.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(vec![task])
|
||||||
|
}
|
||||||
IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
|
IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
|
||||||
let indexer_config = self.index_mapper.indexer_config();
|
let indexer_config = self.index_mapper.indexer_config();
|
||||||
let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config);
|
let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config);
|
||||||
@ -1491,23 +1490,22 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete_document_by_filter(filter: &serde_json::Value, index: Index) -> Result<u64> {
|
fn delete_document_by_filter<'a>(
|
||||||
|
wtxn: &mut RwTxn<'a, '_>,
|
||||||
|
filter: &serde_json::Value,
|
||||||
|
index: &'a Index,
|
||||||
|
) -> Result<u64> {
|
||||||
let filter = Filter::from_json(filter)?;
|
let filter = Filter::from_json(filter)?;
|
||||||
Ok(if let Some(filter) = filter {
|
Ok(if let Some(filter) = filter {
|
||||||
let mut wtxn = index.write_txn()?;
|
let candidates = filter.evaluate(wtxn, index).map_err(|err| match err {
|
||||||
|
|
||||||
let candidates = filter.evaluate(&wtxn, &index).map_err(|err| match err {
|
|
||||||
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
||||||
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
|
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
|
||||||
}
|
}
|
||||||
e => e.into(),
|
e => e.into(),
|
||||||
})?;
|
})?;
|
||||||
let mut delete_operation = DeleteDocuments::new(&mut wtxn, &index)?;
|
let mut delete_operation = DeleteDocuments::new(wtxn, index)?;
|
||||||
delete_operation.delete_documents(&candidates);
|
delete_operation.delete_documents(&candidates);
|
||||||
let deleted_documents =
|
delete_operation.execute().map(|result| result.deleted_documents)?
|
||||||
delete_operation.execute().map(|result| result.deleted_documents)?;
|
|
||||||
wtxn.commit()?;
|
|
||||||
deleted_documents
|
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
})
|
})
|
||||||
|
@ -154,6 +154,19 @@ async fn delete_document_by_filter() {
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
index.wait_task(1).await;
|
index.wait_task(1).await;
|
||||||
|
|
||||||
|
let (stats, _) = index.stats().await;
|
||||||
|
snapshot!(json_string!(stats), @r###"
|
||||||
|
{
|
||||||
|
"numberOfDocuments": 4,
|
||||||
|
"isIndexing": false,
|
||||||
|
"fieldDistribution": {
|
||||||
|
"color": 3,
|
||||||
|
"id": 4
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
let (response, code) =
|
let (response, code) =
|
||||||
index.delete_document_by_filter(json!({ "filter": "color = blue"})).await;
|
index.delete_document_by_filter(json!({ "filter": "color = blue"})).await;
|
||||||
snapshot!(code, @"202 Accepted");
|
snapshot!(code, @"202 Accepted");
|
||||||
@ -188,6 +201,18 @@ async fn delete_document_by_filter() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
|
|
||||||
|
let (stats, _) = index.stats().await;
|
||||||
|
snapshot!(json_string!(stats), @r###"
|
||||||
|
{
|
||||||
|
"numberOfDocuments": 2,
|
||||||
|
"isIndexing": false,
|
||||||
|
"fieldDistribution": {
|
||||||
|
"color": 1,
|
||||||
|
"id": 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
||||||
snapshot!(code, @"200 OK");
|
snapshot!(code, @"200 OK");
|
||||||
snapshot!(json_string!(documents), @r###"
|
snapshot!(json_string!(documents), @r###"
|
||||||
@ -241,6 +266,18 @@ async fn delete_document_by_filter() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
|
|
||||||
|
let (stats, _) = index.stats().await;
|
||||||
|
snapshot!(json_string!(stats), @r###"
|
||||||
|
{
|
||||||
|
"numberOfDocuments": 1,
|
||||||
|
"isIndexing": false,
|
||||||
|
"fieldDistribution": {
|
||||||
|
"color": 1,
|
||||||
|
"id": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
||||||
snapshot!(code, @"200 OK");
|
snapshot!(code, @"200 OK");
|
||||||
snapshot!(json_string!(documents), @r###"
|
snapshot!(json_string!(documents), @r###"
|
||||||
|
Loading…
Reference in New Issue
Block a user