diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index cae74c03c..257844237 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -71,7 +71,7 @@ impl BatchKind { allow_index_creation, settings_ids: vec![task_id], }), - Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(), + Kind::DumpExport | Kind::Snapshot | Kind::CancelTask | Kind::DeleteTasks => unreachable!(), } } @@ -320,7 +320,9 @@ impl BatchKind { import_ids, }) } - (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), + (_, Kind::CancelTask | Kind::DeleteTasks | Kind::DumpExport | Kind::Snapshot) => { + unreachable!() + } ( BatchKind::IndexCreation { .. } | BatchKind::IndexDeletion { .. } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 1f9fbdcd4..f445ddc1c 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -7,14 +7,16 @@ use index::apply_settings_to_builder; use index::error::IndexError; use index::{Settings, Unchecked}; use log::{debug, info}; -use milli::documents::DocumentsBatchReader; use milli::heed::{RoTxn, RwTxn}; use milli::update::IndexDocumentsConfig; use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod}; +use milli::{documents::DocumentsBatchReader, BEU32}; +use roaring::RoaringBitmap; use uuid::Uuid; pub(crate) enum Batch { Cancel(Task), + DeleteTasks(Task), Snapshot(Vec), Dump(Vec), IndexOperation(IndexOperation), @@ -89,6 +91,7 @@ impl Batch { pub fn ids(&self) -> Vec { match self { Batch::Cancel(task) + | Batch::DeleteTasks(task) | Batch::IndexCreation { task, .. } | Batch::IndexUpdate { task, .. } => vec![task.uid], Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::IndexDeletion { tasks, .. } => { @@ -355,9 +358,10 @@ impl IndexScheduler { /// Create the next batch to be processed; /// 1. We get the *last* task to cancel. - /// 2. We get the *next* snapshot to process. - /// 3. 
We get the *next* dump to process. - /// 4. We get the *next* tasks to process for a specific index. + /// 2. We get the *next* task to delete. + /// 3. We get the *next* snapshot to process. + /// 4. We get the *next* dump to process. + /// 5. We get the *next* tasks to process for a specific index. pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result> { let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued; @@ -370,7 +374,17 @@ impl IndexScheduler { ))); } - // 2. we batch the snapshot. + // 2. we get the next task to delete + let to_delete = self.get_kind(rtxn, Kind::DeleteTasks)?; + if let Some(task_id) = to_delete.min() { + let task = self + .get_task(rtxn, task_id)? + .ok_or(Error::CorruptedTaskQueue)?; + println!("DeletionTask: {task:?}"); + return Ok(Some(Batch::DeleteTasks(task))); + } + + // 3. we batch the snapshot. let to_snapshot = self.get_kind(rtxn, Kind::Snapshot)? & enqueued; if !to_snapshot.is_empty() { return Ok(Some(Batch::Snapshot( @@ -378,13 +392,13 @@ impl IndexScheduler { ))); } - // 3. we batch the dumps. + // 4. we batch the dumps. let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued; if !to_dump.is_empty() { return Ok(Some(Batch::Dump(self.get_existing_tasks(rtxn, to_dump)?))); } - // 4. We take the next task and try to batch all the tasks associated with this index. + // 5. We take the next task and try to batch all the tasks associated with this index. if let Some(task_id) = enqueued.min() { let task = self .get_task(rtxn, task_id)? @@ -427,6 +441,34 @@ impl IndexScheduler { pub(crate) fn process_batch(&self, batch: Batch) -> Result> { match batch { Batch::Cancel(_) => todo!(), + Batch::DeleteTasks(mut task) => { + println!("delete task: {task:?}"); + // 1. 
Retrieve the tasks that matched the query at enqueue-time + let matched_tasks = + if let KindWithContent::DeleteTasks { tasks, query: _ } = &task.kind { + tasks + } else { + unreachable!() + }; + println!("matched tasks: {matched_tasks:?}"); + let mut wtxn = self.env.write_txn()?; + let nbr_deleted_tasks = self.delete_matched_tasks(&mut wtxn, matched_tasks)?; + println!("nbr_deleted_tasks: {nbr_deleted_tasks}"); + task.status = Status::Succeeded; + match &mut task.details { + Some(Details::DeleteTasks { + matched_tasks: _, + deleted_tasks, + original_query: _, + }) => { + *deleted_tasks = Some(nbr_deleted_tasks); + } + _ => unreachable!(), + } + + wtxn.commit()?; + Ok(vec![task]) + } Batch::Snapshot(_) => todo!(), Batch::Dump(_) => todo!(), Batch::IndexOperation(operation) => { @@ -481,6 +523,7 @@ impl IndexScheduler { primary_key, mut task, } => { + println!("IndexUpdate task: {task:?}"); let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; @@ -767,4 +810,55 @@ impl IndexScheduler { } } } + + /// Delete each given task from all the databases (if it is deletable). + /// + /// Return the number of tasks that were actually deleted + fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &[u32]) -> Result { + // 1. Remove from this list the tasks that we are not allowed to delete + let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; + + let processing_tasks = &self.processing_tasks.read().unwrap().1; + let to_delete_tasks = matched_tasks + .iter() + .filter(|&&task_id| { + !processing_tasks.contains(task_id) && !enqueued_tasks.contains(task_id) + }) + .copied(); + let to_delete_tasks = RoaringBitmap::from_iter(to_delete_tasks); + // 2. 
We now have a list of tasks to delete, delete them + + for task_id in to_delete_tasks.iter() { + let task = self.all_tasks.get(wtxn, &BEU32::new(task_id))?.unwrap(); + self.delete_task(wtxn, &task)?; + } + + Ok(to_delete_tasks.len() as usize) + } + + /// Delete the given task from all the databases + fn delete_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { + let task_id = BEU32::new(task.uid); + + if let Some(indexes) = task.indexes() { + for index in indexes { + self.update_index(wtxn, index, |bitmap| { + bitmap.remove(task.uid); + })?; + } + } + + self.update_status(wtxn, task.status, |bitmap| { + bitmap.remove(task.uid); + })?; + + self.update_kind(wtxn, task.kind.as_kind(), |bitmap| { + (bitmap.remove(task.uid)); + })?; + + task.remove_data()?; + self.all_tasks.delete(wtxn, &task_id)?; + + Ok(()) + } } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 917ed9d55..59d3bd4ac 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock}; use file_store::{File, FileStore}; use meilisearch_types::error::ResponseError; use roaring::RoaringBitmap; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use synchronoise::SignalEvent; use time::OffsetDateTime; use uuid::Uuid; @@ -34,7 +34,7 @@ use crate::task::Task; const DEFAULT_LIMIT: fn() -> u32 = || 20; -#[derive(derive_builder::Builder, Debug, Clone, Deserialize)] +#[derive(derive_builder::Builder, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct Query { #[serde(default = "DEFAULT_LIMIT")] @@ -319,7 +319,7 @@ impl IndexScheduler { started_at: None, finished_at: None, error: None, - details: None, + details: task.default_details(), status: Status::Enqueued, kind: task, }; @@ -449,6 +449,8 @@ impl IndexScheduler { #[cfg(test)] mod tests { + use std::os::unix::process; + use big_s::S; use insta::*; use milli::update::IndexDocumentsMethod::ReplaceDocuments; @@ -688,6 
+690,100 @@ mod tests { assert_eq!(tasks[3].status, Status::Succeeded); } + #[test] + fn task_deletion() { + let (index_scheduler, handle) = IndexScheduler::test(); + + let to_enqueue = [ + KindWithContent::IndexCreation { + index_uid: S("catto"), + primary_key: Some(S("mouse")), + }, + KindWithContent::DocumentImport { + index_uid: S("catto"), + primary_key: None, + method: ReplaceDocuments, + content_file: Uuid::new_v4(), + documents_count: 12, + allow_index_creation: true, + }, + KindWithContent::DocumentImport { + index_uid: S("doggo"), + primary_key: Some(S("bone")), + method: ReplaceDocuments, + content_file: Uuid::new_v4(), + documents_count: 5000, + allow_index_creation: true, + }, + ]; + + for task in to_enqueue { + let _ = index_scheduler.register(task).unwrap(); + } + let rtxn = index_scheduler.env.read_txn().unwrap(); + let mut all_tasks = Vec::new(); + for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() { + all_tasks.push(ret.unwrap().0); + } + rtxn.commit().unwrap(); + + assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2)]"); + + index_scheduler.register(KindWithContent::DeleteTasks { + query: "test_query".to_owned(), + tasks: vec![0, 1], + }); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let task = index_scheduler + .all_tasks + .get(&rtxn, &BEU32::new(3)) + .unwrap() + .unwrap(); + rtxn.commit().unwrap(); + println!("TASK IN DB: {task:?}"); + + let rtxn = index_scheduler.env.read_txn().unwrap(); + let mut all_tasks = Vec::new(); + for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() { + all_tasks.push(ret.unwrap().0); + } + rtxn.commit().unwrap(); + + assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]"); + + handle.wait_till(Breakpoint::BatchCreated); + + // the last task, with uid = 3, should be marked as processing + let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1; + assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>"); + let rtxn = 
index_scheduler.env.read_txn().unwrap(); + let task = index_scheduler + .all_tasks + .get(&rtxn, &BEU32::new(3)) + .unwrap() + .unwrap(); + rtxn.commit().unwrap(); + println!("TASK IN DB: {task:?}"); + + // handle.wait_till(Breakpoint::AfterProcessing); + + // let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1; + // assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>"); + + // let rtxn = index_scheduler.env.read_txn().unwrap(); + // let mut all_tasks = Vec::new(); + // for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() { + // all_tasks.push(ret.unwrap().0); + // } + // rtxn.commit().unwrap(); + + // assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]"); + + // index_scheduler.register(KindWithContent::DocumentClear { index_uid: 0 }); + // index_scheduler.register(KindWithContent::CancelTask { tasks: vec![0] }); + // index_scheduler.register(KindWithContendt::DeleteTasks { tasks: vec![0] }); + } + #[test] fn document_addition() { let (index_scheduler, handle) = IndexScheduler::test(true); diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 4564ad3c4..f781a5c74 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -8,7 +8,7 @@ use std::{fmt::Write, path::PathBuf, str::FromStr}; use time::{Duration, OffsetDateTime}; use uuid::Uuid; -use crate::{Error, TaskId}; +use crate::{Error, Query, TaskId}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -170,6 +170,10 @@ pub enum KindWithContent { CancelTask { tasks: Vec, }, + DeleteTasks { + query: String, + tasks: Vec, + }, DumpExport { output: PathBuf, }, @@ -200,6 +204,7 @@ impl KindWithContent { KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate, KindWithContent::IndexSwap { .. } => Kind::IndexSwap, KindWithContent::CancelTask { .. } => Kind::CancelTask, + KindWithContent::DeleteTasks { .. } => Kind::DeleteTasks, KindWithContent::DumpExport { .. 
} => Kind::DumpExport, KindWithContent::Snapshot => Kind::Snapshot, } @@ -222,6 +227,7 @@ impl KindWithContent { | IndexUpdate { .. } | IndexSwap { .. } | CancelTask { .. } + | DeleteTasks { .. } | DumpExport { .. } | Snapshot => Ok(()), // There is nothing to persist for all these tasks } @@ -244,6 +250,7 @@ impl KindWithContent { | IndexUpdate { .. } | IndexSwap { .. } | CancelTask { .. } + | DeleteTasks { .. } | DumpExport { .. } | Snapshot => Ok(()), // There is no data associated with all these tasks } @@ -253,7 +260,7 @@ impl KindWithContent { use KindWithContent::*; match self { - DumpExport { .. } | Snapshot | CancelTask { .. } => None, + DumpExport { .. } | Snapshot | CancelTask { .. } | DeleteTasks { .. } => None, DocumentImport { index_uid, .. } | DocumentDeletion { index_uid, .. } | DocumentClear { index_uid } @@ -299,6 +306,11 @@ impl KindWithContent { KindWithContent::CancelTask { .. } => { todo!() } + KindWithContent::DeleteTasks { query, tasks } => Some(Details::DeleteTasks { + matched_tasks: tasks.len(), + deleted_tasks: None, + original_query: query.clone(), + }), KindWithContent::DumpExport { .. 
} => None, KindWithContent::Snapshot => None, } @@ -322,6 +334,7 @@ pub enum Kind { IndexUpdate, IndexSwap, CancelTask, + DeleteTasks, DumpExport, Snapshot, } @@ -352,6 +365,7 @@ impl FromStr for Kind { "index_update" => Ok(Kind::IndexUpdate), "index_swap" => Ok(Kind::IndexSwap), "cancel_task" => Ok(Kind::CancelTask), + "delete_tasks" => Ok(Kind::DeleteTasks), "dump_export" => Ok(Kind::DumpExport), "snapshot" => Ok(Kind::Snapshot), s => Err(Error::InvalidKind(s.to_string())), @@ -384,6 +398,12 @@ pub enum Details { #[serde(rename_all = "camelCase")] ClearAll { deleted_documents: Option }, #[serde(rename_all = "camelCase")] + DeleteTasks { + matched_tasks: usize, + deleted_tasks: Option, + original_query: String, + }, + #[serde(rename_all = "camelCase")] Dump { dump_uid: String }, } @@ -437,3 +457,25 @@ fn serialize_duration( None => serializer.serialize_none(), } } + +#[cfg(test)] +mod tests { + use milli::heed::{types::SerdeJson, BytesDecode, BytesEncode}; + + use crate::assert_smol_debug_snapshot; + + use super::Details; + + #[test] + fn bad_deser() { + let details = Details::DeleteTasks { + matched_tasks: 1, + deleted_tasks: None, + original_query: "hello".to_owned(), + }; + let serialised = SerdeJson::
::bytes_encode(&details).unwrap(); + let deserialised = SerdeJson::
::bytes_decode(&serialised).unwrap(); + assert_smol_debug_snapshot!(details, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###); + assert_smol_debug_snapshot!(deserialised, @"Settings { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, synonyms: NotSet, distinct_attribute: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, _kind: PhantomData } }"); + } +}