From f9ddd3254544fd33d73a21783f5b8652d4d3e6a9 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 24 Apr 2023 20:04:50 +0200 Subject: [PATCH 1/8] implement the auto-deletion of tasks --- index-scheduler/src/lib.rs | 235 ++++++++++++++++++++++++++++++++- meilisearch/tests/tasks/mod.rs | 119 +---------------- 2 files changed, 230 insertions(+), 124 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index d713fca17..524c8f32b 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -940,14 +940,15 @@ impl IndexScheduler { /// Perform one iteration of the run loop. /// - /// 1. Find the next batch of tasks to be processed. - /// 2. Update the information of these tasks following the start of their processing. - /// 3. Update the in-memory list of processed tasks accordingly. - /// 4. Process the batch: + /// 1. See if we need to cleanup the task queue + /// 2. Find the next batch of tasks to be processed. + /// 3. Update the information of these tasks following the start of their processing. + /// 4. Update the in-memory list of processed tasks accordingly. + /// 5. Process the batch: /// - perform the actions of each batched task /// - update the information of each batched task following the end /// of their processing. - /// 5. Reset the in-memory list of processed tasks. + /// 6. Reset the in-memory list of processed tasks. /// /// Returns the number of processed tasks. fn tick(&self) -> Result { @@ -957,6 +958,8 @@ impl IndexScheduler { self.breakpoint(Breakpoint::Start); } + self.cleanup_task_queue()?; + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let batch = match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? { @@ -1093,6 +1096,41 @@ impl IndexScheduler { Ok(TickOutcome::TickAgain(processed_tasks)) } + /// Register a task to cleanup the task queue if needed + fn cleanup_task_queue(&self) -> Result<()> { + // if less than 42% (~9GiB) of the task queue are being used we don't need to do anything + if ((self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64) < 42 { + return Ok(()); + } + + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; + + let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default() + | self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default() + | self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default(); + drop(rtxn); + + let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(1_000_000)); + + // /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete + // the deletion tasks we enqueued ourselves. 
+ if to_delete.len() < 2 { + // the only thing we can do is hope that the user tasks are going to finish + return Ok(()); + } + + self.register(KindWithContent::TaskDeletion { + query: format!( + "?from={},limit={},status=succeeded,failed,canceled", + to_delete.iter().last().unwrap_or(u32::MAX), + to_delete.len(), + ), + tasks: to_delete, + })?; + + Ok(()) + } + pub fn index_stats(&self, index_uid: &str) -> Result { let is_indexing = self.is_index_processing(index_uid)?; let rtxn = self.read_txn()?; @@ -1350,9 +1388,10 @@ mod tests { use big_s::S; use crossbeam::channel::RecvTimeoutError; use file_store::File; - use meili_snap::snapshot; + use meili_snap::{json_string, snapshot}; use meilisearch_auth::AuthFilter; use meilisearch_types::document_formats::DocumentFormatError; + use meilisearch_types::error::ErrorCode; use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::milli::obkv_to_json; use meilisearch_types::milli::update::IndexDocumentsMethod::{ @@ -3718,4 +3757,188 @@ mod tests { // No matter what happens in process_batch, the index_scheduler should be internally consistent snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed"); } + + #[test] + fn test_task_queue_is_full_and_auto_deletion_of_tasks() { + let (mut index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); + + // on average this task takes ~500+ bytes, and since our task queue have 1MiB of + // storage we can enqueue ~2000 tasks before reaching the limit. + + let mut dump = index_scheduler.register_dumped_task().unwrap(); + let now = OffsetDateTime::now_utc(); + for i in 0..2000 { + dump.register_dumped_task( + TaskDump { + uid: i, + index_uid: Some(S("doggo")), + status: Status::Enqueued, + kind: KindDump::IndexCreation { primary_key: None }, + canceled_by: None, + details: None, + error: None, + enqueued_at: now, + started_at: None, + finished_at: None, + }, + None, + ) + .unwrap(); + } + dump.finish().unwrap(); + + index_scheduler.assert_internally_consistent(); + + // at this point the task queue should be full and any new task should be refused + + let result = index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap_err(); + + snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. 
Please delete tasks to continue performing write operations."); + // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code + snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); + + // after advancing one batch, the engine should not being able to push a taskDeletion task because everything is finished + handle.advance_one_successful_batch(); + index_scheduler.assert_internally_consistent(); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let ids = index_scheduler + .get_task_ids( + &rtxn, + &Query { + statuses: Some(vec![Status::Succeeded, Status::Failed]), + ..Query::default() + }, + ) + .unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###" + [ + { + "uid": 0, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "succeeded", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + } + ] + "###); + + // The next batch should try to process another task + handle.advance_one_failed_batch(); + index_scheduler.assert_internally_consistent(); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let ids = index_scheduler + .get_task_ids( + &rtxn, + &Query { + statuses: Some(vec![Status::Succeeded, Status::Failed]), + ..Query::default() + }, + ) + .unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###" + [ + { + "uid": 0, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "succeeded", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 1, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": { + "message": "Index `doggo` already exists.", + "code": "index_already_exists", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#index_already_exists" + }, + "canceledBy": null, + "details": null, + "status": "failed", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + } + ] + "###); + + // The next batch should create a task deletion tasks that delete the succeeded and failed tasks + handle.advance_one_successful_batch(); + index_scheduler.assert_internally_consistent(); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let ids = index_scheduler + .get_task_ids( + &rtxn, + &Query { + statuses: Some(vec![Status::Succeeded, Status::Failed]), + ..Query::default() + }, + ) + .unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", "[].kind" => "[kind]" }), @r###" + [ + { + "uid": 2000, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": 2, + "original_filter": "?from=1,limit=2,status=succeeded,failed,canceled" + } + }, + "status": "succeeded", + 
"kind": "[kind]" + } + ] + "###); + + let to_delete = match tasks[0].kind { + KindWithContent::TaskDeletion { ref tasks, .. } => tasks, + _ => unreachable!("the snapshot above should prevent us from running in this case"), + }; + + snapshot!(format!("{:?}", to_delete), @"RoaringBitmap<[0, 1]>"); + } } diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index 40093dc41..e9b5a2325 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1,14 +1,11 @@ mod errors; -use byte_unit::{Byte, ByteUnit}; use meili_snap::insta::assert_json_snapshot; -use meili_snap::{json_string, snapshot}; use serde_json::json; -use tempfile::TempDir; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use crate::common::{default_settings, Server}; +use crate::common::Server; #[actix_rt::test] async fn error_get_unexisting_task_status() { @@ -1003,117 +1000,3 @@ async fn test_summarized_dump_creation() { } "###); } - -#[actix_web::test] -async fn test_task_queue_is_full() { - let dir = TempDir::new().unwrap(); - let mut options = default_settings(dir.path()); - options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap(); - - let server = Server::new_with_options(options).await.unwrap(); - - // the first task should be enqueued without issue - let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; - snapshot!(code, @"202 Accepted"); - snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###" - { - "taskUid": 0, - "indexUid": "doggo", - "status": "enqueued", - "type": "indexCreation", - "enqueuedAt": "[date]" - } - "###); - - loop { - let (res, code) = server.create_index(json!({ "uid": "doggo" })).await; - if code == 422 { - break; - } - if res["taskUid"] == json!(null) { - panic!( - "Encountered the strange case:\n{}", - serde_json::to_string_pretty(&res).unwrap() - ); - } - } - - let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; - snapshot!(code, @"422 Unprocessable Entity"); - snapshot!(json_string!(result), @r###" - { - "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. 
Please delete tasks to continue performing write operations.", - "code": "no_space_left_on_device", - "type": "system", - "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" - } - "###); - - // But we should still be able to register tasks deletion IF they delete something - let (result, code) = server.delete_tasks("uids=*").await; - snapshot!(code, @"200 OK"); - snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" - { - "taskUid": "uid", - "indexUid": null, - "status": "enqueued", - "type": "taskDeletion", - "enqueuedAt": "[date]" - } - "###); - - let result = server.wait_task(result["taskUid"].as_u64().unwrap()).await; - snapshot!(json_string!(result["status"]), @r###""succeeded""###); - - // Now we should be able to register tasks again - let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; - snapshot!(code, @"202 Accepted"); - snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" - { - "taskUid": "uid", - "indexUid": "doggo", - "status": "enqueued", - "type": "indexCreation", - "enqueuedAt": "[date]" - } - "###); - - // we're going to fill up the queue once again - loop { - let (res, code) = server.delete_tasks("uids=0").await; - if code == 422 { - break; - } - if res["taskUid"] == json!(null) { - panic!( - "Encountered the strange case:\n{}", - serde_json::to_string_pretty(&res).unwrap() - ); - } - } - - // But we should NOT be able to register this task because it doesn't match any tasks - let (result, code) = server.delete_tasks("uids=0").await; - snapshot!(code, @"422 Unprocessable Entity"); - snapshot!(json_string!(result), @r###" - { - "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.", - "code": "no_space_left_on_device", - "type": "system", - "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" - } - "###); - - // The deletion still works - let (result, code) = server.delete_tasks("uids=*").await; - snapshot!(code, @"200 OK"); - snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" - { - "taskUid": "uid", - "indexUid": null, - "status": "enqueued", - "type": "taskDeletion", - "enqueuedAt": "[date]" - } - "###); -} From 972bb2831cfae6546d00638ea1cc65e88a873174 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 25 Apr 2023 13:11:58 +0200 Subject: [PATCH 2/8] log when meilisearch need to delete tasks --- index-scheduler/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 524c8f32b..05cfc05b5 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1115,10 +1115,16 @@ impl IndexScheduler { // /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete // the deletion tasks we enqueued ourselves. if to_delete.len() < 2 { + log::warn!("The task queue is almost full, but no task can be deleted yet."); // the only thing we can do is hope that the user tasks are going to finish return Ok(()); } + log::info!( + "The task queue is almost full. 
Thus, meilisearch will delete the last {} finished tasks.", + to_delete.len() + ); + self.register(KindWithContent::TaskDeletion { query: format!( "?from={},limit={},status=succeeded,failed,canceled", From aa7537a11ecb6938dc45382b79b1cfb0f3bea71d Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 25 Apr 2023 17:26:34 +0200 Subject: [PATCH 3/8] make the autodeletion work with a fixed number of tasks and update the tests --- index-scheduler/src/insta_snapshot.rs | 1 + index-scheduler/src/lib.rs | 281 +++++++----------- .../above_the_max_number_of_tasks.snap | 49 +++ .../after_the_second_task_deletion.snap | 44 +++ .../everything_has_been_processed.snap | 41 +++ .../max_number_of_tasks.snap | 45 +++ .../task_deletion_have_been_enqueued.snap | 52 ++++ .../task_deletion_have_been_processed.snap | 46 +++ meilisearch/src/lib.rs | 1 + 9 files changed, 390 insertions(+), 170 deletions(-) create mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/above_the_max_number_of_tasks.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/max_number_of_tasks.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 43509aa84..4adea97e3 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -28,6 +28,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { started_at, finished_at, index_mapper, + max_number_of_tasks: _, wake_up: _, dumps_path: _, snapshots_path: _, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 05cfc05b5..e0a37aba1 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -241,6 +241,9 @@ pub struct IndexSchedulerOptions { /// Set to `true` iff the index scheduler is allowed to automatically /// batch tasks together, to process multiple tasks at once. pub autobatching_enabled: bool, + /// The maximum number of tasks stored in the task queue before starting + /// to auto schedule task deletions. + pub max_number_of_tasks: usize, } /// Structure which holds meilisearch's indexes and schedules the tasks @@ -290,6 +293,10 @@ pub struct IndexScheduler { /// Whether auto-batching is enabled or not. pub(crate) autobatching_enabled: bool, + /// The max number of tasks allowed before the scheduler starts to delete + /// the finished tasks automatically. + pub(crate) max_number_of_tasks: usize, + /// The path used to create the dumps. 
pub(crate) dumps_path: PathBuf, @@ -339,6 +346,7 @@ impl IndexScheduler { index_mapper: self.index_mapper.clone(), wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, + max_number_of_tasks: self.max_number_of_tasks, snapshots_path: self.snapshots_path.clone(), dumps_path: self.dumps_path.clone(), auth_path: self.auth_path.clone(), @@ -412,6 +420,7 @@ impl IndexScheduler { // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things wake_up: Arc::new(SignalEvent::auto(true)), autobatching_enabled: options.autobatching_enabled, + max_number_of_tasks: options.max_number_of_tasks, dumps_path: options.dumps_path, snapshots_path: options.snapshots_path, auth_path: options.auth_path, @@ -1098,19 +1107,20 @@ impl IndexScheduler { /// Register a task to cleanup the task queue if needed fn cleanup_task_queue(&self) -> Result<()> { - // if less than 42% (~9GiB) of the task queue are being used we don't need to do anything - if ((self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64) < 42 { + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; + + let nb_tasks = self.all_task_ids(&rtxn)?.len(); + // if we have less than 1M tasks everything is fine + if nb_tasks < self.max_number_of_tasks as u64 { return Ok(()); } - let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; - let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default() | self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default() | self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default(); drop(rtxn); - let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(1_000_000)); + let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(100_000)); // /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete // the deletion tasks we enqueued ourselves. 
@@ -1394,7 +1404,7 @@ mod tests { use big_s::S; use crossbeam::channel::RecvTimeoutError; use file_store::File; - use meili_snap::{json_string, snapshot}; + use meili_snap::snapshot; use meilisearch_auth::AuthFilter; use meilisearch_types::document_formats::DocumentFormatError; use meilisearch_types::error::ErrorCode; @@ -1428,13 +1438,22 @@ mod tests { pub fn test( autobatching_enabled: bool, planned_failures: Vec<(usize, FailureLocation)>, + ) -> (Self, IndexSchedulerHandle) { + Self::test_with_custom_config(planned_failures, |config| { + config.autobatching_enabled = autobatching_enabled; + }) + } + + pub fn test_with_custom_config( + planned_failures: Vec<(usize, FailureLocation)>, + configuration: impl Fn(&mut IndexSchedulerOptions), ) -> (Self, IndexSchedulerHandle) { let tempdir = TempDir::new().unwrap(); let (sender, receiver) = crossbeam::channel::bounded(0); let indexer_config = IndexerConfig { skip_index_budget: true, ..Default::default() }; - let options = IndexSchedulerOptions { + let mut options = IndexSchedulerOptions { version_file_path: tempdir.path().join(VERSION_FILE_NAME), auth_path: tempdir.path().join("auth"), tasks_path: tempdir.path().join("db_path"), @@ -1447,8 +1466,10 @@ mod tests { index_growth_amount: 1000 * 1000, // 1 MB index_count: 5, indexer_config, - autobatching_enabled, + autobatching_enabled: true, + max_number_of_tasks: 1_000_000, }; + configuration(&mut options); let index_scheduler = Self::new(options, sender, planned_failures).unwrap(); @@ -3765,186 +3786,106 @@ mod tests { } #[test] - fn test_task_queue_is_full_and_auto_deletion_of_tasks() { - let (mut index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); + fn test_task_queue_is_full() { + let (index_scheduler, mut handle) = + IndexScheduler::test_with_custom_config(vec![], |config| { + // that's the minimum map size possible + config.task_db_size = 1048576; + }); - // on average this task takes ~500+ bytes, and since our task queue have 1MiB of - // storage we can enqueue ~2000 tasks before reaching the limit. - - let mut dump = index_scheduler.register_dumped_task().unwrap(); - let now = OffsetDateTime::now_utc(); - for i in 0..2000 { - dump.register_dumped_task( - TaskDump { - uid: i, - index_uid: Some(S("doggo")), - status: Status::Enqueued, - kind: KindDump::IndexCreation { primary_key: None }, - canceled_by: None, - details: None, - error: None, - enqueued_at: now, - started_at: None, - finished_at: None, - }, - None, - ) + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) .unwrap(); + handle.advance_one_successful_batch(); + // on average this task takes ~600 bytes + loop { + let result = index_scheduler.register(KindWithContent::IndexCreation { + index_uid: S("doggo"), + primary_key: None, + }); + if result.is_err() { + break; + } + handle.advance_one_failed_batch(); } - dump.finish().unwrap(); - index_scheduler.assert_internally_consistent(); - // at this point the task queue should be full and any new task should be refused - + // at this point the task DB shoud have reached its limit and we should not be able to register new tasks let result = index_scheduler .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) .unwrap_err(); - snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. 
Please delete tasks to continue performing write operations."); // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); - // after advancing one batch, the engine should not being able to push a taskDeletion task because everything is finished - handle.advance_one_successful_batch(); - index_scheduler.assert_internally_consistent(); - let rtxn = index_scheduler.env.read_txn().unwrap(); - let ids = index_scheduler - .get_task_ids( - &rtxn, - &Query { - statuses: Some(vec![Status::Succeeded, Status::Failed]), - ..Query::default() - }, - ) - .unwrap(); - let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap(); - snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###" - [ - { - "uid": 0, - "enqueuedAt": "[date]", - "startedAt": "[date]", - "finishedAt": "[date]", - "error": null, - "canceledBy": null, - "details": { - "IndexInfo": { - "primary_key": null - } - }, - "status": "succeeded", - "kind": { - "indexCreation": { - "index_uid": "doggo", - "primary_key": null - } - } - } - ] - "###); + // Even the task deletion that doesn't delete anything shouldn't be accepted + let result = index_scheduler + .register(KindWithContent::TaskDeletion { + query: S("test"), + tasks: RoaringBitmap::new(), + }) + .unwrap_err(); + snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations."); + // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code + snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); - // The next batch should try to process another task + // But a task deletion that delete something should works + index_scheduler + .register(KindWithContent::TaskDeletion { query: S("test"), tasks: (0..50).collect() }) + .unwrap(); + handle.advance_one_successful_batch(); + + // Now we should be able to enqueue a few tasks again + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); handle.advance_one_failed_batch(); - index_scheduler.assert_internally_consistent(); - let rtxn = index_scheduler.env.read_txn().unwrap(); - let ids = index_scheduler - .get_task_ids( - &rtxn, - &Query { - statuses: Some(vec![Status::Succeeded, Status::Failed]), - ..Query::default() - }, - ) - .unwrap(); - let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap(); - snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), @r###" - [ - { - "uid": 0, - "enqueuedAt": "[date]", - "startedAt": "[date]", - "finishedAt": "[date]", - "error": null, - "canceledBy": null, - "details": { - "IndexInfo": { - "primary_key": null - } - }, - "status": "succeeded", - "kind": { - "indexCreation": { - "index_uid": "doggo", - "primary_key": null - } - } - }, - { - "uid": 1, - "enqueuedAt": "[date]", - "startedAt": "[date]", - "finishedAt": "[date]", - "error": { - "message": "Index `doggo` already exists.", - "code": "index_already_exists", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#index_already_exists" - }, - "canceledBy": null, - "details": null, - "status": "failed", - "kind": { - "indexCreation": { - 
"index_uid": "doggo", - "primary_key": null - } - } - } - ] - "###); + } - // The next batch should create a task deletion tasks that delete the succeeded and failed tasks + #[test] + fn test_auto_deletion_of_tasks() { + let (index_scheduler, mut handle) = + IndexScheduler::test_with_custom_config(vec![], |config| { + config.max_number_of_tasks = 2; + }); + + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); handle.advance_one_successful_batch(); - index_scheduler.assert_internally_consistent(); - let rtxn = index_scheduler.env.read_txn().unwrap(); - let ids = index_scheduler - .get_task_ids( - &rtxn, - &Query { - statuses: Some(vec![Status::Succeeded, Status::Failed]), - ..Query::default() - }, - ) + + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) .unwrap(); - let tasks = index_scheduler.get_existing_tasks(&rtxn, ids).unwrap(); - snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", "[].kind" => "[kind]" }), @r###" - [ - { - "uid": 2000, - "enqueuedAt": "[date]", - "startedAt": "[date]", - "finishedAt": "[date]", - "error": null, - "canceledBy": null, - "details": { - "TaskDeletion": { - "matched_tasks": 2, - "deleted_tasks": 2, - "original_filter": "?from=1,limit=2,status=succeeded,failed,canceled" - } - }, - "status": "succeeded", - "kind": "[kind]" - } - ] - "###); + handle.advance_one_failed_batch(); - let to_delete = match tasks[0].kind { - KindWithContent::TaskDeletion { ref tasks, .. } => tasks, - _ => unreachable!("the snapshot above should prevent us from running in this case"), - }; + // at this point the max number of tasks is reached + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "max_number_of_tasks"); - snapshot!(format!("{:?}", to_delete), @"RoaringBitmap<[0, 1]>"); + // we can still enqueue multiple tasks + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) + .unwrap(); + + // at this point the max number of tasks is reached + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "above_the_max_number_of_tasks"); + + // and if we try to advance in the tick function a new task deletion should be enqueued + handle.advance_till([Start, BatchCreated]); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_have_been_enqueued"); + + handle.advance_till([InsideProcessBatch, ProcessBatchSucceeded, AfterProcessing]); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_have_been_processed"); + + handle.advance_one_failed_batch(); + // a new task deletion has been enqueued + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_the_second_task_deletion"); + handle.advance_one_failed_batch(); + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "everything_has_been_processed"); } } diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/above_the_max_number_of_tasks.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/above_the_max_number_of_tasks.snap new file mode 100644 index 000000000..c5d345b97 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/above_the_max_number_of_tasks.snap 
@@ -0,0 +1,49 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +---------------------------------------------------------------------- +### Status: +enqueued [2,3,] +succeeded [0,] +failed [1,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,3,] +---------------------------------------------------------------------- +### Index Tasks: +doggo [0,1,2,3,] +---------------------------------------------------------------------- +### Index Mapper: +doggo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap new file mode 100644 index 000000000..52bedb7db --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap @@ -0,0 +1,44 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +5 {uid: 5, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(2), original_filter: "?from=4,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=4,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[2, 4]> }} +---------------------------------------------------------------------- +### Status: +enqueued [3,] +succeeded [5,] +failed [] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [3,] +"taskDeletion" [5,] +---------------------------------------------------------------------- +### Index Tasks: +doggo [3,] +---------------------------------------------------------------------- +### Index Mapper: +doggo: { 
number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [3,] +[timestamp] [5,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [5,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [5,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap new file mode 100644 index 000000000..641f57d9d --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap @@ -0,0 +1,41 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +6 {uid: 6, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(2), original_filter: "?from=5,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=5,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[3, 5]> }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [6,] +failed [] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [] +"taskDeletion" [6,] +---------------------------------------------------------------------- +### Index Tasks: +---------------------------------------------------------------------- +### Index Mapper: +doggo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [6,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [6,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [6,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/max_number_of_tasks.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/max_number_of_tasks.snap new file mode 100644 index 000000000..b7d367399 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/max_number_of_tasks.snap @@ -0,0 +1,45 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: 
IndexCreation { index_uid: "doggo", primary_key: None }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,] +failed [1,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,] +---------------------------------------------------------------------- +### Index Tasks: +doggo [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +doggo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap new file mode 100644 index 000000000..81b773012 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap @@ -0,0 +1,52 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[4,] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +4 {uid: 4, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "?from=1,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=1,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[0, 1]> }} +---------------------------------------------------------------------- +### Status: +enqueued [2,3,4,] +succeeded [0,] +failed [1,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,3,] +"taskDeletion" [4,] +---------------------------------------------------------------------- +### Index Tasks: +doggo [0,1,2,3,] +---------------------------------------------------------------------- +### Index Mapper: +doggo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] 
+[timestamp] [4,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap new file mode 100644 index 000000000..a1c2eb421 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap @@ -0,0 +1,46 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} +4 {uid: 4, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(2), original_filter: "?from=1,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=1,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[0, 1]> }} +---------------------------------------------------------------------- +### Status: +enqueued [2,3,] +succeeded [4,] +failed [] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [2,3,] +"taskDeletion" [4,] +---------------------------------------------------------------------- +### Index Tasks: +doggo [2,3,] +---------------------------------------------------------------------- +### Index Mapper: +doggo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [4,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [4,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 9f85a4c5c..67d8bbd5c 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -234,6 +234,7 @@ fn open_or_create_database_unchecked( index_base_map_size: opt.max_index_size.get_bytes() as usize, indexer_config: (&opt.indexer_options).try_into()?, autobatching_enabled: true, + max_number_of_tasks: 1_000_000, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_count: DEFAULT_INDEX_COUNT, })?) 
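The selection policy that patches 1-3 converge on can be read in isolation: once the queue holds at least `max_number_of_tasks` entries, gather the ids of the finished tasks (succeeded, failed and canceled), keep up to 100,000 of them, and register a single `taskDeletion` for that set — unless fewer than two tasks are finished, since then the scheduler would only ever delete the deletion tasks it enqueues itself. Below is a minimal standalone sketch of that selection step, using the `roaring` crate but none of the scheduler's real types; the function name, the hard-coded threshold and the `main` function are illustrative only and are not part of the patch.

use roaring::RoaringBitmap;

// Illustrative threshold; the real value comes from
// `IndexSchedulerOptions::max_number_of_tasks` (1_000_000 by default in the patch).
const MAX_NUMBER_OF_TASKS: u64 = 1_000_000;

/// Returns the set of task ids an automatic `taskDeletion` would target,
/// or `None` when the scheduler should leave the queue alone.
fn tasks_to_auto_delete(all_tasks: &RoaringBitmap, finished: RoaringBitmap) -> Option<RoaringBitmap> {
    // Under the limit: the queue is healthy, nothing to delete.
    if all_tasks.len() < MAX_NUMBER_OF_TASKS {
        return None;
    }

    // Keep at most 100_000 finished tasks, iterating from the highest ids down,
    // mirroring `finished.into_iter().rev().take(100_000)` in the patch.
    let to_delete: RoaringBitmap = finished.into_iter().rev().take(100_000).collect();

    // With fewer than two deletable tasks we would only ever delete the
    // deletion tasks we enqueue ourselves, looping forever.
    if to_delete.len() < 2 {
        return None;
    }

    Some(to_delete)
}

fn main() {
    // A queue over the limit in which only the first 50 tasks are finished.
    let all_tasks: RoaringBitmap = (0..1_000_010u32).collect();
    let finished: RoaringBitmap = (0..50u32).collect();

    let to_delete =
        tasks_to_auto_delete(&all_tasks, finished).expect("the queue is over the limit");
    assert_eq!(to_delete.len(), 50);
    println!("would delete {} finished tasks", to_delete.len());
}

In the scheduler itself this set is then turned into a `KindWithContent::TaskDeletion` whose query string is what patch 5 below reworks into a valid `beforeEnqueuedAt` filter.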
From 9ca6f59546091954b3ded4aba2ff0f9273bd932e Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 26 Apr 2023 12:02:06 +0200 Subject: [PATCH 4/8] Update index-scheduler/src/lib.rs Co-authored-by: Louis Dureuil --- index-scheduler/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index e0a37aba1..48acab342 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1131,7 +1131,7 @@ impl IndexScheduler { } log::info!( - "The task queue is almost full. Thus, meilisearch will delete the last {} finished tasks.", + "The task queue is almost full. Deleting the oldest {} finished tasks.", to_delete.len() ); From dcbfecf42cdef024620720a6e3e397104e07b6a6 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 26 Apr 2023 13:55:02 +0200 Subject: [PATCH 5/8] make the generated filter valid --- index-scheduler/src/lib.rs | 49 +++-- .../above_the_max_number_of_tasks.snap | 49 ----- .../after_the_second_task_deletion.snap | 106 +++++++---- .../everything_has_been_processed.snap | 83 ++++---- .../max_number_of_tasks.snap | 45 ----- .../task_deletion_have_been_enqueued.snap | 179 +++++++++++++----- .../task_deletion_have_been_processed.snap | 128 ++++++++----- .../task_queue_is_full.snap | 90 +++++++++ 8 files changed, 451 insertions(+), 278 deletions(-) delete mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/above_the_max_number_of_tasks.snap delete mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/max_number_of_tasks.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_queue_is_full.snap diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 48acab342..8d81c6a4f 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -51,6 +51,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use roaring::RoaringBitmap; use synchronoise::SignalEvent; +use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; use uuid::Uuid; @@ -1118,7 +1119,6 @@ impl IndexScheduler { let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default() | self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default() | self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default(); - drop(rtxn); let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(100_000)); @@ -1135,11 +1135,15 @@ impl IndexScheduler { to_delete.len() ); + // it's safe to unwrap here because we checked the len above + let newest_task_id = to_delete.iter().last().unwrap(); + let task = self.get_task(&rtxn, newest_task_id)?.ok_or(Error::CorruptedTaskQueue)?; + drop(rtxn); + self.register(KindWithContent::TaskDeletion { query: format!( - "?from={},limit={},status=succeeded,failed,canceled", - to_delete.iter().last().unwrap_or(u32::MAX), - to_delete.len(), + "?beforeEnqueuedAt={},status=succeeded,failed,canceled", + task.enqueued_at.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, ), tasks: to_delete, })?; @@ -1404,7 +1408,7 @@ mod tests { use big_s::S; use crossbeam::channel::RecvTimeoutError; use file_store::File; - use meili_snap::snapshot; + use meili_snap::{json_string, snapshot}; use meilisearch_auth::AuthFilter; use meilisearch_types::document_formats::DocumentFormatError; use 
meilisearch_types::error::ErrorCode; @@ -3860,8 +3864,6 @@ mod tests { handle.advance_one_failed_batch(); // at this point the max number of tasks is reached - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "max_number_of_tasks"); - // we can still enqueue multiple tasks index_scheduler .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) @@ -3870,22 +3872,43 @@ mod tests { .register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None }) .unwrap(); - // at this point the max number of tasks is reached - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "above_the_max_number_of_tasks"); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]" }), name: "task_queue_is_full"); + drop(rtxn); + // now we're above the max number of tasks // and if we try to advance in the tick function a new task deletion should be enqueued handle.advance_till([Start, BatchCreated]); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_have_been_enqueued"); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_been_enqueued"); + drop(rtxn); handle.advance_till([InsideProcessBatch, ProcessBatchSucceeded, AfterProcessing]); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_have_been_processed"); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "task_deletion_have_been_processed"); + drop(rtxn); handle.advance_one_failed_batch(); // a new task deletion has been enqueued handle.advance_one_successful_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_the_second_task_deletion"); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { "[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "after_the_second_task_deletion"); + drop(rtxn); + handle.advance_one_failed_batch(); handle.advance_one_successful_batch(); - snapshot!(snapshot_index_scheduler(&index_scheduler), name: "everything_has_been_processed"); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap(); + let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap(); + snapshot!(json_string!(tasks, { 
"[].enqueuedAt" => "[date]", "[].startedAt" => "[date]", "[].finishedAt" => "[date]", ".**.original_filter" => "[filter]", ".**.query" => "[query]" }), name: "everything_has_been_processed"); + drop(rtxn); } } diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/above_the_max_number_of_tasks.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/above_the_max_number_of_tasks.snap deleted file mode 100644 index c5d345b97..000000000 --- a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/above_the_max_number_of_tasks.snap +++ /dev/null @@ -1,49 +0,0 @@ ---- -source: index-scheduler/src/lib.rs ---- -### Autobatching Enabled = true -### Processing Tasks: -[] ----------------------------------------------------------------------- -### All Tasks: -0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} ----------------------------------------------------------------------- -### Status: -enqueued [2,3,] -succeeded [0,] -failed [1,] ----------------------------------------------------------------------- -### Kind: -"indexCreation" [0,1,2,3,] ----------------------------------------------------------------------- -### Index Tasks: -doggo [0,1,2,3,] ----------------------------------------------------------------------- -### Index Mapper: -doggo: { number_of_documents: 0, field_distribution: {} } - ----------------------------------------------------------------------- -### Canceled By: - ----------------------------------------------------------------------- -### Enqueued At: -[timestamp] [0,] -[timestamp] [1,] -[timestamp] [2,] -[timestamp] [3,] ----------------------------------------------------------------------- -### Started At: -[timestamp] [0,] -[timestamp] [1,] ----------------------------------------------------------------------- -### Finished At: -[timestamp] [0,] -[timestamp] [1,] ----------------------------------------------------------------------- -### File Store: - ----------------------------------------------------------------------- - diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap index 52bedb7db..59948c58c 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/after_the_second_task_deletion.snap @@ -1,44 +1,68 @@ --- source: index-scheduler/src/lib.rs --- -### Autobatching Enabled = true -### Processing Tasks: -[] ----------------------------------------------------------------------- -### All Tasks: -3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -5 {uid: 5, status: succeeded, details: { matched_tasks: 2, 
deleted_tasks: Some(2), original_filter: "?from=4,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=4,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[2, 4]> }} ----------------------------------------------------------------------- -### Status: -enqueued [3,] -succeeded [5,] -failed [] ----------------------------------------------------------------------- -### Kind: -"indexCreation" [3,] -"taskDeletion" [5,] ----------------------------------------------------------------------- -### Index Tasks: -doggo [3,] ----------------------------------------------------------------------- -### Index Mapper: -doggo: { number_of_documents: 0, field_distribution: {} } - ----------------------------------------------------------------------- -### Canceled By: - ----------------------------------------------------------------------- -### Enqueued At: -[timestamp] [3,] -[timestamp] [5,] ----------------------------------------------------------------------- -### Started At: -[timestamp] [5,] ----------------------------------------------------------------------- -### Finished At: -[timestamp] [5,] ----------------------------------------------------------------------- -### File Store: - ----------------------------------------------------------------------- - +[ + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 5, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": 2, + "original_filter": "[filter]" + } + }, + "status": "succeeded", + "kind": { + "taskDeletion": { + "query": "[query]", + "tasks": [ + 58, + 48, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 16, + 0, + 0, + 0, + 2, + 0, + 4, + 0 + ] + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap index 641f57d9d..0f2f366e9 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/everything_has_been_processed.snap @@ -1,41 +1,48 @@ --- source: index-scheduler/src/lib.rs --- -### Autobatching Enabled = true -### Processing Tasks: -[] ----------------------------------------------------------------------- -### All Tasks: -6 {uid: 6, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(2), original_filter: "?from=5,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=5,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[3, 5]> }} ----------------------------------------------------------------------- -### Status: -enqueued [] -succeeded [6,] -failed [] ----------------------------------------------------------------------- -### Kind: -"indexCreation" [] -"taskDeletion" [6,] ----------------------------------------------------------------------- -### Index Tasks: ----------------------------------------------------------------------- -### Index Mapper: -doggo: { number_of_documents: 0, field_distribution: {} } - 
----------------------------------------------------------------------- -### Canceled By: - ----------------------------------------------------------------------- -### Enqueued At: -[timestamp] [6,] ----------------------------------------------------------------------- -### Started At: -[timestamp] [6,] ----------------------------------------------------------------------- -### Finished At: -[timestamp] [6,] ----------------------------------------------------------------------- -### File Store: - ----------------------------------------------------------------------- - +[ + { + "uid": 6, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": 2, + "original_filter": "[filter]" + } + }, + "status": "succeeded", + "kind": { + "taskDeletion": { + "query": "[query]", + "tasks": [ + 58, + 48, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 16, + 0, + 0, + 0, + 3, + 0, + 5, + 0 + ] + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/max_number_of_tasks.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/max_number_of_tasks.snap deleted file mode 100644 index b7d367399..000000000 --- a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/max_number_of_tasks.snap +++ /dev/null @@ -1,45 +0,0 @@ ---- -source: index-scheduler/src/lib.rs ---- -### Autobatching Enabled = true -### Processing Tasks: -[] ----------------------------------------------------------------------- -### All Tasks: -0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} ----------------------------------------------------------------------- -### Status: -enqueued [] -succeeded [0,] -failed [1,] ----------------------------------------------------------------------- -### Kind: -"indexCreation" [0,1,] ----------------------------------------------------------------------- -### Index Tasks: -doggo [0,1,] ----------------------------------------------------------------------- -### Index Mapper: -doggo: { number_of_documents: 0, field_distribution: {} } - ----------------------------------------------------------------------- -### Canceled By: - ----------------------------------------------------------------------- -### Enqueued At: -[timestamp] [0,] -[timestamp] [1,] ----------------------------------------------------------------------- -### Started At: -[timestamp] [0,] -[timestamp] [1,] ----------------------------------------------------------------------- -### Finished At: -[timestamp] [0,] -[timestamp] [1,] ----------------------------------------------------------------------- -### File Store: - ----------------------------------------------------------------------- - diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap index 81b773012..dc6b03517 100644 --- 
a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_enqueued.snap @@ -1,52 +1,133 @@ --- source: index-scheduler/src/lib.rs --- -### Autobatching Enabled = true -### Processing Tasks: -[4,] ----------------------------------------------------------------------- -### All Tasks: -0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -1 {uid: 1, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -4 {uid: 4, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "?from=1,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=1,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[0, 1]> }} ----------------------------------------------------------------------- -### Status: -enqueued [2,3,4,] -succeeded [0,] -failed [1,] ----------------------------------------------------------------------- -### Kind: -"indexCreation" [0,1,2,3,] -"taskDeletion" [4,] ----------------------------------------------------------------------- -### Index Tasks: -doggo [0,1,2,3,] ----------------------------------------------------------------------- -### Index Mapper: -doggo: { number_of_documents: 0, field_distribution: {} } - ----------------------------------------------------------------------- -### Canceled By: - ----------------------------------------------------------------------- -### Enqueued At: -[timestamp] [0,] -[timestamp] [1,] -[timestamp] [2,] -[timestamp] [3,] -[timestamp] [4,] ----------------------------------------------------------------------- -### Started At: -[timestamp] [0,] -[timestamp] [1,] ----------------------------------------------------------------------- -### Finished At: -[timestamp] [0,] -[timestamp] [1,] ----------------------------------------------------------------------- -### File Store: - ----------------------------------------------------------------------- - +[ + { + "uid": 0, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "succeeded", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 1, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": { + "message": "Index `doggo` already exists.", + "code": "index_already_exists", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#index_already_exists" + }, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "failed", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 2, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + 
"details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 4, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": null, + "original_filter": "[filter]" + } + }, + "status": "enqueued", + "kind": { + "taskDeletion": { + "query": "[query]", + "tasks": [ + 58, + 48, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 16, + 0, + 0, + 0, + 0, + 0, + 1, + 0 + ] + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap index a1c2eb421..0200f7f4a 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_deletion_have_been_processed.snap @@ -1,46 +1,88 @@ --- source: index-scheduler/src/lib.rs --- -### Autobatching Enabled = true -### Processing Tasks: -[] ----------------------------------------------------------------------- -### All Tasks: -2 {uid: 2, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -3 {uid: 3, status: enqueued, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggo", primary_key: None }} -4 {uid: 4, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(2), original_filter: "?from=1,limit=2,status=succeeded,failed,canceled" }, kind: TaskDeletion { query: "?from=1,limit=2,status=succeeded,failed,canceled", tasks: RoaringBitmap<[0, 1]> }} ----------------------------------------------------------------------- -### Status: -enqueued [2,3,] -succeeded [4,] -failed [] ----------------------------------------------------------------------- -### Kind: -"indexCreation" [2,3,] -"taskDeletion" [4,] ----------------------------------------------------------------------- -### Index Tasks: -doggo [2,3,] ----------------------------------------------------------------------- -### Index Mapper: -doggo: { number_of_documents: 0, field_distribution: {} } - ----------------------------------------------------------------------- -### Canceled By: - ----------------------------------------------------------------------- -### Enqueued At: -[timestamp] [2,] -[timestamp] [3,] -[timestamp] [4,] ----------------------------------------------------------------------- -### Started At: -[timestamp] [4,] ----------------------------------------------------------------------- -### Finished At: -[timestamp] [4,] ----------------------------------------------------------------------- -### File Store: - ----------------------------------------------------------------------- - +[ + { + "uid": 2, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + 
"index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 4, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "TaskDeletion": { + "matched_tasks": 2, + "deleted_tasks": 2, + "original_filter": "[filter]" + } + }, + "status": "succeeded", + "kind": { + "taskDeletion": { + "query": "[query]", + "tasks": [ + 58, + 48, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 16, + 0, + 0, + 0, + 0, + 0, + 1, + 0 + ] + } + } + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_queue_is_full.snap b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_queue_is_full.snap new file mode 100644 index 000000000..988df76ec --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/test_auto_deletion_of_tasks/task_queue_is_full.snap @@ -0,0 +1,90 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "uid": 0, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "succeeded", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 1, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": { + "message": "Index `doggo` already exists.", + "code": "index_already_exists", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#index_already_exists" + }, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "failed", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 2, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + }, + { + "uid": 3, + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]", + "error": null, + "canceledBy": null, + "details": { + "IndexInfo": { + "primary_key": null + } + }, + "status": "enqueued", + "kind": { + "indexCreation": { + "index_uid": "doggo", + "primary_key": null + } + } + } +] From 52ab114f6c742788bc7cfae0e9f4ed8f60c707b5 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 4 May 2023 00:05:39 +0200 Subject: [PATCH 6/8] Fix test on macOS: 50 tasks would result in the test consistently failing on a local macOS --- index-scheduler/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8d81c6a4f..02e45d0b9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -3835,7 +3835,7 @@ mod tests { // But a task deletion that delete something should works index_scheduler - .register(KindWithContent::TaskDeletion { query: S("test"), tasks: (0..50).collect() }) + .register(KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() }) .unwrap(); handle.advance_one_successful_batch(); From b212aef5dbd2beadfa68ac1f6f08a3d519533747 Mon 
Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 4 May 2023 09:56:48 +0200 Subject: [PATCH 7/8] add one nanosecond to generated filter so as to generate a filter that would have matched the last task to delete --- index-scheduler/src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 02e45d0b9..abd161d10 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1137,13 +1137,17 @@ impl IndexScheduler { // it's safe to unwrap here because we checked the len above let newest_task_id = to_delete.iter().last().unwrap(); - let task = self.get_task(&rtxn, newest_task_id)?.ok_or(Error::CorruptedTaskQueue)?; + let last_task_to_delete = + self.get_task(&rtxn, newest_task_id)?.ok_or(Error::CorruptedTaskQueue)?; drop(rtxn); + // increase time by one nanosecond so that the enqueuedAt of the last task to delete is also lower than that date. + let delete_before = last_task_to_delete.enqueued_at + Duration::from_nanos(1); + self.register(KindWithContent::TaskDeletion { query: format!( "?beforeEnqueuedAt={},status=succeeded,failed,canceled", - task.enqueued_at.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, + delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, ), tasks: to_delete, })?; From d8381eb790ddabe6277de93fba05f383a27c048d Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 4 May 2023 10:07:59 +0200 Subject: [PATCH 8/8] Fix originalFilter --- index-scheduler/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index abd161d10..3fe0acf1a 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1146,7 +1146,7 @@ impl IndexScheduler { self.register(KindWithContent::TaskDeletion { query: format!( - "?beforeEnqueuedAt={},status=succeeded,failed,canceled", + "?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled", delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, ), tasks: to_delete,
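
Taken together, the last two patches change how the scheduler builds the query string for its own TaskDeletion task: the `enqueuedAt` of the newest task selected for deletion is bumped by one nanosecond so that the strict `beforeEnqueuedAt` bound still covers that task, and the filter now uses the `&statuses=` syntax instead of `,status=`. A minimal, self-contained sketch of that construction is shown below; it assumes the `time` crate types used in the diff, and the free function and `main` are illustrative only, not the IndexScheduler's actual code.

    use std::time::Duration;

    use time::format_description::well_known::Rfc3339;
    use time::OffsetDateTime;

    /// Illustrative only: builds the query string registered for the
    /// auto-deletion task, given the `enqueuedAt` of the newest task to delete.
    fn auto_deletion_filter(last_enqueued_at: OffsetDateTime) -> Result<String, time::error::Format> {
        // `beforeEnqueuedAt` only matches tasks enqueued strictly before the given
        // date, so add one nanosecond to also cover the last task to delete.
        let delete_before = last_enqueued_at + Duration::from_nanos(1);
        Ok(format!(
            "?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled",
            delete_before.format(&Rfc3339)?
        ))
    }

    fn main() -> Result<(), time::error::Format> {
        println!("{}", auto_deletion_filter(OffsetDateTime::now_utc())?);
        Ok(())
    }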
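
A note on the JSON snapshots above: the numeric `tasks` arrays (starting with `58, 48, ...`) appear to be the `RoaringBitmap` of task uids rendered in its serialized byte form. As a hypothetical sketch, assuming the `roaring` crate's portable serialization, the following would reproduce the 20-byte array shown for the bitmap `[2, 4]` in after_the_second_task_deletion.snap:

    use roaring::RoaringBitmap;

    fn main() {
        let tasks: RoaringBitmap = [2u32, 4].into_iter().collect();
        let mut bytes = Vec::new();
        // Portable serialization; the leading cookie bytes are 58, 48 (0x303A).
        tasks.serialize_into(&mut bytes).unwrap();
        println!("{bytes:?}"); // [58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 2, 0, 4, 0]
    }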