Refactor addition of documents: save update file as NDJSON

This commit is contained in:
Loïc Lecrenier 2022-06-28 16:44:49 +02:00
parent 669a3ff85f
commit 5572f0c2c8
9 changed files with 33 additions and 47 deletions

8
Cargo.lock generated
View File

@ -1124,7 +1124,7 @@ dependencies = [
[[package]] [[package]]
name = "filter-parser" name = "filter-parser"
version = "0.29.3" version = "0.29.3"
source = "git+https://github.com/meilisearch/milli?branch=use-grenad-for-documents-batches#f51c30f50aaeaaa068a00018d4c5fad7b9f264a2" source = "git+https://github.com/meilisearch/milli?branch=add-documents-ndjson#cf1c5588ea95f78160ec83de2e95161652cc10e2"
dependencies = [ dependencies = [
"nom", "nom",
"nom_locate", "nom_locate",
@ -1149,7 +1149,7 @@ dependencies = [
[[package]] [[package]]
name = "flatten-serde-json" name = "flatten-serde-json"
version = "0.29.3" version = "0.29.3"
source = "git+https://github.com/meilisearch/milli?branch=use-grenad-for-documents-batches#f51c30f50aaeaaa068a00018d4c5fad7b9f264a2" source = "git+https://github.com/meilisearch/milli?branch=add-documents-ndjson#cf1c5588ea95f78160ec83de2e95161652cc10e2"
dependencies = [ dependencies = [
"serde_json", "serde_json",
] ]
@ -1662,7 +1662,7 @@ dependencies = [
[[package]] [[package]]
name = "json-depth-checker" name = "json-depth-checker"
version = "0.29.3" version = "0.29.3"
source = "git+https://github.com/meilisearch/milli?branch=use-grenad-for-documents-batches#f51c30f50aaeaaa068a00018d4c5fad7b9f264a2" source = "git+https://github.com/meilisearch/milli?branch=add-documents-ndjson#cf1c5588ea95f78160ec83de2e95161652cc10e2"
dependencies = [ dependencies = [
"serde_json", "serde_json",
] ]
@ -2190,7 +2190,7 @@ dependencies = [
[[package]] [[package]]
name = "milli" name = "milli"
version = "0.29.3" version = "0.29.3"
source = "git+https://github.com/meilisearch/milli?branch=use-grenad-for-documents-batches#f51c30f50aaeaaa068a00018d4c5fad7b9f264a2" source = "git+https://github.com/meilisearch/milli?branch=add-documents-ndjson#cf1c5588ea95f78160ec83de2e95161652cc10e2"
dependencies = [ dependencies = [
"bimap", "bimap",
"bincode", "bincode",

View File

@ -8,7 +8,7 @@ base64 = "0.13.0"
enum-iterator = "0.7.0" enum-iterator = "0.7.0"
hmac = "0.12.1" hmac = "0.12.1"
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
milli = { git = "https://github.com/meilisearch/milli", branch = "use-grenad-for-documents-batches" } milli = { git = "https://github.com/meilisearch/milli", branch = 'add-documents-ndjson' }
rand = "0.8.4" rand = "0.8.4"
serde = { version = "1.0.136", features = ["derive"] } serde = { version = "1.0.136", features = ["derive"] }
serde_json = { version = "1.0.79", features = ["preserve_order"] } serde_json = { version = "1.0.79", features = ["preserve_order"] }

View File

@ -83,17 +83,19 @@ pub fn configure_data(
web::JsonConfig::default() web::JsonConfig::default()
.content_type(|mime| mime == mime::APPLICATION_JSON) .content_type(|mime| mime == mime::APPLICATION_JSON)
.error_handler(|err, req: &HttpRequest| match err { .error_handler(|err, req: &HttpRequest| match err {
JsonPayloadError::ContentType => match req.headers().get(CONTENT_TYPE) { JsonPayloadError::ContentType => {
Some(content_type) => MeilisearchHttpError::InvalidContentType( match req.headers().get(CONTENT_TYPE) {
content_type.to_str().unwrap_or("unknown").to_string(), Some(content_type) => MeilisearchHttpError::InvalidContentType(
vec![mime::APPLICATION_JSON.to_string()], content_type.to_str().unwrap_or("unknown").to_string(),
) vec![mime::APPLICATION_JSON.to_string()],
.into(), )
None => MeilisearchHttpError::MissingContentType(vec![ .into(),
mime::APPLICATION_JSON.to_string(), None => MeilisearchHttpError::MissingContentType(vec![
]) mime::APPLICATION_JSON.to_string(),
.into(), ])
}, .into(),
}
}
err => PayloadError::from(err).into(), err => PayloadError::from(err).into(),
}), }),
) )

View File

@ -326,7 +326,7 @@ async fn error_add_malformed_json_documents() {
assert_eq!( assert_eq!(
response["message"], response["message"],
json!( json!(
r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...890123456789012345678901234567890123456789", expected a sequence at line 1 column 102`."# r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...7890123456789", expected a document, or a sequence of documents. at line 1 column 102`."#
) )
); );
assert_eq!(response["code"], json!("malformed_payload")); assert_eq!(response["code"], json!("malformed_payload"));
@ -349,7 +349,7 @@ async fn error_add_malformed_json_documents() {
assert_eq!(status_code, 400); assert_eq!(status_code, 400);
assert_eq!( assert_eq!(
response["message"], response["message"],
json!("The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string \"0123456789012345678901234567...90123456789012345678901234567890123456789m\", expected a sequence at line 1 column 103`.") json!("The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string \"0123456789012345678901234567...890123456789m\", expected a document, or a sequence of documents. at line 1 column 103`.")
); );
assert_eq!(response["code"], json!("malformed_payload")); assert_eq!(response["code"], json!("malformed_payload"));
assert_eq!(response["type"], json!("invalid_request")); assert_eq!(response["type"], json!("invalid_request"));

View File

@ -30,7 +30,7 @@ lazy_static = "1.4.0"
log = "0.4.14" log = "0.4.14"
meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
milli = { git = "https://github.com/meilisearch/milli", branch = "use-grenad-for-documents-batches" } milli = { git = "https://github.com/meilisearch/milli", branch = 'add-documents-ndjson' }
mime = "0.3.16" mime = "0.3.16"
num_cpus = "1.13.1" num_cpus = "1.13.1"
obkv = "0.2.0" obkv = "0.2.0"

View File

@ -124,17 +124,10 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(writer);
let reader = BufReader::new(input); let reader = BufReader::new(input);
let objects: Vec<_> = serde_json::from_reader(reader) builder
.map_err(Error::Json) .append_json(reader)
.map_err(|e| (PayloadType::Json, e))?; .map_err(|e| (PayloadType::Json, e))?;
for object in objects {
builder
.append_json_object(&object)
.map_err(Into::into)
.map_err(DocumentFormatError::Internal)?;
}
let count = builder.documents_count(); let count = builder.documents_count();
let _ = builder let _ = builder
.into_inner() .into_inner()

View File

@ -16,6 +16,7 @@ pub enum DumpError {
internal_error!( internal_error!(
DumpError: milli::heed::Error, DumpError: milli::heed::Error,
IndexResolverError,
std::io::Error, std::io::Error,
tokio::task::JoinError, tokio::task::JoinError,
tokio::sync::oneshot::error::RecvError, tokio::sync::oneshot::error::RecvError,

View File

@ -1,7 +1,7 @@
use meilisearch_auth::SearchRules; use meilisearch_auth::SearchRules;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt; use std::fmt;
use std::io::Cursor; use std::io::{BufWriter, Cursor};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
@ -392,6 +392,7 @@ where
} }
let (content_uuid, mut update_file) = self.update_file_store.new_update()?; let (content_uuid, mut update_file) = self.update_file_store.new_update()?;
let documents_count = tokio::task::spawn_blocking(move || -> Result<_> { let documents_count = tokio::task::spawn_blocking(move || -> Result<_> {
let writer = BufWriter::new(&mut *update_file);
// check if the payload is empty, and return an error // check if the payload is empty, and return an error
if buffer.is_empty() { if buffer.is_empty() {
return Err(IndexControllerError::MissingPayload(format)); return Err(IndexControllerError::MissingPayload(format));
@ -399,9 +400,9 @@ where
let reader = Cursor::new(buffer); let reader = Cursor::new(buffer);
let count = match format { let count = match format {
DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?, DocumentAdditionFormat::Json => read_json(reader, writer)?,
DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?, DocumentAdditionFormat::Csv => read_csv(reader, writer)?,
DocumentAdditionFormat::Ndjson => read_ndjson(reader, &mut *update_file)?, DocumentAdditionFormat::Ndjson => read_ndjson(reader, writer)?,
}; };
update_file.persist()?; update_file.persist()?;

View File

@ -4,7 +4,6 @@ use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use milli::documents::DocumentsBatchReader; use milli::documents::DocumentsBatchReader;
use serde_json::Map;
use tempfile::{NamedTempFile, PersistError}; use tempfile::{NamedTempFile, PersistError};
use uuid::Uuid; use uuid::Uuid;
@ -151,23 +150,13 @@ mod store {
let update_file = File::open(update_file_path)?; let update_file = File::open(update_file_path)?;
let mut dst_file = NamedTempFile::new_in(&dump_path)?; let mut dst_file = NamedTempFile::new_in(&dump_path)?;
let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor(); let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor();
let index = document_cursor.documents_batch_index().clone();
let mut document_buffer = Map::new(); let mut dst_file_buf_writer = BufWriter::new(&mut dst_file);
// TODO: we need to find a way to do this more efficiently. (create a custom serializer
// for jsonl for example...)
while let Some(document) = document_cursor.next_document()? { while let Some(document) = document_cursor.next_document()? {
for (field_id, content) in document.iter() { serde_json::to_writer(&mut dst_file_buf_writer, &document)?;
if let Some(field_name) = index.name(field_id) { dst_file_buf_writer.write_all(b"\n")?;
let content = serde_json::from_slice(content)?;
document_buffer.insert(field_name.to_string(), content);
}
}
serde_json::to_writer(&mut dst_file, &document_buffer)?;
dst_file.write_all(b"\n")?;
document_buffer.clear();
} }
drop(dst_file_buf_writer);
dst_file.persist(dst)?; dst_file.persist(dst)?;