From 9ff0fe952e1902339e6e4d4767baf96e44e80acf Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 15 Sep 2022 12:47:09 +0200 Subject: [PATCH] split the run function in two --- index-scheduler/src/index_scheduler.rs | 69 ++++++++++++++------------ 1 file changed, 37 insertions(+), 32 deletions(-) diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs index 027932e44..bac48d427 100644 --- a/index-scheduler/src/index_scheduler.rs +++ b/index-scheduler/src/index_scheduler.rs @@ -201,45 +201,50 @@ impl IndexScheduler { } /// This worker function must be run in a different thread and must be run only once. - fn run(&self) { + fn run(&self) -> ! { loop { self.wake_up.wait(); - let mut wtxn = match self.env.write_txn() { - Ok(wtxn) => wtxn, - Err(e) => { - log::error!("{}", e); - continue; - } - }; - let batch = match self.create_next_batch(&wtxn) { - Ok(Some(batch)) => batch, - Ok(None) => continue, - Err(e) => { - log::error!("{}", e); - continue; - } - }; - // 1. store the starting date with the bitmap of processing tasks - // 2. update the tasks with a starting date *but* do not write anything on disk + self.tick() + } + } - // 3. process the tasks - let _res = self.process_batch(&mut wtxn, batch); + /// Create and execute and store the result of one batch of registered tasks. + fn tick(&self) { + let mut wtxn = match self.env.write_txn() { + Ok(wtxn) => wtxn, + Err(e) => { + log::error!("{}", e); + return; + } + }; + let batch = match self.create_next_batch(&wtxn) { + Ok(Some(batch)) => batch, + Ok(None) => return, + Err(e) => { + log::error!("{}", e); + return; + } + }; + // 1. store the starting date with the bitmap of processing tasks + // 2. update the tasks with a starting date *but* do not write anything on disk - // 4. store the updated tasks on disk + // 3. process the tasks + let _res = self.process_batch(&mut wtxn, batch); - // TODO: TAMO: do this later - // must delete the file on disk - // in case of error, must update the tasks with the error - // in case of « success » we must update all the task on disk - // self.handle_batch_result(res); + // 4. store the updated tasks on disk - match wtxn.commit() { - Ok(()) => log::info!("A batch of tasks was successfully completed."), - Err(e) => { - log::error!("{}", e); - continue; - } + // TODO: TAMO: do this later + // must delete the file on disk + // in case of error, must update the tasks with the error + // in case of « success » we must update all the task on disk + // self.handle_batch_result(res); + + match wtxn.commit() { + Ok(()) => log::info!("A batch of tasks was successfully completed."), + Err(e) => { + log::error!("{}", e); + return; } } }