From 803f2157aff23e2f98899330f18a118795022d00 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 14 Sep 2022 00:34:02 +0200 Subject: [PATCH] split the DocumentAdditionOrUpdate in two tasks; DocumentAddition and DocumentUpdate --- index-scheduler/src/autobatcher.rs | 128 +++++++++++++++++++++++------ index-scheduler/src/batch.rs | 13 +-- index-scheduler/src/task.rs | 26 ++++-- 3 files changed, 128 insertions(+), 39 deletions(-) diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 2a85792ac..9a57dbe7b 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -9,6 +9,9 @@ pub enum BatchKind { DocumentAddition { addition_ids: Vec, }, + DocumentUpdate { + update_ids: Vec, + }, DocumentDeletion { deletion_ids: Vec, }, @@ -20,6 +23,10 @@ pub enum BatchKind { settings_ids: Vec, addition_ids: Vec, }, + SettingsAndDocumentUpdate { + settings_ids: Vec, + update_ids: Vec, + }, Settings { settings_ids: Vec, }, @@ -50,12 +57,18 @@ impl BatchKind { Kind::IndexRename => (BatchKind::IndexRename { id: task_id }, true), Kind::IndexSwap => (BatchKind::IndexSwap { id: task_id }, true), Kind::DocumentClear => (BatchKind::DocumentClear { ids: vec![task_id] }, false), - Kind::DocumentAdditionOrUpdate => ( + Kind::DocumentAddition => ( BatchKind::DocumentAddition { addition_ids: vec![task_id], }, false, ), + Kind::DocumentUpdate => ( + BatchKind::DocumentUpdate { + update_ids: vec![task_id], + }, + false, + ), Kind::DocumentDeletion => ( BatchKind::DocumentDeletion { deletion_ids: vec![task_id], @@ -87,6 +100,9 @@ impl BatchKind { | BatchKind::DocumentAddition { addition_ids: mut ids, } + | BatchKind::DocumentUpdate { + update_ids: mut ids, + } | BatchKind::DocumentDeletion { deletion_ids: mut ids, } @@ -106,6 +122,10 @@ impl BatchKind { | BatchKind::SettingsAndDocumentAddition { addition_ids: mut ids, settings_ids: mut other, + } + | BatchKind::SettingsAndDocumentUpdate { + update_ids: mut ids, + settings_ids: mut other, 
}, Kind::IndexDeletion, ) => { @@ -123,34 +143,57 @@ impl BatchKind { } ( this @ BatchKind::DocumentClear { .. }, - Kind::DocumentAdditionOrUpdate | Kind::Settings, + Kind::DocumentAddition | Kind::DocumentUpdate | Kind::Settings, ) => ControlFlow::Break(this), - (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentClear) => { - addition_ids.push(id); - ControlFlow::Continue(BatchKind::DocumentClear { ids: addition_ids }) + ( + BatchKind::DocumentAddition { + addition_ids: mut ids, + } + | BatchKind::DocumentUpdate { + update_ids: mut ids, + }, + Kind::DocumentClear, + ) => { + ids.push(id); + ControlFlow::Continue(BatchKind::DocumentClear { ids }) } - (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAdditionOrUpdate) => { + // we can autobatch document additions / updates of the same kind + (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAddition) => { addition_ids.push(id); ControlFlow::Continue(BatchKind::DocumentAddition { addition_ids }) } - (this @ BatchKind::DocumentAddition { .. }, Kind::DocumentDeletion) => { - ControlFlow::Break(this) + (BatchKind::DocumentUpdate { mut update_ids }, Kind::DocumentUpdate) => { + update_ids.push(id); + ControlFlow::Continue(BatchKind::DocumentUpdate { update_ids }) } + // but we can't autobatch document operations that are not of the same kind + // this match branch MUST come AFTER the two previous ones + ( + this @ BatchKind::DocumentAddition { .. } | this @ BatchKind::DocumentUpdate { .. 
}, + Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate, + ) => ControlFlow::Break(this), (BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => { ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { settings_ids: vec![id], addition_ids, }) } + (BatchKind::DocumentUpdate { update_ids }, Kind::Settings) => { + ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate { + settings_ids: vec![id], + update_ids, + }) + } (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentClear) => { deletion_ids.push(id); ControlFlow::Continue(BatchKind::DocumentClear { ids: deletion_ids }) } - (this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentAdditionOrUpdate) => { - ControlFlow::Break(this) - } + ( + this @ BatchKind::DocumentDeletion { .. }, + Kind::DocumentAddition | Kind::DocumentUpdate, + ) => ControlFlow::Break(this), (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => { deletion_ids.push(id); ControlFlow::Continue(BatchKind::DocumentDeletion { deletion_ids }) @@ -163,10 +206,10 @@ impl BatchKind { other: vec![id], }) } - (this @ BatchKind::Settings { .. }, Kind::DocumentAdditionOrUpdate) => { - ControlFlow::Break(this) - } - (this @ BatchKind::Settings { .. }, Kind::DocumentDeletion) => ControlFlow::Break(this), + ( + this @ BatchKind::Settings { .. }, + Kind::DocumentAddition | Kind::DocumentUpdate | Kind::DocumentDeletion, + ) => ControlFlow::Break(this), (BatchKind::Settings { mut settings_ids }, Kind::Settings) => { settings_ids.push(id); ControlFlow::Continue(BatchKind::Settings { settings_ids }) @@ -185,9 +228,10 @@ impl BatchKind { settings_ids, }) } - (this @ BatchKind::ClearAndSettings { .. }, Kind::DocumentAdditionOrUpdate) => { - ControlFlow::Break(this) - } + ( + this @ BatchKind::ClearAndSettings { .. 
}, + Kind::DocumentAddition | Kind::DocumentUpdate, + ) => ControlFlow::Break(this), ( BatchKind::ClearAndSettings { mut other, @@ -217,23 +261,29 @@ impl BatchKind { ( BatchKind::SettingsAndDocumentAddition { settings_ids, - mut addition_ids, + addition_ids: mut other, + } + | BatchKind::SettingsAndDocumentUpdate { + settings_ids, + update_ids: mut other, }, Kind::DocumentClear, ) => { - addition_ids.push(id); + other.push(id); ControlFlow::Continue(BatchKind::ClearAndSettings { settings_ids, - other: addition_ids, + other, }) } + + // we can batch settings and one kind of document operation with another document operation of the same kind ( BatchKind::SettingsAndDocumentAddition { mut addition_ids, settings_ids, }, - Kind::DocumentAdditionOrUpdate, + Kind::DocumentAddition, ) => { addition_ids.push(id); ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { @@ -241,9 +291,26 @@ impl BatchKind { settings_ids, }) } - (this @ BatchKind::SettingsAndDocumentAddition { .. }, Kind::DocumentDeletion) => { - ControlFlow::Break(this) + ( + BatchKind::SettingsAndDocumentUpdate { + mut update_ids, + settings_ids, + }, + Kind::DocumentUpdate, + ) => { + update_ids.push(id); + ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate { + update_ids, + settings_ids, + }) } + // But we can't batch settings and a doc op with a doc op of a different kind + // this MUST be AFTER the two previous branches + ( + this @ BatchKind::SettingsAndDocumentAddition { .. } + | this @ BatchKind::SettingsAndDocumentUpdate { .. 
}, + Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate, + ) => ControlFlow::Break(this), ( BatchKind::SettingsAndDocumentAddition { mut settings_ids, @@ -257,6 +324,19 @@ impl BatchKind { addition_ids, }) } + ( + BatchKind::SettingsAndDocumentUpdate { + mut settings_ids, + update_ids, + }, + Kind::Settings, + ) => { + settings_ids.push(id); + ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate { + settings_ids, + update_ids, + }) + } (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), ( BatchKind::IndexCreation { .. } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 73bc9a9a9..ba37ba20a 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -45,6 +45,7 @@ impl IndexScheduler { match batch { BatchKind::DocumentClear { ids } => todo!(), BatchKind::DocumentAddition { addition_ids } => todo!(), + BatchKind::DocumentUpdate { update_ids } => todo!(), BatchKind::DocumentDeletion { deletion_ids } => todo!(), BatchKind::ClearAndSettings { other, @@ -74,17 +75,13 @@ impl IndexScheduler { .collect::<Result<Vec<_>>>()?; let primary_key = match &document_addition_tasks[0].kind { - KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => { - primary_key.clone() - } + KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(), _ => unreachable!(), }; let content_files = document_addition_tasks .iter() .map(|task| match task.kind { - KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => { - content_file - } + KindWithContent::DocumentAddition { content_file, .. 
} => content_file, _ => unreachable!(), }) .collect(); @@ -106,6 +103,10 @@ impl IndexScheduler { settings_tasks, })) } + BatchKind::SettingsAndDocumentUpdate { + update_ids, + settings_ids, + } => todo!(), BatchKind::Settings { settings_ids } => todo!(), BatchKind::IndexCreation { id } => todo!(), BatchKind::IndexDeletion { ids } => todo!(), diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 40197e81c..bd556685f 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -56,9 +56,15 @@ impl Task { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum KindWithContent { - DocumentAdditionOrUpdate { + DocumentAddition { + index_uid: String, + primary_key: Option, + content_file: Uuid, + documents_count: usize, + allow_index_creation: bool, + }, + DocumentUpdate { index_uid: String, - merge_strategy: IndexDocumentsMethod, primary_key: Option, content_file: Uuid, documents_count: usize, @@ -108,7 +114,8 @@ pub enum KindWithContent { impl KindWithContent { pub fn as_kind(&self) -> Kind { match self { - KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate, + KindWithContent::DocumentAddition { .. } => Kind::DocumentAddition, + KindWithContent::DocumentUpdate { .. } => Kind::DocumentUpdate, KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion, KindWithContent::DocumentClear { .. } => Kind::DocumentClear, KindWithContent::Settings { .. } => Kind::Settings, @@ -127,7 +134,7 @@ impl KindWithContent { use KindWithContent::*; match self { - DocumentAdditionOrUpdate { .. } => { + DocumentAddition { .. } | DocumentUpdate { .. } => { // TODO: TAMO: persist the file // content_file.persist(); Ok(()) @@ -150,13 +157,12 @@ impl KindWithContent { use KindWithContent::*; match self { - DocumentAdditionOrUpdate { .. } => { + DocumentAddition { .. } | DocumentUpdate { .. 
} => { // TODO: TAMO: delete the file // content_file.delete(); Ok(()) } - DocumentAdditionOrUpdate { .. } - | IndexCreation { .. } + IndexCreation { .. } | DocumentDeletion { .. } | DocumentClear { .. } | Settings { .. } @@ -175,7 +181,8 @@ impl KindWithContent { match self { DumpExport { .. } | Snapshot | CancelTask { .. } => None, - DocumentAdditionOrUpdate { index_uid, .. } + DocumentAddition { index_uid, .. } + | DocumentUpdate { index_uid, .. } | DocumentDeletion { index_uid, .. } | DocumentClear { index_uid } | Settings { index_uid, .. } @@ -194,7 +201,8 @@ impl KindWithContent { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum Kind { - DocumentAdditionOrUpdate, + DocumentAddition, + DocumentUpdate, DocumentDeletion, DocumentClear, Settings,