From 3adbc2b942e7bb0808720f46838aab2acef26e00 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 28 Nov 2023 15:08:13 +0100 Subject: [PATCH] return a task view instead of a task --- index-scheduler/src/lib.rs | 4 +- meilisearch-types/src/lib.rs | 1 + meilisearch-types/src/task_view.rs | 139 ++++++++++++++++++++++++++++ meilisearch/src/routes/tasks.rs | 140 +---------------------------- meilisearch/tests/tasks/webhook.rs | 10 ++- 5 files changed, 153 insertions(+), 141 deletions(-) create mode 100644 meilisearch-types/src/task_view.rs diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8502da242..d96b6c2af 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -54,6 +54,7 @@ use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; +use meilisearch_types::task_view::TaskView; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use puffin::FrameView; use roaring::RoaringBitmap; @@ -1283,7 +1284,8 @@ impl IndexScheduler { for id in updated { let task = self.get_task(&rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; - let _ = serde_json::to_writer(&mut buffer, &task); + let _ = serde_json::to_writer(&mut buffer, &TaskView::from_task(&task)); + buffer.push(b'\n'); } println!("Sending request to {url}"); diff --git a/meilisearch-types/src/lib.rs b/meilisearch-types/src/lib.rs index b0762563a..e4f5cbeb4 100644 --- a/meilisearch-types/src/lib.rs +++ b/meilisearch-types/src/lib.rs @@ -9,6 +9,7 @@ pub mod index_uid_pattern; pub mod keys; pub mod settings; pub mod star_or; +pub mod task_view; pub mod tasks; pub mod versioning; pub use milli::{heed, Index}; diff --git a/meilisearch-types/src/task_view.rs b/meilisearch-types/src/task_view.rs new file mode 100644 index 000000000..02be91a88 --- /dev/null +++ b/meilisearch-types/src/task_view.rs @@ -0,0 +1,139 @@ +use serde::Serialize; +use time::{Duration, OffsetDateTime}; + +use crate::error::ResponseError; +use crate::settings::{Settings, Unchecked}; +use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskView { + pub uid: TaskId, + #[serde(default)] + pub index_uid: Option, + pub status: Status, + #[serde(rename = "type")] + pub kind: Kind, + pub canceled_by: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub details: Option, + pub error: Option, + #[serde(serialize_with = "serialize_duration", default)] + pub duration: Option, + #[serde(with = "time::serde::rfc3339")] + pub enqueued_at: OffsetDateTime, + #[serde(with = "time::serde::rfc3339::option", default)] + pub started_at: Option, + #[serde(with = "time::serde::rfc3339::option", default)] + pub finished_at: Option, +} + +impl TaskView { + pub fn from_task(task: &Task) -> TaskView { + TaskView { + uid: task.uid, + index_uid: task.index_uid().map(ToOwned::to_owned), + status: task.status, + kind: task.kind.as_kind(), + canceled_by: task.canceled_by, + details: task.details.clone().map(DetailsView::from), + error: task.error.clone(), + duration: task.started_at.zip(task.finished_at).map(|(start, end)| end - start), + enqueued_at: task.enqueued_at, + started_at: task.started_at, + finished_at: task.finished_at, + } + } +} + +#[derive(Default, Debug, PartialEq, Eq, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DetailsView { + #[serde(skip_serializing_if = "Option::is_none")] + pub received_documents: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub indexed_documents: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub primary_key: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub provided_ids: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_documents: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub matched_tasks: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub canceled_tasks: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_tasks: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub original_filter: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub dump_uid: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(flatten)] + pub settings: Option>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub swaps: Option>, +} + +impl From
for DetailsView { + fn from(details: Details) -> Self { + match details { + Details::DocumentAdditionOrUpdate { received_documents, indexed_documents } => { + DetailsView { + received_documents: Some(received_documents), + indexed_documents: Some(indexed_documents), + ..DetailsView::default() + } + } + Details::SettingsUpdate { settings } => { + DetailsView { settings: Some(settings), ..DetailsView::default() } + } + Details::IndexInfo { primary_key } => { + DetailsView { primary_key: Some(primary_key), ..DetailsView::default() } + } + Details::DocumentDeletion { + provided_ids: received_document_ids, + deleted_documents, + } => DetailsView { + provided_ids: Some(received_document_ids), + deleted_documents: Some(deleted_documents), + original_filter: Some(None), + ..DetailsView::default() + }, + Details::DocumentDeletionByFilter { original_filter, deleted_documents } => { + DetailsView { + provided_ids: Some(0), + original_filter: Some(Some(original_filter)), + deleted_documents: Some(deleted_documents), + ..DetailsView::default() + } + } + Details::ClearAll { deleted_documents } => { + DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() } + } + Details::TaskCancelation { matched_tasks, canceled_tasks, original_filter } => { + DetailsView { + matched_tasks: Some(matched_tasks), + canceled_tasks: Some(canceled_tasks), + original_filter: Some(Some(original_filter)), + ..DetailsView::default() + } + } + Details::TaskDeletion { matched_tasks, deleted_tasks, original_filter } => { + DetailsView { + matched_tasks: Some(matched_tasks), + deleted_tasks: Some(deleted_tasks), + original_filter: Some(Some(original_filter)), + ..DetailsView::default() + } + } + Details::Dump { dump_uid } => { + DetailsView { dump_uid: Some(dump_uid), ..DetailsView::default() } + } + Details::IndexSwap { swaps } => { + DetailsView { swaps: Some(swaps), ..Default::default() } + } + } + } +} diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs index f7d4c44d7..03b63001d 100644 --- a/meilisearch/src/routes/tasks.rs +++ b/meilisearch/src/routes/tasks.rs @@ -8,11 +8,9 @@ use meilisearch_types::deserr::DeserrQueryParamError; use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::{InvalidTaskDateError, ResponseError}; use meilisearch_types::index_uid::IndexUid; -use meilisearch_types::settings::{Settings, Unchecked}; use meilisearch_types::star_or::{OptionStarOr, OptionStarOrList}; -use meilisearch_types::tasks::{ - serialize_duration, Details, IndexSwap, Kind, KindWithContent, Status, Task, -}; +use meilisearch_types::task_view::TaskView; +use meilisearch_types::tasks::{Kind, KindWithContent, Status}; use serde::Serialize; use serde_json::json; use time::format_description::well_known::Rfc3339; @@ -37,140 +35,6 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks)))) .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))); } - -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct TaskView { - pub uid: TaskId, - #[serde(default)] - pub index_uid: Option, - pub status: Status, - #[serde(rename = "type")] - pub kind: Kind, - pub canceled_by: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub details: Option, - pub error: Option, - #[serde(serialize_with = "serialize_duration", default)] - pub duration: Option, - #[serde(with = "time::serde::rfc3339")] - pub enqueued_at: OffsetDateTime, - #[serde(with = "time::serde::rfc3339::option", default)] - pub started_at: Option, - #[serde(with = "time::serde::rfc3339::option", default)] - pub finished_at: Option, -} - -impl TaskView { - pub fn from_task(task: &Task) -> TaskView { - TaskView { - uid: task.uid, - index_uid: task.index_uid().map(ToOwned::to_owned), - status: task.status, - kind: task.kind.as_kind(), - canceled_by: task.canceled_by, - details: task.details.clone().map(DetailsView::from), - error: task.error.clone(), - duration: task.started_at.zip(task.finished_at).map(|(start, end)| end - start), - enqueued_at: task.enqueued_at, - started_at: task.started_at, - finished_at: task.finished_at, - } - } -} - -#[derive(Default, Debug, PartialEq, Eq, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct DetailsView { - #[serde(skip_serializing_if = "Option::is_none")] - pub received_documents: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub indexed_documents: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub primary_key: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub provided_ids: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub deleted_documents: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub matched_tasks: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub canceled_tasks: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub deleted_tasks: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub original_filter: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub dump_uid: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(flatten)] - pub settings: Option>>, - #[serde(skip_serializing_if = "Option::is_none")] - pub swaps: Option>, -} - -impl From
for DetailsView { - fn from(details: Details) -> Self { - match details { - Details::DocumentAdditionOrUpdate { received_documents, indexed_documents } => { - DetailsView { - received_documents: Some(received_documents), - indexed_documents: Some(indexed_documents), - ..DetailsView::default() - } - } - Details::SettingsUpdate { settings } => { - DetailsView { settings: Some(settings), ..DetailsView::default() } - } - Details::IndexInfo { primary_key } => { - DetailsView { primary_key: Some(primary_key), ..DetailsView::default() } - } - Details::DocumentDeletion { - provided_ids: received_document_ids, - deleted_documents, - } => DetailsView { - provided_ids: Some(received_document_ids), - deleted_documents: Some(deleted_documents), - original_filter: Some(None), - ..DetailsView::default() - }, - Details::DocumentDeletionByFilter { original_filter, deleted_documents } => { - DetailsView { - provided_ids: Some(0), - original_filter: Some(Some(original_filter)), - deleted_documents: Some(deleted_documents), - ..DetailsView::default() - } - } - Details::ClearAll { deleted_documents } => { - DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() } - } - Details::TaskCancelation { matched_tasks, canceled_tasks, original_filter } => { - DetailsView { - matched_tasks: Some(matched_tasks), - canceled_tasks: Some(canceled_tasks), - original_filter: Some(Some(original_filter)), - ..DetailsView::default() - } - } - Details::TaskDeletion { matched_tasks, deleted_tasks, original_filter } => { - DetailsView { - matched_tasks: Some(matched_tasks), - deleted_tasks: Some(deleted_tasks), - original_filter: Some(Some(original_filter)), - ..DetailsView::default() - } - } - Details::Dump { dump_uid } => { - DetailsView { dump_uid: Some(dump_uid), ..DetailsView::default() } - } - Details::IndexSwap { swaps } => { - DetailsView { swaps: Some(swaps), ..Default::default() } - } - } - } -} - #[derive(Debug, Deserr)] #[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)] pub struct TasksFilterQuery { diff --git a/meilisearch/tests/tasks/webhook.rs b/meilisearch/tests/tasks/webhook.rs index c95d3fc5b..613b4ff3f 100644 --- a/meilisearch/tests/tasks/webhook.rs +++ b/meilisearch/tests/tasks/webhook.rs @@ -73,6 +73,7 @@ async fn test_basic_webhook() { .unwrap(); let index = server.index("tamo"); + // TODO: may be flaky, we're relying on the fact that during the time the first document addition succeed, the two other operations will be received. for i in 0..3 { let (_, _status) = index.add_documents(json!({ "id": i, "doggo": "bone" }), None).await; } @@ -81,11 +82,16 @@ async fn test_basic_webhook() { let jsonl = String::from_utf8(payload).unwrap(); snapshot!(jsonl, - @r###"{"uid":0,"enqueuedAt":"2023-11-28T13:43:24.754587Z","startedAt":"2023-11-28T13:43:24.756445Z","finishedAt":"2023-11-28T13:43:24.791527Z","error":null,"canceledBy":null,"details":{"DocumentAdditionOrUpdate":{"received_documents":1,"indexed_documents":1}},"status":"succeeded","kind":{"documentAdditionOrUpdate":{"index_uid":"tamo","primary_key":null,"method":"ReplaceDocuments","content_file":"ca77ac82-4504-4c85-81a5-1a8d68f1a386","documents_count":1,"allow_index_creation":true}}}"###); + @r###" + {"uid":0,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.027444S","enqueuedAt":"2023-11-28T14:05:37.767678Z","startedAt":"2023-11-28T14:05:37.769519Z","finishedAt":"2023-11-28T14:05:37.796963Z"} + "###); let payload = handle.receiver.recv().await.unwrap(); let jsonl = String::from_utf8(payload).unwrap(); snapshot!(jsonl, - @r###"{"uid":1,"enqueuedAt":"2023-11-28T13:43:24.761498Z","startedAt":"2023-11-28T13:43:24.793989Z","finishedAt":"2023-11-28T13:43:24.814623Z","error":null,"canceledBy":null,"details":{"DocumentAdditionOrUpdate":{"received_documents":1,"indexed_documents":1}},"status":"succeeded","kind":{"documentAdditionOrUpdate":{"index_uid":"tamo","primary_key":null,"method":"ReplaceDocuments","content_file":"c947aefa-7f98-433d-8ce4-5926d8d2ce10","documents_count":1,"allow_index_creation":true}}}{"uid":2,"enqueuedAt":"2023-11-28T13:43:24.76776Z","startedAt":"2023-11-28T13:43:24.793989Z","finishedAt":"2023-11-28T13:43:24.814623Z","error":null,"canceledBy":null,"details":{"DocumentAdditionOrUpdate":{"received_documents":1,"indexed_documents":1}},"status":"succeeded","kind":{"documentAdditionOrUpdate":{"index_uid":"tamo","primary_key":null,"method":"ReplaceDocuments","content_file":"a21d6da6-9322-4827-8c08-f33d2e1b6cae","documents_count":1,"allow_index_creation":true}}}"###); + @r###" + {"uid":1,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.020221S","enqueuedAt":"2023-11-28T14:05:37.773731Z","startedAt":"2023-11-28T14:05:37.799448Z","finishedAt":"2023-11-28T14:05:37.819669Z"} + {"uid":2,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.020221S","enqueuedAt":"2023-11-28T14:05:37.780466Z","startedAt":"2023-11-28T14:05:37.799448Z","finishedAt":"2023-11-28T14:05:37.819669Z"} + "###); }