From 1ea9c0b4c0d69bced0ad8553c76d752bced132b2 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Fri, 16 Sep 2022 01:58:08 +0200
Subject: [PATCH] write most of the run loop

---
 index-scheduler/src/batch.rs           | 22 ++++++-
 index-scheduler/src/index_scheduler.rs | 91 ++++++++++++++++----------
 index-scheduler/src/utils.rs           |  2 +-
 3 files changed, 80 insertions(+), 35 deletions(-)

diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 9742116fb..6d7f0e088 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -1,7 +1,7 @@
 use crate::{
     autobatcher::BatchKind,
     task::{Kind, KindWithContent, Status, Task},
-    Error, IndexScheduler, Result,
+    Error, IndexScheduler, Result, TaskId,
 };
 use index::{Settings, Unchecked};
 use milli::{
@@ -33,6 +33,26 @@ pub(crate) enum Batch {
     },
 }
 
+impl Batch {
+    pub fn ids(&self) -> Vec<TaskId> {
+        match self {
+            Batch::Cancel(task) => vec![task.uid],
+            Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::DocumentAddition { tasks, .. } => {
+                tasks.iter().map(|task| task.uid).collect()
+            }
+            Batch::SettingsAndDocumentAddition {
+                document_addition_tasks,
+                settings_tasks,
+                ..
+            } => document_addition_tasks
+                .iter()
+                .chain(settings_tasks)
+                .map(|task| task.uid)
+                .collect(),
+        }
+    }
+}
+
 impl IndexScheduler {
     pub(crate) fn create_next_batch_index(
         &self,
diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs
index bac48d427..e6f207368 100644
--- a/index-scheduler/src/index_scheduler.rs
+++ b/index-scheduler/src/index_scheduler.rs
@@ -1,10 +1,11 @@
 use crate::index_mapper::IndexMapper;
 use crate::task::{Kind, KindWithContent, Status, Task, TaskView};
-use crate::Result;
+use crate::{Error, Result};
 use file_store::FileStore;
 use index::Index;
 use milli::update::IndexerConfig;
 use synchronoise::SignalEvent;
+use time::OffsetDateTime;
 
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -45,8 +46,8 @@ pub mod db_name {
 /// 2. Schedule the tasks.
 #[derive(Clone)]
 pub struct IndexScheduler {
-    /// The list of tasks currently processing.
-    pub(crate) processing_tasks: Arc<RwLock<RoaringBitmap>>,
+    /// The list of tasks currently processing and their starting date.
+    pub(crate) processing_tasks: Arc<RwLock<(OffsetDateTime, RoaringBitmap)>>,
 
     pub(crate) file_store: FileStore,
 
@@ -89,9 +90,11 @@ impl IndexScheduler {
         // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
         let wake_up = SignalEvent::auto(true);
 
+        let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
+
         Ok(Self {
             // by default there is no processing tasks
-            processing_tasks: Arc::default(),
+            processing_tasks: Arc::new(RwLock::new(processing_tasks)),
             file_store: FileStore::new(update_file_path)?,
             all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
             status: env.create_database(Some(db_name::STATUS))?,
@@ -201,38 +204,63 @@ impl IndexScheduler {
     }
 
     /// This worker function must be run in a different thread and must be run only once.
-    fn run(&self) -> ! {
+    pub fn run(&self) -> ! {
         loop {
             self.wake_up.wait();
 
-            self.tick()
+            match self.tick() {
+                Ok(()) => (),
+                Err(e) => log::error!("{}", e),
+            }
         }
     }
 
     /// Create and execute and store the result of one batch of registered tasks.
-    fn tick(&self) {
-        let mut wtxn = match self.env.write_txn() {
-            Ok(wtxn) => wtxn,
-            Err(e) => {
-                log::error!("{}", e);
-                return;
-            }
+    fn tick(&self) -> Result<()> {
+        let mut wtxn = self.env.write_txn()?;
+        let batch = match self.create_next_batch(&wtxn)? {
+            Some(batch) => batch,
+            None => return Ok(()),
         };
-        let batch = match self.create_next_batch(&wtxn) {
-            Ok(Some(batch)) => batch,
-            Ok(None) => return,
-            Err(e) => {
-                log::error!("{}", e);
-                return;
+
+        // 1. store the starting date with the bitmap of processing tasks.
+        let mut ids = batch.ids();
+        ids.sort_unstable();
+        let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
+        let started_at = OffsetDateTime::now_utc();
+        *self.processing_tasks.write().unwrap() = (started_at, processing_tasks);
+
+        // 2. process the tasks
+        let res = self.process_batch(&mut wtxn, batch);
+
+        let finished_at = OffsetDateTime::now_utc();
+        match res {
+            Ok(tasks) => {
+                for mut task in tasks {
+                    task.started_at = Some(started_at);
+                    task.finished_at = Some(finished_at);
+                    task.status = Status::Succeeded;
+                    // the info field should've been set by the process_batch function
+
+                    self.update_task(&mut wtxn, &task)?;
+                    task.remove_data()?;
+                }
             }
-        };
-        // 1. store the starting date with the bitmap of processing tasks
-        // 2. update the tasks with a starting date *but* do not write anything on disk
+            // In case of a failure we must get back and patch all the tasks with the error.
+            Err(_err) => {
+                for id in ids {
+                    let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                    task.started_at = Some(started_at);
+                    task.finished_at = Some(finished_at);
+                    task.status = Status::Failed;
+                    // TODO: TAMO: set the error correctly
+                    // task.error = Some(err);
 
-        // 3. process the tasks
-        let _res = self.process_batch(&mut wtxn, batch);
-
-        // 4. store the updated tasks on disk
+                    self.update_task(&mut wtxn, &task)?;
+                    task.remove_data()?;
+                }
+            }
+        }
 
         // TODO: TAMO: do this later
         // must delete the file on disk
 
         // in case of « success » we must update all the task on disk
         // self.handle_batch_result(res);
 
-        match wtxn.commit() {
-            Ok(()) => log::info!("A batch of tasks was successfully completed."),
-            Err(e) => {
-                log::error!("{}", e);
-                return;
-            }
-        }
+        wtxn.commit()?;
+        log::info!("A batch of tasks was successfully completed.");
+
+        Ok(())
     }
 
     #[cfg(truc)]
diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs
index effb81a33..8d767aec5 100644
--- a/index-scheduler/src/utils.rs
+++ b/index-scheduler/src/utils.rs
@@ -44,7 +44,7 @@ impl IndexScheduler {
             .collect::>()
     }
 
-    pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: Task) -> Result<()> {
+    pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
         let old_task = self
             .get_task(wtxn, task.uid)?
            .ok_or(Error::CorruptedTaskQueue)?;
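
For readers who want to see the run/tick control flow outside the diff, here is a minimal, self-contained sketch of the same pattern. It is an illustration only: `Scheduler`, `Task`, and `process` below are hypothetical stand-ins built on std types, not the real index-scheduler types (no heed transaction, no RoaringBitmap, no on-disk task store).

// Sketch of the run/tick pattern from the patch above (stand-in types, std only).
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::SystemTime;

#[derive(Debug)]
struct Task {
    uid: u32,
    started_at: Option<SystemTime>,
    finished_at: Option<SystemTime>,
    succeeded: Option<bool>,
}

struct Scheduler {
    queue: Mutex<VecDeque<Task>>,
}

impl Scheduler {
    /// One scheduling round: pick the next task, process it, then stamp it
    /// with its start/finish dates and final status, as `tick` does.
    fn tick(&self) -> Result<(), String> {
        let mut queue = self.queue.lock().map_err(|e| e.to_string())?;
        let mut task = match queue.pop_front() {
            Some(task) => task,
            None => return Ok(()), // nothing to schedule
        };

        let started_at = SystemTime::now();
        let res = Self::process(&task); // the patch calls `process_batch` here
        let finished_at = SystemTime::now();

        task.started_at = Some(started_at);
        task.finished_at = Some(finished_at);
        task.succeeded = Some(res.is_ok());
        println!("finished {task:?}");
        Ok(())
    }

    /// Fake work: even uids succeed, odd uids fail.
    fn process(task: &Task) -> Result<(), String> {
        if task.uid % 2 == 0 {
            Ok(())
        } else {
            Err(format!("task {} failed", task.uid))
        }
    }

    /// The worker loop: like `run`, an error returned by `tick` is only
    /// logged and never kills the loop.
    fn run_until_empty(&self) {
        while !self.queue.lock().unwrap().is_empty() {
            if let Err(e) = self.tick() {
                eprintln!("{e}");
            }
        }
    }
}

fn main() {
    let tasks: VecDeque<Task> = (0..4)
        .map(|uid| Task { uid, started_at: None, finished_at: None, succeeded: None })
        .collect();
    let scheduler = Scheduler { queue: Mutex::new(tasks) };
    scheduler.run_until_empty();
}

As in the patch, a task failure is recorded on the task itself rather than aborting the round; only infrastructure errors (here, a poisoned lock; in the patch, a heed transaction error) bubble out of `tick`, and the caller logs them and keeps looping.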