mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 12:05:05 +08:00
Merge #4899
4899: stop trying to process searches after one minute r=ManyTheFish a=irevoire # Pull Request ## Related issue May be related to #4654 and https://github.com/meilisearch/meilisearch-support/issues/350 ## What does this PR do? - If we've been waiting for one whole minute for a search to process, we cancel it - Ideally we should check if the connection was closed instead but that’s not possible currently: https://github.com/actix/actix-web/issues/3462 Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
5fed4d035a
@ -18,6 +18,7 @@
|
|||||||
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
|
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
|
||||||
|
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use rand::rngs::StdRng;
|
use rand::rngs::StdRng;
|
||||||
use rand::{Rng, SeedableRng};
|
use rand::{Rng, SeedableRng};
|
||||||
@ -29,6 +30,9 @@ use crate::error::MeilisearchHttpError;
|
|||||||
pub struct SearchQueue {
|
pub struct SearchQueue {
|
||||||
sender: mpsc::Sender<oneshot::Sender<Permit>>,
|
sender: mpsc::Sender<oneshot::Sender<Permit>>,
|
||||||
capacity: usize,
|
capacity: usize,
|
||||||
|
/// If we have waited longer than this to get a permit, we should abort the search request entirely.
|
||||||
|
/// The client probably already closed the connection, but we have no way to find out.
|
||||||
|
time_to_abort: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// You should only run search requests while holding this permit.
|
/// You should only run search requests while holding this permit.
|
||||||
@ -65,7 +69,11 @@ impl SearchQueue {
|
|||||||
let (sender, receiver) = mpsc::channel(1);
|
let (sender, receiver) = mpsc::channel(1);
|
||||||
|
|
||||||
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
|
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
|
||||||
Self { sender, capacity }
|
Self { sender, capacity, time_to_abort: Duration::from_secs(60) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
|
||||||
|
Self { time_to_abort, ..self }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
|
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
|
||||||
@ -131,9 +139,23 @@ impl SearchQueue {
|
|||||||
/// Returns a search `Permit`.
|
/// Returns a search `Permit`.
|
||||||
/// It should be dropped as soon as you've freed all the RAM associated with the search request being processed.
|
/// It should be dropped as soon as you've freed all the RAM associated with the search request being processed.
|
||||||
pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
|
pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
|
||||||
|
let now = std::time::Instant::now();
|
||||||
let (sender, receiver) = oneshot::channel();
|
let (sender, receiver) = oneshot::channel();
|
||||||
self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
|
self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
|
||||||
receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))
|
let permit = receiver
|
||||||
|
.await
|
||||||
|
.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))?;
|
||||||
|
|
||||||
|
// If we've been for more than one minute to get a search permit, it's better to simply
|
||||||
|
// abort the search request than spending time processing something were the client
|
||||||
|
// most certainly exited or got a timeout a long time ago.
|
||||||
|
// We may find a better solution in https://github.com/actix/actix-web/issues/3462.
|
||||||
|
if now.elapsed() > self.time_to_abort {
|
||||||
|
permit.drop().await;
|
||||||
|
Err(MeilisearchHttpError::TooManySearchRequests(self.capacity))
|
||||||
|
} else {
|
||||||
|
Ok(permit)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `Ok(())` if everything seems normal.
|
/// Returns `Ok(())` if everything seems normal.
|
||||||
|
@ -56,6 +56,24 @@ async fn search_queue_register_with_explicit_drop() {
|
|||||||
let _permit4 = queue.try_get_search_permit().await.unwrap();
|
let _permit4 = queue.try_get_search_permit().await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn search_queue_register_with_time_to_abort() {
|
||||||
|
let queue = Arc::new(
|
||||||
|
SearchQueue::new(1, NonZeroUsize::new(1).unwrap())
|
||||||
|
.with_time_to_abort(Duration::from_secs(1)),
|
||||||
|
);
|
||||||
|
|
||||||
|
// First, use all the cores
|
||||||
|
let permit1 = queue.try_get_search_permit().await.unwrap();
|
||||||
|
let q = queue.clone();
|
||||||
|
let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await });
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
permit1.drop().await;
|
||||||
|
let ret = permit2.await.unwrap();
|
||||||
|
|
||||||
|
snapshot!(ret.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s.");
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn wait_till_cores_are_available() {
|
async fn wait_till_cores_are_available() {
|
||||||
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));
|
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));
|
||||||
|
Loading…
Reference in New Issue
Block a user