requested changes fix

This commit is contained in:
jiangbo212 2022-12-03 18:52:20 +08:00
parent c63748723d
commit 7b08d700f7
6 changed files with 27 additions and 37 deletions

13
Cargo.lock generated
View File

@ -2376,9 +2376,8 @@ dependencies = [
"flate2", "flate2",
"fst", "fst",
"insta", "insta",
"log",
"meili-snap", "meili-snap",
"memmap", "memmap2",
"milli", "milli",
"proptest", "proptest",
"proptest-derive", "proptest-derive",
@ -2398,16 +2397,6 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "memmap2" name = "memmap2"
version = "0.5.7" version = "0.5.7"

View File

@ -128,7 +128,7 @@ impl ErrorCode for PayloadError {
}, },
PayloadError::MissingPayload => Code::MissingPayload, PayloadError::MissingPayload => Code::MissingPayload,
PayloadError::MalformedPayload(_) => Code::MalformedPayload, PayloadError::MalformedPayload(_) => Code::MalformedPayload,
PayloadError::ReceivePayloadErr => Code::ReceivePayloadErr, PayloadError::ReceivePayloadErr => Code::Internal,
} }
} }
} }

View File

@ -26,8 +26,10 @@ use once_cell::sync::Lazy;
use serde::Deserialize; use serde::Deserialize;
use serde_cs::vec::CS; use serde_cs::vec::CS;
use serde_json::Value; use serde_json::Value;
use std::io::{BufWriter, ErrorKind, Write}; use std::io::ErrorKind;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| { static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
@ -227,14 +229,11 @@ async fn document_addition(
let (uuid, mut update_file) = index_scheduler.create_update_file()?; let (uuid, mut update_file) = index_scheduler.create_update_file()?;
let err: Result<SummarizedTaskView, MeilisearchHttpError> =
Err(MeilisearchHttpError::Payload(ReceivePayloadErr));
let temp_file = match NamedTempFile::new() { let temp_file = match NamedTempFile::new() {
Ok(temp_file) => temp_file, Ok(temp_file) => temp_file,
Err(e) => { Err(e) => {
error!("create a temporary file error: {}", e); error!("create a temporary file error: {}", e);
return err; return Err(MeilisearchHttpError::Payload(ReceivePayloadErr));
} }
}; };
debug!("temp file path: {:?}", temp_file.as_ref()); debug!("temp file path: {:?}", temp_file.as_ref());
@ -242,24 +241,24 @@ async fn document_addition(
Ok(buffer_file) => buffer_file, Ok(buffer_file) => buffer_file,
Err(e) => { Err(e) => {
error!("reopen payload temporary file error: {}", e); error!("reopen payload temporary file error: {}", e);
return err; return Err(MeilisearchHttpError::Payload(ReceivePayloadErr));
} }
}; };
let mut buffer = BufWriter::new(buffer_file); let mut buffer = BufWriter::new(File::from_std(buffer_file));
let mut buffer_write_size: usize = 0; let mut buffer_write_size: usize = 0;
while let Some(bytes) = body.next().await { while let Some(bytes) = body.next().await {
match buffer.write(&bytes?) { match buffer.write(&bytes?).await {
Ok(size) => buffer_write_size += size, Ok(size) => buffer_write_size += size,
Err(e) => { Err(e) => {
error!("bufWriter write error: {}", e); error!("bufWriter write error: {}", e);
return err; return Err(MeilisearchHttpError::Payload(ReceivePayloadErr));
} }
} }
} }
if let Err(e) = buffer.flush() { if let Err(e) = buffer.flush().await {
error!("bufWriter flush error: {}", e); error!("bufWriter flush error: {}", e);
return err; return Err(MeilisearchHttpError::Payload(ReceivePayloadErr));
}; };
if buffer_write_size == 0 { if buffer_write_size == 0 {

View File

@ -12,6 +12,7 @@ either = { version = "1.6.1", features = ["serde"] }
enum-iterator = "1.1.3" enum-iterator = "1.1.3"
flate2 = "1.0.24" flate2 = "1.0.24"
fst = "0.4.7" fst = "0.4.7"
memmap2 = "0.5.7"
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.37.0", default-features = false } milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.37.0", default-features = false }
proptest = { version = "1.0.0", optional = true } proptest = { version = "1.0.0", optional = true }
proptest-derive = { version = "0.3.0", optional = true } proptest-derive = { version = "0.3.0", optional = true }
@ -23,8 +24,6 @@ thiserror = "1.0.30"
time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] }
tokio = "1.0" tokio = "1.0"
uuid = { version = "1.1.2", features = ["serde", "v4"] } uuid = { version = "1.1.2", features = ["serde", "v4"] }
memmap = "0.7.0"
log = "0.4.17"
[dev-dependencies] [dev-dependencies]
insta = "1.19.1" insta = "1.19.1"

