Do not fail the whole batch when a single document deletion by filter fails

This commit is contained in:
Tamo 2024-09-02 10:34:28 +02:00
parent 6e3839d8b6
commit e6dd66e4a0
2 changed files with 142 additions and 7 deletions

View File

@ -1441,6 +1441,8 @@ impl IndexScheduler {
for task in tasks.iter_mut() { for task in tasks.iter_mut() {
let before = to_delete.len(); let before = to_delete.len();
task.status = Status::Succeeded;
match &task.kind { match &task.kind {
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => { KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
for id in documents_ids { for id in documents_ids {
@ -1456,18 +1458,40 @@ impl IndexScheduler {
} }
KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => { KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => {
let before = to_delete.len(); let before = to_delete.len();
let filter = Filter::from_json(filter_expr)?; let filter = match Filter::from_json(filter_expr) {
Ok(filter) => filter,
Err(err) => {
// theorically, this should be catched by deserr before reaching the index-scheduler and cannot happens
task.status = Status::Failed;
task.error = match err {
milli::Error::UserError(
milli::UserError::InvalidFilterExpression { .. },
) => Some(
Error::from(err)
.with_custom_error_code(Code::InvalidDocumentFilter)
.into(),
),
e => Some(e.into()),
};
None
}
};
if let Some(filter) = filter { if let Some(filter) = filter {
let candidates = filter.evaluate(index_wtxn, index).map_err( let candidates =
|err| match err { filter.evaluate(index_wtxn, index).map_err(|err| match err {
milli::Error::UserError( milli::Error::UserError(
milli::UserError::InvalidFilter(_), milli::UserError::InvalidFilter(_),
) => Error::from(err) ) => Error::from(err)
.with_custom_error_code(Code::InvalidDocumentFilter), .with_custom_error_code(Code::InvalidDocumentFilter),
e => e.into(), e => e.into(),
}, });
)?; match candidates {
to_delete |= candidates; Ok(candidates) => to_delete |= candidates,
Err(err) => {
task.status = Status::Failed;
task.error = Some(err.into());
}
};
} }
let will_be_removed = to_delete.len() - before; let will_be_removed = to_delete.len() - before;
if let Some(Details::DocumentDeletionByFilter { if let Some(Details::DocumentDeletionByFilter {
@ -1483,7 +1507,6 @@ impl IndexScheduler {
} }
_ => unreachable!(), _ => unreachable!(),
} }
task.status = Status::Succeeded;
} }
let config = IndexDocumentsConfig { let config = IndexDocumentsConfig {

View File

@ -1764,6 +1764,7 @@ mod tests {
use crossbeam::channel::RecvTimeoutError; use crossbeam::channel::RecvTimeoutError;
use file_store::File; use file_store::File;
use insta::assert_json_snapshot; use insta::assert_json_snapshot;
use maplit::btreeset;
use meili_snap::{json_string, snapshot}; use meili_snap::{json_string, snapshot};
use meilisearch_auth::AuthFilter; use meilisearch_auth::AuthFilter;
use meilisearch_types::document_formats::DocumentFormatError; use meilisearch_types::document_formats::DocumentFormatError;
@ -2553,6 +2554,117 @@ mod tests {
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
#[test]
fn fail_in_process_batch_for_document_deletion() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
use meilisearch_types::settings::{Settings, Unchecked};
let mut new_settings: Box<Settings<Unchecked>> = Box::default();
new_settings.filterable_attributes = Setting::Set(btreeset!(S("catto")));
index_scheduler
.register(
KindWithContent::SettingsUpdate {
index_uid: S("doggos"),
new_settings,
is_deletion: false,
allow_index_creation: true,
},
None,
false,
)
.unwrap();
let content = r#"[
{ "id": 1, "doggo": "jean bob" },
{ "id": 2, "catto": "jorts" },
{ "id": 3, "doggo": "bork" }
]"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S("id")),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
},
None,
false,
)
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_setting_and_document_addition");
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_adding_the_settings");
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_adding_the_documents");
index_scheduler
.register(
KindWithContent::DocumentDeletion {
index_uid: S("doggos"),
documents_ids: vec![S("1")],
},
None,
false,
)
.unwrap();
// This one should not be catched by Meilisearch but it's still nice to handle it because if one day we break the filters it could happens
index_scheduler
.register(
KindWithContent::DocumentDeletionByFilter {
index_uid: S("doggos"),
filter_expr: serde_json::json!(true),
},
None,
false,
)
.unwrap();
// Should fail because the ids are not filterable
index_scheduler
.register(
KindWithContent::DocumentDeletionByFilter {
index_uid: S("doggos"),
filter_expr: serde_json::json!("id = 2"),
},
None,
false,
)
.unwrap();
index_scheduler
.register(
KindWithContent::DocumentDeletionByFilter {
index_uid: S("doggos"),
filter_expr: serde_json::json!("catto EXISTS"),
},
None,
false,
)
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_document_deletions");
// Everything should be batched together
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_removing_the_documents");
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents_remaining_should_only_be_bork");
}
#[test] #[test]
fn do_not_batch_task_of_different_indexes() { fn do_not_batch_task_of_different_indexes() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);