diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs
index e5e38837b..8617e1dbe 100644
--- a/index-scheduler/src/index_scheduler.rs
+++ b/index-scheduler/src/index_scheduler.rs
@@ -31,6 +31,7 @@ pub struct Query {
     #[serde(rename = "type")]
     pub kind: Option<Vec<Kind>>,
     pub index_uid: Option<Vec<String>>,
+    pub uid: Option<Vec<TaskId>>,
 }

 impl Default for Query {
@@ -41,6 +42,7 @@ impl Default for Query {
             status: None,
             kind: None,
             index_uid: None,
+            uid: None,
         }
     }
 }
@@ -72,6 +74,15 @@ impl Query {
             ..self
         }
     }
+
+    pub fn with_uid(self, uid: TaskId) -> Self {
+        let mut task_vec = self.uid.unwrap_or_default();
+        task_vec.push(uid);
+        Self {
+            uid: Some(task_vec),
+            ..self
+        }
+    }
 }

 pub mod db_name {
@@ -172,7 +183,11 @@ impl IndexScheduler {
         };

         // This is the list of all the tasks.
-        let mut tasks = RoaringBitmap::from_iter(0..last_task_id);
+        let mut tasks = RoaringBitmap::from_sorted_iter(0..last_task_id).unwrap();
+
+        if let Some(uids) = query.uid {
+            tasks &= RoaringBitmap::from_iter(uids);
+        }

         if let Some(status) = query.status {
             let mut status_tasks = RoaringBitmap::new();
@@ -256,6 +271,10 @@ impl IndexScheduler {
         Ok(self.file_store.new_update()?)
     }

+    pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> {
+        Ok(self.file_store.delete(uuid)?)
+    }
+
     /// This worker function must be run in a different thread and must be run only once.
     pub fn run(&self) -> ! {
         loop {
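
The interesting part of the scheduler change above is only two lines of bitmap logic: the full `0..last_task_id` range is built with `from_sorted_iter` (the range is already ordered, so the checked constructor can simply be unwrapped) and the new `uid` filter is a plain intersection. A self-contained sketch of that filtering, assuming only the `roaring` crate (ids are made up):

    use roaring::RoaringBitmap;

    fn main() {
        // Every task id the scheduler knows about: 0..last_task_id.
        let last_task_id = 10u32;
        let mut tasks = RoaringBitmap::from_sorted_iter(0..last_task_id).unwrap();

        // A `uid` filter is an intersection with the requested ids;
        // ids that do not exist (42 here) simply drop out.
        let requested_uids = vec![2u32, 5, 9, 42];
        tasks &= RoaringBitmap::from_iter(requested_uids);

        assert_eq!(tasks.iter().collect::<Vec<u32>>(), vec![2, 5, 9]);
        println!("{} matching tasks", tasks.len());
    }
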
diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs
index d34e9a1c9..3d60be30f 100644
--- a/index-scheduler/src/task.rs
+++ b/index-scheduler/src/task.rs
@@ -18,16 +18,27 @@ pub struct TaskView {
     #[serde(rename = "type")]
     pub kind: Kind,

+    #[serde(skip_serializing_if = "Option::is_none")]
     pub details: Option<Details>,
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub error: Option<ResponseError>,

-    #[serde(serialize_with = "serialize_duration")]
+    #[serde(
+        serialize_with = "serialize_duration",
+        skip_serializing_if = "Option::is_none"
+    )]
     pub duration: Option<Duration>,
     #[serde(with = "time::serde::rfc3339")]
     pub enqueued_at: OffsetDateTime,
-    #[serde(with = "time::serde::rfc3339::option")]
+    #[serde(
+        with = "time::serde::rfc3339::option",
+        skip_serializing_if = "Option::is_none"
+    )]
     pub started_at: Option<OffsetDateTime>,
-    #[serde(with = "time::serde::rfc3339::option")]
+    #[serde(
+        with = "time::serde::rfc3339::option",
+        skip_serializing_if = "Option::is_none"
+    )]
     pub finished_at: Option<OffsetDateTime>,
 }

diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs
index 86b7c1964..9b2709028 100644
--- a/meilisearch-http/src/error.rs
+++ b/meilisearch-http/src/error.rs
@@ -1,6 +1,9 @@
 use actix_web as aweb;
 use aweb::error::{JsonPayloadError, QueryPayloadError};
+use document_formats::DocumentFormatError;
+use meilisearch_lib::IndexControllerError;
 use meilisearch_types::error::{Code, ErrorCode, ResponseError};
+use tokio::task::JoinError;

 #[derive(Debug, thiserror::Error)]
 pub enum MeilisearchHttpError {
@@ -12,6 +15,16 @@ pub enum MeilisearchHttpError {
         .1.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
     )]
     InvalidContentType(String, Vec<String>),
+    #[error(transparent)]
+    IndexScheduler(#[from] index_scheduler::Error),
+    #[error(transparent)]
+    Payload(#[from] PayloadError),
+    #[error(transparent)]
+    DocumentFormat(#[from] DocumentFormatError),
+    #[error(transparent)]
+    IndexController(#[from] IndexControllerError),
+    #[error(transparent)]
+    Join(#[from] JoinError),
 }

 impl ErrorCode for MeilisearchHttpError {
@@ -19,6 +32,11 @@ impl ErrorCode for MeilisearchHttpError {
         match self {
             MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType,
             MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,
+            MeilisearchHttpError::IndexScheduler(e) => e.error_code(),
+            MeilisearchHttpError::Payload(e) => e.error_code(),
+            MeilisearchHttpError::DocumentFormat(e) => e.error_code(),
+            MeilisearchHttpError::IndexController(e) => e.error_code(),
+            MeilisearchHttpError::Join(_) => Code::Internal,
         }
     }
 }
@@ -29,11 +47,19 @@ impl From<MeilisearchHttpError> for aweb::Error {
     }
 }

+impl From<aweb::error::PayloadError> for MeilisearchHttpError {
+    fn from(error: aweb::error::PayloadError) -> Self {
+        MeilisearchHttpError::Payload(PayloadError::Payload(error))
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum PayloadError {
-    #[error("{0}")]
+    #[error(transparent)]
+    Payload(aweb::error::PayloadError),
+    #[error(transparent)]
     Json(JsonPayloadError),
-    #[error("{0}")]
+    #[error(transparent)]
     Query(QueryPayloadError),
     #[error("The json payload provided is malformed. `{0}`.")]
     MalformedPayload(serde_json::error::Error),
@@ -44,6 +70,15 @@ impl ErrorCode for PayloadError {
     fn error_code(&self) -> Code {
         match self {
+            PayloadError::Payload(e) => match e {
+                aweb::error::PayloadError::Incomplete(_) => todo!(),
+                aweb::error::PayloadError::EncodingCorrupted => todo!(),
+                aweb::error::PayloadError::Overflow => todo!(),
+                aweb::error::PayloadError::UnknownLength => todo!(),
+                aweb::error::PayloadError::Http2Payload(_) => todo!(),
+                aweb::error::PayloadError::Io(_) => todo!(),
+                _ => todo!(),
+            },
             PayloadError::Json(err) => match err {
                 JsonPayloadError::Overflow { .. } => Code::PayloadTooLarge,
                 JsonPayloadError::ContentType => Code::UnsupportedMediaType,
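
Every new arm above (and the `IndexControllerError` hunk further down) follows the same pattern: wrap the inner error with `#[error(transparent)]`/`#[from]` and delegate `error_code()` to it instead of leaving a `todo!()`. A self-contained sketch of that pattern with hypothetical stand-in types, not the real meilisearch error enums (requires only the `thiserror` crate):

    // Hypothetical stand-ins, not the real meilisearch types.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum Code {
        Internal,
        IndexNotFound,
    }

    trait ErrorCode {
        fn error_code(&self) -> Code;
    }

    #[derive(Debug, thiserror::Error)]
    #[error("index `{0}` not found")]
    struct IndexResolverError(String);

    impl ErrorCode for IndexResolverError {
        fn error_code(&self) -> Code {
            Code::IndexNotFound
        }
    }

    #[derive(Debug, thiserror::Error)]
    enum HttpError {
        // `transparent` forwards Display and source() to the wrapped error,
        // `#[from]` provides the conversion used by `?`.
        #[error(transparent)]
        IndexResolver(#[from] IndexResolverError),
        #[error("internal error")]
        Internal,
    }

    impl ErrorCode for HttpError {
        fn error_code(&self) -> Code {
            match self {
                // Delegation keeps the specific code of the underlying failure
                // instead of collapsing everything into `Code::Internal`.
                HttpError::IndexResolver(e) => e.error_code(),
                HttpError::Internal => Code::Internal,
            }
        }
    }

    fn main() {
        let err: HttpError = IndexResolverError("movies".to_string()).into();
        assert_eq!(err.error_code(), Code::IndexNotFound);
        assert_eq!(HttpError::Internal.error_code(), Code::Internal);
        println!("{err}"); // prints: index `movies` not found
    }
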
diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs
index 31f0c1858..6125cce96 100644
--- a/meilisearch-http/src/routes/indexes/documents.rs
+++ b/meilisearch-http/src/routes/indexes/documents.rs
@@ -1,10 +1,12 @@
+use std::io::Cursor;
+
 use actix_web::error::PayloadError;
 use actix_web::http::header::CONTENT_TYPE;
 use actix_web::web::Bytes;
 use actix_web::HttpMessage;
 use actix_web::{web, HttpRequest, HttpResponse};
 use bstr::ByteSlice;
-use document_formats::PayloadType;
+use document_formats::{read_csv, read_json, read_ndjson, PayloadType};
 use futures::{Stream, StreamExt};
 use index_scheduler::{KindWithContent, TaskView};
 use log::debug;
@@ -235,10 +237,10 @@ async fn document_addition(
     meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, MeiliSearch>,
     index_uid: String,
     primary_key: Option<String>,
-    body: Payload,
+    mut body: Payload,
     method: IndexDocumentsMethod,
     allow_index_creation: bool,
-) -> Result<TaskView, ResponseError> {
+) -> Result<TaskView, MeilisearchHttpError> {
     let format = match mime_type
         .as_ref()
         .map(|m| (m.type_().as_str(), m.subtype().as_str()))
@@ -260,14 +262,46 @@ async fn document_addition(
         }
     };

-    // TODO: TAMO: do something with the update file
-    // Box::new(payload_to_stream(body))
-    let (uuid, file) = meilisearch.create_update_file()?;
+    let (uuid, mut update_file) = meilisearch.create_update_file()?;
+
+    // push the entire stream into a `Vec`.
+    // TODO: Maybe we should write it to a file to reduce the RAM consumption
+    // and then reread it to convert it to obkv?
+    let mut buffer = Vec::new();
+    while let Some(bytes) = body.next().await {
+        buffer.extend_from_slice(&bytes?);
+    }
+    let reader = Cursor::new(buffer);
+
+    let documents_count =
+        tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> {
+            let documents_count = match format {
+                PayloadType::Json => read_json(reader, update_file.as_file_mut())?,
+                PayloadType::Csv => read_csv(reader, update_file.as_file_mut())?,
+                PayloadType::Ndjson => read_ndjson(reader, update_file.as_file_mut())?,
+            };
+            // we NEED to persist the file here because we moved the `update_file` into another task.
+            update_file.persist();
+            Ok(documents_count)
+        })
+        .await;
+
+    let documents_count = match documents_count {
+        Ok(Ok(documents_count)) => documents_count,
+        Ok(Err(e)) => {
+            meilisearch.delete_update_file(uuid)?;
+            return Err(e.into());
+        }
+        Err(e) => {
+            meilisearch.delete_update_file(uuid)?;
+            return Err(e.into());
+        }
+    };

     let task = match method {
         IndexDocumentsMethod::ReplaceDocuments => KindWithContent::DocumentAddition {
             content_file: uuid,
-            documents_count: 0, // TODO: TAMO: get the document count
+            documents_count,
             primary_key,
             allow_index_creation,
             index_uid,
@@ -275,7 +309,7 @@
         IndexDocumentsMethod::UpdateDocuments => KindWithContent::DocumentUpdate {
             content_file: uuid,
-            documents_count: 0, // TODO: TAMO: get the document count
+            documents_count,
             primary_key,
             allow_index_creation,
             index_uid,
@@ -284,7 +318,13 @@
         _ => todo!(),
     };

-    let task = meilisearch.register_task(task).await?;
+    let task = match meilisearch.register_task(task).await {
+        Ok(task) => task,
+        Err(e) => {
+            meilisearch.delete_update_file(uuid)?;
+            return Err(e.into());
+        }
+    };

     debug!("returns: {:?}", task);
     Ok(task)
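
Because `read_json`/`read_csv`/`read_ndjson` are synchronous, the parsing above runs inside `spawn_blocking`, which is why the handler ends up matching on two nested `Result`s: the outer one is a `JoinError` (the blocking task panicked or was cancelled), the inner one is the parsing failure, and on either branch the freshly created update file has to be deleted before returning. A minimal, hypothetical sketch of just that shape (assumes `tokio` with the `macros` and `rt-multi-thread` features; values and error types are made up):

    use tokio::task::JoinError;

    #[tokio::main]
    async fn main() {
        let outcome: Result<Result<u64, String>, JoinError> =
            tokio::task::spawn_blocking(|| {
                // stand-in for the read_json/read_csv/read_ndjson + persist() work
                Ok(42)
            })
            .await;

        let documents_count = match outcome {
            Ok(Ok(count)) => count,
            Ok(Err(parse_error)) => {
                // the real handler calls meilisearch.delete_update_file(uuid)? here
                panic!("parsing failed: {parse_error}");
            }
            Err(join_error) => {
                // ...and here as well, before bubbling the error up
                panic!("blocking task failed: {join_error}");
            }
        };

        println!("parsed {documents_count} documents");
    }
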
diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs
index 209345d00..83a351b17 100644
--- a/meilisearch-http/src/routes/tasks.rs
+++ b/meilisearch-http/src/routes/tasks.rs
@@ -156,6 +156,8 @@ async fn get_task(
     req: HttpRequest,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
+    let task_id = task_id.into_inner();
+
     analytics.publish(
         "Tasks Seen".to_string(),
         json!({ "per_task_uid": true }),
@@ -170,10 +172,11 @@ async fn get_task(
         }
     }

-    filters.limit = 1;
-    filters.from = Some(*task_id);
+    filters.uid = Some(vec![task_id]);

-    let task = meilisearch.list_tasks(filters).await?;
-
-    Ok(HttpResponse::Ok().json(task))
+    if let Some(task) = meilisearch.list_tasks(filters).await?.first() {
+        Ok(HttpResponse::Ok().json(task))
+    } else {
+        Err(index_scheduler::Error::TaskNotFound(task_id).into())
+    }
 }
diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs
index 2e74298b6..a2706a1a6 100644
--- a/meilisearch-lib/src/index_controller/error.rs
+++ b/meilisearch-lib/src/index_controller/error.rs
@@ -51,8 +51,8 @@ impl ErrorCode for IndexControllerError {
             IndexControllerError::DocumentFormatError(e) => e.error_code(),
             IndexControllerError::MissingPayload(_) => Code::MissingPayload,
             IndexControllerError::PayloadTooLarge => Code::PayloadTooLarge,
-            IndexControllerError::IndexResolver(_) => todo!(),
-            IndexControllerError::IndexError(_) => todo!(),
+            IndexControllerError::IndexResolver(e) => e.error_code(),
+            IndexControllerError::IndexError(e) => e.error_code(),
         }
     }
 }
diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs
index 264d42050..3a16daeea 100644
--- a/meilisearch-lib/src/lib.rs
+++ b/meilisearch-lib/src/lib.rs
@@ -15,6 +15,7 @@ use std::ffi::OsStr;
 use std::path::Path;

 // TODO: TAMO: rename the MeiliSearch in Meilisearch
+pub use index_controller::error::IndexControllerError;
 pub use index_controller::Meilisearch as MeiliSearch;
 pub use milli;
 pub use milli::heed;
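
One user-visible consequence of the `skip_serializing_if` attributes added to `TaskView` earlier in this diff: the task objects returned by the routes above omit unset fields (`startedAt`, `finishedAt`, `error`, `details`, `duration`) instead of serializing them as `null`. A self-contained sketch of just that serde behaviour, with a hypothetical struct rather than the real `TaskView` (requires `serde` with the `derive` feature and `serde_json`):

    use serde::Serialize;

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct TaskSketch {
        uid: u32,
        status: &'static str,
        #[serde(skip_serializing_if = "Option::is_none")]
        started_at: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        finished_at: Option<String>,
    }

    fn main() {
        let enqueued = TaskSketch {
            uid: 0,
            status: "enqueued",
            started_at: None,
            finished_at: None,
        };
        // Prints {"uid":0,"status":"enqueued"} -- the unset fields are gone,
        // not serialized as null.
        println!("{}", serde_json::to_string(&enqueued).unwrap());
    }
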