From ed745591e1f2dd6f5119c86eb3bf63c57795c858 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 7 Sep 2022 00:10:14 +0200 Subject: [PATCH] split the scheduler into multiples files --- index-scheduler/src/batch.rs | 110 +++++++++++++++++++++ index-scheduler/src/lib.rs | 179 +---------------------------------- index-scheduler/src/utils.rs | 99 +++++++++++++++++++ 3 files changed, 213 insertions(+), 175 deletions(-) create mode 100644 index-scheduler/src/batch.rs create mode 100644 index-scheduler/src/utils.rs diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs new file mode 100644 index 000000000..3e97a8fd0 --- /dev/null +++ b/index-scheduler/src/batch.rs @@ -0,0 +1,110 @@ +use crate::{ + task::{KindWithContent, Status}, + Error, IndexScheduler, Result, +}; +use milli::heed::RoTxn; + +use crate::{task::Kind, Task}; + +pub enum Batch { + Cancel(Task), + Snapshot(Vec), + Dump(Vec), + Contiguous { tasks: Vec, kind: Kind }, + One(Task), + Empty, +} + +impl IndexScheduler { + /// Create the next batch to be processed; + /// 1. We get the *last* task to cancel. + /// 2. We get the *next* snapshot to process. + /// 3. We get the *next* dump to process. + /// 4. We get the *next* tasks to process for a specific index. + fn get_next_batch(&self, rtxn: &RoTxn) -> Result { + let enqueued = &self.get_status(rtxn, Status::Enqueued)?; + let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued; + + // 1. we get the last task to cancel. + if let Some(task_id) = to_cancel.max() { + return Ok(Batch::Cancel( + self.get_task(rtxn, task_id)? + .ok_or(Error::CorruptedTaskQueue)?, + )); + } + + // 2. we batch the snapshot. + let to_snapshot = self.get_kind(rtxn, Kind::Snapshot)? & enqueued; + if !to_snapshot.is_empty() { + return Ok(Batch::Snapshot(self.get_existing_tasks(rtxn, to_snapshot)?)); + } + + // 3. we batch the dumps. + let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued; + if !to_dump.is_empty() { + return Ok(Batch::Dump(self.get_existing_tasks(rtxn, to_dump)?)); + } + + // 4. We take the next task and try to batch all the tasks associated with this index. + if let Some(task_id) = enqueued.min() { + let task = self + .get_task(rtxn, task_id)? + .ok_or(Error::CorruptedTaskQueue)?; + match task.kind { + // We can batch all the consecutive tasks coming next which + // have the kind `DocumentAddition`. + KindWithContent::DocumentAddition { index_name, .. } => { + return self.batch_contiguous_kind(rtxn, &index_name, Kind::DocumentAddition) + } + // We can batch all the consecutive tasks coming next which + // have the kind `DocumentDeletion`. + KindWithContent::DocumentDeletion { index_name, .. } => { + return self.batch_contiguous_kind(rtxn, &index_name, Kind::DocumentAddition) + } + // The following tasks can't be batched + KindWithContent::ClearAllDocuments { .. } + | KindWithContent::RenameIndex { .. } + | KindWithContent::CreateIndex { .. } + | KindWithContent::DeleteIndex { .. } + | KindWithContent::SwapIndex { .. } => return Ok(Batch::One(task)), + + // The following tasks have already been batched and thus can't appear here. + KindWithContent::CancelTask { .. } + | KindWithContent::DumpExport { .. } + | KindWithContent::Snapshot => { + unreachable!() + } + } + } + + // If we found no tasks then we were notified for something that got autobatched + // somehow and there is nothing to do. + Ok(Batch::Empty) + } + + /// Batch all the consecutive tasks coming next that shares the same `Kind` + /// for a specific index. There *MUST* be at least ONE task of this kind. + fn batch_contiguous_kind(&self, rtxn: &RoTxn, index: &str, kind: Kind) -> Result { + let enqueued = &self.get_status(rtxn, Status::Enqueued)?; + + // [1, 2, 4, 5] + let index_tasks = self.get_index(rtxn, &index)? & enqueued; + // [1, 2, 5] + let tasks_kind = &index_tasks & self.get_kind(rtxn, kind)?; + // [4] + let not_kind = &index_tasks - &tasks_kind; + + // [1, 2] + let mut to_process = tasks_kind.clone(); + if let Some(max) = not_kind.max() { + // it's safe to unwrap since we already ensured there + // was AT LEAST one task with the document addition tasks_kind. + to_process.remove_range(tasks_kind.min().unwrap()..max); + } + + Ok(Batch::Contiguous { + tasks: self.get_existing_tasks(rtxn, to_process)?, + kind, + }) + } +} diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 244d6e5b9..52b22b6f6 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1,10 +1,12 @@ +mod batch; pub mod error; pub mod task; +mod utils; -use error::Error; +pub use error::Error; use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str}; pub use task::Task; -use task::{Kind, KindWithContent, Status}; +use task::{Kind, Status}; use std::collections::hash_map::Entry; use std::sync::atomic::AtomicBool; @@ -134,181 +136,8 @@ impl IndexScheduler { Ok(()) } - /// Create the next batch to be processed; - /// 1. We get the *last* task to cancel. - /// 2. We get the *next* snapshot to process. - /// 3. We get the *next* dump to process. - /// 4. We get the *next* tasks to process for a specific index. - fn get_next_batch(&self, rtxn: &RoTxn) -> Result { - let enqueued = &self.get_status(rtxn, Status::Enqueued)?; - let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued; - - // 1. we get the last task to cancel. - if let Some(task_id) = to_cancel.max() { - return Ok(Batch::Cancel( - self.get_task(rtxn, task_id)? - .ok_or(Error::CorruptedTaskQueue)?, - )); - } - - // 2. we batch the snapshot. - let to_snapshot = self.get_kind(rtxn, Kind::Snapshot)? & enqueued; - if !to_snapshot.is_empty() { - return Ok(Batch::Snapshot(self.get_existing_tasks(rtxn, to_snapshot)?)); - } - - // 3. we batch the dumps. - let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued; - if !to_dump.is_empty() { - return Ok(Batch::Dump(self.get_existing_tasks(rtxn, to_dump)?)); - } - - // 4. We take the next task and try to batch all the tasks associated with this index. - if let Some(task_id) = enqueued.min() { - let task = self - .get_task(rtxn, task_id)? - .ok_or(Error::CorruptedTaskQueue)?; - match task.kind { - // We can batch all the consecutive tasks coming next which - // have the kind `DocumentAddition`. - KindWithContent::DocumentAddition { index_name, .. } => { - return self.batch_contiguous_kind(rtxn, &index_name, Kind::DocumentAddition) - } - // We can batch all the consecutive tasks coming next which - // have the kind `DocumentDeletion`. - KindWithContent::DocumentDeletion { index_name, .. } => { - return self.batch_contiguous_kind(rtxn, &index_name, Kind::DocumentAddition) - } - // The following tasks can't be batched - KindWithContent::ClearAllDocuments { .. } - | KindWithContent::RenameIndex { .. } - | KindWithContent::CreateIndex { .. } - | KindWithContent::DeleteIndex { .. } - | KindWithContent::SwapIndex { .. } => return Ok(Batch::One(task)), - - // The following tasks have already been batched and thus can't appear here. - KindWithContent::CancelTask { .. } - | KindWithContent::DumpExport { .. } - | KindWithContent::Snapshot => { - unreachable!() - } - } - } - - // If we found no tasks then we were notified for something that got autobatched - // somehow and there is nothing to do. - Ok(Batch::Empty) - } - - /// Batch all the consecutive tasks coming next that shares the same `Kind` - /// for a specific index. There *MUST* be at least ONE task of this kind. - fn batch_contiguous_kind(&self, rtxn: &RoTxn, index: &str, kind: Kind) -> Result { - let enqueued = &self.get_status(rtxn, Status::Enqueued)?; - - // [1, 2, 4, 5] - let index_tasks = self.get_index(rtxn, &index)? & enqueued; - // [1, 2, 5] - let tasks_kind = &index_tasks & self.get_kind(rtxn, kind)?; - // [4] - let not_kind = &index_tasks - &tasks_kind; - - // [1, 2] - let mut to_process = tasks_kind.clone(); - if let Some(max) = not_kind.max() { - // it's safe to unwrap since we already ensured there - // was AT LEAST one task with the document addition tasks_kind. - to_process.remove_range(tasks_kind.min().unwrap()..max); - } - - Ok(Batch::Contiguous { - tasks: self.get_existing_tasks(rtxn, to_process)?, - kind, - }) - } - - fn get_task(&self, rtxn: &RoTxn, task_id: TaskId) -> Result> { - Ok(self.all_tasks.get(rtxn, &BEU32::new(task_id))?) - } - pub fn notify(&self) { self.wake_up .store(true, std::sync::atomic::Ordering::Relaxed); } - - // =========== Utility functions on the DBs - - /// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a - // `CorruptedTaskQueue` error will be throwed. - fn get_existing_tasks( - &self, - rtxn: &RoTxn, - tasks: impl IntoIterator, - ) -> Result> { - tasks - .into_iter() - .map(|task_id| { - self.get_task(rtxn, task_id) - .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) - }) - .collect::>() - } - - fn get_index(&self, rtxn: &RoTxn, index: &str) -> Result { - Ok(self.index_tasks.get(&rtxn, index)?.unwrap_or_default()) - } - - fn put_index(&self, wtxn: &mut RwTxn, index: &str, bitmap: &RoaringBitmap) -> Result<()> { - Ok(self.index_tasks.put(wtxn, index, bitmap)?) - } - - fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result { - Ok(self.status.get(&rtxn, &status)?.unwrap_or_default()) - } - - fn put_status(&self, wtxn: &mut RwTxn, status: Status, bitmap: &RoaringBitmap) -> Result<()> { - Ok(self.status.put(wtxn, &status, bitmap)?) - } - - fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result { - Ok(self.kind.get(&rtxn, &kind)?.unwrap_or_default()) - } - - fn put_kind(&self, wtxn: &mut RwTxn, kind: Kind, bitmap: &RoaringBitmap) -> Result<()> { - Ok(self.kind.put(wtxn, &kind, bitmap)?) - } - - fn update_status( - &self, - wtxn: &mut RwTxn, - status: Status, - f: impl Fn(RoaringBitmap) -> RoaringBitmap, - ) -> Result<()> { - let tasks = self.get_status(&wtxn, status)?; - let tasks = f(tasks); - self.put_status(wtxn, status, &tasks)?; - - Ok(()) - } - - fn update_kind( - &self, - wtxn: &mut RwTxn, - kind: Kind, - f: impl Fn(RoaringBitmap) -> RoaringBitmap, - ) -> Result<()> { - let tasks = self.get_kind(&wtxn, kind)?; - let tasks = f(tasks); - self.put_kind(wtxn, kind, &tasks)?; - - Ok(()) - } -} - -enum Batch { - Cancel(Task), - Snapshot(Vec), - Dump(Vec), - Contiguous { tasks: Vec, kind: Kind }, - One(Task), - Empty, } diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs new file mode 100644 index 000000000..2d27bd111 --- /dev/null +++ b/index-scheduler/src/utils.rs @@ -0,0 +1,99 @@ +//! Utility functions on the DBs. Mainly getter and setters. + +use milli::{ + heed::{RoTxn, RwTxn}, + BEU32, +}; +use roaring::RoaringBitmap; + +use crate::{ + task::{Kind, Status}, + Error, IndexScheduler, Result, Task, TaskId, +}; + +impl IndexScheduler { + pub(crate) fn get_task(&self, rtxn: &RoTxn, task_id: TaskId) -> Result> { + Ok(self.all_tasks.get(rtxn, &BEU32::new(task_id))?) + } + + /// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a + /// `CorruptedTaskQueue` error will be throwed. + pub(crate) fn get_existing_tasks( + &self, + rtxn: &RoTxn, + tasks: impl IntoIterator, + ) -> Result> { + tasks + .into_iter() + .map(|task_id| { + self.get_task(rtxn, task_id) + .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) + }) + .collect::>() + } + + pub(crate) fn get_index(&self, rtxn: &RoTxn, index: &str) -> Result { + Ok(self.index_tasks.get(&rtxn, index)?.unwrap_or_default()) + } + + pub(crate) fn put_index( + &self, + wtxn: &mut RwTxn, + index: &str, + bitmap: &RoaringBitmap, + ) -> Result<()> { + Ok(self.index_tasks.put(wtxn, index, bitmap)?) + } + + pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result { + Ok(self.status.get(&rtxn, &status)?.unwrap_or_default()) + } + + pub(crate) fn put_status( + &self, + wtxn: &mut RwTxn, + status: Status, + bitmap: &RoaringBitmap, + ) -> Result<()> { + Ok(self.status.put(wtxn, &status, bitmap)?) + } + + pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result { + Ok(self.kind.get(&rtxn, &kind)?.unwrap_or_default()) + } + + pub(crate) fn put_kind( + &self, + wtxn: &mut RwTxn, + kind: Kind, + bitmap: &RoaringBitmap, + ) -> Result<()> { + Ok(self.kind.put(wtxn, &kind, bitmap)?) + } + + pub(crate) fn update_status( + &self, + wtxn: &mut RwTxn, + status: Status, + f: impl Fn(RoaringBitmap) -> RoaringBitmap, + ) -> Result<()> { + let tasks = self.get_status(&wtxn, status)?; + let tasks = f(tasks); + self.put_status(wtxn, status, &tasks)?; + + Ok(()) + } + + pub(crate) fn update_kind( + &self, + wtxn: &mut RwTxn, + kind: Kind, + f: impl Fn(RoaringBitmap) -> RoaringBitmap, + ) -> Result<()> { + let tasks = self.get_kind(&wtxn, kind)?; + let tasks = f(tasks); + self.put_kind(wtxn, kind, &tasks)?; + + Ok(()) + } +}