diff --git a/Cargo.lock b/Cargo.lock
index ac3a1aeef..dfc9818aa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2378,6 +2378,7 @@ dependencies = [
  "fst",
  "insta",
  "meili-snap",
+ "memmap2",
  "milli",
  "proptest",
  "proptest-derive",
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index b57adb077..3ad546eb4 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -1065,19 +1065,21 @@ impl IndexScheduler {
 
 #[cfg(test)]
 mod tests {
+    use std::io::{BufWriter, Seek, Write};
     use std::time::Instant;
 
     use big_s::S;
     use crossbeam::channel::RecvTimeoutError;
     use file_store::File;
     use meili_snap::snapshot;
+    use meilisearch_types::document_formats::DocumentFormatError;
     use meilisearch_types::milli::obkv_to_json;
     use meilisearch_types::milli::update::IndexDocumentsMethod::{
         ReplaceDocuments, UpdateDocuments,
     };
     use meilisearch_types::tasks::IndexSwap;
     use meilisearch_types::VERSION_FILE_NAME;
-    use tempfile::TempDir;
+    use tempfile::{NamedTempFile, TempDir};
     use time::Duration;
     use uuid::Uuid;
     use Breakpoint::*;
@@ -1184,6 +1186,18 @@ mod tests {
         }
     }
 
+    /// Adapting to the new json reading interface
+    pub fn read_json(
+        bytes: &[u8],
+        write: impl Write + Seek,
+    ) -> std::result::Result<usize, DocumentFormatError> {
+        let temp_file = NamedTempFile::new().unwrap();
+        let mut buffer = BufWriter::new(temp_file.reopen().unwrap());
+        buffer.write_all(bytes).unwrap();
+        buffer.flush().unwrap();
+        meilisearch_types::document_formats::read_json(temp_file.as_file(), write)
+    }
+
     /// Create an update file with the given file uuid.
     ///
     /// The update file contains just one simple document whose id is given by `document_id`.
@@ -1202,9 +1216,7 @@ mod tests {
         );
 
         let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         (file, documents_count)
     }
 
@@ -1584,9 +1596,7 @@ mod tests {
         }"#;
 
         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -1623,9 +1633,7 @@ mod tests {
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
 
         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -1792,9 +1800,7 @@ mod tests {
         }"#;
 
         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -1952,11 +1958,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2003,11 +2005,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2056,11 +2054,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2110,11 +2104,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2165,11 +2155,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2616,9 +2602,7 @@ mod tests {
         }"#;
 
         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2656,9 +2640,7 @@ mod tests {
         }"#;
 
         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
-        let documents_count =
-            meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
-                .unwrap() as u64;
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
         file.persist().unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2714,11 +2696,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2766,11 +2744,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2824,11 +2798,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2887,11 +2857,7 @@ mod tests {
             );
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -2955,11 +2921,7 @@ mod tests {
             let allow_index_creation = i % 2 != 0;
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
@@ -3012,11 +2974,7 @@ mod tests {
             let allow_index_creation = i % 2 != 0;
 
             let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
-            let documents_count = meilisearch_types::document_formats::read_json(
-                content.as_bytes(),
-                file.as_file_mut(),
-            )
-            .unwrap() as u64;
+            let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
             file.persist().unwrap();
             index_scheduler
                 .register(KindWithContent::DocumentAdditionOrUpdate {
diff --git a/meilisearch-types/Cargo.toml b/meilisearch-types/Cargo.toml
index 1d136c2c1..787737edb 100644
--- a/meilisearch-types/Cargo.toml
+++ b/meilisearch-types/Cargo.toml
@@ -12,6 +12,7 @@ either = { version = "1.6.1", features = ["serde"] }
 enum-iterator = "1.1.3"
 flate2 = "1.0.24"
 fst = "0.4.7"
+memmap2 = "0.5.7"
 milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.37.2", default-features = false }
 proptest = { version = "1.0.0", optional = true }
 proptest-derive = { version = "0.3.0", optional = true }
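The `memmap2` dependency is pulled in because `document_formats.rs` (next file) stops reading payloads through generic `impl Read` sources and instead memory-maps the already-persisted payload file, handing the parsers a plain byte slice. A minimal standalone sketch of that pattern, not part of the patch (the helper name and its line-counting logic are made up for illustration):

```rust
use std::fs::File;
use std::io;

use memmap2::MmapOptions;

/// Hypothetical helper: treat a file as one `&[u8]` without copying it into a `Vec`.
fn count_lines(file: &File) -> io::Result<usize> {
    // SAFETY: as in the patch, this assumes the temporary file is not truncated or
    // modified by another process while the mapping is alive.
    let mmap = unsafe { MmapOptions::new().map(file)? };
    // `Mmap` derefs to `[u8]`, which is exactly what the csv and serde_json readers consume.
    Ok(mmap.split(|&b| b == b'\n').count())
}
```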
diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs
index 42a37eb43..8357690cd 100644
--- a/meilisearch-types/src/document_formats.rs
+++ b/meilisearch-types/src/document_formats.rs
@@ -1,11 +1,15 @@
 use std::borrow::Borrow;
 use std::fmt::{self, Debug, Display};
-use std::io::{self, BufReader, Read, Seek, Write};
+use std::fs::File;
+use std::io::{self, Seek, Write};
+use std::marker::PhantomData;
 
 use either::Either;
+use memmap2::MmapOptions;
 use milli::documents::{DocumentsBatchBuilder, Error};
 use milli::Object;
-use serde::Deserialize;
+use serde::de::{SeqAccess, Visitor};
+use serde::{Deserialize, Deserializer};
 use serde_json::error::Category;
 
 use crate::error::{Code, ErrorCode};
@@ -99,10 +103,10 @@ impl ErrorCode for DocumentFormatError {
 internal_error!(DocumentFormatError: io::Error);
 
 /// Reads CSV from input and write an obkv batch to writer.
-pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
+pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> {
     let mut builder = DocumentsBatchBuilder::new(writer);
-
-    let csv = csv::Reader::from_reader(input);
+    let mmap = unsafe { MmapOptions::new().map(file)? };
+    let csv = csv::Reader::from_reader(mmap.as_ref());
     builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
 
     let count = builder.documents_count();
@@ -111,17 +115,56 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
     Ok(count as usize)
 }
 
-/// Reads JSON Lines from input and write an obkv batch to writer.
-pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
-    let mut builder = DocumentsBatchBuilder::new(writer);
-    let reader = BufReader::new(input);
+/// Reads JSON from a temporary file and writes an obkv batch to the writer.
+pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<usize> {
+    read_json_inner(file, writer, PayloadType::Json)
+}
 
-    for result in serde_json::Deserializer::from_reader(reader).into_iter::<Object>() {
-        let object = result.map_err(Error::Json).map_err(|e| (PayloadType::Ndjson, e))?;
-        builder
-            .append_json_object(&object)
-            .map_err(Into::into)
-            .map_err(DocumentFormatError::Internal)?;
+/// Reads JSON Lines from a temporary file and writes an obkv batch to the writer.
+pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<usize> {
+    read_json_inner(file, writer, PayloadType::Ndjson)
+}
+
+/// Reads JSON from a temporary file and writes an obkv batch to the writer.
+fn read_json_inner(
+    file: &File,
+    writer: impl Write + Seek,
+    payload_type: PayloadType,
+) -> Result<usize> {
+    let mut builder = DocumentsBatchBuilder::new(writer);
+    let mmap = unsafe { MmapOptions::new().map(file)? };
+    let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
+
+    match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) {
+        // The JSON data was deserialized successfully, and every object was already appended
+        // to the builder (and thus to the update file) during deserialization, so there is
+        // nothing left to process. A count of 0 means the payload was an empty array.
+        Ok(Ok(count)) => {
+            if count == 0 {
+                return Ok(count as usize);
+            }
+        }
+        Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))),
+        // The payload is not a top-level JSON array: fall back to the previous in-memory
+        // deserialization (a single object or an array of objects).
+        Err(_e) => {
+            #[derive(Deserialize, Debug)]
+            #[serde(transparent)]
+            struct ArrayOrSingleObject {
+                #[serde(with = "either::serde_untagged")]
+                inner: Either<Vec<Object>, Object>,
+            }
+
+            let content: ArrayOrSingleObject = serde_json::from_reader(file)
+                .map_err(Error::Json)
+                .map_err(|e| (payload_type, e))?;
+
+            for object in content.inner.map_right(|o| vec![o]).into_inner() {
+                builder
+                    .append_json_object(&object)
+                    .map_err(Into::into)
+                    .map_err(DocumentFormatError::Internal)?;
+            }
+        }
     }
 
     let count = builder.documents_count();
@@ -130,30 +173,48 @@ pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize>
     Ok(count as usize)
 }
 
-/// Reads JSON from input and write an obkv batch to writer.
-pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
-    let mut builder = DocumentsBatchBuilder::new(writer);
-    let reader = BufReader::new(input);
+/**
+ * Deserializes a JSON sequence by handing each element to the given callback, so the whole deserialized array is never stored in memory.
+ * Reference:
+ * https://serde.rs/stream-array.html
+ * https://github.com/serde-rs/json/issues/160
+ */
+fn array_each<'de, D, T, F>(deserializer: D, f: F) -> std::result::Result<io::Result<u64>, D::Error>
+where
+    D: Deserializer<'de>,
+    T: Deserialize<'de>,
+    F: FnMut(T) -> io::Result<()>,
+{
+    struct SeqVisitor<T, F>(F, PhantomData<T>);
 
-    #[derive(Deserialize, Debug)]
-    #[serde(transparent)]
-    struct ArrayOrSingleObject {
-        #[serde(with = "either::serde_untagged")]
-        inner: Either<Vec<Object>, Object>,
+    impl<'de, T, F> Visitor<'de> for SeqVisitor<T, F>
+    where
+        T: Deserialize<'de>,
+        F: FnMut(T) -> io::Result<()>,
+    {
+        type Value = io::Result<u64>;
+
+        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+            formatter.write_str("a nonempty sequence")
+        }
+
+        fn visit_seq<A>(
+            mut self,
+            mut seq: A,
+        ) -> std::result::Result<Self::Value, <A as SeqAccess<'de>>::Error>
+        where
+            A: SeqAccess<'de>,
+        {
+            let mut max: u64 = 0;
+            while let Some(value) = seq.next_element::<T>()? {
+                match self.0(value) {
+                    Ok(()) => max += 1,
+                    Err(e) => return Ok(Err(e)),
+                };
+            }
+            Ok(Ok(max))
+        }
     }
-
-    let content: ArrayOrSingleObject =
-        serde_json::from_reader(reader).map_err(Error::Json).map_err(|e| (PayloadType::Json, e))?;
-
-    for object in content.inner.map_right(|o| vec![o]).into_inner() {
-        builder
-            .append_json_object(&object)
-            .map_err(Into::into)
-            .map_err(DocumentFormatError::Internal)?;
-    }
-
-    let count = builder.documents_count();
-    let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?;
-
-    Ok(count as usize)
+    let visitor = SeqVisitor(f, PhantomData);
+    deserializer.deserialize_seq(visitor)
 }
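The `array_each` helper above follows the streaming-array pattern from the serde documentation it links to: a `Visitor` pulls one element at a time out of `SeqAccess` and hands it to a callback, so only a single `Object` is alive at any moment instead of the whole payload. A self-contained sketch of the same technique (the `ForEach` name and the `main` driver are made up here, not part of the patch):

```rust
use std::fmt;
use std::marker::PhantomData;

use serde::de::{DeserializeOwned, SeqAccess, Visitor};
use serde::Deserializer;

/// Streams a JSON array: each element is passed to the callback as soon as it is deserialized.
struct ForEach<T, F>(F, PhantomData<T>);

impl<'de, T, F> Visitor<'de> for ForEach<T, F>
where
    T: DeserializeOwned,
    F: FnMut(T),
{
    type Value = u64;

    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("a sequence")
    }

    fn visit_seq<A: SeqAccess<'de>>(mut self, mut seq: A) -> Result<u64, A::Error> {
        let mut count: u64 = 0;
        while let Some(element) = seq.next_element::<T>()? {
            self.0(element);
            count += 1;
        }
        Ok(count)
    }
}

fn main() {
    let data = br#"[{"id": 1}, {"id": 2}, {"id": 3}]"#;
    let mut deserializer = serde_json::Deserializer::from_slice(data);
    // Objects are handled one by one instead of being collected into a Vec first.
    let count = Deserializer::deserialize_seq(
        &mut deserializer,
        ForEach::<serde_json::Value, _>(|obj| println!("{}", obj), PhantomData),
    )
    .expect("a valid JSON array");
    println!("processed {} documents", count);
}
```

The patch's version additionally threads an `io::Result` through the visitor so that a failure inside the callback (appending to the obkv builder) stops the stream without being confused with a deserialization error.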
diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs
index e3cc396d7..53b16f9f5 100644
--- a/meilisearch/src/error.rs
+++ b/meilisearch/src/error.rs
@@ -95,6 +95,8 @@ pub enum PayloadError {
     MalformedPayload(serde_json::error::Error),
     #[error("A json payload is missing.")]
     MissingPayload,
+    #[error("Error while writing the payload to disk: `{0}`.")]
+    ReceivePayloadErr(Box<dyn std::error::Error + Send + Sync + 'static>),
 }
 
 impl ErrorCode for PayloadError {
@@ -126,6 +128,7 @@ impl ErrorCode for PayloadError {
             },
             PayloadError::MissingPayload => Code::MissingPayload,
             PayloadError::MalformedPayload(_) => Code::MalformedPayload,
+            PayloadError::ReceivePayloadErr(_) => Code::Internal,
         }
     }
 }
diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index 0fe3cf102..5353c1506 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -1,4 +1,4 @@
-use std::io::{Cursor, ErrorKind};
+use std::io::ErrorKind;
 
 use actix_web::http::header::CONTENT_TYPE;
 use actix_web::web::Data;
@@ -20,9 +20,13 @@ use once_cell::sync::Lazy;
 use serde::Deserialize;
 use serde_cs::vec::CS;
 use serde_json::Value;
+use tempfile::tempfile;
+use tokio::fs::File;
+use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
 
 use crate::analytics::{Analytics, DocumentDeletionKind};
 use crate::error::MeilisearchHttpError;
+use crate::error::PayloadError::ReceivePayloadErr;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
 use crate::extractors::payload::Payload;
@@ -227,26 +231,52 @@ async fn document_addition(
     let (uuid, mut update_file) = index_scheduler.create_update_file()?;
 
-    // TODO: this can be slow, maybe we should spawn a thread? But the payload isn't Send+Sync :weary:
-    // push the entire stream into a `Vec`.
-    // If someone sends us a never ending stream we're going to block the thread.
-    // TODO: Maybe we should write it to a file to reduce the RAM consumption
-    // and then reread it to convert it to obkv?
-    let mut buffer = Vec::new();
+    let temp_file = match tempfile() {
+        Ok(temp_file) => temp_file,
+        Err(e) => {
+            return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+        }
+    };
+
+    let async_file = File::from_std(temp_file);
+    let mut buffer = BufWriter::new(async_file);
+
+    let mut buffer_write_size: usize = 0;
     while let Some(bytes) = body.next().await {
-        buffer.extend_from_slice(&bytes?);
+        let byte = &bytes?;
+
+        if byte.is_empty() && buffer_write_size == 0 {
+            return Err(MeilisearchHttpError::MissingPayload(format));
+        }
+
+        match buffer.write_all(byte).await {
+            Ok(()) => buffer_write_size += 1,
+            Err(e) => {
+                return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+            }
+        };
     }
-    if buffer.is_empty() {
+
+    if let Err(e) = buffer.flush().await {
+        return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+    }
+
+    if buffer_write_size == 0 {
         return Err(MeilisearchHttpError::MissingPayload(format));
     }
-    let reader = Cursor::new(buffer);
+
+    if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
+        return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
+    };
+
+    let read_file = buffer.into_inner().into_std().await;
 
     let documents_count =
         tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> {
             let documents_count = match format {
-                PayloadType::Json => read_json(reader, update_file.as_file_mut())?,
-                PayloadType::Csv => read_csv(reader, update_file.as_file_mut())?,
-                PayloadType::Ndjson => read_ndjson(reader, update_file.as_file_mut())?,
+                PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
+                PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?,
+                PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
             };
             // we NEED to persist the file here because we moved the `udpate_file` in another task.
             update_file.persist()?;
diff --git a/meilisearch/tests/documents/add_documents.rs b/meilisearch/tests/documents/add_documents.rs
index 6f1fabeae..d7f5df58b 100644
--- a/meilisearch/tests/documents/add_documents.rs
+++ b/meilisearch/tests/documents/add_documents.rs
@@ -436,7 +436,7 @@ async fn error_add_malformed_ndjson_documents() {
     assert_eq!(
         response["message"],
         json!(
-            r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`."#
+            r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: trailing characters at line 2 column 1`."#
         )
     );
     assert_eq!(response["code"], json!("malformed_payload"));
@@ -456,7 +456,7 @@ async fn error_add_malformed_ndjson_documents() {
     assert_eq!(status_code, 400);
     assert_eq!(
         response["message"],
-        json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`.")
+        json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: trailing characters at line 2 column 1`.")
     );
     assert_eq!(response["code"], json!("malformed_payload"));
     assert_eq!(response["type"], json!("invalid_request"));
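For reference, this is roughly what calling the new interface looks like from the outside: the payload must first be materialized in a real file (as both the test-only `read_json` adapter and `document_addition` now do), because the readers memory-map their input. A hedged sketch under that assumption; the helper name and the in-memory `Cursor` used as the obkv destination are made up for illustration:

```rust
use std::io::{Cursor, Write};

use meilisearch_types::document_formats::{read_json, DocumentFormatError};
use tempfile::NamedTempFile;

/// Hypothetical caller: persist the payload, then hand the file to the mmap-based reader.
fn json_payload_to_obkv(payload: &[u8]) -> Result<usize, DocumentFormatError> {
    let temp_file = NamedTempFile::new().expect("failed to create a temporary file");
    temp_file.as_file().write_all(payload).expect("failed to write the payload");
    // Any `Write + Seek` destination works for the obkv batch; production code writes into
    // the update file instead of an in-memory cursor.
    let mut obkv_batch = Cursor::new(Vec::new());
    // Returns the number of documents that were appended to the batch.
    read_json(temp_file.as_file(), &mut obkv_batch)
}
```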