From 6e3839d8b64c1e5c4f1a45837d24e02baef93f44 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 29 Aug 2024 15:56:24 +0200 Subject: [PATCH] autobatch document deletion by filter --- index-scheduler/src/autobatcher.rs | 146 ++++++++++++++------ index-scheduler/src/batch.rs | 205 +++++++++++------------------ 2 files changed, 186 insertions(+), 165 deletions(-) diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 96201bebb..0f6aa8a3a 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -25,8 +25,9 @@ enum AutobatchKind { primary_key: Option, }, DocumentEdition, - DocumentDeletion, - DocumentDeletionByFilter, + DocumentDeletion { + by_filter: bool, + }, DocumentClear, Settings { allow_index_creation: bool, @@ -65,10 +66,12 @@ impl From for AutobatchKind { .. } => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key }, KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition, - KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion, + KindWithContent::DocumentDeletion { .. } => { + AutobatchKind::DocumentDeletion { by_filter: false } + } KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear, KindWithContent::DocumentDeletionByFilter { .. } => { - AutobatchKind::DocumentDeletionByFilter + AutobatchKind::DocumentDeletion { by_filter: true } } KindWithContent::SettingsUpdate { allow_index_creation, is_deletion, .. } => { AutobatchKind::Settings { @@ -105,9 +108,7 @@ pub enum BatchKind { }, DocumentDeletion { deletion_ids: Vec, - }, - DocumentDeletionByFilter { - id: TaskId, + includes_by_filter: bool, }, ClearAndSettings { other: Vec, @@ -205,12 +206,13 @@ impl BatchKind { allow_index_creation, ), K::DocumentEdition => (Break(BatchKind::DocumentEdition { id: task_id }), false), - K::DocumentDeletion => { - (Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false) - } - K::DocumentDeletionByFilter => { - (Break(BatchKind::DocumentDeletionByFilter { id: task_id }), false) - } + K::DocumentDeletion { by_filter: includes_by_filter } => ( + Continue(BatchKind::DocumentDeletion { + deletion_ids: vec![task_id], + includes_by_filter, + }), + false, + ), K::Settings { allow_index_creation } => ( Continue(BatchKind::Settings { allow_index_creation, settings_ids: vec![task_id] }), allow_index_creation, @@ -228,7 +230,7 @@ impl BatchKind { match (self, kind) { // We don't batch any of these operations - (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::DocumentDeletionByFilter) => Break(this), + (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break(this), // We must not batch tasks that don't have the same index creation rights if the index doesn't already exists. (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => { Break(this) @@ -264,7 +266,7 @@ impl BatchKind { // The index deletion can batch with everything but must stop after ( BatchKind::DocumentClear { mut ids } - | BatchKind::DocumentDeletion { deletion_ids: mut ids } + | BatchKind::DocumentDeletion { deletion_ids: mut ids, includes_by_filter: _ } | BatchKind::DocumentOperation { method: _, allow_index_creation: _, primary_key: _, operation_ids: mut ids } | BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids }, K::IndexDeletion, @@ -284,7 +286,7 @@ impl BatchKind { ( BatchKind::DocumentClear { mut ids }, - K::DocumentClear | K::DocumentDeletion, + K::DocumentClear | K::DocumentDeletion { by_filter: _ }, ) => { ids.push(id); Continue(BatchKind::DocumentClear { ids }) @@ -328,7 +330,7 @@ impl BatchKind { } ( BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids }, - K::DocumentDeletion, + K::DocumentDeletion { by_filter: false }, ) => { operation_ids.push(id); @@ -339,6 +341,13 @@ impl BatchKind { operation_ids, }) } + // We can't batch a document operation with a delete by filter + ( + this @ BatchKind::DocumentOperation { .. }, + K::DocumentDeletion { by_filter: true }, + ) => { + Break(this) + } // but we can't autobatch documents if it's not the same kind // this match branch MUST be AFTER the previous one ( @@ -357,13 +366,18 @@ impl BatchKind { operation_ids, }), - (BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentClear) => { + (BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: _ }, K::DocumentClear) => { deletion_ids.push(id); Continue(BatchKind::DocumentClear { ids: deletion_ids }) } + // we can't autobatch the deletion and import if the document deletion contained a filter + ( + this @ BatchKind::DocumentDeletion { deletion_ids: _, includes_by_filter: true }, + K::DocumentImport { .. } + ) => Break(this), // we can autobatch the deletion and import if the index already exists ( - BatchKind::DocumentDeletion { mut deletion_ids }, + BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false }, K::DocumentImport { method, allow_index_creation, primary_key } ) if index_already_exists => { deletion_ids.push(id); @@ -377,7 +391,7 @@ impl BatchKind { } // we can autobatch the deletion and import if both can't create an index ( - BatchKind::DocumentDeletion { mut deletion_ids }, + BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false }, K::DocumentImport { method, allow_index_creation, primary_key } ) if !allow_index_creation => { deletion_ids.push(id); @@ -396,9 +410,9 @@ impl BatchKind { ) => { Break(this) } - (BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentDeletion) => { + (BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter }, K::DocumentDeletion { by_filter }) => { deletion_ids.push(id); - Continue(BatchKind::DocumentDeletion { deletion_ids }) + Continue(BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: includes_by_filter | by_filter }) } (this @ BatchKind::DocumentDeletion { .. }, K::Settings { .. }) => Break(this), @@ -412,7 +426,7 @@ impl BatchKind { }), ( this @ BatchKind::Settings { .. }, - K::DocumentImport { .. } | K::DocumentDeletion, + K::DocumentImport { .. } | K::DocumentDeletion { .. }, ) => Break(this), ( BatchKind::Settings { mut settings_ids, allow_index_creation }, @@ -443,7 +457,7 @@ impl BatchKind { settings_ids, allow_index_creation, }, - K::DocumentDeletion, + K::DocumentDeletion { .. }, ) => { other.push(id); Continue(BatchKind::ClearAndSettings { @@ -505,7 +519,7 @@ impl BatchKind { // this MUST be AFTER the two previous branch ( this @ BatchKind::SettingsAndDocumentOperation { .. }, - K::DocumentDeletion | K::DocumentImport { .. }, + K::DocumentDeletion { .. } | K::DocumentImport { .. }, ) => Break(this), ( BatchKind::SettingsAndDocumentOperation { mut settings_ids, method, allow_index_creation,primary_key, operation_ids }, @@ -525,8 +539,7 @@ impl BatchKind { | BatchKind::IndexDeletion { .. } | BatchKind::IndexUpdate { .. } | BatchKind::IndexSwap { .. } - | BatchKind::DocumentEdition { .. } - | BatchKind::DocumentDeletionByFilter { .. }, + | BatchKind::DocumentEdition { .. }, _, ) => { unreachable!() @@ -616,6 +629,13 @@ mod tests { } } + fn doc_del_fil() -> KindWithContent { + KindWithContent::DocumentDeletionByFilter { + index_uid: String::from("doggo"), + filter_expr: serde_json::json!("cuteness > 100"), + } + } + fn doc_clr() -> KindWithContent { KindWithContent::DocumentClear { index_uid: String::from("doggo") } } @@ -676,10 +696,16 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1, 2] }, false))"); // we can autobatch one or multiple DocumentDeletion together - debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: false }, false))"); + + // we can autobatch one or multiple DocumentDeletionByFilter together + debug_snapshot!(autobatch_from(true, None, [doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_del_fil(), doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del_fil(), doc_del_fil(), doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: true }, false))"); // we can autobatch one or multiple Settings together debug_snapshot!(autobatch_from(true, None, [settings(true)]), @"Some((Settings { allow_index_creation: true, settings_ids: [0] }, true))"); @@ -722,25 +748,63 @@ mod tests { debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + + // But we can't autobatch document addition with document deletion by filter + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); + // And the other way around + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); } #[test] fn simple_document_operation_dont_autobatch_with_other() { - // addition, updates and deletion can't batch together + // addition, updates and deletion by filter can't batch together debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))"); } #[test] @@ -807,6 +871,7 @@ mod tests { debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_del()]), @"Some((IndexDeletion { ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_del_fil()]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_clr()]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), settings(true)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(true, None, [idx_del(), settings(false)]), @"Some((IndexDeletion { ids: [0] }, false))"); @@ -816,6 +881,7 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_del()]), @"Some((IndexDeletion { ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_del_fil()]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_clr()]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), settings(true)]), @"Some((IndexDeletion { ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [idx_del(), settings(false)]), @"Some((IndexDeletion { ids: [0] }, false))"); @@ -827,6 +893,7 @@ mod tests { debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [settings(true), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, true))"); debug_snapshot!(autobatch_from(true, None, [settings(false), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); @@ -836,6 +903,7 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false,None, [doc_del(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_del_fil(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false,None, [doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false,None, [settings(true), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, true))"); debug_snapshot!(autobatch_from(false,None, [settings(false), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))"); @@ -901,10 +969,10 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), settings(true)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); // batch deletion and addition - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))"); } #[test] diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 3e6e78614..9b5368826 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -110,9 +110,9 @@ pub(crate) enum IndexOperation { index_uid: String, task: Task, }, - IndexDocumentDeletionByFilter { + DocumentDeletion { index_uid: String, - task: Task, + tasks: Vec, }, DocumentClear { index_uid: String, @@ -165,11 +165,11 @@ impl Batch { Batch::IndexOperation { op, .. } => match op { IndexOperation::DocumentOperation { tasks, .. } | IndexOperation::Settings { tasks, .. } + | IndexOperation::DocumentDeletion { tasks, .. } | IndexOperation::DocumentClear { tasks, .. } => { RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid)) } - IndexOperation::DocumentEdition { task, .. } - | IndexOperation::IndexDocumentDeletionByFilter { task, .. } => { + IndexOperation::DocumentEdition { task, .. } => { RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap() } IndexOperation::SettingsAndDocumentOperation { @@ -234,7 +234,7 @@ impl IndexOperation { match self { IndexOperation::DocumentOperation { index_uid, .. } | IndexOperation::DocumentEdition { index_uid, .. } - | IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. } + | IndexOperation::DocumentDeletion { index_uid, .. } | IndexOperation::DocumentClear { index_uid, .. } | IndexOperation::Settings { index_uid, .. } | IndexOperation::DocumentClearAndSetting { index_uid, .. } @@ -252,8 +252,8 @@ impl fmt::Display for IndexOperation { IndexOperation::DocumentEdition { .. } => { f.write_str("IndexOperation::DocumentEdition") } - IndexOperation::IndexDocumentDeletionByFilter { .. } => { - f.write_str("IndexOperation::IndexDocumentDeletionByFilter") + IndexOperation::DocumentDeletion { .. } => { + f.write_str("IndexOperation::DocumentDeletion") } IndexOperation::DocumentClear { .. } => f.write_str("IndexOperation::DocumentClear"), IndexOperation::Settings { .. } => f.write_str("IndexOperation::Settings"), @@ -289,21 +289,6 @@ impl IndexScheduler { }, must_create_index, })), - BatchKind::DocumentDeletionByFilter { id } => { - let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; - match &task.kind { - KindWithContent::DocumentDeletionByFilter { index_uid, .. } => { - Ok(Some(Batch::IndexOperation { - op: IndexOperation::IndexDocumentDeletionByFilter { - index_uid: index_uid.clone(), - task, - }, - must_create_index: false, - })) - } - _ => unreachable!(), - } - } BatchKind::DocumentEdition { id } => { let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; match &task.kind { @@ -366,30 +351,11 @@ impl IndexScheduler { must_create_index, })) } - BatchKind::DocumentDeletion { deletion_ids } => { + BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => { let tasks = self.get_existing_tasks(rtxn, deletion_ids)?; - let mut operations = Vec::with_capacity(tasks.len()); - let mut documents_counts = Vec::with_capacity(tasks.len()); - for task in &tasks { - match task.kind { - KindWithContent::DocumentDeletion { ref documents_ids, .. } => { - operations.push(DocumentOperation::Delete(documents_ids.clone())); - documents_counts.push(documents_ids.len() as u64); - } - _ => unreachable!(), - } - } - Ok(Some(Batch::IndexOperation { - op: IndexOperation::DocumentOperation { - index_uid, - primary_key: None, - method: IndexDocumentsMethod::ReplaceDocuments, - documents_counts, - operations, - tasks, - }, + op: IndexOperation::DocumentDeletion { index_uid, tasks }, must_create_index, })) } @@ -1439,7 +1405,7 @@ impl IndexScheduler { { (original_filter, context, function) } else { - // In the case of a `documentDeleteByFilter` the details MUST be set + // In the case of a `documentEdition` the details MUST be set unreachable!(); }; @@ -1469,52 +1435,79 @@ impl IndexScheduler { Ok(vec![task]) } - IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => { - let filter = - if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } = - &task.kind - { - filter_expr - } else { - unreachable!() - }; - let deleted_documents = delete_document_by_filter( - index_wtxn, - filter, - self.index_mapper.indexer_config(), - self.must_stop_processing.clone(), - index, - ); - let original_filter = if let Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: _, - }) = task.details - { - original_filter - } else { - // In the case of a `documentDeleteByFilter` the details MUST be set - unreachable!(); - }; + IndexOperation::DocumentDeletion { mut tasks, index_uid: _ } => { + let mut to_delete = RoaringBitmap::new(); + let external_documents_ids = index.external_documents_ids(); - match deleted_documents { - Ok(deleted_documents) => { - task.status = Status::Succeeded; - task.details = Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: Some(deleted_documents), - }); - } - Err(e) => { - task.status = Status::Failed; - task.details = Some(Details::DocumentDeletionByFilter { - original_filter, - deleted_documents: Some(0), - }); - task.error = Some(e.into()); + for task in tasks.iter_mut() { + let before = to_delete.len(); + match &task.kind { + KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => { + for id in documents_ids { + if let Some(id) = external_documents_ids.get(index_wtxn, id)? { + to_delete.insert(id); + } + } + let will_be_removed = to_delete.len() - before; + task.details = Some(Details::DocumentDeletion { + provided_ids: documents_ids.len(), + deleted_documents: Some(will_be_removed), + }); + } + KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => { + let before = to_delete.len(); + let filter = Filter::from_json(filter_expr)?; + if let Some(filter) = filter { + let candidates = filter.evaluate(index_wtxn, index).map_err( + |err| match err { + milli::Error::UserError( + milli::UserError::InvalidFilter(_), + ) => Error::from(err) + .with_custom_error_code(Code::InvalidDocumentFilter), + e => e.into(), + }, + )?; + to_delete |= candidates; + } + let will_be_removed = to_delete.len() - before; + if let Some(Details::DocumentDeletionByFilter { + original_filter: _, + deleted_documents, + }) = &mut task.details + { + *deleted_documents = Some(will_be_removed); + } else { + // In the case of a `documentDeleteByFilter` the details MUST be set + unreachable!() + } + } + _ => unreachable!(), } + task.status = Status::Succeeded; } - Ok(vec![task]) + let config = IndexDocumentsConfig { + update_method: IndexDocumentsMethod::ReplaceDocuments, + ..Default::default() + }; + + let must_stop_processing = self.must_stop_processing.clone(); + let mut builder = milli::update::IndexDocuments::new( + index_wtxn, + index, + self.index_mapper.indexer_config(), + config, + |indexing_step| tracing::debug!(update = ?indexing_step), + || must_stop_processing.get(), + )?; + + let (new_builder, _count) = + builder.remove_documents_from_db_no_batch(&to_delete)?; + builder = new_builder; + + let _ = builder.execute()?; + + Ok(tasks) } IndexOperation::Settings { index_uid: _, settings, mut tasks } => { let indexer_config = self.index_mapper.indexer_config(); @@ -1718,46 +1711,6 @@ impl IndexScheduler { } } -fn delete_document_by_filter<'a>( - wtxn: &mut RwTxn<'a>, - filter: &serde_json::Value, - indexer_config: &IndexerConfig, - must_stop_processing: MustStopProcessing, - index: &'a Index, -) -> Result { - let filter = Filter::from_json(filter)?; - Ok(if let Some(filter) = filter { - let candidates = filter.evaluate(wtxn, index).map_err(|err| match err { - milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { - Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) - } - e => e.into(), - })?; - - let config = IndexDocumentsConfig { - update_method: IndexDocumentsMethod::ReplaceDocuments, - ..Default::default() - }; - - let mut builder = milli::update::IndexDocuments::new( - wtxn, - index, - indexer_config, - config, - |indexing_step| tracing::debug!(update = ?indexing_step), - || must_stop_processing.get(), - )?; - - let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?; - builder = new_builder; - - let _ = builder.execute()?; - count - } else { - 0 - }) -} - fn edit_documents_by_function<'a>( wtxn: &mut RwTxn<'a>, filter: &Option,