mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 10:37:41 +08:00
check consistency, create a dump and send push event for failed checks
This commit is contained in:
parent
0df84bbba7
commit
26bd82a6e8
@ -1020,6 +1020,9 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
let mut index_wtxn = index.write_txn()?;
|
let mut index_wtxn = index.write_txn()?;
|
||||||
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
||||||
|
|
||||||
|
index.check_document_facet_consistency(&index_wtxn)?.check();
|
||||||
|
|
||||||
index_wtxn.commit()?;
|
index_wtxn.commit()?;
|
||||||
|
|
||||||
// if the update processed successfully, we're going to store the new
|
// if the update processed successfully, we're going to store the new
|
||||||
@ -1395,6 +1398,7 @@ impl IndexScheduler {
|
|||||||
} else {
|
} else {
|
||||||
unreachable!()
|
unreachable!()
|
||||||
};
|
};
|
||||||
|
|
||||||
let deleted_documents = delete_document_by_filter(
|
let deleted_documents = delete_document_by_filter(
|
||||||
index_wtxn,
|
index_wtxn,
|
||||||
filter,
|
filter,
|
||||||
|
@ -1163,6 +1163,48 @@ impl IndexScheduler {
|
|||||||
// Reset the currently updating index to relinquish the index handle
|
// Reset the currently updating index to relinquish the index handle
|
||||||
self.index_mapper.set_currently_updating_index(None);
|
self.index_mapper.set_currently_updating_index(None);
|
||||||
|
|
||||||
|
if let Err(_error) = &res {
|
||||||
|
let dump_batch = batch::Batch::Dump(Task {
|
||||||
|
uid: u32::MAX,
|
||||||
|
enqueued_at: OffsetDateTime::now_utc(),
|
||||||
|
started_at: Some(OffsetDateTime::now_utc()),
|
||||||
|
finished_at: None,
|
||||||
|
error: None,
|
||||||
|
canceled_by: None,
|
||||||
|
details: None,
|
||||||
|
status: Status::Processing,
|
||||||
|
kind: KindWithContent::DumpCreation { keys: vec![], instance_uid: None },
|
||||||
|
});
|
||||||
|
|
||||||
|
let res = {
|
||||||
|
let cloned_index_scheduler = self.private_clone();
|
||||||
|
let handle = std::thread::Builder::new()
|
||||||
|
.name(String::from("batch-operation"))
|
||||||
|
.spawn(move || cloned_index_scheduler.process_batch(dump_batch))
|
||||||
|
.unwrap();
|
||||||
|
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
|
||||||
|
};
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(_) => tracing::info!("Created a dump after failed task"),
|
||||||
|
Err(error) => tracing::error!(%error, "Could not create a dump after failed task"),
|
||||||
|
}
|
||||||
|
|
||||||
|
let user = std::env::var("MEILI_LOUIS_PUSHOVER_USER").unwrap();
|
||||||
|
let app = std::env::var("MEILI_LOUIS_PUSHOVER_APP").unwrap();
|
||||||
|
|
||||||
|
if let Err(error) = ureq::post("https://api.pushover.net/1/messages.json").send_json(
|
||||||
|
serde_json::json!({
|
||||||
|
"token": app,
|
||||||
|
"user": user,
|
||||||
|
"title": "Issue 138 db inconsistency",
|
||||||
|
"message": "A dump has been created",
|
||||||
|
}),
|
||||||
|
) {
|
||||||
|
tracing::error!(%error, "could not send pushover")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
||||||
|
|
||||||
|
@ -74,6 +74,9 @@ fn on_panic(info: &std::panic::PanicInfo) {
|
|||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let (opt, config_read_from) = Opt::try_build()?;
|
let (opt, config_read_from) = Opt::try_build()?;
|
||||||
|
|
||||||
|
std::env::var("MEILI_LOUIS_PUSHOVER_USER").expect("MEILI_LOUIS_PUSHOVER_USER not set");
|
||||||
|
std::env::var("MEILI_LOUIS_PUSHOVER_APP").expect("MEILI_LOUIS_PUSHOVER_APP not set");
|
||||||
|
|
||||||
std::panic::set_hook(Box::new(on_panic));
|
std::panic::set_hook(Box::new(on_panic));
|
||||||
|
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
|
@ -1666,6 +1666,93 @@ impl Index {
|
|||||||
}
|
}
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn check_document_facet_consistency(
|
||||||
|
&self,
|
||||||
|
rtxn: &RoTxn<'_>,
|
||||||
|
) -> Result<DocumentFacetConsistency> {
|
||||||
|
let documents = self.documents_ids(rtxn)?;
|
||||||
|
|
||||||
|
let field_ids_map = self.fields_ids_map(rtxn)?;
|
||||||
|
|
||||||
|
let mut facets = Vec::new();
|
||||||
|
let mut facet_exists = Vec::new();
|
||||||
|
let faceted_fields = self.user_defined_faceted_fields(rtxn)?;
|
||||||
|
for fid in field_ids_map.ids() {
|
||||||
|
let facet_name = field_ids_map.name(fid).unwrap();
|
||||||
|
if !faceted_fields.contains(facet_name) {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let mut facet = RoaringBitmap::new();
|
||||||
|
|
||||||
|
// value doesn't matter here we'll truncate to the level
|
||||||
|
let key = crate::heed_codec::facet::FacetGroupKey {
|
||||||
|
field_id: fid,
|
||||||
|
level: 0,
|
||||||
|
left_bound: &[] as _,
|
||||||
|
};
|
||||||
|
|
||||||
|
for res in self
|
||||||
|
.facet_id_f64_docids
|
||||||
|
.remap_key_type::<FacetGroupKeyCodec<crate::heed_codec::BytesRefCodec>>()
|
||||||
|
.prefix_iter(rtxn, &key)?
|
||||||
|
{
|
||||||
|
let (_k, v) = res?;
|
||||||
|
facet |= v.bitmap;
|
||||||
|
}
|
||||||
|
|
||||||
|
for res in self
|
||||||
|
.facet_id_string_docids
|
||||||
|
.remap_key_type::<FacetGroupKeyCodec<crate::heed_codec::BytesRefCodec>>()
|
||||||
|
.prefix_iter(rtxn, &key)?
|
||||||
|
{
|
||||||
|
let (_k, v) = res?;
|
||||||
|
facet |= v.bitmap;
|
||||||
|
}
|
||||||
|
|
||||||
|
facets.push((field_ids_map.name(fid).unwrap().to_owned(), facet));
|
||||||
|
facet_exists.push(self.exists_faceted_documents_ids(rtxn, fid)?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(DocumentFacetConsistency { documents, facets, facet_exists })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DocumentFacetConsistency {
|
||||||
|
documents: RoaringBitmap,
|
||||||
|
facets: Vec<(String, RoaringBitmap)>,
|
||||||
|
facet_exists: Vec<RoaringBitmap>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DocumentFacetConsistency {
|
||||||
|
pub fn check(&self) {
|
||||||
|
let mut inconsistencies = 0;
|
||||||
|
for ((field_name, facet), facet_exists) in self.facets.iter().zip(self.facet_exists.iter())
|
||||||
|
{
|
||||||
|
if field_name == "_geo" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let documents = self.documents.clone() & facet_exists;
|
||||||
|
let missing_in_facets = &documents - facet;
|
||||||
|
let missing_in_documents = facet - documents;
|
||||||
|
|
||||||
|
for id in missing_in_facets {
|
||||||
|
tracing::error!(id, field_name, "Missing in facets");
|
||||||
|
inconsistencies += 1;
|
||||||
|
}
|
||||||
|
for id in missing_in_documents {
|
||||||
|
tracing::error!(id, field_name, "Missing in documents");
|
||||||
|
inconsistencies += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if inconsistencies > 0 {
|
||||||
|
panic!(
|
||||||
|
"Panicked due to the previous {} inconsistencies between documents and facets",
|
||||||
|
inconsistencies
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
Loading…
Reference in New Issue
Block a user