Pause the index scheduler for one second when a fatal error occurs

This commit is contained in:
Loïc Lecrenier 2022-10-24 14:16:14 +02:00 committed by Tamo
parent 44b8c8b713
commit c0a58f9141
3 changed files with 72 additions and 28 deletions

View File

@ -11,8 +11,6 @@ pub enum Error {
IndexNotFound(String),
#[error("Index `{0}` already exists.")]
IndexAlreadyExists(String),
#[error("Corrupted task queue.")]
CorruptedTaskQueue,
#[error("Corrupted dump.")]
CorruptedDump,
#[error("Task `{0}` not found.")]
@ -29,7 +27,7 @@ pub enum Error {
#[error(transparent)]
Milli(#[from] milli::Error),
#[error("An unexpected crash occurred when processing the task")]
MilliPanic,
ProcessBatchPanicked,
#[error(transparent)]
FileStore(#[from] file_store::Error),
#[error(transparent)]
@ -37,6 +35,16 @@ pub enum Error {
#[error(transparent)]
Anyhow(#[from] anyhow::Error),
// Irrecoverable errors:
#[error(transparent)]
CreateBatch(Box<Self>),
#[error("Corrupted task queue.")]
CorruptedTaskQueue,
#[error(transparent)]
TaskDatabaseUpdate(Box<Self>),
#[error(transparent)]
HeedTransaction(heed::Error),
}
impl ErrorCode for Error {
@ -50,7 +58,7 @@ impl ErrorCode for Error {
Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(),
Error::MilliPanic => Code::Internal,
Error::ProcessBatchPanicked => Code::Internal,
// TODO: TAMO: are all these errors really internal?
Error::Heed(_) => Code::Internal,
Error::FileStore(_) => Code::Internal,
@ -58,6 +66,9 @@ impl ErrorCode for Error {
Error::Anyhow(_) => Code::Internal,
Error::CorruptedTaskQueue => Code::Internal,
Error::CorruptedDump => Code::Internal,
Error::TaskDatabaseUpdate(_) => Code::Internal,
Error::CreateBatch(_) => Code::Internal,
Error::HeedTransaction(_) => Code::Internal,
}
}
}

View File

@ -41,6 +41,7 @@ use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use file_store::FileStore;
use meilisearch_types::error::ResponseError;
@ -294,6 +295,9 @@ pub enum Breakpoint {
BatchCreated,
BeforeProcessing,
AfterProcessing,
AbortedIndexation,
ProcessBatchSucceeded,
ProcessBatchFailed,
}
impl IndexScheduler {
@ -377,7 +381,19 @@ impl IndexScheduler {
match run.tick() {
Ok(0) => (),
Ok(_) => run.wake_up.signal(),
Err(e) => log::error!("{}", e),
Err(e) => {
log::error!("{}", e);
// Wait one second when an irrecoverable error occurs.
match e {
Error::CorruptedTaskQueue
| Error::TaskDatabaseUpdate(_)
| Error::HeedTransaction(_)
| Error::CreateBatch(_) => {
std::thread::sleep(Duration::from_secs(1));
}
_ => {}
}
}
}
});
}
@ -761,11 +777,12 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start);
}
let rtxn = self.env.read_txn()?;
let batch = match self.create_next_batch(&rtxn)? {
Some(batch) => batch,
None => return Ok(0),
};
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch =
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
Some(batch) => batch,
None => return Ok(0),
};
drop(rtxn);
// 1. store the starting date with the bitmap of processing tasks.
@ -786,17 +803,19 @@ impl IndexScheduler {
let res = {
let cloned_index_scheduler = self.private_clone();
let handle = std::thread::spawn(move || cloned_index_scheduler.process_batch(batch));
handle.join().unwrap_or_else(|_| Err(Error::MilliPanic))
handle.join().unwrap_or_else(|_| Err(Error::ProcessBatchPanicked))
};
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
let mut wtxn = self.env.write_txn()?;
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let finished_at = OffsetDateTime::now_utc();
match res {
Ok(tasks) => {
#[cfg(test)]
self.breakpoint(Breakpoint::ProcessBatchSucceeded);
#[allow(unused_variables)]
for (i, mut task) in tasks.into_iter().enumerate() {
task.started_at = Some(started_at);
@ -809,8 +828,11 @@ impl IndexScheduler {
},
)?;
self.update_task(&mut wtxn, &task)?;
self.delete_persisted_task_data(&task)?;
self.update_task(&mut wtxn, &task)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
if let Err(e) = self.delete_persisted_task_data(&task) {
log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
}
}
log::info!("A batch of tasks was successfully completed.");
}
@ -818,15 +840,21 @@ impl IndexScheduler {
Err(Error::Milli(milli::Error::InternalError(
milli::InternalError::AbortedIndexation,
))) => {
// TODO should we add a breakpoint here?
wtxn.abort()?;
#[cfg(test)]
self.breakpoint(Breakpoint::AbortedIndexation);
wtxn.abort().map_err(Error::HeedTransaction)?;
return Ok(0);
}
// In case of a failure we must get back and patch all the tasks with the error.
Err(err) => {
#[cfg(test)]
self.breakpoint(Breakpoint::ProcessBatchFailed);
let error: ResponseError = err.into();
for id in ids {
let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let mut task = self
.get_task(&wtxn, id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
.ok_or(Error::CorruptedTaskQueue)?;
task.started_at = Some(started_at);
task.finished_at = Some(finished_at);
task.status = Status::Failed;
@ -835,8 +863,11 @@ impl IndexScheduler {
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?;
self.delete_persisted_task_data(&task)?;
self.update_task(&mut wtxn, &task)?;
if let Err(e) = self.delete_persisted_task_data(&task) {
log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
}
self.update_task(&mut wtxn, &task)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
}
}
}
@ -845,7 +876,7 @@ impl IndexScheduler {
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
wtxn.commit()?;
wtxn.commit().map_err(Error::HeedTransaction)?;
#[cfg(test)]
self.breakpoint(Breakpoint::AfterProcessing);
@ -875,6 +906,8 @@ impl IndexScheduler {
#[cfg(test)]
mod tests {
use std::time::Instant;
use big_s::S;
use file_store::File;
use meili_snap::snapshot;
@ -1762,13 +1795,6 @@ mod tests {
#[test]
fn fail_in_update_task_after_process_batch_success_for_document_addition() {
// TODO: this is not the correct behaviour, because we process the
// same tasks twice.
//
// I wonder whether the run loop should maybe be paused if we encounter a failure
// at that point. Alternatively, maybe we should preemptively mark the tasks
// as failed at the beginning of `IndexScheduler::tick`.
let (index_scheduler, handle) = IndexScheduler::test(
true,
vec![(1, FailureLocation::UpdatingTaskAfterProcessBatchSuccess { task_uid: 0 })],
@ -1795,6 +1821,10 @@ mod tests {
allow_index_creation: true,
})
.unwrap();
// This tests that the index scheduler pauses for one second when an irrecoverable failure occurs
let start_time = Instant::now();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::Start);
@ -1803,7 +1833,10 @@ mod tests {
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_iteratino");
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_iteration");
let test_duration = start_time.elapsed();
assert!(test_duration.as_millis() > 1000);
}
#[test]