diff --git a/file-store/src/lib.rs b/file-store/src/lib.rs index 75db9bb5f..e3851a2df 100644 --- a/file-store/src/lib.rs +++ b/file-store/src/lib.rs @@ -56,7 +56,7 @@ impl FileStore { let file = NamedTempFile::new_in(&self.path)?; let uuid = Uuid::new_v4(); let path = self.path.join(uuid.to_string()); - let update_file = File { file, path }; + let update_file = File { dry: false, file, path }; Ok((uuid, update_file)) } @@ -67,7 +67,7 @@ impl FileStore { let file = NamedTempFile::new_in(&self.path)?; let uuid = Uuid::from_u128(uuid); let path = self.path.join(uuid.to_string()); - let update_file = File { file, path }; + let update_file = File { dry: false, file, path }; Ok((uuid, update_file)) } @@ -135,13 +135,29 @@ impl FileStore { } pub struct File { + dry: bool, path: PathBuf, file: NamedTempFile, } impl File { + pub fn dry_file() -> Result { + #[cfg(target_family = "unix")] + let path = PathBuf::from_str("/dev/null").unwrap(); + #[cfg(target_family = "windows")] + let path = PathBuf::from_str("\\Device\\Null").unwrap(); + + Ok(Self { + dry: true, + path: path.clone(), + file: tempfile::Builder::new().make(|_| std::fs::File::create(path.clone()))?, + }) + } + pub fn persist(self) -> Result<()> { - self.file.persist(&self.path)?; + if !self.dry { + self.file.persist(&self.path)?; + } Ok(()) } } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 9a1799469..5d0ce9eb9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1001,7 +1001,12 @@ impl IndexScheduler { /// Register a new task in the scheduler. /// /// If it fails and data was associated with the task, it tries to delete the associated data. - pub fn register(&self, kind: KindWithContent, task_id: Option) -> Result { + pub fn register( + &self, + kind: KindWithContent, + task_id: Option, + dry_run: bool, + ) -> Result { let mut wtxn = self.env.write_txn()?; // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task @@ -1037,6 +1042,11 @@ impl IndexScheduler { // (that it does not contain duplicate indexes). check_index_swap_validity(&task)?; + // At this point the task is going to be registered and no further checks will be done + if dry_run { + return Ok(task); + } + // Get rid of the mutability. let task = task; @@ -1101,8 +1111,12 @@ impl IndexScheduler { /// The returned file and uuid can be used to associate /// some data to a task. The file will be kept until /// the task has been fully processed. - pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> { - Ok(self.file_store.new_update()?) + pub fn create_update_file(&self, dry_run: bool) -> Result<(Uuid, file_store::File)> { + if dry_run { + Ok((Uuid::nil(), file_store::File::dry_file()?)) + } else { + Ok(self.file_store.new_update()?) + } } #[cfg(test)] @@ -1413,6 +1427,7 @@ impl IndexScheduler { tasks: to_delete, }, None, + false, )?; Ok(()) @@ -1534,7 +1549,7 @@ impl<'a> Dump<'a> { ) -> Result { let content_uuid = match content_file { Some(content_file) if task.status == Status::Enqueued => { - let (uuid, mut file) = self.index_scheduler.create_update_file()?; + let (uuid, mut file) = self.index_scheduler.create_update_file(false)?; let mut builder = DocumentsBatchBuilder::new(file.as_file_mut()); for doc in content_file { builder.append_json_object(&doc?)?; @@ -2038,7 +2053,7 @@ mod tests { for (idx, kind) in kinds.into_iter().enumerate() { let k = kind.as_kind(); - let task = index_scheduler.register(kind, None).unwrap(); + let task = index_scheduler.register(kind, None, false).unwrap(); index_scheduler.assert_internally_consistent(); assert_eq!(task.uid, idx as u32); @@ -2053,18 +2068,18 @@ mod tests { fn insert_task_while_another_task_is_processing() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); - index_scheduler.register(index_creation_task("index_a", "id"), None).unwrap(); + index_scheduler.register(index_creation_task("index_a", "id"), None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); handle.advance_till([Start, BatchCreated]); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_batch_creation"); // while the task is processing can we register another task? - index_scheduler.register(index_creation_task("index_b", "id"), None).unwrap(); + index_scheduler.register(index_creation_task("index_b", "id"), None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); index_scheduler - .register(KindWithContent::IndexDeletion { index_uid: S("index_a") }, None) + .register(KindWithContent::IndexDeletion { index_uid: S("index_a") }, None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); } @@ -2073,7 +2088,7 @@ mod tests { fn test_task_is_processing() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); - index_scheduler.register(index_creation_task("index_a", "id"), None).unwrap(); + index_scheduler.register(index_creation_task("index_a", "id"), None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_a_task"); handle.advance_till([Start, BatchCreated]); @@ -2090,6 +2105,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2098,12 +2114,13 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("cattos"), primary_key: None }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); index_scheduler - .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None) + .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); @@ -2125,22 +2142,23 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); index_scheduler - .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None) + .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); index_scheduler - .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None) + .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); index_scheduler - .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None) + .register(KindWithContent::DocumentClear { index_uid: S("doggos") }, None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_fourth_task"); @@ -2173,7 +2191,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task, None).unwrap(); + let _ = index_scheduler.register(task, None, false).unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2188,6 +2206,7 @@ mod tests { tasks: RoaringBitmap::from_iter([0, 1]), }, None, + false, ) .unwrap(); // again, no progress made at all, but one more task is registered @@ -2222,7 +2241,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task, None).unwrap(); + let _ = index_scheduler.register(task, None, false).unwrap(); index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued"); @@ -2239,6 +2258,7 @@ mod tests { tasks: RoaringBitmap::from_iter([0]), }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_registering_the_task_deletion"); @@ -2262,7 +2282,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task, None).unwrap(); + let _ = index_scheduler.register(task, None, false).unwrap(); index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued"); @@ -2280,6 +2300,7 @@ mod tests { tasks: RoaringBitmap::from_iter([0]), }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -2313,6 +2334,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_register"); @@ -2338,6 +2360,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2356,12 +2379,13 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); index_scheduler - .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None) + .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); @@ -2395,6 +2419,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2405,6 +2430,7 @@ mod tests { documents_ids: vec![S("1"), S("2")], }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); @@ -2434,6 +2460,7 @@ mod tests { documents_ids: vec![S("1"), S("2")], }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2458,6 +2485,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); @@ -2495,6 +2523,7 @@ mod tests { primary_key: None, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -2502,7 +2531,11 @@ mod tests { for name in index_names { index_scheduler - .register(KindWithContent::DocumentClear { index_uid: name.to_string() }, None) + .register( + KindWithContent::DocumentClear { index_uid: name.to_string() }, + None, + false, + ) .unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2527,7 +2560,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task, None).unwrap(); + let _ = index_scheduler.register(task, None, false).unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2549,6 +2582,7 @@ mod tests { ], }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_registered"); @@ -2558,6 +2592,7 @@ mod tests { swaps: vec![IndexSwap { indexes: ("a".to_owned(), "c".to_owned()) }], }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "two_swaps_registered"); @@ -2568,7 +2603,9 @@ mod tests { handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed"); - index_scheduler.register(KindWithContent::IndexSwap { swaps: vec![] }, None).unwrap(); + index_scheduler + .register(KindWithContent::IndexSwap { swaps: vec![] }, None, false) + .unwrap(); handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_empty_swap_processed"); } @@ -2585,7 +2622,7 @@ mod tests { ]; for task in to_enqueue { - let _ = index_scheduler.register(task, None).unwrap(); + let _ = index_scheduler.register(task, None, false).unwrap(); index_scheduler.assert_internally_consistent(); } handle.advance_n_successful_batches(4); @@ -2603,6 +2640,7 @@ mod tests { ], }, None, + false, ) .unwrap_err(); snapshot!(format!("{err}"), @"Indexes must be declared only once during a swap. `a`, `b` were specified several times."); @@ -2621,6 +2659,7 @@ mod tests { ], }, None, + false, ) .unwrap(); handle.advance_one_failed_batch(); @@ -2652,10 +2691,11 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler - .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None) + .register(KindWithContent::IndexDeletion { index_uid: S("doggos") }, None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler)); @@ -2680,7 +2720,7 @@ mod tests { }, ]; for task in to_enqueue { - let _ = index_scheduler.register(task, None).unwrap(); + let _ = index_scheduler.register(task, None, false).unwrap(); index_scheduler.assert_internally_consistent(); } @@ -2697,7 +2737,7 @@ mod tests { file0.persist().unwrap(); let _ = index_scheduler - .register(replace_document_import_task("catto", None, 0, documents_count0), None) + .register(replace_document_import_task("catto", None, 0, documents_count0), None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2711,6 +2751,7 @@ mod tests { tasks: RoaringBitmap::from_iter([0]), }, None, + false, ) .unwrap(); @@ -2726,7 +2767,7 @@ mod tests { file0.persist().unwrap(); let _ = index_scheduler - .register(replace_document_import_task("catto", None, 0, documents_count0), None) + .register(replace_document_import_task("catto", None, 0, documents_count0), None, false) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -2740,6 +2781,7 @@ mod tests { tasks: RoaringBitmap::from_iter([0]), }, None, + false, ) .unwrap(); @@ -2770,7 +2812,7 @@ mod tests { replace_document_import_task("wolfo", None, 2, documents_count2), ]; for task in to_enqueue { - let _ = index_scheduler.register(task, None).unwrap(); + let _ = index_scheduler.register(task, None, false).unwrap(); index_scheduler.assert_internally_consistent(); } handle.advance_one_successful_batch(); @@ -2784,6 +2826,7 @@ mod tests { tasks: RoaringBitmap::from_iter([0, 1, 2]), }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processing_second_task_cancel_enqueued"); @@ -2822,6 +2865,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -2872,6 +2916,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -2924,6 +2969,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -2977,6 +3023,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -3031,6 +3078,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -3076,13 +3124,13 @@ mod tests { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); let kind = index_creation_task("doggo", "bone"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); let kind = index_creation_task("whalo", "plankton"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); let kind = index_creation_task("catto", "his_own_vomit"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_third_task"); handle.advance_n_successful_batches(3); @@ -3140,11 +3188,11 @@ mod tests { IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]); let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); let kind = index_creation_task("doggo", "sheep"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); let kind = index_creation_task("whalo", "fish"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); @@ -3363,17 +3411,17 @@ mod tests { IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]); let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); let kind = index_creation_task("doggo", "sheep"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); let kind = KindWithContent::IndexSwap { swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }], }; - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); let kind = KindWithContent::IndexSwap { swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "whalo".to_owned()) }], }; - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); @@ -3449,20 +3497,20 @@ mod tests { IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]); let kind = index_creation_task("catto", "mouse"); - let _ = index_scheduler.register(kind, None).unwrap(); + let _ = index_scheduler.register(kind, None, false).unwrap(); let kind = index_creation_task("doggo", "sheep"); - let _ = index_scheduler.register(kind, None).unwrap(); + let _ = index_scheduler.register(kind, None, false).unwrap(); let kind = KindWithContent::IndexSwap { swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }], }; - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); handle.advance_n_successful_batches(1); let kind = KindWithContent::TaskCancelation { query: "test_query".to_string(), tasks: [0, 1, 2, 3].into_iter().collect(), }; - let task_cancelation = index_scheduler.register(kind, None).unwrap(); + let task_cancelation = index_scheduler.register(kind, None, false).unwrap(); handle.advance_n_successful_batches(1); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); @@ -3497,7 +3545,7 @@ mod tests { let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_register"); handle.advance_one_failed_batch(); @@ -3532,6 +3580,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -3573,6 +3622,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -3632,6 +3682,7 @@ mod tests { allow_index_creation: false, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -3683,6 +3734,7 @@ mod tests { allow_index_creation: false, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -3714,6 +3766,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -3743,6 +3796,7 @@ mod tests { allow_index_creation: false, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -3779,6 +3833,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -3808,6 +3863,7 @@ mod tests { allow_index_creation: false, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -3848,6 +3904,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }, None, + false, ) .unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); @@ -3878,6 +3935,7 @@ mod tests { allow_index_creation, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -3934,6 +3992,7 @@ mod tests { allow_index_creation, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -3989,6 +4048,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -4053,6 +4113,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -4113,6 +4174,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -4197,6 +4259,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -4283,6 +4346,7 @@ mod tests { allow_index_creation: true, }, None, + false, ) .unwrap(); index_scheduler.assert_internally_consistent(); @@ -4337,7 +4401,7 @@ mod tests { let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); handle.advance_till([Start, BatchCreated, ProcessBatchFailed, AfterProcessing]); @@ -4360,6 +4424,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); handle.advance_one_successful_batch(); @@ -4368,6 +4433,7 @@ mod tests { let result = index_scheduler.register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ); if result.is_err() { break; @@ -4381,6 +4447,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap_err(); snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations."); @@ -4392,6 +4459,7 @@ mod tests { .register( KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() }, None, + false, ) .unwrap_err(); snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations."); @@ -4403,6 +4471,7 @@ mod tests { .register( KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() }, None, + false, ) .unwrap(); handle.advance_one_successful_batch(); @@ -4412,6 +4481,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); handle.advance_one_failed_batch(); @@ -4428,6 +4498,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); handle.advance_one_successful_batch(); @@ -4436,6 +4507,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); handle.advance_one_failed_batch(); @@ -4446,12 +4518,14 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); index_scheduler .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); @@ -4507,6 +4581,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); handle.advance_one_successful_batch(); @@ -4515,6 +4590,7 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); handle.advance_one_failed_batch(); @@ -4525,12 +4601,14 @@ mod tests { .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); index_scheduler .register( KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }, None, + false, ) .unwrap(); @@ -4555,11 +4633,11 @@ mod tests { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); let kind = index_creation_task("catto", "mouse"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); let kind = index_creation_task("doggo", "sheep"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); let kind = index_creation_task("whalo", "fish"); - let _task = index_scheduler.register(kind, None).unwrap(); + let _task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###" { @@ -4709,11 +4787,11 @@ mod tests { query: "cancel dump".to_owned(), tasks: RoaringBitmap::from_iter([0]), }; - let _ = index_scheduler.register(dump_creation, None).unwrap(); + let _ = index_scheduler.register(dump_creation, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_dump_register"); handle.advance_till([Start, BatchCreated, InsideProcessBatch]); - let _ = index_scheduler.register(dump_cancellation, None).unwrap(); + let _ = index_scheduler.register(dump_cancellation, None, false).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_registered"); snapshot!(format!("{:?}", handle.advance()), @"AbortedIndexation"); @@ -4727,15 +4805,86 @@ mod tests { let (index_scheduler, _handle) = IndexScheduler::test(true, vec![]); let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }; - let task = index_scheduler.register(kind, None).unwrap(); + let task = index_scheduler.register(kind, None, false).unwrap(); snapshot!(task.uid, @"0"); let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }; - let task = index_scheduler.register(kind, Some(12)).unwrap(); + let task = index_scheduler.register(kind, Some(12), false).unwrap(); snapshot!(task.uid, @"12"); let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }; - let error = index_scheduler.register(kind, Some(5)).unwrap_err(); + let error = index_scheduler.register(kind, Some(5), false).unwrap_err(); snapshot!(error, @"Received bad task id: 5 should be >= to 13."); } + + #[test] + fn dry_run() { + let (index_scheduler, _handle) = IndexScheduler::test(true, vec![]); + + let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }; + let task = index_scheduler.register(kind, None, true).unwrap(); + snapshot!(task.uid, @"0"); + snapshot!(snapshot_index_scheduler(&index_scheduler), @r###" + ### Autobatching Enabled = true + ### Processing Tasks: + [] + ---------------------------------------------------------------------- + ### All Tasks: + ---------------------------------------------------------------------- + ### Status: + ---------------------------------------------------------------------- + ### Kind: + ---------------------------------------------------------------------- + ### Index Tasks: + ---------------------------------------------------------------------- + ### Index Mapper: + + ---------------------------------------------------------------------- + ### Canceled By: + + ---------------------------------------------------------------------- + ### Enqueued At: + ---------------------------------------------------------------------- + ### Started At: + ---------------------------------------------------------------------- + ### Finished At: + ---------------------------------------------------------------------- + ### File Store: + + ---------------------------------------------------------------------- + "###); + + let kind = KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }; + let task = index_scheduler.register(kind, Some(12), true).unwrap(); + snapshot!(task.uid, @"12"); + snapshot!(snapshot_index_scheduler(&index_scheduler), @r###" + ### Autobatching Enabled = true + ### Processing Tasks: + [] + ---------------------------------------------------------------------- + ### All Tasks: + ---------------------------------------------------------------------- + ### Status: + ---------------------------------------------------------------------- + ### Kind: + ---------------------------------------------------------------------- + ### Index Tasks: + ---------------------------------------------------------------------- + ### Index Mapper: + + ---------------------------------------------------------------------- + ### Canceled By: + + ---------------------------------------------------------------------- + ### Enqueued At: + ---------------------------------------------------------------------- + ### Started At: + ---------------------------------------------------------------------- + ### Finished At: + ---------------------------------------------------------------------- + ### File Store: + + ---------------------------------------------------------------------- + "###); + } } diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 292a87259..7c40059d7 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -265,7 +265,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< .name(String::from("register-snapshot-tasks")) .spawn(move || loop { thread::sleep(snapshot_delay); - if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation, None) { + if let Err(e) = + index_scheduler.register(KindWithContent::SnapshotCreation, None, false) + { error!("Error while registering snapshot: {}", e); } }) diff --git a/meilisearch/src/routes/dump.rs b/meilisearch/src/routes/dump.rs index 56231a759..7f3cd06a5 100644 --- a/meilisearch/src/routes/dump.rs +++ b/meilisearch/src/routes/dump.rs @@ -11,7 +11,7 @@ use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::{get_task_id, SummarizedTaskView}; +use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView}; use crate::Opt; pub fn configure(cfg: &mut web::ServiceConfig) { @@ -32,8 +32,11 @@ pub async fn create_dump( instance_uid: analytics.instance_uid().cloned(), }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Create dump"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 5bf7eaa8d..a74bbff49 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -36,7 +36,9 @@ use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::payload::Payload; use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; +use crate::routes::{ + get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT, +}; use crate::search::parse_filter; use crate::Opt; @@ -133,8 +135,11 @@ pub async fn delete_document( documents_ids: vec![document_id], }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) } @@ -282,6 +287,7 @@ pub async fn replace_documents( let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task = document_addition( extract_mime_type(&req)?, index_scheduler, @@ -291,6 +297,7 @@ pub async fn replace_documents( body, IndexDocumentsMethod::ReplaceDocuments, uid, + dry_run, allow_index_creation, ) .await?; @@ -317,6 +324,7 @@ pub async fn update_documents( let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task = document_addition( extract_mime_type(&req)?, index_scheduler, @@ -326,6 +334,7 @@ pub async fn update_documents( body, IndexDocumentsMethod::UpdateDocuments, uid, + dry_run, allow_index_creation, ) .await?; @@ -344,6 +353,7 @@ async fn document_addition( mut body: Payload, method: IndexDocumentsMethod, task_id: Option, + dry_run: bool, allow_index_creation: bool, ) -> Result { let format = match ( @@ -376,7 +386,7 @@ async fn document_addition( } }; - let (uuid, mut update_file) = index_scheduler.create_update_file()?; + let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?; let temp_file = match tempfile() { Ok(file) => file, @@ -460,7 +470,9 @@ async fn document_addition( }; let scheduler = index_scheduler.clone(); - let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id)).await? { + let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run)) + .await? + { Ok(task) => task, Err(e) => { index_scheduler.delete_update_file(uuid)?; @@ -492,8 +504,11 @@ pub async fn delete_documents_batch( let task = KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Delete documents by batch"); Ok(HttpResponse::Accepted().json(task)) @@ -530,8 +545,11 @@ pub async fn delete_documents_by_filter( let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Delete documents by filter"); Ok(HttpResponse::Accepted().json(task)) @@ -549,8 +567,11 @@ pub async fn clear_all_documents( let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Delete all documents"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/indexes/mod.rs b/meilisearch/src/routes/indexes/mod.rs index 59a1f0e64..59fa02dff 100644 --- a/meilisearch/src/routes/indexes/mod.rs +++ b/meilisearch/src/routes/indexes/mod.rs @@ -22,6 +22,7 @@ use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::extractors::sequential_extractor::SeqHandler; +use crate::routes::is_dry_run; use crate::Opt; pub mod documents; @@ -140,8 +141,11 @@ pub async fn create_index( let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Create index"); Ok(HttpResponse::Accepted().json(task)) @@ -211,8 +215,11 @@ pub async fn update_index( }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Update index"); Ok(HttpResponse::Accepted().json(task)) @@ -227,8 +234,11 @@ pub async fn delete_index( let index_uid = IndexUid::try_from(index_uid.into_inner())?; let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Delete index"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/indexes/settings.rs b/meilisearch/src/routes/indexes/settings.rs index 6e43bce41..c71d83279 100644 --- a/meilisearch/src/routes/indexes/settings.rs +++ b/meilisearch/src/routes/indexes/settings.rs @@ -15,7 +15,7 @@ use tracing::debug; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; -use crate::routes::{get_task_id, SummarizedTaskView}; +use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView}; use crate::Opt; #[macro_export] @@ -36,7 +36,7 @@ macro_rules! make_setting_route { use $crate::extractors::authentication::GuardedData; use $crate::extractors::sequential_extractor::SeqHandler; use $crate::Opt; - use $crate::routes::{get_task_id, SummarizedTaskView}; + use $crate::routes::{is_dry_run, get_task_id, SummarizedTaskView}; pub async fn delete( index_scheduler: GuardedData< @@ -61,8 +61,9 @@ macro_rules! make_setting_route { allow_index_creation, }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)) + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) .await?? .into(); @@ -112,8 +113,9 @@ macro_rules! make_setting_route { allow_index_creation, }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)) + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) .await?? .into(); @@ -776,8 +778,11 @@ pub async fn update_all( allow_index_creation, }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Update all settings"); Ok(HttpResponse::Accepted().json(task)) @@ -815,8 +820,11 @@ pub async fn delete_all( allow_index_creation, }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Delete all settings"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs index 2dc89b150..f98d4b4de 100644 --- a/meilisearch/src/routes/mod.rs +++ b/meilisearch/src/routes/mod.rs @@ -77,6 +77,25 @@ pub fn get_task_id(req: &HttpRequest, opt: &Opt) -> Result, Respo Ok(task_id) } +pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result { + if !opt.experimental_ha_parameters { + return Ok(false); + } + Ok(req + .headers() + .get("DryRun") + .map(|header| { + header.to_str().map_err(|e| { + ResponseError::from_msg( + format!("DryRun is not a valid utf-8 string: {e}"), + Code::BadRequest, + ) + }) + }) + .transpose()? + .map_or(false, |s| s.to_lowercase() == "true")) +} + #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct SummarizedTaskView { diff --git a/meilisearch/src/routes/snapshot.rs b/meilisearch/src/routes/snapshot.rs index 6b3178126..84673729f 100644 --- a/meilisearch/src/routes/snapshot.rs +++ b/meilisearch/src/routes/snapshot.rs @@ -10,7 +10,7 @@ use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::{get_task_id, SummarizedTaskView}; +use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView}; use crate::Opt; pub fn configure(cfg: &mut web::ServiceConfig) { @@ -27,8 +27,11 @@ pub async fn create_snapshot( let task = KindWithContent::SnapshotCreation; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); debug!(returns = ?task, "Create snapshot"); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/swap_indexes.rs b/meilisearch/src/routes/swap_indexes.rs index f8adeeb18..51a7b0707 100644 --- a/meilisearch/src/routes/swap_indexes.rs +++ b/meilisearch/src/routes/swap_indexes.rs @@ -10,7 +10,7 @@ use meilisearch_types::index_uid::IndexUid; use meilisearch_types::tasks::{IndexSwap, KindWithContent}; use serde_json::json; -use super::{get_task_id, SummarizedTaskView}; +use super::{get_task_id, is_dry_run, SummarizedTaskView}; use crate::analytics::Analytics; use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::*; @@ -63,7 +63,10 @@ pub async fn swap_indexes( let task = KindWithContent::IndexSwap { swaps }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? + .into(); Ok(HttpResponse::Accepted().json(task)) } diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs index 279b57e3d..f35d97fe6 100644 --- a/meilisearch/src/routes/tasks.rs +++ b/meilisearch/src/routes/tasks.rs @@ -18,7 +18,7 @@ use time::macros::format_description; use time::{Date, Duration, OffsetDateTime, Time}; use tokio::task; -use super::{get_task_id, SummarizedTaskView}; +use super::{get_task_id, is_dry_run, SummarizedTaskView}; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; @@ -200,8 +200,10 @@ async fn cancel_tasks( KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks }; let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; let task = - task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??; + task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid, dry_run)) + .await??; let task: SummarizedTaskView = task.into(); Ok(HttpResponse::Ok().json(task)) @@ -248,7 +250,9 @@ async fn delete_tasks( KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks }; let uid = get_task_id(&req, &opt)?; - let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??; + let dry_run = is_dry_run(&req, &opt)?; + let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid, dry_run)) + .await??; let task: SummarizedTaskView = task.into(); Ok(HttpResponse::Ok().json(task))