From cd271b87627cc931a392ce44a9ece6447ffe62b2 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 28 Aug 2024 19:01:54 +0200 Subject: [PATCH] stop trying to process searches after one minute --- meilisearch/src/search_queue.rs | 26 ++++++++++++++++++++++-- meilisearch/tests/search/search_queue.rs | 18 ++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index ecdaaf3ff..195fa1b6f 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -18,6 +18,7 @@ //! And should drop the Permit only once you have freed all the RAM consumed by the method. use std::num::NonZeroUsize; +use std::time::Duration; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -29,6 +30,9 @@ use crate::error::MeilisearchHttpError; pub struct SearchQueue { sender: mpsc::Sender>, capacity: usize, + /// If we have waited longer than this to get a permit, we should abort the search request entirely. + /// The client probably already closed the connection, but we have no way to find out. + time_to_abort: Duration, } /// You should only run search requests while holding this permit. @@ -65,7 +69,11 @@ impl SearchQueue { let (sender, receiver) = mpsc::channel(1); tokio::task::spawn(Self::run(capacity, paralellism, receiver)); - Self { sender, capacity } + Self { sender, capacity, time_to_abort: Duration::from_secs(60) } + } + + pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self { + Self { time_to_abort, ..self } } /// This function is the main loop, it's in charge on scheduling which search request should execute first and @@ -131,9 +139,23 @@ impl SearchQueue { /// Returns a search `Permit`. /// It should be dropped as soon as you've freed all the RAM associated with the search request being processed. pub async fn try_get_search_permit(&self) -> Result { + let now = std::time::Instant::now(); let (sender, receiver) = oneshot::channel(); self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; - receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity)) + let permit = receiver + .await + .map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))?; + + // If we've been for more than one minute to get a search permit, it's better to simply + // abort the search request than spending time processing something were the client + // most certainly exited or got a timeout a long time ago. + // We may find a better solution in https://github.com/actix/actix-web/issues/3462. + if now.elapsed() > self.time_to_abort { + permit.drop().await; + Err(MeilisearchHttpError::TooManySearchRequests(self.capacity)) + } else { + Ok(permit) + } } /// Returns `Ok(())` if everything seems normal. diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index cbd16097f..498b741e5 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -56,6 +56,24 @@ async fn search_queue_register_with_explicit_drop() { let _permit4 = queue.try_get_search_permit().await.unwrap(); } +#[actix_rt::test] +async fn search_queue_register_with_time_to_abort() { + let queue = Arc::new( + SearchQueue::new(1, NonZeroUsize::new(1).unwrap()) + .with_time_to_abort(Duration::from_secs(1)), + ); + + // First, use all the cores + let permit1 = queue.try_get_search_permit().await.unwrap(); + let q = queue.clone(); + let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await }); + tokio::time::sleep(Duration::from_secs(1)).await; + permit1.drop().await; + let ret = permit2.await.unwrap(); + + snapshot!(ret.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s."); +} + #[actix_rt::test] async fn wait_till_cores_are_available() { let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));