mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-31 07:23:15 +08:00
IndexScheduler::tick returns a TickOutcome
This commit is contained in:
parent
faf1e17a27
commit
6cc3797aa1
@ -423,12 +423,12 @@ impl IndexScheduler {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
run.breakpoint(Breakpoint::Init);
|
run.breakpoint(Breakpoint::Init);
|
||||||
|
|
||||||
loop {
|
run.wake_up.wait();
|
||||||
run.wake_up.wait();
|
|
||||||
|
|
||||||
|
loop {
|
||||||
match run.tick() {
|
match run.tick() {
|
||||||
Ok(0) => (),
|
Ok(TickOutcome::TickAgain(_)) => (),
|
||||||
Ok(_) => run.wake_up.signal(),
|
Ok(TickOutcome::WaitForSignal) => run.wake_up.wait(),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("{}", e);
|
log::error!("{}", e);
|
||||||
// Wait one second when an irrecoverable error occurs.
|
// Wait one second when an irrecoverable error occurs.
|
||||||
@ -441,7 +441,6 @@ impl IndexScheduler {
|
|||||||
) {
|
) {
|
||||||
std::thread::sleep(Duration::from_secs(1));
|
std::thread::sleep(Duration::from_secs(1));
|
||||||
}
|
}
|
||||||
run.wake_up.signal();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -927,7 +926,7 @@ impl IndexScheduler {
|
|||||||
/// 5. Reset the in-memory list of processed tasks.
|
/// 5. Reset the in-memory list of processed tasks.
|
||||||
///
|
///
|
||||||
/// Returns the number of processed tasks.
|
/// Returns the number of processed tasks.
|
||||||
fn tick(&self) -> Result<usize> {
|
fn tick(&self) -> Result<TickOutcome> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
{
|
{
|
||||||
*self.run_loop_iteration.write().unwrap() += 1;
|
*self.run_loop_iteration.write().unwrap() += 1;
|
||||||
@ -938,7 +937,7 @@ impl IndexScheduler {
|
|||||||
let batch =
|
let batch =
|
||||||
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
|
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
|
||||||
Some(batch) => batch,
|
Some(batch) => batch,
|
||||||
None => return Ok(0),
|
None => return Ok(TickOutcome::WaitForSignal),
|
||||||
};
|
};
|
||||||
drop(rtxn);
|
drop(rtxn);
|
||||||
|
|
||||||
@ -1010,7 +1009,7 @@ impl IndexScheduler {
|
|||||||
// the `started_at` date times and `processings` of the current processing tasks.
|
// the `started_at` date times and `processings` of the current processing tasks.
|
||||||
// This date time is used by the task cancelation to store the right `started_at`
|
// This date time is used by the task cancelation to store the right `started_at`
|
||||||
// date in the task on disk.
|
// date in the task on disk.
|
||||||
return Ok(0);
|
return Ok(TickOutcome::TickAgain(0));
|
||||||
}
|
}
|
||||||
// In case of a failure we must get back and patch all the tasks with the error.
|
// In case of a failure we must get back and patch all the tasks with the error.
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@ -1050,7 +1049,7 @@ impl IndexScheduler {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.breakpoint(Breakpoint::AfterProcessing);
|
self.breakpoint(Breakpoint::AfterProcessing);
|
||||||
|
|
||||||
Ok(processed_tasks)
|
Ok(TickOutcome::TickAgain(processed_tasks))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
|
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
|
||||||
@ -1085,6 +1084,16 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The outcome of calling the [`IndexScheduler::tick`] function.
|
||||||
|
pub enum TickOutcome {
|
||||||
|
/// The scheduler should immediately attempt another `tick`.
|
||||||
|
///
|
||||||
|
/// The `usize` field contains the number of processed tasks.
|
||||||
|
TickAgain(usize),
|
||||||
|
/// The scheduler should wait for an external signal before attempting another `tick`.
|
||||||
|
WaitForSignal,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::io::{BufWriter, Seek, Write};
|
use std::io::{BufWriter, Seek, Write};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user