From 1a38bfd31fcf482f0b479fe3b6bfad809bb44ea7 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 23 Dec 2020 13:52:28 +0100 Subject: [PATCH] data add documents --- Cargo.lock | 41 ++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 5 ++++- src/data.rs | 40 ++++++++++++++++++++++++++++++++++++++-- src/routes/document.rs | 4 ++-- src/updates/mod.rs | 35 ++++++++++++++++------------------- 5 files changed, 100 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 610c83318..451b8efc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -332,6 +332,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb1ff21a63d3262af46b9f33a826a8d134e2d0d9b2179c86034948b732ea8b2a" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite 0.1.11", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.42" @@ -1530,6 +1543,7 @@ dependencies = [ "actix-web", "anyhow", "assert-json-diff", + "async-compression", "byte-unit", "bytes 0.6.0", "chrono", @@ -1537,6 +1551,7 @@ dependencies = [ "env_logger 0.8.2", "flate2", "futures", + "futures-util", "grenad", "heed", "http", @@ -1545,6 +1560,7 @@ dependencies = [ "log", "main_error", "meilisearch-error", + "memmap", "milli", "mime", "once_cell", @@ -1680,12 +1696,24 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", ] +[[package]] +name = "mio-named-pipes" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" +dependencies = [ + "log", + "mio", + "miow 0.3.6", + "winapi 0.3.9", +] + [[package]] name = "mio-uds" version = "0.6.8" @@ -1709,6 +1737,16 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "miow" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" +dependencies = [ + "socket2", + "winapi 0.3.9", +] + [[package]] name = "near-proximity" version = "0.1.0" @@ -2895,6 +2933,7 @@ dependencies = [ "libc", "memchr", "mio", + "mio-named-pipes", "mio-uds", "num_cpus", "pin-project-lite 0.1.11", diff --git a/Cargo.toml b/Cargo.toml index cc25f0c8c..c6dd1db76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ actix-rt = "1" actix-service = "1.0.6" actix-web = { version = "3.3.2", features = ["rustls"] } anyhow = "1.0.36" +async-compression = { version = "0.3.6", features = ["gzip", "tokio-02"] } byte-unit = { version = "4.0.9", default-features = false, features = ["std"] } bytes = "0.6.0" chrono = { version = "0.4.19", features = ["serde"] } @@ -49,10 +50,12 @@ slice-group-by = "0.2.6" structopt = "0.3.20" tar = "0.4.29" tempfile = "3.1.0" -tokio = "*" +tokio = { version = "*", features = ["full"] } ureq = { version = "1.5.1", default-features = false, features = ["tls"] } walkdir = "2.3.1" whoami = "1.0.0" +futures-util = "0.3.8" +memmap = "0.7.0" [dependencies.sentry] default-features = false diff --git a/src/data.rs b/src/data.rs index 621d8cf2b..c53042b8e 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,11 +1,15 @@ use std::ops::Deref; use std::sync::Arc; -use sha2::Digest; +use async_compression::tokio_02::write::GzipEncoder; +use futures_util::stream::StreamExt; +use tokio::io::AsyncWriteExt; use milli::Index; +use milli::update::{IndexDocumentsMethod, UpdateFormat}; +use sha2::Digest; use crate::option::Opt; -use crate::updates::UpdateQueue; +use crate::updates::{UpdateQueue, UpdateMeta, UpdateStatus, UpdateMetaProgress}; #[derive(Clone)] pub struct Data { @@ -75,11 +79,43 @@ impl Data { Ok(Data { inner }) } + pub async fn add_documents( + &self, + method: IndexDocumentsMethod, + format: UpdateFormat, + mut stream: impl futures::Stream> + Unpin, + ) -> anyhow::Result> + where + B: Deref, + E: std::error::Error + Send + Sync + 'static, + { + let file = tokio::task::block_in_place(tempfile::tempfile)?; + let file = tokio::fs::File::from_std(file); + let mut encoder = GzipEncoder::new(file); + + while let Some(result) = stream.next().await { + let bytes = &*result?; + encoder.write_all(&bytes[..]).await?; + } + + encoder.shutdown().await?; + let mut file = encoder.into_inner(); + file.sync_all().await?; + let file = file.into_std().await; + let mmap = unsafe { memmap::Mmap::map(&file)? }; + + let meta = UpdateMeta::DocumentsAddition { method, format }; + let update_id = tokio::task::block_in_place(|| self.update_queue.register_update(&meta, &mmap[..]))?; + + Ok(UpdateStatus::Pending { update_id, meta }) + } + #[inline] pub fn http_payload_size_limit(&self) -> usize { self.options.http_payload_size_limit.get_bytes() as usize } + #[inline] pub fn api_keys(&self) -> &ApiKeys { &self.api_keys } diff --git a/src/routes/document.rs b/src/routes/document.rs index 3dfa36880..bdd9a8336 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -93,10 +93,10 @@ async fn update_multiple_documents( #[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] async fn add_documents( - _data: web::Data, + data: web::Data, _path: web::Path, _params: web::Query, - _body: web::Json>, + body: web::Json>, ) -> Result { todo!() } diff --git a/src/updates/mod.rs b/src/updates/mod.rs index 30490ec43..00faa4d85 100644 --- a/src/updates/mod.rs +++ b/src/updates/mod.rs @@ -4,6 +4,7 @@ pub use settings::{Settings, Facets}; use std::io; use std::sync::Arc; +use std::ops::Deref; use anyhow::Result; use flate2::read::GzDecoder; @@ -20,8 +21,8 @@ use crate::option::Opt; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] -enum UpdateMeta { - DocumentsAddition { method: String, format: String }, +pub enum UpdateMeta { + DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat }, ClearDocuments, Settings(Settings), Facets(Facets), @@ -29,7 +30,7 @@ enum UpdateMeta { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] -enum UpdateMetaProgress { +pub enum UpdateMetaProgress { DocumentsAddition { step: usize, total_steps: usize, @@ -41,7 +42,7 @@ enum UpdateMetaProgress { #[derive(Debug, Clone, Serialize)] #[serde(tag = "type")] #[allow(dead_code)] -enum UpdateStatus { +pub enum UpdateStatus { Pending { update_id: u64, meta: M }, Progressing { update_id: u64, meta: P }, Processed { update_id: u64, meta: N }, @@ -53,6 +54,13 @@ pub struct UpdateQueue { inner: Arc>, } +impl Deref for UpdateQueue { + type Target = Arc>; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} #[derive(Debug, Clone, StructOpt)] pub struct IndexerOpts { @@ -164,27 +172,16 @@ impl UpdateHandler { fn update_documents( &self, - format: String, - method: String, + format: UpdateFormat, + method: IndexDocumentsMethod, content: &[u8], update_builder: UpdateBuilder, ) -> Result<()> { // We must use the write transaction of the update here. let mut wtxn = self.indexes.write_txn()?; let mut builder = update_builder.index_documents(&mut wtxn, &self.indexes); - - match format.as_str() { - "csv" => builder.update_format(UpdateFormat::Csv), - "json" => builder.update_format(UpdateFormat::Json), - "json-stream" => builder.update_format(UpdateFormat::JsonStream), - otherwise => panic!("invalid update format {:?}", otherwise), - }; - - match method.as_str() { - "replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments), - "update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments), - otherwise => panic!("invalid indexing method {:?}", otherwise), - }; + builder.update_format(format); + builder.index_documents_method(method); let gzipped = true; let reader = if gzipped {