From c41e1274dc7aa8360d7ed0a64e5e04c2c73c95fd Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 15:56:43 +0100 Subject: [PATCH 01/14] push and test the search queue datastructure --- meilisearch-types/src/error.rs | 1 + meilisearch/src/error.rs | 6 ++ meilisearch/src/lib.rs | 1 + meilisearch/src/search_queue.rs | 88 +++++++++++++++++ meilisearch/tests/search/mod.rs | 1 + meilisearch/tests/search/search_queue.rs | 120 +++++++++++++++++++++++ 6 files changed, 217 insertions(+) create mode 100644 meilisearch/src/search_queue.rs create mode 100644 meilisearch/tests/search/search_queue.rs diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index aed77411a..fe0d75dae 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -305,6 +305,7 @@ MissingSwapIndexes , InvalidRequest , BAD_REQUEST ; MissingTaskFilters , InvalidRequest , BAD_REQUEST ; NoSpaceLeftOnDevice , System , UNPROCESSABLE_ENTITY; PayloadTooLarge , InvalidRequest , PAYLOAD_TOO_LARGE ; +TooManySearchRequests , System , SERVICE_UNAVAILABLE ; TaskNotFound , InvalidRequest , NOT_FOUND ; TooManyOpenFiles , System , UNPROCESSABLE_ENTITY ; TooManyVectors , InvalidRequest , BAD_REQUEST ; diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs index a8351fd1f..48c44c12d 100644 --- a/meilisearch/src/error.rs +++ b/meilisearch/src/error.rs @@ -29,6 +29,10 @@ pub enum MeilisearchHttpError { InvalidExpression(&'static [&'static str], Value), #[error("A {0} payload is missing.")] MissingPayload(PayloadType), + #[error("Too many search requests running at the same time: {0}. Retry after {1:?}.")] + TooManySearchRequests(usize, std::time::Duration), + #[error("Internal error: Search limiter is down")] + SearchLimiterIsDown, #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))] PayloadTooLarge(usize), #[error("Two indexes must be given for each swap. The list `[{}]` contains {} indexes.", @@ -69,6 +73,8 @@ impl ErrorCode for MeilisearchHttpError { MeilisearchHttpError::EmptyFilter => Code::InvalidDocumentFilter, MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter, MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge, + MeilisearchHttpError::TooManySearchRequests(_, _) => Code::TooManySearchRequests, + MeilisearchHttpError::SearchLimiterIsDown => Code::Internal, MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes, MeilisearchHttpError::IndexUid(e) => e.error_code(), MeilisearchHttpError::SerdeJson(_) => Code::Internal, diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 820f1ae42..bfe25b5a7 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -9,6 +9,7 @@ pub mod middleware; pub mod option; pub mod routes; pub mod search; +pub mod search_queue; use std::fs::File; use std::io::{BufReader, BufWriter}; diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs new file mode 100644 index 000000000..1f3cda1a2 --- /dev/null +++ b/meilisearch/src/search_queue.rs @@ -0,0 +1,88 @@ +use std::time::Duration; + +use rand::{rngs::StdRng, Rng, SeedableRng}; +use tokio::sync::{mpsc, oneshot}; + +use crate::error::MeilisearchHttpError; + +#[derive(Debug)] +pub struct SearchQueue { + sender: mpsc::Sender>, + capacity: usize, +} + +#[derive(Debug)] +pub struct Permit { + sender: mpsc::Sender<()>, +} + +impl Drop for Permit { + fn drop(&mut self) { + // if the channel is closed then the whole instance is down + let _ = futures::executor::block_on(self.sender.send(())); + } +} + +impl SearchQueue { + pub fn new(capacity: usize, paralellism: usize) -> Self { + // We can make the search requests wait until we're available. + // they're going to wait anyway right after, so let's not allocate any + // RAM by keeping a capacity of 1. + let (sender, receiver) = mpsc::channel(1); + tokio::task::spawn(Self::run(capacity, paralellism, receiver)); + Self { sender, capacity } + } + + async fn run( + capacity: usize, + parallelism: usize, + mut receive_new_searches: mpsc::Receiver>, + ) { + let mut queue: Vec> = Default::default(); + let mut rng: StdRng = StdRng::from_entropy(); + let mut searches_running = 0; + // by having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap + let (sender, mut search_finished) = mpsc::channel(parallelism); + + loop { + tokio::select! { + search_request = receive_new_searches.recv() => { + let search_request = search_request.unwrap(); + println!("queue contains {} elements and already running {}", queue.len(), searches_running); + if searches_running < parallelism && queue.is_empty() { + println!("We can process the search straight away"); + searches_running += 1; + // if the search requests die it's not a hard error on our side + let _ = search_request.send(Permit { sender: sender.clone() }); + continue; + } + if queue.len() >= capacity { + println!("we're above capacity, dropping a random request"); + let remove = rng.gen_range(0..queue.len()); + let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed + drop(thing); + } + println!("pushed a new search request to the queue {}", queue.len()); + queue.push(search_request); + }, + _ = search_finished.recv() => { + searches_running = searches_running.saturating_sub(1); + if !queue.is_empty() { + println!("processed an element in the queue"); + let remove = rng.gen_range(0..queue.len()); + let channel = queue.swap_remove(remove); + let _ = channel.send(Permit { sender: sender.clone() }); + } + }, + } + } + } + + pub async fn register_search(&self) -> Result { + let (sender, receiver) = oneshot::channel(); + self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; + receiver.await.map_err(|_| { + MeilisearchHttpError::TooManySearchRequests(self.capacity, Duration::from_secs(10)) + }) + } +} diff --git a/meilisearch/tests/search/mod.rs b/meilisearch/tests/search/mod.rs index 88470187a..e5925d77e 100644 --- a/meilisearch/tests/search/mod.rs +++ b/meilisearch/tests/search/mod.rs @@ -10,6 +10,7 @@ mod hybrid; mod multi; mod pagination; mod restrict_searchable; +mod search_queue; use once_cell::sync::Lazy; diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs new file mode 100644 index 000000000..078e8c152 --- /dev/null +++ b/meilisearch/tests/search/search_queue.rs @@ -0,0 +1,120 @@ +use std::{sync::Arc, time::Duration}; + +use meili_snap::snapshot; +use meilisearch::search_queue::SearchQueue; + +#[actix_rt::test] +async fn search_queue_register() { + let queue = SearchQueue::new(4, 2); + + // First, use all the cores + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + .await + .expect("I should get a permit straight away") + .unwrap(); + let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + // If we free one spot we should be able to register one new search + drop(permit1); + + let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + // And again + drop(permit3); + + let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + .await + .expect("I should get a permit straight away") + .unwrap(); +} + +#[actix_rt::test] +async fn search_queue_wait_till_cores_available() { + let queue = Arc::new(SearchQueue::new(4, 1)); + + // First, use all the cores + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + let ret = tokio::time::timeout(Duration::from_secs(1), queue.register_search()).await; + assert!(ret.is_err(), "The capacity is full, we should not get a permit"); + + let q = queue.clone(); + let task = tokio::task::spawn(async move { q.register_search().await }); + + // after dropping a permit the previous task should be able to finish + drop(permit1); + let _permit2 = tokio::time::timeout(Duration::from_secs(1), task) + .await + .expect("I should get a permit straight away") + .unwrap(); +} + +#[actix_rt::test] +async fn search_queue_refuse_search_requests() { + let queue = Arc::new(SearchQueue::new(1, 1)); + + // First, use the whole capacity of the + let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + let q = queue.clone(); + let permit2 = tokio::task::spawn(async move { q.register_search().await }); + + // Here the queue is full. By registering two new search requests the permit 2 and 3 should be thrown out + let q = queue.clone(); + let _permit3 = tokio::task::spawn(async move { q.register_search().await }); + + let permit2 = tokio::time::timeout(Duration::from_secs(1), permit2) + .await + .expect("I should get a result straight away") + .unwrap(); // task should end successfully + + snapshot!(permit2.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s."); +} + +#[actix_rt::test] +async fn search_request_crashes_while_holding_permits() { + let queue = Arc::new(SearchQueue::new(1, 1)); + + let (send, recv) = tokio::sync::oneshot::channel(); + + // This first request take a cpu + let q = queue.clone(); + tokio::task::spawn(async move { + let _permit = q.register_search().await.unwrap(); + recv.await.unwrap(); + panic!("oops an unexpected crash happened") + }); + + // This second request waits in the queue till the first request finishes + let q = queue.clone(); + let task = tokio::task::spawn(async move { + let _permit = q.register_search().await.unwrap(); + }); + + // By sending something in the channel the request holding a CPU will panic and should lose its permit + send.send(()).unwrap(); + + // Then the second request should be able to process and finishes correctly without panic + tokio::time::timeout(Duration::from_secs(1), task) + .await + .expect("I should get a permit straight away") + .unwrap(); + + // I should even be able to take second permit here + let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + .await + .expect("I should get a permit straight away") + .unwrap(); +} From 3f23fbb46d410eaad6109aee4d849dc7f3234f11 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 16:43:40 +0100 Subject: [PATCH 02/14] create the experimental CLI argument --- meilisearch/src/analytics/segment_analytics.rs | 3 +++ meilisearch/src/option.rs | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs index 99298bd43..b334f651d 100644 --- a/meilisearch/src/analytics/segment_analytics.rs +++ b/meilisearch/src/analytics/segment_analytics.rs @@ -252,6 +252,7 @@ impl super::Analytics for SegmentAnalytics { struct Infos { env: String, experimental_enable_metrics: bool, + experimental_search_queue_size: usize, experimental_logs_mode: LogMode, experimental_replication_parameters: bool, experimental_enable_logs_route: bool, @@ -293,6 +294,7 @@ impl From for Infos { let Opt { db_path, experimental_enable_metrics, + experimental_search_queue_size, experimental_logs_mode, experimental_replication_parameters, experimental_enable_logs_route, @@ -342,6 +344,7 @@ impl From for Infos { Self { env, experimental_enable_metrics, + experimental_search_queue_size, experimental_logs_mode, experimental_replication_parameters, experimental_enable_logs_route, diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 43bf2c62c..a9b8578bb 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -54,6 +54,7 @@ const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE"; const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS"; const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE"; const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS"; +const MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE: &str = "MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE"; const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str = "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE"; const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str = @@ -344,6 +345,16 @@ pub struct Opt { #[serde(default)] pub experimental_enable_metrics: bool, + /// TODO: Update the discussion link + /// Experimental search queue size. For more information, see: + /// + /// Lets you customize the size of the search queue. Meilisearch processes your search requests as fast as possible but once the + /// queue is full it starts returning HTTP 503, Service Unavailable. + /// The default value is 1000. + #[clap(long, env = MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, default_value_t = 1000)] + #[serde(default)] + pub experimental_search_queue_size: usize, + /// Experimental logs mode feature. For more information, see: /// /// Change the mode of the logs on the console. @@ -473,6 +484,7 @@ impl Opt { #[cfg(feature = "analytics")] no_analytics, experimental_enable_metrics, + experimental_search_queue_size, experimental_logs_mode, experimental_enable_logs_route, experimental_replication_parameters, @@ -532,6 +544,10 @@ impl Opt { MEILI_EXPERIMENTAL_ENABLE_METRICS, experimental_enable_metrics.to_string(), ); + export_to_env_if_not_present( + MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, + experimental_search_queue_size.to_string(), + ); export_to_env_if_not_present( MEILI_EXPERIMENTAL_LOGS_MODE, experimental_logs_mode.to_string(), From e433fd53e6fe1a5acbfa002bcc33b7c2e9468d1f Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 17:28:03 +0100 Subject: [PATCH 03/14] rename the method to get a permit and use it in all search requests --- meilisearch/src/lib.rs | 9 ++++- .../src/routes/indexes/facet_search.rs | 3 ++ meilisearch/src/routes/indexes/search.rs | 5 +++ meilisearch/src/routes/multi_search.rs | 6 ++++ meilisearch/src/search_queue.rs | 14 ++++---- meilisearch/tests/search/search_queue.rs | 36 +++++++++---------- 6 files changed, 47 insertions(+), 26 deletions(-) diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index bfe25b5a7..bb7562c85 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -13,9 +13,10 @@ pub mod search_queue; use std::fs::File; use std::io::{BufReader, BufWriter}; +use std::num::NonZeroUsize; use std::path::Path; use std::sync::Arc; -use std::thread; +use std::thread::{self, available_parallelism}; use std::time::Duration; use actix_cors::Cors; @@ -39,6 +40,7 @@ use meilisearch_types::versioning::{check_version_file, create_version_file}; use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; pub use option::Opt; use option::ScheduleSnapshot; +use search_queue::SearchQueue; use tracing::{error, info_span}; use tracing_subscriber::filter::Targets; @@ -470,10 +472,15 @@ pub fn configure_data( (logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle), analytics: Arc, ) { + let search_queue = SearchQueue::new( + opt.experimental_search_queue_size, + available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()), + ); let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize; config .app_data(index_scheduler) .app_data(auth) + .app_data(web::Data::new(search_queue)) .app_data(web::Data::from(analytics)) .app_data(web::Data::new(logs_route)) .app_data(web::Data::new(logs_stderr)) diff --git a/meilisearch/src/routes/indexes/facet_search.rs b/meilisearch/src/routes/indexes/facet_search.rs index a980fb278..272b8156f 100644 --- a/meilisearch/src/routes/indexes/facet_search.rs +++ b/meilisearch/src/routes/indexes/facet_search.rs @@ -17,6 +17,7 @@ use crate::search::{ DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(search))); @@ -48,6 +49,7 @@ pub struct FacetSearchQuery { pub async fn search( index_scheduler: GuardedData, Data>, + search_queue: Data, index_uid: web::Path, params: AwebJson, req: HttpRequest, @@ -71,6 +73,7 @@ pub async fn search( let index = index_scheduler.index(&index_uid)?; let features = index_scheduler.features(); + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_facet_search(&index, search_query, facet_query, facet_name, features) }) diff --git a/meilisearch/src/routes/indexes/search.rs b/meilisearch/src/routes/indexes/search.rs index 6a430b6a3..880786138 100644 --- a/meilisearch/src/routes/indexes/search.rs +++ b/meilisearch/src/routes/indexes/search.rs @@ -23,6 +23,7 @@ use crate::search::{ DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, DEFAULT_SEMANTIC_RATIO, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( @@ -182,6 +183,7 @@ fn fix_sort_query_parameters(sort_query: &str) -> Vec { pub async fn search_with_url_query( index_scheduler: GuardedData, Data>, + search_queue: web::Data, index_uid: web::Path, params: AwebQueryParameter, req: HttpRequest, @@ -204,6 +206,7 @@ pub async fn search_with_url_query( let distribution = embed(&mut query, index_scheduler.get_ref(), &index).await?; + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution)) .await?; @@ -220,6 +223,7 @@ pub async fn search_with_url_query( pub async fn search_with_post( index_scheduler: GuardedData, Data>, + search_queue: web::Data, index_uid: web::Path, params: AwebJson, req: HttpRequest, @@ -243,6 +247,7 @@ pub async fn search_with_post( let distribution = embed(&mut query, index_scheduler.get_ref(), &index).await?; + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution)) .await?; diff --git a/meilisearch/src/routes/multi_search.rs b/meilisearch/src/routes/multi_search.rs index 86aa58e70..55f633e97 100644 --- a/meilisearch/src/routes/multi_search.rs +++ b/meilisearch/src/routes/multi_search.rs @@ -17,6 +17,7 @@ use crate::routes::indexes::search::embed; use crate::search::{ add_search_rules, perform_search, SearchQueryWithIndex, SearchResultWithIndex, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(multi_search_with_post)))); @@ -35,6 +36,7 @@ pub struct SearchQueries { pub async fn multi_search_with_post( index_scheduler: GuardedData, Data>, + search_queue: Data, params: AwebJson, req: HttpRequest, analytics: web::Data, @@ -44,6 +46,10 @@ pub async fn multi_search_with_post( let mut multi_aggregate = MultiSearchAggregator::from_queries(&queries, &req); let features = index_scheduler.features(); + // Since we don't want to process half of the search requests and then get a permit refused + // we're going to get one permit for the whole duration of the multi-search request. + let _permit = search_queue.try_get_search_permit().await?; + // Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only, // so that `?` doesn't work if it doesn't use `with_index`, ensuring that it is not forgotten in case of code // changes. diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 1f3cda1a2..570394e34 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{num::NonZeroUsize, time::Duration}; use rand::{rngs::StdRng, Rng, SeedableRng}; use tokio::sync::{mpsc, oneshot}; @@ -24,7 +24,7 @@ impl Drop for Permit { } impl SearchQueue { - pub fn new(capacity: usize, paralellism: usize) -> Self { + pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self { // We can make the search requests wait until we're available. // they're going to wait anyway right after, so let's not allocate any // RAM by keeping a capacity of 1. @@ -35,21 +35,21 @@ impl SearchQueue { async fn run( capacity: usize, - parallelism: usize, + parallelism: NonZeroUsize, mut receive_new_searches: mpsc::Receiver>, ) { let mut queue: Vec> = Default::default(); let mut rng: StdRng = StdRng::from_entropy(); - let mut searches_running = 0; + let mut searches_running: usize = 0; // by having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap - let (sender, mut search_finished) = mpsc::channel(parallelism); + let (sender, mut search_finished) = mpsc::channel(parallelism.into()); loop { tokio::select! { search_request = receive_new_searches.recv() => { let search_request = search_request.unwrap(); println!("queue contains {} elements and already running {}", queue.len(), searches_running); - if searches_running < parallelism && queue.is_empty() { + if searches_running < usize::from(parallelism) && queue.is_empty() { println!("We can process the search straight away"); searches_running += 1; // if the search requests die it's not a hard error on our side @@ -78,7 +78,7 @@ impl SearchQueue { } } - pub async fn register_search(&self) -> Result { + pub async fn try_get_search_permit(&self) -> Result { let (sender, receiver) = oneshot::channel(); self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; receiver.await.map_err(|_| { diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index 078e8c152..15e62ab6d 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -1,18 +1,18 @@ -use std::{sync::Arc, time::Duration}; +use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use meili_snap::snapshot; use meilisearch::search_queue::SearchQueue; #[actix_rt::test] async fn search_queue_register() { - let queue = SearchQueue::new(4, 2); + let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap()); // First, use all the cores - let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); - let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); @@ -20,7 +20,7 @@ async fn search_queue_register() { // If we free one spot we should be able to register one new search drop(permit1); - let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); @@ -28,7 +28,7 @@ async fn search_queue_register() { // And again drop(permit3); - let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); @@ -36,19 +36,19 @@ async fn search_queue_register() { #[actix_rt::test] async fn search_queue_wait_till_cores_available() { - let queue = Arc::new(SearchQueue::new(4, 1)); + let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); // First, use all the cores - let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); - let ret = tokio::time::timeout(Duration::from_secs(1), queue.register_search()).await; + let ret = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()).await; assert!(ret.is_err(), "The capacity is full, we should not get a permit"); let q = queue.clone(); - let task = tokio::task::spawn(async move { q.register_search().await }); + let task = tokio::task::spawn(async move { q.try_get_search_permit().await }); // after dropping a permit the previous task should be able to finish drop(permit1); @@ -60,20 +60,20 @@ async fn search_queue_wait_till_cores_available() { #[actix_rt::test] async fn search_queue_refuse_search_requests() { - let queue = Arc::new(SearchQueue::new(1, 1)); + let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap())); // First, use the whole capacity of the - let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); let q = queue.clone(); - let permit2 = tokio::task::spawn(async move { q.register_search().await }); + let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await }); // Here the queue is full. By registering two new search requests the permit 2 and 3 should be thrown out let q = queue.clone(); - let _permit3 = tokio::task::spawn(async move { q.register_search().await }); + let _permit3 = tokio::task::spawn(async move { q.try_get_search_permit().await }); let permit2 = tokio::time::timeout(Duration::from_secs(1), permit2) .await @@ -85,14 +85,14 @@ async fn search_queue_refuse_search_requests() { #[actix_rt::test] async fn search_request_crashes_while_holding_permits() { - let queue = Arc::new(SearchQueue::new(1, 1)); + let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap())); let (send, recv) = tokio::sync::oneshot::channel(); // This first request take a cpu let q = queue.clone(); tokio::task::spawn(async move { - let _permit = q.register_search().await.unwrap(); + let _permit = q.try_get_search_permit().await.unwrap(); recv.await.unwrap(); panic!("oops an unexpected crash happened") }); @@ -100,7 +100,7 @@ async fn search_request_crashes_while_holding_permits() { // This second request waits in the queue till the first request finishes let q = queue.clone(); let task = tokio::task::spawn(async move { - let _permit = q.register_search().await.unwrap(); + let _permit = q.try_get_search_permit().await.unwrap(); }); // By sending something in the channel the request holding a CPU will panic and should lose its permit @@ -113,7 +113,7 @@ async fn search_request_crashes_while_holding_permits() { .unwrap(); // I should even be able to take second permit here - let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); From e2a1bbae378182897ecb03320e602b6d31e72bbe Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 17:53:37 +0100 Subject: [PATCH 04/14] simplify and improve the http error --- meilisearch-types/src/error.rs | 10 +++++++++- meilisearch/src/error.rs | 6 +++--- meilisearch/src/search_queue.rs | 17 +++++------------ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index fe0d75dae..f16097da1 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -2,6 +2,7 @@ use std::{fmt, io}; use actix_web::http::StatusCode; use actix_web::{self as aweb, HttpResponseBuilder}; +use aweb::http::header; use aweb::rt::task::JoinError; use convert_case::Casing; use milli::heed::{Error as HeedError, MdbError}; @@ -56,7 +57,14 @@ where impl aweb::error::ResponseError for ResponseError { fn error_response(&self) -> aweb::HttpResponse { let json = serde_json::to_vec(self).unwrap(); - HttpResponseBuilder::new(self.status_code()).content_type("application/json").body(json) + let mut builder = HttpResponseBuilder::new(self.status_code()); + builder.content_type("application/json"); + + if self.code == StatusCode::SERVICE_UNAVAILABLE { + builder.insert_header((header::RETRY_AFTER, "10")); + } + + builder.body(json) } fn status_code(&self) -> StatusCode { diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs index 48c44c12d..bb1156997 100644 --- a/meilisearch/src/error.rs +++ b/meilisearch/src/error.rs @@ -29,8 +29,8 @@ pub enum MeilisearchHttpError { InvalidExpression(&'static [&'static str], Value), #[error("A {0} payload is missing.")] MissingPayload(PayloadType), - #[error("Too many search requests running at the same time: {0}. Retry after {1:?}.")] - TooManySearchRequests(usize, std::time::Duration), + #[error("Too many search requests running at the same time: {0}. Retry after 10s.")] + TooManySearchRequests(usize), #[error("Internal error: Search limiter is down")] SearchLimiterIsDown, #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))] @@ -73,7 +73,7 @@ impl ErrorCode for MeilisearchHttpError { MeilisearchHttpError::EmptyFilter => Code::InvalidDocumentFilter, MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter, MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge, - MeilisearchHttpError::TooManySearchRequests(_, _) => Code::TooManySearchRequests, + MeilisearchHttpError::TooManySearchRequests(_) => Code::TooManySearchRequests, MeilisearchHttpError::SearchLimiterIsDown => Code::Internal, MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes, MeilisearchHttpError::IndexUid(e) => e.error_code(), diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 570394e34..b677f81a4 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -1,4 +1,4 @@ -use std::{num::NonZeroUsize, time::Duration}; +use std::num::NonZeroUsize; use rand::{rngs::StdRng, Rng, SeedableRng}; use tokio::sync::{mpsc, oneshot}; @@ -25,10 +25,10 @@ impl Drop for Permit { impl SearchQueue { pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self { - // We can make the search requests wait until we're available. - // they're going to wait anyway right after, so let's not allocate any - // RAM by keeping a capacity of 1. + // Search requests are going to wait until we're available anyway, + // so let's not allocate any RAM and keep a capacity of 1. let (sender, receiver) = mpsc::channel(1); + tokio::task::spawn(Self::run(capacity, paralellism, receiver)); Self { sender, capacity } } @@ -48,27 +48,22 @@ impl SearchQueue { tokio::select! { search_request = receive_new_searches.recv() => { let search_request = search_request.unwrap(); - println!("queue contains {} elements and already running {}", queue.len(), searches_running); if searches_running < usize::from(parallelism) && queue.is_empty() { - println!("We can process the search straight away"); searches_running += 1; // if the search requests die it's not a hard error on our side let _ = search_request.send(Permit { sender: sender.clone() }); continue; } if queue.len() >= capacity { - println!("we're above capacity, dropping a random request"); let remove = rng.gen_range(0..queue.len()); let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed drop(thing); } - println!("pushed a new search request to the queue {}", queue.len()); queue.push(search_request); }, _ = search_finished.recv() => { searches_running = searches_running.saturating_sub(1); if !queue.is_empty() { - println!("processed an element in the queue"); let remove = rng.gen_range(0..queue.len()); let channel = queue.swap_remove(remove); let _ = channel.send(Permit { sender: sender.clone() }); @@ -81,8 +76,6 @@ impl SearchQueue { pub async fn try_get_search_permit(&self) -> Result { let (sender, receiver) = oneshot::channel(); self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; - receiver.await.map_err(|_| { - MeilisearchHttpError::TooManySearchRequests(self.capacity, Duration::from_secs(10)) - }) + receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity)) } } From e7704f1fc127268a3dfaea6ffff40633a47cf273 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 18:08:59 +0100 Subject: [PATCH 05/14] add a test to ensure we effectively returns a retry-after when the search queue is full --- meilisearch/tests/search/search_queue.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index 15e62ab6d..8dca2ab31 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -1,5 +1,6 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; +use actix_web::ResponseError; use meili_snap::snapshot; use meilisearch::search_queue::SearchQueue; @@ -35,7 +36,7 @@ async fn search_queue_register() { } #[actix_rt::test] -async fn search_queue_wait_till_cores_available() { +async fn wait_till_cores_are_available() { let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); // First, use all the cores @@ -59,7 +60,7 @@ async fn search_queue_wait_till_cores_available() { } #[actix_rt::test] -async fn search_queue_refuse_search_requests() { +async fn refuse_search_requests_when_queue_is_full() { let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap())); // First, use the whole capacity of the @@ -80,7 +81,19 @@ async fn search_queue_refuse_search_requests() { .expect("I should get a result straight away") .unwrap(); // task should end successfully - snapshot!(permit2.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s."); + let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err()); + let http_response = err.error_response(); + snapshot!(format!("{:?}", http_response.headers()), @r###"HeaderMap { inner: {"retry-after": Value { inner: ["10"] }, "content-type": Value { inner: ["application/json"] }} }"###); + + let err = serde_json::to_string_pretty(&err).unwrap(); + snapshot!(err, @r###" + { + "message": "Too many search requests running at the same time: 1. Retry after 10s.", + "code": "too_many_search_requests", + "type": "system", + "link": "https://docs.meilisearch.com/errors#too_many_search_requests" + } + "###); } #[actix_rt::test] From 8127c9a115a0091654ecab1513f66a151f8819ba Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 19:04:39 +0100 Subject: [PATCH 06/14] handle the case of a queue of zero elements --- meilisearch/src/search_queue.rs | 11 +++++-- meilisearch/tests/search/search_queue.rs | 37 ++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index b677f81a4..0dd2abf17 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -47,14 +47,21 @@ impl SearchQueue { loop { tokio::select! { search_request = receive_new_searches.recv() => { + // this unwrap is safe because we're sure the `SearchQueue` still lives somewhere in actix-web let search_request = search_request.unwrap(); if searches_running < usize::from(parallelism) && queue.is_empty() { searches_running += 1; // if the search requests die it's not a hard error on our side let _ = search_request.send(Permit { sender: sender.clone() }); continue; - } - if queue.len() >= capacity { + } else if capacity == 0 { + // in the very specific case where we have a capacity of zero + // we must refuse the request straight away without going through + // the queue stuff. + drop(search_request); + continue; + + } else if queue.len() >= capacity { let remove = rng.gen_range(0..queue.len()); let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed drop(thing); diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index 8dca2ab31..6bb8d2169 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -131,3 +131,40 @@ async fn search_request_crashes_while_holding_permits() { .expect("I should get a permit straight away") .unwrap(); } + +#[actix_rt::test] +async fn works_with_capacity_of_zero() { + let queue = Arc::new(SearchQueue::new(0, NonZeroUsize::new(1).unwrap())); + + // First, use the whole capacity of the + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); + + // then we should get an error if we try to register a second search request. + let permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a result straight away"); + + let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err()); + let http_response = err.error_response(); + snapshot!(format!("{:?}", http_response.headers()), @r###"HeaderMap { inner: {"content-type": Value { inner: ["application/json"] }, "retry-after": Value { inner: ["10"] }} }"###); + + let err = serde_json::to_string_pretty(&err).unwrap(); + snapshot!(err, @r###" + { + "message": "Too many search requests running at the same time: 0. Retry after 10s.", + "code": "too_many_search_requests", + "type": "system", + "link": "https://docs.meilisearch.com/errors#too_many_search_requests" + } + "###); + + drop(permit1); + // After dropping the first permit we should be able to get a new permit + let _permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) + .await + .expect("I should get a permit straight away") + .unwrap(); +} From 8f5d9f501ac265ffdd6b17b477843a78f85d0187 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 19:18:32 +0100 Subject: [PATCH 07/14] update the discussion link --- meilisearch/src/option.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index a9b8578bb..651af7336 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -345,8 +345,7 @@ pub struct Opt { #[serde(default)] pub experimental_enable_metrics: bool, - /// TODO: Update the discussion link - /// Experimental search queue size. For more information, see: + /// Experimental search queue size. For more information, see: /// /// Lets you customize the size of the search queue. Meilisearch processes your search requests as fast as possible but once the /// queue is full it starts returning HTTP 503, Service Unavailable. From 2e36f069c20fdc8da55894a87d2f4c0237308cf2 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 19:23:55 +0100 Subject: [PATCH 08/14] fmt imports --- meilisearch/src/search_queue.rs | 3 ++- meilisearch/tests/search/search_queue.rs | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 0dd2abf17..9c0a6704a 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -1,6 +1,7 @@ use std::num::NonZeroUsize; -use rand::{rngs::StdRng, Rng, SeedableRng}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; use tokio::sync::{mpsc, oneshot}; use crate::error::MeilisearchHttpError; diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index 6bb8d2169..717becc3b 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -1,4 +1,6 @@ -use std::{num::NonZeroUsize, sync::Arc, time::Duration}; +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::time::Duration; use actix_web::ResponseError; use meili_snap::snapshot; From 55df9daaa0a4ea515e802f3985f162e2b2b54565 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 19:34:55 +0100 Subject: [PATCH 09/14] adds a comment about the safety of an operation --- meilisearch/src/search_queue.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 9c0a6704a..b661fefb0 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -72,6 +72,7 @@ impl SearchQueue { _ = search_finished.recv() => { searches_running = searches_running.saturating_sub(1); if !queue.is_empty() { + // Can't panic: the queue wasn't empty thus the range isn't empty. let remove = rng.gen_range(0..queue.len()); let channel = queue.swap_remove(remove); let _ = channel.send(Permit { sender: sender.clone() }); From 3a1f458139ee4313d4d015635a0c24874dab3097 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 26 Mar 2024 19:42:10 +0100 Subject: [PATCH 10/14] fix a flaky test --- meilisearch/tests/search/search_queue.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index 717becc3b..3b4fbf252 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -85,7 +85,13 @@ async fn refuse_search_requests_when_queue_is_full() { let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err()); let http_response = err.error_response(); - snapshot!(format!("{:?}", http_response.headers()), @r###"HeaderMap { inner: {"retry-after": Value { inner: ["10"] }, "content-type": Value { inner: ["application/json"] }} }"###); + let mut headers: Vec<_> = http_response + .headers() + .iter() + .map(|(name, value)| (name.to_string(), value.to_str().unwrap().to_string())) + .collect(); + headers.sort(); + snapshot!(format!("{headers:?}"), @r###"[("content-type", "application/json"), ("retry-after", "10")]"###); let err = serde_json::to_string_pretty(&err).unwrap(); snapshot!(err, @r###" @@ -151,7 +157,13 @@ async fn works_with_capacity_of_zero() { let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err()); let http_response = err.error_response(); - snapshot!(format!("{:?}", http_response.headers()), @r###"HeaderMap { inner: {"content-type": Value { inner: ["application/json"] }, "retry-after": Value { inner: ["10"] }} }"###); + let mut headers: Vec<_> = http_response + .headers() + .iter() + .map(|(name, value)| (name.to_string(), value.to_str().unwrap().to_string())) + .collect(); + headers.sort(); + snapshot!(format!("{headers:?}"), @r###"[("content-type", "application/json"), ("retry-after", "10")]"###); let err = serde_json::to_string_pretty(&err).unwrap(); snapshot!(err, @r###" From 087a96d22e2b8d5e4506acc1f3c605abf4f978cd Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 27 Mar 2024 11:05:37 +0100 Subject: [PATCH 11/14] fix flaky test --- meilisearch/src/search_queue.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index b661fefb0..ad222cdf8 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -47,6 +47,18 @@ impl SearchQueue { loop { tokio::select! { + // biased select because we wants to free up space before trying to register new tasks + biased; + _ = search_finished.recv() => { + searches_running = searches_running.saturating_sub(1); + if !queue.is_empty() { + // Can't panic: the queue wasn't empty thus the range isn't empty. + let remove = rng.gen_range(0..queue.len()); + let channel = queue.swap_remove(remove); + let _ = channel.send(Permit { sender: sender.clone() }); + } + }, + search_request = receive_new_searches.recv() => { // this unwrap is safe because we're sure the `SearchQueue` still lives somewhere in actix-web let search_request = search_request.unwrap(); @@ -69,15 +81,6 @@ impl SearchQueue { } queue.push(search_request); }, - _ = search_finished.recv() => { - searches_running = searches_running.saturating_sub(1); - if !queue.is_empty() { - // Can't panic: the queue wasn't empty thus the range isn't empty. - let remove = rng.gen_range(0..queue.len()); - let channel = queue.swap_remove(remove); - let _ = channel.send(Permit { sender: sender.clone() }); - } - }, } } } From 03c886ac1b58179da966a9d30bc06481552e63ee Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 27 Mar 2024 15:38:36 +0100 Subject: [PATCH 12/14] adds a bit of documentation --- meilisearch/src/search_queue.rs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index ad222cdf8..c0d7c7706 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -1,3 +1,22 @@ +//! This file implements a queue of searches to process and the ability to control how many searches can be run in parallel. +//! We need this because we don't want to process more search requests than we have cores. +//! That slows down everything and consumes RAM for no reason. +//! The steps to do a search are to get the `SearchQueue` data structure and try to get a search permit. +//! This can fail if the queue is full, and we need to drop your search request to register a new one. +//! +//! ### How to do a search request +//! +//! In order to do a search request you should try to get a search permit. +//! Retrieve the `SearchQueue` structure from actix-web (`search_queue: Data`) +//! and right before processing the search, calls the `SearchQueue::try_get_search_permit` method: `search_queue.try_get_search_permit().await?;` +//! +//! What is going to happen at this point is that you're going to send a oneshot::Sender over an async mpsc channel. +//! Then, the queue/scheduler is going to either: +//! - Drop your oneshot channel => that means there are too many searches going on, and yours won't be executed. +//! You should exit and free all the RAM you use ASAP. +//! - Sends you a Permit => that will unlock the method, and you will be able to process your search. +//! And should drop the Permit only once you have freed all the RAM consumed by the method. + use std::num::NonZeroUsize; use rand::rngs::StdRng; @@ -12,6 +31,8 @@ pub struct SearchQueue { capacity: usize, } +/// You should only run search requests while holding this permit. +/// Once it's dropped, a new search request will be able to process. #[derive(Debug)] pub struct Permit { sender: mpsc::Sender<()>, @@ -34,6 +55,10 @@ impl SearchQueue { Self { sender, capacity } } + /// This function is the main loop, it's in charge on scheduling which search request should execute first and + /// how many should executes at the same time. + /// + /// It **must never** panic or exit. async fn run( capacity: usize, parallelism: NonZeroUsize, @@ -42,7 +67,7 @@ impl SearchQueue { let mut queue: Vec> = Default::default(); let mut rng: StdRng = StdRng::from_entropy(); let mut searches_running: usize = 0; - // by having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap + // By having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap let (sender, mut search_finished) = mpsc::channel(parallelism.into()); loop { @@ -85,6 +110,8 @@ impl SearchQueue { } } + /// Returns a search `Permit`. + /// It should be dropped as soon as you've freed all the RAM associated with the search request being processed. pub async fn try_get_search_permit(&self) -> Result { let (sender, receiver) = oneshot::channel(); self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; From b7c582e4f3a44bf96e1b34b46ab6731cb5d54665 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 27 Mar 2024 15:49:43 +0100 Subject: [PATCH 13/14] connect the search queue with the health route --- meilisearch/src/routes/mod.rs | 3 +++ meilisearch/src/search_queue.rs | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs index 1c1465582..7cf886017 100644 --- a/meilisearch/src/routes/mod.rs +++ b/meilisearch/src/routes/mod.rs @@ -15,6 +15,7 @@ use tracing::debug; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; +use crate::search_queue::SearchQueue; use crate::Opt; const PAGINATION_DEFAULT_LIMIT: usize = 20; @@ -385,10 +386,12 @@ pub async fn get_health( req: HttpRequest, index_scheduler: Data, auth_controller: Data, + search_queue: Data, analytics: web::Data, ) -> Result { analytics.health_seen(&req); + search_queue.health().unwrap(); index_scheduler.health().unwrap(); auth_controller.health().unwrap(); diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index c0d7c7706..6d5044d20 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -117,4 +117,14 @@ impl SearchQueue { self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity)) } + + /// Returns `Ok(())` if everything seems normal. + /// Returns `Err(MeilisearchHttpError::SearchLimiterIsDown)` if the search limiter seems down. + pub fn health(&self) -> Result<(), MeilisearchHttpError> { + if self.sender.is_closed() { + Err(MeilisearchHttpError::SearchLimiterIsDown) + } else { + Ok(()) + } + } } From 06a11b5b216692e3a03c2b47aa49f6e531a8d19e Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 27 Mar 2024 17:34:49 +0100 Subject: [PATCH 14/14] Improve error message --- meilisearch/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs index bb1156997..5a0b04020 100644 --- a/meilisearch/src/error.rs +++ b/meilisearch/src/error.rs @@ -31,7 +31,7 @@ pub enum MeilisearchHttpError { MissingPayload(PayloadType), #[error("Too many search requests running at the same time: {0}. Retry after 10s.")] TooManySearchRequests(usize), - #[error("Internal error: Search limiter is down")] + #[error("Internal error: Search limiter is down.")] SearchLimiterIsDown, #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))] PayloadTooLarge(usize),