mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-19 01:18:31 +08:00
Merge #4536
4536: Limit concurrent search requests r=ManyTheFish a=irevoire # Pull Request ## Related issue Fixes https://github.com/meilisearch/meilisearch/issues/4489 ## What does this PR do? - Adds a « search queue » that limits the number of search requests we can process at the same time and stores search requests to be processed - Process only one search request per core/thread (we use available_parallelism) - When the search queue is full, new search requests replace old ones **randomly**. The reason is that: - If we serve the oldest one first, like Typesense, we give the worst performances to everyone - If we serve the latest one, it gets too easy to DoS us (you just need to fill the queue with as many search requests as we can process simultaneously to ensure no other request will ever be processed) - By picking the search request randomly, we give a chance to recent search requests to be processed while ensuring that we can't be owned unless they fill our queue entirely and we start returning errors 5xx - Adds an experimental parameter to control the size of the queue - Adds a bunch of tests to ensure the search queue works correctly - Ensure the loop consuming the search queue is running in the health route and crashes if it’s not the case Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
fa9748cc99
@ -2,6 +2,7 @@ use std::{fmt, io};
|
||||
|
||||
use actix_web::http::StatusCode;
|
||||
use actix_web::{self as aweb, HttpResponseBuilder};
|
||||
use aweb::http::header;
|
||||
use aweb::rt::task::JoinError;
|
||||
use convert_case::Casing;
|
||||
use milli::heed::{Error as HeedError, MdbError};
|
||||
@ -56,7 +57,14 @@ where
|
||||
impl aweb::error::ResponseError for ResponseError {
|
||||
fn error_response(&self) -> aweb::HttpResponse {
|
||||
let json = serde_json::to_vec(self).unwrap();
|
||||
HttpResponseBuilder::new(self.status_code()).content_type("application/json").body(json)
|
||||
let mut builder = HttpResponseBuilder::new(self.status_code());
|
||||
builder.content_type("application/json");
|
||||
|
||||
if self.code == StatusCode::SERVICE_UNAVAILABLE {
|
||||
builder.insert_header((header::RETRY_AFTER, "10"));
|
||||
}
|
||||
|
||||
builder.body(json)
|
||||
}
|
||||
|
||||
fn status_code(&self) -> StatusCode {
|
||||
@ -305,6 +313,7 @@ MissingSwapIndexes , InvalidRequest , BAD_REQUEST ;
|
||||
MissingTaskFilters , InvalidRequest , BAD_REQUEST ;
|
||||
NoSpaceLeftOnDevice , System , UNPROCESSABLE_ENTITY;
|
||||
PayloadTooLarge , InvalidRequest , PAYLOAD_TOO_LARGE ;
|
||||
TooManySearchRequests , System , SERVICE_UNAVAILABLE ;
|
||||
TaskNotFound , InvalidRequest , NOT_FOUND ;
|
||||
TooManyOpenFiles , System , UNPROCESSABLE_ENTITY ;
|
||||
TooManyVectors , InvalidRequest , BAD_REQUEST ;
|
||||
|
@ -252,6 +252,7 @@ impl super::Analytics for SegmentAnalytics {
|
||||
struct Infos {
|
||||
env: String,
|
||||
experimental_enable_metrics: bool,
|
||||
experimental_search_queue_size: usize,
|
||||
experimental_logs_mode: LogMode,
|
||||
experimental_replication_parameters: bool,
|
||||
experimental_enable_logs_route: bool,
|
||||
@ -293,6 +294,7 @@ impl From<Opt> for Infos {
|
||||
let Opt {
|
||||
db_path,
|
||||
experimental_enable_metrics,
|
||||
experimental_search_queue_size,
|
||||
experimental_logs_mode,
|
||||
experimental_replication_parameters,
|
||||
experimental_enable_logs_route,
|
||||
@ -342,6 +344,7 @@ impl From<Opt> for Infos {
|
||||
Self {
|
||||
env,
|
||||
experimental_enable_metrics,
|
||||
experimental_search_queue_size,
|
||||
experimental_logs_mode,
|
||||
experimental_replication_parameters,
|
||||
experimental_enable_logs_route,
|
||||
|
@ -29,6 +29,10 @@ pub enum MeilisearchHttpError {
|
||||
InvalidExpression(&'static [&'static str], Value),
|
||||
#[error("A {0} payload is missing.")]
|
||||
MissingPayload(PayloadType),
|
||||
#[error("Too many search requests running at the same time: {0}. Retry after 10s.")]
|
||||
TooManySearchRequests(usize),
|
||||
#[error("Internal error: Search limiter is down.")]
|
||||
SearchLimiterIsDown,
|
||||
#[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))]
|
||||
PayloadTooLarge(usize),
|
||||
#[error("Two indexes must be given for each swap. The list `[{}]` contains {} indexes.",
|
||||
@ -69,6 +73,8 @@ impl ErrorCode for MeilisearchHttpError {
|
||||
MeilisearchHttpError::EmptyFilter => Code::InvalidDocumentFilter,
|
||||
MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter,
|
||||
MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge,
|
||||
MeilisearchHttpError::TooManySearchRequests(_) => Code::TooManySearchRequests,
|
||||
MeilisearchHttpError::SearchLimiterIsDown => Code::Internal,
|
||||
MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes,
|
||||
MeilisearchHttpError::IndexUid(e) => e.error_code(),
|
||||
MeilisearchHttpError::SerdeJson(_) => Code::Internal,
|
||||
|
@ -9,12 +9,14 @@ pub mod middleware;
|
||||
pub mod option;
|
||||
pub mod routes;
|
||||
pub mod search;
|
||||
pub mod search_queue;
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, BufWriter};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::thread::{self, available_parallelism};
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_cors::Cors;
|
||||
@ -38,6 +40,7 @@ use meilisearch_types::versioning::{check_version_file, create_version_file};
|
||||
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
|
||||
pub use option::Opt;
|
||||
use option::ScheduleSnapshot;
|
||||
use search_queue::SearchQueue;
|
||||
use tracing::{error, info_span};
|
||||
use tracing_subscriber::filter::Targets;
|
||||
|
||||
@ -469,10 +472,15 @@ pub fn configure_data(
|
||||
(logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
|
||||
analytics: Arc<dyn Analytics>,
|
||||
) {
|
||||
let search_queue = SearchQueue::new(
|
||||
opt.experimental_search_queue_size,
|
||||
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
|
||||
);
|
||||
let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
|
||||
config
|
||||
.app_data(index_scheduler)
|
||||
.app_data(auth)
|
||||
.app_data(web::Data::new(search_queue))
|
||||
.app_data(web::Data::from(analytics))
|
||||
.app_data(web::Data::new(logs_route))
|
||||
.app_data(web::Data::new(logs_stderr))
|
||||
|
@ -54,6 +54,7 @@ const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE";
|
||||
const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
|
||||
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
|
||||
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
|
||||
const MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE: &str = "MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE";
|
||||
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
|
||||
"MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE";
|
||||
const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str =
|
||||
@ -344,6 +345,15 @@ pub struct Opt {
|
||||
#[serde(default)]
|
||||
pub experimental_enable_metrics: bool,
|
||||
|
||||
/// Experimental search queue size. For more information, see: <https://github.com/orgs/meilisearch/discussions/729>
|
||||
///
|
||||
/// Lets you customize the size of the search queue. Meilisearch processes your search requests as fast as possible but once the
|
||||
/// queue is full it starts returning HTTP 503, Service Unavailable.
|
||||
/// The default value is 1000.
|
||||
#[clap(long, env = MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE, default_value_t = 1000)]
|
||||
#[serde(default)]
|
||||
pub experimental_search_queue_size: usize,
|
||||
|
||||
/// Experimental logs mode feature. For more information, see: <https://github.com/orgs/meilisearch/discussions/723>
|
||||
///
|
||||
/// Change the mode of the logs on the console.
|
||||
@ -473,6 +483,7 @@ impl Opt {
|
||||
#[cfg(feature = "analytics")]
|
||||
no_analytics,
|
||||
experimental_enable_metrics,
|
||||
experimental_search_queue_size,
|
||||
experimental_logs_mode,
|
||||
experimental_enable_logs_route,
|
||||
experimental_replication_parameters,
|
||||
@ -532,6 +543,10 @@ impl Opt {
|
||||
MEILI_EXPERIMENTAL_ENABLE_METRICS,
|
||||
experimental_enable_metrics.to_string(),
|
||||
);
|
||||
export_to_env_if_not_present(
|
||||
MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE,
|
||||
experimental_search_queue_size.to_string(),
|
||||
);
|
||||
export_to_env_if_not_present(
|
||||
MEILI_EXPERIMENTAL_LOGS_MODE,
|
||||
experimental_logs_mode.to_string(),
|
||||
|
@ -17,6 +17,7 @@ use crate::search::{
|
||||
DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
|
||||
DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET,
|
||||
};
|
||||
use crate::search_queue::SearchQueue;
|
||||
|
||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
cfg.service(web::resource("").route(web::post().to(search)));
|
||||
@ -48,6 +49,7 @@ pub struct FacetSearchQuery {
|
||||
|
||||
pub async fn search(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
|
||||
search_queue: Data<SearchQueue>,
|
||||
index_uid: web::Path<String>,
|
||||
params: AwebJson<FacetSearchQuery, DeserrJsonError>,
|
||||
req: HttpRequest,
|
||||
@ -71,6 +73,7 @@ pub async fn search(
|
||||
|
||||
let index = index_scheduler.index(&index_uid)?;
|
||||
let features = index_scheduler.features();
|
||||
let _permit = search_queue.try_get_search_permit().await?;
|
||||
let search_result = tokio::task::spawn_blocking(move || {
|
||||
perform_facet_search(&index, search_query, facet_query, facet_name, features)
|
||||
})
|
||||
|
@ -23,6 +23,7 @@ use crate::search::{
|
||||
DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
|
||||
DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, DEFAULT_SEMANTIC_RATIO,
|
||||
};
|
||||
use crate::search_queue::SearchQueue;
|
||||
|
||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
cfg.service(
|
||||
@ -182,6 +183,7 @@ fn fix_sort_query_parameters(sort_query: &str) -> Vec<String> {
|
||||
|
||||
pub async fn search_with_url_query(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
|
||||
search_queue: web::Data<SearchQueue>,
|
||||
index_uid: web::Path<String>,
|
||||
params: AwebQueryParameter<SearchQueryGet, DeserrQueryParamError>,
|
||||
req: HttpRequest,
|
||||
@ -204,6 +206,7 @@ pub async fn search_with_url_query(
|
||||
|
||||
let distribution = embed(&mut query, index_scheduler.get_ref(), &index)?;
|
||||
|
||||
let _permit = search_queue.try_get_search_permit().await?;
|
||||
let search_result =
|
||||
tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution))
|
||||
.await?;
|
||||
@ -220,6 +223,7 @@ pub async fn search_with_url_query(
|
||||
|
||||
pub async fn search_with_post(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
|
||||
search_queue: web::Data<SearchQueue>,
|
||||
index_uid: web::Path<String>,
|
||||
params: AwebJson<SearchQuery, DeserrJsonError>,
|
||||
req: HttpRequest,
|
||||
@ -243,6 +247,7 @@ pub async fn search_with_post(
|
||||
|
||||
let distribution = embed(&mut query, index_scheduler.get_ref(), &index)?;
|
||||
|
||||
let _permit = search_queue.try_get_search_permit().await?;
|
||||
let search_result =
|
||||
tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution))
|
||||
.await?;
|
||||
|
@ -15,6 +15,7 @@ use tracing::debug;
|
||||
use crate::analytics::Analytics;
|
||||
use crate::extractors::authentication::policies::*;
|
||||
use crate::extractors::authentication::GuardedData;
|
||||
use crate::search_queue::SearchQueue;
|
||||
use crate::Opt;
|
||||
|
||||
const PAGINATION_DEFAULT_LIMIT: usize = 20;
|
||||
@ -385,10 +386,12 @@ pub async fn get_health(
|
||||
req: HttpRequest,
|
||||
index_scheduler: Data<IndexScheduler>,
|
||||
auth_controller: Data<AuthController>,
|
||||
search_queue: Data<SearchQueue>,
|
||||
analytics: web::Data<dyn Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
analytics.health_seen(&req);
|
||||
|
||||
search_queue.health().unwrap();
|
||||
index_scheduler.health().unwrap();
|
||||
auth_controller.health().unwrap();
|
||||
|
||||
|
@ -17,6 +17,7 @@ use crate::routes::indexes::search::embed;
|
||||
use crate::search::{
|
||||
add_search_rules, perform_search, SearchQueryWithIndex, SearchResultWithIndex,
|
||||
};
|
||||
use crate::search_queue::SearchQueue;
|
||||
|
||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
cfg.service(web::resource("").route(web::post().to(SeqHandler(multi_search_with_post))));
|
||||
@ -35,6 +36,7 @@ pub struct SearchQueries {
|
||||
|
||||
pub async fn multi_search_with_post(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
|
||||
search_queue: Data<SearchQueue>,
|
||||
params: AwebJson<SearchQueries, DeserrJsonError>,
|
||||
req: HttpRequest,
|
||||
analytics: web::Data<dyn Analytics>,
|
||||
@ -44,6 +46,10 @@ pub async fn multi_search_with_post(
|
||||
let mut multi_aggregate = MultiSearchAggregator::from_queries(&queries, &req);
|
||||
let features = index_scheduler.features();
|
||||
|
||||
// Since we don't want to process half of the search requests and then get a permit refused
|
||||
// we're going to get one permit for the whole duration of the multi-search request.
|
||||
let _permit = search_queue.try_get_search_permit().await?;
|
||||
|
||||
// Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only,
|
||||
// so that `?` doesn't work if it doesn't use `with_index`, ensuring that it is not forgotten in case of code
|
||||
// changes.
|
||||
|
130
meilisearch/src/search_queue.rs
Normal file
130
meilisearch/src/search_queue.rs
Normal file
@ -0,0 +1,130 @@
|
||||
//! This file implements a queue of searches to process and the ability to control how many searches can be run in parallel.
|
||||
//! We need this because we don't want to process more search requests than we have cores.
|
||||
//! That slows down everything and consumes RAM for no reason.
|
||||
//! The steps to do a search are to get the `SearchQueue` data structure and try to get a search permit.
|
||||
//! This can fail if the queue is full, and we need to drop your search request to register a new one.
|
||||
//!
|
||||
//! ### How to do a search request
|
||||
//!
|
||||
//! In order to do a search request you should try to get a search permit.
|
||||
//! Retrieve the `SearchQueue` structure from actix-web (`search_queue: Data<SearchQueue>`)
|
||||
//! and right before processing the search, calls the `SearchQueue::try_get_search_permit` method: `search_queue.try_get_search_permit().await?;`
|
||||
//!
|
||||
//! What is going to happen at this point is that you're going to send a oneshot::Sender over an async mpsc channel.
|
||||
//! Then, the queue/scheduler is going to either:
|
||||
//! - Drop your oneshot channel => that means there are too many searches going on, and yours won't be executed.
|
||||
//! You should exit and free all the RAM you use ASAP.
|
||||
//! - Sends you a Permit => that will unlock the method, and you will be able to process your search.
|
||||
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
|
||||
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
use crate::error::MeilisearchHttpError;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SearchQueue {
|
||||
sender: mpsc::Sender<oneshot::Sender<Permit>>,
|
||||
capacity: usize,
|
||||
}
|
||||
|
||||
/// You should only run search requests while holding this permit.
|
||||
/// Once it's dropped, a new search request will be able to process.
|
||||
#[derive(Debug)]
|
||||
pub struct Permit {
|
||||
sender: mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
impl Drop for Permit {
|
||||
fn drop(&mut self) {
|
||||
// if the channel is closed then the whole instance is down
|
||||
let _ = futures::executor::block_on(self.sender.send(()));
|
||||
}
|
||||
}
|
||||
|
||||
impl SearchQueue {
|
||||
pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self {
|
||||
// Search requests are going to wait until we're available anyway,
|
||||
// so let's not allocate any RAM and keep a capacity of 1.
|
||||
let (sender, receiver) = mpsc::channel(1);
|
||||
|
||||
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
|
||||
Self { sender, capacity }
|
||||
}
|
||||
|
||||
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
|
||||
/// how many should executes at the same time.
|
||||
///
|
||||
/// It **must never** panic or exit.
|
||||
async fn run(
|
||||
capacity: usize,
|
||||
parallelism: NonZeroUsize,
|
||||
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
|
||||
) {
|
||||
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
|
||||
let mut rng: StdRng = StdRng::from_entropy();
|
||||
let mut searches_running: usize = 0;
|
||||
// By having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap
|
||||
let (sender, mut search_finished) = mpsc::channel(parallelism.into());
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// biased select because we wants to free up space before trying to register new tasks
|
||||
biased;
|
||||
_ = search_finished.recv() => {
|
||||
searches_running = searches_running.saturating_sub(1);
|
||||
if !queue.is_empty() {
|
||||
// Can't panic: the queue wasn't empty thus the range isn't empty.
|
||||
let remove = rng.gen_range(0..queue.len());
|
||||
let channel = queue.swap_remove(remove);
|
||||
let _ = channel.send(Permit { sender: sender.clone() });
|
||||
}
|
||||
},
|
||||
|
||||
search_request = receive_new_searches.recv() => {
|
||||
// this unwrap is safe because we're sure the `SearchQueue` still lives somewhere in actix-web
|
||||
let search_request = search_request.unwrap();
|
||||
if searches_running < usize::from(parallelism) && queue.is_empty() {
|
||||
searches_running += 1;
|
||||
// if the search requests die it's not a hard error on our side
|
||||
let _ = search_request.send(Permit { sender: sender.clone() });
|
||||
continue;
|
||||
} else if capacity == 0 {
|
||||
// in the very specific case where we have a capacity of zero
|
||||
// we must refuse the request straight away without going through
|
||||
// the queue stuff.
|
||||
drop(search_request);
|
||||
continue;
|
||||
|
||||
} else if queue.len() >= capacity {
|
||||
let remove = rng.gen_range(0..queue.len());
|
||||
let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed
|
||||
drop(thing);
|
||||
}
|
||||
queue.push(search_request);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a search `Permit`.
|
||||
/// It should be dropped as soon as you've freed all the RAM associated with the search request being processed.
|
||||
pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
|
||||
receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))
|
||||
}
|
||||
|
||||
/// Returns `Ok(())` if everything seems normal.
|
||||
/// Returns `Err(MeilisearchHttpError::SearchLimiterIsDown)` if the search limiter seems down.
|
||||
pub fn health(&self) -> Result<(), MeilisearchHttpError> {
|
||||
if self.sender.is_closed() {
|
||||
Err(MeilisearchHttpError::SearchLimiterIsDown)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
@ -10,6 +10,7 @@ mod hybrid;
|
||||
mod multi;
|
||||
mod pagination;
|
||||
mod restrict_searchable;
|
||||
mod search_queue;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
|
184
meilisearch/tests/search/search_queue.rs
Normal file
184
meilisearch/tests/search/search_queue.rs
Normal file
@ -0,0 +1,184 @@
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_web::ResponseError;
|
||||
use meili_snap::snapshot;
|
||||
use meilisearch::search_queue::SearchQueue;
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn search_queue_register() {
|
||||
let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap());
|
||||
|
||||
// First, use all the cores
|
||||
let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
|
||||
// If we free one spot we should be able to register one new search
|
||||
drop(permit1);
|
||||
|
||||
let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
|
||||
// And again
|
||||
drop(permit3);
|
||||
|
||||
let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn wait_till_cores_are_available() {
|
||||
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));
|
||||
|
||||
// First, use all the cores
|
||||
let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
|
||||
let ret = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()).await;
|
||||
assert!(ret.is_err(), "The capacity is full, we should not get a permit");
|
||||
|
||||
let q = queue.clone();
|
||||
let task = tokio::task::spawn(async move { q.try_get_search_permit().await });
|
||||
|
||||
// after dropping a permit the previous task should be able to finish
|
||||
drop(permit1);
|
||||
let _permit2 = tokio::time::timeout(Duration::from_secs(1), task)
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn refuse_search_requests_when_queue_is_full() {
|
||||
let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap()));
|
||||
|
||||
// First, use the whole capacity of the
|
||||
let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
|
||||
let q = queue.clone();
|
||||
let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await });
|
||||
|
||||
// Here the queue is full. By registering two new search requests the permit 2 and 3 should be thrown out
|
||||
let q = queue.clone();
|
||||
let _permit3 = tokio::task::spawn(async move { q.try_get_search_permit().await });
|
||||
|
||||
let permit2 = tokio::time::timeout(Duration::from_secs(1), permit2)
|
||||
.await
|
||||
.expect("I should get a result straight away")
|
||||
.unwrap(); // task should end successfully
|
||||
|
||||
let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err());
|
||||
let http_response = err.error_response();
|
||||
let mut headers: Vec<_> = http_response
|
||||
.headers()
|
||||
.iter()
|
||||
.map(|(name, value)| (name.to_string(), value.to_str().unwrap().to_string()))
|
||||
.collect();
|
||||
headers.sort();
|
||||
snapshot!(format!("{headers:?}"), @r###"[("content-type", "application/json"), ("retry-after", "10")]"###);
|
||||
|
||||
let err = serde_json::to_string_pretty(&err).unwrap();
|
||||
snapshot!(err, @r###"
|
||||
{
|
||||
"message": "Too many search requests running at the same time: 1. Retry after 10s.",
|
||||
"code": "too_many_search_requests",
|
||||
"type": "system",
|
||||
"link": "https://docs.meilisearch.com/errors#too_many_search_requests"
|
||||
}
|
||||
"###);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn search_request_crashes_while_holding_permits() {
|
||||
let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap()));
|
||||
|
||||
let (send, recv) = tokio::sync::oneshot::channel();
|
||||
|
||||
// This first request take a cpu
|
||||
let q = queue.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let _permit = q.try_get_search_permit().await.unwrap();
|
||||
recv.await.unwrap();
|
||||
panic!("oops an unexpected crash happened")
|
||||
});
|
||||
|
||||
// This second request waits in the queue till the first request finishes
|
||||
let q = queue.clone();
|
||||
let task = tokio::task::spawn(async move {
|
||||
let _permit = q.try_get_search_permit().await.unwrap();
|
||||
});
|
||||
|
||||
// By sending something in the channel the request holding a CPU will panic and should lose its permit
|
||||
send.send(()).unwrap();
|
||||
|
||||
// Then the second request should be able to process and finishes correctly without panic
|
||||
tokio::time::timeout(Duration::from_secs(1), task)
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
|
||||
// I should even be able to take second permit here
|
||||
let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn works_with_capacity_of_zero() {
|
||||
let queue = Arc::new(SearchQueue::new(0, NonZeroUsize::new(1).unwrap()));
|
||||
|
||||
// First, use the whole capacity of the
|
||||
let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
|
||||
// then we should get an error if we try to register a second search request.
|
||||
let permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a result straight away");
|
||||
|
||||
let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err());
|
||||
let http_response = err.error_response();
|
||||
let mut headers: Vec<_> = http_response
|
||||
.headers()
|
||||
.iter()
|
||||
.map(|(name, value)| (name.to_string(), value.to_str().unwrap().to_string()))
|
||||
.collect();
|
||||
headers.sort();
|
||||
snapshot!(format!("{headers:?}"), @r###"[("content-type", "application/json"), ("retry-after", "10")]"###);
|
||||
|
||||
let err = serde_json::to_string_pretty(&err).unwrap();
|
||||
snapshot!(err, @r###"
|
||||
{
|
||||
"message": "Too many search requests running at the same time: 0. Retry after 10s.",
|
||||
"code": "too_many_search_requests",
|
||||
"type": "system",
|
||||
"link": "https://docs.meilisearch.com/errors#too_many_search_requests"
|
||||
}
|
||||
"###);
|
||||
|
||||
drop(permit1);
|
||||
// After dropping the first permit we should be able to get a new permit
|
||||
let _permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
|
||||
.await
|
||||
.expect("I should get a permit straight away")
|
||||
.unwrap();
|
||||
}
|
Loading…
Reference in New Issue
Block a user