diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs
index 9b2a17ed0..c4965491c 100644
--- a/src/subcommand/serve.rs
+++ b/src/subcommand/serve.rs
@@ -224,8 +224,9 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
                     let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
 
                     match format.as_str() {
-                        "json" => builder.update_format(UpdateFormat::Json),
                         "csv" => builder.update_format(UpdateFormat::Csv),
+                        "json" => builder.update_format(UpdateFormat::Json),
+                        "json-stream" => builder.update_format(UpdateFormat::JsonStream),
                         otherwise => panic!("invalid update format {:?}", otherwise),
                     };
 
@@ -491,6 +492,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
         let format = match update_format {
             UpdateFormat::Csv => String::from("csv"),
             UpdateFormat::Json => String::from("json"),
+            UpdateFormat::JsonStream => String::from("json-stream"),
         };
 
         let meta = UpdateMeta::DocumentsAddition { method, format };
@@ -540,6 +542,23 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
             )
         });
 
+    let update_store_cloned = update_store.clone();
+    let update_status_sender_cloned = update_status_sender.clone();
+    let indexing_route_json_stream = warp::filters::method::post()
+        .and(warp::path!("documents"))
+        .and(warp::header::exact_ignore_case("content-type", "application/x-ndjson"))
+        .and(warp::filters::query::query())
+        .and(warp::body::stream())
+        .and_then(move |params: QueryUpdate, stream| {
+            buf_stream(
+                update_store_cloned.clone(),
+                update_status_sender_cloned.clone(),
+                params.method,
+                UpdateFormat::JsonStream,
+                stream,
+            )
+        });
+
     let update_status_sender_cloned = update_status_sender.clone();
     let clearing_route = warp::filters::method::post()
         .and(warp::path!("clear-documents"))
@@ -595,6 +614,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
         .or(query_route)
         .or(indexing_route_csv)
         .or(indexing_route_json)
+        .or(indexing_route_json_stream)
         .or(clearing_route)
         .or(update_ws_route);
 
diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs
index 8a1571328..e79190679 100644
--- a/src/update/index_documents/mod.rs
+++ b/src/update/index_documents/mod.rs
@@ -187,6 +187,8 @@ pub enum UpdateFormat {
     Csv,
     /// The given update is a JSON array with documents inside.
     Json,
+    /// The given update is a JSON stream with a document on each line.
+    JsonStream,
 }
 
 pub struct IndexDocuments<'t, 'u, 'i> {
@@ -306,6 +308,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
         let output = match self.update_format {
             UpdateFormat::Csv => transform.from_csv(reader)?,
             UpdateFormat::Json => transform.from_json(reader)?,
+            UpdateFormat::JsonStream => transform.from_json_stream(reader)?,
         };
 
         let TransformOutput {
@@ -844,4 +847,30 @@ mod tests {
         assert_eq!(count, 0);
         drop(rtxn);
     }
+
+    #[test]
+    fn json_stream_documents() {
+        let path = tempfile::tempdir().unwrap();
+        let mut options = EnvOpenOptions::new();
+        options.map_size(10 * 1024 * 1024); // 10 MB
+        let index = Index::new(options, &path).unwrap();
+
+        // First we send 3 documents with an id for only one of them.
+        let mut wtxn = index.write_txn().unwrap();
+        let content = &br#"
+        { "name": "kevin" }
+        { "name": "kevina", "id": 21 }
+        { "name": "benoit" }
+        "#[..];
+        let mut builder = IndexDocuments::new(&mut wtxn, &index);
+        builder.update_format(UpdateFormat::JsonStream);
+        builder.execute(content, |_, _| ()).unwrap();
+        wtxn.commit().unwrap();
+
+        // Check that there are 3 documents now.
+        let rtxn = index.read_txn().unwrap();
+        let count = index.number_of_documents(&rtxn).unwrap();
+        assert_eq!(count, 3);
+        drop(rtxn);
+    }
 }
diff --git a/src/update/index_documents/transform.rs b/src/update/index_documents/transform.rs
index 1a4c599eb..3fd7170dd 100644
--- a/src/update/index_documents/transform.rs
+++ b/src/update/index_documents/transform.rs
@@ -2,6 +2,7 @@ use std::borrow::Cow;
 use std::convert::TryFrom;
 use std::fs::File;
 use std::io::{Read, Seek, SeekFrom};
+use std::iter::Peekable;
 
 use anyhow::{anyhow, Context};
 use fst::{IntoStreamer, Streamer};
@@ -24,6 +25,12 @@ pub struct TransformOutput {
     pub documents_file: File,
 }
 
+/// Extract the users ids, deduplicate and compute the new internal documents ids
+/// and fields ids, writing all the documents under their internal ids into a final file.
+///
+/// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
+/// the replaced documents ids, the number of documents in this update and the file
+/// containing all those documents.
 pub struct Transform<'t, 'i> {
     pub rtxn: &'t heed::RoTxn<'i>,
     pub index: &'i Index,
@@ -37,26 +44,41 @@ pub struct Transform<'t, 'i> {
 }
 
 impl Transform<'_, '_> {
-    /// Extract the users ids, deduplicate and compute the new internal documents ids
-    /// and fields ids, writing all the documents under their internal ids into a final file.
-    ///
-    /// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
-    /// the replaced documents ids, the number of documents in this update and the file
-    /// containing all those documents.
     pub fn from_json<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> {
+        self.from_generic_json(reader, false)
+    }
+
+    pub fn from_json_stream<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> {
+        self.from_generic_json(reader, true)
+    }
+
+    fn from_generic_json<R: Read>(self, reader: R, is_stream: bool) -> anyhow::Result<TransformOutput> {
         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
         let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap();
         let primary_key = self.index.primary_key(self.rtxn)?;
 
         // Deserialize the whole batch of documents in memory.
-        let documents: Vec<Map<String, Value>> = serde_json::from_reader(reader)?;
+        let mut documents: Peekable<Box<dyn Iterator<Item=serde_json::Result<Map<String, Value>>>>> = if is_stream {
+            let iter = serde_json::Deserializer::from_reader(reader).into_iter();
+            let iter = Box::new(iter) as Box<dyn Iterator<Item=_>>;
+            iter.peekable()
+        } else {
+            let vec: Vec<_> = serde_json::from_reader(reader)?;
+            let iter = vec.into_iter().map(Ok);
+            let iter = Box::new(iter) as Box<dyn Iterator<Item=_>>;
+            iter.peekable()
+        };
 
         // We extract the primary key from the first document in
         // the batch if it hasn't already been defined in the index.
         let primary_key = match primary_key {
             Some(primary_key) => primary_key,
             None => {
-                match documents.get(0).and_then(|doc| doc.keys().find(|k| k.contains("id"))) {
+                // We ignore a potential error here as we can't early return it now;
+                // the peek method only gives us a reference to the next item,
+                // and we will eventually return the error in the iteration just after.
+                let first = documents.peek().and_then(|r| r.as_ref().ok());
+                match first.and_then(|doc| doc.keys().find(|k| k.contains("id"))) {
                     Some(key) => fields_ids_map.insert(&key).context("field id limit reached")?,
                     None => {
                         if !self.autogenerate_docids {
@@ -70,7 +92,7 @@ impl Transform<'_, '_> {
             },
         };
 
-        if documents.is_empty() {
+        if documents.peek().is_none() {
             return Ok(TransformOutput {
                 primary_key,
                 fields_ids_map,
@@ -110,7 +132,9 @@ impl Transform<'_, '_> {
         let mut obkv_buffer = Vec::new();
         let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
 
-        for mut document in documents {
+        for result in documents {
+            let mut document = result?;
+
             obkv_buffer.clear();
             let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
 
@@ -155,12 +179,6 @@ impl Transform<'_, '_> {
         self.from_sorter(sorter, primary_key, fields_ids_map, users_ids_documents_ids)
     }
 
-    /// Extract the users ids, deduplicate and compute the new internal documents ids
-    /// and fields ids, writing all the documents under their internal ids into a final file.
-    ///
-    /// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
-    /// the replaced documents ids, the number of documents in this update and the file
-    /// containing all those documents.
     pub fn from_csv<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> {
         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
         let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap();
@@ -261,8 +279,8 @@ impl Transform<'_, '_> {
         self.from_sorter(sorter, primary_key_field_id, fields_ids_map, users_ids_documents_ids)
     }
 
-    /// Generate the TransformOutput based on the given sorter that can be generated from any
-    /// format like CSV, JSON or JSON lines. This sorter must contain a key that is the document
+    /// Generate the `TransformOutput` based on the given sorter that can be generated from any
+    /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
     /// id for the user side and the value must be an obkv where keys are valid fields ids.
     fn from_sorter(
         self,
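
Note for reviewers (not part of the patch): the streaming branch of `from_generic_json` leans on serde_json's `StreamDeserializer`, which accepts any whitespace-separated sequence of top-level JSON values rather than strictly one document per line. The standalone sketch below illustrates that behaviour with the same documents the new `json_stream_documents` test sends; it only assumes `serde_json` as a dependency and does not touch the milli types above.

```rust
use serde_json::{Deserializer, Map, Value};

fn main() -> serde_json::Result<()> {
    // The same three documents the new test feeds to UpdateFormat::JsonStream.
    let content = br#"
        { "name": "kevin" }
        { "name": "kevina", "id": 21 }
        { "name": "benoit" }
    "#;

    // Each `next()` on the StreamDeserializer yields one top-level JSON value,
    // so newline-delimited documents (application/x-ndjson) are accepted.
    let stream = Deserializer::from_reader(&content[..]).into_iter::<Map<String, Value>>();

    let mut count = 0;
    for document in stream {
        // Errors surface per document, mirroring the `result?` in the rewritten loop.
        let document = document?;
        assert!(document.contains_key("name"));
        count += 1;
    }
    assert_eq!(count, 3);
    Ok(())
}
```

This per-item error reporting is also why the patch peeks at the first document and defers any deserialization error to the loop that follows.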