Compare commits

...

3 Commits

Author SHA1 Message Date
Yukun Wang
e78a76e15c
Merge fdcd483740 into c1d8ee2a8d 2024-11-19 03:17:14 +00:00
meili-bors[bot]
c1d8ee2a8d
Merge #5048
Some checks failed
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Tests on ubuntu-20.04 (push) Failing after 14s
Test suite / Run tests in debug (push) Failing after 24s
Test suite / Tests on ${{ matrix.os }} (windows-2022) (push) Failing after 57s
Test suite / Run Rustfmt (push) Successful in 1m36s
Test suite / Run Clippy (push) Successful in 6m8s
Indexing bench (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of indexing (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for geo (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for songs (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for Wikipedia articles (push) / Run and upload benchmarks (push) Waiting to run
Run the indexing fuzzer / Setup the action (push) Successful in 1h4m23s
Test suite / Tests on ${{ matrix.os }} (macos-13) (push) Has been cancelled
5048: Reverse the order of the task queue r=Kerollmops a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/5047

## What does this PR do?
- Provide a new parameter to reverse the order of the task queue
- Add tests
- Remove some unrelated tests that were duplicated in tests/tasks/mod.rs and tests/tasks/error.rs


Co-authored-by: Tamo <tamo@meilisearch.com>
2024-11-18 16:24:12 +00:00
Tamo
2eb1801e85 reverse the order of the task queue 2024-11-07 19:19:44 +01:00
5 changed files with 115 additions and 133 deletions

View File

@ -84,6 +84,8 @@ pub struct Query {
pub limit: Option<u32>, pub limit: Option<u32>,
/// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
pub from: Option<u32>, pub from: Option<u32>,
/// The order used to return the tasks. By default the newest tasks are returned first and the boolean is `false`.
pub reverse: Option<bool>,
/// The allowed [statuses](`meilisearch_types::tasks::Task::status`) of the matched tasls /// The allowed [statuses](`meilisearch_types::tasks::Task::status`) of the matched tasls
pub statuses: Option<Vec<Status>>, pub statuses: Option<Vec<Status>>,
/// The allowed [kinds](meilisearch_types::tasks::Kind) of the matched tasks. /// The allowed [kinds](meilisearch_types::tasks::Kind) of the matched tasks.
@ -126,6 +128,7 @@ impl Query {
Query { Query {
limit: None, limit: None,
from: None, from: None,
reverse: None,
statuses: None, statuses: None,
types: None, types: None,
index_uids: None, index_uids: None,
@ -706,7 +709,12 @@ impl IndexScheduler {
let mut tasks = self.all_task_ids(rtxn)?; let mut tasks = self.all_task_ids(rtxn)?;
if let Some(from) = &query.from { if let Some(from) = &query.from {
tasks.remove_range(from.saturating_add(1)..); let range = if query.reverse.unwrap_or_default() {
u32::MIN..*from
} else {
from.saturating_add(1)..u32::MAX
};
tasks.remove_range(range);
} }
if let Some(status) = &query.statuses { if let Some(status) = &query.statuses {
@ -826,7 +834,11 @@ impl IndexScheduler {
)?; )?;
if let Some(limit) = query.limit { if let Some(limit) = query.limit {
tasks = tasks.into_iter().rev().take(limit as usize).collect(); tasks = if query.reverse.unwrap_or_default() {
tasks.into_iter().take(limit as usize).collect()
} else {
tasks.into_iter().rev().take(limit as usize).collect()
};
} }
Ok(tasks) Ok(tasks)
@ -951,10 +963,13 @@ impl IndexScheduler {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let (tasks, total) = self.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?; let (tasks, total) = self.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?;
let tasks = self.get_existing_tasks( let tasks = if query.reverse.unwrap_or_default() {
&rtxn, Box::new(tasks.into_iter()) as Box<dyn Iterator<Item = u32>>
tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), } else {
)?; Box::new(tasks.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let tasks =
self.get_existing_tasks(&rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?;
let ProcessingTasks { started_at, processing, .. } = let ProcessingTasks { started_at, processing, .. } =
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();

View File

@ -318,6 +318,7 @@ InvalidTaskBeforeStartedAt , InvalidRequest , BAD_REQUEST ;
InvalidTaskCanceledBy , InvalidRequest , BAD_REQUEST ; InvalidTaskCanceledBy , InvalidRequest , BAD_REQUEST ;
InvalidTaskFrom , InvalidRequest , BAD_REQUEST ; InvalidTaskFrom , InvalidRequest , BAD_REQUEST ;
InvalidTaskLimit , InvalidRequest , BAD_REQUEST ; InvalidTaskLimit , InvalidRequest , BAD_REQUEST ;
InvalidTaskReverse , InvalidRequest , BAD_REQUEST ;
InvalidTaskStatuses , InvalidRequest , BAD_REQUEST ; InvalidTaskStatuses , InvalidRequest , BAD_REQUEST ;
InvalidTaskTypes , InvalidRequest , BAD_REQUEST ; InvalidTaskTypes , InvalidRequest , BAD_REQUEST ;
InvalidTaskUids , InvalidRequest , BAD_REQUEST ; InvalidTaskUids , InvalidRequest , BAD_REQUEST ;

View File

@ -42,6 +42,8 @@ pub struct TasksFilterQuery {
pub limit: Param<u32>, pub limit: Param<u32>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)] #[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)]
pub from: Option<Param<TaskId>>, pub from: Option<Param<TaskId>>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskReverse>)]
pub reverse: Option<Param<bool>>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskUids>)] #[deserr(default, error = DeserrQueryParamError<InvalidTaskUids>)]
pub uids: OptionStarOrList<u32>, pub uids: OptionStarOrList<u32>,
@ -73,6 +75,7 @@ impl TasksFilterQuery {
Query { Query {
limit: Some(self.limit.0), limit: Some(self.limit.0),
from: self.from.as_deref().copied(), from: self.from.as_deref().copied(),
reverse: self.reverse.as_deref().copied(),
statuses: self.statuses.merge_star_and_none(), statuses: self.statuses.merge_star_and_none(),
types: self.types.merge_star_and_none(), types: self.types.merge_star_and_none(),
index_uids: self.index_uids.map(|x| x.to_string()).merge_star_and_none(), index_uids: self.index_uids.map(|x| x.to_string()).merge_star_and_none(),
@ -142,6 +145,7 @@ impl TaskDeletionOrCancelationQuery {
Query { Query {
limit: None, limit: None,
from: None, from: None,
reverse: None,
statuses: self.statuses.merge_star_and_none(), statuses: self.statuses.merge_star_and_none(),
types: self.types.merge_star_and_none(), types: self.types.merge_star_and_none(),
index_uids: self.index_uids.map(|x| x.to_string()).merge_star_and_none(), index_uids: self.index_uids.map(|x| x.to_string()).merge_star_and_none(),
@ -701,14 +705,14 @@ mod tests {
{ {
let params = "from=12&limit=15&indexUids=toto,tata-78&statuses=succeeded,enqueued&afterEnqueuedAt=2012-04-23&uids=1,2,3"; let params = "from=12&limit=15&indexUids=toto,tata-78&statuses=succeeded,enqueued&afterEnqueuedAt=2012-04-23&uids=1,2,3";
let query = deserr_query_params::<TasksFilterQuery>(params).unwrap(); let query = deserr_query_params::<TasksFilterQuery>(params).unwrap();
snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###); snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), reverse: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###);
} }
{ {
// Stars should translate to `None` in the query // Stars should translate to `None` in the query
// Verify value of the default limit // Verify value of the default limit
let params = "indexUids=*&statuses=succeeded,*&afterEnqueuedAt=2012-04-23&uids=1,2,3"; let params = "indexUids=*&statuses=succeeded,*&afterEnqueuedAt=2012-04-23&uids=1,2,3";
let query = deserr_query_params::<TasksFilterQuery>(params).unwrap(); let query = deserr_query_params::<TasksFilterQuery>(params).unwrap();
snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"); snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, reverse: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
} }
{ {
// Stars should also translate to `None` in task deletion/cancelation queries // Stars should also translate to `None` in task deletion/cancelation queries

View File

@ -279,6 +279,55 @@ async fn task_bad_from() {
"###); "###);
} }
#[actix_rt::test]
async fn task_bad_reverse() {
let server = Server::new_shared();
let (response, code) = server.tasks_filter("reverse=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Invalid value in parameter `reverse`: could not parse `doggo` as a boolean, expected either `true` or `false`",
"code": "invalid_task_reverse",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
}
"###);
let (response, code) = server.tasks_filter("reverse=*").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Invalid value in parameter `reverse`: could not parse `*` as a boolean, expected either `true` or `false`",
"code": "invalid_task_reverse",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
}
"###);
let (response, code) = server.cancel_tasks("reverse=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Unknown parameter `reverse`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
let (response, code) = server.delete_tasks("reverse=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Unknown parameter `reverse`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
}
#[actix_rt::test] #[actix_rt::test]
async fn task_bad_after_enqueued_at() { async fn task_bad_after_enqueued_at() {
let server = Server::new_shared(); let server = Server::new_shared();

View File

@ -62,6 +62,44 @@ async fn list_tasks() {
assert_eq!(response["results"].as_array().unwrap().len(), 2); assert_eq!(response["results"].as_array().unwrap().len(), 2);
} }
#[actix_rt::test]
async fn list_tasks_pagination_and_reverse() {
let server = Server::new().await;
// First of all we want to create a lot of tasks very quickly. The fastest way is to delete a lot of unexisting indexes
let mut last_task = None;
for i in 0..10 {
let index = server.index(format!("test-{i}"));
last_task = Some(index.create(None).await.0.uid());
}
server.wait_task(last_task.unwrap()).await;
let (response, code) = server.tasks_filter("limit=3").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{task_ids:?}"), @"[9, 8, 7]");
let (response, code) = server.tasks_filter("limit=3&from=1").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{task_ids:?}"), @"[1, 0]");
// In reversed order
let (response, code) = server.tasks_filter("limit=3&reverse=true").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{task_ids:?}"), @"[0, 1, 2]");
let (response, code) = server.tasks_filter("limit=3&from=8&reverse=true").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{task_ids:?}"), @"[8, 9]");
}
#[actix_rt::test] #[actix_rt::test]
async fn list_tasks_with_star_filters() { async fn list_tasks_with_star_filters() {
let server = Server::new().await; let server = Server::new().await;
@ -193,131 +231,6 @@ async fn list_tasks_status_and_type_filtered() {
assert_eq!(response["results"].as_array().unwrap().len(), 2); assert_eq!(response["results"].as_array().unwrap().len(), 2);
} }
#[actix_rt::test]
async fn get_task_filter_error() {
let server = Server::new().await;
let (response, code) = server.tasks_filter("lol=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown parameter `lol`: expected one of `limit`, `from`, `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
let (response, code) = server.tasks_filter("uids=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
"code": "invalid_task_uids",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_uids"
}
"###);
let (response, code) = server.tasks_filter("from=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `from`: could not parse `pied` as a positive integer",
"code": "invalid_task_from",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_from"
}
"###);
let (response, code) = server.tasks_filter("beforeStartedAt=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `beforeStartedAt`: `pied` is an invalid date-time. It should follow the YYYY-MM-DD or RFC 3339 date-time format.",
"code": "invalid_task_before_started_at",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_before_started_at"
}
"###);
}
#[actix_rt::test]
async fn delete_task_filter_error() {
let server = Server::new().await;
let (response, code) = server.delete_tasks("").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Query parameters to filter the tasks to delete are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.",
"code": "missing_task_filters",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#missing_task_filters"
}
"###);
let (response, code) = server.delete_tasks("lol=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown parameter `lol`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
let (response, code) = server.delete_tasks("uids=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
"code": "invalid_task_uids",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_uids"
}
"###);
}
#[actix_rt::test]
async fn cancel_task_filter_error() {
let server = Server::new().await;
let (response, code) = server.cancel_tasks("").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.",
"code": "missing_task_filters",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#missing_task_filters"
}
"###);
let (response, code) = server.cancel_tasks("lol=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown parameter `lol`: expected one of `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
let (response, code) = server.cancel_tasks("uids=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
"code": "invalid_task_uids",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_uids"
}
"###);
}
macro_rules! assert_valid_summarized_task { macro_rules! assert_valid_summarized_task {
($response:expr, $task_type:literal, $index:literal) => {{ ($response:expr, $task_type:literal, $index:literal) => {{
assert_eq!($response.as_object().unwrap().len(), 5); assert_eq!($response.as_object().unwrap().len(), 5);