diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index fe0d75dae..f16097da1 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -2,6 +2,7 @@ use std::{fmt, io}; use actix_web::http::StatusCode; use actix_web::{self as aweb, HttpResponseBuilder}; +use aweb::http::header; use aweb::rt::task::JoinError; use convert_case::Casing; use milli::heed::{Error as HeedError, MdbError}; @@ -56,7 +57,14 @@ where impl aweb::error::ResponseError for ResponseError { fn error_response(&self) -> aweb::HttpResponse { let json = serde_json::to_vec(self).unwrap(); - HttpResponseBuilder::new(self.status_code()).content_type("application/json").body(json) + let mut builder = HttpResponseBuilder::new(self.status_code()); + builder.content_type("application/json"); + + if self.code == StatusCode::SERVICE_UNAVAILABLE { + builder.insert_header((header::RETRY_AFTER, "10")); + } + + builder.body(json) } fn status_code(&self) -> StatusCode { diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs index 48c44c12d..bb1156997 100644 --- a/meilisearch/src/error.rs +++ b/meilisearch/src/error.rs @@ -29,8 +29,8 @@ pub enum MeilisearchHttpError { InvalidExpression(&'static [&'static str], Value), #[error("A {0} payload is missing.")] MissingPayload(PayloadType), - #[error("Too many search requests running at the same time: {0}. Retry after {1:?}.")] - TooManySearchRequests(usize, std::time::Duration), + #[error("Too many search requests running at the same time: {0}. Retry after 10s.")] + TooManySearchRequests(usize), #[error("Internal error: Search limiter is down")] SearchLimiterIsDown, #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))] @@ -73,7 +73,7 @@ impl ErrorCode for MeilisearchHttpError { MeilisearchHttpError::EmptyFilter => Code::InvalidDocumentFilter, MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter, MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge, - MeilisearchHttpError::TooManySearchRequests(_, _) => Code::TooManySearchRequests, + MeilisearchHttpError::TooManySearchRequests(_) => Code::TooManySearchRequests, MeilisearchHttpError::SearchLimiterIsDown => Code::Internal, MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes, MeilisearchHttpError::IndexUid(e) => e.error_code(), diff --git a/meilisearch/src/search_queue.rs b/meilisearch/src/search_queue.rs index 570394e34..b677f81a4 100644 --- a/meilisearch/src/search_queue.rs +++ b/meilisearch/src/search_queue.rs @@ -1,4 +1,4 @@ -use std::{num::NonZeroUsize, time::Duration}; +use std::num::NonZeroUsize; use rand::{rngs::StdRng, Rng, SeedableRng}; use tokio::sync::{mpsc, oneshot}; @@ -25,10 +25,10 @@ impl Drop for Permit { impl SearchQueue { pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self { - // We can make the search requests wait until we're available. - // they're going to wait anyway right after, so let's not allocate any - // RAM by keeping a capacity of 1. + // Search requests are going to wait until we're available anyway, + // so let's not allocate any RAM and keep a capacity of 1. let (sender, receiver) = mpsc::channel(1); + tokio::task::spawn(Self::run(capacity, paralellism, receiver)); Self { sender, capacity } } @@ -48,27 +48,22 @@ impl SearchQueue { tokio::select! { search_request = receive_new_searches.recv() => { let search_request = search_request.unwrap(); - println!("queue contains {} elements and already running {}", queue.len(), searches_running); if searches_running < usize::from(parallelism) && queue.is_empty() { - println!("We can process the search straight away"); searches_running += 1; // if the search requests die it's not a hard error on our side let _ = search_request.send(Permit { sender: sender.clone() }); continue; } if queue.len() >= capacity { - println!("we're above capacity, dropping a random request"); let remove = rng.gen_range(0..queue.len()); let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed drop(thing); } - println!("pushed a new search request to the queue {}", queue.len()); queue.push(search_request); }, _ = search_finished.recv() => { searches_running = searches_running.saturating_sub(1); if !queue.is_empty() { - println!("processed an element in the queue"); let remove = rng.gen_range(0..queue.len()); let channel = queue.swap_remove(remove); let _ = channel.send(Permit { sender: sender.clone() }); @@ -81,8 +76,6 @@ impl SearchQueue { pub async fn try_get_search_permit(&self) -> Result { let (sender, receiver) = oneshot::channel(); self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; - receiver.await.map_err(|_| { - MeilisearchHttpError::TooManySearchRequests(self.capacity, Duration::from_secs(10)) - }) + receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity)) } }