diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index e1e48ab90..31634237f 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -88,11 +88,11 @@ pub enum BatchKind { DocumentClear { ids: Vec, }, - DocumentImport { + DocumentOperation { method: IndexDocumentsMethod, allow_index_creation: bool, primary_key: Option, - import_ids: Vec, + operation_ids: Vec, }, DocumentDeletion { deletion_ids: Vec, @@ -102,12 +102,12 @@ pub enum BatchKind { allow_index_creation: bool, settings_ids: Vec, }, - SettingsAndDocumentImport { + SettingsAndDocumentOperation { settings_ids: Vec, method: IndexDocumentsMethod, allow_index_creation: bool, primary_key: Option, - import_ids: Vec, + operation_ids: Vec, }, Settings { allow_index_creation: bool, @@ -131,9 +131,9 @@ impl BatchKind { #[rustfmt::skip] fn allow_index_creation(&self) -> Option { match self { - BatchKind::DocumentImport { allow_index_creation, .. } + BatchKind::DocumentOperation { allow_index_creation, .. } | BatchKind::ClearAndSettings { allow_index_creation, .. } - | BatchKind::SettingsAndDocumentImport { allow_index_creation, .. } + | BatchKind::SettingsAndDocumentOperation { allow_index_creation, .. } | BatchKind::Settings { allow_index_creation, .. } => Some(*allow_index_creation), _ => None, } @@ -141,8 +141,8 @@ impl BatchKind { fn primary_key(&self) -> Option> { match self { - BatchKind::DocumentImport { primary_key, .. } - | BatchKind::SettingsAndDocumentImport { primary_key, .. } => { + BatchKind::DocumentOperation { primary_key, .. } + | BatchKind::SettingsAndDocumentOperation { primary_key, .. } => { Some(primary_key.as_deref()) } _ => None, @@ -173,22 +173,22 @@ impl BatchKind { if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() => { ( - Continue(BatchKind::DocumentImport { + Continue(BatchKind::DocumentOperation { method, allow_index_creation, primary_key: pk, - import_ids: vec![task_id], + operation_ids: vec![task_id], }), allow_index_creation, ) } // if the primary key set in the task was different than ours we should stop and make this batch fail asap. K::DocumentImport { method, allow_index_creation, primary_key } => ( - Break(BatchKind::DocumentImport { + Break(BatchKind::DocumentOperation { method, allow_index_creation, primary_key, - import_ids: vec![task_id], + operation_ids: vec![task_id], }), allow_index_creation, ), @@ -249,7 +249,7 @@ impl BatchKind { ( BatchKind::DocumentClear { mut ids } | BatchKind::DocumentDeletion { deletion_ids: mut ids } - | BatchKind::DocumentImport { method: _, allow_index_creation: _, primary_key: _, import_ids: mut ids } + | BatchKind::DocumentOperation { method: _, allow_index_creation: _, primary_key: _, operation_ids: mut ids } | BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids }, K::IndexDeletion, ) => { @@ -258,7 +258,7 @@ impl BatchKind { } ( BatchKind::ClearAndSettings { settings_ids: mut ids, allow_index_creation: _, mut other } - | BatchKind::SettingsAndDocumentImport { import_ids: mut ids, method: _, allow_index_creation: _, primary_key: _, settings_ids: mut other }, + | BatchKind::SettingsAndDocumentOperation { operation_ids: mut ids, method: _, allow_index_creation: _, primary_key: _, settings_ids: mut other }, K::IndexDeletion, ) => { ids.push(id); @@ -278,63 +278,108 @@ impl BatchKind { K::DocumentImport { .. } | K::Settings { .. }, ) => Break(this), ( - BatchKind::DocumentImport { method: _, allow_index_creation: _, primary_key: _, import_ids: mut ids }, + BatchKind::DocumentOperation { method: _, allow_index_creation: _, primary_key: _, mut operation_ids }, K::DocumentClear, ) => { - ids.push(id); - Continue(BatchKind::DocumentClear { ids }) + operation_ids.push(id); + Continue(BatchKind::DocumentClear { ids: operation_ids }) } // we can autobatch the same kind of document additions / updates ( - BatchKind::DocumentImport { method: ReplaceDocuments, allow_index_creation, primary_key: _, mut import_ids }, + BatchKind::DocumentOperation { method: ReplaceDocuments, allow_index_creation, primary_key: _, mut operation_ids }, K::DocumentImport { method: ReplaceDocuments, primary_key: pk, .. }, ) => { - import_ids.push(id); - Continue(BatchKind::DocumentImport { + operation_ids.push(id); + Continue(BatchKind::DocumentOperation { method: ReplaceDocuments, allow_index_creation, - import_ids, + operation_ids, primary_key: pk, }) } ( - BatchKind::DocumentImport { method: UpdateDocuments, allow_index_creation, primary_key: _, mut import_ids }, + BatchKind::DocumentOperation { method: UpdateDocuments, allow_index_creation, primary_key: _, mut operation_ids }, K::DocumentImport { method: UpdateDocuments, primary_key: pk, .. }, ) => { - - import_ids.push(id); - Continue(BatchKind::DocumentImport { + operation_ids.push(id); + Continue(BatchKind::DocumentOperation { method: UpdateDocuments, allow_index_creation, primary_key: pk, - import_ids, + operation_ids, }) } + ( + BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids }, + K::DocumentDeletion, + ) => { + operation_ids.push(id); + Continue(BatchKind::DocumentOperation { + method, + allow_index_creation, + primary_key, + operation_ids, + }) + } // but we can't autobatch documents if it's not the same kind // this match branch MUST be AFTER the previous one ( - this @ BatchKind::DocumentImport { .. }, - K::DocumentDeletion | K::DocumentImport { .. }, + this @ BatchKind::DocumentOperation { .. }, + K::DocumentImport { .. }, ) => Break(this), ( - BatchKind::DocumentImport { method, allow_index_creation, primary_key, import_ids }, + BatchKind::DocumentOperation { method, allow_index_creation, primary_key, operation_ids }, K::Settings { .. }, - ) => Continue(BatchKind::SettingsAndDocumentImport { + ) => Continue(BatchKind::SettingsAndDocumentOperation { settings_ids: vec![id], method, allow_index_creation, primary_key, - import_ids, + operation_ids, }), (BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentClear) => { deletion_ids.push(id); Continue(BatchKind::DocumentClear { ids: deletion_ids }) } - (this @ BatchKind::DocumentDeletion { .. }, K::DocumentImport { .. }) => Break(this), + // we can autobatch the deletion and import if the index already exists + ( + BatchKind::DocumentDeletion { mut deletion_ids }, + K::DocumentImport { method, allow_index_creation, primary_key } + ) if index_already_exists => { + deletion_ids.push(id); + + Continue(BatchKind::DocumentOperation { + method, + allow_index_creation, + primary_key, + operation_ids: deletion_ids, + }) + } + // we can autobatch the deletion and import if both can't create an index + ( + BatchKind::DocumentDeletion { mut deletion_ids }, + K::DocumentImport { method, allow_index_creation, primary_key } + ) if !allow_index_creation => { + deletion_ids.push(id); + + Continue(BatchKind::DocumentOperation { + method, + allow_index_creation, + primary_key, + operation_ids: deletion_ids, + }) + } + // we can't autobatch a deletion and an import if the index does not exists but would be created by an addition + ( + this @ BatchKind::DocumentDeletion { .. }, + K::DocumentImport { .. } + ) => { + Break(this) + } (BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentDeletion) => { deletion_ids.push(id); Continue(BatchKind::DocumentDeletion { deletion_ids }) @@ -403,60 +448,60 @@ impl BatchKind { }) } ( - BatchKind::SettingsAndDocumentImport { settings_ids, method: _, import_ids: mut other, allow_index_creation, primary_key: _ }, + BatchKind::SettingsAndDocumentOperation { settings_ids, method: _, mut operation_ids, allow_index_creation, primary_key: _ }, K::DocumentClear, ) => { - other.push(id); + operation_ids.push(id); Continue(BatchKind::ClearAndSettings { settings_ids, - other, + other: operation_ids, allow_index_creation, }) } ( - BatchKind::SettingsAndDocumentImport { settings_ids, method: ReplaceDocuments, mut import_ids, allow_index_creation, primary_key: _}, + BatchKind::SettingsAndDocumentOperation { settings_ids, method: ReplaceDocuments, mut operation_ids, allow_index_creation, primary_key: _}, K::DocumentImport { method: ReplaceDocuments, primary_key: pk2, .. }, ) => { - import_ids.push(id); - Continue(BatchKind::SettingsAndDocumentImport { + operation_ids.push(id); + Continue(BatchKind::SettingsAndDocumentOperation { settings_ids, method: ReplaceDocuments, allow_index_creation, primary_key: pk2, - import_ids, + operation_ids, }) } ( - BatchKind::SettingsAndDocumentImport { settings_ids, method: UpdateDocuments, allow_index_creation, primary_key: _, mut import_ids }, + BatchKind::SettingsAndDocumentOperation { settings_ids, method: UpdateDocuments, allow_index_creation, primary_key: _, mut operation_ids }, K::DocumentImport { method: UpdateDocuments, primary_key: pk2, .. }, ) => { - import_ids.push(id); - Continue(BatchKind::SettingsAndDocumentImport { + operation_ids.push(id); + Continue(BatchKind::SettingsAndDocumentOperation { settings_ids, method: UpdateDocuments, allow_index_creation, primary_key: pk2, - import_ids, + operation_ids, }) } // But we can't batch a settings and a doc op with another doc op // this MUST be AFTER the two previous branch ( - this @ BatchKind::SettingsAndDocumentImport { .. }, + this @ BatchKind::SettingsAndDocumentOperation { .. }, K::DocumentDeletion | K::DocumentImport { .. }, ) => Break(this), ( - BatchKind::SettingsAndDocumentImport { mut settings_ids, method, allow_index_creation,primary_key, import_ids }, + BatchKind::SettingsAndDocumentOperation { mut settings_ids, method, allow_index_creation,primary_key, operation_ids }, K::Settings { .. }, ) => { settings_ids.push(id); - Continue(BatchKind::SettingsAndDocumentImport { + Continue(BatchKind::SettingsAndDocumentOperation { settings_ids, method, allow_index_creation, primary_key, - import_ids, + operation_ids, }) } ( @@ -588,29 +633,29 @@ mod tests { fn autobatch_simple_operation_together() { // we can autobatch one or multiple `ReplaceDocuments` together. // if the index exists. - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp( ReplaceDocuments, true , None), doc_imp(ReplaceDocuments, true , None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1, 2] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_imp( ReplaceDocuments, false , None), doc_imp(ReplaceDocuments, false , None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1, 2] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp( ReplaceDocuments, true , None), doc_imp(ReplaceDocuments, true , None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1, 2] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_imp( ReplaceDocuments, false , None), doc_imp(ReplaceDocuments, false , None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1, 2] }, false))"); // if it doesn't exists. - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, true, None), doc_imp( ReplaceDocuments, true , None), doc_imp(ReplaceDocuments, true , None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1, 2] }, true))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), doc_imp( ReplaceDocuments, true , None), doc_imp(ReplaceDocuments, true , None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, true, None), doc_imp( ReplaceDocuments, true , None), doc_imp(ReplaceDocuments, true , None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1, 2] }, true))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), doc_imp( ReplaceDocuments, true , None), doc_imp(ReplaceDocuments, true , None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); // we can autobatch one or multiple `UpdateDocuments` together. // if the index exists. - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_imp(UpdateDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1, 2] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1, 2] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_imp(UpdateDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1, 2] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1, 2] }, false))"); // if it doesn't exists. - debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, true, None), doc_imp(UpdateDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1, 2] }, true))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1, 2] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, true, None), doc_imp(UpdateDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1, 2] }, true))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1, 2] }, false))"); // we can autobatch one or multiple DocumentDeletion together debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); @@ -628,56 +673,83 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [settings(true), settings(true), settings(true)]), @"Some((Settings { allow_index_creation: true, settings_ids: [0, 1, 2] }, true))"); debug_snapshot!(autobatch_from(false,None, [settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [settings(false), settings(false), settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0, 1, 2] }, false))"); + + // We can autobatch document addition with document deletion + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + // And the other way around + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); } #[test] fn simple_document_operation_dont_autobatch_with_other() { // addition, updates and deletion can't batch together - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_create()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_create()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_update()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_update()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_swap()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_swap()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); } #[test] fn document_addition_batch_with_settings() { // simple case - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); // multiple settings and doc addition - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None), settings(true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None), settings(true), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None), settings(true), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None), settings(true), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); // addition and setting unordered - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), doc_imp(ReplaceDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 2] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), doc_imp(UpdateDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1, 3], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 2] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), doc_imp(ReplaceDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1, 3], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 2] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), doc_imp(UpdateDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1, 3], method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 2] }, true))"); // We ensure this kind of batch doesn't batch with forbidden operations - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), doc_imp(UpdateDocuments, true, None)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), doc_imp(ReplaceDocuments, true, None)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), doc_del()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), doc_del()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), idx_create()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), idx_create()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), idx_update()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), idx_update()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), idx_swap()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), idx_swap()]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), doc_imp(UpdateDocuments, true, None)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), doc_imp(ReplaceDocuments, true, None)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), doc_del()]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), doc_del()]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), idx_create()]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), idx_create()]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), idx_update()]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), idx_update()]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true), idx_swap()]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), settings(true), idx_swap()]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); } #[test] @@ -789,67 +861,73 @@ mod tests { debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), settings(false), doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [1, 3, 0, 2] }, false))"); // The third and final case is when the first task doesn't create an index but is directly followed by a task creating an index. In this case we can't batch whith what // follows because we first need to process the erronous batch. - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments,false, None), settings(true), idx_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), settings(true), idx_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments,false, None), settings(true), doc_clr(), idx_del()]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), settings(true), doc_clr(), idx_del()]), @"Some((DocumentImport { method: UpdateDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments,false, None), settings(true), idx_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), settings(true), idx_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments,false, None), settings(true), doc_clr(), idx_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), settings(true), doc_clr(), idx_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); } #[test] fn allowed_and_disallowed_index_creation() { // `DocumentImport` can't be mixed with those disallowed to do so except if the index already exists. - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0, 1] }, false))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), settings(true)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, import_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, true, None), settings(true)]), @"Some((SettingsAndDocumentOperation { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), settings(true)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); + + // batch deletion and addition + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); + debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); } #[test] fn autobatch_primary_key() { // ==> If I have a pk // With a single update - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), import_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), operation_ids: [0] }, true))"###); // With a multiple updates - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("other"))]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id"))]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0, 1] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0, 1] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("other"))]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id"))]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0, 1] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("other"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0, 1] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0, 1] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("other"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("other"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), import_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("other"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, Some("id"), [doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("other")), doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), operation_ids: [0] }, true))"###); // ==> If I don't have a pk // With a single update - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0] }, true))"###); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("other"))]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), import_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("id"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("other"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("other"), operation_ids: [0] }, true))"###); // With a multiple updates - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0, 1] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("id"))]), @"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, import_ids: [0] }, true))"); - debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentImport { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), import_ids: [0] }, true))"###); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(ReplaceDocuments, true, Some("id"))]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); + debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("id")), doc_imp(ReplaceDocuments, true, None)]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("id"), operation_ids: [0] }, true))"###); } } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 8a479a12b..655922785 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -28,8 +28,7 @@ use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::update::{ - DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, - Settings as MilliSettings, + DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, Settings as MilliSettings, }; use meilisearch_types::milli::{self, BEU32}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; @@ -86,15 +85,21 @@ pub(crate) enum Batch { }, } +#[derive(Debug)] +pub(crate) enum DocumentOperation { + Add(Uuid), + Delete(Vec), +} + /// A [batch](Batch) that combines multiple tasks operating on an index. #[derive(Debug)] pub(crate) enum IndexOperation { - DocumentImport { + DocumentOperation { index_uid: String, primary_key: Option, method: IndexDocumentsMethod, documents_counts: Vec, - content_files: Vec, + operations: Vec, tasks: Vec, }, DocumentDeletion { @@ -121,13 +126,13 @@ pub(crate) enum IndexOperation { settings: Vec<(bool, Settings)>, settings_tasks: Vec, }, - SettingsAndDocumentImport { + SettingsAndDocumentOperation { index_uid: String, primary_key: Option, method: IndexDocumentsMethod, documents_counts: Vec, - content_files: Vec, + operations: Vec, document_import_tasks: Vec, // The boolean indicates if it's a settings deletion or creation. @@ -149,13 +154,13 @@ impl Batch { tasks.iter().map(|task| task.uid).collect() } Batch::IndexOperation { op, .. } => match op { - IndexOperation::DocumentImport { tasks, .. } + IndexOperation::DocumentOperation { tasks, .. } | IndexOperation::DocumentDeletion { tasks, .. } | IndexOperation::Settings { tasks, .. } | IndexOperation::DocumentClear { tasks, .. } => { tasks.iter().map(|task| task.uid).collect() } - IndexOperation::SettingsAndDocumentImport { + IndexOperation::SettingsAndDocumentOperation { document_import_tasks: tasks, settings_tasks: other, .. @@ -169,17 +174,33 @@ impl Batch { Batch::IndexSwap { task } => vec![task.uid], } } + + /// Return the index UID associated with this batch + pub fn index_uid(&self) -> Option<&str> { + use Batch::*; + match self { + TaskCancelation { .. } + | TaskDeletion(_) + | SnapshotCreation(_) + | Dump(_) + | IndexSwap { .. } => None, + IndexOperation { op, .. } => Some(op.index_uid()), + IndexCreation { index_uid, .. } + | IndexUpdate { index_uid, .. } + | IndexDeletion { index_uid, .. } => Some(index_uid), + } + } } impl IndexOperation { pub fn index_uid(&self) -> &str { match self { - IndexOperation::DocumentImport { index_uid, .. } + IndexOperation::DocumentOperation { index_uid, .. } | IndexOperation::DocumentDeletion { index_uid, .. } | IndexOperation::DocumentClear { index_uid, .. } | IndexOperation::Settings { index_uid, .. } | IndexOperation::DocumentClearAndSetting { index_uid, .. } - | IndexOperation::SettingsAndDocumentImport { index_uid, .. } => index_uid, + | IndexOperation::SettingsAndDocumentOperation { index_uid, .. } => index_uid, } } } @@ -206,17 +227,22 @@ impl IndexScheduler { }, must_create_index, })), - BatchKind::DocumentImport { method, import_ids, .. } => { - let tasks = self.get_existing_tasks(rtxn, import_ids)?; - let primary_key = match &tasks[0].kind { - KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => { - primary_key.clone() - } - _ => unreachable!(), - }; + BatchKind::DocumentOperation { method, operation_ids, .. } => { + let tasks = self.get_existing_tasks(rtxn, operation_ids)?; + let primary_key = tasks + .iter() + .find_map(|task| match task.kind { + KindWithContent::DocumentAdditionOrUpdate { ref primary_key, .. } => { + // we want to stop on the first document addition + Some(primary_key.clone()) + } + KindWithContent::DocumentDeletion { .. } => None, + _ => unreachable!(), + }) + .flatten(); let mut documents_counts = Vec::new(); - let mut content_files = Vec::new(); + let mut operations = Vec::new(); for task in tasks.iter() { match task.kind { @@ -226,19 +252,23 @@ impl IndexScheduler { .. } => { documents_counts.push(documents_count); - content_files.push(content_file); + operations.push(DocumentOperation::Add(content_file)); + } + KindWithContent::DocumentDeletion { ref documents_ids, .. } => { + documents_counts.push(documents_ids.len() as u64); + operations.push(DocumentOperation::Delete(documents_ids.clone())); } _ => unreachable!(), } } Ok(Some(Batch::IndexOperation { - op: IndexOperation::DocumentImport { + op: IndexOperation::DocumentOperation { index_uid, primary_key, method, documents_counts, - content_files, + operations, tasks, }, must_create_index, @@ -322,12 +352,12 @@ impl IndexScheduler { must_create_index, })) } - BatchKind::SettingsAndDocumentImport { + BatchKind::SettingsAndDocumentOperation { settings_ids, method, allow_index_creation, primary_key, - import_ids, + operation_ids, } => { let settings = self.create_next_batch_index( rtxn, @@ -339,11 +369,11 @@ impl IndexScheduler { let document_import = self.create_next_batch_index( rtxn, index_uid.clone(), - BatchKind::DocumentImport { + BatchKind::DocumentOperation { method, allow_index_creation, primary_key, - import_ids, + operation_ids, }, must_create_index, )?; @@ -352,10 +382,10 @@ impl IndexScheduler { ( Some(Batch::IndexOperation { op: - IndexOperation::DocumentImport { + IndexOperation::DocumentOperation { primary_key, documents_counts, - content_files, + operations, tasks: document_import_tasks, .. }, @@ -366,12 +396,12 @@ impl IndexScheduler { .. }), ) => Ok(Some(Batch::IndexOperation { - op: IndexOperation::SettingsAndDocumentImport { + op: IndexOperation::SettingsAndDocumentOperation { index_uid, primary_key, method, documents_counts, - content_files, + operations, document_import_tasks, settings, settings_tasks, @@ -987,12 +1017,12 @@ impl IndexScheduler { Ok(tasks) } - IndexOperation::DocumentImport { + IndexOperation::DocumentOperation { index_uid: _, primary_key, method, - documents_counts, - content_files, + documents_counts: _, + operations, mut tasks, } => { let mut primary_key_has_been_set = false; @@ -1037,26 +1067,82 @@ impl IndexScheduler { || must_stop_processing.get(), )?; - let mut results = Vec::new(); - for content_uuid in content_files.into_iter() { - let content_file = self.file_store.get_update(content_uuid)?; - let reader = DocumentsBatchReader::from_reader(content_file) - .map_err(milli::Error::from)?; - let (new_builder, user_result) = builder.add_documents(reader)?; - builder = new_builder; + for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { + match operation { + DocumentOperation::Add(content_uuid) => { + let content_file = self.file_store.get_update(content_uuid)?; + let reader = DocumentsBatchReader::from_reader(content_file) + .map_err(milli::Error::from)?; + let (new_builder, user_result) = builder.add_documents(reader)?; + builder = new_builder; - let user_result = match user_result { - Ok(count) => Ok(DocumentAdditionResult { - indexed_documents: count, - number_of_documents: count, // TODO: this is wrong, we should use the value stored in the Details. - }), - Err(e) => Err(milli::Error::from(e)), - }; + let received_documents = + if let Some(Details::DocumentAdditionOrUpdate { + received_documents, + .. + }) = task.details + { + received_documents + } else { + // In the case of a `documentAdditionOrUpdate` the details MUST be set + unreachable!(); + }; - results.push(user_result); + match user_result { + Ok(count) => { + task.status = Status::Succeeded; + task.details = Some(Details::DocumentAdditionOrUpdate { + received_documents, + indexed_documents: Some(count), + }) + } + Err(e) => { + task.status = Status::Failed; + task.details = Some(Details::DocumentAdditionOrUpdate { + received_documents, + indexed_documents: Some(0), + }); + task.error = Some(milli::Error::from(e).into()); + } + } + } + DocumentOperation::Delete(document_ids) => { + let (new_builder, user_result) = + builder.remove_documents(document_ids)?; + builder = new_builder; + + let provided_ids = + if let Some(Details::DocumentDeletion { provided_ids, .. }) = + task.details + { + provided_ids + } else { + // In the case of a `documentAdditionOrUpdate` the details MUST be set + unreachable!(); + }; + + match user_result { + Ok(count) => { + task.status = Status::Succeeded; + task.details = Some(Details::DocumentDeletion { + provided_ids, + deleted_documents: Some(count), + }); + } + Err(e) => { + task.status = Status::Failed; + task.details = Some(Details::DocumentDeletion { + provided_ids, + deleted_documents: Some(0), + }); + task.error = Some(milli::Error::from(e).into()); + } + } + } + } } - if results.iter().any(|res| res.is_ok()) { + if !tasks.iter().all(|res| res.error.is_some()) { let addition = builder.execute()?; info!("document addition done: {:?}", addition); } else if primary_key_has_been_set { @@ -1071,29 +1157,6 @@ impl IndexScheduler { )?; } - for (task, (ret, count)) in - tasks.iter_mut().zip(results.into_iter().zip(documents_counts)) - { - match ret { - Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) => { - task.status = Status::Succeeded; - task.details = Some(Details::DocumentAdditionOrUpdate { - received_documents: number_of_documents, - indexed_documents: Some(indexed_documents), - }); - } - Err(error) => { - task.status = Status::Failed; - task.details = Some(Details::DocumentAdditionOrUpdate { - received_documents: count, - // if there was an error we indexed 0 documents. - indexed_documents: Some(0), - }); - task.error = Some(error.into()) - } - } - } - Ok(tasks) } IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => { @@ -1136,12 +1199,12 @@ impl IndexScheduler { Ok(tasks) } - IndexOperation::SettingsAndDocumentImport { + IndexOperation::SettingsAndDocumentOperation { index_uid, primary_key, method, documents_counts, - content_files, + operations, document_import_tasks, settings, settings_tasks, @@ -1159,12 +1222,12 @@ impl IndexScheduler { let mut import_tasks = self.apply_index_operation( index_wtxn, index, - IndexOperation::DocumentImport { + IndexOperation::DocumentOperation { index_uid, primary_key, method, documents_counts, - content_files, + operations, tasks: document_import_tasks, }, )?; diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 02b53749f..d1fe7c57d 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -9,10 +9,11 @@ use meilisearch_types::heed::types::Str; use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::Index; +use synchronoise::SignalEvent; use time::OffsetDateTime; use uuid::Uuid; -use self::IndexStatus::{Available, BeingDeleted}; +use self::IndexStatus::{Available, BeingDeleted, BeingResized}; use crate::uuid_codec::UuidCodec; use crate::{clamp_to_page_size, Error, Result}; @@ -45,6 +46,8 @@ pub struct IndexMapper { pub enum IndexStatus { /// Do not insert it back in the index map as it is currently being deleted. BeingDeleted, + /// Temporarily do not insert the index in the index map as it is currently being resized. + BeingResized(Arc), /// You can use the index without worrying about anything. Available(Index), } @@ -71,9 +74,10 @@ impl IndexMapper { &self, path: &Path, date: Option<(OffsetDateTime, OffsetDateTime)>, + map_size: usize, ) -> Result { let mut options = EnvOpenOptions::new(); - options.map_size(clamp_to_page_size(self.index_size)); + options.map_size(clamp_to_page_size(map_size)); options.max_readers(1024); if let Some((created, updated)) = date { @@ -102,14 +106,15 @@ impl IndexMapper { let index_path = self.base_path.join(uuid.to_string()); fs::create_dir_all(&index_path)?; - let index = self.create_or_open_index(&index_path, date)?; + let index = self.create_or_open_index(&index_path, date, self.index_size)?; wtxn.commit()?; + // Error if the UUIDv4 somehow already exists in the map, since it should be fresh. + // This is very unlikely to happen in practice. // TODO: it would be better to lazily create the index. But we need an Index::open function for milli. - if let Some(BeingDeleted) = - self.index_map.write().unwrap().insert(uuid, Available(index.clone())) + if self.index_map.write().unwrap().insert(uuid, Available(index.clone())).is_some() { - panic!("Uuid v4 conflict."); + panic!("Uuid v4 conflict: index with UUID {uuid} already exists."); } Ok(index) @@ -131,13 +136,23 @@ impl IndexMapper { wtxn.commit()?; // We remove the index from the in-memory index map. - let mut lock = self.index_map.write().unwrap(); - let closing_event = match lock.insert(uuid, BeingDeleted) { - Some(Available(index)) => Some(index.prepare_for_closing()), - _ => None, - }; + let closing_event = loop { + let mut lock = self.index_map.write().unwrap(); + let resize_operation = match lock.insert(uuid, BeingDeleted) { + Some(Available(index)) => break Some(index.prepare_for_closing()), + // The target index is in the middle of a resize operation. + // Wait for this operation to complete, then try again. + Some(BeingResized(resize_operation)) => resize_operation.clone(), + // The index is already being deleted or doesn't exist. + // It's OK to remove it from the map again. + _ => break None, + }; - drop(lock); + // Avoiding deadlocks: we need to drop the lock before waiting for the end of the resize, which + // will involve operations on the very map we're locking. + drop(lock); + resize_operation.wait(); + }; let index_map = self.index_map.clone(); let index_path = self.base_path.join(uuid.to_string()); @@ -171,6 +186,87 @@ impl IndexMapper { Ok(self.index_mapping.get(rtxn, name)?.is_some()) } + /// Resizes the maximum size of the specified index to the double of its current maximum size. + /// + /// This operation involves closing the underlying environment and so can take a long time to complete. + /// + /// # Panics + /// + /// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the + /// in memory hash map. + pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> { + // fixme: factor to a function? + let uuid = self + .index_mapping + .get(rtxn, name)? + .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; + + // We remove the index from the in-memory index map. + let mut lock = self.index_map.write().unwrap(); + // signal that will be sent when the resize operation completes + let resize_operation = Arc::new(SignalEvent::manual(false)); + let index = match lock.insert(uuid, BeingResized(resize_operation)) { + Some(Available(index)) => index, + Some(previous_status) => { + lock.insert(uuid, previous_status); + panic!( + "Attempting to resize index {name} that is already being resized or deleted." + ) + } + None => { + panic!("Could not find the status of index {name} in the in-memory index mapper.") + } + }; + + drop(lock); + + let resize_succeeded = (move || { + let current_size = index.map_size()?; + let new_size = current_size * 2; + let closing_event = index.prepare_for_closing(); + + log::debug!("Waiting for index {name} to close"); + + if !closing_event.wait_timeout(std::time::Duration::from_secs(600)) { + // fail after 10 minutes waiting + panic!("Could not resize index {name} (unable to close it)"); + } + + log::info!("Resized index {name} from {current_size} to {new_size} bytes"); + let index_path = self.base_path.join(uuid.to_string()); + let index = self.create_or_open_index(&index_path, None, new_size)?; + Ok(index) + })(); + + // Put the map back to a consistent state. + // Even if there was an error we don't want to leave the map in an inconsistent state as it would cause + // deadlocks. + let mut lock = self.index_map.write().unwrap(); + let (resize_operation, resize_succeeded) = match resize_succeeded { + Ok(index) => { + // insert the resized index + let Some(BeingResized(resize_operation)) = lock.insert(uuid, Available(index)) else { + panic!("Index state for index {name} was modified while it was being resized") + }; + + (resize_operation, Ok(())) + } + Err(error) => { + // there was an error, not much we can do... delete the index from the in-memory map to prevent future errors + let Some(BeingResized(resize_operation)) = lock.remove(&uuid) else { + panic!("Index state for index {name} was modified while it was being resized") + }; + (resize_operation, Err(error)) + } + }; + + // drop the lock before signaling completion so that other threads don't immediately await on the lock after waking up. + drop(lock); + resize_operation.signal(); + + resize_succeeded + } + /// Return an index, may open it if it wasn't already opened. pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result { let uuid = self @@ -179,31 +275,47 @@ impl IndexMapper { .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; // we clone here to drop the lock before entering the match - let index = self.index_map.read().unwrap().get(&uuid).cloned(); - let index = match index { - Some(Available(index)) => index, - Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), - // since we're lazy, it's possible that the index has not been opened yet. - None => { - let mut index_map = self.index_map.write().unwrap(); - // between the read lock and the write lock it's not impossible - // that someone already opened the index (eg if two search happens - // at the same time), thus before opening it we check a second time - // if it's not already there. - // Since there is a good chance it's not already there we can use - // the entry method. - match index_map.entry(uuid) { - Entry::Vacant(entry) => { - let index_path = self.base_path.join(uuid.to_string()); + let index = loop { + let index = self.index_map.read().unwrap().get(&uuid).cloned(); - let index = self.create_or_open_index(&index_path, None)?; - entry.insert(Available(index.clone())); - index + match index { + Some(Available(index)) => break index, + Some(BeingResized(ref resize_operation)) => { + // Avoiding deadlocks: no lock taken while doing this operation. + resize_operation.wait(); + continue; + } + Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), + // since we're lazy, it's possible that the index has not been opened yet. + None => { + let mut index_map = self.index_map.write().unwrap(); + // between the read lock and the write lock it's not impossible + // that someone already opened the index (eg if two search happens + // at the same time), thus before opening it we check a second time + // if it's not already there. + // Since there is a good chance it's not already there we can use + // the entry method. + match index_map.entry(uuid) { + Entry::Vacant(entry) => { + let index_path = self.base_path.join(uuid.to_string()); + + let index = + self.create_or_open_index(&index_path, None, self.index_size)?; + entry.insert(Available(index.clone())); + break index; + } + Entry::Occupied(entry) => match entry.get() { + Available(index) => break index.clone(), + BeingResized(resize_operation) => { + // Avoiding the deadlock: we drop the lock before waiting + let resize_operation = resize_operation.clone(); + drop(index_map); + resize_operation.wait(); + continue; + } + BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), + }, } - Entry::Occupied(entry) => match entry.get() { - Available(index) => index.clone(), - BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), - }, } } }; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8dd16f961..d80131681 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -423,12 +423,12 @@ impl IndexScheduler { #[cfg(test)] run.breakpoint(Breakpoint::Init); - loop { - run.wake_up.wait(); + run.wake_up.wait(); + loop { match run.tick() { - Ok(0) => (), - Ok(_) => run.wake_up.signal(), + Ok(TickOutcome::TickAgain(_)) => (), + Ok(TickOutcome::WaitForSignal) => run.wake_up.wait(), Err(e) => { log::error!("{}", e); // Wait one second when an irrecoverable error occurs. @@ -441,7 +441,6 @@ impl IndexScheduler { ) { std::thread::sleep(Duration::from_secs(1)); } - run.wake_up.signal(); } } } @@ -765,8 +764,8 @@ impl IndexScheduler { Ok(task) } - /// Register a new task comming from a dump in the scheduler. - /// By takinig a mutable ref we're pretty sure no one will ever import a dump while actix is running. + /// Register a new task coming from a dump in the scheduler. + /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running. pub fn register_dumped_task( &mut self, task: TaskDump, @@ -927,7 +926,7 @@ impl IndexScheduler { /// 5. Reset the in-memory list of processed tasks. /// /// Returns the number of processed tasks. - fn tick(&self) -> Result { + fn tick(&self) -> Result { #[cfg(test)] { *self.run_loop_iteration.write().unwrap() += 1; @@ -938,8 +937,9 @@ impl IndexScheduler { let batch = match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? { Some(batch) => batch, - None => return Ok(0), + None => return Ok(TickOutcome::WaitForSignal), }; + let index_uid = batch.index_uid().map(ToOwned::to_owned); drop(rtxn); // 1. store the starting date with the bitmap of processing tasks. @@ -1010,7 +1010,23 @@ impl IndexScheduler { // the `started_at` date times and `processings` of the current processing tasks. // This date time is used by the task cancelation to store the right `started_at` // date in the task on disk. - return Ok(0); + return Ok(TickOutcome::TickAgain(0)); + } + // If an index said it was full, we need to: + // 1. identify which index is full + // 2. close the associated environment + // 3. resize it + // 4. re-schedule tasks + Err(Error::Milli(milli::Error::UserError( + milli::UserError::MaxDatabaseSizeReached, + ))) if index_uid.is_some() => { + // fixme: add index_uid to match to avoid the unwrap + let index_uid = index_uid.unwrap(); + // fixme: handle error more gracefully? not sure when this could happen + self.index_mapper.resize_index(&wtxn, &index_uid)?; + wtxn.abort().map_err(Error::HeedTransaction)?; + + return Ok(TickOutcome::TickAgain(0)); } // In case of a failure we must get back and patch all the tasks with the error. Err(err) => { @@ -1050,7 +1066,7 @@ impl IndexScheduler { #[cfg(test)] self.breakpoint(Breakpoint::AfterProcessing); - Ok(processed_tasks) + Ok(TickOutcome::TickAgain(processed_tasks)) } pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> { @@ -1085,6 +1101,16 @@ impl IndexScheduler { } } +/// The outcome of calling the [`IndexScheduler::tick`] function. +pub enum TickOutcome { + /// The scheduler should immediately attempt another `tick`. + /// + /// The `usize` field contains the number of processed tasks. + TickAgain(usize), + /// The scheduler should wait for an external signal before attempting another `tick`. + WaitForSignal, +} + #[cfg(test)] mod tests { use std::io::{BufWriter, Seek, Write}; @@ -1680,6 +1706,105 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "both_task_succeeded"); } + #[test] + fn document_addition_and_document_deletion() { + let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); + + let content = r#"[ + { "id": 1, "doggo": "jean bob" }, + { "id": 2, "catto": "jorts" }, + { "id": 3, "doggo": "bork" } + ]"#; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); + index_scheduler + .register(KindWithContent::DocumentDeletion { + index_uid: S("doggos"), + documents_ids: vec![S("1"), S("2")], + }) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); + + handle.advance_one_successful_batch(); // The addition AND deletion should've been batched together + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch"); + + let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); + } + + #[test] + fn document_deletion_and_document_addition() { + let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); + index_scheduler + .register(KindWithContent::DocumentDeletion { + index_uid: S("doggos"), + documents_ids: vec![S("1"), S("2")], + }) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); + + let content = r#"[ + { "id": 1, "doggo": "jean bob" }, + { "id": 2, "catto": "jorts" }, + { "id": 3, "doggo": "bork" } + ]"#; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); + + // The deletion should have failed because it can't create an index + handle.advance_one_failed_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_failing_the_deletion"); + + // The addition should works + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_last_successful_addition"); + + let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); + } + #[test] fn do_not_batch_task_of_different_indexes() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/after_processing_the_batch.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/after_processing_the_batch.snap new file mode 100644 index 000000000..f70496b81 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/after_processing_the_batch.snap @@ -0,0 +1,42 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +1 {uid: 1, status: succeeded, details: { received_document_ids: 2, deleted_documents: Some(2) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +"documentDeletion" [1,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +["doggos"] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/documents.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/documents.snap new file mode 100644 index 000000000..2b56b71d1 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/documents.snap @@ -0,0 +1,9 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "id": 3, + "doggo": "bork" + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_first_task.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_first_task.snap new file mode 100644 index 000000000..35dc0b41a --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_first_task.snap @@ -0,0 +1,37 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_second_task.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_second_task.snap new file mode 100644 index 000000000..bd65a6d99 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_second_task.snap @@ -0,0 +1,40 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +1 {uid: 1, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +"documentDeletion" [1,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/after_failing_the_deletion.snap b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/after_failing_the_deletion.snap new file mode 100644 index 000000000..2850af744 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/after_failing_the_deletion.snap @@ -0,0 +1,43 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "Index `doggos` not found.", error_code: "index_not_found", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_not_found" }, details: { received_document_ids: 2, deleted_documents: Some(0) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [1,] +failed [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"documentDeletion" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/after_last_successful_addition.snap b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/after_last_successful_addition.snap new file mode 100644 index 000000000..59e18bdb0 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/after_last_successful_addition.snap @@ -0,0 +1,45 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "Index `doggos` not found.", error_code: "index_not_found", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_not_found" }, details: { received_document_ids: 2, deleted_documents: Some(0) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +1 {uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [1,] +failed [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"documentDeletion" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +["doggos"] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/documents.snap b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/documents.snap new file mode 100644 index 000000000..8204d059b --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/documents.snap @@ -0,0 +1,17 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "id": 1, + "doggo": "jean bob" + }, + { + "id": 2, + "catto": "jorts" + }, + { + "id": 3, + "doggo": "bork" + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_first_task.snap b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_first_task.snap new file mode 100644 index 000000000..9356e6dba --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_first_task.snap @@ -0,0 +1,36 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"documentDeletion" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_second_task.snap b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_second_task.snap new file mode 100644 index 000000000..89e341184 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_second_task.snap @@ -0,0 +1,40 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"documentDeletion" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index c9b71b523..acb520513 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -439,20 +439,29 @@ impl IndexScheduler { provided_ids: received_document_ids, deleted_documents, } => { - if let Some(deleted_documents) = deleted_documents { - assert_eq!(status, Status::Succeeded); - assert!(deleted_documents <= received_document_ids as u64); - assert_eq!(kind.as_kind(), Kind::DocumentDeletion); + assert_eq!(kind.as_kind(), Kind::DocumentDeletion); + let (index_uid, documents_ids) = + if let KindWithContent::DocumentDeletion { + ref index_uid, + ref documents_ids, + } = kind + { + (index_uid, documents_ids) + } else { + unreachable!() + }; + assert_eq!(&task_index_uid.unwrap(), index_uid); - match &kind { - KindWithContent::DocumentDeletion { index_uid, documents_ids } => { - assert_eq!(&task_index_uid.unwrap(), index_uid); - assert!(documents_ids.len() >= received_document_ids); - } - _ => panic!(), + match status { + Status::Enqueued | Status::Processing => (), + Status::Succeeded => { + assert!(deleted_documents.unwrap() <= received_document_ids as u64); + assert!(documents_ids.len() == received_document_ids); + } + Status::Failed | Status::Canceled => { + assert!(deleted_documents == Some(0)); + assert!(documents_ids.len() == received_document_ids); } - } else { - assert_ne!(status, Status::Succeeded); } } Details::ClearAll { deleted_documents } => { diff --git a/milli/src/snapshot_tests.rs b/milli/src/snapshot_tests.rs index 49f9fbe92..f7f1a97e6 100644 --- a/milli/src/snapshot_tests.rs +++ b/milli/src/snapshot_tests.rs @@ -6,7 +6,7 @@ use roaring::RoaringBitmap; use crate::facet::FacetType; use crate::heed_codec::facet::{FacetGroupKey, FacetGroupValue}; -use crate::{make_db_snap_from_iter, ExternalDocumentsIds, Index}; +use crate::{make_db_snap_from_iter, obkv_to_json, ExternalDocumentsIds, Index}; #[track_caller] pub fn default_db_snapshot_settings_for_test(name: Option<&str>) -> (insta::Settings, String) { @@ -427,8 +427,26 @@ pub fn snap_settings(index: &Index) -> String { snap } +pub fn snap_documents(index: &Index) -> String { + let mut snap = String::new(); + let rtxn = index.read_txn().unwrap(); + let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let display = fields_ids_map.ids().collect::>(); + + for document in index.all_documents(&rtxn).unwrap() { + let doc = obkv_to_json(&display, &fields_ids_map, document.unwrap().1).unwrap(); + snap.push_str(&serde_json::to_string(&doc).unwrap()); + snap.push('\n'); + } + + snap +} + #[macro_export] macro_rules! full_snap_of_db { + ($index:ident, documents) => {{ + $crate::snapshot_tests::snap_documents(&$index) + }}; ($index:ident, settings) => {{ $crate::snapshot_tests::snap_settings(&$index) }}; diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index 37af7ab6a..7b8891a7a 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -6,6 +6,7 @@ use roaring::RoaringBitmap; use super::read_u32_ne_bytes; use crate::heed_codec::CboRoaringBitmapCodec; +use crate::update::index_documents::transform::Operation; use crate::Result; pub type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> Result>; @@ -57,21 +58,6 @@ pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result> { - Ok(obkvs - .iter() - .cloned() - .reduce(|acc, current| { - let first = obkv::KvReader::new(&acc); - let second = obkv::KvReader::new(¤t); - let mut buffer = Vec::new(); - merge_two_obkvs(first, second, &mut buffer); - Cow::from(buffer) - }) - .unwrap()) -} - pub fn merge_two_obkvs(base: obkv::KvReaderU16, update: obkv::KvReaderU16, buffer: &mut Vec) { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; @@ -88,6 +74,41 @@ pub fn merge_two_obkvs(base: obkv::KvReaderU16, update: obkv::KvReaderU16, buffe writer.finish().unwrap(); } +/// Merge all the obks in the order we see them. +pub fn merge_obkvs_and_operations<'a>( + _key: &[u8], + obkvs: &[Cow<'a, [u8]>], +) -> Result> { + // [add, add, delete, add, add] + // we can ignore everything that happened before the last delete. + let starting_position = + obkvs.iter().rposition(|obkv| obkv[0] == Operation::Deletion as u8).unwrap_or(0); + + // [add, add, delete] + // if the last operation was a deletion then we simply return the deletion + if starting_position == obkvs.len() - 1 && obkvs.last().unwrap()[0] == Operation::Deletion as u8 + { + return Ok(obkvs[obkvs.len() - 1].clone()); + } + let mut buffer = Vec::new(); + + // (add, add, delete) [add, add] + // in the other case, no deletion will be encountered during the merge + let mut ret = + obkvs[starting_position..].iter().cloned().fold(Vec::new(), |mut acc, current| { + let first = obkv::KvReader::new(&acc); + let second = obkv::KvReader::new(¤t[1..]); + merge_two_obkvs(first, second, &mut buffer); + + // we want the result of the merge into our accumulator + std::mem::swap(&mut acc, &mut buffer); + acc + }); + + ret.insert(0, Operation::Addition as u8); + Ok(Cow::from(ret)) +} + pub fn merge_cbo_roaring_bitmaps<'a>( _key: &[u8], values: &[Cow<'a, [u8]>], diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index a496ccd6e..ce6a2abe9 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -13,9 +13,9 @@ pub use grenad_helpers::{ GrenadParameters, MergeableReader, }; pub use merge_functions::{ - concat_u32s_array, keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, merge_obkvs, - merge_roaring_bitmaps, merge_two_obkvs, roaring_bitmap_from_u32s_array, - serialize_roaring_bitmap, MergeFn, + concat_u32s_array, keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, + merge_obkvs_and_operations, merge_roaring_bitmaps, merge_two_obkvs, + roaring_bitmap_from_u32s_array, serialize_roaring_bitmap, MergeFn, }; use crate::MAX_WORD_LENGTH; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 3e9edf3a2..5e547a049 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -79,6 +79,7 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a, FP, FA> { progress: FP, should_abort: FA, added_documents: u64, + deleted_documents: u64, } #[derive(Default, Debug, Clone)] @@ -122,6 +123,7 @@ where wtxn, index, added_documents: 0, + deleted_documents: 0, }) } @@ -166,6 +168,30 @@ where Ok((self, Ok(indexed_documents))) } + /// Remove a batch of documents from the current builder. + /// + /// Returns the number of documents deleted from the builder. + pub fn remove_documents( + mut self, + to_delete: Vec, + ) -> Result<(Self, StdResult)> { + // Early return when there is no document to add + if to_delete.is_empty() { + return Ok((self, Ok(0))); + } + + let deleted_documents = self + .transform + .as_mut() + .expect("Invalid document deletion state") + .remove_documents(to_delete, self.wtxn, &self.should_abort)? + as u64; + + self.deleted_documents += deleted_documents; + + Ok((self, Ok(deleted_documents))) + } + #[logging_timer::time("IndexDocuments::{}")] pub fn execute(mut self) -> Result { if self.added_documents == 0 { @@ -1879,4 +1905,328 @@ mod tests { index.add_documents(doc1).unwrap(); } + + #[test] + fn add_and_delete_documents_in_single_transform() { + let mut index = TempIndex::new(); + index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 1, "doggo": "kevin" }, + { "id": 2, "doggo": { "name": "bob", "age": 20 } }, + { "id": 3, "name": "jean", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"3"); + + let (builder, removed) = builder.remove_documents(vec![S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"1"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 3, + number_of_documents: 2, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":1,"doggo":"kevin"} + {"id":3,"name":"jean","age":25} + "###); + } + + #[test] + fn add_update_and_delete_documents_in_single_transform() { + let mut index = TempIndex::new(); + index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 1, "doggo": "kevin" }, + { "id": 2, "doggo": { "name": "bob", "age": 20 } }, + { "id": 3, "name": "jean", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"3"); + + let documents = documents!([ + { "id": 2, "catto": "jorts" }, + { "id": 3, "legs": 4 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"2"); + + let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"2"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 5, + number_of_documents: 1, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":3,"name":"jean","age":25,"legs":4} + "###); + } + + #[test] + fn add_document_and_in_another_transform_update_and_delete_documents() { + let mut index = TempIndex::new(); + index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 1, "doggo": "kevin" }, + { "id": 2, "doggo": { "name": "bob", "age": 20 } }, + { "id": 3, "name": "jean", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"3"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 3, + number_of_documents: 3, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":1,"doggo":"kevin"} + {"id":2,"doggo":{"name":"bob","age":20}} + {"id":3,"name":"jean","age":25} + "###); + + // A first batch of documents has been inserted + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 2, "catto": "jorts" }, + { "id": 3, "legs": 4 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"2"); + + let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"2"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 2, + number_of_documents: 1, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":3,"name":"jean","age":25,"legs":4} + "###); + } + + #[test] + fn delete_document_and_then_add_documents_in_the_same_transform() { + let mut index = TempIndex::new(); + index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"0"); + + let documents = documents!([ + { "id": 2, "doggo": { "name": "jean", "age": 20 } }, + { "id": 3, "name": "bob", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"2"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 2, + number_of_documents: 2, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":2,"doggo":{"name":"jean","age":20}} + {"id":3,"name":"bob","age":25} + "###); + } + + #[test] + fn delete_the_same_document_multiple_time() { + let mut index = TempIndex::new(); + index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let (builder, removed) = + builder.remove_documents(vec![S("1"), S("2"), S("1"), S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"0"); + + let documents = documents!([ + { "id": 1, "doggo": "kevin" }, + { "id": 2, "doggo": { "name": "jean", "age": 20 } }, + { "id": 3, "name": "bob", "age": 25 }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"3"); + + let (builder, removed) = + builder.remove_documents(vec![S("1"), S("2"), S("1"), S("2")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"2"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 3, + number_of_documents: 1, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":3,"name":"bob","age":25} + "###); + } + + #[test] + fn add_document_and_in_another_transform_delete_the_document_then_add_it_again() { + let mut index = TempIndex::new(); + index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let documents = documents!([ + { "id": 1, "doggo": "kevin" }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"1"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 1, + number_of_documents: 1, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":1,"doggo":"kevin"} + "###); + + // A first batch of documents has been inserted + + let mut wtxn = index.write_txn().unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || false, + ) + .unwrap(); + + let (builder, removed) = builder.remove_documents(vec![S("1")]).unwrap(); + insta::assert_display_snapshot!(removed.unwrap(), @"1"); + + let documents = documents!([ + { "id": 1, "catto": "jorts" }, + ]); + let (builder, added) = builder.add_documents(documents).unwrap(); + insta::assert_display_snapshot!(added.unwrap(), @"1"); + + let addition = builder.execute().unwrap(); + insta::assert_debug_snapshot!(addition, @r###" + DocumentAdditionResult { + indexed_documents: 1, + number_of_documents: 1, + } + "###); + wtxn.commit().unwrap(); + + db_snap!(index, documents, @r###" + {"id":1,"catto":"jorts"} + "###); + } } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 9e07e78ad..6097278a7 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -12,7 +12,9 @@ use roaring::RoaringBitmap; use serde_json::Value; use smartstring::SmartString; -use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn}; +use super::helpers::{ + create_sorter, create_writer, keep_latest_obkv, merge_obkvs_and_operations, MergeFn, +}; use super::{IndexDocumentsMethod, IndexerConfig}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; @@ -50,8 +52,12 @@ pub struct Transform<'a, 'i> { pub index_documents_method: IndexDocumentsMethod, available_documents_ids: AvailableDocumentsIds, + // Both grenad follows the same format: + // key | value + // u32 | 1 byte for the Operation byte, the rest is the obkv of the document stored original_sorter: grenad::Sorter, flattened_sorter: grenad::Sorter, + replaced_documents_ids: RoaringBitmap, new_documents_ids: RoaringBitmap, // To increase the cache locality and decrease the heap usage we use compact smartstring. @@ -59,6 +65,14 @@ pub struct Transform<'a, 'i> { documents_count: usize, } +/// This enum is specific to the grenad sorter stored in the transform. +/// It's used as the first byte of the grenads and tells you if the document id was an addition or a deletion. +#[repr(u8)] +pub enum Operation { + Addition, + Deletion, +} + /// Create a mapping between the field ids found in the document batch and the one that were /// already present in the index. /// @@ -94,7 +108,7 @@ impl<'a, 'i> Transform<'a, 'i> { // with the same user id must be merged or fully replaced in the same batch. let merge_function = match index_documents_method { IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv, - IndexDocumentsMethod::UpdateDocuments => merge_obkvs, + IndexDocumentsMethod::UpdateDocuments => merge_obkvs_and_operations, }; // We initialize the sorter with the user indexing settings. @@ -151,9 +165,7 @@ impl<'a, 'i> Transform<'a, 'i> { FA: Fn() -> bool + Sync, { let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); - let external_documents_ids = self.index.external_documents_ids(wtxn)?; - let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; let primary_key = cursor.primary_key().to_string(); @@ -161,6 +173,7 @@ impl<'a, 'i> Transform<'a, 'i> { self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; let mut obkv_buffer = Vec::new(); + let mut document_sorter_buffer = Vec::new(); let mut documents_count = 0; let mut docid_buffer: Vec = Vec::new(); let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new(); @@ -212,10 +225,13 @@ impl<'a, 'i> Transform<'a, 'i> { Entry::Occupied(entry) => *entry.get() as u32, Entry::Vacant(entry) => { // If the document was already in the db we mark it as a replaced document. - // It'll be deleted later. We keep its original docid to insert it in the grenad. + // It'll be deleted later. if let Some(docid) = external_documents_ids.get(entry.key()) { - self.replaced_documents_ids.insert(docid); - original_docid = Some(docid); + // If it was already in the list of replaced documents it means it was deleted + // by the remove_document method. We should starts as if it never existed. + if self.replaced_documents_ids.insert(docid) { + original_docid = Some(docid); + } } let docid = self .available_documents_ids @@ -248,26 +264,46 @@ impl<'a, 'i> Transform<'a, 'i> { skip_insertion = true; } else { // we associate the base document with the new key, everything will get merged later. - self.original_sorter.insert(docid.to_be_bytes(), base_obkv)?; + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_buffer.extend_from_slice(base_obkv); + self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { - Some(buffer) => { - self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)? + Some(flattened_obkv) => { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_buffer.extend_from_slice(&flattened_obkv); + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)? } - None => self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?, + None => self + .flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)?, } } } if !skip_insertion { self.new_documents_ids.insert(docid); + + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_buffer.extend_from_slice(&obkv_buffer); // We use the extracted/generated user id as the key for this document. - self.original_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?; + self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? { - Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?, - None => { - self.flattened_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())? + Some(flattened_obkv) => { + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_buffer.extend_from_slice(&flattened_obkv); + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)? } + None => self + .flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_buffer)?, } } documents_count += 1; @@ -293,6 +329,73 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(documents_count) } + /// The counter part of `read_documents` that removes documents either from the transform or the database. + /// It can be called before, after or in between two calls of the `read_documents`. + /// + /// It needs to update all the internal datastructure in the transform. + /// - If the document is coming from the database -> it's marked as a to_delete document + /// - If the document to remove was inserted by the `read_documents` method before AND was present in the db, + /// it's marked as `to_delete` + added into the grenad to ensure we don't reinsert it. + /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, + /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. + /// - If the document to remove was not present in either the db or the transform we do nothing. + pub fn remove_documents( + &mut self, + mut to_remove: Vec, + wtxn: &mut heed::RwTxn, + should_abort: FA, + ) -> Result + where + FA: Fn() -> bool + Sync, + { + // there may be duplicates in the documents to remove. + to_remove.sort_unstable(); + to_remove.dedup(); + + let external_documents_ids = self.index.external_documents_ids(wtxn)?; + + let mut documents_deleted = 0; + for to_remove in to_remove { + if should_abort() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + + match self.new_external_documents_ids_builder.entry((*to_remove).into()) { + // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. + Entry::Occupied(entry) => { + let doc_id = *entry.get() as u32; + self.original_sorter + .insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?; + self.flattened_sorter + .insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?; + + // we must NOT update the list of replaced_documents_ids + // Either: + // 1. It's already in it and there is nothing to do + // 2. It wasn't in it because the document was created by a previous batch and since + // we're removing it there is nothing to do. + self.new_documents_ids.remove(doc_id); + entry.remove_entry(); + } + Entry::Vacant(entry) => { + // If the document was already in the db we mark it as a `to_delete` document. + // It'll be deleted later. We don't need to push anything to the sorters. + if let Some(docid) = external_documents_ids.get(entry.key()) { + self.replaced_documents_ids.insert(docid); + } else { + // if the document is nowehere to be found, there is nothing to do and we must NOT + // increment the count of documents_deleted + continue; + } + } + }; + + documents_deleted += 1; + } + + Ok(documents_deleted) + } + // Flatten a document from the fields ids map contained in self and insert the new // created fields. Returns `None` if the document doesn't need to be flattened. fn flatten_from_fields_ids_map(&mut self, obkv: KvReader) -> Result>> { @@ -487,6 +590,11 @@ impl<'a, 'i> Transform<'a, 'i> { let mut documents_count = 0; while let Some((key, val)) = iter.next()? { + if val[0] == Operation::Deletion as u8 { + continue; + } + let val = &val[1..]; + // send a callback to show at which step we are documents_count += 1; progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { @@ -518,9 +626,18 @@ impl<'a, 'i> Transform<'a, 'i> { self.indexer_settings.chunk_compression_level, tempfile::tempfile()?, ); - // Once we have written all the documents into the final sorter, we write the documents - // into this writer, extract the file and reset the seek to be able to read it again. - self.flattened_sorter.write_into_stream_writer(&mut writer)?; + + // Once we have written all the documents into the final sorter, we write the nested documents + // into this writer. + // We get rids of the `Operation` byte and skip the deleted documents as well. + let mut iter = self.flattened_sorter.into_stream_merger_iter()?; + while let Some((key, val)) = iter.next()? { + if val[0] == Operation::Deletion as u8 { + continue; + } + let val = &val[1..]; + writer.insert(key, val)?; + } let mut flattened_documents = writer.into_inner()?; flattened_documents.rewind()?; @@ -701,3 +818,45 @@ impl TransformOutput { .collect()) } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn merge_obkvs() { + let mut doc_0 = Vec::new(); + let mut kv_writer = KvWriter::new(&mut doc_0); + kv_writer.insert(0_u8, [0]).unwrap(); + kv_writer.finish().unwrap(); + doc_0.insert(0, Operation::Addition as u8); + + let ret = merge_obkvs_and_operations(&[], &[Cow::from(doc_0.as_slice())]).unwrap(); + assert_eq!(*ret, doc_0); + + let ret = merge_obkvs_and_operations( + &[], + &[Cow::from([Operation::Deletion as u8].as_slice()), Cow::from(doc_0.as_slice())], + ) + .unwrap(); + assert_eq!(*ret, doc_0); + + let ret = merge_obkvs_and_operations( + &[], + &[Cow::from(doc_0.as_slice()), Cow::from([Operation::Deletion as u8].as_slice())], + ) + .unwrap(); + assert_eq!(*ret, [Operation::Deletion as u8]); + + let ret = merge_obkvs_and_operations( + &[], + &[ + Cow::from([Operation::Addition as u8, 1].as_slice()), + Cow::from([Operation::Deletion as u8].as_slice()), + Cow::from(doc_0.as_slice()), + ], + ) + .unwrap(); + assert_eq!(*ret, doc_0); + } +}