Simplify and improve the HTTP error

Tamo 2024-03-26 17:53:37 +01:00
parent e433fd53e6
commit e2a1bbae37
3 changed files with 17 additions and 16 deletions

View File

@@ -2,6 +2,7 @@ use std::{fmt, io};
 use actix_web::http::StatusCode;
 use actix_web::{self as aweb, HttpResponseBuilder};
+use aweb::http::header;
 use aweb::rt::task::JoinError;
 use convert_case::Casing;
 use milli::heed::{Error as HeedError, MdbError};
@@ -56,7 +57,14 @@ where
 impl aweb::error::ResponseError for ResponseError {
     fn error_response(&self) -> aweb::HttpResponse {
         let json = serde_json::to_vec(self).unwrap();
-        HttpResponseBuilder::new(self.status_code()).content_type("application/json").body(json)
+        let mut builder = HttpResponseBuilder::new(self.status_code());
+        builder.content_type("application/json");
+        if self.code == StatusCode::SERVICE_UNAVAILABLE {
+            builder.insert_header((header::RETRY_AFTER, "10"));
+        }
+        builder.body(json)
     }
 
     fn status_code(&self) -> StatusCode {
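
For context, the response-building pattern above looks roughly like this in isolation (a minimal sketch that calls actix-web directly; the service_unavailable_response helper and its Vec<u8> payload are illustrative, not part of the commit):

use actix_web::http::{header, StatusCode};
use actix_web::{HttpResponse, HttpResponseBuilder};

// Sketch only: build a 503 JSON error response that also advertises a
// Retry-After of 10 seconds, mirroring the branch added to error_response().
fn service_unavailable_response(json: Vec<u8>) -> HttpResponse {
    let mut builder = HttpResponseBuilder::new(StatusCode::SERVICE_UNAVAILABLE);
    builder.content_type("application/json");
    // Retry-After only makes sense for the 503 case, hence the status check
    // guarding this call in error_response() above.
    builder.insert_header((header::RETRY_AFTER, "10"));
    builder.body(json)
}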

View File

@@ -29,8 +29,8 @@ pub enum MeilisearchHttpError {
     InvalidExpression(&'static [&'static str], Value),
     #[error("A {0} payload is missing.")]
     MissingPayload(PayloadType),
-    #[error("Too many search requests running at the same time: {0}. Retry after {1:?}.")]
-    TooManySearchRequests(usize, std::time::Duration),
+    #[error("Too many search requests running at the same time: {0}. Retry after 10s.")]
+    TooManySearchRequests(usize),
     #[error("Internal error: Search limiter is down")]
     SearchLimiterIsDown,
     #[error("The provided payload reached the size limit. The maximum accepted payload size is {}.", Byte::from_bytes(*.0 as u64).get_appropriate_unit(true))]
@@ -73,7 +73,7 @@ impl ErrorCode for MeilisearchHttpError {
             MeilisearchHttpError::EmptyFilter => Code::InvalidDocumentFilter,
             MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter,
             MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge,
-            MeilisearchHttpError::TooManySearchRequests(_, _) => Code::TooManySearchRequests,
+            MeilisearchHttpError::TooManySearchRequests(_) => Code::TooManySearchRequests,
             MeilisearchHttpError::SearchLimiterIsDown => Code::Internal,
             MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes,
             MeilisearchHttpError::IndexUid(e) => e.error_code(),
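
The shape of the simplified variant can be illustrated with a stand-alone thiserror enum (HttpErrorSketch is a hypothetical name, not the real MeilisearchHttpError; the point is that the Duration field is gone and the 10s delay now lives only in the message and the Retry-After header):

use thiserror::Error;

#[derive(Debug, Error)]
enum HttpErrorSketch {
    // The variant now carries only the capacity that was exceeded.
    #[error("Too many search requests running at the same time: {0}. Retry after 10s.")]
    TooManySearchRequests(usize),
}

fn main() {
    // Prints: Too many search requests running at the same time: 20. Retry after 10s.
    println!("{}", HttpErrorSketch::TooManySearchRequests(20));
}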

View File

@@ -1,4 +1,4 @@
-use std::{num::NonZeroUsize, time::Duration};
+use std::num::NonZeroUsize;
 
 use rand::{rngs::StdRng, Rng, SeedableRng};
 use tokio::sync::{mpsc, oneshot};
@@ -25,10 +25,10 @@ impl Drop for Permit {
 impl SearchQueue {
     pub fn new(capacity: usize, paralellism: NonZeroUsize) -> Self {
-        // We can make the search requests wait until we're available.
-        // they're going to wait anyway right after, so let's not allocate any
-        // RAM by keeping a capacity of 1.
+        // Search requests are going to wait until we're available anyway,
+        // so let's not allocate any RAM and keep a capacity of 1.
         let (sender, receiver) = mpsc::channel(1);
         tokio::task::spawn(Self::run(capacity, paralellism, receiver));
         Self { sender, capacity }
     }
@@ -48,27 +48,22 @@ impl SearchQueue {
             tokio::select! {
                 search_request = receive_new_searches.recv() => {
                     let search_request = search_request.unwrap();
-                    println!("queue contains {} elements and already running {}", queue.len(), searches_running);
                     if searches_running < usize::from(parallelism) && queue.is_empty() {
-                        println!("We can process the search straight away");
                         searches_running += 1;
                         // if the search requests die it's not a hard error on our side
                         let _ = search_request.send(Permit { sender: sender.clone() });
                         continue;
                     }
                     if queue.len() >= capacity {
-                        println!("we're above capacity, dropping a random request");
                         let remove = rng.gen_range(0..queue.len());
                         let thing = queue.swap_remove(remove); // this will drop the channel and notify the search that it won't be processed
                         drop(thing);
                     }
-                    println!("pushed a new search request to the queue {}", queue.len());
                     queue.push(search_request);
                 },
                 _ = search_finished.recv() => {
                     searches_running = searches_running.saturating_sub(1);
                     if !queue.is_empty() {
-                        println!("processed an element in the queue");
                         let remove = rng.gen_range(0..queue.len());
                         let channel = queue.swap_remove(remove);
                         let _ = channel.send(Permit { sender: sender.clone() });
@@ -81,8 +76,6 @@ impl SearchQueue {
     pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
         let (sender, receiver) = oneshot::channel();
         self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
-        receiver.await.map_err(|_| {
-            MeilisearchHttpError::TooManySearchRequests(self.capacity, Duration::from_secs(10))
-        })
+        receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))
     }
 }
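
The permit hand-off that try_get_search_permit relies on can be sketched with plain tokio channels (names and error strings are illustrative, not the real SearchQueue API): a caller sends a oneshot sender into the scheduler and waits for a Permit back; the scheduler channel being closed maps to SearchLimiterIsDown, and the oneshot being dropped without an answer maps to TooManySearchRequests.

use tokio::sync::{mpsc, oneshot};

// Sketch only: a permit that frees its slot when dropped.
struct Permit {
    sender: mpsc::Sender<()>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        // If the scheduler is already gone there is nothing left to notify.
        let _ = self.sender.try_send(());
    }
}

// Sketch only: ask the scheduler for a permit and wait for its answer.
async fn try_get_permit(
    queue: &mpsc::Sender<oneshot::Sender<Permit>>,
) -> Result<Permit, &'static str> {
    let (sender, receiver) = oneshot::channel();
    queue.send(sender).await.map_err(|_| "search limiter is down")?;
    receiver.await.map_err(|_| "too many search requests")
}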