diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8b05eec9a..8fafd99d9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1065,7 +1065,7 @@ impl IndexScheduler { #[cfg(test)] mod tests { - use std::io::{Seek, Write, BufWriter}; + use std::io::{BufWriter, Seek, Write}; use std::time::Instant; use big_s::S; @@ -1079,7 +1079,7 @@ mod tests { }; use meilisearch_types::tasks::IndexSwap; use meilisearch_types::VERSION_FILE_NAME; - use tempfile::{TempDir, NamedTempFile}; + use tempfile::{NamedTempFile, TempDir}; use time::Duration; use uuid::Uuid; use Breakpoint::*; @@ -1187,7 +1187,10 @@ mod tests { } /// Adapting to the new json reading interface - pub fn read_json(bytes: &[u8], write: impl Write + Seek) -> std::result::Result { + pub fn read_json( + bytes: &[u8], + write: impl Write + Seek, + ) -> std::result::Result { let temp_file = NamedTempFile::new().unwrap(); let mut buffer = BufWriter::new(temp_file.reopen().unwrap()); buffer.write(bytes).unwrap(); @@ -1213,9 +1216,7 @@ mod tests { ); let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; (file, documents_count) } @@ -1595,9 +1596,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -1634,9 +1633,7 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -1803,9 +1800,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -1963,11 +1958,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2014,11 +2005,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2067,11 +2054,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2121,11 +2104,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2176,11 +2155,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2627,9 +2602,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2667,9 +2640,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = - read_json(content.as_bytes(), file.as_file_mut()) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2725,11 +2696,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2777,11 +2744,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2835,11 +2798,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2898,11 +2857,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -2966,11 +2921,7 @@ mod tests { let allow_index_creation = i % 2 != 0; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { @@ -3023,11 +2974,7 @@ mod tests { let allow_index_creation = i % 2 != 0; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; file.persist().unwrap(); index_scheduler .register(KindWithContent::DocumentAdditionOrUpdate { diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 1a3bbba99..8051689d6 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -1,4 +1,11 @@ -use std::io::{ErrorKind, BufWriter, Write}; +use crate::analytics::{Analytics, DocumentDeletionKind}; +use crate::error::MeilisearchHttpError; +use crate::error::PayloadError::ReceivePayloadErr; +use crate::extractors::authentication::policies::*; +use crate::extractors::authentication::GuardedData; +use crate::extractors::payload::Payload; +use crate::extractors::sequential_extractor::SeqHandler; +use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView}; use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; @@ -6,7 +13,7 @@ use bstr::ByteSlice; use futures::StreamExt; use index_scheduler::IndexScheduler; use log::{debug, error}; -use meilisearch_types::document_formats::{read_csv, PayloadType, read_json, read_ndjson}; +use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType}; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::RoTxn; use meilisearch_types::index_uid::IndexUid; @@ -19,15 +26,8 @@ use once_cell::sync::Lazy; use serde::Deserialize; use serde_cs::vec::CS; use serde_json::Value; +use std::io::{BufWriter, ErrorKind, Write}; use tempfile::NamedTempFile; -use crate::analytics::{Analytics, DocumentDeletionKind}; -use crate::error::MeilisearchHttpError; -use crate::error::PayloadError::ReceivePayloadErr; -use crate::extractors::authentication::policies::*; -use crate::extractors::authentication::GuardedData; -use crate::extractors::payload::Payload; -use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView}; static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] @@ -227,14 +227,15 @@ async fn document_addition( let (uuid, mut update_file) = index_scheduler.create_update_file()?; - let err: Result = Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); + let err: Result = + Err(MeilisearchHttpError::Payload(ReceivePayloadErr)); let temp_file = match NamedTempFile::new() { Ok(temp_file) => temp_file, Err(e) => { error!("create a temporary file error: {}", e); return err; - }, + } }; debug!("temp file path: {:?}", temp_file.as_ref()); let buffer_file = match temp_file.reopen() { @@ -245,21 +246,20 @@ async fn document_addition( } }; let mut buffer = BufWriter::new(buffer_file); - let mut buffer_write_size:usize = 0; + let mut buffer_write_size: usize = 0; while let Some(bytes) = body.next().await { match buffer.write(&bytes?) { - Ok(size) => buffer_write_size = buffer_write_size + size, + Ok(size) => buffer_write_size += size, Err(e) => { error!("bufWriter write error: {}", e); - return err + return err; } } - - }; + } if let Err(e) = buffer.flush() { error!("bufWriter flush error: {}", e); - return err + return err; }; if buffer_write_size == 0 { diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 6e19768a8..3ad8e20e0 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,18 +1,18 @@ -use std::borrow::Borrow; -use std::fmt::{self, Debug, Display}; -use std::fs::File; -use std::io::{self, Seek, Write}; -use std::marker::PhantomData; +use crate::error::{Code, ErrorCode}; +use crate::internal_error; use either::Either; use log::debug; use memmap::MmapOptions; use milli::documents::{DocumentsBatchBuilder, Error}; use milli::Object; -use serde::de::{Visitor, SeqAccess}; +use serde::de::{SeqAccess, Visitor}; use serde::{Deserialize, Deserializer}; use serde_json::error::Category; -use crate::error::{Code, ErrorCode}; -use crate::internal_error; +use std::borrow::Borrow; +use std::fmt::{self, Debug, Display}; +use std::fs::File; +use std::io::{self, Seek, Write}; +use std::marker::PhantomData; type Result = std::result::Result; @@ -104,7 +104,7 @@ internal_error!(DocumentFormatError: io::Error); /// Reads CSV from input and write an obkv batch to writer. pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - let mmap = unsafe { MmapOptions::new().map(file).unwrap()}; + let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; let csv = csv::Reader::from_reader(mmap.as_ref()); builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?; @@ -125,15 +125,16 @@ pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result { } /// Reads JSON from temporary file and write an obkv batch to writer. -fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: PayloadType) -> Result { +fn read_json_inner( + file: &File, + writer: impl Write + Seek, + payload_type: PayloadType, +) -> Result { let mut builder = DocumentsBatchBuilder::new(writer); - let mmap = unsafe { MmapOptions::new().map(file).unwrap()}; + let mmap = unsafe { MmapOptions::new().map(file).unwrap() }; let mut deserializer = serde_json::Deserializer::from_slice(&mmap); - match array_each(&mut deserializer, |obj: Object | { - builder - .append_json_object(&obj) - }) { + match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) { Ok(Ok(count)) => debug!("serde json array size: {}", count), Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))), Err(_e) => { @@ -145,8 +146,9 @@ fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: Payload inner: Either, Object>, } - let content: ArrayOrSingleObject = - serde_json::from_reader(file).map_err(Error::Json).map_err(|e| (payload_type, e))?; + let content: ArrayOrSingleObject = serde_json::from_reader(file) + .map_err(Error::Json) + .map_err(|e| (payload_type, e))?; for object in content.inner.map_right(|o| vec![o]).into_inner() { builder @@ -154,7 +156,7 @@ fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: Payload .map_err(Into::into) .map_err(DocumentFormatError::Internal)?; } - } + } } let count = builder.documents_count(); @@ -186,14 +188,17 @@ where formatter.write_str("a nonempty sequence") } - fn visit_seq(mut self, mut seq: A) -> std::result::Result, >::Error> + fn visit_seq( + mut self, + mut seq: A, + ) -> std::result::Result, >::Error> where A: SeqAccess<'de>, { let mut max: u64 = 0; while let Some(value) = seq.next_element::()? { match self.0(value) { - Ok(()) => max = max + 1, + Ok(()) => max += 1, Err(e) => return Ok(Err(e)), }; } @@ -202,4 +207,4 @@ where } let visitor = SeqVisitor(f, PhantomData); deserializer.deserialize_seq(visitor) -} \ No newline at end of file +} diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 9b9b2115c..5062fc4a0 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -324,9 +324,10 @@ impl Code { DuplicateIndexFound => { ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST) } - ReceivePayloadErr => { - ErrCode::internal("receive_payload_internal_exceptions", StatusCode::INTERNAL_SERVER_ERROR) - } + ReceivePayloadErr => ErrCode::internal( + "receive_payload_internal_exceptions", + StatusCode::INTERNAL_SERVER_ERROR, + ), } }