From a95c44193d1c7f6366da5c2dae88364194dba88f Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 30 Jun 2021 17:29:22 +0200 Subject: [PATCH] Do not block when sending update notifications --- .../update_actor/store/mod.rs | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index cf5b846c6..f4abeae09 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::{ collections::{BTreeMap, HashSet}, path::PathBuf, + time::Duration, }; use arc_swap::ArcSwap; @@ -19,6 +20,8 @@ use log::error; use parking_lot::{Mutex, MutexGuard}; use tokio::runtime::Handle; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use tokio::time::timeout; use uuid::Uuid; use codec::*; @@ -120,7 +123,7 @@ impl UpdateStore { let state = Arc::new(StateLock::from_state(State::Idle)); - let (notification_sender, notification_receiver) = mpsc::channel(10); + let (notification_sender, notification_receiver) = mpsc::channel(1); Ok(( Self { @@ -146,22 +149,22 @@ impl UpdateStore { let update_store = Arc::new(update_store); // Send a first notification to trigger the process. - let _ = update_store.notification_sender.send(()); - - // Init update loop to perform any pending updates at launch. - // Since we just launched the update store, and we still own the receiving end of the - // channel, this call is guaranteed to succeed. - update_store - .notification_sender - .try_send(()) - .expect("Failed to init update store"); + if let Err(TrySendError::Closed(())) = update_store.notification_sender.try_send(()) { + panic!("Failed to init update store"); + } // We need a weak reference so we can take ownership on the arc later when we // want to close the index. + let duration = Duration::from_secs(10 * 60); // 10 minutes let update_store_weak = Arc::downgrade(&update_store); tokio::task::spawn(async move { - // Block and wait for something to process. - 'outer: while notification_receiver.recv().await.is_some() { + // Block and wait for something to process with a timeout. The timeout + // function returns a Result and we must just unlock the loop on Result. + 'outer: while timeout(duration, notification_receiver.recv()) + .await + .transpose() + .map_or(false, |r| r.is_ok()) + { loop { match update_store_weak.upgrade() { Some(update_store) => { @@ -245,9 +248,10 @@ impl UpdateStore { txn.commit()?; - self.notification_sender - .blocking_send(()) - .expect("Update store loop exited."); + if let Err(TrySendError::Closed(())) = self.notification_sender.try_send(()) { + panic!("Update store loop exited"); + } + Ok(meta) }