diff --git a/http-ui/Cargo.lock b/http-ui/Cargo.lock index 3d3581ca6..4b909a6eb 100644 --- a/http-ui/Cargo.lock +++ b/http-ui/Cargo.lock @@ -80,19 +80,6 @@ dependencies = [ "warp", ] -[[package]] -name = "async-compression" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb1ff21a63d3262af46b9f33a826a8d134e2d0d9b2179c86034948b732ea8b2a" -dependencies = [ - "flate2", - "futures-core", - "memchr", - "pin-project-lite", - "tokio", -] - [[package]] name = "atty" version = "0.2.14" @@ -767,7 +754,6 @@ dependencies = [ "anyhow", "askama", "askama_warp", - "async-compression", "byte-unit", "bytes", "flate2", diff --git a/http-ui/Cargo.toml b/http-ui/Cargo.toml index 6b236496e..73470f7f4 100644 --- a/http-ui/Cargo.toml +++ b/http-ui/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" [dependencies] anyhow = "1.0.28" -async-compression = { version = "0.3.6", features = ["gzip", "tokio-02"] } byte-unit = { version = "4.0.9", default-features = false, features = ["std"] } grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" } heed = "0.10.5" diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 5299c0e13..51e6e9f85 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::collections::{HashMap, HashSet}; +use std::fmt::Display; use std::fs::{File, create_dir_all}; use std::net::SocketAddr; use std::num::NonZeroUsize; @@ -10,7 +11,6 @@ use std::time::Instant; use std::{mem, io}; use askama_warp::Template; -use async_compression::tokio_02::write::GzipEncoder; use byte_unit::Byte; use flate2::read::GzDecoder; use futures::stream; @@ -178,7 +178,7 @@ struct IndexTemplate { #[derive(Template)] #[template(path = "updates.html")] -struct UpdatesTemplate { +struct UpdatesTemplate { db_name: String, db_size: usize, docs_count: usize, @@ -208,7 +208,7 @@ impl UpdateStatus { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] enum UpdateMeta { - DocumentsAddition { method: String, format: String }, + DocumentsAddition { method: String, format: String, encoding: Option }, ClearDocuments, Settings(Settings), Facets(Facets), @@ -323,9 +323,10 @@ async fn main() -> anyhow::Result<()> { update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type); update_builder.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size.get_bytes()); + let before_update = Instant::now(); // we extract the update type and execute the update itself. let result: anyhow::Result<()> = match meta { - UpdateMeta::DocumentsAddition { method, format } => { + UpdateMeta::DocumentsAddition { method, format, encoding } => { // We must use the write transaction of the update here. let mut wtxn = index_cloned.write_txn()?; let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned); @@ -343,11 +344,10 @@ async fn main() -> anyhow::Result<()> { otherwise => panic!("invalid indexing method {:?}", otherwise), }; - let gzipped = true; - let reader = if gzipped { - Box::new(GzDecoder::new(content)) - } else { - Box::new(content) as Box + let reader = match encoding.as_deref() { + Some("gzip") => Box::new(GzDecoder::new(content)), + None => Box::new(content) as Box, + otherwise => panic!("invalid encoding format {:?}", otherwise), }; let result = builder.execute(reader, |indexing_step| { @@ -458,7 +458,7 @@ async fn main() -> anyhow::Result<()> { }; let meta = match result { - Ok(()) => format!("valid update content"), + Ok(()) => format!("valid update content processed in {:.02?}", before_update.elapsed()), Err(e) => format!("error while processing update content: {:?}", e), }; @@ -703,21 +703,18 @@ async fn main() -> anyhow::Result<()> { update_status_sender: broadcast::Sender>, update_method: Option, update_format: UpdateFormat, + encoding: Option, mut stream: impl futures::Stream> + Unpin, ) -> Result { let file = tokio::task::block_in_place(tempfile::tempfile).unwrap(); - let file = TFile::from_std(file); - let mut encoder = GzipEncoder::new(file); + let mut file = TFile::from_std(file); while let Some(result) = stream.next().await { let bytes = result.unwrap().to_bytes(); - encoder.write_all(&bytes[..]).await.unwrap(); + file.write_all(&bytes[..]).await.unwrap(); } - encoder.shutdown().await.unwrap(); - let mut file = encoder.into_inner(); - file.sync_all().await.unwrap(); let file = file.into_std().await; let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; @@ -734,7 +731,7 @@ async fn main() -> anyhow::Result<()> { _ => panic!("Unknown update format"), }; - let meta = UpdateMeta::DocumentsAddition { method, format }; + let meta = UpdateMeta::DocumentsAddition { method, format, encoding }; let update_id = update_store.register_update(&meta, &mmap[..]).unwrap(); let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta }); eprintln!("update {} registered", update_id); @@ -749,51 +746,26 @@ async fn main() -> anyhow::Result<()> { let update_store_cloned = update_store.clone(); let update_status_sender_cloned = update_status_sender.clone(); - let indexing_csv_route = warp::filters::method::post() + let indexing_route = warp::filters::method::post() .and(warp::path!("documents")) - .and(warp::header::exact_ignore_case("content-type", "text/csv")) - .and(warp::filters::query::query()) + .and(warp::header::header("content-type")) + .and(warp::header::optional::("content-encoding")) + .and(warp::query::query()) .and(warp::body::stream()) - .and_then(move |params: QueryUpdate, stream| { - buf_stream( - update_store_cloned.clone(), - update_status_sender_cloned.clone(), - params.method, - UpdateFormat::Csv, - stream, - ) - }); + .and_then(move |content_type: String, content_encoding, params: QueryUpdate, stream| { + let format = match content_type.as_str() { + "text/csv" => UpdateFormat::Csv, + "application/json" => UpdateFormat::Json, + "application/x-ndjson" => UpdateFormat::JsonStream, + otherwise => panic!("invalid update format: {}", otherwise), + }; - let update_store_cloned = update_store.clone(); - let update_status_sender_cloned = update_status_sender.clone(); - let indexing_json_route = warp::filters::method::post() - .and(warp::path!("documents")) - .and(warp::header::exact_ignore_case("content-type", "application/json")) - .and(warp::filters::query::query()) - .and(warp::body::stream()) - .and_then(move |params: QueryUpdate, stream| { buf_stream( update_store_cloned.clone(), update_status_sender_cloned.clone(), params.method, - UpdateFormat::Json, - stream, - ) - }); - - let update_store_cloned = update_store.clone(); - let update_status_sender_cloned = update_status_sender.clone(); - let indexing_json_stream_route = warp::filters::method::post() - .and(warp::path!("documents")) - .and(warp::header::exact_ignore_case("content-type", "application/x-ndjson")) - .and(warp::filters::query::query()) - .and(warp::body::stream()) - .and_then(move |params: QueryUpdate, stream| { - buf_stream( - update_store_cloned.clone(), - update_status_sender_cloned.clone(), - params.method, - UpdateFormat::JsonStream, + format, + content_encoding, stream, ) }); @@ -904,9 +876,7 @@ async fn main() -> anyhow::Result<()> { .or(dash_logo_black_route) .or(query_route) .or(document_route) - .or(indexing_csv_route) - .or(indexing_json_route) - .or(indexing_json_stream_route) + .or(indexing_route) .or(abort_update_id_route) .or(abort_pending_updates_route) .or(clearing_route) diff --git a/http-ui/templates/updates.html b/http-ui/templates/updates.html index 514a006b3..276bee40c 100644 --- a/http-ui/templates/updates.html +++ b/http-ui/templates/updates.html @@ -12,7 +12,7 @@ -
+
@@ -68,7 +68,7 @@
  1. update id
    {{ update_id }}
    -
    update status
    processed
    +
    update status
    {{ meta }}