From 35c9a3c55887db0c72adc0c54f945b1499d5df67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 20 Oct 2020 11:19:34 +0200 Subject: [PATCH] Brodacast the updates infos to every ws clients --- Cargo.lock | 33 --------------------------------- Cargo.toml | 1 - src/subcommand/serve.rs | 33 ++++++++++++++++++++++----------- 3 files changed, 22 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index daeb32d1c..dd5335894 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,17 +77,6 @@ dependencies = [ "warp", ] -[[package]] -name = "async-channel" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - [[package]] name = "atty" version = "0.2.11" @@ -206,12 +195,6 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" -[[package]] -name = "cache-padded" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" - [[package]] name = "cast" version = "0.2.3" @@ -274,15 +257,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "concurrent-queue" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" -dependencies = [ - "cache-padded", -] - [[package]] name = "const_fn" version = "0.4.2" @@ -446,12 +420,6 @@ version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" -[[package]] -name = "event-listener" -version = "2.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" - [[package]] name = "fake-simd" version = "0.1.2" @@ -1033,7 +1001,6 @@ dependencies = [ "anyhow", "askama", "askama_warp", - "async-channel", "bstr", "byteorder", "bytes", diff --git a/Cargo.toml b/Cargo.toml index d1e3d8343..4589341d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [dependencies] anyhow = "1.0.28" -async-channel = "1.5.1" bstr = "0.2.13" byteorder = "1.3.4" crossbeam-channel = "0.5.0" diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 93f37b5cf..2662cdd56 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -8,13 +8,14 @@ use std::time::Duration; use std::time::Instant; use askama_warp::Template; -use futures::FutureExt; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; +use futures::stream; use heed::EnvOpenOptions; use serde::Deserialize; use structopt::StructOpt; use tokio::fs::File as TFile; use tokio::io::AsyncWriteExt; +use tokio::sync::broadcast; use warp::filters::ws::Message; use warp::{Filter, http::Response}; @@ -110,15 +111,15 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let update_store_path = opt.database.join("updates.mdb"); create_dir_all(&update_store_path)?; - let (update_status_sender, update_status_receiver) = async_channel::unbounded(); + let (update_status_sender, _) = broadcast::channel(100); let update_status_sender_cloned = update_status_sender.clone(); let update_store = UpdateStore::open( update_store_options, update_store_path, move |uid, meta: String, _content| { - let _ = update_status_sender_cloned.try_send(format!("processing update {}", uid)); + let _ = update_status_sender_cloned.send(format!("processing update {}", uid)); std::thread::sleep(Duration::from_secs(3)); - let _ = update_status_sender_cloned.try_send(format!("update {} processed", uid)); + let _ = update_status_sender_cloned.send(format!("update {} processed", uid)); Ok(meta) })?; @@ -288,7 +289,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { async fn buf_stream( update_store: Arc>, - update_status_sender: async_channel::Sender, + update_status_sender: broadcast::Sender, mut stream: impl futures::Stream> + Unpin, ) -> Result { @@ -305,29 +306,39 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let meta = String::from("I am the metadata"); let uid = update_store.register_update(&meta, &mmap[..]).unwrap(); - update_status_sender.try_send(format!("update {} pending", uid)).unwrap(); + update_status_sender.send(format!("update {} pending", uid)).unwrap(); eprintln!("Registering update {}", uid); Ok(warp::reply()) } let update_store_cloned = update_store.clone(); + let update_status_sender_cloned = update_status_sender.clone(); let indexing_route = warp::filters::method::post() .and(warp::path!("documents")) .and(warp::body::stream()) .and_then(move |stream| { - buf_stream(update_store_cloned.clone(), update_status_sender.clone(), stream) + buf_stream(update_store_cloned.clone(), update_status_sender_cloned.clone(), stream) }); let update_ws_route = warp::ws() .and(warp::path!("updates" / "ws")) .map(move |ws: warp::ws::Ws| { // And then our closure will be called when it completes... - let update_status_receiver_cloned = update_status_receiver.clone(); + let update_status_receiver = update_status_sender.subscribe(); ws.on_upgrade(|websocket| { // Just echo all updates messages... - update_status_receiver_cloned - .map(|msg| Ok(Message::text(msg))) + update_status_receiver + .into_stream() + .flat_map(|result| { + match result{ + Ok(msg) => stream::iter(Some(Ok(Message::text(msg)))), + Err(e) => { + eprintln!("channel error: {:?}", e); + stream::iter(None) + }, + } + }) .forward(websocket) .map(|result| { if let Err(e) = result {