autobatch document deletion by filter

This commit is contained in:
Tamo 2024-08-29 15:56:24 +02:00
parent cd271b8762
commit 6e3839d8b6
2 changed files with 186 additions and 165 deletions

View File

@ -25,8 +25,9 @@ enum AutobatchKind {
primary_key: Option<String>,
},
DocumentEdition,
DocumentDeletion,
DocumentDeletionByFilter,
DocumentDeletion {
by_filter: bool,
},
DocumentClear,
Settings {
allow_index_creation: bool,
@ -65,10 +66,12 @@ impl From<KindWithContent> for AutobatchKind {
..
} => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key },
KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition,
KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion,
KindWithContent::DocumentDeletion { .. } => {
AutobatchKind::DocumentDeletion { by_filter: false }
}
KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear,
KindWithContent::DocumentDeletionByFilter { .. } => {
AutobatchKind::DocumentDeletionByFilter
AutobatchKind::DocumentDeletion { by_filter: true }
}
KindWithContent::SettingsUpdate { allow_index_creation, is_deletion, .. } => {
AutobatchKind::Settings {
@ -105,9 +108,7 @@ pub enum BatchKind {
},
DocumentDeletion {
deletion_ids: Vec<TaskId>,
},
DocumentDeletionByFilter {
id: TaskId,
includes_by_filter: bool,
},
ClearAndSettings {
other: Vec<TaskId>,
@ -205,12 +206,13 @@ impl BatchKind {
allow_index_creation,
),
K::DocumentEdition => (Break(BatchKind::DocumentEdition { id: task_id }), false),
K::DocumentDeletion => {
(Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false)
}
K::DocumentDeletionByFilter => {
(Break(BatchKind::DocumentDeletionByFilter { id: task_id }), false)
}
K::DocumentDeletion { by_filter: includes_by_filter } => (
Continue(BatchKind::DocumentDeletion {
deletion_ids: vec![task_id],
includes_by_filter,
}),
false,
),
K::Settings { allow_index_creation } => (
Continue(BatchKind::Settings { allow_index_creation, settings_ids: vec![task_id] }),
allow_index_creation,
@ -228,7 +230,7 @@ impl BatchKind {
match (self, kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::DocumentDeletionByFilter) => Break(this),
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break(this),
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
(this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
Break(this)
@ -264,7 +266,7 @@ impl BatchKind {
// The index deletion can batch with everything but must stop after
(
BatchKind::DocumentClear { mut ids }
| BatchKind::DocumentDeletion { deletion_ids: mut ids }
| BatchKind::DocumentDeletion { deletion_ids: mut ids, includes_by_filter: _ }
| BatchKind::DocumentOperation { method: _, allow_index_creation: _, primary_key: _, operation_ids: mut ids }
| BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids },
K::IndexDeletion,
@ -284,7 +286,7 @@ impl BatchKind {
(
BatchKind::DocumentClear { mut ids },
K::DocumentClear | K::DocumentDeletion,
K::DocumentClear | K::DocumentDeletion { by_filter: _ },
) => {
ids.push(id);
Continue(BatchKind::DocumentClear { ids })
@ -328,7 +330,7 @@ impl BatchKind {
}
(
BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids },
K::DocumentDeletion,
K::DocumentDeletion { by_filter: false },
) => {
operation_ids.push(id);
@ -339,6 +341,13 @@ impl BatchKind {
operation_ids,
})
}
// We can't batch a document operation with a delete by filter
(
this @ BatchKind::DocumentOperation { .. },
K::DocumentDeletion { by_filter: true },
) => {
Break(this)
}
// but we can't autobatch documents if it's not the same kind
// this match branch MUST be AFTER the previous one
(
@ -357,13 +366,18 @@ impl BatchKind {
operation_ids,
}),
(BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentClear) => {
(BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: _ }, K::DocumentClear) => {
deletion_ids.push(id);
Continue(BatchKind::DocumentClear { ids: deletion_ids })
}
// we can't autobatch the deletion and import if the document deletion contained a filter
(
this @ BatchKind::DocumentDeletion { deletion_ids: _, includes_by_filter: true },
K::DocumentImport { .. }
) => Break(this),
// we can autobatch the deletion and import if the index already exists
(
BatchKind::DocumentDeletion { mut deletion_ids },
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
K::DocumentImport { method, allow_index_creation, primary_key }
) if index_already_exists => {
deletion_ids.push(id);
@ -377,7 +391,7 @@ impl BatchKind {
}
// we can autobatch the deletion and import if both can't create an index
(
BatchKind::DocumentDeletion { mut deletion_ids },
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
K::DocumentImport { method, allow_index_creation, primary_key }
) if !allow_index_creation => {
deletion_ids.push(id);
@ -396,9 +410,9 @@ impl BatchKind {
) => {
Break(this)
}
(BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentDeletion) => {
(BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter }, K::DocumentDeletion { by_filter }) => {
deletion_ids.push(id);
Continue(BatchKind::DocumentDeletion { deletion_ids })
Continue(BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: includes_by_filter | by_filter })
}
(this @ BatchKind::DocumentDeletion { .. }, K::Settings { .. }) => Break(this),
@ -412,7 +426,7 @@ impl BatchKind {
}),
(
this @ BatchKind::Settings { .. },
K::DocumentImport { .. } | K::DocumentDeletion,
K::DocumentImport { .. } | K::DocumentDeletion { .. },
) => Break(this),
(
BatchKind::Settings { mut settings_ids, allow_index_creation },
@ -443,7 +457,7 @@ impl BatchKind {
settings_ids,
allow_index_creation,
},
K::DocumentDeletion,
K::DocumentDeletion { .. },
) => {
other.push(id);
Continue(BatchKind::ClearAndSettings {
@ -505,7 +519,7 @@ impl BatchKind {
// this MUST be AFTER the two previous branch
(
this @ BatchKind::SettingsAndDocumentOperation { .. },
K::DocumentDeletion | K::DocumentImport { .. },
K::DocumentDeletion { .. } | K::DocumentImport { .. },
) => Break(this),
(
BatchKind::SettingsAndDocumentOperation { mut settings_ids, method, allow_index_creation,primary_key, operation_ids },
@ -525,8 +539,7 @@ impl BatchKind {
| BatchKind::IndexDeletion { .. }
| BatchKind::IndexUpdate { .. }
| BatchKind::IndexSwap { .. }
| BatchKind::DocumentEdition { .. }
| BatchKind::DocumentDeletionByFilter { .. },
| BatchKind::DocumentEdition { .. },
_,
) => {
unreachable!()
@ -616,6 +629,13 @@ mod tests {
}
}
fn doc_del_fil() -> KindWithContent {
KindWithContent::DocumentDeletionByFilter {
index_uid: String::from("doggo"),
filter_expr: serde_json::json!("cuteness > 100"),
}
}
fn doc_clr() -> KindWithContent {
KindWithContent::DocumentClear { index_uid: String::from("doggo") }
}
@ -676,10 +696,16 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1, 2] }, false))");
// we can autobatch one or multiple DocumentDeletion together
debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: false }, false))");
// we can autobatch one or multiple DocumentDeletionByFilter together
debug_snapshot!(autobatch_from(true, None, [doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_del_fil(), doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del_fil(), doc_del_fil(), doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: true }, false))");
// we can autobatch one or multiple Settings together
debug_snapshot!(autobatch_from(true, None, [settings(true)]), @"Some((Settings { allow_index_creation: true, settings_ids: [0] }, true))");
@ -722,25 +748,63 @@ mod tests {
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###);
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###);
// But we can't autobatch document addition with document deletion by filter
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
// And the other way around
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
}
#[test]
fn simple_document_operation_dont_autobatch_with_other() {
// addition, updates and deletion can't batch together
// addition, updates and deletion by filter can't batch together
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
}
#[test]
@ -807,6 +871,7 @@ mod tests {
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_del()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_del_fil()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_clr()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), settings(true)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), settings(false)]), @"Some((IndexDeletion { ids: [0] }, false))");
@ -816,6 +881,7 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_del()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_del_fil()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_clr()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), settings(true)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), settings(false)]), @"Some((IndexDeletion { ids: [0] }, false))");
@ -827,6 +893,7 @@ mod tests {
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [settings(true), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, true))");
debug_snapshot!(autobatch_from(true, None, [settings(false), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
@ -836,6 +903,7 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del_fil(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [settings(true), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, true))");
debug_snapshot!(autobatch_from(false,None, [settings(false), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
@ -901,10 +969,10 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), settings(true)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
// batch deletion and addition
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
}
#[test]

View File

@ -110,9 +110,9 @@ pub(crate) enum IndexOperation {
index_uid: String,
task: Task,
},
IndexDocumentDeletionByFilter {
DocumentDeletion {
index_uid: String,
task: Task,
tasks: Vec<Task>,
},
DocumentClear {
index_uid: String,
@ -165,11 +165,11 @@ impl Batch {
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. }
| IndexOperation::Settings { tasks, .. }
| IndexOperation::DocumentDeletion { tasks, .. }
| IndexOperation::DocumentClear { tasks, .. } => {
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
}
IndexOperation::DocumentEdition { task, .. }
| IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
IndexOperation::DocumentEdition { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
IndexOperation::SettingsAndDocumentOperation {
@ -234,7 +234,7 @@ impl IndexOperation {
match self {
IndexOperation::DocumentOperation { index_uid, .. }
| IndexOperation::DocumentEdition { index_uid, .. }
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
| IndexOperation::DocumentDeletion { index_uid, .. }
| IndexOperation::DocumentClear { index_uid, .. }
| IndexOperation::Settings { index_uid, .. }
| IndexOperation::DocumentClearAndSetting { index_uid, .. }
@ -252,8 +252,8 @@ impl fmt::Display for IndexOperation {
IndexOperation::DocumentEdition { .. } => {
f.write_str("IndexOperation::DocumentEdition")
}
IndexOperation::IndexDocumentDeletionByFilter { .. } => {
f.write_str("IndexOperation::IndexDocumentDeletionByFilter")
IndexOperation::DocumentDeletion { .. } => {
f.write_str("IndexOperation::DocumentDeletion")
}
IndexOperation::DocumentClear { .. } => f.write_str("IndexOperation::DocumentClear"),
IndexOperation::Settings { .. } => f.write_str("IndexOperation::Settings"),
@ -289,21 +289,6 @@ impl IndexScheduler {
},
must_create_index,
})),
BatchKind::DocumentDeletionByFilter { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
match &task.kind {
KindWithContent::DocumentDeletionByFilter { index_uid, .. } => {
Ok(Some(Batch::IndexOperation {
op: IndexOperation::IndexDocumentDeletionByFilter {
index_uid: index_uid.clone(),
task,
},
must_create_index: false,
}))
}
_ => unreachable!(),
}
}
BatchKind::DocumentEdition { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
match &task.kind {
@ -366,30 +351,11 @@ impl IndexScheduler {
must_create_index,
}))
}
BatchKind::DocumentDeletion { deletion_ids } => {
BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => {
let tasks = self.get_existing_tasks(rtxn, deletion_ids)?;
let mut operations = Vec::with_capacity(tasks.len());
let mut documents_counts = Vec::with_capacity(tasks.len());
for task in &tasks {
match task.kind {
KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
operations.push(DocumentOperation::Delete(documents_ids.clone()));
documents_counts.push(documents_ids.len() as u64);
}
_ => unreachable!(),
}
}
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentOperation {
index_uid,
primary_key: None,
method: IndexDocumentsMethod::ReplaceDocuments,
documents_counts,
operations,
tasks,
},
op: IndexOperation::DocumentDeletion { index_uid, tasks },
must_create_index,
}))
}
@ -1439,7 +1405,7 @@ impl IndexScheduler {
{
(original_filter, context, function)
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
// In the case of a `documentEdition` the details MUST be set
unreachable!();
};
@ -1469,52 +1435,79 @@ impl IndexScheduler {
Ok(vec![task])
}
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =
&task.kind
IndexOperation::DocumentDeletion { mut tasks, index_uid: _ } => {
let mut to_delete = RoaringBitmap::new();
let external_documents_ids = index.external_documents_ids();
for task in tasks.iter_mut() {
let before = to_delete.len();
match &task.kind {
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
for id in documents_ids {
if let Some(id) = external_documents_ids.get(index_wtxn, id)? {
to_delete.insert(id);
}
}
let will_be_removed = to_delete.len() - before;
task.details = Some(Details::DocumentDeletion {
provided_ids: documents_ids.len(),
deleted_documents: Some(will_be_removed),
});
}
KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => {
let before = to_delete.len();
let filter = Filter::from_json(filter_expr)?;
if let Some(filter) = filter {
let candidates = filter.evaluate(index_wtxn, index).map_err(
|err| match err {
milli::Error::UserError(
milli::UserError::InvalidFilter(_),
) => Error::from(err)
.with_custom_error_code(Code::InvalidDocumentFilter),
e => e.into(),
},
)?;
to_delete |= candidates;
}
let will_be_removed = to_delete.len() - before;
if let Some(Details::DocumentDeletionByFilter {
original_filter: _,
deleted_documents,
}) = &mut task.details
{
filter_expr
} else {
unreachable!()
};
let deleted_documents = delete_document_by_filter(
index_wtxn,
filter,
self.index_mapper.indexer_config(),
self.must_stop_processing.clone(),
index,
);
let original_filter = if let Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: _,
}) = task.details
{
original_filter
*deleted_documents = Some(will_be_removed);
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
unreachable!();
unreachable!()
}
}
_ => unreachable!(),
}
task.status = Status::Succeeded;
}
let config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
match deleted_documents {
Ok(deleted_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(deleted_documents),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(0),
});
task.error = Some(e.into());
}
}
let must_stop_processing = self.must_stop_processing.clone();
let mut builder = milli::update::IndexDocuments::new(
index_wtxn,
index,
self.index_mapper.indexer_config(),
config,
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
)?;
Ok(vec![task])
let (new_builder, _count) =
builder.remove_documents_from_db_no_batch(&to_delete)?;
builder = new_builder;
let _ = builder.execute()?;
Ok(tasks)
}
IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
let indexer_config = self.index_mapper.indexer_config();
@ -1718,46 +1711,6 @@ impl IndexScheduler {
}
}
fn delete_document_by_filter<'a>(
wtxn: &mut RwTxn<'a>,
filter: &serde_json::Value,
indexer_config: &IndexerConfig,
must_stop_processing: MustStopProcessing,
index: &'a Index,
) -> Result<u64> {
let filter = Filter::from_json(filter)?;
Ok(if let Some(filter) = filter {
let candidates = filter.evaluate(wtxn, index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
}
e => e.into(),
})?;
let config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut builder = milli::update::IndexDocuments::new(
wtxn,
index,
indexer_config,
config,
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
)?;
let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?;
builder = new_builder;
let _ = builder.execute()?;
count
} else {
0
})
}
fn edit_documents_by_function<'a>(
wtxn: &mut RwTxn<'a>,
filter: &Option<serde_json::Value>,