Add the tasks cancel route to cancel tasks

This commit is contained in:
Kerollmops 2022-10-18 14:48:40 +02:00 committed by Clément Renault
parent 01ed1fb128
commit 973e2f71eb
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
6 changed files with 73 additions and 8 deletions

View File

@ -17,9 +17,10 @@ pub enum Error {
CorruptedDump,
#[error("Task `{0}` not found")]
TaskNotFound(TaskId),
// TODO: Lo: proper error message for this
#[error("Cannot delete all tasks")]
#[error("Query parameters to filter the tasks to delete are missing. Available query parameters are: `uid`, `indexUid`, `status`, `type`")]
TaskDeletionWithEmptyQuery,
#[error("Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uid`, `indexUid`, `status`, `type`")]
TaskCancelationWithEmptyQuery,
// maybe the two next errors are going to move to the frontend
#[error("`{0}` is not a status. Available status are")]
InvalidStatus(String),
@ -48,6 +49,7 @@ impl ErrorCode for Error {
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
Error::TaskNotFound(_) => Code::TaskNotFound,
Error::TaskDeletionWithEmptyQuery => Code::TaskDeletionWithEmptyQuery,
Error::TaskCancelationWithEmptyQuery => Code::TaskCancelationWithEmptyQuery,
Error::InvalidStatus(_) => Code::BadRequest,
Error::InvalidKind(_) => Code::BadRequest,

View File

@ -829,7 +829,7 @@ mod tests {
replace_document_import_task("catto", None, 0, 12),
KindWithContent::TaskCancelation {
query: format!("uid=0,1"),
tasks: vec![0, 1],
tasks: RoaringBitmap::from_iter([0, 1]),
},
replace_document_import_task("catto", None, 1, 50),
replace_document_import_task("doggo", Some("bone"), 2, 5000),

View File

@ -104,6 +104,7 @@ fn snapshot_task(task: &Task) -> String {
started_at: _,
finished_at: _,
error,
canceled_by: _,
details,
status,
kind,

View File

@ -25,7 +25,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.route(web::get().to(SeqHandler(get_tasks)))
.route(web::delete().to(SeqHandler(delete_tasks))),
)
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))))
.service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks))));
}
#[derive(Debug, Clone, PartialEq, Serialize)]
@ -200,6 +201,60 @@ pub struct TaskDeletionQuery {
index_uid: Option<CS<IndexUid>>,
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct TaskCancelationQuery {
#[serde(rename = "type")]
type_: Option<CS<Kind>>,
uid: Option<CS<u32>>,
status: Option<CS<Status>>,
index_uid: Option<CS<IndexUid>>,
}
async fn cancel_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_CANCEL }>, Data<IndexScheduler>>,
params: web::Query<TaskCancelationQuery>,
) -> Result<HttpResponse, ResponseError> {
let TaskCancelationQuery {
type_,
uid,
status,
index_uid,
} = params.into_inner();
let kind: Option<Vec<_>> = type_.map(|x| x.into_iter().collect());
let uid: Option<Vec<_>> = uid.map(|x| x.into_iter().collect());
let status: Option<Vec<_>> = status.map(|x| x.into_iter().collect());
let index_uid: Option<Vec<_>> =
index_uid.map(|x| x.into_iter().map(|x| x.to_string()).collect());
let query = Query {
limit: None,
from: None,
status,
kind,
index_uid,
uid,
};
if query.is_empty() {
return Err(index_scheduler::Error::TaskCancelationWithEmptyQuery.into());
}
let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query);
let tasks = index_scheduler.get_task_ids(&filtered_query)?;
let filtered_query_string = yaup::to_string(&filtered_query).unwrap();
let task_cancelation = KindWithContent::TaskCancelation {
query: filtered_query_string,
tasks,
};
let task = index_scheduler.register(task_cancelation)?;
let task_view = TaskView::from_task(&task);
Ok(HttpResponse::Ok().json(task_view))
}
async fn delete_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>,
params: web::Query<TaskDeletionQuery>,
@ -225,12 +280,12 @@ async fn delete_tasks(
index_uid,
uid,
};
if query.is_empty() {
return Err(index_scheduler::Error::TaskDeletionWithEmptyQuery.into());
}
let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query);
let tasks = index_scheduler.get_task_ids(&filtered_query)?;
let filtered_query_string = yaup::to_string(&filtered_query).unwrap();
let task_deletion = KindWithContent::TaskDeletion {
@ -239,7 +294,6 @@ async fn delete_tasks(
};
let task = index_scheduler.register(task_deletion)?;
let task_view = TaskView::from_task(&task);
Ok(HttpResponse::Ok().json(task_view))

View File

@ -150,6 +150,7 @@ pub enum Code {
DumpNotFound,
TaskNotFound,
TaskDeletionWithEmptyQuery,
TaskCancelationWithEmptyQuery,
PayloadTooLarge,
RetrieveDocument,
SearchDocuments,
@ -237,9 +238,11 @@ impl Code {
ErrCode::authentication("missing_authorization_header", StatusCode::UNAUTHORIZED)
}
TaskNotFound => ErrCode::invalid("task_not_found", StatusCode::NOT_FOUND),
// TODO: Lo: find the proper error name & message for this one
TaskDeletionWithEmptyQuery => {
ErrCode::invalid("task_deletion_with_empty_query", StatusCode::BAD_REQUEST)
ErrCode::invalid("missing_filters", StatusCode::BAD_REQUEST)
}
TaskCancelationWithEmptyQuery => {
ErrCode::invalid("missing_filters", StatusCode::BAD_REQUEST)
}
DumpNotFound => ErrCode::invalid("dump_not_found", StatusCode::NOT_FOUND),
NoSpaceLeftOnDevice => {

View File

@ -224,6 +224,8 @@ pub enum Action {
IndexesDelete,
#[serde(rename = "tasks.*")]
TasksAll,
#[serde(rename = "tasks.cancel")]
TasksCancel,
#[serde(rename = "tasks.delete")]
TasksDelete,
#[serde(rename = "tasks.get")]
@ -274,6 +276,8 @@ impl Action {
INDEXES_UPDATE => Some(Self::IndexesUpdate),
INDEXES_DELETE => Some(Self::IndexesDelete),
TASKS_ALL => Some(Self::TasksAll),
TASKS_CANCEL => Some(Self::TasksCancel),
TASKS_DELETE => Some(Self::TasksDelete),
TASKS_GET => Some(Self::TasksGet),
SETTINGS_ALL => Some(Self::SettingsAll),
SETTINGS_GET => Some(Self::SettingsGet),
@ -313,6 +317,7 @@ pub mod actions {
pub const INDEXES_UPDATE: u8 = IndexesUpdate.repr();
pub const INDEXES_DELETE: u8 = IndexesDelete.repr();
pub const TASKS_ALL: u8 = TasksAll.repr();
pub const TASKS_CANCEL: u8 = TasksCancel.repr();
pub const TASKS_DELETE: u8 = TasksDelete.repr();
pub const TASKS_GET: u8 = TasksGet.repr();
pub const SETTINGS_ALL: u8 = SettingsAll.repr();