From 2eb1801e857c8e46ac59fec735f94278ae53765d Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 7 Nov 2024 19:17:15 +0100 Subject: [PATCH] reverse the order of the task queue --- crates/index-scheduler/src/lib.rs | 27 +++- crates/meilisearch-types/src/error.rs | 1 + crates/meilisearch/src/routes/tasks.rs | 8 +- crates/meilisearch/tests/tasks/errors.rs | 49 +++++++ crates/meilisearch/tests/tasks/mod.rs | 163 ++++++----------------- 5 files changed, 115 insertions(+), 133 deletions(-) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index e0e2bfb75..336a43b1b 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -84,6 +84,8 @@ pub struct Query { pub limit: Option, /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched pub from: Option, + /// The order used to return the tasks. By default the newest tasks are returned first and the boolean is `false`. + pub reverse: Option, /// The allowed [statuses](`meilisearch_types::tasks::Task::status`) of the matched tasls pub statuses: Option>, /// The allowed [kinds](meilisearch_types::tasks::Kind) of the matched tasks. @@ -126,6 +128,7 @@ impl Query { Query { limit: None, from: None, + reverse: None, statuses: None, types: None, index_uids: None, @@ -706,7 +709,12 @@ impl IndexScheduler { let mut tasks = self.all_task_ids(rtxn)?; if let Some(from) = &query.from { - tasks.remove_range(from.saturating_add(1)..); + let range = if query.reverse.unwrap_or_default() { + u32::MIN..*from + } else { + from.saturating_add(1)..u32::MAX + }; + tasks.remove_range(range); } if let Some(status) = &query.statuses { @@ -826,7 +834,11 @@ impl IndexScheduler { )?; if let Some(limit) = query.limit { - tasks = tasks.into_iter().rev().take(limit as usize).collect(); + tasks = if query.reverse.unwrap_or_default() { + tasks.into_iter().take(limit as usize).collect() + } else { + tasks.into_iter().rev().take(limit as usize).collect() + }; } Ok(tasks) @@ -951,10 +963,13 @@ impl IndexScheduler { let rtxn = self.env.read_txn()?; let (tasks, total) = self.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?; - let tasks = self.get_existing_tasks( - &rtxn, - tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), - )?; + let tasks = if query.reverse.unwrap_or_default() { + Box::new(tasks.into_iter()) as Box> + } else { + Box::new(tasks.into_iter().rev()) as Box> + }; + let tasks = + self.get_existing_tasks(&rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?; let ProcessingTasks { started_at, processing, .. } = self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 514ed18c3..ef530fc6b 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -318,6 +318,7 @@ InvalidTaskBeforeStartedAt , InvalidRequest , BAD_REQUEST ; InvalidTaskCanceledBy , InvalidRequest , BAD_REQUEST ; InvalidTaskFrom , InvalidRequest , BAD_REQUEST ; InvalidTaskLimit , InvalidRequest , BAD_REQUEST ; +InvalidTaskReverse , InvalidRequest , BAD_REQUEST ; InvalidTaskStatuses , InvalidRequest , BAD_REQUEST ; InvalidTaskTypes , InvalidRequest , BAD_REQUEST ; InvalidTaskUids , InvalidRequest , BAD_REQUEST ; diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index 95959d6d5..e2a787d12 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -42,6 +42,8 @@ pub struct TasksFilterQuery { pub limit: Param, #[deserr(default, error = DeserrQueryParamError)] pub from: Option>, + #[deserr(default, error = DeserrQueryParamError)] + pub reverse: Option>, #[deserr(default, error = DeserrQueryParamError)] pub uids: OptionStarOrList, @@ -73,6 +75,7 @@ impl TasksFilterQuery { Query { limit: Some(self.limit.0), from: self.from.as_deref().copied(), + reverse: self.reverse.as_deref().copied(), statuses: self.statuses.merge_star_and_none(), types: self.types.merge_star_and_none(), index_uids: self.index_uids.map(|x| x.to_string()).merge_star_and_none(), @@ -142,6 +145,7 @@ impl TaskDeletionOrCancelationQuery { Query { limit: None, from: None, + reverse: None, statuses: self.statuses.merge_star_and_none(), types: self.types.merge_star_and_none(), index_uids: self.index_uids.map(|x| x.to_string()).merge_star_and_none(), @@ -701,14 +705,14 @@ mod tests { { let params = "from=12&limit=15&indexUids=toto,tata-78&statuses=succeeded,enqueued&afterEnqueuedAt=2012-04-23&uids=1,2,3"; let query = deserr_query_params::(params).unwrap(); - snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###); + snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), reverse: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###); } { // Stars should translate to `None` in the query // Verify value of the default limit let params = "indexUids=*&statuses=succeeded,*&afterEnqueuedAt=2012-04-23&uids=1,2,3"; let query = deserr_query_params::(params).unwrap(); - snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"); + snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, reverse: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"); } { // Stars should also translate to `None` in task deletion/cancelation queries diff --git a/crates/meilisearch/tests/tasks/errors.rs b/crates/meilisearch/tests/tasks/errors.rs index 42ec42997..0e2966c8f 100644 --- a/crates/meilisearch/tests/tasks/errors.rs +++ b/crates/meilisearch/tests/tasks/errors.rs @@ -279,6 +279,55 @@ async fn task_bad_from() { "###); } +#[actix_rt::test] +async fn task_bad_reverse() { + let server = Server::new_shared(); + + let (response, code) = server.tasks_filter("reverse=doggo").await; + snapshot!(code, @"400 Bad Request"); + snapshot!(response, @r###" + { + "message": "Invalid value in parameter `reverse`: could not parse `doggo` as a boolean, expected either `true` or `false`", + "code": "invalid_task_reverse", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#invalid_task_reverse" + } + "###); + + let (response, code) = server.tasks_filter("reverse=*").await; + snapshot!(code, @"400 Bad Request"); + snapshot!(response, @r###" + { + "message": "Invalid value in parameter `reverse`: could not parse `*` as a boolean, expected either `true` or `false`", + "code": "invalid_task_reverse", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#invalid_task_reverse" + } + "###); + + let (response, code) = server.cancel_tasks("reverse=doggo").await; + snapshot!(code, @"400 Bad Request"); + snapshot!(response, @r###" + { + "message": "Unknown parameter `reverse`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`", + "code": "bad_request", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#bad_request" + } + "###); + + let (response, code) = server.delete_tasks("reverse=doggo").await; + snapshot!(code, @"400 Bad Request"); + snapshot!(response, @r###" + { + "message": "Unknown parameter `reverse`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`", + "code": "bad_request", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#bad_request" + } + "###); +} + #[actix_rt::test] async fn task_bad_after_enqueued_at() { let server = Server::new_shared(); diff --git a/crates/meilisearch/tests/tasks/mod.rs b/crates/meilisearch/tests/tasks/mod.rs index c59313885..8cff3c447 100644 --- a/crates/meilisearch/tests/tasks/mod.rs +++ b/crates/meilisearch/tests/tasks/mod.rs @@ -62,6 +62,44 @@ async fn list_tasks() { assert_eq!(response["results"].as_array().unwrap().len(), 2); } +#[actix_rt::test] +async fn list_tasks_pagination_and_reverse() { + let server = Server::new().await; + // First of all we want to create a lot of tasks very quickly. The fastest way is to delete a lot of unexisting indexes + let mut last_task = None; + for i in 0..10 { + let index = server.index(format!("test-{i}")); + last_task = Some(index.create(None).await.0.uid()); + } + server.wait_task(last_task.unwrap()).await; + + let (response, code) = server.tasks_filter("limit=3").await; + assert_eq!(code, 200); + let results = response["results"].as_array().unwrap(); + let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect(); + snapshot!(format!("{task_ids:?}"), @"[9, 8, 7]"); + + let (response, code) = server.tasks_filter("limit=3&from=1").await; + assert_eq!(code, 200); + let results = response["results"].as_array().unwrap(); + let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect(); + snapshot!(format!("{task_ids:?}"), @"[1, 0]"); + + // In reversed order + + let (response, code) = server.tasks_filter("limit=3&reverse=true").await; + assert_eq!(code, 200); + let results = response["results"].as_array().unwrap(); + let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect(); + snapshot!(format!("{task_ids:?}"), @"[0, 1, 2]"); + + let (response, code) = server.tasks_filter("limit=3&from=8&reverse=true").await; + assert_eq!(code, 200); + let results = response["results"].as_array().unwrap(); + let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect(); + snapshot!(format!("{task_ids:?}"), @"[8, 9]"); +} + #[actix_rt::test] async fn list_tasks_with_star_filters() { let server = Server::new().await; @@ -193,131 +231,6 @@ async fn list_tasks_status_and_type_filtered() { assert_eq!(response["results"].as_array().unwrap().len(), 2); } -#[actix_rt::test] -async fn get_task_filter_error() { - let server = Server::new().await; - - let (response, code) = server.tasks_filter("lol=pied").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Unknown parameter `lol`: expected one of `limit`, `from`, `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`", - "code": "bad_request", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#bad_request" - } - "###); - - let (response, code) = server.tasks_filter("uids=pied").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer", - "code": "invalid_task_uids", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#invalid_task_uids" - } - "###); - - let (response, code) = server.tasks_filter("from=pied").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Invalid value in parameter `from`: could not parse `pied` as a positive integer", - "code": "invalid_task_from", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#invalid_task_from" - } - "###); - - let (response, code) = server.tasks_filter("beforeStartedAt=pied").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Invalid value in parameter `beforeStartedAt`: `pied` is an invalid date-time. It should follow the YYYY-MM-DD or RFC 3339 date-time format.", - "code": "invalid_task_before_started_at", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#invalid_task_before_started_at" - } - "###); -} - -#[actix_rt::test] -async fn delete_task_filter_error() { - let server = Server::new().await; - - let (response, code) = server.delete_tasks("").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Query parameters to filter the tasks to delete are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.", - "code": "missing_task_filters", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#missing_task_filters" - } - "###); - - let (response, code) = server.delete_tasks("lol=pied").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Unknown parameter `lol`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`", - "code": "bad_request", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#bad_request" - } - "###); - - let (response, code) = server.delete_tasks("uids=pied").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer", - "code": "invalid_task_uids", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#invalid_task_uids" - } - "###); -} - -#[actix_rt::test] -async fn cancel_task_filter_error() { - let server = Server::new().await; - - let (response, code) = server.cancel_tasks("").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.", - "code": "missing_task_filters", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#missing_task_filters" - } - "###); - - let (response, code) = server.cancel_tasks("lol=pied").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Unknown parameter `lol`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`", - "code": "bad_request", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#bad_request" - } - "###); - - let (response, code) = server.cancel_tasks("uids=pied").await; - assert_eq!(code, 400, "{}", response); - meili_snap::snapshot!(meili_snap::json_string!(response), @r###" - { - "message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer", - "code": "invalid_task_uids", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#invalid_task_uids" - } - "###); -} - macro_rules! assert_valid_summarized_task { ($response:expr, $task_type:literal, $index:literal) => {{ assert_eq!($response.as_object().unwrap().len(), 5);