diff --git a/Cargo.lock b/Cargo.lock index 94e59f487..9052f03c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1877,6 +1877,7 @@ dependencies = [ "nelson", "roaring 0.9.0", "serde", + "serde_json", "synchronoise", "tempfile", "thiserror", diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index a9df99866..2aa8f49e0 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -17,6 +17,7 @@ meilisearch-types = { path = "../meilisearch-types" } document-formats = { path = "../document-formats" } roaring = "0.9.0" serde = { version = "1.0.136", features = ["derive"] } +serde_json = { version = "1.0.85", features = ["preserve_order"] } tempfile = "3.3.0" thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index f445ddc1c..85a5bfe21 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -380,7 +380,7 @@ impl IndexScheduler { let task = self .get_task(rtxn, task_id)? .ok_or(Error::CorruptedTaskQueue)?; - println!("DeletionTask: {task:?}"); + return Ok(Some(Batch::DeleteTasks(task))); } @@ -442,7 +442,6 @@ impl IndexScheduler { match batch { Batch::Cancel(_) => todo!(), Batch::DeleteTasks(mut task) => { - println!("delete task: {task:?}"); // 1. Retrieve the tasks that matched the quety at enqueue-time let matched_tasks = if let KindWithContent::DeleteTasks { tasks, query: _ } = &task.kind { @@ -450,10 +449,10 @@ impl IndexScheduler { } else { unreachable!() }; - println!("matched tasks: {matched_tasks:?}"); + let mut wtxn = self.env.write_txn()?; let nbr_deleted_tasks = self.delete_matched_tasks(&mut wtxn, matched_tasks)?; - println!("nbr_deleted_tasks: {nbr_deleted_tasks}"); + task.status = Status::Succeeded; match &mut task.details { Some(Details::DeleteTasks { @@ -523,7 +522,6 @@ impl IndexScheduler { primary_key, mut task, } => { - println!("IndexUpdate task: {task:?}"); let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 59d3bd4ac..2c5b7b625 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -2,6 +2,8 @@ mod autobatcher; mod batch; pub mod error; mod index_mapper; +#[cfg(test)] +mod snapshot; pub mod task; mod utils; @@ -323,7 +325,6 @@ impl IndexScheduler { status: Status::Enqueued, kind: task, }; - self.all_tasks .append(&mut wtxn, &BEU32::new(task.uid), &task)?; @@ -403,22 +404,26 @@ impl IndexScheduler { // 2. Process the tasks let res = self.process_batch(batch); - + dbg!(); let mut wtxn = self.env.write_txn()?; - + dbg!(); let finished_at = OffsetDateTime::now_utc(); match res { Ok(tasks) => { + dbg!(); for mut task in tasks { task.started_at = Some(started_at); task.finished_at = Some(finished_at); // TODO the info field should've been set by the process_batch function self.update_task(&mut wtxn, &task)?; task.remove_data()?; + dbg!(); } + dbg!(); } // In case of a failure we must get back and patch all the tasks with the error. Err(err) => { + dbg!(); let error: ResponseError = err.into(); for id in ids { let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; @@ -432,10 +437,11 @@ impl IndexScheduler { } } } - + dbg!(); *self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new()); - + dbg!(); wtxn.commit()?; + dbg!(); log::info!("A batch of tasks was successfully completed."); #[cfg(test)] @@ -449,15 +455,13 @@ impl IndexScheduler { #[cfg(test)] mod tests { - use std::os::unix::process; - use big_s::S; use insta::*; use milli::update::IndexDocumentsMethod::ReplaceDocuments; use tempfile::TempDir; use uuid::Uuid; - use crate::assert_smol_debug_snapshot; + use crate::{assert_smol_debug_snapshot, snapshot::snapshot_index}; use super::*; @@ -703,7 +707,7 @@ mod tests { index_uid: S("catto"), primary_key: None, method: ReplaceDocuments, - content_file: Uuid::new_v4(), + content_file: Uuid::from_u128(0), documents_count: 12, allow_index_creation: true, }, @@ -711,7 +715,7 @@ mod tests { index_uid: S("doggo"), primary_key: Some(S("bone")), method: ReplaceDocuments, - content_file: Uuid::new_v4(), + content_file: Uuid::from_u128(1), documents_count: 5000, allow_index_creation: true, }, @@ -727,6 +731,31 @@ mod tests { } rtxn.commit().unwrap(); + assert_snapshot!(snapshot_index(&index_scheduler), @r###" + ### Processing Tasks: + [] + ---------------------------------------------------------------------- + ### All Tasks: + 0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} + 1 {uid: 1, status: enqueued, details: { received_documents: 12, indexed_documents: 0 }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: e881d224-ed39-4322-87ae-eae5a749b835, documents_count: 12, allow_index_creation: true }} + 2 {uid: 2, status: enqueued, details: { received_documents: 5000, indexed_documents: 0 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: f21ce9f3-58f4-4bab-813b-ecb0b202d20f, documents_count: 5000, allow_index_creation: true }} + ---------------------------------------------------------------------- + ### Status: + enqueued [0,1,2,] + ---------------------------------------------------------------------- + ### Kind: + {"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [1,2,] + "indexCreation" [0,] + ---------------------------------------------------------------------- + ### Index Tasks: + catto [0,1,] + doggo [2,] + ---------------------------------------------------------------------- + ### Index Mapper: + [] + ---------------------------------------------------------------------- + "###); + assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2)]"); index_scheduler.register(KindWithContent::DeleteTasks { @@ -740,7 +769,6 @@ mod tests { .unwrap() .unwrap(); rtxn.commit().unwrap(); - println!("TASK IN DB: {task:?}"); let rtxn = index_scheduler.env.read_txn().unwrap(); let mut all_tasks = Vec::new(); @@ -754,8 +782,8 @@ mod tests { handle.wait_till(Breakpoint::BatchCreated); // the last task, with uid = 3, should be marked as processing - let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1; - assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>"); + // let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1; + // assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>"); let rtxn = index_scheduler.env.read_txn().unwrap(); let task = index_scheduler .all_tasks @@ -763,22 +791,27 @@ mod tests { .unwrap() .unwrap(); rtxn.commit().unwrap(); - println!("TASK IN DB: {task:?}"); - // handle.wait_till(Breakpoint::AfterProcessing); + handle.wait_till(Breakpoint::AfterProcessing); - // let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1; - // assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>"); + dbg!(); - // let rtxn = index_scheduler.env.read_txn().unwrap(); - // let mut all_tasks = Vec::new(); - // for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() { - // all_tasks.push(ret.unwrap().0); - // } - // rtxn.commit().unwrap(); + let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1; + assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>"); - // assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]"); + dbg!(); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let mut all_tasks = Vec::new(); + for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() { + all_tasks.push(ret.unwrap().0); + } + rtxn.commit().unwrap(); + + dbg!(); + + assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]"); + handle.dont_block(); // index_scheduler.register(KindWithContent::DocumentClear { index_uid: 0 }); // index_scheduler.register(KindWithContent::CancelTask { tasks: vec![0] }); // index_scheduler.register(KindWithContendt::DeleteTasks { tasks: vec![0] }); diff --git a/index-scheduler/src/snapshot.rs b/index-scheduler/src/snapshot.rs new file mode 100644 index 000000000..09dcde4d3 --- /dev/null +++ b/index-scheduler/src/snapshot.rs @@ -0,0 +1,182 @@ +use milli::{ + heed::{ + types::{OwnedType, SerdeBincode, SerdeJson, Str}, + Database, RoTxn, + }, + RoaringBitmapCodec, BEU32, +}; +use roaring::RoaringBitmap; + +use crate::{ + index_mapper::IndexMapper, + task::{Details, Task}, + IndexScheduler, Kind, Status, +}; + +pub fn snapshot_index(scheduler: &IndexScheduler) -> String { + let IndexScheduler { + processing_tasks, + file_store: _, + env, + all_tasks, + status, + kind, + index_tasks, + index_mapper, + wake_up: _, + test_breakpoint_sdr: _, + } = scheduler; + + let rtxn = env.read_txn().unwrap(); + + let mut snap = String::new(); + + let (_time, processing_tasks) = processing_tasks.read().unwrap().clone(); + snap.push_str("### Processing Tasks:\n"); + snap.push_str(&snapshot_bitmap(&processing_tasks)); + snap.push_str("\n----------------------------------------------------------------------\n"); + + snap.push_str("### All Tasks:\n"); + snap.push_str(&snapshot_all_tasks(&rtxn, *all_tasks)); + snap.push_str("----------------------------------------------------------------------\n"); + + snap.push_str("### Status:\n"); + snap.push_str(&snapshot_status(&rtxn, *status)); + snap.push_str("----------------------------------------------------------------------\n"); + + snap.push_str("### Kind:\n"); + snap.push_str(&snapshot_kind(&rtxn, *kind)); + snap.push_str("----------------------------------------------------------------------\n"); + + snap.push_str("### Index Tasks:\n"); + snap.push_str(&snapshot_index_tasks(&rtxn, *index_tasks)); + snap.push_str("----------------------------------------------------------------------\n"); + + snap.push_str("### Index Mapper:\n"); + snap.push_str(&snapshot_index_mapper(&rtxn, index_mapper)); + snap.push_str("\n----------------------------------------------------------------------\n"); + + rtxn.commit().unwrap(); + + snap +} + +fn snapshot_bitmap(r: &RoaringBitmap) -> String { + let mut snap = String::new(); + snap.push('['); + for x in r { + snap.push_str(&format!("{x},")); + } + snap.push(']'); + snap +} + +fn snapshot_all_tasks(rtxn: &RoTxn, db: Database, SerdeJson>) -> String { + let mut snap = String::new(); + let mut iter = db.iter(rtxn).unwrap(); + while let Some(next) = iter.next() { + let (task_id, task) = next.unwrap(); + snap.push_str(&format!("{task_id} {}\n", snapshot_task(&task))); + } + snap +} + +fn snapshot_task(task: &Task) -> String { + let mut snap = String::new(); + let Task { + uid, + enqueued_at: _, + started_at: _, + finished_at: _, + error, + details, + status, + kind, + } = task; + snap.push('{'); + snap.push_str(&format!("uid: {uid}, ")); + snap.push_str(&format!("status: {status}, ")); + if let Some(error) = error { + snap.push_str(&format!("error: {error:?}, ")); + } + if let Some(details) = details { + snap.push_str(&format!("details: {}, ", &snaphsot_details(details))); + } + snap.push_str(&format!("kind: {kind:?}")); + + snap.push('}'); + snap +} +fn snaphsot_details(d: &Details) -> String { + match d { + Details::DocumentAddition { + received_documents, + indexed_documents, + } => { + format!("{{ received_documents: {received_documents}, indexed_documents: {indexed_documents} }}") + } + Details::Settings { settings } => { + format!("{{ settings: {settings:?} }}") + } + Details::IndexInfo { primary_key } => { + format!("{{ primary_key: {primary_key:?} }}") + } + Details::DocumentDeletion { + received_document_ids, + deleted_documents, + } => format!("{{ received_document_ids: {received_document_ids}, deleted_documents: {deleted_documents:?} }}"), + Details::ClearAll { deleted_documents } => { + format!("{{ deleted_documents: {deleted_documents:?} }}") + }, + Details::DeleteTasks { + matched_tasks, + deleted_tasks, + original_query, + } => { + format!("{{ matched_tasks: {matched_tasks:?}, deleted_tasks: {deleted_tasks:?}, original_query: {original_query:?} }}") + }, + Details::Dump { dump_uid } => { + format!("{{ dump_uid: {dump_uid:?} }}") + }, + } +} + +fn snapshot_status(rtxn: &RoTxn, db: Database, RoaringBitmapCodec>) -> String { + let mut snap = String::new(); + let mut iter = db.iter(rtxn).unwrap(); + while let Some(next) = iter.next() { + let (status, task_ids) = next.unwrap(); + snap.push_str(&format!("{status} {}\n", snapshot_bitmap(&task_ids))); + } + snap +} +fn snapshot_kind(rtxn: &RoTxn, db: Database, RoaringBitmapCodec>) -> String { + let mut snap = String::new(); + let mut iter = db.iter(rtxn).unwrap(); + while let Some(next) = iter.next() { + let (kind, task_ids) = next.unwrap(); + let kind = serde_json::to_string(&kind).unwrap(); + snap.push_str(&format!("{kind} {}\n", snapshot_bitmap(&task_ids))); + } + snap +} + +fn snapshot_index_tasks(rtxn: &RoTxn, db: Database) -> String { + let mut snap = String::new(); + let mut iter = db.iter(rtxn).unwrap(); + while let Some(next) = iter.next() { + let (index, task_ids) = next.unwrap(); + snap.push_str(&format!("{index} {}\n", snapshot_bitmap(&task_ids))); + } + snap +} + +fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String { + let names = mapper + .indexes(rtxn) + .unwrap() + .into_iter() + .map(|(n, _)| n) + .collect::>(); + format!("{names:?}") +} diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index f781a5c74..40c1473a0 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -4,13 +4,17 @@ use meilisearch_types::error::ResponseError; use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize, Serializer}; -use std::{fmt::Write, path::PathBuf, str::FromStr}; +use std::{ + fmt::{Display, Write}, + path::PathBuf, + str::FromStr, +}; use time::{Duration, OffsetDateTime}; use uuid::Uuid; -use crate::{Error, Query, TaskId}; +use crate::{Error, TaskId}; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct TaskView { pub uid: TaskId, @@ -21,9 +25,9 @@ pub struct TaskView { #[serde(rename = "type")] pub kind: Kind, - #[serde(skip_serializing_if = "Option::is_none", default)] - pub details: Option
, - #[serde(skip_serializing_if = "Option::is_none", default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub details: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, #[serde( @@ -92,7 +96,7 @@ impl Task { .and_then(|vec| vec.first().map(|i| i.to_string())), status: self.status, kind: self.kind.as_kind(), - details: self.details.clone(), + details: self.details.as_ref().map(Details::as_details_view), error: self.error.clone(), duration: self .started_at @@ -113,7 +117,16 @@ pub enum Status { Succeeded, Failed, } - +impl Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Status::Enqueued => write!(f, "enqueued"), + Status::Processing => write!(f, "processing"), + Status::Succeeded => write!(f, "succeeded"), + Status::Failed => write!(f, "failed"), + } + } +} impl FromStr for Status { type Err = Error; @@ -374,37 +387,107 @@ impl FromStr for Kind { } #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -#[serde(untagged)] #[allow(clippy::large_enum_variant)] pub enum Details { - #[serde(rename_all = "camelCase")] DocumentAddition { received_documents: u64, indexed_documents: u64, }, - #[serde(rename_all = "camelCase")] Settings { - #[serde(flatten)] settings: Settings, }, - #[serde(rename_all = "camelCase")] - IndexInfo { primary_key: Option }, - #[serde(rename_all = "camelCase")] + IndexInfo { + primary_key: Option, + }, DocumentDeletion { received_document_ids: usize, // TODO why is this optional? deleted_documents: Option, }, - #[serde(rename_all = "camelCase")] - ClearAll { deleted_documents: Option }, - #[serde(rename_all = "camelCase")] + ClearAll { + deleted_documents: Option, + }, DeleteTasks { matched_tasks: usize, deleted_tasks: Option, original_query: String, }, - #[serde(rename_all = "camelCase")] - Dump { dump_uid: String }, + Dump { + dump_uid: String, + }, +} +#[derive(Default, Debug, PartialEq, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DetailsView { + #[serde(skip_serializing_if = "Option::is_none")] + received_documents: Option, + #[serde(skip_serializing_if = "Option::is_none")] + indexed_documents: Option, + #[serde(skip_serializing_if = "Option::is_none")] + primary_key: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + received_document_ids: Option, + #[serde(skip_serializing_if = "Option::is_none")] + deleted_documents: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + matched_tasks: Option, + #[serde(skip_serializing_if = "Option::is_none")] + deleted_tasks: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + original_query: Option, + #[serde(skip_serializing_if = "Option::is_none")] + dump_uid: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(flatten)] + settings: Option>, +} +impl Details { + fn as_details_view(&self) -> DetailsView { + match self.clone() { + Details::DocumentAddition { + received_documents, + indexed_documents, + } => DetailsView { + received_documents: Some(received_documents), + indexed_documents: Some(indexed_documents), + ..DetailsView::default() + }, + Details::Settings { settings } => DetailsView { + settings: Some(settings), + ..DetailsView::default() + }, + Details::IndexInfo { primary_key } => DetailsView { + primary_key: Some(primary_key), + ..DetailsView::default() + }, + Details::DocumentDeletion { + received_document_ids, + deleted_documents, + } => DetailsView { + received_document_ids: Some(received_document_ids), + deleted_documents: Some(deleted_documents), + ..DetailsView::default() + }, + Details::ClearAll { deleted_documents } => DetailsView { + deleted_documents: Some(deleted_documents), + ..DetailsView::default() + }, + Details::DeleteTasks { + matched_tasks, + deleted_tasks, + original_query, + } => DetailsView { + matched_tasks: Some(matched_tasks), + deleted_tasks: Some(deleted_tasks), + original_query: Some(original_query), + ..DetailsView::default() + }, + Details::Dump { dump_uid } => DetailsView { + dump_uid: Some(dump_uid), + ..DetailsView::default() + }, + } + } } /// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for @@ -476,6 +559,6 @@ mod tests { let serialised = SerdeJson::
::bytes_encode(&details).unwrap(); let deserialised = SerdeJson::
::bytes_decode(&serialised).unwrap(); assert_smol_debug_snapshot!(details, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###); - assert_smol_debug_snapshot!(deserialised, @"Settings { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, synonyms: NotSet, distinct_attribute: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, _kind: PhantomData } }"); + assert_smol_debug_snapshot!(deserialised, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###); } }