From a0588d6b94a95c05452c857c38aad9f2554792ad Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 13 Sep 2022 14:59:03 +0200 Subject: [PATCH] finishes the global skelton of the auto-batcher --- index-scheduler/src/autobatcher.rs | 178 ++++++++++++++++++----------- index-scheduler/src/batch.rs | 10 +- 2 files changed, 118 insertions(+), 70 deletions(-) diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 26842bb4d..3512b1b80 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -1,3 +1,5 @@ +use std::ops::ControlFlow; + use crate::{task::Kind, TaskId}; pub enum BatchKind { @@ -68,136 +70,181 @@ impl BatchKind { } /// Return true if you must stop. - fn accumulate(&mut self, id: TaskId, kind: Kind) -> bool { + fn accumulate(mut self, id: TaskId, kind: Kind) -> ControlFlow { match (self, kind) { - // must handle the deleteIndex - (_, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => true, - - (BatchKind::ClearAll { ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => { + // We don't batch any of these operations + (this, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => { + ControlFlow::Break(this) + } + // The index deletion can batch with everything but must stop after + ( + BatchKind::ClearAll { mut ids } + | BatchKind::DocumentAddition { + addition_ids: mut ids, + } + | BatchKind::DocumentDeletion { + deletion_ids: mut ids, + } + | BatchKind::Settings { + settings_ids: mut ids, + }, + Kind::DeleteIndex, + ) => { ids.push(id); - false + ControlFlow::Break(BatchKind::DeleteIndex { ids }) } - (BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => true, - (BatchKind::DocumentAddition { addition_ids }, Kind::ClearAllDocuments) => { - addition_ids.push(id); - *self = BatchKind::ClearAll { - ids: addition_ids.clone(), - }; - false + ( + BatchKind::ClearAllAndSettings { + settings_ids: mut ids, + mut other, + } + | BatchKind::SettingsAndDocumentAddition { + addition_ids: mut ids, + settings_ids: mut other, + }, + Kind::DeleteIndex, + ) => { + ids.push(id); + ids.append(&mut other); + ControlFlow::Break(BatchKind::DeleteIndex { ids }) } - (BatchKind::DocumentAddition { addition_ids }, Kind::DocumentAddition) => { - addition_ids.push(id); - false + (BatchKind::ClearAll { mut ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => { + ids.push(id); + ControlFlow::Continue(BatchKind::ClearAll { ids }) + } + (this @ BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => { + ControlFlow::Break(this) + } + (BatchKind::DocumentAddition { mut addition_ids }, Kind::ClearAllDocuments) => { + addition_ids.push(id); + ControlFlow::Continue(BatchKind::ClearAll { ids: addition_ids }) + } + + (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAddition) => { + addition_ids.push(id); + ControlFlow::Continue(BatchKind::DocumentAddition { addition_ids }) + } + (this @ BatchKind::DocumentAddition { .. }, Kind::DocumentDeletion) => { + ControlFlow::Break(this) } - (BatchKind::DocumentAddition { .. }, Kind::DocumentDeletion) => true, (BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => { - *self = BatchKind::SettingsAndDocumentAddition { + ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { settings_ids: vec![id], - addition_ids: addition_ids.clone(), - }; - false + addition_ids, + }) } - (BatchKind::DocumentDeletion { deletion_ids }, Kind::ClearAllDocuments) => { + (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::ClearAllDocuments) => { deletion_ids.push(id); - *self = BatchKind::ClearAll { - ids: deletion_ids.clone(), - }; - false + ControlFlow::Continue(BatchKind::ClearAll { ids: deletion_ids }) } - (BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => true, - (BatchKind::DocumentDeletion { deletion_ids }, Kind::DocumentDeletion) => { + (this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => { + ControlFlow::Break(this) + } + (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => { deletion_ids.push(id); - false + ControlFlow::Continue(BatchKind::DocumentDeletion { deletion_ids }) } - (BatchKind::DocumentDeletion { .. }, Kind::Settings) => true, + (this @ BatchKind::DocumentDeletion { .. }, Kind::Settings) => ControlFlow::Break(this), (BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => { - *self = BatchKind::ClearAllAndSettings { + ControlFlow::Continue(BatchKind::ClearAllAndSettings { settings_ids: settings_ids.clone(), other: vec![id], - }; - false + }) } - (BatchKind::Settings { .. }, Kind::DocumentAddition) => true, - (BatchKind::Settings { .. }, Kind::DocumentDeletion) => true, - (BatchKind::Settings { settings_ids }, Kind::Settings) => { + (this @ BatchKind::Settings { .. }, Kind::DocumentAddition) => ControlFlow::Break(this), + (this @ BatchKind::Settings { .. }, Kind::DocumentDeletion) => ControlFlow::Break(this), + (BatchKind::Settings { mut settings_ids }, Kind::Settings) => { settings_ids.push(id); - false + ControlFlow::Continue(BatchKind::Settings { settings_ids }) } ( BatchKind::ClearAllAndSettings { - other, + mut other, settings_ids, }, Kind::ClearAllDocuments, ) => { other.push(id); - false + ControlFlow::Continue(BatchKind::ClearAllAndSettings { + other, + settings_ids, + }) + } + (this @ BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => { + ControlFlow::Break(this) } - (BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => true, ( BatchKind::ClearAllAndSettings { - other, + mut other, settings_ids, }, Kind::DocumentDeletion, ) => { other.push(id); - false + ControlFlow::Continue(BatchKind::ClearAllAndSettings { + other, + settings_ids, + }) } ( BatchKind::ClearAllAndSettings { - settings_ids, + mut settings_ids, other, }, Kind::Settings, ) => { settings_ids.push(id); - false + ControlFlow::Continue(BatchKind::ClearAllAndSettings { + other, + settings_ids, + }) } ( BatchKind::SettingsAndDocumentAddition { settings_ids, - addition_ids, + mut addition_ids, }, Kind::ClearAllDocuments, ) => { addition_ids.push(id); - *self = BatchKind::ClearAllAndSettings { - settings_ids: settings_ids.clone(), - other: addition_ids.clone(), - }; - false + + ControlFlow::Continue(BatchKind::ClearAllAndSettings { + settings_ids, + other: addition_ids, + }) } ( BatchKind::SettingsAndDocumentAddition { + mut addition_ids, settings_ids, - addition_ids, }, Kind::DocumentAddition, ) => { addition_ids.push(id); - false + ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { + addition_ids, + settings_ids, + }) + } + (this @ BatchKind::SettingsAndDocumentAddition { .. }, Kind::DocumentDeletion) => { + ControlFlow::Break(this) } ( BatchKind::SettingsAndDocumentAddition { - settings_ids, - addition_ids, - }, - Kind::DocumentDeletion, - ) => true, - ( - BatchKind::SettingsAndDocumentAddition { - settings_ids, + mut settings_ids, addition_ids, }, Kind::Settings, ) => { settings_ids.push(id); - false + ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }) } (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), ( @@ -222,10 +269,11 @@ pub fn autobatch(enqueued: Vec<(TaskId, Kind)>) -> Option { } for (id, kind) in enqueued { - if acc.accumulate(id, kind) { - break; - } + acc = match acc.accumulate(id, kind) { + ControlFlow::Continue(acc) => acc, + ControlFlow::Break(acc) => return Some(acc), + }; } - Some(acc) + None } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 1cf6e5bda..d555bee9d 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -72,8 +72,8 @@ impl IndexScheduler { }) .collect::>>()?; - let primary_key = match document_addition_tasks[0].kind { - KindWithContent::DocumentAddition { primary_key, .. } => primary_key, + let primary_key = match &document_addition_tasks[0].kind { + KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(), _ => unreachable!(), }; let content_files = document_addition_tasks @@ -86,7 +86,7 @@ impl IndexScheduler { let settings = settings_tasks .iter() - .map(|task| match task.kind { + .map(|task| match &task.kind { KindWithContent::Settings { new_settings, .. } => new_settings.to_string(), _ => unreachable!(), }) @@ -217,11 +217,11 @@ impl IndexScheduler { let ret = index.update_documents( IndexDocumentsMethod::ReplaceDocuments, primary_key, - self.file_store, + self.file_store.clone(), content_files.into_iter(), )?; - for (ret, task) in ret.iter().zip(document_addition_tasks) { + for (ret, mut task) in ret.iter().zip(document_addition_tasks.into_iter()) { match ret { Ok(ret) => task.info = Some(format!("{:?}", ret)), Err(err) => task.error = Some(err.to_string()),