diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs
index aed77411a..fe0d75dae 100644
--- a/meilisearch-types/src/error.rs
+++ b/meilisearch-types/src/error.rs
@@ -305,6 +305,7 @@ MissingSwapIndexes , InvalidRequest , BAD_REQUEST ;
 MissingTaskFilters , InvalidRequest , BAD_REQUEST ;
 NoSpaceLeftOnDevice , System , UNPROCESSABLE_ENTITY;
 PayloadTooLarge , InvalidRequest , PAYLOAD_TOO_LARGE ;
+TooManySearchRequests , System , SERVICE_UNAVAILABLE ;
 TaskNotFound , InvalidRequest , NOT_FOUND ;
 TooManyOpenFiles , System , UNPROCESSABLE_ENTITY ;
 TooManyVectors , InvalidRequest , BAD_REQUEST ;
diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs
index a8351fd1f..48c44c12d 100644
--- a/meilisearch/src/error.rs
+++ b/meilisearch/src/error.rs
@@ -29,6 +29,10 @@ pub enum MeilisearchHttpError {
     InvalidExpression(&'static [&'static str], Value),
     #[error("A {0} payload is missing.")]
     MissingPayload(PayloadType),
+    #[error("Too many search requests running at the same time: {0}. Retry after {1:?}.")]
+    TooManySearchRequests(usize, std::time::Duration),
+    #[error("Internal error: Search limiter is down")]
+    SearchLimiterIsDown,
     #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))]
     PayloadTooLarge(usize),
     #[error("Two indexes must be given for each swap. The list `[{}]` contains {} indexes.",
