From 1f537e1b601af511a9aef79b2e68b0e670ab48ad Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 29 Sep 2021 10:17:52 +0200 Subject: [PATCH] jsonl support --- Cargo.lock | 1 + .../src/routes/indexes/documents.rs | 97 +++++++++++++++++-- meilisearch-lib/Cargo.toml | 2 +- meilisearch-lib/src/document_formats.rs | 27 +++--- meilisearch-lib/src/index/dump.rs | 4 +- .../index_controller/dump_actor/loaders/v1.rs | 6 +- meilisearch-lib/src/index_controller/mod.rs | 1 + .../src/index_controller/update_file_store.rs | 17 ++-- .../src/index_controller/updates/error.rs | 2 - .../src/index_controller/updates/mod.rs | 5 +- 10 files changed, 121 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7ab9dbe4..429eff063 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1779,6 +1779,7 @@ dependencies = [ [[package]] name = "milli" version = "0.16.0" +source = "git+https://github.com/meilisearch/milli.git?rev=f65153ad6454317213680e9a9a908ec78d5645a7#f65153ad6454317213680e9a9a908ec78d5645a7" dependencies = [ "bimap", "bincode", diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index ae66d439d..5f465c394 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -35,6 +35,7 @@ macro_rules! guard_content_type { guard_content_type!(guard_json, "application/json"); guard_content_type!(guard_csv, "application/csv"); +guard_content_type!(guard_ndjson, "application/ndjson"); fn empty_application_type(head: &actix_web::dev::RequestHead) -> bool { head.headers.get("Content-Type").is_none() @@ -61,16 +62,26 @@ pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( web::resource("") .route(web::get().to(get_all_documents)) - - .route(web::post().guard(empty_application_type).to(|| HttpResponse::UnsupportedMediaType())) + // replace documents routes + .route( + web::post() + .guard(empty_application_type) + .to(HttpResponse::UnsupportedMediaType), + ) .route(web::post().guard(guard_json).to(add_documents_json)) + .route(web::post().guard(guard_ndjson).to(add_documents_ndjson)) .route(web::post().guard(guard_csv).to(add_documents_csv)) - .route(web::post().to(|| HttpResponse::UnsupportedMediaType())) - - .route(web::put().guard(empty_application_type).to(|| HttpResponse::UnsupportedMediaType())) + .route(web::post().to(HttpResponse::UnsupportedMediaType)) + // update documents routes + .route( + web::put() + .guard(empty_application_type) + .to(HttpResponse::UnsupportedMediaType), + ) .route(web::put().guard(guard_json).to(update_documents_json)) + .route(web::put().guard(guard_ndjson).to(update_documents_ndjson)) .route(web::put().guard(guard_csv).to(update_documents_csv)) - .route(web::put().to(|| HttpResponse::UnsupportedMediaType())) + .route(web::put().to(HttpResponse::UnsupportedMediaType)) .route(web::delete().to(clear_all_documents)), ) // this route needs to be before the /documents/{document_id} to match properly @@ -160,7 +171,32 @@ pub async fn add_documents_json( params: web::Query, body: Payload, ) -> Result { - document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Json, IndexDocumentsMethod::ReplaceDocuments).await + document_addition( + meilisearch, + path, + params, + body, + DocumentAdditionFormat::Json, + IndexDocumentsMethod::ReplaceDocuments, + ) + .await +} + +pub async fn add_documents_ndjson( + meilisearch: GuardedData, + path: web::Path, + params: web::Query, + body: Payload, +) -> Result { + document_addition( + meilisearch, + path, + params, + body, + DocumentAdditionFormat::Ndjson, + IndexDocumentsMethod::ReplaceDocuments, + ) + .await } pub async fn add_documents_csv( @@ -169,7 +205,15 @@ pub async fn add_documents_csv( params: web::Query, body: Payload, ) -> Result { - document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Csv, IndexDocumentsMethod::ReplaceDocuments).await + document_addition( + meilisearch, + path, + params, + body, + DocumentAdditionFormat::Csv, + IndexDocumentsMethod::ReplaceDocuments, + ) + .await } pub async fn update_documents_json( @@ -178,7 +222,32 @@ pub async fn update_documents_json( params: web::Query, body: Payload, ) -> Result { - document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Json, IndexDocumentsMethod::UpdateDocuments).await + document_addition( + meilisearch, + path, + params, + body, + DocumentAdditionFormat::Json, + IndexDocumentsMethod::UpdateDocuments, + ) + .await +} + +pub async fn update_documents_ndjson( + meilisearch: GuardedData, + path: web::Path, + params: web::Query, + body: Payload, +) -> Result { + document_addition( + meilisearch, + path, + params, + body, + DocumentAdditionFormat::Ndjson, + IndexDocumentsMethod::UpdateDocuments, + ) + .await } pub async fn update_documents_csv( @@ -187,7 +256,15 @@ pub async fn update_documents_csv( params: web::Query, body: Payload, ) -> Result { - document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Csv, IndexDocumentsMethod::UpdateDocuments).await + document_addition( + meilisearch, + path, + params, + body, + DocumentAdditionFormat::Csv, + IndexDocumentsMethod::UpdateDocuments, + ) + .await } /// Route used when the payload type is "application/json" /// Used to add or replace documents diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index a52bd9e43..baf95d286 100644 --- a/meilisearch-lib/Cargo.toml +++ b/meilisearch-lib/Cargo.toml @@ -31,7 +31,7 @@ log = "0.4.14" meilisearch-error = { path = "../meilisearch-error" } meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" } memmap = "0.7.0" -milli = { path = "../../milli/milli" } +milli = { git = "https://github.com/meilisearch/milli.git", rev = "f65153ad6454317213680e9a9a908ec78d5645a7"} mime = "0.3.16" num_cpus = "1.13.0" once_cell = "1.8.0" diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs index f06a509c2..334b6f601 100644 --- a/meilisearch-lib/src/document_formats.rs +++ b/meilisearch-lib/src/document_formats.rs @@ -1,5 +1,5 @@ -use std::io::{self, Read, Result as IoResult, Seek, Write}; use std::fmt; +use std::io::{self, Read, Result as IoResult, Seek, Write}; use csv::{Reader as CsvReader, StringRecordsIntoIter}; use milli::documents::DocumentBatchBuilder; @@ -9,7 +9,7 @@ type Result = std::result::Result; #[derive(Debug)] pub enum PayloadType { - Jsonl, + Ndjson, Json, Csv, } @@ -17,7 +17,7 @@ pub enum PayloadType { impl fmt::Display for PayloadType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - PayloadType::Jsonl => write!(f, "ndjson"), + PayloadType::Ndjson => write!(f, "ndjson"), PayloadType::Json => write!(f, "json"), PayloadType::Csv => write!(f, "csv"), } @@ -56,14 +56,13 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<()> { Ok(()) } - /// read jsonl from input and write an obkv batch to writer. -pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> { +pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<()> { let mut builder = DocumentBatchBuilder::new(writer)?; let stream = Deserializer::from_reader(input).into_iter::>(); for value in stream { - let value = malformed!(PayloadType::Jsonl, value)?; + let value = malformed!(PayloadType::Ndjson, value)?; builder.add_documents(&value)?; } @@ -84,7 +83,6 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> { Ok(()) } - enum AllowedType { String, Number, @@ -141,12 +139,12 @@ impl Iterator for CsvDocumentIter { for ((field_name, field_type), value) in self.headers.iter().zip(csv_document.into_iter()) { - let parsed_value = (|| match field_type { - AllowedType::Number => malformed!(PayloadType::Csv, value - .parse::() - .map(Value::from)), + let parsed_value = match field_type { + AllowedType::Number => { + malformed!(PayloadType::Csv, value.parse::().map(Value::from)) + } AllowedType::String => Ok(Value::String(value.to_string())), - })(); + }; match parsed_value { Ok(value) => drop(document.insert(field_name.to_string(), value)), @@ -156,7 +154,10 @@ impl Iterator for CsvDocumentIter { Some(Ok(document)) } - Err(e) => Some(Err(DocumentFormatError::MalformedPayload(Box::new(e), PayloadType::Csv))), + Err(e) => Some(Err(DocumentFormatError::MalformedPayload( + Box::new(e), + PayloadType::Csv, + ))), } } } diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs index f6e081760..4a769f136 100644 --- a/meilisearch-lib/src/index/dump.rs +++ b/meilisearch-lib/src/index/dump.rs @@ -9,7 +9,7 @@ use milli::documents::DocumentBatchReader; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::document_formats::read_jsonl; +use crate::document_formats::read_ndjson; use crate::index::update_handler::UpdateHandler; use crate::index::updates::apply_settings_to_builder; use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; @@ -142,7 +142,7 @@ impl Index { let mut tmp_doc_file = tempfile::tempfile()?; - read_jsonl(reader, &mut tmp_doc_file)?; + read_ndjson(reader, &mut tmp_doc_file)?; tmp_doc_file.seek(SeekFrom::Start(0))?; diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs index 840fd7ccc..1ad92dd56 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs @@ -11,7 +11,7 @@ use milli::update::Setting; use serde::{Deserialize, Deserializer, Serialize}; use uuid::Uuid; -use crate::document_formats::read_jsonl; +use crate::document_formats::read_ndjson; use crate::index::apply_settings_to_builder; use crate::index::update_handler::UpdateHandler; use crate::index_controller::index_resolver::uuid_store::HeedUuidStore; @@ -124,7 +124,7 @@ fn load_index( let mut tmp_doc_file = tempfile::tempfile()?; - read_jsonl(reader, &mut tmp_doc_file)?; + read_ndjson(reader, &mut tmp_doc_file)?; tmp_doc_file.seek(SeekFrom::Start(0))?; @@ -213,7 +213,7 @@ impl From for index_controller::Settings { } } -// /// Extract Settings from `settings.json` file present at provided `dir_path` +/// Extract Settings from `settings.json` file present at provided `dir_path` fn import_settings(dir_path: impl AsRef) -> anyhow::Result { let path = dir_path.as_ref().join("settings.json"); let file = File::open(path)?; diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 438e5af3e..4938e7c8d 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -72,6 +72,7 @@ pub struct IndexController { pub enum DocumentAdditionFormat { Json, Csv, + Ndjson, } #[derive(Serialize, Debug)] diff --git a/meilisearch-lib/src/index_controller/update_file_store.rs b/meilisearch-lib/src/index_controller/update_file_store.rs index f7a7e3a1a..483fa80f8 100644 --- a/meilisearch-lib/src/index_controller/update_file_store.rs +++ b/meilisearch-lib/src/index_controller/update_file_store.rs @@ -10,7 +10,7 @@ use uuid::Uuid; const UPDATE_FILES_PATH: &str = "updates/updates_files"; -use crate::document_formats::read_jsonl; +use crate::document_formats::read_ndjson; pub struct UpdateFile { path: PathBuf, @@ -86,7 +86,7 @@ impl UpdateFileStore { .ok_or_else(|| anyhow::anyhow!("invalid update file name"))?; let dst_path = dst_update_files_path.join(file_uuid); let dst_file = BufWriter::new(File::create(dst_path)?); - read_jsonl(update_file, dst_file)?; + read_ndjson(update_file, dst_file)?; } Ok(()) @@ -98,9 +98,9 @@ impl UpdateFileStore { Ok(Self { path }) } - /// Created a new temporary update file. + /// Creates a new temporary update file. /// - /// A call to persist is needed to persist in the database. + /// A call to `persist` is needed to persist the file in the database. pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> { let file = NamedTempFile::new()?; let uuid = Uuid::new_v4(); @@ -110,14 +110,14 @@ impl UpdateFileStore { Ok((uuid, update_file)) } - /// Returns a the file corresponding to the requested uuid. + /// Returns the file corresponding to the requested uuid. pub fn get_update(&self, uuid: Uuid) -> Result { let path = self.path.join(uuid.to_string()); let file = File::open(path)?; Ok(file) } - /// Copies the content of the update file poited to by uuid to dst directory. + /// Copies the content of the update file pointed to by `uuid` to the `dst` directory. pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef) -> Result<()> { let src = self.path.join(uuid.to_string()); let mut dst = dst.as_ref().join(UPDATE_FILES_PATH); @@ -127,7 +127,7 @@ impl UpdateFileStore { Ok(()) } - /// Peform a dump of the given update file uuid into the provided snapshot path. + /// Peforms a dump of the given update file uuid into the provided dump path. pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef) -> Result<()> { let uuid_string = uuid.to_string(); let update_file_path = self.path.join(&uuid_string); @@ -140,7 +140,8 @@ impl UpdateFileStore { let mut document_reader = DocumentBatchReader::from_reader(update_file)?; let mut document_buffer = Map::new(); - // TODO: we need to find a way to do this more efficiently. (create a custom serializer to + // TODO: we need to find a way to do this more efficiently. (create a custom serializer + // for // jsonl for example...) while let Some((index, document)) = document_reader.next_document_with_index()? { for (field_id, content) in document.iter() { diff --git a/meilisearch-lib/src/index_controller/updates/error.rs b/meilisearch-lib/src/index_controller/updates/error.rs index 217567569..4948ea164 100644 --- a/meilisearch-lib/src/index_controller/updates/error.rs +++ b/meilisearch-lib/src/index_controller/updates/error.rs @@ -17,8 +17,6 @@ pub enum UpdateLoopError { UnexistingUpdate(u64), #[error("Internal error: {0}")] Internal(Box), - //#[error("{0}")] - //IndexActor(#[from] IndexActorError), #[error( "update store was shut down due to a fatal error, please check your logs for more info." )] diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index 474f0e77e..c3b15e5af 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -21,7 +21,7 @@ use uuid::Uuid; use self::error::{Result, UpdateLoopError}; pub use self::message::UpdateMsg; use self::store::{UpdateStore, UpdateStoreInfo}; -use crate::document_formats::{read_csv, read_json}; +use crate::document_formats::{read_csv, read_json, read_ndjson}; use crate::index::{Index, Settings, Unchecked}; use crate::index_controller::update_file_store::UpdateFileStore; use status::UpdateStatus; @@ -40,7 +40,7 @@ pub fn create_update_handler( let (sender, receiver) = mpsc::channel(100); let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?; - tokio::task::spawn_local(actor.run()); + tokio::task::spawn(actor.run()); Ok(sender) } @@ -197,6 +197,7 @@ impl UpdateLoop { match format { DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?, DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?, + DocumentAdditionFormat::Ndjson => read_ndjson(reader, &mut *update_file)?, } update_file.persist()?;