diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs index 8540ce4b2..a535ec686 100644 --- a/meilisearch-lib/src/document_formats.rs +++ b/meilisearch-lib/src/document_formats.rs @@ -8,12 +8,14 @@ type Result = std::result::Result; #[derive(Debug)] pub enum PayloadType { Jsonl, + Json, } impl fmt::Display for PayloadType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { PayloadType::Jsonl => write!(f, "ndjson"), + PayloadType::Json => write!(f, "json"), } } } @@ -50,3 +52,14 @@ pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> { Ok(()) } + +/// read json from input and write an obkv batch to writer. +pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> { + let mut builder = DocumentBatchBuilder::new(writer).unwrap(); + + let documents: Vec> = malformed!(PayloadType::Json, serde_json::from_reader(input))?; + builder.add_documents(documents).unwrap(); + builder.finish().unwrap(); + + Ok(()) +} diff --git a/meilisearch-lib/src/index_controller/updates/error.rs b/meilisearch-lib/src/index_controller/updates/error.rs index 0e667fe65..d6c3bcba4 100644 --- a/meilisearch-lib/src/index_controller/updates/error.rs +++ b/meilisearch-lib/src/index_controller/updates/error.rs @@ -3,7 +3,7 @@ use std::error::Error; use meilisearch_error::{Code, ErrorCode}; -use crate::index_controller::update_file_store::UpdateFileStoreError; +use crate::{document_formats::DocumentFormatError, index_controller::update_file_store::UpdateFileStoreError}; pub type Result = std::result::Result; @@ -21,7 +21,8 @@ pub enum UpdateLoopError { )] FatalUpdateStoreError, #[error("{0}")] - InvalidPayload(Box), + InvalidPayload(#[from] DocumentFormatError), + // TODO: The reference to actix has to go. #[error("{0}")] PayloadError(#[from] actix_web::error::PayloadError), } diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index e9b8cdd84..5296c03e5 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -13,22 +13,21 @@ use async_stream::stream; use bytes::Bytes; use futures::{Stream, StreamExt}; use log::trace; -use milli::documents::DocumentBatchBuilder; use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; use tokio::sync::mpsc; use uuid::Uuid; use self::error::{Result, UpdateLoopError}; pub use self::message::UpdateMsg; use self::store::{UpdateStore, UpdateStoreInfo}; +use crate::document_formats::read_json; use crate::index::{Index, Settings, Unchecked}; use crate::index_controller::update_file_store::UpdateFileStore; use status::UpdateStatus; use super::index_resolver::HardStateIndexResolver; -use super::{DocumentAdditionFormat, Payload, Update}; +use super::{DocumentAdditionFormat, Update}; pub type UpdateSender = mpsc::Sender; @@ -197,9 +196,18 @@ impl UpdateLoop { method, format, } => { - let content_uuid = match format { - DocumentAdditionFormat::Json => self.documents_from_json(payload).await?, - }; + let reader = StreamReader::new(payload); + let (content_uuid, mut update_file) = self.update_file_store.new_update()?; + tokio::task::spawn_blocking(move || -> Result<_> { + match format { + DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?, + } + + update_file.persist()?; + + Ok(()) + }).await??; + RegisterUpdate::DocumentAddition { primary_key, @@ -220,23 +228,6 @@ impl UpdateLoop { Ok(status.into()) } - async fn documents_from_json(&self, payload: Payload) -> Result { - let file_store = self.update_file_store.clone(); - tokio::task::spawn_blocking(move || { - let (uuid, mut file) = file_store.new_update().unwrap(); - let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap(); - - let documents: Vec> = - serde_json::from_reader(StreamReader::new(payload))?; - builder.add_documents(documents).unwrap(); - builder.finish().unwrap(); - - file.persist()?; - - Ok(uuid) - }) - .await? - } async fn handle_list_updates(&self, uuid: Uuid) -> Result> { let update_store = self.store.clone();