diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8dd16f961..4ef218143 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -423,12 +423,12 @@ impl IndexScheduler { #[cfg(test)] run.breakpoint(Breakpoint::Init); - loop { - run.wake_up.wait(); + run.wake_up.wait(); + loop { match run.tick() { - Ok(0) => (), - Ok(_) => run.wake_up.signal(), + Ok(TickOutcome::TickAgain(_)) => (), + Ok(TickOutcome::WaitForSignal) => run.wake_up.wait(), Err(e) => { log::error!("{}", e); // Wait one second when an irrecoverable error occurs. @@ -441,7 +441,6 @@ impl IndexScheduler { ) { std::thread::sleep(Duration::from_secs(1)); } - run.wake_up.signal(); } } } @@ -927,7 +926,7 @@ impl IndexScheduler { /// 5. Reset the in-memory list of processed tasks. /// /// Returns the number of processed tasks. - fn tick(&self) -> Result { + fn tick(&self) -> Result { #[cfg(test)] { *self.run_loop_iteration.write().unwrap() += 1; @@ -938,7 +937,7 @@ impl IndexScheduler { let batch = match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? { Some(batch) => batch, - None => return Ok(0), + None => return Ok(TickOutcome::WaitForSignal), }; drop(rtxn); @@ -1010,7 +1009,7 @@ impl IndexScheduler { // the `started_at` date times and `processings` of the current processing tasks. // This date time is used by the task cancelation to store the right `started_at` // date in the task on disk. - return Ok(0); + return Ok(TickOutcome::TickAgain(0)); } // In case of a failure we must get back and patch all the tasks with the error. Err(err) => { @@ -1050,7 +1049,7 @@ impl IndexScheduler { #[cfg(test)] self.breakpoint(Breakpoint::AfterProcessing); - Ok(processed_tasks) + Ok(TickOutcome::TickAgain(processed_tasks)) } pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> { @@ -1085,6 +1084,16 @@ impl IndexScheduler { } } +/// The outcome of calling the [`IndexScheduler::tick`] function. +pub enum TickOutcome { + /// The scheduler should immediately attempt another `tick`. + /// + /// The `usize` field contains the number of processed tasks. + TickAgain(usize), + /// The scheduler should wait for an external signal before attempting another `tick`. + WaitForSignal, +} + #[cfg(test)] mod tests { use std::io::{BufWriter, Seek, Write};