From df40533741b11362854cea033068b07f83a5e42f Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 10 Feb 2025 14:05:32 +0100 Subject: [PATCH 1/6] Expose a route to get the update file content of a task --- crates/index-scheduler/src/error.rs | 4 ++ crates/index-scheduler/src/queue/mod.rs | 6 ++ crates/meilisearch-types/src/error.rs | 1 + crates/meilisearch/src/routes/tasks.rs | 88 ++++++++++++++++++++++++- 4 files changed, 98 insertions(+), 1 deletion(-) diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs index e749a1bcb..b5072276c 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -109,6 +109,8 @@ pub enum Error { InvalidIndexUid { index_uid: String }, #[error("Task `{0}` not found.")] TaskNotFound(TaskId), + #[error("Task `{0}` does not provide any content file.")] + TaskFileNotFound(TaskId), #[error("Batch `{0}` not found.")] BatchNotFound(BatchId), #[error("Query parameters to filter the tasks to delete are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.")] @@ -189,6 +191,7 @@ impl Error { | Error::InvalidTaskCanceledBy { .. } | Error::InvalidIndexUid { .. } | Error::TaskNotFound(_) + | Error::TaskFileNotFound(_) | Error::BatchNotFound(_) | Error::TaskDeletionWithEmptyQuery | Error::TaskCancelationWithEmptyQuery @@ -250,6 +253,7 @@ impl ErrorCode for Error { Error::InvalidTaskCanceledBy { .. } => Code::InvalidTaskCanceledBy, Error::InvalidIndexUid { .. } => Code::InvalidIndexUid, Error::TaskNotFound(_) => Code::TaskNotFound, + Error::TaskFileNotFound(_) => Code::TaskFileNotFound, Error::BatchNotFound(_) => Code::BatchNotFound, Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters, Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters, diff --git a/crates/index-scheduler/src/queue/mod.rs b/crates/index-scheduler/src/queue/mod.rs index c6a79fbb2..8850eb8fa 100644 --- a/crates/index-scheduler/src/queue/mod.rs +++ b/crates/index-scheduler/src/queue/mod.rs @@ -8,6 +8,7 @@ mod tasks_test; mod test; use std::collections::BTreeMap; +use std::fs::File as StdFile; use std::time::Duration; use file_store::FileStore; @@ -216,6 +217,11 @@ impl Queue { } } + /// Open and returns the task's content File. + pub fn update_file(&self, uuid: Uuid) -> file_store::Result { + self.file_store.get_update(uuid) + } + /// Delete a file from the index scheduler. /// /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method. diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 5acc8aa27..f64301b8c 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -372,6 +372,7 @@ RemoteRemoteError , System , BAD_GATEWAY ; RemoteTimeout , System , BAD_GATEWAY ; TooManySearchRequests , System , SERVICE_UNAVAILABLE ; TaskNotFound , InvalidRequest , NOT_FOUND ; +TaskFileNotFound , InvalidRequest , NOT_FOUND ; BatchNotFound , InvalidRequest , NOT_FOUND ; TooManyOpenFiles , System , UNPROCESSABLE_ENTITY ; TooManyVectors , InvalidRequest , BAD_REQUEST ; diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index 90fdc9c16..b0fd0f002 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -16,6 +16,7 @@ use serde::Serialize; use time::format_description::well_known::Rfc3339; use time::macros::format_description; use time::{Date, Duration, OffsetDateTime, Time}; +use tokio::io::AsyncReadExt; use tokio::task; use utoipa::{IntoParams, OpenApi, ToSchema}; @@ -44,7 +45,10 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .route(web::delete().to(SeqHandler(delete_tasks))), ) .service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks)))) - .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))); + .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))) + .service( + web::resource("/{task_id}/file").route(web::get().to(SeqHandler(get_task_update_file))), + ); } #[derive(Debug, Deserr, IntoParams)] @@ -639,6 +643,88 @@ async fn get_task( } } +/// Get a task's update file. +/// +/// Get a [task's file](https://www.meilisearch.com/docs/learn/async/asynchronous_operations). +#[utoipa::path( + get, + path = "/{taskUid}/file", + tag = "Tasks", + security(("Bearer" = ["tasks.get", "tasks.*", "*"])), + params(("taskUid", format = UInt32, example = 0, description = "The task identifier", nullable = false)), + responses( + (status = 200, description = "Task successfully retrieved", body = TaskView, content_type = "application/x-ndjson", example = json!( + { + "uid": 1, + "indexUid": "movies", + "status": "succeeded", + "type": "documentAdditionOrUpdate", + "canceledBy": null, + "details": { + "receivedDocuments": 79000, + "indexedDocuments": 79000 + }, + "error": null, + "duration": "PT1S", + "enqueuedAt": "2021-01-01T09:39:00.000000Z", + "startedAt": "2021-01-01T09:39:01.000000Z", + "finishedAt": "2021-01-01T09:39:02.000000Z" + } + )), + (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!( + { + "message": "The Authorization header is missing. It must use the bearer authorization method.", + "code": "missing_authorization_header", + "type": "auth", + "link": "https://docs.meilisearch.com/errors#missing_authorization_header" + } + )), + (status = 404, description = "The task uid does not exists", body = ResponseError, content_type = "application/json", example = json!( + { + "message": "Task :taskUid not found.", + "code": "task_not_found", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors/#task_not_found" + } + )) + ) +)] +async fn get_task_update_file( + index_scheduler: GuardedData, Data>, + task_uid: web::Path, +) -> Result { + /// TODO change the example + let task_uid_string = task_uid.into_inner(); + + let task_uid: TaskId = match task_uid_string.parse() { + Ok(id) => id, + Err(_e) => { + return Err(index_scheduler::Error::InvalidTaskUid { task_uid: task_uid_string }.into()) + } + }; + + let query = index_scheduler::Query { uids: Some(vec![task_uid]), ..Query::default() }; + let filters = index_scheduler.filters(); + let (tasks, _) = index_scheduler.get_tasks_from_authorized_indexes(&query, filters)?; + + if let Some(task) = tasks.first() { + match task.content_uuid() { + Some(uuid) => { + // Yes, that's awful to put everything in memory when we could have streamed it from + // disk but it's really (really) complex to do with the current state of async Rust. + let file = index_scheduler.queue.update_file(uuid)?; + let mut tfile = tokio::fs::File::from_std(file); + let mut content = String::new(); + tfile.read_to_string(&mut content).await?; + Ok(HttpResponse::Ok().content_type("application/x-ndjson").body(content)) + } + None => Err(index_scheduler::Error::TaskFileNotFound(task_uid).into()), + } + } else { + Err(index_scheduler::Error::TaskNotFound(task_uid).into()) + } +} + pub enum DeserializeDateOption { Before, After, From c71eea8023809d8dd7460f7e51de204337348723 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 10 Feb 2025 14:33:01 +0100 Subject: [PATCH 2/6] Improve error message when update file has been processed --- crates/meilisearch/src/routes/tasks.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index b0fd0f002..af01b426e 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -1,3 +1,5 @@ +use std::io::ErrorKind; + use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; use deserr::actix_web::AwebQueryParameter; @@ -710,10 +712,15 @@ async fn get_task_update_file( if let Some(task) = tasks.first() { match task.content_uuid() { Some(uuid) => { + let mut tfile = match index_scheduler.queue.update_file(uuid) { + Ok(file) => tokio::fs::File::from_std(file), + Err(file_store::Error::IoError(e)) if e.kind() == ErrorKind::NotFound => { + return Err(index_scheduler::Error::TaskFileNotFound(task_uid).into()) + } + Err(e) => return Err(e.into()), + }; // Yes, that's awful to put everything in memory when we could have streamed it from // disk but it's really (really) complex to do with the current state of async Rust. - let file = index_scheduler.queue.update_file(uuid)?; - let mut tfile = tokio::fs::File::from_std(file); let mut content = String::new(); tfile.read_to_string(&mut content).await?; Ok(HttpResponse::Ok().content_type("application/x-ndjson").body(content)) From 55fa2dda00e9dc37e06ee53afe6b0e0fc25ac100 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 10 Feb 2025 14:52:48 +0100 Subject: [PATCH 3/6] Update the Open API example --- crates/meilisearch/src/routes/tasks.rs | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index af01b426e..c82019d12 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -655,24 +655,7 @@ async fn get_task( security(("Bearer" = ["tasks.get", "tasks.*", "*"])), params(("taskUid", format = UInt32, example = 0, description = "The task identifier", nullable = false)), responses( - (status = 200, description = "Task successfully retrieved", body = TaskView, content_type = "application/x-ndjson", example = json!( - { - "uid": 1, - "indexUid": "movies", - "status": "succeeded", - "type": "documentAdditionOrUpdate", - "canceledBy": null, - "details": { - "receivedDocuments": 79000, - "indexedDocuments": 79000 - }, - "error": null, - "duration": "PT1S", - "enqueuedAt": "2021-01-01T09:39:00.000000Z", - "startedAt": "2021-01-01T09:39:01.000000Z", - "finishedAt": "2021-01-01T09:39:02.000000Z" - } - )), + (status = 200, description = "The content of the task update", body = serde_json::Value, content_type = "application/x-ndjson"), (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!( { "message": "The Authorization header is missing. It must use the bearer authorization method.", @@ -695,7 +678,6 @@ async fn get_task_update_file( index_scheduler: GuardedData, Data>, task_uid: web::Path, ) -> Result { - /// TODO change the example let task_uid_string = task_uid.into_inner(); let task_uid: TaskId = match task_uid_string.parse() { From 491d115c3c023e0442cc924b8382f78438d88163 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 10 Feb 2025 14:55:07 +0100 Subject: [PATCH 4/6] Change the route to get the task documents --- crates/meilisearch/src/routes/tasks.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index c82019d12..bd945a945 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -49,7 +49,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks)))) .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))) .service( - web::resource("/{task_id}/file").route(web::get().to(SeqHandler(get_task_update_file))), + web::resource("/{task_id}/documents") + .route(web::get().to(SeqHandler(get_task_documents_file))), ); } @@ -645,12 +646,12 @@ async fn get_task( } } -/// Get a task's update file. +/// Get a task's documents. /// -/// Get a [task's file](https://www.meilisearch.com/docs/learn/async/asynchronous_operations). +/// Get a [task's documents file](https://www.meilisearch.com/docs/learn/async/asynchronous_operations). #[utoipa::path( get, - path = "/{taskUid}/file", + path = "/{taskUid}/documents", tag = "Tasks", security(("Bearer" = ["tasks.get", "tasks.*", "*"])), params(("taskUid", format = UInt32, example = 0, description = "The task identifier", nullable = false)), @@ -674,7 +675,7 @@ async fn get_task( )) ) )] -async fn get_task_update_file( +async fn get_task_documents_file( index_scheduler: GuardedData, Data>, task_uid: web::Path, ) -> Result { From 7d0d8f4445b56b397df7ce8459cfab98ec2eef7c Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 10 Feb 2025 15:02:03 +0100 Subject: [PATCH 5/6] Make the feature experimental --- crates/index-scheduler/src/features.rs | 13 ++++++++++++ crates/meilisearch-types/src/features.rs | 1 + .../src/analytics/segment_analytics.rs | 3 +++ crates/meilisearch/src/routes/features.rs | 14 +++++++++++++ crates/meilisearch/src/routes/tasks.rs | 1 + crates/meilisearch/tests/dumps/mod.rs | 6 ++++-- crates/meilisearch/tests/features/mod.rs | 20 ++++++++++++------- 7 files changed, 49 insertions(+), 9 deletions(-) diff --git a/crates/index-scheduler/src/features.rs b/crates/index-scheduler/src/features.rs index 5dbe70444..394e6518f 100644 --- a/crates/index-scheduler/src/features.rs +++ b/crates/index-scheduler/src/features.rs @@ -105,6 +105,19 @@ impl RoFeatures { .into()) } } + + pub fn check_get_task_documents_route(&self) -> Result<()> { + if self.runtime.get_task_documents_route { + Ok(()) + } else { + Err(FeatureNotEnabledError { + disabled_action: "Getting the documents of an enqueued task", + feature: "get task documents route", + issue_link: "https://github.com/orgs/meilisearch/discussions/808", + } + .into()) + } + } } impl FeatureData { diff --git a/crates/meilisearch-types/src/features.rs b/crates/meilisearch-types/src/features.rs index a11e39aa6..37a504039 100644 --- a/crates/meilisearch-types/src/features.rs +++ b/crates/meilisearch-types/src/features.rs @@ -10,6 +10,7 @@ pub struct RuntimeTogglableFeatures { pub edit_documents_by_function: bool, pub contains_filter: bool, pub network: bool, + pub get_task_documents_route: bool, } #[derive(Default, Debug, Clone, Copy)] diff --git a/crates/meilisearch/src/analytics/segment_analytics.rs b/crates/meilisearch/src/analytics/segment_analytics.rs index 388644884..63882468a 100644 --- a/crates/meilisearch/src/analytics/segment_analytics.rs +++ b/crates/meilisearch/src/analytics/segment_analytics.rs @@ -197,6 +197,7 @@ struct Infos { experimental_max_number_of_batched_tasks: usize, experimental_limit_batched_tasks_total_size: u64, experimental_network: bool, + experimental_get_task_documents_route: bool, gpu_enabled: bool, db_path: bool, import_dump: bool, @@ -288,6 +289,7 @@ impl Infos { edit_documents_by_function, contains_filter, network, + get_task_documents_route, } = features; // We're going to override every sensible information. @@ -306,6 +308,7 @@ impl Infos { experimental_enable_logs_route: experimental_enable_logs_route | logs_route, experimental_reduce_indexing_memory_usage, experimental_network: network, + experimental_get_task_documents_route: get_task_documents_route, gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(), db_path: db_path != PathBuf::from("./data.ms"), import_dump: import_dump.is_some(), diff --git a/crates/meilisearch/src/routes/features.rs b/crates/meilisearch/src/routes/features.rs index e30bc8e8e..402bc11ae 100644 --- a/crates/meilisearch/src/routes/features.rs +++ b/crates/meilisearch/src/routes/features.rs @@ -51,6 +51,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { edit_documents_by_function: Some(false), contains_filter: Some(false), network: Some(false), + get_task_documents_route: Some(false), })), (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!( { @@ -91,6 +92,8 @@ pub struct RuntimeTogglableFeatures { pub contains_filter: Option, #[deserr(default)] pub network: Option, + #[deserr(default)] + pub get_task_documents_route: Option, } impl From for RuntimeTogglableFeatures { @@ -101,6 +104,7 @@ impl From for RuntimeTogg edit_documents_by_function, contains_filter, network, + get_task_documents_route, } = value; Self { @@ -109,6 +113,7 @@ impl From for RuntimeTogg edit_documents_by_function: Some(edit_documents_by_function), contains_filter: Some(contains_filter), network: Some(network), + get_task_documents_route: Some(get_task_documents_route), } } } @@ -120,6 +125,7 @@ pub struct PatchExperimentalFeatureAnalytics { edit_documents_by_function: bool, contains_filter: bool, network: bool, + get_task_documents_route: bool, } impl Aggregate for PatchExperimentalFeatureAnalytics { @@ -134,6 +140,7 @@ impl Aggregate for PatchExperimentalFeatureAnalytics { edit_documents_by_function: new.edit_documents_by_function, contains_filter: new.contains_filter, network: new.network, + get_task_documents_route: new.get_task_documents_route, }) } @@ -157,6 +164,7 @@ impl Aggregate for PatchExperimentalFeatureAnalytics { edit_documents_by_function: Some(false), contains_filter: Some(false), network: Some(false), + get_task_documents_route: Some(false), })), (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!( { @@ -190,6 +198,10 @@ async fn patch_features( .unwrap_or(old_features.edit_documents_by_function), contains_filter: new_features.0.contains_filter.unwrap_or(old_features.contains_filter), network: new_features.0.network.unwrap_or(old_features.network), + get_task_documents_route: new_features + .0 + .get_task_documents_route + .unwrap_or(old_features.get_task_documents_route), }; // explicitly destructure for analytics rather than using the `Serialize` implementation, because @@ -201,6 +213,7 @@ async fn patch_features( edit_documents_by_function, contains_filter, network, + get_task_documents_route, } = new_features; analytics.publish( @@ -210,6 +223,7 @@ async fn patch_features( edit_documents_by_function, contains_filter, network, + get_task_documents_route, }, &req, ); diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index bd945a945..3ef116dd7 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -679,6 +679,7 @@ async fn get_task_documents_file( index_scheduler: GuardedData, Data>, task_uid: web::Path, ) -> Result { + index_scheduler.features().check_get_task_documents_route()?; let task_uid_string = task_uid.into_inner(); let task_uid: TaskId = match task_uid_string.parse() { diff --git a/crates/meilisearch/tests/dumps/mod.rs b/crates/meilisearch/tests/dumps/mod.rs index 2b4c32cc7..6102a2817 100644 --- a/crates/meilisearch/tests/dumps/mod.rs +++ b/crates/meilisearch/tests/dumps/mod.rs @@ -1909,7 +1909,8 @@ async fn import_dump_v6_containing_experimental_features() { "logsRoute": false, "editDocumentsByFunction": false, "containsFilter": false, - "network": false + "network": false, + "getTaskDocumentsRoute": false } "###); @@ -2071,7 +2072,8 @@ async fn generate_and_import_dump_containing_vectors() { "logsRoute": false, "editDocumentsByFunction": false, "containsFilter": false, - "network": false + "network": false, + "getTaskDocumentsRoute": false } "###); diff --git a/crates/meilisearch/tests/features/mod.rs b/crates/meilisearch/tests/features/mod.rs index 36559daf6..d417efa4c 100644 --- a/crates/meilisearch/tests/features/mod.rs +++ b/crates/meilisearch/tests/features/mod.rs @@ -22,7 +22,8 @@ async fn experimental_features() { "logsRoute": false, "editDocumentsByFunction": false, "containsFilter": false, - "network": false + "network": false, + "getTaskDocumentsRoute": false } "###); @@ -35,7 +36,8 @@ async fn experimental_features() { "logsRoute": false, "editDocumentsByFunction": false, "containsFilter": false, - "network": false + "network": false, + "getTaskDocumentsRoute": false } "###); @@ -48,7 +50,8 @@ async fn experimental_features() { "logsRoute": false, "editDocumentsByFunction": false, "containsFilter": false, - "network": false + "network": false, + "getTaskDocumentsRoute": false } "###); @@ -62,7 +65,8 @@ async fn experimental_features() { "logsRoute": false, "editDocumentsByFunction": false, "containsFilter": false, - "network": false + "network": false, + "getTaskDocumentsRoute": false } "###); @@ -76,7 +80,8 @@ async fn experimental_features() { "logsRoute": false, "editDocumentsByFunction": false, "containsFilter": false, - "network": false + "network": false, + "getTaskDocumentsRoute": false } "###); } @@ -97,7 +102,8 @@ async fn experimental_feature_metrics() { "logsRoute": false, "editDocumentsByFunction": false, "containsFilter": false, - "network": false + "network": false, + "getTaskDocumentsRoute": false } "###); @@ -152,7 +158,7 @@ async fn errors() { meili_snap::snapshot!(code, @"400 Bad Request"); meili_snap::snapshot!(meili_snap::json_string!(response), @r###" { - "message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`, `network`", + "message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`, `network`, `getTaskDocumentsRoute`", "code": "bad_request", "type": "invalid_request", "link": "https://docs.meilisearch.com/errors#bad_request" From acb06cb3e6d160039883af7a73aa518bb0da2b67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 10 Feb 2025 16:53:50 +0100 Subject: [PATCH 6/6] Improve the error message when missing documents Co-authored-by: Tamo --- crates/index-scheduler/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs index b5072276c..280127d04 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -109,7 +109,7 @@ pub enum Error { InvalidIndexUid { index_uid: String }, #[error("Task `{0}` not found.")] TaskNotFound(TaskId), - #[error("Task `{0}` does not provide any content file.")] + #[error("Task `{0}` does not contain any documents. Only `documentAdditionOrUpdate` tasks with the statuses `enqueued` or `processing` contain documents")] TaskFileNotFound(TaskId), #[error("Batch `{0}` not found.")] BatchNotFound(BatchId),