diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index bfe25b5a7..bb7562c85 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -13,9 +13,10 @@ pub mod search_queue; use std::fs::File; use std::io::{BufReader, BufWriter}; +use std::num::NonZeroUsize; use std::path::Path; use std::sync::Arc; -use std::thread; +use std::thread::{self, available_parallelism}; use std::time::Duration; use actix_cors::Cors; @@ -39,6 +40,7 @@ use meilisearch_types::versioning::{check_version_file, create_version_file}; use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; pub use option::Opt; use option::ScheduleSnapshot; +use search_queue::SearchQueue; use tracing::{error, info_span}; use tracing_subscriber::filter::Targets; @@ -470,10 +472,15 @@ pub fn configure_data( (logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle), analytics: Arc, ) { + let search_queue = SearchQueue::new( + opt.experimental_search_queue_size, + available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()), + ); let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize; config .app_data(index_scheduler) .app_data(auth) + .app_data(web::Data::new(search_queue)) .app_data(web::Data::from(analytics)) .app_data(web::Data::new(logs_route)) .app_data(web::Data::new(logs_stderr)) diff --git a/meilisearch/src/routes/indexes/facet_search.rs b/meilisearch/src/routes/indexes/facet_search.rs index a980fb278..272b8156f 100644 --- a/meilisearch/src/routes/indexes/facet_search.rs +++ b/meilisearch/src/routes/indexes/facet_search.rs @@ -17,6 +17,7 @@ use crate::search::{ DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(search))); @@ -48,6 +49,7 @@ pub struct FacetSearchQuery { pub async fn search( index_scheduler: GuardedData, Data>, + search_queue: Data, index_uid: web::Path, params: AwebJson, req: HttpRequest, @@ -71,6 +73,7 @@ pub async fn search( let index = index_scheduler.index(&index_uid)?; let features = index_scheduler.features(); + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_facet_search(&index, search_query, facet_query, facet_name, features) }) diff --git a/meilisearch/src/routes/indexes/search.rs b/meilisearch/src/routes/indexes/search.rs index 6a430b6a3..880786138 100644 --- a/meilisearch/src/routes/indexes/search.rs +++ b/meilisearch/src/routes/indexes/search.rs @@ -23,6 +23,7 @@ use crate::search::{ DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, DEFAULT_SEMANTIC_RATIO, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( @@ -182,6 +183,7 @@ fn fix_sort_query_parameters(sort_query: &str) -> Vec { pub async fn search_with_url_query( index_scheduler: GuardedData, Data>, + search_queue: web::Data, index_uid: web::Path, params: AwebQueryParameter, req: HttpRequest, @@ -204,6 +206,7 @@ pub async fn search_with_url_query( let distribution = embed(&mut query, index_scheduler.get_ref(), &index).await?; + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution)) .await?; @@ -220,6 +223,7 @@ pub async fn search_with_url_query( pub async fn search_with_post( index_scheduler: GuardedData, Data>, + search_queue: web::Data, index_uid: web::Path, params: AwebJson, req: HttpRequest, @@ -243,6 +247,7 @@ pub async fn search_with_post( let distribution = embed(&mut query, index_scheduler.get_ref(), &index).await?; + let _permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution)) .await?; diff --git a/meilisearch/src/routes/multi_search.rs b/meilisearch/src/routes/multi_search.rs index 86aa58e70..55f633e97 100644 --- a/meilisearch/src/routes/multi_search.rs +++ b/meilisearch/src/routes/multi_search.rs @@ -17,6 +17,7 @@ use crate::routes::indexes::search::embed; use crate::search::{ add_search_rules, perform_search, SearchQueryWithIndex, SearchResultWithIndex, }; +use crate::search_queue::SearchQueue; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(multi_search_with_post)))); @@ -35,6 +36,7 @@ pub struct SearchQueries { pub async fn multi_search_with_post( index_scheduler: GuardedData, Data>, + search_queue: Data, params: AwebJson, req: HttpRequest, analytics: web::Data, @@ -44,6 +46,10 @@ pub async fn multi_search_with_post( let mut multi_aggregate = MultiSearchAggregator::from_queries(&queries, &req); let features = index_scheduler.features(); + // Since we don't want to process half of the search requests and then get a permit refused + // we're going to get one permit for the whole duration of the multi-search request. + let _permit = search_queue.try_get_search_permit().await?; + // Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only, // so that `?` doesn't work if it doesn't use `with_index`, ensuring that it is not forgotten in case of code // changes. diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 1f3cda1a2..570394e34 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{num::NonZeroUsize, time::Duration}; use rand::{rngs::StdRng, Rng, SeedableRng}; use tokio::sync::{mpsc, oneshot}; @@ -24,7 +24,7 @@ impl Drop for Permit { } impl SearchQueue { - pub fn new(capacity: usize, paralellism: usize) -> Self { + pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self { // We can make the search requests wait until we're available. // they're going to wait anyway right after, so let's not allocate any // RAM by keeping a capacity of 1. @@ -35,21 +35,21 @@ impl SearchQueue { async fn run( capacity: usize, - parallelism: usize, + parallelism: NonZeroUsize, mut receive_new_searches: mpsc::Receiver>, ) { let mut queue: Vec> = Default::default(); let mut rng: StdRng = StdRng::from_entropy(); - let mut searches_running = 0; + let mut searches_running: usize = 0; // by having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap - let (sender, mut search_finished) = mpsc::channel(parallelism); + let (sender, mut search_finished) = mpsc::channel(parallelism.into()); loop { tokio::select! { search_request = receive_new_searches.recv() => { let search_request = search_request.unwrap(); println!("queue contains {} elements and already running {}", queue.len(), searches_running); - if searches_running < parallelism && queue.is_empty() { + if searches_running < usize::from(parallelism) && queue.is_empty() { println!("We can process the search straight away"); searches_running += 1; // if the search requests die it's not a hard error on our side @@ -78,7 +78,7 @@ impl SearchQueue { } } - pub async fn register_search(&self) -> Result { + pub async fn try_get_search_permit(&self) -> Result { let (sender, receiver) = oneshot::channel(); self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; receiver.await.map_err(|_| { diff --git a/meilisearch/tests/search/search_queue.rs b/meilisearch/tests/search/search_queue.rs index 078e8c152..15e62ab6d 100644 --- a/meilisearch/tests/search/search_queue.rs +++ b/meilisearch/tests/search/search_queue.rs @@ -1,18 +1,18 @@ -use std::{sync::Arc, time::Duration}; +use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use meili_snap::snapshot; use meilisearch::search_queue::SearchQueue; #[actix_rt::test] async fn search_queue_register() { - let queue = SearchQueue::new(4, 2); + let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap()); // First, use all the cores - let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); - let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); @@ -20,7 +20,7 @@ async fn search_queue_register() { // If we free one spot we should be able to register one new search drop(permit1); - let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); @@ -28,7 +28,7 @@ async fn search_queue_register() { // And again drop(permit3); - let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); @@ -36,19 +36,19 @@ async fn search_queue_register() { #[actix_rt::test] async fn search_queue_wait_till_cores_available() { - let queue = Arc::new(SearchQueue::new(4, 1)); + let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); // First, use all the cores - let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); - let ret = tokio::time::timeout(Duration::from_secs(1), queue.register_search()).await; + let ret = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()).await; assert!(ret.is_err(), "The capacity is full, we should not get a permit"); let q = queue.clone(); - let task = tokio::task::spawn(async move { q.register_search().await }); + let task = tokio::task::spawn(async move { q.try_get_search_permit().await }); // after dropping a permit the previous task should be able to finish drop(permit1); @@ -60,20 +60,20 @@ async fn search_queue_wait_till_cores_available() { #[actix_rt::test] async fn search_queue_refuse_search_requests() { - let queue = Arc::new(SearchQueue::new(1, 1)); + let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap())); // First, use the whole capacity of the - let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap(); let q = queue.clone(); - let permit2 = tokio::task::spawn(async move { q.register_search().await }); + let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await }); // Here the queue is full. By registering two new search requests the permit 2 and 3 should be thrown out let q = queue.clone(); - let _permit3 = tokio::task::spawn(async move { q.register_search().await }); + let _permit3 = tokio::task::spawn(async move { q.try_get_search_permit().await }); let permit2 = tokio::time::timeout(Duration::from_secs(1), permit2) .await @@ -85,14 +85,14 @@ async fn search_queue_refuse_search_requests() { #[actix_rt::test] async fn search_request_crashes_while_holding_permits() { - let queue = Arc::new(SearchQueue::new(1, 1)); + let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap())); let (send, recv) = tokio::sync::oneshot::channel(); // This first request take a cpu let q = queue.clone(); tokio::task::spawn(async move { - let _permit = q.register_search().await.unwrap(); + let _permit = q.try_get_search_permit().await.unwrap(); recv.await.unwrap(); panic!("oops an unexpected crash happened") }); @@ -100,7 +100,7 @@ async fn search_request_crashes_while_holding_permits() { // This second request waits in the queue till the first request finishes let q = queue.clone(); let task = tokio::task::spawn(async move { - let _permit = q.register_search().await.unwrap(); + let _permit = q.try_get_search_permit().await.unwrap(); }); // By sending something in the channel the request holding a CPU will panic and should lose its permit @@ -113,7 +113,7 @@ async fn search_request_crashes_while_holding_permits() { .unwrap(); // I should even be able to take second permit here - let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search()) + let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()) .await .expect("I should get a permit straight away") .unwrap();