From 5572f0c2c806f9c133c1d36ed673952b01824620 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Lecrenier?=
Date: Tue, 28 Jun 2022 16:44:49 +0200
Subject: [PATCH] Refactor addition of documents: save update file as NDJson

---
 Cargo.lock                                   |  8 +++----
 meilisearch-auth/Cargo.toml                  |  2 +-
 meilisearch-http/src/lib.rs                  | 24 ++++++++++---------
 .../tests/documents/add_documents.rs         |  4 ++--
 meilisearch-lib/Cargo.toml                   |  2 +-
 meilisearch-lib/src/document_formats.rs      | 11 ++-------
 meilisearch-lib/src/dump/error.rs            |  1 +
 meilisearch-lib/src/index_controller/mod.rs  |  9 +++----
 meilisearch-lib/src/update_file_store.rs     | 19 ++----------
 9 files changed, 33 insertions(+), 47 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 312699c9f..9275772b1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1124,7 +1124,7 @@ dependencies = [
 [[package]]
 name = "filter-parser"
 version = "0.29.3"
-source = "git+https://github.com/meilisearch/milli?branch=use-grenad-for-documents-batches#f51c30f50aaeaaa068a00018d4c5fad7b9f264a2"
+source = "git+https://github.com/meilisearch/milli?branch=add-documents-ndjson#cf1c5588ea95f78160ec83de2e95161652cc10e2"
 dependencies = [
  "nom",
  "nom_locate",
@@ -1149,7 +1149,7 @@ dependencies = [
 [[package]]
 name = "flatten-serde-json"
 version = "0.29.3"
-source = "git+https://github.com/meilisearch/milli?branch=use-grenad-for-documents-batches#f51c30f50aaeaaa068a00018d4c5fad7b9f264a2"
+source = "git+https://github.com/meilisearch/milli?branch=add-documents-ndjson#cf1c5588ea95f78160ec83de2e95161652cc10e2"
 dependencies = [
  "serde_json",
 ]
@@ -1662,7 +1662,7 @@ dependencies = [
 [[package]]
 name = "json-depth-checker"
 version = "0.29.3"
-source = "git+https://github.com/meilisearch/milli?branch=use-grenad-for-documents-batches#f51c30f50aaeaaa068a00018d4c5fad7b9f264a2"
+source = "git+https://github.com/meilisearch/milli?branch=add-documents-ndjson#cf1c5588ea95f78160ec83de2e95161652cc10e2"
 dependencies = [
  "serde_json",
 ]
@@ -2190,7 +2190,7 @@ dependencies = [
 [[package]]
 name = "milli"
 version = "0.29.3"
-source = "git+https://github.com/meilisearch/milli?branch=use-grenad-for-documents-batches#f51c30f50aaeaaa068a00018d4c5fad7b9f264a2"
+source = "git+https://github.com/meilisearch/milli?branch=add-documents-ndjson#cf1c5588ea95f78160ec83de2e95161652cc10e2"
 dependencies = [
  "bimap",
  "bincode",
diff --git a/meilisearch-auth/Cargo.toml b/meilisearch-auth/Cargo.toml
index 9a33a4d0b..a18d2736b 100644
--- a/meilisearch-auth/Cargo.toml
+++ b/meilisearch-auth/Cargo.toml
@@ -8,7 +8,7 @@ base64 = "0.13.0"
 enum-iterator = "0.7.0"
 hmac = "0.12.1"
 meilisearch-types = { path = "../meilisearch-types" }
-milli = { git = "https://github.com/meilisearch/milli", branch = "use-grenad-for-documents-batches" }
+milli = { git = "https://github.com/meilisearch/milli", branch = 'add-documents-ndjson' }
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = { version = "1.0.79", features = ["preserve_order"] }
diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index 91a984796..afae12d6d 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -83,17 +83,19 @@ pub fn configure_data(
             web::JsonConfig::default()
                 .content_type(|mime| mime == mime::APPLICATION_JSON)
                 .error_handler(|err, req: &HttpRequest| match err {
-                    JsonPayloadError::ContentType => match req.headers().get(CONTENT_TYPE) {
-                        Some(content_type) => MeilisearchHttpError::InvalidContentType(
-                            content_type.to_str().unwrap_or("unknown").to_string(),
-                            vec![mime::APPLICATION_JSON.to_string()],
-                        )
-                        .into(),
-                        None => MeilisearchHttpError::MissingContentType(vec![
-                            mime::APPLICATION_JSON.to_string(),
-                        ])
-                        .into(),
-                    },
+                    JsonPayloadError::ContentType => {
+                        match req.headers().get(CONTENT_TYPE) {
+                            Some(content_type) => MeilisearchHttpError::InvalidContentType(
+                                content_type.to_str().unwrap_or("unknown").to_string(),
+                                vec![mime::APPLICATION_JSON.to_string()],
+                            )
+                            .into(),
+                            None => MeilisearchHttpError::MissingContentType(vec![
+                                mime::APPLICATION_JSON.to_string(),
+                            ])
+                            .into(),
+                        }
+                    }
                     err => PayloadError::from(err).into(),
                 }),
         )
diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs
index 66aec798e..7c4df66f6 100644
--- a/meilisearch-http/tests/documents/add_documents.rs
+++ b/meilisearch-http/tests/documents/add_documents.rs
@@ -326,7 +326,7 @@ async fn error_add_malformed_json_documents() {
     assert_eq!(
         response["message"],
         json!(
-            r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...890123456789012345678901234567890123456789", expected a sequence at line 1 column 102`."#
+            r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...7890123456789", expected a document, or a sequence of documents. at line 1 column 102`."#
         )
     );
     assert_eq!(response["code"], json!("malformed_payload"));
@@ -349,7 +349,7 @@ async fn error_add_malformed_json_documents() {
     assert_eq!(status_code, 400);
     assert_eq!(
         response["message"],
-        json!("The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string \"0123456789012345678901234567...90123456789012345678901234567890123456789m\", expected a sequence at line 1 column 103`.")
+        json!("The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string \"0123456789012345678901234567...890123456789m\", expected a document, or a sequence of documents. at line 1 column 103`.")
     );
     assert_eq!(response["code"], json!("malformed_payload"));
     assert_eq!(response["type"], json!("invalid_request"));
diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml
index 4d036b019..a701f839b 100644
--- a/meilisearch-lib/Cargo.toml
+++ b/meilisearch-lib/Cargo.toml
@@ -30,7 +30,7 @@ lazy_static = "1.4.0"
 log = "0.4.14"
 meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
-milli = { git = "https://github.com/meilisearch/milli", branch = "use-grenad-for-documents-batches" }
+milli = { git = "https://github.com/meilisearch/milli", branch = 'add-documents-ndjson' }
 mime = "0.3.16"
 num_cpus = "1.13.1"
 obkv = "0.2.0"
diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs
index 72e899845..a0db4e164 100644
--- a/meilisearch-lib/src/document_formats.rs
+++ b/meilisearch-lib/src/document_formats.rs
@@ -124,17 +124,10 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result {
     let mut builder = DocumentsBatchBuilder::new(writer);
     let reader = BufReader::new(input);
 
-    let objects: Vec<_> = serde_json::from_reader(reader)
-        .map_err(Error::Json)
+    builder
+        .append_json(reader)
         .map_err(|e| (PayloadType::Json, e))?;
 
-    for object in objects {
-        builder
-            .append_json_object(&object)
-            .map_err(Into::into)
-            .map_err(DocumentFormatError::Internal)?;
-    }
-
     let count = builder.documents_count();
     let _ = builder
         .into_inner()
diff --git a/meilisearch-lib/src/dump/error.rs b/meilisearch-lib/src/dump/error.rs
index 5afbf9244..7e4c58e66 100644
--- a/meilisearch-lib/src/dump/error.rs
+++ b/meilisearch-lib/src/dump/error.rs
@@ -16,6 +16,7 @@ pub enum DumpError {
 
 internal_error!(
     DumpError: milli::heed::Error,
+    IndexResolverError,
     std::io::Error,
     tokio::task::JoinError,
     tokio::sync::oneshot::error::RecvError,
diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs
index 88782c5ea..8b1be1226 100644
--- a/meilisearch-lib/src/index_controller/mod.rs
+++ b/meilisearch-lib/src/index_controller/mod.rs
@@ -1,7 +1,7 @@
 use meilisearch_auth::SearchRules;
 use std::collections::BTreeMap;
 use std::fmt;
-use std::io::Cursor;
+use std::io::{BufWriter, Cursor};
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Arc;
@@ -392,6 +392,7 @@ where
         }
         let (content_uuid, mut update_file) = self.update_file_store.new_update()?;
         let documents_count = tokio::task::spawn_blocking(move || -> Result<_> {
+            let writer = BufWriter::new(&mut *update_file);
             // check if the payload is empty, and return an error
             if buffer.is_empty() {
                 return Err(IndexControllerError::MissingPayload(format));
@@ -399,9 +400,9 @@ where
 
             let reader = Cursor::new(buffer);
             let count = match format {
-                DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
-                DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?,
-                DocumentAdditionFormat::Ndjson => read_ndjson(reader, &mut *update_file)?,
+                DocumentAdditionFormat::Json => read_json(reader, writer)?,
+                DocumentAdditionFormat::Csv => read_csv(reader, writer)?,
+                DocumentAdditionFormat::Ndjson => read_ndjson(reader, writer)?,
             };
 
             update_file.persist()?;
diff --git a/meilisearch-lib/src/update_file_store.rs b/meilisearch-lib/src/update_file_store.rs
index d4c50c447..c9c8eedbd 100644
--- a/meilisearch-lib/src/update_file_store.rs
+++ b/meilisearch-lib/src/update_file_store.rs
@@ -4,7 +4,6 @@ use std::ops::{Deref, DerefMut};
 use std::path::{Path, PathBuf};
 
 use milli::documents::DocumentsBatchReader;
-use serde_json::Map;
 use tempfile::{NamedTempFile, PersistError};
 use uuid::Uuid;
 
@@ -151,23 +150,13 @@ mod store {
             let update_file = File::open(update_file_path)?;
             let mut dst_file = NamedTempFile::new_in(&dump_path)?;
             let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor();
-            let index = document_cursor.documents_batch_index().clone();
-            let mut document_buffer = Map::new();
-            // TODO: we need to find a way to do this more efficiently. (create a custom serializer
-            // for jsonl for example...)
+            let mut dst_file_buf_writer = BufWriter::new(&mut dst_file);
             while let Some(document) = document_cursor.next_document()? {
-                for (field_id, content) in document.iter() {
-                    if let Some(field_name) = index.name(field_id) {
-                        let content = serde_json::from_slice(content)?;
-                        document_buffer.insert(field_name.to_string(), content);
-                    }
-                }
-
-                serde_json::to_writer(&mut dst_file, &document_buffer)?;
-                dst_file.write_all(b"\n")?;
-                document_buffer.clear();
+                serde_json::to_writer(&mut dst_file_buf_writer, &document)?;
+                dst_file_buf_writer.write_all(b"\n")?;
             }
+            drop(dst_file_buf_writer);
             dst_file.persist(dst)?;
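
Reviewer note (not part of the patch): the dump path above now streams every document of the update file out as NDJSON, one JSON object per line, through a BufWriter. Below is a minimal standalone sketch of that idea, assuming plain serde_json::Value documents instead of milli's DocumentsBatchReader; the write_ndjson helper and the example titles are hypothetical, not taken from the codebase.

    use std::io::{BufWriter, Write};

    use serde_json::{json, Value};

    // Write each document as one line of NDJSON: serialize it, then terminate the line with '\n'.
    fn write_ndjson<W: Write>(documents: &[Value], writer: W) -> std::io::Result<()> {
        // Buffer writes so each small document does not hit the underlying writer directly.
        let mut out = BufWriter::new(writer);
        for document in documents {
            // serde_json::Error converts into io::Error, so `?` works in this io::Result function.
            serde_json::to_writer(&mut out, document)?;
            out.write_all(b"\n")?;
        }
        // Flush the buffer before the destination is persisted or dropped.
        out.flush()
    }

    fn main() -> std::io::Result<()> {
        let documents = vec![
            json!({ "id": 1, "title": "Shazam!" }),
            json!({ "id": 2, "title": "Captain Marvel" }),
        ];
        write_ndjson(&documents, std::io::stdout())
    }

In the patch itself the same pattern is applied to the documents batch cursor, and dropping the BufWriter before dst_file.persist(dst) plays the role of the explicit flush here.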