From d683f5980ce7232c7e9be4d0b3d3f5aefb0335af Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 10 Dec 2024 19:19:27 +0100
Subject: [PATCH] Do not duplicate NDJson when unnecessary

---
 crates/file-store/src/lib.rs                  |   8 ++
 .../meilisearch-types/src/document_formats.rs |  21 ++-
 .../src/routes/indexes/documents.rs           | 127 +++++++++++-------
 3 files changed, 98 insertions(+), 58 deletions(-)

diff --git a/crates/file-store/src/lib.rs b/crates/file-store/src/lib.rs
index c8b3849ab..39ed9482b 100644
--- a/crates/file-store/src/lib.rs
+++ b/crates/file-store/src/lib.rs
@@ -136,6 +136,14 @@ pub struct File {
 }
 
 impl File {
+    pub fn from_parts(path: PathBuf, file: Option<NamedTempFile>) -> Self {
+        Self { path, file }
+    }
+
+    pub fn into_parts(self) -> (PathBuf, Option<NamedTempFile>) {
+        (self.path, self.file)
+    }
+
     pub fn dry_file() -> Result<Self> {
         Ok(Self { path: PathBuf::new(), file: None })
     }
diff --git a/crates/meilisearch-types/src/document_formats.rs b/crates/meilisearch-types/src/document_formats.rs
index c6e8ad907..4820ac523 100644
--- a/crates/meilisearch-types/src/document_formats.rs
+++ b/crates/meilisearch-types/src/document_formats.rs
@@ -250,26 +250,25 @@ pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
     }
 }
 
-/// Reads NDJSON from file and write it in NDJSON in a file checking it along the way.
-pub fn read_ndjson(input: &File, output: impl io::Write) -> Result<u64> {
+/// Reads NDJSON from file and checks it.
+pub fn read_ndjson(input: &File) -> Result<u64> {
     // We memory map to be able to deserialize into a RawMap that
     // does not allocate when possible and only materialize the first/top level.
     let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
-    let mut output = BufWriter::new(output);
 
     let mut bump = Bump::with_capacity(1024 * 1024);
 
     let mut count = 0;
     for result in serde_json::Deserializer::from_slice(&input).into_iter() {
         bump.reset();
-        count += 1;
-        result
-            .and_then(|raw: &RawValue| {
+        match result {
+            Ok(raw) => {
                 // try to deserialize as a map
-                let map = RawMap::from_raw_value(raw, &bump)?;
-                to_writer(&mut output, &map)
-            })
-            .map_err(|e| DocumentFormatError::from((PayloadType::Ndjson, e)))?;
+                RawMap::from_raw_value(raw, &bump)
+                    .map_err(|e| DocumentFormatError::from((PayloadType::Ndjson, e)))?;
+                count += 1;
+            }
+            Err(e) => return Err(DocumentFormatError::from((PayloadType::Ndjson, e))),
+        }
     }
 
     Ok(count)
diff --git a/crates/meilisearch/src/routes/indexes/documents.rs b/crates/meilisearch/src/routes/indexes/documents.rs
index 47f73ef42..0b18810d7 100644
--- a/crates/meilisearch/src/routes/indexes/documents.rs
+++ b/crates/meilisearch/src/routes/indexes/documents.rs
@@ -1,5 +1,5 @@
 use std::collections::HashSet;
-use std::io::ErrorKind;
+use std::io::{ErrorKind, Seek as _};
 use std::marker::PhantomData;
 
 use actix_web::http::header::CONTENT_TYPE;
@@ -572,7 +572,7 @@ async fn document_addition(
     index_uid: IndexUid,
     primary_key: Option<String>,
     csv_delimiter: Option<u8>,
-    mut body: Payload,
+    body: Payload,
     method: IndexDocumentsMethod,
     task_id: Option<TaskId>,
     dry_run: bool,
@@ -609,54 +609,54 @@ async fn document_addition(
     };
 
     let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?;
+    let documents_count = match format {
+        PayloadType::Ndjson => {
+            let (path, file) = update_file.into_parts();
+            let file = match file {
+                Some(file) => {
+                    let (file, path) = file.into_parts();
+                    let mut file = copy_body_to_file(file, body, format).await?;
+                    file.rewind().map_err(|e| {
+                        index_scheduler::Error::FileStore(file_store::Error::IoError(e))
+                    })?;
+                    Some(tempfile::NamedTempFile::from_parts(file, path))
+                }
+                None => None,
+            };
 
-    let temp_file = match tempfile() {
-        Ok(file) => file,
-        Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
+            let documents_count = file
+                .as_ref()
+                .map_or(Ok(0), |ntf| read_ndjson(ntf.as_file()))
+                .map_err(|e| MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
+            let update_file = file_store::File::from_parts(path, file);
+            update_file.persist()?;
+            Ok(documents_count)
+        }
+        PayloadType::Json | PayloadType::Csv { delimiter: _ } => {
+            let temp_file = match tempfile() {
+                Ok(file) => file,
+                Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
+            };
+
+            let read_file = copy_body_to_file(temp_file, body, format).await?;
+            tokio::task::spawn_blocking(move || {
+                let documents_count = match format {
+                    PayloadType::Json => read_json(&read_file, &mut update_file)?,
+                    PayloadType::Csv { delimiter } => {
+                        read_csv(&read_file, &mut update_file, delimiter)?
+                    }
+                    PayloadType::Ndjson => {
+                        unreachable!("We already wrote the user content into the update file")
+                    }
+                };
+                // we NEED to persist the file here because we moved `update_file` into another task.
+                update_file.persist()?;
+                Ok(documents_count)
+            })
+            .await
+        }
     };
 
-    let async_file = File::from_std(temp_file);
-    let mut buffer = BufWriter::new(async_file);
-
-    let mut buffer_write_size: usize = 0;
-    while let Some(result) = body.next().await {
-        let byte = result?;
-
-        if byte.is_empty() && buffer_write_size == 0 {
-            return Err(MeilisearchHttpError::MissingPayload(format));
-        }
-
-        match buffer.write_all(&byte).await {
-            Ok(()) => buffer_write_size += 1,
-            Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
-        }
-    }
-
-    if let Err(e) = buffer.flush().await {
-        return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
-    }
-
-    if buffer_write_size == 0 {
-        return Err(MeilisearchHttpError::MissingPayload(format));
-    }
-
-    if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
-        return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
-    }
-
-    let read_file = buffer.into_inner().into_std().await;
-    let documents_count = tokio::task::spawn_blocking(move || {
-        let documents_count = match format {
-            PayloadType::Json => read_json(&read_file, &mut update_file)?,
-            PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
-            PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
-        };
-        // we NEED to persist the file here because we moved the `udpate_file` in another task.
-        update_file.persist()?;
-        Ok(documents_count)
-    })
-    .await;
-
     let documents_count = match documents_count {
         Ok(Ok(documents_count)) => documents_count,
         // in this case the file has not possibly be persisted.
@@ -703,6 +703,39 @@ async fn document_addition(
     Ok(task.into())
 }
 
+async fn copy_body_to_file(
+    output: std::fs::File,
+    mut body: Payload,
+    format: PayloadType,
+) -> Result<std::fs::File, MeilisearchHttpError> {
+    let async_file = File::from_std(output);
+    let mut buffer = BufWriter::new(async_file);
+    let mut buffer_write_size: usize = 0;
+    while let Some(result) = body.next().await {
+        let byte = result?;
+
+        if byte.is_empty() && buffer_write_size == 0 {
+            return Err(MeilisearchHttpError::MissingPayload(format));
+        }
+
+        match buffer.write_all(&byte).await {
+            Ok(()) => buffer_write_size += 1,
+            Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
+        }
+    }
+    if let Err(e) = buffer.flush().await {
+        return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
+    }
+    if buffer_write_size == 0 {
+        return Err(MeilisearchHttpError::MissingPayload(format));
+    }
+    if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
+        return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
+    }
+    let read_file = buffer.into_inner().into_std().await;
+    Ok(read_file)
+}
+
 pub async fn delete_documents_batch(
     index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
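
A minimal standalone sketch of the idea this patch exploits: an NDJSON payload already contains the exact bytes the update file needs, so it can be validated in place with a memory map and serde_json's streaming deserializer instead of being deserialized and re-serialized into a second file. This sketch lives outside the Meilisearch codebase and assumes the memmap2 crate plus serde_json with its raw_value feature; the name count_ndjson_documents is illustrative only.

    use std::fs::File;

    use memmap2::Mmap;
    use serde_json::value::RawValue;

    /// Checks that every entry in `file` is well-formed JSON and returns the
    /// number of documents, without copying or re-serializing a single byte.
    fn count_ndjson_documents(file: &File) -> Result<u64, Box<dyn std::error::Error>> {
        // SAFETY: the mapped file must not be truncated or mutated while mapped.
        let input = unsafe { Mmap::map(file)? };

        let mut count = 0;
        // `&RawValue` borrows directly from the mapped bytes, so each document
        // is only syntax-checked, never materialized into an owned value.
        for result in serde_json::Deserializer::from_slice(&input).into_iter::<&RawValue>() {
            result?; // a malformed document aborts with its exact position
            count += 1;
        }
        Ok(count)
    }

The patched read_ndjson follows this pattern, additionally materializing the top level of each document into a bump-allocated RawMap so that entries that are not JSON objects are rejected.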
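The route-level change leans on tempfile's NamedTempFile::into_parts/from_parts pair, mirrored here on file_store::File: the raw std::fs::File is taken out, handed to copy_body_to_file, rewound, and put back under the same temporary path before being persisted. A small round-trip sketch of that pattern, assuming only the tempfile crate (the written payload is illustrative):

    use std::io::{Read, Seek, Write};

    use tempfile::NamedTempFile;

    fn main() -> std::io::Result<()> {
        let ntf = NamedTempFile::new()?;

        // Split: the TempPath keeps owning (and later deleting) the path, while
        // the inner std::fs::File can be handed to any API expecting a plain file.
        let (mut file, path) = ntf.into_parts();
        file.write_all(b"{\"id\": 1}\n")?;
        file.rewind()?;

        // Reassemble so the temp file can be read, persisted, or dropped as usual.
        let mut ntf = NamedTempFile::from_parts(file, path);
        let mut contents = String::new();
        ntf.as_file_mut().read_to_string(&mut contents)?;
        assert_eq!(contents, "{\"id\": 1}\n");
        Ok(())
    }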