mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 18:17:39 +08:00
Implement the status and type filtering on the tasks route
This commit is contained in:
parent
3684c822f1
commit
8509243e68
@ -1,6 +1,7 @@
|
||||
use actix_web::{web, HttpRequest, HttpResponse};
|
||||
use meilisearch_error::ResponseError;
|
||||
use meilisearch_lib::tasks::task::TaskId;
|
||||
use meilisearch_lib::milli::update::IndexDocumentsMethod;
|
||||
use meilisearch_lib::tasks::task::{DocumentDeletion, TaskContent, TaskEvent, TaskId};
|
||||
use meilisearch_lib::tasks::TaskFilter;
|
||||
use meilisearch_lib::{IndexUid, MeiliSearch};
|
||||
use serde::Deserialize;
|
||||
@ -19,16 +20,51 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
||||
pub struct TasksFilter {
|
||||
pub struct TaskFilterQuery {
|
||||
#[serde(rename = "type")]
|
||||
type_: Option<CS<TaskType>>,
|
||||
status: Option<CS<TaskStatus>>,
|
||||
index_uid: Option<CS<IndexUid>>,
|
||||
}
|
||||
|
||||
#[rustfmt::skip]
|
||||
fn task_type_matches_content(type_: &TaskType, content: &TaskContent) -> bool {
|
||||
matches!((type_, content),
|
||||
(TaskType::IndexCreation, TaskContent::IndexCreation { .. })
|
||||
| (TaskType::IndexUpdate, TaskContent::IndexUpdate { .. })
|
||||
| (TaskType::IndexDeletion, TaskContent::IndexDeletion)
|
||||
| (TaskType::DocumentAddition, TaskContent::DocumentAddition {
|
||||
merge_strategy: IndexDocumentsMethod::ReplaceDocuments,
|
||||
..
|
||||
})
|
||||
| (TaskType::DocumentPartial, TaskContent::DocumentAddition {
|
||||
merge_strategy: IndexDocumentsMethod::UpdateDocuments,
|
||||
..
|
||||
})
|
||||
| (TaskType::DocumentDeletion, TaskContent::DocumentDeletion(DocumentDeletion::Ids(_)))
|
||||
| (TaskType::SettingsUpdate, TaskContent::SettingsUpdate { .. })
|
||||
| (TaskType::ClearAll, TaskContent::DocumentDeletion(DocumentDeletion::Clear))
|
||||
)
|
||||
}
|
||||
|
||||
fn task_status_matches_events(status: &TaskStatus, events: &[TaskEvent]) -> bool {
|
||||
events.last().map_or(false, |event| {
|
||||
matches!(
|
||||
(status, event),
|
||||
(TaskStatus::Enqueued, TaskEvent::Created(_))
|
||||
| (
|
||||
TaskStatus::Processing,
|
||||
TaskEvent::Processing(_) | TaskEvent::Batched { .. }
|
||||
)
|
||||
| (TaskStatus::Succeeded, TaskEvent::Succeded { .. })
|
||||
| (TaskStatus::Failed, TaskEvent::Failed { .. }),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_tasks(
|
||||
meilisearch: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, MeiliSearch>,
|
||||
params: web::Query<TasksFilter>,
|
||||
params: web::Query<TaskFilterQuery>,
|
||||
req: HttpRequest,
|
||||
analytics: web::Data<dyn Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
@ -38,14 +74,17 @@ async fn get_tasks(
|
||||
Some(&req),
|
||||
);
|
||||
|
||||
let TasksFilter {
|
||||
let TaskFilterQuery {
|
||||
type_,
|
||||
status,
|
||||
index_uid,
|
||||
} = params.into_inner();
|
||||
|
||||
let search_rules = &meilisearch.filters().search_rules;
|
||||
let filters = match index_uid {
|
||||
|
||||
// We first filter on potential indexes and make sure
|
||||
// that the search filter restrictions are also applied.
|
||||
let indexes_filters = match index_uid {
|
||||
Some(indexes) => {
|
||||
let mut filters = TaskFilter::default();
|
||||
for name in indexes.into_inner() {
|
||||
@ -68,6 +107,42 @@ async fn get_tasks(
|
||||
}
|
||||
};
|
||||
|
||||
// Then we complete the task filter with other potential status and types filters.
|
||||
let filters = match (type_, status) {
|
||||
(Some(CS(types)), Some(CS(statuses))) => {
|
||||
let mut filters = indexes_filters.unwrap_or_default();
|
||||
filters.filter_fn(move |task| {
|
||||
let matches_type = types
|
||||
.iter()
|
||||
.any(|t| task_type_matches_content(&t, &task.content));
|
||||
let matches_status = statuses
|
||||
.iter()
|
||||
.any(|s| task_status_matches_events(&s, &task.events));
|
||||
matches_type && matches_status
|
||||
});
|
||||
Some(filters)
|
||||
}
|
||||
(Some(CS(types)), None) => {
|
||||
let mut filters = indexes_filters.unwrap_or_default();
|
||||
filters.filter_fn(move |task| {
|
||||
types
|
||||
.iter()
|
||||
.any(|t| task_type_matches_content(&t, &task.content))
|
||||
});
|
||||
Some(filters)
|
||||
}
|
||||
(None, Some(CS(statuses))) => {
|
||||
let mut filters = indexes_filters.unwrap_or_default();
|
||||
filters.filter_fn(move |task| {
|
||||
statuses
|
||||
.iter()
|
||||
.any(|s| task_status_matches_events(&s, &task.events))
|
||||
});
|
||||
Some(filters)
|
||||
}
|
||||
(None, None) => indexes_filters,
|
||||
};
|
||||
|
||||
let tasks: TaskListView = meilisearch
|
||||
.list_tasks(filters, None, None)
|
||||
.await?
|
||||
|
@ -52,9 +52,9 @@ impl From<TaskContent> for TaskType {
|
||||
}
|
||||
|
||||
impl FromStr for TaskType {
|
||||
type Err = &'static str;
|
||||
type Err = String;
|
||||
|
||||
fn from_str(status: &str) -> Result<Self, &'static str> {
|
||||
fn from_str(status: &str) -> Result<Self, String> {
|
||||
match status {
|
||||
"indexCreation" => Ok(TaskType::IndexCreation),
|
||||
"indexUpdate" => Ok(TaskType::IndexUpdate),
|
||||
@ -64,7 +64,12 @@ impl FromStr for TaskType {
|
||||
"documentDeletion" => Ok(TaskType::DocumentDeletion),
|
||||
"settingsUpdate" => Ok(TaskType::SettingsUpdate),
|
||||
"clearAll" => Ok(TaskType::ClearAll),
|
||||
_ => Err("invalid task type value"),
|
||||
unknown => Err(format!(
|
||||
"invalid task type `{}` value, expecting one of: \
|
||||
indexCreation, indexUpdate, indexDeletion, documentAddition, \
|
||||
documentPartial, documentDeletion, settingsUpdate, or clearAll",
|
||||
unknown
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -79,15 +84,19 @@ pub enum TaskStatus {
|
||||
}
|
||||
|
||||
impl FromStr for TaskStatus {
|
||||
type Err = &'static str;
|
||||
type Err = String;
|
||||
|
||||
fn from_str(status: &str) -> Result<Self, &'static str> {
|
||||
fn from_str(status: &str) -> Result<Self, String> {
|
||||
match status {
|
||||
"enqueued" => Ok(TaskStatus::Enqueued),
|
||||
"processing" => Ok(TaskStatus::Processing),
|
||||
"succeeded" => Ok(TaskStatus::Succeeded),
|
||||
"failed" => Ok(TaskStatus::Failed),
|
||||
_ => Err("invalid task status value"),
|
||||
unknown => Err(format!(
|
||||
"invalid task status `{}` value, expecting one of: \
|
||||
enqueued, processing, succeeded, or failed",
|
||||
unknown
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -131,6 +131,17 @@ impl Index<'_> {
|
||||
self.service.get(url).await
|
||||
}
|
||||
|
||||
pub async fn filtered_tasks(&self, type_: &[&str], status: &[&str]) -> (Value, StatusCode) {
|
||||
let mut url = format!("/tasks?indexUid={}", self.uid);
|
||||
if !type_.is_empty() {
|
||||
url += &format!("&type={}", type_.join(","));
|
||||
}
|
||||
if !status.is_empty() {
|
||||
url += &format!("&status={}", status.join(","));
|
||||
}
|
||||
self.service.get(url).await
|
||||
}
|
||||
|
||||
pub async fn get_document(
|
||||
&self,
|
||||
id: u64,
|
||||
|
@ -59,6 +59,85 @@ async fn list_tasks() {
|
||||
assert_eq!(response["results"].as_array().unwrap().len(), 2);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn list_tasks_status_filtered() {
|
||||
let server = Server::new().await;
|
||||
let index = server.index("test");
|
||||
index.create(None).await;
|
||||
index.wait_task(0).await;
|
||||
index
|
||||
.add_documents(
|
||||
serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let (response, code) = index.filtered_tasks(&[], &["succeeded"]).await;
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(response["results"].as_array().unwrap().len(), 1);
|
||||
|
||||
let (response, code) = index.filtered_tasks(&[], &["processing"]).await;
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(response["results"].as_array().unwrap().len(), 1);
|
||||
|
||||
index.wait_task(1).await;
|
||||
|
||||
let (response, code) = index.filtered_tasks(&[], &["succeeded"]).await;
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(response["results"].as_array().unwrap().len(), 2);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn list_tasks_type_filtered() {
|
||||
let server = Server::new().await;
|
||||
let index = server.index("test");
|
||||
index.create(None).await;
|
||||
index.wait_task(0).await;
|
||||
index
|
||||
.add_documents(
|
||||
serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let (response, code) = index.filtered_tasks(&["indexCreation"], &[]).await;
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(response["results"].as_array().unwrap().len(), 1);
|
||||
|
||||
let (response, code) = index
|
||||
.filtered_tasks(&["indexCreation", "documentAddition"], &[])
|
||||
.await;
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(response["results"].as_array().unwrap().len(), 2);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn list_tasks_status_and_type_filtered() {
|
||||
let server = Server::new().await;
|
||||
let index = server.index("test");
|
||||
index.create(None).await;
|
||||
index.wait_task(0).await;
|
||||
index
|
||||
.add_documents(
|
||||
serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let (response, code) = index.filtered_tasks(&["indexCreation"], &["failed"]).await;
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(response["results"].as_array().unwrap().len(), 0);
|
||||
|
||||
let (response, code) = index
|
||||
.filtered_tasks(
|
||||
&["indexCreation", "documentAddition"],
|
||||
&["succeeded", "processing"],
|
||||
)
|
||||
.await;
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(response["results"].as_array().unwrap().len(), 2);
|
||||
}
|
||||
|
||||
macro_rules! assert_valid_summarized_task {
|
||||
($response:expr, $task_type:literal, $index:literal) => {{
|
||||
assert_eq!($response.as_object().unwrap().len(), 5);
|
||||
|
Loading…
Reference in New Issue
Block a user