diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index aa955e3c3..1e3f25e4c 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -1019,6 +1019,13 @@ impl IndexScheduler {
                     .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
 
                 let mut index_wtxn = index.write_txn()?;
+
+                if index.is_corrupted(&index_wtxn)? {
+                    tracing::error!("Aborting task due to corrupted index");
+                    index_wtxn.abort();
+                    return Err(crate::Error::CorruptedIndex);
+                }
+
                 let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
 
                 index.check_document_facet_consistency(&index_wtxn)?.check();
diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs
index 223b84762..ad23c6cb9 100644
--- a/index-scheduler/src/error.rs
+++ b/index-scheduler/src/error.rs
@@ -138,6 +138,8 @@ pub enum Error {
     CreateBatch(Box<Self>),
     #[error("Corrupted task queue.")]
     CorruptedTaskQueue,
+    #[error("Corrupted index.")]
+    CorruptedIndex,
     #[error(transparent)]
     TaskDatabaseUpdate(Box<Self>),
     #[error(transparent)]
@@ -192,6 +194,7 @@ impl Error {
             | Error::Anyhow(_) => true,
             Error::CreateBatch(_)
             | Error::CorruptedTaskQueue
+            | Error::CorruptedIndex
             | Error::TaskDatabaseUpdate(_)
             | Error::HeedTransaction(_) => false,
             #[cfg(test)]
@@ -242,6 +245,7 @@ impl ErrorCode for Error {
             Error::CorruptedDump => Code::Internal,
             Error::TaskDatabaseUpdate(_) => Code::Internal,
             Error::CreateBatch(_) => Code::Internal,
+            Error::CorruptedIndex => Code::Internal,
 
             // This one should never be seen by the end user
             Error::AbortedTask => Code::Internal,
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 59b921c09..26073ac86 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -1186,8 +1186,48 @@ impl IndexScheduler {
                 };
 
                 match res {
-                    Ok(_) => tracing::info!("Created a dump after failed task"),
-                    Err(error) => tracing::error!(%error, "Could not create a dump after failed task"),
+                    Ok(_) => tracing::info!("Created a dump after panicked task"),
+                    Err(error) => {
+                        tracing::error!(%error, "Could not create a dump after panicked task")
+                    }
+                }
+
+                let snap_batch = batch::Batch::SnapshotCreation(vec![Task {
+                    uid: u32::MAX,
+                    enqueued_at: OffsetDateTime::now_utc(),
+                    started_at: Some(OffsetDateTime::now_utc()),
+                    finished_at: None,
+                    error: None,
+                    canceled_by: None,
+                    details: None,
+                    status: Status::Processing,
+                    kind: KindWithContent::SnapshotCreation,
+                }]);
+
+                let res = {
+                    let cloned_index_scheduler = self.private_clone();
+                    let handle = std::thread::Builder::new()
+                        .name(String::from("batch-operation"))
+                        .spawn(move || cloned_index_scheduler.process_batch(snap_batch))
+                        .unwrap();
+                    handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
+                };
+
+                match res {
+                    Ok(_) => tracing::info!("Created a snapshot after panicked task"),
+                    Err(error) => {
+                        tracing::error!(%error, "Could not create a snapshot after panicked task")
+                    }
+                }
+
+                {
+                    if let Some(index_uid) = index_uid.as_deref() {
+                        if let Ok(index) = self.index(index_uid) {
+                            let mut index_wtxn = index.write_txn()?;
+                            index.mark_as_corrupted(&mut index_wtxn)?;
+                            index_wtxn.commit()?;
+                        }
+                    }
                 }
 
                 let user = std::env::var("MEILI_LOUIS_PUSHOVER_USER").unwrap();
@@ -1198,7 +1238,7 @@ impl IndexScheduler {
                         "token": app,
                         "user": user,
                         "title": "Issue 138 db inconsistency",
-                        "message": "A dump has been created",
+                        "message": "Dump and snapshot created, the index has been marked as corrupted",
                     }),
                 ) {
                     tracing::error!(%error, "could not send pushover")
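[Editor's note] The scheduler changes above amount to a persisted "poison marker": after a panicked task the index is marked as corrupted, and process_batch refuses to apply any further index operation while the marker is present. The self-contained Rust sketch below illustrates that check-then-abort flow under that reading; it is not Meilisearch code, and the FakeIndex type, its methods, and the SchedulerError enum are invented for illustration only.

use std::collections::HashMap;

/// Key under which the corruption marker is persisted, mirroring `main_key::CORRUPTED`.
const CORRUPTED_KEY: &str = "corrupted";

/// Mirrors the shape of `Error::CorruptedIndex`: the index refuses the operation.
#[derive(Debug, PartialEq)]
enum SchedulerError {
    CorruptedIndex,
}

/// Hypothetical stand-in for an index whose `main` database is a key/value store.
struct FakeIndex {
    main: HashMap<String, String>,
}

impl FakeIndex {
    fn new() -> Self {
        Self { main: HashMap::new() }
    }

    /// Persist the poison marker (done after a panicked task in the diff).
    fn mark_as_corrupted(&mut self) {
        self.main.insert(CORRUPTED_KEY.to_string(), "corrupted".to_string());
    }

    /// Check the poison marker (done before applying an index operation in the diff).
    fn is_corrupted(&self) -> bool {
        self.main.contains_key(CORRUPTED_KEY)
    }

    /// Apply a write only if the index has not been poisoned.
    fn apply_operation(&mut self, key: &str, value: &str) -> Result<(), SchedulerError> {
        if self.is_corrupted() {
            // Same shape as the guard in `process_batch`: bail out instead of writing.
            return Err(SchedulerError::CorruptedIndex);
        }
        self.main.insert(key.to_string(), value.to_string());
        Ok(())
    }
}

fn main() {
    let mut index = FakeIndex::new();
    assert!(index.apply_operation("doc:1", "ok").is_ok());

    // A panicked task marks the index; every later write is then refused.
    index.mark_as_corrupted();
    assert_eq!(index.apply_operation("doc:2", "rejected"), Err(SchedulerError::CorruptedIndex));
    println!("a corrupted index rejects further writes");
}

Because the real marker lives in the index's own persistent main database rather than only in scheduler memory, the refusal appears intended to survive process restarts, which is what the is_corrupted check at the top of the batch relies on.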
diff --git a/milli/src/index.rs b/milli/src/index.rs
index 98de0b82a..bef9bc28d 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -73,6 +73,8 @@ pub mod main_key {
     pub const PROXIMITY_PRECISION: &str = "proximity-precision";
     pub const EMBEDDING_CONFIGS: &str = "embedding_configs";
    pub const SEARCH_CUTOFF: &str = "search_cutoff";
+
+    pub const CORRUPTED: &str = "corrupted";
 }
 
 pub mod db_name {
@@ -1716,6 +1718,14 @@ impl Index {
 
         Ok(DocumentFacetConsistency { documents, facets, facet_exists })
     }
+
+    pub fn mark_as_corrupted(&self, wtxn: &mut RwTxn<'_>) -> Result<()> {
+        Ok(self.main.remap_types::<Str, Str>().put(wtxn, main_key::CORRUPTED, "corrupted")?)
+    }
+
+    pub fn is_corrupted(&self, txn: &RoTxn<'_>) -> Result<bool> {
+        Ok(self.main.remap_types::<Str, Str>().get(txn, main_key::CORRUPTED)?.is_some())
+    }
 }
 
 pub struct DocumentFacetConsistency {
@@ -1727,20 +1737,22 @@ pub struct DocumentFacetConsistency {
 impl DocumentFacetConsistency {
     pub fn check(&self) {
         let mut inconsistencies = 0;
-        for ((field_name, facet), facet_exists) in self.facets.iter().zip(self.facet_exists.iter())
+        for ((field_name, facet), _facet_exists) in self.facets.iter().zip(self.facet_exists.iter())
         {
             if field_name == "_geo" {
                 continue;
             }
 
-            let documents = self.documents.clone() & facet_exists;
-            let missing_in_facets = &documents - facet;
+            // only check the internal ids missing in documents as it is the grave condition
+            // let documents = self.documents.clone() & facet_exists;
+            let documents = self.documents.clone();
+            // let missing_in_facets = &documents - facet;
             let missing_in_documents = facet - documents;
 
-            for id in missing_in_facets {
+            /*for id in missing_in_facets {
                 tracing::error!(id, field_name, "Missing in facets");
                 inconsistencies += 1;
-            }
+            }*/
             for id in missing_in_documents {
                 tracing::error!(id, field_name, "Missing in documents");
                 inconsistencies += 1;
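[Editor's note] The relaxed DocumentFacetConsistency::check above now reports only internal ids that a facet database references but that are absent from the documents bitmap. A minimal sketch of that one-directional check follows; it uses BTreeSet in place of milli's RoaringBitmap, and the field name and id values are made up for illustration.

use std::collections::BTreeSet;

/// Report ids that a facet database references but that are absent from the documents set.
/// This mirrors only the `missing_in_documents` direction kept by the patched `check()`.
fn missing_in_documents(
    field_name: &str,
    facet_ids: &BTreeSet<u32>,
    document_ids: &BTreeSet<u32>,
) -> Vec<u32> {
    let mut missing = Vec::new();
    for &id in facet_ids.difference(document_ids) {
        // The real code logs each hit with `tracing::error!` and counts inconsistencies.
        eprintln!("id {id} of field `{field_name}` is missing in documents");
        missing.push(id);
    }
    missing
}

fn main() {
    // Made-up data: documents 1..=3 exist, but a facet still references id 7.
    let document_ids: BTreeSet<u32> = [1, 2, 3].into_iter().collect();
    let facet_ids: BTreeSet<u32> = [1, 2, 7].into_iter().collect();

    let dangling = missing_in_documents("release_date", &facet_ids, &document_ids);
    assert_eq!(dangling, vec![7]);
}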