From 26bd82a6e88a582859597c0141330d844d689f71 Mon Sep 17 00:00:00 2001
From: Louis Dureuil <louis@meilisearch.com>
Date: Mon, 25 Mar 2024 16:28:23 +0100
Subject: [PATCH] check consistency, create a dump and send push event for
 failed checks

---
 index-scheduler/src/batch.rs |  4 ++
 index-scheduler/src/lib.rs   | 42 +++++++++++++++++
 meilisearch/src/main.rs      |  3 ++
 milli/src/index.rs           | 87 ++++++++++++++++++++++++++++++++++++
 4 files changed, 136 insertions(+)

diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index cd5525eea..aa955e3c3 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -1020,6 +1020,9 @@ impl IndexScheduler {
                 let mut index_wtxn = index.write_txn()?;
                 let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
+
+                index.check_document_facet_consistency(&index_wtxn)?.check();
+
                 index_wtxn.commit()?;
 
                 // if the update processed successfully, we're going to store the new
@@ -1395,6 +1398,7 @@ impl IndexScheduler {
         } else {
             unreachable!()
         };
+
         let deleted_documents = delete_document_by_filter(
             index_wtxn,
             filter,
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 213ec3230..59b921c09 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -1163,6 +1163,48 @@ impl IndexScheduler {
         // Reset the currently updating index to relinquish the index handle
         self.index_mapper.set_currently_updating_index(None);
 
+        if let Err(_error) = &res {
+            let dump_batch = batch::Batch::Dump(Task {
+                uid: u32::MAX,
+                enqueued_at: OffsetDateTime::now_utc(),
+                started_at: Some(OffsetDateTime::now_utc()),
+                finished_at: None,
+                error: None,
+                canceled_by: None,
+                details: None,
+                status: Status::Processing,
+                kind: KindWithContent::DumpCreation { keys: vec![], instance_uid: None },
+            });
+
+            let res = {
+                let cloned_index_scheduler = self.private_clone();
+                let handle = std::thread::Builder::new()
+                    .name(String::from("batch-operation"))
+                    .spawn(move || cloned_index_scheduler.process_batch(dump_batch))
+                    .unwrap();
+                handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
+            };
+
+            match res {
+                Ok(_) => tracing::info!("Created a dump after failed task"),
+                Err(error) => tracing::error!(%error, "Could not create a dump after failed task"),
+            }
+
+            let user = std::env::var("MEILI_LOUIS_PUSHOVER_USER").unwrap();
+            let app = std::env::var("MEILI_LOUIS_PUSHOVER_APP").unwrap();
+
+            if let Err(error) = ureq::post("https://api.pushover.net/1/messages.json").send_json(
+                serde_json::json!({
+                    "token": app,
+                    "user": user,
+                    "title": "Issue 138 db inconsistency",
+                    "message": "A dump has been created",
+                }),
+            ) {
+                tracing::error!(%error, "could not send pushover")
+            }
+        }
+
         #[cfg(test)]
         self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
diff --git a/meilisearch/src/main.rs b/meilisearch/src/main.rs
index af02f58e1..59e4b1095 100644
--- a/meilisearch/src/main.rs
+++ b/meilisearch/src/main.rs
@@ -74,6 +74,9 @@ fn on_panic(info: &std::panic::PanicInfo) {
 async fn main() -> anyhow::Result<()> {
     let (opt, config_read_from) = Opt::try_build()?;
 
+    std::env::var("MEILI_LOUIS_PUSHOVER_USER").expect("MEILI_LOUIS_PUSHOVER_USER not set");
+    std::env::var("MEILI_LOUIS_PUSHOVER_APP").expect("MEILI_LOUIS_PUSHOVER_APP not set");
+
     std::panic::set_hook(Box::new(on_panic));
 
     anyhow::ensure!(
diff --git a/milli/src/index.rs b/milli/src/index.rs
index 0a7a20ce0..98de0b82a 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -1666,6 +1666,93 @@ impl Index {
         }
         Ok(res)
     }
+
+    pub fn check_document_facet_consistency(
+        &self,
+        rtxn: &RoTxn<'_>,
+    ) -> Result<DocumentFacetConsistency> {
+        let documents = self.documents_ids(rtxn)?;
+
+        let field_ids_map = self.fields_ids_map(rtxn)?;
+
+        let mut facets = Vec::new();
+        let mut facet_exists = Vec::new();
+        let faceted_fields = self.user_defined_faceted_fields(rtxn)?;
+        for fid in field_ids_map.ids() {
+            let facet_name = field_ids_map.name(fid).unwrap();
+            if !faceted_fields.contains(facet_name) {
+                continue;
+            };
+            let mut facet = RoaringBitmap::new();
+
+            // value doesn't matter here we'll truncate to the level
+            let key = crate::heed_codec::facet::FacetGroupKey {
+                field_id: fid,
+                level: 0,
+                left_bound: &[] as _,
+            };
+
+            for res in self
+                .facet_id_f64_docids
+                .remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
+                .prefix_iter(rtxn, &key)?
+            {
+                let (_k, v) = res?;
+                facet |= v.bitmap;
+            }
+
+            for res in self
+                .facet_id_string_docids
+                .remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
+                .prefix_iter(rtxn, &key)?
+            {
+                let (_k, v) = res?;
+                facet |= v.bitmap;
+            }
+
+            facets.push((field_ids_map.name(fid).unwrap().to_owned(), facet));
+            facet_exists.push(self.exists_faceted_documents_ids(rtxn, fid)?);
+        }
+
+        Ok(DocumentFacetConsistency { documents, facets, facet_exists })
+    }
+}
+
+pub struct DocumentFacetConsistency {
+    documents: RoaringBitmap,
+    facets: Vec<(String, RoaringBitmap)>,
+    facet_exists: Vec<RoaringBitmap>,
+}
+
+impl DocumentFacetConsistency {
+    pub fn check(&self) {
+        let mut inconsistencies = 0;
+        for ((field_name, facet), facet_exists) in self.facets.iter().zip(self.facet_exists.iter())
+        {
+            if field_name == "_geo" {
+                continue;
+            }
+
+            let documents = self.documents.clone() & facet_exists;
+            let missing_in_facets = &documents - facet;
+            let missing_in_documents = facet - documents;
+
+            for id in missing_in_facets {
+                tracing::error!(id, field_name, "Missing in facets");
+                inconsistencies += 1;
+            }
+            for id in missing_in_documents {
+                tracing::error!(id, field_name, "Missing in documents");
+                inconsistencies += 1;
+            }
+        }
+        if inconsistencies > 0 {
+            panic!(
+                "Panicked due to the previous {} inconsistencies between documents and facets",
+                inconsistencies
+            )
+        }
+    }
 }
 
 #[derive(Debug, Deserialize, Serialize)]