@@ -69,6 +73,8 @@ impl ErrorCode for MeilisearchHttpError {
             MeilisearchHttpError::EmptyFilter => Code::InvalidDocumentFilter,
             MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter,
             MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge,
+            MeilisearchHttpError::TooManySearchRequests(_, _) => Code::TooManySearchRequests,
+            MeilisearchHttpError::SearchLimiterIsDown => Code::Internal,
             MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes,
             MeilisearchHttpError::IndexUid(e) => e.error_code(),
             MeilisearchHttpError::SerdeJson(_) => Code::Internal,
diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs
index 820f1ae42..bfe25b5a7 100644
--- a/meilisearch/src/lib.rs
+++ b/meilisearch/src/lib.rs
@@ -9,6 +9,7 @@ pub mod middleware;
 pub mod option;
 pub mod routes;
 pub mod search;
+pub mod search_queue;
 
 use std::fs::File;
 use std::io::{BufReader, BufWriter};
diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs
new file mode 100644
index 000000000..1f3cda1a2
--- /dev/null
+++ b/meilisearch/src/search_queue.rs
@@ -0,0 +1,88 @@
+use std::time::Duration;
+
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use tokio::sync::{mpsc, oneshot};
+
+use crate::error::MeilisearchHttpError;
+
+#[derive(Debug)]
+pub struct SearchQueue {
+    sender: mpsc::Sender<oneshot::Sender<Permit>>,
+    capacity: usize,
+}
+
+#[derive(Debug)]
+pub struct Permit {
+    sender: mpsc::Sender<()>,
+}
+
+impl Drop for Permit {
+    fn drop(&mut self) {
+        // if the channel is closed then the whole instance is down
+        let _ = futures::executor::block_on(self.sender.send(()));
+    }
+}
+
+impl SearchQueue {
+    pub fn new(capacity: usize, paralellism: usize) -> Self {
+        // We can make the search requests wait until we're available.
+        // they're going to wait anyway right after, so let's not allocate any
+        // RAM by keeping a capacity of 1.
+        let (sender, receiver) = mpsc::channel(1);
+        tokio::task::spawn(Self::run(capacity, paralellism, receiver));
+        Self { sender, capacity }
+    }
+
+    async fn run(
+        capacity: usize,
+        parallelism: usize,
+        mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
+    ) {
+        let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
+        let mut rng: StdRng = StdRng::from_entropy();
+        let mut searches_running = 0;
+        // by having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap
+        let (sender, mut search_finished) = mpsc::channel(parallelism);
+
+        loop {
+            tokio::select! {
+                search_request = receive_new_searches.recv() => {
+                    let search_request = search_request.unwrap();
+                    println!("queue contains {} elements and already running {}", queue.len(), searches_running);
+                    if searches_running < parallelism && queue.is_empty() {
+                        println!("We can process the search straight away");
+                        searches_running += 1;
+                        // if the search requests die it's not a hard error on our side
+                        let _ = search_request.send(Permit { sender: sender.clone() });
+                        continue;
+                    }
+                    if queue.len() >= capacity {
+                        println!("we're above capacity, dropping a random request");
+                        let remove = rng.gen_range(0..queue.len());
+                        let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed
+                        drop(thing);
+                    }
+                    println!("pushed a new search request to the queue {}", queue.len());
+                    queue.push(search_request);
+                },
+                _ = search_finished.recv() => {
+                    searches_running = searches_running.saturating_sub(1);
+                    if !queue.is_empty() {
+                        println!("processed an element in the queue");
+                        let remove = rng.gen_range(0..queue.len());
+                        let channel = queue.swap_remove(remove);
+                        let _ = channel.send(Permit { sender: sender.clone() });
+                    }
+                },
+            }
+        }
+    }
+
+    pub async fn register_search(&self) -> Result<Permit, MeilisearchHttpError> {
+        let (sender, receiver) = oneshot::channel();
+        self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
+        receiver.await.map_err(|_| {
+            MeilisearchHttpError::TooManySearchRequests(self.capacity, Duration::from_secs(10))
+        })
+    }
+}
diff --git a/meilisearch/tests/search/mod.rs b/meilisearch/tests/search/mod.rs
index 88470187a..e5925d77e 100644
--- a/meilisearch/tests/search/mod.rs
+++ b/meilisearch/tests/search/mod.rs
@@ -10,6 +10,7 @@ mod hybrid;
 mod multi;
 mod pagination;
 mod restrict_searchable;
+mod search_queue;
 
 use once_cell::sync::Lazy;
 
diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs
new file mode 100644
index 000000000..078e8c152
--- /dev/null
+++ b/meilisearch/tests/search/search_queue.rs
@@ -0,0 +1,120 @@
+use std::{sync::Arc, time::Duration};
+
+use meili_snap::snapshot;
+use meilisearch::search_queue::SearchQueue;
+
+#[actix_rt::test]
+async fn search_queue_register() {
+    let queue = SearchQueue::new(4, 2);
+
+    // First, use all the cores
+    let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+    let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+
+    // If we free one spot we should be able to register one new search
+    drop(permit1);
+
+    let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+
+    // And again
+    drop(permit3);
+
+    let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+}
+
+#[actix_rt::test]
+async fn search_queue_wait_till_cores_available() {
+    let queue = Arc::new(SearchQueue::new(4, 1));
+
+    // First, use all the cores
+    let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+
+    let ret = tokio::time::timeout(Duration::from_secs(1), queue.register_search()).await;
+    assert!(ret.is_err(), "The capacity is full, we should not get a permit");
+
+    let q = queue.clone();
+    let task = tokio::task::spawn(async move { q.register_search().await });
+
+    // after dropping a permit the previous task should be able to finish
+    drop(permit1);
+    let _permit2 = tokio::time::timeout(Duration::from_secs(1), task)
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+}
+
+#[actix_rt::test]
+async fn search_queue_refuse_search_requests() {
+    let queue = Arc::new(SearchQueue::new(1, 1));
+
+    // First, use the whole capacity of the
+    let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+
+    let q = queue.clone();
+    let permit2 = tokio::task::spawn(async move { q.register_search().await });
+
+    // Here the queue is full. By registering two new search requests the permit 2 and 3 should be thrown out
+    let q = queue.clone();
+    let _permit3 = tokio::task::spawn(async move { q.register_search().await });
+
+    let permit2 = tokio::time::timeout(Duration::from_secs(1), permit2)
+        .await
+        .expect("I should get a result straight away")
+        .unwrap(); // task should end successfully
+
+    snapshot!(permit2.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s.");
+}
+
+#[actix_rt::test]
+async fn search_request_crashes_while_holding_permits() {
+    let queue = Arc::new(SearchQueue::new(1, 1));
+
+    let (send, recv) = tokio::sync::oneshot::channel();
+
+    // This first request take a cpu
+    let q = queue.clone();
+    tokio::task::spawn(async move {
+        let _permit = q.register_search().await.unwrap();
+        recv.await.unwrap();
+        panic!("oops an unexpected crash happened")
+    });
+
+    // This second request waits in the queue till the first request finishes
+    let q = queue.clone();
+    let task = tokio::task::spawn(async move {
+        let _permit = q.register_search().await.unwrap();
+    });
+
+    // By sending something in the channel the request holding a CPU will panic and should lose its permit
+    send.send(()).unwrap();
+
+    // Then the second request should be able to process and finishes correctly without panic
+    tokio::time::timeout(Duration::from_secs(1), task)
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+
+    // I should even be able to take second permit here
+    let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
+        .await
+        .expect("I should get a permit straight away")
+        .unwrap();
+}