diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 257844237..2d054c41a 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -71,7 +71,9 @@ impl BatchKind { allow_index_creation, settings_ids: vec![task_id], }), - Kind::DumpExport | Kind::Snapshot | Kind::CancelTask | Kind::DeleteTasks => unreachable!(), + Kind::DumpExport | Kind::Snapshot | Kind::CancelTask | Kind::DeleteTasks => { + unreachable!() + } } } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 04ffcacd7..6c7a0ceaf 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -456,10 +456,66 @@ mod tests { use tempfile::TempDir; use uuid::Uuid; - use crate::{assert_smol_debug_snapshot, snapshot::snapshot_index_scheduler}; + use crate::snapshot::snapshot_index_scheduler; use super::*; + /// Return a `KindWithContent::IndexCreation` task + fn index_creation_task(index: &'static str, primary_key: &'static str) -> KindWithContent { + KindWithContent::IndexCreation { + index_uid: S(index), + primary_key: Some(S(primary_key)), + } + } + /// Create a `KindWithContent::DocumentImport` task that imports documents. + /// + /// - `index_uid` is given as parameter + /// - `primary_key` is given as parameter + /// - `method` is set to `ReplaceDocuments` + /// - `content_file` is given as parameter + /// - `documents_count` is given as parameter + /// - `allow_index_creation` is set to `true` + fn replace_document_import_task( + index: &'static str, + primary_key: Option<&'static str>, + content_file_uuid: u128, + documents_count: u64, + ) -> KindWithContent { + KindWithContent::DocumentImport { + index_uid: S(index), + primary_key: primary_key.map(ToOwned::to_owned), + method: ReplaceDocuments, + content_file: Uuid::from_u128(content_file_uuid), + documents_count: documents_count, + allow_index_creation: true, + } + } + + /// Create an update file with the given file uuid. 
+ /// + /// The update file contains just one simple document whose id is given by `document_id`. + /// + /// The update file itself (not its uuid) and its documents count are returned. + fn sample_documents( + index_scheduler: &IndexScheduler, + file_uuid: u128, + document_id: usize, + ) -> (File, u64) { + let content = format!( + r#" + {{ + "id" : "{document_id}" + }}"# + ); + + let (_uuid, mut file) = index_scheduler + .create_update_file_with_uuid(file_uuid) + .unwrap(); + let documents_count = + document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; + (file, documents_count) + } + impl IndexScheduler { pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) { let tempdir = TempDir::new().unwrap(); @@ -496,6 +552,7 @@ mod tests { self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint); } + #[allow(unused)] /// Wait until the provided breakpoint is reached. fn next_breakpoint(&self) -> Breakpoint { self.test_breakpoint_rcv.recv().unwrap() @@ -512,40 +569,16 @@ mod tests { #[test] fn register() { - let (index_scheduler, handle) = IndexScheduler::test(); + // In this test, the handle doesn't make any progress, we only check that the tasks are registered + let (index_scheduler, _handle) = IndexScheduler::test(); let kinds = [ - KindWithContent::IndexCreation { - index_uid: S("catto"), - primary_key: Some(S("mouse")), - }, - KindWithContent::DocumentImport { - index_uid: S("catto"), - primary_key: None, - method: ReplaceDocuments, - content_file: Uuid::from_u128(0), - documents_count: 12, - allow_index_creation: true, - }, + index_creation_task("catto", "mouse"), + replace_document_import_task("catto", None, 0, 12), KindWithContent::CancelTask { tasks: vec![0, 1] }, - KindWithContent::DocumentImport { - index_uid: S("catto"), - primary_key: None, - method: ReplaceDocuments, - content_file: Uuid::from_u128(1), - documents_count: 50, - allow_index_creation: true, - }, - KindWithContent::DocumentImport { - index_uid: S("doggo"), - primary_key: 
Some(S("bone")), - method: ReplaceDocuments, - content_file: Uuid::from_u128(2), - documents_count: 5000, - allow_index_creation: true, - }, + replace_document_import_task("catto", None, 1, 50), + replace_document_import_task("doggo", Some("bone"), 2, 5000), ]; - let mut inserted_tasks = Vec::new(); for (idx, kind) in kinds.into_iter().enumerate() { let k = kind.as_kind(); let task = index_scheduler.register(kind).unwrap(); @@ -553,8 +586,6 @@ mod tests { assert_eq!(task.uid, idx as u32); assert_eq!(task.status, Status::Enqueued); assert_eq!(task.kind, k); - - inserted_tasks.push(task); } assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); @@ -656,49 +687,85 @@ mod tests { } #[test] - fn task_deletion() { + fn task_deletion_undeleteable() { let (index_scheduler, handle) = IndexScheduler::test(); let to_enqueue = [ - KindWithContent::IndexCreation { - index_uid: S("catto"), - primary_key: Some(S("mouse")), - }, - KindWithContent::DocumentImport { - index_uid: S("catto"), - primary_key: None, - method: ReplaceDocuments, - content_file: Uuid::from_u128(0), - documents_count: 12, - allow_index_creation: true, - }, - KindWithContent::DocumentImport { - index_uid: S("doggo"), - primary_key: Some(S("bone")), - method: ReplaceDocuments, - content_file: Uuid::from_u128(1), - documents_count: 5000, - allow_index_creation: true, - }, + index_creation_task("catto", "mouse"), + replace_document_import_task("catto", None, 0, 12), + replace_document_import_task("doggo", Some("bone"), 1, 5000), ]; for task in to_enqueue { let _ = index_scheduler.register(task).unwrap(); } + // here we have registered all the tasks, but the index scheduler + // has not progressed at all assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); - index_scheduler.register(KindWithContent::DeleteTasks { - query: "test_query".to_owned(), - tasks: vec![0, 1], - }); + index_scheduler + .register(KindWithContent::DeleteTasks { + query: "test_query".to_owned(), + tasks: vec![0, 1], + }) + 
.unwrap(); + + // again, no progress made at all, but one more task is registered assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); + // now we create the first batch handle.wait_till(Breakpoint::BatchCreated); + // the task deletion should now be "processing" assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); handle.wait_till(Breakpoint::AfterProcessing); + // after the task deletion is processed, no task should actually have been deleted, + // because the tasks with ids 0 and 1 were still "enqueued", and thus undeleteable + // the "task deletion" task should be marked as "succeeded" and, in its details, the + // number of deleted tasks should be 0 + assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); + + handle.dont_block(); + } + + #[test] + fn task_deletion_deleteable() { + let (index_scheduler, handle) = IndexScheduler::test(); + + let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0); + let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1); + + let to_enqueue = [ + replace_document_import_task("catto", None, 0, documents_count0), + replace_document_import_task("doggo", Some("bone"), 1, documents_count1), + ]; + + for task in to_enqueue { + let _ = index_scheduler.register(task).unwrap(); + } + file0.persist().unwrap(); + file1.persist().unwrap(); + + assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); + + handle.wait_till(Breakpoint::AfterProcessing); + // first addition of documents should be successful + // TODO: currently the result of this operation is incorrect! + // only the first task should be successful, because it should not be batched with + // the second task, that operates on a different index! 
+ assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Now we delete the first task + index_scheduler + .register(KindWithContent::DeleteTasks { + query: "test_query".to_owned(), + tasks: vec![0], + }) + .unwrap(); + + handle.wait_till(Breakpoint::AfterProcessing); assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); handle.dont_block(); diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable-2.snap b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable-2.snap new file mode 100644 index 000000000..a426399c4 --- /dev/null +++ b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable-2.snap @@ -0,0 +1,26 @@ +--- +source: index-scheduler/src/lib.rs +expression: snapshot_index_scheduler(&index_scheduler) +--- +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: 1 }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} +1 {uid: 1, status: succeeded, details: { received_documents: 1, indexed_documents: 1 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,] +---------------------------------------------------------------------- +### Kind: +{"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [0,1,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [1,] +---------------------------------------------------------------------- +### Index Mapper: 
+["catto"] +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable-3.snap b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable-3.snap new file mode 100644 index 000000000..5767a5b36 --- /dev/null +++ b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable-3.snap @@ -0,0 +1,27 @@ +--- +source: index-scheduler/src/lib.rs +expression: snapshot_index_scheduler(&index_scheduler) +--- +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +1 {uid: 1, status: succeeded, details: { received_documents: 1, indexed_documents: 1 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} +2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_query: "test_query" }, kind: DeleteTasks { query: "test_query", tasks: [0] }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [1,2,] +---------------------------------------------------------------------- +### Kind: +{"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [1,] +"deleteTasks" [2,] +---------------------------------------------------------------------- +### Index Tasks: +catto [] +doggo [1,] +---------------------------------------------------------------------- +### Index Mapper: +["catto"] +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable.snap b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable.snap new file mode 100644 index 000000000..bc10d0f55 --- /dev/null +++ 
b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_deleteable.snap @@ -0,0 +1,25 @@ +--- +source: index-scheduler/src/lib.rs +expression: snapshot_index_scheduler(&index_scheduler) +--- +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: 0 }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} +1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: 0 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +{"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [0,1,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [1,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion-2.snap b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_undeleteable-2.snap similarity index 100% rename from index-scheduler/src/snapshots/index_scheduler__tests__task_deletion-2.snap rename to index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_undeleteable-2.snap diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion-3.snap b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_undeleteable-3.snap 
similarity index 100% rename from index-scheduler/src/snapshots/index_scheduler__tests__task_deletion-3.snap rename to index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_undeleteable-3.snap diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion-4.snap b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_undeleteable-4.snap similarity index 100% rename from index-scheduler/src/snapshots/index_scheduler__tests__task_deletion-4.snap rename to index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_undeleteable-4.snap diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion.snap b/index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_undeleteable.snap similarity index 100% rename from index-scheduler/src/snapshots/index_scheduler__tests__task_deletion.snap rename to index-scheduler/src/snapshots/index_scheduler__tests__task_deletion_undeleteable.snap