From 448f44f63100a89ce3ba33bc14401e0b24bb6f53 Mon Sep 17 00:00:00 2001 From: Irevoire Date: Fri, 9 Sep 2022 12:16:19 +0200 Subject: [PATCH] move the autobatcher logic to another file --- index-scheduler/src/autobatcher.rs | 231 +++++++++++++++++++ index-scheduler/src/batch.rs | 354 +++++------------------------ index-scheduler/src/lib.rs | 6 +- 3 files changed, 289 insertions(+), 302 deletions(-) create mode 100644 index-scheduler/src/autobatcher.rs diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs new file mode 100644 index 000000000..26842bb4d --- /dev/null +++ b/index-scheduler/src/autobatcher.rs @@ -0,0 +1,231 @@ +use crate::{task::Kind, TaskId}; + +pub enum BatchKind { + ClearAll { + ids: Vec, + }, + DocumentAddition { + addition_ids: Vec, + }, + DocumentDeletion { + deletion_ids: Vec, + }, + ClearAllAndSettings { + other: Vec, + settings_ids: Vec, + }, + SettingsAndDocumentAddition { + settings_ids: Vec, + addition_ids: Vec, + }, + Settings { + settings_ids: Vec, + }, + DeleteIndex { + ids: Vec, + }, + CreateIndex { + id: TaskId, + }, + SwapIndex { + id: TaskId, + }, + RenameIndex { + id: TaskId, + }, +} + +impl BatchKind { + /// return true if you must stop right there. + pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) { + match kind { + Kind::CreateIndex => (BatchKind::CreateIndex { id: task_id }, true), + Kind::DeleteIndex => (BatchKind::DeleteIndex { ids: vec![task_id] }, true), + Kind::RenameIndex => (BatchKind::RenameIndex { id: task_id }, true), + Kind::SwapIndex => (BatchKind::SwapIndex { id: task_id }, true), + Kind::ClearAllDocuments => (BatchKind::ClearAll { ids: vec![task_id] }, false), + Kind::DocumentAddition => ( + BatchKind::DocumentAddition { + addition_ids: vec![task_id], + }, + false, + ), + Kind::DocumentDeletion => ( + BatchKind::DocumentDeletion { + deletion_ids: vec![task_id], + }, + false, + ), + Kind::Settings => ( + BatchKind::Settings { + settings_ids: vec![task_id], + }, + false, + ), + + Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(), + } + } + + /// Return true if you must stop. + fn accumulate(&mut self, id: TaskId, kind: Kind) -> bool { + match (self, kind) { + // must handle the deleteIndex + (_, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => true, + + (BatchKind::ClearAll { ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => { + ids.push(id); + false + } + (BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => true, + (BatchKind::DocumentAddition { addition_ids }, Kind::ClearAllDocuments) => { + addition_ids.push(id); + *self = BatchKind::ClearAll { + ids: addition_ids.clone(), + }; + false + } + + (BatchKind::DocumentAddition { addition_ids }, Kind::DocumentAddition) => { + addition_ids.push(id); + false + } + (BatchKind::DocumentAddition { .. }, Kind::DocumentDeletion) => true, + (BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => { + *self = BatchKind::SettingsAndDocumentAddition { + settings_ids: vec![id], + addition_ids: addition_ids.clone(), + }; + false + } + + (BatchKind::DocumentDeletion { deletion_ids }, Kind::ClearAllDocuments) => { + deletion_ids.push(id); + *self = BatchKind::ClearAll { + ids: deletion_ids.clone(), + }; + false + } + (BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => true, + (BatchKind::DocumentDeletion { deletion_ids }, Kind::DocumentDeletion) => { + deletion_ids.push(id); + false + } + (BatchKind::DocumentDeletion { .. }, Kind::Settings) => true, + + (BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => { + *self = BatchKind::ClearAllAndSettings { + settings_ids: settings_ids.clone(), + other: vec![id], + }; + false + } + (BatchKind::Settings { .. }, Kind::DocumentAddition) => true, + (BatchKind::Settings { .. }, Kind::DocumentDeletion) => true, + (BatchKind::Settings { settings_ids }, Kind::Settings) => { + settings_ids.push(id); + false + } + + ( + BatchKind::ClearAllAndSettings { + other, + settings_ids, + }, + Kind::ClearAllDocuments, + ) => { + other.push(id); + false + } + (BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => true, + ( + BatchKind::ClearAllAndSettings { + other, + settings_ids, + }, + Kind::DocumentDeletion, + ) => { + other.push(id); + false + } + ( + BatchKind::ClearAllAndSettings { + settings_ids, + other, + }, + Kind::Settings, + ) => { + settings_ids.push(id); + false + } + ( + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }, + Kind::ClearAllDocuments, + ) => { + addition_ids.push(id); + *self = BatchKind::ClearAllAndSettings { + settings_ids: settings_ids.clone(), + other: addition_ids.clone(), + }; + false + } + ( + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }, + Kind::DocumentAddition, + ) => { + addition_ids.push(id); + false + } + ( + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }, + Kind::DocumentDeletion, + ) => true, + ( + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + }, + Kind::Settings, + ) => { + settings_ids.push(id); + false + } + (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), + ( + BatchKind::CreateIndex { .. } + | BatchKind::DeleteIndex { .. } + | BatchKind::SwapIndex { .. } + | BatchKind::RenameIndex { .. }, + _, + ) => { + unreachable!() + } + } + } +} + +pub fn autobatch(enqueued: Vec<(TaskId, Kind)>) -> Option { + let mut enqueued = enqueued.into_iter(); + let (id, kind) = enqueued.next()?; + let (mut acc, is_finished) = BatchKind::new(id, kind); + if is_finished { + return Some(acc); + } + + for (id, kind) in enqueued { + if acc.accumulate(id, kind) { + break; + } + } + + Some(acc) +} diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 0b1b0f20d..8adba7534 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1,7 +1,4 @@ -use crate::{ - task::{KindWithContent, Status}, - Error, IndexScheduler, Result, TaskId, -}; +use crate::{autobatcher::BatchKind, task::Status, Error, IndexScheduler, Result, TaskId}; use milli::{heed::RoTxn, update::IndexDocumentsMethod}; use uuid::Uuid; @@ -14,47 +11,6 @@ pub(crate) enum Batch { IndexSpecific { index_uid: String, kind: BatchKind }, } -impl IndexScheduler { - /* - pub(crate) fn process_batch(&self, wtxn: &mut RwTxn, batch: Batch) -> Result> { - match batch { - Batch::DocumentAddition { - tasks, - primary_key, - content_files, - index_uid, - } => { - let index = self.create_index(wtxn, &index_uid)?; - let ret = index.update_documents( - IndexDocumentsMethod::UpdateDocuments, - primary_key, - self.file_store, - content_files, - )?; - - assert_eq!(ret.len(), tasks.len(), "Update documents must return the same number of `Result` than the number of tasks."); - - Ok(tasks - .into_iter() - .zip(ret) - .map(|(mut task, res)| match res { - Ok(info) => { - task.status = Status::Succeeded; - task.info = Some(info.to_string()); - } - Err(error) => { - task.status = Status::Failed; - task.error = Some(error.to_string()); - } - }) - .collect()) - } - _ => unreachable!(), - } - } - */ -} - impl IndexScheduler { /// Create the next batch to be processed; /// 1. We get the *last* task to cancel. @@ -109,12 +65,12 @@ impl IndexScheduler { }) .collect::>>()?; - return Ok( - autobatcher(enqueued).map(|batch_kind| Batch::IndexSpecific { + return Ok(crate::autobatcher::autobatch(enqueued).map(|batch_kind| { + Batch::IndexSpecific { index_uid: index_name.to_string(), kind: batch_kind, - }), - ); + } + })); } // If we found no tasks then we were notified for something that got autobatched @@ -122,33 +78,60 @@ impl IndexScheduler { Ok(None) } - /// Batch all the consecutive tasks coming next that shares the same `Kind` - /// for a specific index. There *MUST* be at least ONE task of this kind. - fn batch_contiguous_kind(&self, rtxn: &RoTxn, index: &str, kind: Kind) -> Result { - let enqueued = &self.get_status(rtxn, Status::Enqueued)?; + pub(crate) fn process_batch(&self, wtxn: &mut RwTxn, batch: Batch) -> Result> { + match batch { + Batch::IndexSpecific { index_uid, kind } => { + let index = create_index(); + match kind { + BatchKind::ClearAll { ids } => todo!(), + BatchKind::DocumentAddition { addition_ids } => { + let index = self.create_index(wtxn, &index_uid)?; + let ret = index.update_documents( + IndexDocumentsMethod::UpdateDocuments, + None, // TODO primary key + self.file_store, + content_files, + )?; - // [1, 2, 4, 5] - let index_tasks = self.get_index(rtxn, &index)? & enqueued; - // [1, 2, 5] - let tasks_kind = &index_tasks & self.get_kind(rtxn, kind)?; - // [4] - let not_kind = &index_tasks - &tasks_kind; + assert_eq!(ret.len(), tasks.len(), "Update documents must return the same number of `Result` than the number of tasks."); - // [1, 2] - let mut to_process = tasks_kind.clone(); - if let Some(max) = not_kind.max() { - // it's safe to unwrap since we already ensured there - // was AT LEAST one task with the document addition tasks_kind. - to_process.remove_range(tasks_kind.min().unwrap()..max); + Ok(tasks + .into_iter() + .zip(ret) + .map(|(mut task, res)| match res { + Ok(info) => { + task.status = Status::Succeeded; + task.info = Some(info.to_string()); + } + Err(error) => { + task.status = Status::Failed; + task.error = Some(error.to_string()); + } + }) + .collect()) + } + BatchKind::DocumentDeletion { deletion_ids } => todo!(), + BatchKind::ClearAllAndSettings { + other, + settings_ids, + } => todo!(), + BatchKind::SettingsAndDocumentAddition { + settings_ids, + addition_ids, + } => todo!(), + BatchKind::Settings { settings_ids } => todo!(), + BatchKind::DeleteIndex { ids } => todo!(), + BatchKind::CreateIndex { id } => todo!(), + BatchKind::SwapIndex { id } => todo!(), + BatchKind::RenameIndex { id } => todo!(), + } + } + _ => unreachable!(), } - - Ok(Batch::Contiguous { - tasks: self.get_existing_tasks(rtxn, to_process)?, - kind, - }) } } +/* impl Batch { pub fn task_ids(&self) -> impl IntoIterator + '_ { match self { @@ -162,233 +145,4 @@ impl Batch { } } } - -pub(crate) enum BatchKind { - ClearAll { - ids: Vec, - }, - DocumentAddition { - addition_ids: Vec, - }, - DocumentDeletion { - deletion_ids: Vec, - }, - ClearAllAndSettings { - other: Vec, - settings_ids: Vec, - }, - SettingsAndDocumentAddition { - settings_ids: Vec, - addition_ids: Vec, - }, - Settings { - settings_ids: Vec, - }, - DeleteIndex { - ids: Vec, - }, - CreateIndex { - id: TaskId, - }, - SwapIndex { - id: TaskId, - }, - RenameIndex { - id: TaskId, - }, -} - -impl BatchKind { - /// return true if you must stop right there. - pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) { - match kind { - Kind::CreateIndex => (BatchKind::CreateIndex { id: task_id }, true), - Kind::DeleteIndex => (BatchKind::DeleteIndex { ids: vec![task_id] }, true), - Kind::RenameIndex => (BatchKind::RenameIndex { id: task_id }, true), - Kind::SwapIndex => (BatchKind::SwapIndex { id: task_id }, true), - Kind::ClearAllDocuments => (BatchKind::ClearAll { ids: vec![task_id] }, false), - Kind::DocumentAddition => ( - BatchKind::DocumentAddition { - addition_ids: vec![task_id], - }, - false, - ), - Kind::DocumentDeletion => ( - BatchKind::DocumentDeletion { - deletion_ids: vec![task_id], - }, - false, - ), - Kind::Settings => ( - BatchKind::Settings { - settings_ids: vec![task_id], - }, - false, - ), - - Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(), - } - } - - /// Return true if you must stop. - fn accumulate(&mut self, id: TaskId, kind: Kind) -> bool { - match (self, kind) { - // must handle the deleteIndex - (_, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => true, - - (BatchKind::ClearAll { ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => { - ids.push(id); - false - } - (BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => true, - (BatchKind::DocumentAddition { addition_ids }, Kind::ClearAllDocuments) => { - addition_ids.push(id); - *self = BatchKind::ClearAll { - ids: addition_ids.clone(), - }; - false - } - - (BatchKind::DocumentAddition { addition_ids }, Kind::DocumentAddition) => { - addition_ids.push(id); - false - } - (BatchKind::DocumentAddition { .. }, Kind::DocumentDeletion) => true, - (BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => { - *self = BatchKind::SettingsAndDocumentAddition { - settings_ids: vec![id], - addition_ids: addition_ids.clone(), - }; - false - } - - (BatchKind::DocumentDeletion { deletion_ids }, Kind::ClearAllDocuments) => { - deletion_ids.push(id); - *self = BatchKind::ClearAll { - ids: deletion_ids.clone(), - }; - false - } - (BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => true, - (BatchKind::DocumentDeletion { deletion_ids }, Kind::DocumentDeletion) => { - deletion_ids.push(id); - false - } - (BatchKind::DocumentDeletion { .. }, Kind::Settings) => true, - - (BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => { - *self = BatchKind::ClearAllAndSettings { - settings_ids: settings_ids.clone(), - other: vec![id], - }; - false - } - (BatchKind::Settings { .. }, Kind::DocumentAddition) => true, - (BatchKind::Settings { .. }, Kind::DocumentDeletion) => true, - (BatchKind::Settings { settings_ids }, Kind::Settings) => { - settings_ids.push(id); - false - } - - ( - BatchKind::ClearAllAndSettings { - other, - settings_ids, - }, - Kind::ClearAllDocuments, - ) => { - other.push(id); - false - } - (BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => true, - ( - BatchKind::ClearAllAndSettings { - other, - settings_ids, - }, - Kind::DocumentDeletion, - ) => { - other.push(id); - false - } - ( - BatchKind::ClearAllAndSettings { - settings_ids, - other, - }, - Kind::Settings, - ) => { - settings_ids.push(id); - false - } - ( - BatchKind::SettingsAndDocumentAddition { - settings_ids, - addition_ids, - }, - Kind::ClearAllDocuments, - ) => { - addition_ids.push(id); - *self = BatchKind::ClearAllAndSettings { - settings_ids: settings_ids.clone(), - other: addition_ids.clone(), - }; - false - } - ( - BatchKind::SettingsAndDocumentAddition { - settings_ids, - addition_ids, - }, - Kind::DocumentAddition, - ) => { - addition_ids.push(id); - false - } - ( - BatchKind::SettingsAndDocumentAddition { - settings_ids, - addition_ids, - }, - Kind::DocumentDeletion, - ) => true, - ( - BatchKind::SettingsAndDocumentAddition { - settings_ids, - addition_ids, - }, - Kind::Settings, - ) => { - settings_ids.push(id); - false - } - (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), - ( - BatchKind::CreateIndex { .. } - | BatchKind::DeleteIndex { .. } - | BatchKind::SwapIndex { .. } - | BatchKind::RenameIndex { .. }, - _, - ) => { - unreachable!() - } - } - } -} - -pub fn autobatcher(enqueued: Vec<(TaskId, Kind)>) -> Option { - let mut enqueued = enqueued.into_iter(); - let (id, kind) = enqueued.next()?; - let (mut acc, is_finished) = BatchKind::new(id, kind); - if is_finished { - return Some(acc); - } - - for (id, kind) in enqueued { - if acc.accumulate(id, kind) { - break; - } - } - - Some(acc) -} +*/ diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 5da767416..accd13efa 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1,3 +1,4 @@ +mod autobatcher; mod batch; mod document_formats; pub mod error; @@ -219,7 +220,7 @@ impl IndexScheduler { continue; } }; - let mut batch = match self.get_next_batch(&wtxn) { + let mut batch = match self.create_next_batch(&wtxn) { Ok(batch) => batch, Err(e) => { log::error!("{}", e); @@ -227,7 +228,7 @@ impl IndexScheduler { } }; - let res = self.process_batch(&mut wtxn, &mut batch); + let res = self.process_batch(&mut wtxn, batch); // TODO: TAMO: do this later // must delete the file on disk @@ -245,6 +246,7 @@ impl IndexScheduler { } } + #[cfg(truc)] fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> { match batch { Batch::One(task) => match &task.kind {