4893: Only spawn one search queue in actix-web r=dureuill a=irevoire

# Pull Request

## Related issue
May be related to #4654 and https://github.com/meilisearch/meilisearch-support/issues/350

## What does this PR do?
- We noticed a bug where multiple search queue were spawned instead of one


Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-08-27 20:26:59 +00:00 committed by GitHub
commit 9eee467226
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 49 additions and 10 deletions

View File

@ -13,11 +13,10 @@ pub mod search_queue;
use std::fs::File; use std::fs::File;
use std::io::{BufReader, BufWriter}; use std::io::{BufReader, BufWriter};
use std::num::NonZeroUsize;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, available_parallelism}; use std::thread;
use std::time::Duration; use std::time::Duration;
use actix_cors::Cors; use actix_cors::Cors;
@ -118,6 +117,7 @@ pub type LogStderrType = tracing_subscriber::filter::Filtered<
pub fn create_app( pub fn create_app(
index_scheduler: Data<IndexScheduler>, index_scheduler: Data<IndexScheduler>,
auth_controller: Data<AuthController>, auth_controller: Data<AuthController>,
search_queue: Data<SearchQueue>,
opt: Opt, opt: Opt,
logs: (LogRouteHandle, LogStderrHandle), logs: (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>, analytics: Arc<dyn Analytics>,
@ -137,6 +137,7 @@ pub fn create_app(
s, s,
index_scheduler.clone(), index_scheduler.clone(),
auth_controller.clone(), auth_controller.clone(),
search_queue.clone(),
&opt, &opt,
logs, logs,
analytics.clone(), analytics.clone(),
@ -469,19 +470,16 @@ pub fn configure_data(
config: &mut web::ServiceConfig, config: &mut web::ServiceConfig,
index_scheduler: Data<IndexScheduler>, index_scheduler: Data<IndexScheduler>,
auth: Data<AuthController>, auth: Data<AuthController>,
search_queue: Data<SearchQueue>,
opt: &Opt, opt: &Opt,
(logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle), (logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>, analytics: Arc<dyn Analytics>,
) { ) {
let search_queue = SearchQueue::new(
opt.experimental_search_queue_size,
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
);
let http_payload_size_limit = opt.http_payload_size_limit.as_u64() as usize; let http_payload_size_limit = opt.http_payload_size_limit.as_u64() as usize;
config config
.app_data(index_scheduler) .app_data(index_scheduler)
.app_data(auth) .app_data(auth)
.app_data(web::Data::new(search_queue)) .app_data(search_queue)
.app_data(web::Data::from(analytics)) .app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs_route)) .app_data(web::Data::new(logs_route))
.app_data(web::Data::new(logs_stderr)) .app_data(web::Data::new(logs_stderr))

View File

@ -1,8 +1,10 @@
use std::env; use std::env;
use std::io::{stderr, LineWriter, Write}; use std::io::{stderr, LineWriter, Write};
use std::num::NonZeroUsize;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::thread::available_parallelism;
use actix_web::http::KeepAlive; use actix_web::http::KeepAlive;
use actix_web::web::Data; use actix_web::web::Data;
@ -11,6 +13,7 @@ use index_scheduler::IndexScheduler;
use is_terminal::IsTerminal; use is_terminal::IsTerminal;
use meilisearch::analytics::Analytics; use meilisearch::analytics::Analytics;
use meilisearch::option::LogMode; use meilisearch::option::LogMode;
use meilisearch::search_queue::SearchQueue;
use meilisearch::{ use meilisearch::{
analytics, create_app, setup_meilisearch, LogRouteHandle, LogRouteType, LogStderrHandle, analytics, create_app, setup_meilisearch, LogRouteHandle, LogRouteType, LogStderrHandle,
LogStderrType, Opt, SubscriberForSecondLayer, LogStderrType, Opt, SubscriberForSecondLayer,
@ -148,11 +151,17 @@ async fn run_http(
let opt_clone = opt.clone(); let opt_clone = opt.clone();
let index_scheduler = Data::from(index_scheduler); let index_scheduler = Data::from(index_scheduler);
let auth_controller = Data::from(auth_controller); let auth_controller = Data::from(auth_controller);
let search_queue = SearchQueue::new(
opt.experimental_search_queue_size,
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
);
let search_queue = Data::new(search_queue);
let http_server = HttpServer::new(move || { let http_server = HttpServer::new(move || {
create_app( create_app(
index_scheduler.clone(), index_scheduler.clone(),
auth_controller.clone(), auth_controller.clone(),
search_queue.clone(),
opt.clone(), opt.clone(),
logs.clone(), logs.clone(),
analytics.clone(), analytics.clone(),

View File

@ -1,7 +1,9 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::num::NonZeroUsize;
use std::path::Path; use std::path::Path;
use std::str::FromStr as _; use std::str::FromStr as _;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use actix_http::body::MessageBody; use actix_http::body::MessageBody;
@ -10,6 +12,7 @@ use actix_web::http::StatusCode;
use byte_unit::{Byte, Unit}; use byte_unit::{Byte, Unit};
use clap::Parser; use clap::Parser;
use meilisearch::option::{IndexerOpts, MaxMemory, MaxThreads, Opt}; use meilisearch::option::{IndexerOpts, MaxMemory, MaxThreads, Opt};
use meilisearch::search_queue::SearchQueue;
use meilisearch::{analytics, create_app, setup_meilisearch, SubscriberForSecondLayer}; use meilisearch::{analytics, create_app, setup_meilisearch, SubscriberForSecondLayer};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tempfile::TempDir; use tempfile::TempDir;
@ -32,6 +35,12 @@ pub struct Server {
pub static TEST_TEMP_DIR: Lazy<TempDir> = Lazy::new(|| TempDir::new().unwrap()); pub static TEST_TEMP_DIR: Lazy<TempDir> = Lazy::new(|| TempDir::new().unwrap());
impl Server { impl Server {
fn new_search_queue(options: &Opt) -> Arc<SearchQueue> {
let search_queue =
SearchQueue::new(options.experimental_search_queue_size, NonZeroUsize::new(1).unwrap());
Arc::new(search_queue)
}
pub async fn new() -> Self { pub async fn new() -> Self {
let dir = TempDir::new().unwrap(); let dir = TempDir::new().unwrap();
@ -44,7 +53,13 @@ impl Server {
let options = default_settings(dir.path()); let options = default_settings(dir.path());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap(); let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None }; let service = Service {
index_scheduler,
auth,
search_queue: Self::new_search_queue(&options),
options,
api_key: None,
};
Server { service, _dir: Some(dir) } Server { service, _dir: Some(dir) }
} }
@ -59,7 +74,13 @@ impl Server {
options.master_key = Some("MASTER_KEY".to_string()); options.master_key = Some("MASTER_KEY".to_string());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap(); let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None }; let service = Service {
index_scheduler,
auth,
search_queue: Self::new_search_queue(&options),
options,
api_key: None,
};
Server { service, _dir: Some(dir) } Server { service, _dir: Some(dir) }
} }
@ -72,7 +93,13 @@ impl Server {
pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> { pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
let (index_scheduler, auth) = setup_meilisearch(&options)?; let (index_scheduler, auth) = setup_meilisearch(&options)?;
let service = Service { index_scheduler, auth, options, api_key: None }; let service = Service {
index_scheduler,
auth,
search_queue: Self::new_search_queue(&options),
options,
api_key: None,
};
Ok(Server { service, _dir: None }) Ok(Server { service, _dir: None })
} }
@ -100,6 +127,7 @@ impl Server {
actix_web::test::init_service(create_app( actix_web::test::init_service(create_app(
self.service.index_scheduler.clone().into(), self.service.index_scheduler.clone().into(),
self.service.auth.clone().into(), self.service.auth.clone().into(),
self.service.search_queue.clone().into(),
self.service.options.clone(), self.service.options.clone(),
(route_layer_handle, stderr_layer_handle), (route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&self.service.options), analytics::MockAnalytics::new(&self.service.options),

View File

@ -5,6 +5,7 @@ use actix_web::http::StatusCode;
use actix_web::test; use actix_web::test;
use actix_web::test::TestRequest; use actix_web::test::TestRequest;
use index_scheduler::IndexScheduler; use index_scheduler::IndexScheduler;
use meilisearch::search_queue::SearchQueue;
use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer}; use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer};
use meilisearch_auth::AuthController; use meilisearch_auth::AuthController;
use tracing::level_filters::LevelFilter; use tracing::level_filters::LevelFilter;
@ -16,6 +17,7 @@ use crate::common::Value;
pub struct Service { pub struct Service {
pub index_scheduler: Arc<IndexScheduler>, pub index_scheduler: Arc<IndexScheduler>,
pub auth: Arc<AuthController>, pub auth: Arc<AuthController>,
pub search_queue: Arc<SearchQueue>,
pub options: Opt, pub options: Opt,
pub api_key: Option<String>, pub api_key: Option<String>,
} }
@ -123,6 +125,7 @@ impl Service {
let app = test::init_service(create_app( let app = test::init_service(create_app(
self.index_scheduler.clone().into(), self.index_scheduler.clone().into(),
self.auth.clone().into(), self.auth.clone().into(),
self.search_queue.clone().into(),
self.options.clone(), self.options.clone(),
(route_layer_handle, stderr_layer_handle), (route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&self.options), analytics::MockAnalytics::new(&self.options),

View File

@ -44,6 +44,7 @@ async fn basic_test_log_stream_route() {
let app = actix_web::test::init_service(create_app( let app = actix_web::test::init_service(create_app(
server.service.index_scheduler.clone().into(), server.service.index_scheduler.clone().into(),
server.service.auth.clone().into(), server.service.auth.clone().into(),
server.service.search_queue.clone().into(),
server.service.options.clone(), server.service.options.clone(),
(route_layer_handle, stderr_layer_handle), (route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&server.service.options), analytics::MockAnalytics::new(&server.service.options),