From 38982d13fe6ec2dc0d23f3e9c71b7b229ab91073 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Wed, 30 Nov 2022 00:03:22 +0800 Subject: [PATCH 01/13] fix issue 3037 --- Cargo.lock | 12 ++ index-scheduler/src/lib.rs | 47 ++++--- meilisearch-http/src/error.rs | 3 + .../src/routes/indexes/documents.rs | 61 ++++++--- .../tests/documents/add_documents.rs | 4 +- meilisearch-types/Cargo.toml | 2 + meilisearch-types/src/document_formats.rs | 128 ++++++++++++------ meilisearch-types/src/error.rs | 4 + 8 files changed, 182 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01eb9cd36..6140979d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2376,7 +2376,9 @@ dependencies = [ "flate2", "fst", "insta", + "log", "meili-snap", + "memmap", "milli", "proptest", "proptest-derive", @@ -2396,6 +2398,16 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "memmap2" version = "0.5.7" diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 1807bdb40..0d84ccb62 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1024,18 +1024,20 @@ impl IndexScheduler { #[cfg(test)] mod tests { + use std::io::{Seek, Write, BufWriter}; use std::time::Instant; use big_s::S; use file_store::File; use meili_snap::snapshot; + use meilisearch_types::document_formats::DocumentFormatError; use meilisearch_types::milli::obkv_to_json; use meilisearch_types::milli::update::IndexDocumentsMethod::{ ReplaceDocuments, UpdateDocuments, }; use meilisearch_types::tasks::IndexSwap; use meilisearch_types::VERSION_FILE_NAME; - use tempfile::TempDir; + use tempfile::{TempDir, NamedTempFile}; use time::Duration; use uuid::Uuid; @@ -1128,6 +1130,15 @@ mod tests { } } + /// Adapting to the new json reading interface + pub fn read_json(bytes: &[u8], write: impl Write + Seek) -> std::result::Result { + let temp_file = NamedTempFile::new().unwrap(); + let mut buffer = BufWriter::new(temp_file.reopen().unwrap()); + buffer.write(bytes).unwrap(); + buffer.flush().unwrap(); + meilisearch_types::document_formats::read_json(temp_file.as_file(), write) + } + /// Create an update file with the given file uuid. /// /// The update file contains just one simple document whose id is given by `document_id`. @@ -1147,7 +1158,7 @@ mod tests { let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap(); let documents_count = - meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut()) + read_json(content.as_bytes(), file.as_file_mut()) .unwrap() as u64; (file, documents_count) } @@ -1450,7 +1461,7 @@ mod tests { let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); let documents_count = - meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut()) + read_json(content.as_bytes(), file.as_file_mut()) .unwrap() as u64; file.persist().unwrap(); index_scheduler @@ -1496,7 +1507,7 @@ mod tests { let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); let documents_count = - meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut()) + read_json(content.as_bytes(), file.as_file_mut()) .unwrap() as u64; file.persist().unwrap(); index_scheduler @@ -1678,7 +1689,7 @@ mod tests { let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); let documents_count = - meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut()) + read_json(content.as_bytes(), file.as_file_mut()) .unwrap() as u64; file.persist().unwrap(); index_scheduler @@ -1847,7 +1858,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -1902,7 +1913,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -1959,7 +1970,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -2016,7 +2027,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -2076,7 +2087,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -2505,7 +2516,7 @@ mod tests { let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); let documents_count = - meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut()) + read_json(content.as_bytes(), file.as_file_mut()) .unwrap() as u64; file.persist().unwrap(); index_scheduler @@ -2547,7 +2558,7 @@ mod tests { let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); let documents_count = - meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut()) + read_json(content.as_bytes(), file.as_file_mut()) .unwrap() as u64; file.persist().unwrap(); index_scheduler @@ -2596,7 +2607,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -2645,7 +2656,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -2708,7 +2719,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -2773,7 +2784,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -2845,7 +2856,7 @@ mod tests { let allow_index_creation = i % 2 != 0; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) @@ -2905,7 +2916,7 @@ mod tests { let allow_index_creation = i % 2 != 0; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( + let documents_count = read_json( content.as_bytes(), file.as_file_mut(), ) diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index e3cc396d7..fd18203fc 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -95,6 +95,8 @@ pub enum PayloadError { MalformedPayload(serde_json::error::Error), #[error("A json payload is missing.")] MissingPayload, + #[error("Exception when accepting a playload to a temporary file")] + ReceivePayloadErr, } impl ErrorCode for PayloadError { @@ -126,6 +128,7 @@ impl ErrorCode for PayloadError { }, PayloadError::MissingPayload => Code::MissingPayload, PayloadError::MalformedPayload(_) => Code::MalformedPayload, + PayloadError::ReceivePayloadErr => Code::ReceivePayloadErr, } } } diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 0cdb11e8a..a58cf503f 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -1,13 +1,12 @@ -use std::io::{Cursor, ErrorKind}; - +use std::io::{ErrorKind, BufWriter, Write}; use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; use bstr::ByteSlice; use futures::StreamExt; use index_scheduler::IndexScheduler; -use log::debug; -use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType}; +use log::{debug, error}; +use meilisearch_types::document_formats::{read_csv, PayloadType, read_json, read_ndjson}; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::RoTxn; use meilisearch_types::index_uid::IndexUid; @@ -20,9 +19,10 @@ use once_cell::sync::Lazy; use serde::Deserialize; use serde_cs::vec::CS; use serde_json::Value; - +use tempfile::NamedTempFile; use crate::analytics::Analytics; use crate::error::MeilisearchHttpError; +use crate::error::PayloadError::ReceivePayloadErr; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::payload::Payload; @@ -223,26 +223,51 @@ async fn document_addition( let (uuid, mut update_file) = index_scheduler.create_update_file()?; - // TODO: this can be slow, maybe we should spawn a thread? But the payload isn't Send+Sync :weary: - // push the entire stream into a `Vec`. - // If someone sends us a never ending stream we're going to block the thread. - // TODO: Maybe we should write it to a file to reduce the RAM consumption - // and then reread it to convert it to obkv? - let mut buffer = Vec::new(); + let err: Result = Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); + + let temp_file = match NamedTempFile::new() { + Ok(temp_file) => temp_file, + Err(e) => { + error!("create a temporary file error: {}", e); + return err; + }, + }; + debug!("temp file path: {:?}", temp_file.as_ref()); + let buffer_file = match temp_file.reopen() { + Ok(buffer_file) => buffer_file, + Err(e) => { + error!("reopen payload temporary file error: {}", e); + return err; + } + }; + let mut buffer = BufWriter::new(buffer_file); + let mut buffer_write_size:usize = 0; while let Some(bytes) = body.next().await { - buffer.extend_from_slice(&bytes?); - } - if buffer.is_empty() { + match buffer.write(&bytes?) { + Ok(size) => buffer_write_size = buffer_write_size + size, + Err(e) => { + error!("bufWriter write error: {}", e); + return err + } + } + + }; + + if let Err(e) = buffer.flush() { + error!("bufWriter flush error: {}", e); + return err + }; + + if buffer_write_size == 0 { return Err(MeilisearchHttpError::MissingPayload(format)); } - let reader = Cursor::new(buffer); let documents_count = tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> { let documents_count = match format { - PayloadType::Json => read_json(reader, update_file.as_file_mut())?, - PayloadType::Csv => read_csv(reader, update_file.as_file_mut())?, - PayloadType::Ndjson => read_ndjson(reader, update_file.as_file_mut())?, + PayloadType::Json => read_json(temp_file.as_file(), update_file.as_file_mut())?, + PayloadType::Csv => read_csv(temp_file.as_file(), update_file.as_file_mut())?, + PayloadType::Ndjson => read_ndjson(temp_file.as_file(), update_file.as_file_mut())?, }; // we NEED to persist the file here because we moved the `udpate_file` in another task. update_file.persist()?; diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs index 8dd3ba39a..e6858473e 100644 --- a/meilisearch-http/tests/documents/add_documents.rs +++ b/meilisearch-http/tests/documents/add_documents.rs @@ -436,7 +436,7 @@ async fn error_add_malformed_ndjson_documents() { assert_eq!( response["message"], json!( - r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`."# + r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: trailing characters at line 2 column 1`."# ) ); assert_eq!(response["code"], json!("malformed_payload")); @@ -456,7 +456,7 @@ async fn error_add_malformed_ndjson_documents() { assert_eq!(status_code, 400); assert_eq!( response["message"], - json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`.") + json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: trailing characters at line 2 column 1`.") ); assert_eq!(response["code"], json!("malformed_payload")); assert_eq!(response["type"], json!("invalid_request")); diff --git a/meilisearch-types/Cargo.toml b/meilisearch-types/Cargo.toml index 3b5346438..7b6d2640c 100644 --- a/meilisearch-types/Cargo.toml +++ b/meilisearch-types/Cargo.toml @@ -23,6 +23,8 @@ thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } tokio = "1.0" uuid = { version = "1.1.2", features = ["serde", "v4"] } +memmap = "0.7.0" +log = "0.4.17" [dev-dependencies] insta = "1.19.1" diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 42a37eb43..6e19768a8 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,13 +1,16 @@ use std::borrow::Borrow; use std::fmt::{self, Debug, Display}; -use std::io::{self, BufReader, Read, Seek, Write}; - +use std::fs::File; +use std::io::{self, Seek, Write}; +use std::marker::PhantomData; use either::Either; +use log::debug; +use memmap::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; use milli::Object; -use serde::Deserialize; +use serde::de::{Visitor, SeqAccess}; +use serde::{Deserialize, Deserializer}; use serde_json::error::Category; - use crate::error::{Code, ErrorCode}; use crate::internal_error; @@ -99,10 +102,10 @@ impl ErrorCode for DocumentFormatError { internal_error!(DocumentFormatError: io::Error); /// Reads CSV from input and write an obkv batch to writer. -pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result { +pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - - let csv = csv::Reader::from_reader(input); + let mmap = unsafe { MmapOptions::new().map(file).unwrap()}; + let csv = csv::Reader::from_reader(mmap.as_ref()); builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?; let count = builder.documents_count(); @@ -111,17 +114,47 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result { Ok(count as usize) } -/// Reads JSON Lines from input and write an obkv batch to writer. -pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result { - let mut builder = DocumentsBatchBuilder::new(writer); - let reader = BufReader::new(input); +/// Reads JSON from temporary file and write an obkv batch to writer. +pub fn read_json(file: &File, writer: impl Write + Seek) -> Result { + read_json_inner(file, writer, PayloadType::Json) +} - for result in serde_json::Deserializer::from_reader(reader).into_iter() { - let object = result.map_err(Error::Json).map_err(|e| (PayloadType::Ndjson, e))?; +/// Reads JSON from temporary file and write an obkv batch to writer. +pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result { + read_json_inner(file, writer, PayloadType::Ndjson) +} + +/// Reads JSON from temporary file and write an obkv batch to writer. +fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: PayloadType) -> Result { + let mut builder = DocumentsBatchBuilder::new(writer); + let mmap = unsafe { MmapOptions::new().map(file).unwrap()}; + let mut deserializer = serde_json::Deserializer::from_slice(&mmap); + + match array_each(&mut deserializer, |obj: Object | { builder - .append_json_object(&object) - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; + .append_json_object(&obj) + }) { + Ok(Ok(count)) => debug!("serde json array size: {}", count), + Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), + Err(_e) => { + debug!("deserialize single json"); + #[derive(Deserialize, Debug)] + #[serde(transparent)] + struct ArrayOrSingleObject { + #[serde(with = "either::serde_untagged")] + inner: Either, Object>, + } + + let content: ArrayOrSingleObject = + serde_json::from_reader(file).map_err(Error::Json).map_err(|e| (payload_type, e))?; + + for object in content.inner.map_right(|o| vec![o]).into_inner() { + builder + .append_json_object(&object) + .map_err(Into::into) + .map_err(DocumentFormatError::Internal)?; + } + } } let count = builder.documents_count(); @@ -130,30 +163,43 @@ pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result Ok(count as usize) } -/// Reads JSON from input and write an obkv batch to writer. -pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result { - let mut builder = DocumentsBatchBuilder::new(writer); - let reader = BufReader::new(input); +/** + * https://serde.rs/stream-array.html + * https://github.com/serde-rs/json/issues/160 + */ +fn array_each<'de, D, T, F>(deserializer: D, f: F) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, + T: Deserialize<'de>, + F: FnMut(T) -> io::Result<()>, +{ + struct SeqVisitor(F, PhantomData); - #[derive(Deserialize, Debug)] - #[serde(transparent)] - struct ArrayOrSingleObject { - #[serde(with = "either::serde_untagged")] - inner: Either, Object>, + impl<'de, T, F> Visitor<'de> for SeqVisitor + where + T: Deserialize<'de>, + F: FnMut(T) -> io::Result<()>, + { + type Value = io::Result; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a nonempty sequence") + } + + fn visit_seq(mut self, mut seq: A) -> std::result::Result, >::Error> + where + A: SeqAccess<'de>, + { + let mut max: u64 = 0; + while let Some(value) = seq.next_element::()? { + match self.0(value) { + Ok(()) => max = max + 1, + Err(e) => return Ok(Err(e)), + }; + } + Ok(Ok(max)) + } } - - let content: ArrayOrSingleObject = - serde_json::from_reader(reader).map_err(Error::Json).map_err(|e| (PayloadType::Json, e))?; - - for object in content.inner.map_right(|o| vec![o]).into_inner() { - builder - .append_json_object(&object) - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - } - - let count = builder.documents_count(); - let _ = builder.into_inner().map_err(Into::into).map_err(DocumentFormatError::Internal)?; - - Ok(count as usize) -} + let visitor = SeqVisitor(f, PhantomData); + deserializer.deserialize_seq(visitor) +} \ No newline at end of file diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 330a6f082..0a852bfb4 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -164,6 +164,7 @@ pub enum Code { MissingContentType, MalformedPayload, MissingPayload, + ReceivePayloadErr, ApiKeyNotFound, MissingParameter, @@ -303,6 +304,9 @@ impl Code { DuplicateIndexFound => { ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST) } + ReceivePayloadErr => { + ErrCode::internal("receive_payload_internal_exceptions", StatusCode::INTERNAL_SERVER_ERROR) + } } } From bf96b6df9380bb4732b82b8acc5be16b925581ed Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Wed, 30 Nov 2022 17:59:06 +0800 Subject: [PATCH 02/13] clippy fix change --- index-scheduler/src/lib.rs | 99 +++++-------------- .../src/routes/indexes/documents.rs | 36 +++---- meilisearch-types/src/document_formats.rs | 47 +++++---- meilisearch-types/src/error.rs | 7 +- 4 files changed, 71 insertions(+), 118 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8b05eec9a..8fafd99d9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1065,7 +1065,7 @@ impl IndexScheduler { #[cfg(test)] mod tests { - use std::io::{Seek, Write, BufWriter}; + use std::io::{BufWriter, Seek, Write}; use std::time::Instant; use big_s::S; @@ -1079,7 +1079,7 @@ mod tests { }; use meilisearch_types::tasks::IndexSwap; use meilisearch_types::VERSION_FILE_NAME; - use tempfile::{TempDir, NamedTempFile}; + use tempfile::{NamedTempFile, TempDir}; use time::Duration; use uuid::Uuid; use Breakpoint::*; @@ -1187,7 +1187,10 @@ mod tests { } /// Adapting to the new json reading interface - pub fn read_json(bytes: &[u8], write: impl Write + Seek) -> std::result::Result { + pub fn read_json( + bytes: &[u8], + write: impl Write + Seek, + ) -> std::result::Result { let temp_file = NamedTempFile::new().unwrap(); let mut buffer = BufWriter::new(temp_file.reopen().unwrap()); buffer.write(bytes).unwrap(); @@ -1213,9 +1216,7 @@ mod tests { ); let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; (file, documents_count) } @@ -1595,9 +1596,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -1634,9 +1633,7 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -1803,9 +1800,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -1963,11 +1958,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2014,11 +2005,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2067,11 +2054,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2121,11 +2104,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2176,11 +2155,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2627,9 +2602,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2667,9 +2640,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2725,11 +2696,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2777,11 +2744,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2835,11 +2798,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2898,11 +2857,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2966,11 +2921,7 @@ mod tests { let allow_index_creation = i % 2 != 0; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -3023,11 +2974,7 @@ mod tests { let allow_index_creation = i % 2 != 0; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 1a3bbba99..8051689d6 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -1,4 +1,11 @@ -use std::io::{ErrorKind, BufWriter, Write}; +use crate::analytics::{Analytics, DocumentDeletionKind}; +use crate::error::MeilisearchHttpError; +use crate::error::PayloadError::ReceivePayloadErr; +use crate::extractors::authentication::policies::*; +use crate::extractors::authentication::GuardedData; +use crate::extractors::payload::Payload; +use crate::extractors::sequential_extractor::SeqHandler; +use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView}; use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; @@ -6,7 +13,7 @@ use bstr::ByteSlice; use futures::StreamExt; use index_scheduler::IndexScheduler; use log::{debug, error}; -use meilisearch_types::document_formats::{read_csv, PayloadType, read_json, read_ndjson}; +use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType}; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::RoTxn; use meilisearch_types::index_uid::IndexUid; @@ -19,15 +26,8 @@ use once_cell::sync::Lazy; use serde::Deserialize; use serde_cs::vec::CS; use serde_json::Value; +use std::io::{BufWriter, ErrorKind, Write}; use tempfile::NamedTempFile; -use crate::analytics::{Analytics, DocumentDeletionKind}; -use crate::error::MeilisearchHttpError; -use crate::error::PayloadError::ReceivePayloadErr; -use crate::extractors::authentication::policies::*; -use crate::extractors::authentication::GuardedData; -use crate::extractors::payload::Payload; -use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView}; static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] @@ -227,14 +227,15 @@ async fn document_addition( let (uuid, mut update_file) = index_scheduler.create_update_file()?; - let err: Result = Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); + let err: Result = + Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); let temp_file = match NamedTempFile::new() { Ok(temp_file) => temp_file, Err(e) => { error!("create a temporary file error: {}", e); return err; - }, + } }; debug!("temp file path: {:?}", temp_file.as_ref()); let buffer_file = match temp_file.reopen() { @@ -245,21 +246,20 @@ async fn document_addition( } }; let mut buffer = BufWriter::new(buffer_file); - let mut buffer_write_size:usize = 0; + let mut buffer_write_size: usize = 0; while let Some(bytes) = body.next().await { match buffer.write(&bytes?) { - Ok(size) => buffer_write_size = buffer_write_size + size, + Ok(size) => buffer_write_size += size, Err(e) => { error!("bufWriter write error: {}", e); - return err + return err; } } - - }; + } if let Err(e) = buffer.flush() { error!("bufWriter flush error: {}", e); - return err + return err; }; if buffer_write_size == 0 { diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 6e19768a8..3ad8e20e0 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,18 +1,18 @@ -use std::borrow::Borrow; -use std::fmt::{self, Debug, Display}; -use std::fs::File; -use std::io::{self, Seek, Write}; -use std::marker::PhantomData; +use crate::error::{Code, ErrorCode}; +use crate::internal_error; use either::Either; use log::debug; use memmap::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; use milli::Object; -use serde::de::{Visitor, SeqAccess}; +use serde::de::{SeqAccess, Visitor}; use serde::{Deserialize, Deserializer}; use serde_json::error::Category; -use crate::error::{Code, ErrorCode}; -use crate::internal_error; +use std::borrow::Borrow; +use std::fmt::{self, Debug, Display}; +use std::fs::File; +use std::io::{self, Seek, Write}; +use std::marker::PhantomData; type Result = std::result::Result; @@ -104,7 +104,7 @@ internal_error!(DocumentFormatError: io::Error); /// Reads CSV from input and write an obkv batch to writer. pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - let mmap = unsafe { MmapOptions::new().map(file).unwrap()}; + let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; let csv = csv::Reader::from_reader(mmap.as_ref()); builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?; @@ -125,15 +125,16 @@ pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result { } /// Reads JSON from temporary file and write an obkv batch to writer. -fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: PayloadType) -> Result { +fn read_json_inner( + file: &File, + writer: impl Write + Seek, + payload_type: PayloadType, +) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - let mmap = unsafe { MmapOptions::new().map(file).unwrap()}; + let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; let mut deserializer = serde_json::Deserializer::from_slice(&mmap); - match array_each(&mut deserializer, |obj: Object | { - builder - .append_json_object(&obj) - }) { + match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) { Ok(Ok(count)) => debug!("serde json array size: {}", count), Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), Err(_e) => { @@ -145,8 +146,9 @@ fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: Payload inner: Either, Object>, } - let content: ArrayOrSingleObject = - serde_json::from_reader(file).map_err(Error::Json).map_err(|e| (payload_type, e))?; + let content: ArrayOrSingleObject = serde_json::from_reader(file) + .map_err(Error::Json) + .map_err(|e| (payload_type, e))?; for object in content.inner.map_right(|o| vec![o]).into_inner() { builder @@ -154,7 +156,7 @@ fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: Payload .map_err(Into::into) .map_err(DocumentFormatError::Internal)?; } - } + } } let count = builder.documents_count(); @@ -186,14 +188,17 @@ where formatter.write_str("a nonempty sequence") } - fn visit_seq(mut self, mut seq: A) -> std::result::Result, >::Error> + fn visit_seq( + mut self, + mut seq: A, + ) -> std::result::Result, >::Error> where A: SeqAccess<'de>, { let mut max: u64 = 0; while let Some(value) = seq.next_element::()? { match self.0(value) { - Ok(()) => max = max + 1, + Ok(()) => max += 1, Err(e) => return Ok(Err(e)), }; } @@ -202,4 +207,4 @@ where } let visitor = SeqVisitor(f, PhantomData); deserializer.deserialize_seq(visitor) -} \ No newline at end of file +} diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 9b9b2115c..5062fc4a0 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -324,9 +324,10 @@ impl Code { DuplicateIndexFound => { ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST) } - ReceivePayloadErr => { - ErrCode::internal("receive_payload_internal_exceptions", StatusCode::INTERNAL_SERVER_ERROR) - } + ReceivePayloadErr => ErrCode::internal( + "receive_payload_internal_exceptions", + StatusCode::INTERNAL_SERVER_ERROR, + ), } } From 7b08d700f77b0876d767211936d9fcd538c09f00 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Sat, 3 Dec 2022 18:52:20 +0800 Subject: [PATCH 03/13] requested changes fix --- Cargo.lock | 13 +----------- meilisearch-http/src/error.rs | 2 +- .../src/routes/indexes/documents.rs | 21 +++++++++---------- meilisearch-types/Cargo.toml | 3 +-- meilisearch-types/src/document_formats.rs | 20 ++++++++++++------ meilisearch-types/src/error.rs | 5 ----- 6 files changed, 27 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ba0d69d4..ecfc94666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2376,9 +2376,8 @@ dependencies = [ "flate2", "fst", "insta", - "log", "meili-snap", - "memmap", + "memmap2", "milli", "proptest", "proptest-derive", @@ -2398,16 +2397,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" -[[package]] -name = "memmap" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "memmap2" version = "0.5.7" diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index fd18203fc..13e01ac0a 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -128,7 +128,7 @@ impl ErrorCode for PayloadError { }, PayloadError::MissingPayload => Code::MissingPayload, PayloadError::MalformedPayload(_) => Code::MalformedPayload, - PayloadError::ReceivePayloadErr => Code::ReceivePayloadErr, + PayloadError::ReceivePayloadErr => Code::Internal, } } } diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 8051689d6..821361814 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -26,8 +26,10 @@ use once_cell::sync::Lazy; use serde::Deserialize; use serde_cs::vec::CS; use serde_json::Value; -use std::io::{BufWriter, ErrorKind, Write}; +use std::io::ErrorKind; use tempfile::NamedTempFile; +use tokio::fs::File; +use tokio::io::{AsyncWriteExt, BufWriter}; static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] @@ -227,14 +229,11 @@ async fn document_addition( let (uuid, mut update_file) = index_scheduler.create_update_file()?; - let err: Result = - Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); - let temp_file = match NamedTempFile::new() { Ok(temp_file) => temp_file, Err(e) => { error!("create a temporary file error: {}", e); - return err; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); } }; debug!("temp file path: {:?}", temp_file.as_ref()); @@ -242,24 +241,24 @@ async fn document_addition( Ok(buffer_file) => buffer_file, Err(e) => { error!("reopen payload temporary file error: {}", e); - return err; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); } }; - let mut buffer = BufWriter::new(buffer_file); + let mut buffer = BufWriter::new(File::from_std(buffer_file)); let mut buffer_write_size: usize = 0; while let Some(bytes) = body.next().await { - match buffer.write(&bytes?) { + match buffer.write(&bytes?).await { Ok(size) => buffer_write_size += size, Err(e) => { error!("bufWriter write error: {}", e); - return err; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); } } } - if let Err(e) = buffer.flush() { + if let Err(e) = buffer.flush().await { error!("bufWriter flush error: {}", e); - return err; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); }; if buffer_write_size == 0 { diff --git a/meilisearch-types/Cargo.toml b/meilisearch-types/Cargo.toml index 92f32a4b5..7bc66a37e 100644 --- a/meilisearch-types/Cargo.toml +++ b/meilisearch-types/Cargo.toml @@ -12,6 +12,7 @@ either = { version = "1.6.1", features = ["serde"] } enum-iterator = "1.1.3" flate2 = "1.0.24" fst = "0.4.7" +memmap2 = "0.5.7" milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.37.0", default-features = false } proptest = { version = "1.0.0", optional = true } proptest-derive = { version = "0.3.0", optional = true } @@ -23,8 +24,6 @@ thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } tokio = "1.0" uuid = { version = "1.1.2", features = ["serde", "v4"] } -memmap = "0.7.0" -log = "0.4.17" [dev-dependencies] insta = "1.19.1" diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 3ad8e20e0..9180c0ea8 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,8 +1,7 @@ use crate::error::{Code, ErrorCode}; use crate::internal_error; use either::Either; -use log::debug; -use memmap::MmapOptions; +use memmap2::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; use milli::Object; use serde::de::{SeqAccess, Visitor}; @@ -104,7 +103,7 @@ internal_error!(DocumentFormatError: io::Error); /// Reads CSV from input and write an obkv batch to writer. pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; + let mmap = unsafe { MmapOptions::new().map(file)? }; let csv = csv::Reader::from_reader(mmap.as_ref()); builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?; @@ -131,14 +130,21 @@ fn read_json_inner( payload_type: PayloadType, ) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; + let mmap = unsafe { MmapOptions::new().map(file)? }; let mut deserializer = serde_json::Deserializer::from_slice(&mmap); match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) { - Ok(Ok(count)) => debug!("serde json array size: {}", count), + // The json data has been successfully deserialised and does not need to be processed again. + // the data has been successfully transferred to the "update_file" during the deserialisation process. + // count ==0 means an empty array + Ok(Ok(count)) => { + if count == 0 { + return Ok(count as usize); + } + } Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), + // Prefer deserialization as a json array. Failure to do deserialisation using the traditional method. Err(_e) => { - debug!("deserialize single json"); #[derive(Deserialize, Debug)] #[serde(transparent)] struct ArrayOrSingleObject { @@ -166,6 +172,8 @@ fn read_json_inner( } /** + * The actual handling of the deserialization process in the serde avoids storing the deserialized object in memory. + * Reference: * https://serde.rs/stream-array.html * https://github.com/serde-rs/json/issues/160 */ diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 5062fc4a0..5c0e1d9b8 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -169,7 +169,6 @@ pub enum Code { MissingContentType, MalformedPayload, MissingPayload, - ReceivePayloadErr, ApiKeyNotFound, MissingParameter, @@ -324,10 +323,6 @@ impl Code { DuplicateIndexFound => { ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST) } - ReceivePayloadErr => ErrCode::internal( - "receive_payload_internal_exceptions", - StatusCode::INTERNAL_SERVER_ERROR, - ), } } From 5a770ffe474a02c0d2888cea350613109ceeecdd Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Sat, 3 Dec 2022 22:48:38 +0800 Subject: [PATCH 04/13] test fail fix --- index-scheduler/src/lib.rs | 2 +- meilisearch-http/src/routes/indexes/documents.rs | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8fafd99d9..a7dab68d1 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1193,7 +1193,7 @@ mod tests { ) -> std::result::Result { let temp_file = NamedTempFile::new().unwrap(); let mut buffer = BufWriter::new(temp_file.reopen().unwrap()); - buffer.write(bytes).unwrap(); + buffer.write_all(bytes).unwrap(); buffer.flush().unwrap(); meilisearch_types::document_formats::read_json(temp_file.as_file(), write) } diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 821361814..c10d15e81 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -29,7 +29,7 @@ use serde_json::Value; use std::io::ErrorKind; use tempfile::NamedTempFile; use tokio::fs::File; -use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::io::AsyncWriteExt; static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] @@ -244,16 +244,22 @@ async fn document_addition( return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); } }; - let mut buffer = BufWriter::new(File::from_std(buffer_file)); + let mut buffer = File::from_std(buffer_file); let mut buffer_write_size: usize = 0; while let Some(bytes) = body.next().await { - match buffer.write(&bytes?).await { - Ok(size) => buffer_write_size += size, + let byte = &bytes?; + + if byte.is_empty() && buffer_write_size == 0 { + return Err(MeilisearchHttpError::MissingPayload(format)); + } + + match buffer.write_all(byte).await { + Ok(()) => buffer_write_size += 1, Err(e) => { error!("bufWriter write error: {}", e); return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); } - } + }; } if let Err(e) = buffer.flush().await { From 6bdd37beb8ece86a68811c71cd192e740b6ea4a4 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Sun, 4 Dec 2022 18:25:06 +0800 Subject: [PATCH 05/13] tokio file write update --- .../src/routes/indexes/documents.rs | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index c10d15e81..5788fc815 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -1,11 +1,3 @@ -use crate::analytics::{Analytics, DocumentDeletionKind}; -use crate::error::MeilisearchHttpError; -use crate::error::PayloadError::ReceivePayloadErr; -use crate::extractors::authentication::policies::*; -use crate::extractors::authentication::GuardedData; -use crate::extractors::payload::Payload; -use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView}; use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; @@ -31,6 +23,15 @@ use tempfile::NamedTempFile; use tokio::fs::File; use tokio::io::AsyncWriteExt; +use crate::analytics::{Analytics, DocumentDeletionKind}; +use crate::error::MeilisearchHttpError; +use crate::error::PayloadError::ReceivePayloadErr; +use crate::extractors::authentication::policies::*; +use crate::extractors::authentication::GuardedData; +use crate::extractors::payload::Payload; +use crate::extractors::sequential_extractor::SeqHandler; +use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView}; + static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] }); @@ -262,11 +263,6 @@ async fn document_addition( }; } - if let Err(e) = buffer.flush().await { - error!("bufWriter flush error: {}", e); - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); - }; - if buffer_write_size == 0 { return Err(MeilisearchHttpError::MissingPayload(format)); } From 980776b6466e394b74072623adcf28a672f1ac1e Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Sun, 4 Dec 2022 22:31:23 +0800 Subject: [PATCH 06/13] test fail fix --- meilisearch-http/src/routes/indexes/documents.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 5788fc815..863b21650 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -263,6 +263,11 @@ async fn document_addition( }; } + if let Err(e) = buffer.flush().await { + error!("bufWriter flush error: {}", e); + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); + }; + if buffer_write_size == 0 { return Err(MeilisearchHttpError::MissingPayload(format)); } From 6766712840580a29f07606bf4bc03d321f4435da Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Sun, 4 Dec 2022 23:05:34 +0800 Subject: [PATCH 07/13] fmt fix --- meilisearch-http/src/routes/indexes/documents.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 863b21650..61afed89b 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -263,9 +263,9 @@ async fn document_addition( }; } - if let Err(e) = buffer.flush().await { - error!("bufWriter flush error: {}", e); - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); + if let Err(e) = buffer.flush().await { + error!("bufWriter flush error: {}", e); + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); }; if buffer_write_size == 0 { From 35f3dd68b6a0709ef19c01d1182e3aecb9a4a119 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Wed, 7 Dec 2022 16:20:36 +0800 Subject: [PATCH 08/13] error change and tokio file use change --- meilisearch-http/src/error.rs | 6 ++--- .../src/routes/indexes/documents.rs | 24 ++++++++----------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index 13e01ac0a..53b16f9f5 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -95,8 +95,8 @@ pub enum PayloadError { MalformedPayload(serde_json::error::Error), #[error("A json payload is missing.")] MissingPayload, - #[error("Exception when accepting a playload to a temporary file")] - ReceivePayloadErr, + #[error("Error while writing the playload to disk: `{0}`.")] + ReceivePayloadErr(Box), } impl ErrorCode for PayloadError { @@ -128,7 +128,7 @@ impl ErrorCode for PayloadError { }, PayloadError::MissingPayload => Code::MissingPayload, PayloadError::MalformedPayload(_) => Code::MalformedPayload, - PayloadError::ReceivePayloadErr => Code::Internal, + PayloadError::ReceivePayloadErr(_) => Code::Internal, } } } diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 61afed89b..13c522999 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -4,7 +4,7 @@ use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; use bstr::ByteSlice; use futures::StreamExt; use index_scheduler::IndexScheduler; -use log::{debug, error}; +use log::debug; use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType}; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::RoTxn; @@ -233,19 +233,17 @@ async fn document_addition( let temp_file = match NamedTempFile::new() { Ok(temp_file) => temp_file, Err(e) => { - error!("create a temporary file error: {}", e); - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); } }; - debug!("temp file path: {:?}", temp_file.as_ref()); - let buffer_file = match temp_file.reopen() { - Ok(buffer_file) => buffer_file, + + let mut buffer = match File::create(&temp_file.as_ref()).await { + Ok(buffer) => buffer, Err(e) => { - error!("reopen payload temporary file error: {}", e); - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); } }; - let mut buffer = File::from_std(buffer_file); + let mut buffer_write_size: usize = 0; while let Some(bytes) = body.next().await { let byte = &bytes?; @@ -257,16 +255,14 @@ async fn document_addition( match buffer.write_all(byte).await { Ok(()) => buffer_write_size += 1, Err(e) => { - error!("bufWriter write error: {}", e); - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); } }; } if let Err(e) = buffer.flush().await { - error!("bufWriter flush error: {}", e); - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); - }; + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); + } if buffer_write_size == 0 { return Err(MeilisearchHttpError::MissingPayload(format)); From cb1d184904e474d178d0041bfe6b9ded0638b0b6 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Wed, 7 Dec 2022 17:04:24 +0800 Subject: [PATCH 09/13] fmt fix --- meilisearch-http/src/routes/indexes/documents.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 13c522999..926a93476 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -236,7 +236,7 @@ async fn document_addition( return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); } }; - + let mut buffer = match File::create(&temp_file.as_ref()).await { Ok(buffer) => buffer, Err(e) => { From 538030c2dad0f62edfc9672abe4c9baea5f38355 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Wed, 7 Dec 2022 22:47:32 +0800 Subject: [PATCH 10/13] change NameTempFile to tempfile() --- .../src/routes/indexes/documents.rs | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 926a93476..e0d9c9fed 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -19,9 +19,9 @@ use serde::Deserialize; use serde_cs::vec::CS; use serde_json::Value; use std::io::ErrorKind; -use tempfile::NamedTempFile; +use tempfile::tempfile; use tokio::fs::File; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use crate::analytics::{Analytics, DocumentDeletionKind}; use crate::error::MeilisearchHttpError; @@ -230,19 +230,14 @@ async fn document_addition( let (uuid, mut update_file) = index_scheduler.create_update_file()?; - let temp_file = match NamedTempFile::new() { + let temp_file = match tempfile() { Ok(temp_file) => temp_file, Err(e) => { return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); } }; - let mut buffer = match File::create(&temp_file.as_ref()).await { - Ok(buffer) => buffer, - Err(e) => { - return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); - } - }; + let mut buffer = File::from_std(temp_file); let mut buffer_write_size: usize = 0; while let Some(bytes) = body.next().await { @@ -268,12 +263,18 @@ async fn document_addition( return Err(MeilisearchHttpError::MissingPayload(format)); } + if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await { + return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); + }; + + let read_file = buffer.into_std().await; + let documents_count = tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> { let documents_count = match format { - PayloadType::Json => read_json(temp_file.as_file(), update_file.as_file_mut())?, - PayloadType::Csv => read_csv(temp_file.as_file(), update_file.as_file_mut())?, - PayloadType::Ndjson => read_ndjson(temp_file.as_file(), update_file.as_file_mut())?, + PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?, + PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?, + PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?, }; // we NEED to persist the file here because we moved the `udpate_file` in another task. update_file.persist()?; From fa46dfb7bb7c0fb704b29a73469d7cf25317d994 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Mon, 12 Dec 2022 22:02:56 +0800 Subject: [PATCH 11/13] fmt fix --- meilisearch-types/src/document_formats.rs | 5 +++-- meilisearch/src/routes/indexes/documents.rs | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 9180c0ea8..c67e7ed08 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,5 +1,3 @@ -use crate::error::{Code, ErrorCode}; -use crate::internal_error; use either::Either; use memmap2::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; @@ -13,6 +11,9 @@ use std::fs::File; use std::io::{self, Seek, Write}; use std::marker::PhantomData; +use crate::error::{Code, ErrorCode}; +use crate::internal_error; + type Result = std::result::Result; #[derive(Debug)] diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index e0d9c9fed..0d895236e 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -1,3 +1,5 @@ +use std::io::ErrorKind; + use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; @@ -18,7 +20,6 @@ use once_cell::sync::Lazy; use serde::Deserialize; use serde_cs::vec::CS; use serde_json::Value; -use std::io::ErrorKind; use tempfile::tempfile; use tokio::fs::File; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; From b1c3174061ceb1a632cf58fa0b6e2893a623458f Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Mon, 12 Dec 2022 22:06:24 +0800 Subject: [PATCH 12/13] fix fmt --- meilisearch-types/src/document_formats.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index c67e7ed08..8357690cd 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,3 +1,9 @@ +use std::borrow::Borrow; +use std::fmt::{self, Debug, Display}; +use std::fs::File; +use std::io::{self, Seek, Write}; +use std::marker::PhantomData; + use either::Either; use memmap2::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; @@ -5,11 +11,6 @@ use milli::Object; use serde::de::{SeqAccess, Visitor}; use serde::{Deserialize, Deserializer}; use serde_json::error::Category; -use std::borrow::Borrow; -use std::fmt::{self, Debug, Display}; -use std::fs::File; -use std::io::{self, Seek, Write}; -use std::marker::PhantomData; use crate::error::{Code, ErrorCode}; use crate::internal_error; From 87ae0032bf6792545c24693dd50453c2be6b5ab5 Mon Sep 17 00:00:00 2001 From: jiangbo212 Date: Tue, 13 Dec 2022 10:41:43 +0800 Subject: [PATCH 13/13] review change --- meilisearch/src/routes/indexes/documents.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index e0d9c9fed..8f1ce357c 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -21,7 +21,7 @@ use serde_json::Value; use std::io::ErrorKind; use tempfile::tempfile; use tokio::fs::File; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter}; use crate::analytics::{Analytics, DocumentDeletionKind}; use crate::error::MeilisearchHttpError; @@ -237,7 +237,8 @@ async fn document_addition( } }; - let mut buffer = File::from_std(temp_file); + let async_file = File::from_std(temp_file); + let mut buffer = BufWriter::new(async_file); let mut buffer_write_size: usize = 0; while let Some(bytes) = body.next().await { @@ -267,7 +268,7 @@ async fn document_addition( return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); }; - let read_file = buffer.into_std().await; + let read_file = buffer.into_inner().into_std().await; let documents_count = tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> {