rename the method to get a permit and use it in all search requests

This commit is contained in:
Tamo 2024-03-26 17:28:03 +01:00
parent 3f23fbb46d
commit e433fd53e6
6 changed files with 47 additions and 26 deletions

View File

@ -13,9 +13,10 @@ pub mod search_queue;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::num::NonZeroUsize;
use std::path::Path;
use std::sync::Arc;
use std::thread;
use std::thread::{self, available_parallelism};
use std::time::Duration;
use actix_cors::Cors;
@ -39,6 +40,7 @@ use meilisearch_types::versioning::{check_version_file, create_version_file};
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
pub use option::Opt;
use option::ScheduleSnapshot;
use search_queue::SearchQueue;
use tracing::{error, info_span};
use tracing_subscriber::filter::Targets;
@ -470,10 +472,15 @@ pub fn configure_data(
(logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
) {
let search_queue = SearchQueue::new(
opt.experimental_search_queue_size,
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
);
let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
config
.app_data(index_scheduler)
.app_data(auth)
.app_data(web::Data::new(search_queue))
.app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs_route))
.app_data(web::Data::new(logs_stderr))

View File

@ -17,6 +17,7 @@ use crate::search::{
DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET,
};
use crate::search_queue::SearchQueue;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(search)));
@ -48,6 +49,7 @@ pub struct FacetSearchQuery {
pub async fn search(
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
search_queue: Data<SearchQueue>,
index_uid: web::Path<String>,
params: AwebJson<FacetSearchQuery, DeserrJsonError>,
req: HttpRequest,
@ -71,6 +73,7 @@ pub async fn search(
let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features();
let _permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || {
perform_facet_search(&index, search_query, facet_query, facet_name, features)
})

View File

@ -23,6 +23,7 @@ use crate::search::{
DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, DEFAULT_SEMANTIC_RATIO,
};
use crate::search_queue::SearchQueue;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
@ -182,6 +183,7 @@ fn fix_sort_query_parameters(sort_query: &str) -> Vec<String> {
pub async fn search_with_url_query(
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
search_queue: web::Data<SearchQueue>,
index_uid: web::Path<String>,
params: AwebQueryParameter<SearchQueryGet, DeserrQueryParamError>,
req: HttpRequest,
@ -204,6 +206,7 @@ pub async fn search_with_url_query(
let distribution = embed(&mut query, index_scheduler.get_ref(), &index).await?;
let _permit = search_queue.try_get_search_permit().await?;
let search_result =
tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution))
.await?;
@ -220,6 +223,7 @@ pub async fn search_with_url_query(
pub async fn search_with_post(
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
search_queue: web::Data<SearchQueue>,
index_uid: web::Path<String>,
params: AwebJson<SearchQuery, DeserrJsonError>,
req: HttpRequest,
@ -243,6 +247,7 @@ pub async fn search_with_post(
let distribution = embed(&mut query, index_scheduler.get_ref(), &index).await?;
let _permit = search_queue.try_get_search_permit().await?;
let search_result =
tokio::task::spawn_blocking(move || perform_search(&index, query, features, distribution))
.await?;

View File

@ -17,6 +17,7 @@ use crate::routes::indexes::search::embed;
use crate::search::{
add_search_rules, perform_search, SearchQueryWithIndex, SearchResultWithIndex,
};
use crate::search_queue::SearchQueue;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(multi_search_with_post))));
@ -35,6 +36,7 @@ pub struct SearchQueries {
pub async fn multi_search_with_post(
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
search_queue: Data<SearchQueue>,
params: AwebJson<SearchQueries, DeserrJsonError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
@ -44,6 +46,10 @@ pub async fn multi_search_with_post(
let mut multi_aggregate = MultiSearchAggregator::from_queries(&queries, &req);
let features = index_scheduler.features();
// Since we don't want to process half of the search requests and then get a permit refused
// we're going to get one permit for the whole duration of the multi-search request.
let _permit = search_queue.try_get_search_permit().await?;
// Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only,
// so that `?` doesn't work if it doesn't use `with_index`, ensuring that it is not forgotten in case of code
// changes.

View File

@ -1,4 +1,4 @@
use std::time::Duration;
use std::{num::NonZeroUsize, time::Duration};
use rand::{rngs::StdRng, Rng, SeedableRng};
use tokio::sync::{mpsc, oneshot};
@ -24,7 +24,7 @@ impl Drop for Permit {
}
impl SearchQueue {
pub fn new(capacity: usize, paralellism: usize) -> Self {
pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self {
// We can make the search requests wait until we're available.
// they're going to wait anyway right after, so let's not allocate any
// RAM by keeping a capacity of 1.
@ -35,21 +35,21 @@ impl SearchQueue {
async fn run(
capacity: usize,
parallelism: usize,
parallelism: NonZeroUsize,
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
) {
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
let mut rng: StdRng = StdRng::from_entropy();
let mut searches_running = 0;
let mut searches_running: usize = 0;
// by having a capacity of parallelism we ensures that every time a search finish it can release its RAM asap
let (sender, mut search_finished) = mpsc::channel(parallelism);
let (sender, mut search_finished) = mpsc::channel(parallelism.into());
loop {
tokio::select! {
search_request = receive_new_searches.recv() => {
let search_request = search_request.unwrap();
println!("queue contains {} elements and already running {}", queue.len(), searches_running);
if searches_running < parallelism && queue.is_empty() {
if searches_running < usize::from(parallelism) && queue.is_empty() {
println!("We can process the search straight away");
searches_running += 1;
// if the search requests die it's not a hard error on our side
@ -78,7 +78,7 @@ impl SearchQueue {
}
}
pub async fn register_search(&self) -> Result<Permit, MeilisearchHttpError> {
pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
let (sender, receiver) = oneshot::channel();
self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
receiver.await.map_err(|_| {

View File

@ -1,18 +1,18 @@
use std::{sync::Arc, time::Duration};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use meili_snap::snapshot;
use meilisearch::search_queue::SearchQueue;
#[actix_rt::test]
async fn search_queue_register() {
let queue = SearchQueue::new(4, 2);
let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap());
// First, use all the cores
let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
.await
.expect("I should get a permit straight away")
.unwrap();
let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
let _permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
.await
.expect("I should get a permit straight away")
.unwrap();
@ -20,7 +20,7 @@ async fn search_queue_register() {
// If we free one spot we should be able to register one new search
drop(permit1);
let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
let permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
.await
.expect("I should get a permit straight away")
.unwrap();
@ -28,7 +28,7 @@ async fn search_queue_register() {
// And again
drop(permit3);
let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
let _permit4 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
.await
.expect("I should get a permit straight away")
.unwrap();
@ -36,19 +36,19 @@ async fn search_queue_register() {
#[actix_rt::test]
async fn search_queue_wait_till_cores_available() {
let queue = Arc::new(SearchQueue::new(4, 1));
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));
// First, use all the cores
let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
.await
.expect("I should get a permit straight away")
.unwrap();
let ret = tokio::time::timeout(Duration::from_secs(1), queue.register_search()).await;
let ret = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()).await;
assert!(ret.is_err(), "The capacity is full, we should not get a permit");
let q = queue.clone();
let task = tokio::task::spawn(async move { q.register_search().await });
let task = tokio::task::spawn(async move { q.try_get_search_permit().await });
// after dropping a permit the previous task should be able to finish
drop(permit1);
@ -60,20 +60,20 @@ async fn search_queue_wait_till_cores_available() {
#[actix_rt::test]
async fn search_queue_refuse_search_requests() {
let queue = Arc::new(SearchQueue::new(1, 1));
let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap()));
// First, use the whole capacity of the
let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
.await
.expect("I should get a permit straight away")
.unwrap();
let q = queue.clone();
let permit2 = tokio::task::spawn(async move { q.register_search().await });
let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await });
// Here the queue is full. By registering two new search requests the permit 2 and 3 should be thrown out
let q = queue.clone();
let _permit3 = tokio::task::spawn(async move { q.register_search().await });
let _permit3 = tokio::task::spawn(async move { q.try_get_search_permit().await });
let permit2 = tokio::time::timeout(Duration::from_secs(1), permit2)
.await
@ -85,14 +85,14 @@ async fn search_queue_refuse_search_requests() {
#[actix_rt::test]
async fn search_request_crashes_while_holding_permits() {
let queue = Arc::new(SearchQueue::new(1, 1));
let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap()));
let (send, recv) = tokio::sync::oneshot::channel();
// This first request take a cpu
let q = queue.clone();
tokio::task::spawn(async move {
let _permit = q.register_search().await.unwrap();
let _permit = q.try_get_search_permit().await.unwrap();
recv.await.unwrap();
panic!("oops an unexpected crash happened")
});
@ -100,7 +100,7 @@ async fn search_request_crashes_while_holding_permits() {
// This second request waits in the queue till the first request finishes
let q = queue.clone();
let task = tokio::task::spawn(async move {
let _permit = q.register_search().await.unwrap();
let _permit = q.try_get_search_permit().await.unwrap();
});
// By sending something in the channel the request holding a CPU will panic and should lose its permit
@ -113,7 +113,7 @@ async fn search_request_crashes_while_holding_permits() {
.unwrap();
// I should even be able to take second permit here
let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.register_search())
let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
.await
.expect("I should get a permit straight away")
.unwrap();