Changes for tracking issue 138

- create a snapshot as well as a dump
- only detect inconsistencies in the facet -> document direction
- mark index as corrupted after creating snapshot and dump
- always abort tasks on indexes marked as corrupted
This commit is contained in:
Louis Dureuil 2024-04-04 10:22:49 +02:00
parent 26bd82a6e8
commit ba40c8f31c
No known key found for this signature in database
4 changed files with 71 additions and 8 deletions

View File

@ -1019,6 +1019,13 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.clone(), index.clone()))); .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let mut index_wtxn = index.write_txn()?; let mut index_wtxn = index.write_txn()?;
if index.is_corrupted(&index_wtxn)? {
tracing::error!("Aborting task due to corrupted index");
index_wtxn.abort();
return Err(crate::Error::CorruptedIndex);
}
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
index.check_document_facet_consistency(&index_wtxn)?.check(); index.check_document_facet_consistency(&index_wtxn)?.check();

View File

@ -138,6 +138,8 @@ pub enum Error {
CreateBatch(Box<Self>), CreateBatch(Box<Self>),
#[error("Corrupted task queue.")] #[error("Corrupted task queue.")]
CorruptedTaskQueue, CorruptedTaskQueue,
#[error("Corrupted index.")]
CorruptedIndex,
#[error(transparent)] #[error(transparent)]
TaskDatabaseUpdate(Box<Self>), TaskDatabaseUpdate(Box<Self>),
#[error(transparent)] #[error(transparent)]
@ -192,6 +194,7 @@ impl Error {
| Error::Anyhow(_) => true, | Error::Anyhow(_) => true,
Error::CreateBatch(_) Error::CreateBatch(_)
| Error::CorruptedTaskQueue | Error::CorruptedTaskQueue
| Error::CorruptedIndex
| Error::TaskDatabaseUpdate(_) | Error::TaskDatabaseUpdate(_)
| Error::HeedTransaction(_) => false, | Error::HeedTransaction(_) => false,
#[cfg(test)] #[cfg(test)]
@ -242,6 +245,7 @@ impl ErrorCode for Error {
Error::CorruptedDump => Code::Internal, Error::CorruptedDump => Code::Internal,
Error::TaskDatabaseUpdate(_) => Code::Internal, Error::TaskDatabaseUpdate(_) => Code::Internal,
Error::CreateBatch(_) => Code::Internal, Error::CreateBatch(_) => Code::Internal,
Error::CorruptedIndex => Code::Internal,
// This one should never be seen by the end user // This one should never be seen by the end user
Error::AbortedTask => Code::Internal, Error::AbortedTask => Code::Internal,

View File

@ -1186,8 +1186,48 @@ impl IndexScheduler {
}; };
match res { match res {
Ok(_) => tracing::info!("Created a dump after failed task"), Ok(_) => tracing::info!("Created a dump after panicked task"),
Err(error) => tracing::error!(%error, "Could not create a dump after failed task"), Err(error) => {
tracing::error!(%error, "Could not create a dump after panicked task")
}
}
let snap_batch = batch::Batch::SnapshotCreation(vec![Task {
uid: u32::MAX,
enqueued_at: OffsetDateTime::now_utc(),
started_at: Some(OffsetDateTime::now_utc()),
finished_at: None,
error: None,
canceled_by: None,
details: None,
status: Status::Processing,
kind: KindWithContent::SnapshotCreation,
}]);
let res = {
let cloned_index_scheduler = self.private_clone();
let handle = std::thread::Builder::new()
.name(String::from("batch-operation"))
.spawn(move || cloned_index_scheduler.process_batch(snap_batch))
.unwrap();
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
};
match res {
Ok(_) => tracing::info!("Created a snapshot after panicked task"),
Err(error) => {
tracing::error!(%error, "Could not create a snapshot after panicked task")
}
}
{
if let Some(index_uid) = index_uid.as_deref() {
if let Ok(index) = self.index(index_uid) {
let mut index_wtxn = index.write_txn()?;
index.mark_as_corrupted(&mut index_wtxn)?;
index_wtxn.commit()?;
}
}
} }
let user = std::env::var("MEILI_LOUIS_PUSHOVER_USER").unwrap(); let user = std::env::var("MEILI_LOUIS_PUSHOVER_USER").unwrap();
@ -1198,7 +1238,7 @@ impl IndexScheduler {
"token": app, "token": app,
"user": user, "user": user,
"title": "Issue 138 db inconsistency", "title": "Issue 138 db inconsistency",
"message": "A dump has been created", "message": "Dump and snapshot created, the index has been marked as corrupted",
}), }),
) { ) {
tracing::error!(%error, "could not send pushover") tracing::error!(%error, "could not send pushover")

View File

@ -73,6 +73,8 @@ pub mod main_key {
pub const PROXIMITY_PRECISION: &str = "proximity-precision"; pub const PROXIMITY_PRECISION: &str = "proximity-precision";
pub const EMBEDDING_CONFIGS: &str = "embedding_configs"; pub const EMBEDDING_CONFIGS: &str = "embedding_configs";
pub const SEARCH_CUTOFF: &str = "search_cutoff"; pub const SEARCH_CUTOFF: &str = "search_cutoff";
pub const CORRUPTED: &str = "corrupted";
} }
pub mod db_name { pub mod db_name {
@ -1716,6 +1718,14 @@ impl Index {
Ok(DocumentFacetConsistency { documents, facets, facet_exists }) Ok(DocumentFacetConsistency { documents, facets, facet_exists })
} }
pub fn mark_as_corrupted(&self, wtxn: &mut RwTxn<'_>) -> Result<()> {
Ok(self.main.remap_types::<Str, Str>().put(wtxn, main_key::CORRUPTED, "corrupted")?)
}
pub fn is_corrupted(&self, txn: &RoTxn<'_>) -> Result<bool> {
Ok(self.main.remap_types::<Str, Str>().get(txn, main_key::CORRUPTED)?.is_some())
}
} }
pub struct DocumentFacetConsistency { pub struct DocumentFacetConsistency {
@ -1727,20 +1737,22 @@ pub struct DocumentFacetConsistency {
impl DocumentFacetConsistency { impl DocumentFacetConsistency {
pub fn check(&self) { pub fn check(&self) {
let mut inconsistencies = 0; let mut inconsistencies = 0;
for ((field_name, facet), facet_exists) in self.facets.iter().zip(self.facet_exists.iter()) for ((field_name, facet), _facet_exists) in self.facets.iter().zip(self.facet_exists.iter())
{ {
if field_name == "_geo" { if field_name == "_geo" {
continue; continue;
} }
let documents = self.documents.clone() & facet_exists; // only check the internal ids missing in documents as it is the grave condition
let missing_in_facets = &documents - facet; // let documents = self.documents.clone() & facet_exists;
let documents = self.documents.clone();
// let missing_in_facets = &documents - facet;
let missing_in_documents = facet - documents; let missing_in_documents = facet - documents;
for id in missing_in_facets { /*for id in missing_in_facets {
tracing::error!(id, field_name, "Missing in facets"); tracing::error!(id, field_name, "Missing in facets");
inconsistencies += 1; inconsistencies += 1;
} }*/
for id in missing_in_documents { for id in missing_in_documents {
tracing::error!(id, field_name, "Missing in documents"); tracing::error!(id, field_name, "Missing in documents");
inconsistencies += 1; inconsistencies += 1;