View File

@ -1,8 +1,7 @@
use crate::error::{Code, ErrorCode}; use crate::error::{Code, ErrorCode};
use crate::internal_error; use crate::internal_error;
use either::Either; use either::Either;
use log::debug; use memmap2::MmapOptions;
use memmap::MmapOptions;
use milli::documents::{DocumentsBatchBuilder, Error}; use milli::documents::{DocumentsBatchBuilder, Error};
use milli::Object; use milli::Object;
use serde::de::{SeqAccess, Visitor}; use serde::de::{SeqAccess, Visitor};
@ -104,7 +103,7 @@ internal_error!(DocumentFormatError: io::Error);
/// Reads CSV from input and write an obkv batch to writer. /// Reads CSV from input and write an obkv batch to writer.
pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> { pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(writer);
let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; let mmap = unsafe { MmapOptions::new().map(file)? };
let csv = csv::Reader::from_reader(mmap.as_ref()); let csv = csv::Reader::from_reader(mmap.as_ref());
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?; builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
@ -131,14 +130,21 @@ fn read_json_inner(
payload_type: PayloadType, payload_type: PayloadType,
) -> Result<usize> { ) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(writer);
let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; let mmap = unsafe { MmapOptions::new().map(file)? };
let mut deserializer = serde_json::Deserializer::from_slice(&mmap); let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) { match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) {
Ok(Ok(count)) => debug!("serde json array size: {}", count), // The json data has been successfully deserialised and does not need to be processed again.
// the data has been successfully transferred to the "update_file" during the deserialisation process.
// count ==0 means an empty array
Ok(Ok(count)) => {
if count == 0 {
return Ok(count as usize);
}
}
Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))),
// Prefer deserialising as a JSON array; if that fails, fall back to deserialising with the traditional method.
Err(_e) => { Err(_e) => {
debug!("deserialize single json");
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
#[serde(transparent)] #[serde(transparent)]
struct ArrayOrSingleObject { struct ArrayOrSingleObject {
@ -166,6 +172,8 @@ fn read_json_inner(
} }
/** /**
* Handling each element inside serde's deserialization process avoids storing the deserialized objects in memory.
* Reference:
* https://serde.rs/stream-array.html * https://serde.rs/stream-array.html
* https://github.com/serde-rs/json/issues/160 * https://github.com/serde-rs/json/issues/160
*/ */

View File

@ -169,7 +169,6 @@ pub enum Code {
MissingContentType, MissingContentType,
MalformedPayload, MalformedPayload,
MissingPayload, MissingPayload,
ReceivePayloadErr,
ApiKeyNotFound, ApiKeyNotFound,
MissingParameter, MissingParameter,
@ -324,10 +323,6 @@ impl Code {
DuplicateIndexFound => { DuplicateIndexFound => {
ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST) ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST)
} }
ReceivePayloadErr => ErrCode::internal(
"receive_payload_internal_exceptions",
StatusCode::INTERNAL_SERVER_ERROR,
),
} }
} }