diff --git a/Cargo.lock b/Cargo.lock index 403d5210f..bddde66f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1763,13 +1763,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" [[package]] -name = "memmap" -version = "0.7.0" +name = "memmap2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +checksum = "4647a11b578fead29cdbb34d4adef8dd3dc35b876c9c6d5240d83f205abfe96e" dependencies = [ "libc", - "winapi", ] [[package]] @@ -1783,8 +1782,8 @@ dependencies = [ [[package]] name = "milli" -version = "0.17.2" -source = "git+https://github.com/meilisearch/milli.git?tag=v0.17.3#1e8acaa20b323a198229ad8ede96d045072e45c8" +version = "0.19.0" +source = "git+https://github.com/meilisearch/milli.git?tag=v0.19.0#d7943fe22553b8205b86c32a0f2656d9e42de351" dependencies = [ "bimap", "bincode", @@ -1793,6 +1792,7 @@ dependencies = [ "chrono", "concat-arrays", "crossbeam-channel", + "csv", "either", "flate2", "fst", @@ -1807,7 +1807,7 @@ dependencies = [ "log", "logging_timer", "meilisearch-tokenizer", - "memmap", + "memmap2", "obkv", "once_cell", "ordered-float", diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index 698ee914b..429481539 100644 --- a/meilisearch-lib/Cargo.toml +++ b/meilisearch-lib/Cargo.toml @@ -30,7 +30,7 @@ lazy_static = "1.4.0" log = "0.4.14" meilisearch-error = { path = "../meilisearch-error" } meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" } -milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.17.3" } +milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.19.0" } mime = "0.3.16" num_cpus = "1.13.0" once_cell = "1.8.0" diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs index 878d9465b..781ab63a0 100644 --- a/meilisearch-lib/src/document_formats.rs +++ b/meilisearch-lib/src/document_formats.rs @@ -1,10 +1,8 @@ use std::fmt; -use std::io::{self, Read, Result as IoResult, Seek, Write}; +use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Seek, Write}; -use csv::{Reader as CsvReader, StringRecordsIntoIter}; use meilisearch_error::{Code, ErrorCode}; use milli::documents::DocumentBatchBuilder; -use serde_json::{Deserializer, Map, Value}; type Result = std::result::Result; @@ -36,6 +34,15 @@ pub enum DocumentFormatError { ), } +impl From<(PayloadType, milli::documents::Error)> for DocumentFormatError { + fn from((ty, error): (PayloadType, milli::documents::Error)) -> Self { + match error { + milli::documents::Error::Io(e) => Self::Internal(Box::new(e)), + e => Self::MalformedPayload(Box::new(e), ty), + } + } +} + impl ErrorCode for DocumentFormatError { fn error_code(&self) -> Code { match self { @@ -45,330 +52,47 @@ impl ErrorCode for DocumentFormatError { } } -internal_error!(DocumentFormatError: milli::documents::Error, io::Error); - -macro_rules! malformed { - ($type:path, $e:expr) => { - $e.map_err(|e| DocumentFormatError::MalformedPayload(Box::new(e), $type)) - }; -} +internal_error!(DocumentFormatError: io::Error); +/// reads csv from input and write an obkv batch to writer. pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<()> { - let mut builder = DocumentBatchBuilder::new(writer).unwrap(); - - let iter = CsvDocumentIter::from_reader(input)?; - for doc in iter { - let doc = doc?; - builder.add_documents(doc).unwrap(); - } - builder.finish().unwrap(); + let writer = BufWriter::new(writer); + DocumentBatchBuilder::from_csv(input, writer) + .map_err(|e| (PayloadType::Csv, e))? + .finish() + .map_err(|e| (PayloadType::Csv, e))?; Ok(()) } -/// read jsonl from input and write an obkv batch to writer. +/// reads jsonl from input and write an obkv batch to writer. pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<()> { - let mut builder = DocumentBatchBuilder::new(writer)?; - let stream = Deserializer::from_reader(input).into_iter::>(); + let mut reader = BufReader::new(input); + let writer = BufWriter::new(writer); - for value in stream { - let value = malformed!(PayloadType::Ndjson, value)?; - builder.add_documents(&value)?; + let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Ndjson, e))?; + let mut buf = String::new(); + + while reader.read_line(&mut buf)? > 0 { + builder + .extend_from_json(Cursor::new(&buf.as_bytes())) + .map_err(|e| (PayloadType::Ndjson, e))?; + buf.clear(); } - builder.finish()?; + builder.finish().map_err(|e| (PayloadType::Ndjson, e))?; Ok(()) } -/// read json from input and write an obkv batch to writer. +/// reads json from input and write an obkv batch to writer. pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> { - let mut builder = DocumentBatchBuilder::new(writer).unwrap(); - - let documents: Vec> = - malformed!(PayloadType::Json, serde_json::from_reader(input))?; - builder.add_documents(documents).unwrap(); - builder.finish().unwrap(); + let writer = BufWriter::new(writer); + let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Json, e))?; + builder + .extend_from_json(input) + .map_err(|e| (PayloadType::Json, e))?; + builder.finish().map_err(|e| (PayloadType::Json, e))?; Ok(()) } - -enum AllowedType { - String, - Number, -} - -fn parse_csv_header(header: &str) -> (String, AllowedType) { - // if there are several separators we only split on the last one. - match header.rsplit_once(':') { - Some((field_name, field_type)) => match field_type { - "string" => (field_name.to_string(), AllowedType::String), - "number" => (field_name.to_string(), AllowedType::Number), - // if the pattern isn't reconized, we keep the whole field. - _otherwise => (header.to_string(), AllowedType::String), - }, - None => (header.to_string(), AllowedType::String), - } -} - -pub struct CsvDocumentIter -where - R: Read, -{ - documents: StringRecordsIntoIter, - headers: Vec<(String, AllowedType)>, -} - -impl CsvDocumentIter { - pub fn from_reader(reader: R) -> IoResult { - let mut records = CsvReader::from_reader(reader); - - let headers = records - .headers()? - .into_iter() - .map(parse_csv_header) - .collect(); - - Ok(Self { - documents: records.into_records(), - headers, - }) - } -} - -impl Iterator for CsvDocumentIter { - type Item = Result>; - - fn next(&mut self) -> Option { - let csv_document = self.documents.next()?; - - match csv_document { - Ok(csv_document) => { - let mut document = Map::new(); - - for ((field_name, field_type), value) in - self.headers.iter().zip(csv_document.into_iter()) - { - let parsed_value = match field_type { - AllowedType::Number => { - malformed!(PayloadType::Csv, value.parse::().map(Value::from)) - } - AllowedType::String => Ok(Value::String(value.to_string())), - }; - - match parsed_value { - Ok(value) => drop(document.insert(field_name.to_string(), value)), - Err(e) => return Some(Err(e)), - } - } - - Some(Ok(document)) - } - Err(e) => Some(Err(DocumentFormatError::MalformedPayload( - Box::new(e), - PayloadType::Csv, - ))), - } - } -} - -#[cfg(test)] -mod test { - use serde_json::json; - - use super::*; - - #[test] - fn simple_csv_document() { - let documents = r#"city,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn coma_in_field() { - let documents = r#"city,country,pop -"Boston","United, States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United, States", - "pop": "4628910", - }) - ); - } - - #[test] - fn quote_in_field() { - let documents = r#"city,country,pop -"Boston","United"" States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United\" States", - "pop": "4628910", - }) - ); - } - - #[test] - fn integer_in_field() { - let documents = r#"city,country,pop:number -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United States", - "pop": 4628910.0, - }) - ); - } - - #[test] - fn float_in_field() { - let documents = r#"city,country,pop:number -"Boston","United States","4628910.01""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United States", - "pop": 4628910.01, - }) - ); - } - - #[test] - fn several_colon_in_header() { - let documents = r#"city:love:string,country:state,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city:love": "Boston", - "country:state": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn ending_by_colon_in_header() { - let documents = r#"city:,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city:": "Boston", - "country": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn starting_by_colon_in_header() { - let documents = r#":city,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - ":city": "Boston", - "country": "United States", - "pop": "4628910", - }) - ); - } - - #[ignore] - #[test] - fn starting_by_colon_in_header2() { - let documents = r#":string,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert!(csv_iter.next().unwrap().is_err()); - } - - #[test] - fn double_colon_in_header() { - let documents = r#"city::string,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city:": "Boston", - "country": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn bad_type_in_header() { - let documents = r#"city,country:number,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert!(csv_iter.next().unwrap().is_err()); - } - - #[test] - fn bad_column_count1() { - let documents = r#"city,country,pop -"Boston","United States","4628910", "too much""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert!(csv_iter.next().unwrap().is_err()); - } - - #[test] - fn bad_column_count2() { - let documents = r#"city,country,pop -"Boston","United States""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert!(csv_iter.next().unwrap().is_err()); - } -} diff --git a/meilisearch-lib/src/index_controller/update_file_store.rs b/meilisearch-lib/src/index_controller/update_file_store.rs index db19c2d4c..c4a319c83 100644 --- a/meilisearch-lib/src/index_controller/update_file_store.rs +++ b/meilisearch-lib/src/index_controller/update_file_store.rs @@ -149,7 +149,7 @@ impl UpdateFileStore { // for jsonl for example...) while let Some((index, document)) = document_reader.next_document_with_index()? { for (field_id, content) in document.iter() { - if let Some(field_name) = index.get_by_left(&field_id) { + if let Some(field_name) = index.name(field_id) { let content = serde_json::from_slice(content)?; document_buffer.insert(field_name.to_string(), content); } diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index f106b87f3..07ceed92b 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -3,15 +3,13 @@ mod message; pub mod status; pub mod store; -use std::io::{self, BufRead, BufReader}; +use std::io::Cursor; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use actix_web::error::PayloadError; use async_stream::stream; -use bytes::Bytes; -use futures::{Stream, StreamExt}; +use futures::StreamExt; use log::trace; use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; @@ -51,48 +49,6 @@ where Ok(sender) } -/// A wrapper type to implement read on a `Stream>`. -struct StreamReader { - stream: S, - current: Option, -} - -impl StreamReader { - fn new(stream: S) -> Self { - Self { - stream, - current: None, - } - } -} - -impl> + Unpin> io::Read - for StreamReader -{ - fn read(&mut self, buf: &mut [u8]) -> io::Result { - // TODO: optimize buf filling - match self.current.take() { - Some(mut bytes) => { - let split_at = bytes.len().min(buf.len()); - let copied = bytes.split_to(split_at); - buf[..split_at].copy_from_slice(&copied); - if !bytes.is_empty() { - self.current.replace(bytes); - } - Ok(copied.len()) - } - None => match tokio::runtime::Handle::current().block_on(self.stream.next()) { - Some(Ok(bytes)) => { - self.current.replace(bytes); - self.read(buf) - } - Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)), - None => Ok(0), - }, - } - } -} - pub struct UpdateLoop { store: Arc, inbox: Option>, @@ -196,20 +152,28 @@ impl UpdateLoop { async fn handle_update(&self, index_uuid: Uuid, update: Update) -> Result { let registration = match update { Update::DocumentAddition { - payload, + mut payload, primary_key, method, format, } => { - let mut reader = BufReader::new(StreamReader::new(payload)); + let mut buffer = Vec::new(); + while let Some(bytes) = payload.next().await { + match bytes { + Ok(bytes) => { + buffer.extend_from_slice(&bytes); + } + Err(e) => return Err(e.into()), + } + } let (content_uuid, mut update_file) = self.update_file_store.new_update()?; tokio::task::spawn_blocking(move || -> Result<_> { // check if the payload is empty, and return an error - reader.fill_buf()?; - if reader.buffer().is_empty() { + if buffer.is_empty() { return Err(UpdateLoopError::MissingPayload(format)); } + let reader = Cursor::new(buffer); match format { DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?, DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?, diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs index b232d11ea..c7ffca5d6 100644 --- a/meilisearch-lib/src/lib.rs +++ b/meilisearch-lib/src/lib.rs @@ -11,7 +11,7 @@ pub use index_controller::MeiliSearch; pub use milli; mod compression; -mod document_formats; +pub mod document_formats; use walkdir::WalkDir;