mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 17:11:15 +08:00
Only spawn one search queue in actix-web
This commit is contained in:
parent
9a756cf2c5
commit
41aa1e1424
@ -13,11 +13,10 @@ pub mod search_queue;
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, BufWriter};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::thread::{self, available_parallelism};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_cors::Cors;
|
||||
@ -118,6 +117,7 @@ pub type LogStderrType = tracing_subscriber::filter::Filtered<
|
||||
pub fn create_app(
|
||||
index_scheduler: Data<IndexScheduler>,
|
||||
auth_controller: Data<AuthController>,
|
||||
search_queue: Data<SearchQueue>,
|
||||
opt: Opt,
|
||||
logs: (LogRouteHandle, LogStderrHandle),
|
||||
analytics: Arc<dyn Analytics>,
|
||||
@ -137,6 +137,7 @@ pub fn create_app(
|
||||
s,
|
||||
index_scheduler.clone(),
|
||||
auth_controller.clone(),
|
||||
search_queue.clone(),
|
||||
&opt,
|
||||
logs,
|
||||
analytics.clone(),
|
||||
@ -469,19 +470,16 @@ pub fn configure_data(
|
||||
config: &mut web::ServiceConfig,
|
||||
index_scheduler: Data<IndexScheduler>,
|
||||
auth: Data<AuthController>,
|
||||
search_queue: Data<SearchQueue>,
|
||||
opt: &Opt,
|
||||
(logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
|
||||
analytics: Arc<dyn Analytics>,
|
||||
) {
|
||||
let search_queue = SearchQueue::new(
|
||||
opt.experimental_search_queue_size,
|
||||
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
|
||||
);
|
||||
let http_payload_size_limit = opt.http_payload_size_limit.as_u64() as usize;
|
||||
config
|
||||
.app_data(index_scheduler)
|
||||
.app_data(auth)
|
||||
.app_data(web::Data::new(search_queue))
|
||||
.app_data(search_queue)
|
||||
.app_data(web::Data::from(analytics))
|
||||
.app_data(web::Data::new(logs_route))
|
||||
.app_data(web::Data::new(logs_stderr))
|
||||
|
@ -1,8 +1,10 @@
|
||||
use std::env;
|
||||
use std::io::{stderr, LineWriter, Write};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::thread::available_parallelism;
|
||||
|
||||
use actix_web::http::KeepAlive;
|
||||
use actix_web::web::Data;
|
||||
@ -11,6 +13,7 @@ use index_scheduler::IndexScheduler;
|
||||
use is_terminal::IsTerminal;
|
||||
use meilisearch::analytics::Analytics;
|
||||
use meilisearch::option::LogMode;
|
||||
use meilisearch::search_queue::SearchQueue;
|
||||
use meilisearch::{
|
||||
analytics, create_app, setup_meilisearch, LogRouteHandle, LogRouteType, LogStderrHandle,
|
||||
LogStderrType, Opt, SubscriberForSecondLayer,
|
||||
@ -148,11 +151,17 @@ async fn run_http(
|
||||
let opt_clone = opt.clone();
|
||||
let index_scheduler = Data::from(index_scheduler);
|
||||
let auth_controller = Data::from(auth_controller);
|
||||
let search_queue = SearchQueue::new(
|
||||
opt.experimental_search_queue_size,
|
||||
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
|
||||
);
|
||||
let search_queue = Data::new(search_queue);
|
||||
|
||||
let http_server = HttpServer::new(move || {
|
||||
create_app(
|
||||
index_scheduler.clone(),
|
||||
auth_controller.clone(),
|
||||
search_queue.clone(),
|
||||
opt.clone(),
|
||||
logs.clone(),
|
||||
analytics.clone(),
|
||||
|
@ -1,8 +1,10 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
use std::marker::PhantomData;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_http::body::MessageBody;
|
||||
@ -11,6 +13,7 @@ use actix_web::http::StatusCode;
|
||||
use byte_unit::{Byte, Unit};
|
||||
use clap::Parser;
|
||||
use meilisearch::option::{IndexerOpts, MaxMemory, MaxThreads, Opt};
|
||||
use meilisearch::search_queue::SearchQueue;
|
||||
use meilisearch::{analytics, create_app, setup_meilisearch, SubscriberForSecondLayer};
|
||||
use once_cell::sync::Lazy;
|
||||
use tempfile::TempDir;
|
||||
@ -53,7 +56,13 @@ impl Server<Owned> {
|
||||
let options = default_settings(dir.path());
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
let service = Service {
|
||||
index_scheduler,
|
||||
auth,
|
||||
search_queue: Self::new_search_queue(&options),
|
||||
options,
|
||||
api_key: None,
|
||||
};
|
||||
|
||||
Server { service, _dir: Some(dir), _marker: PhantomData }
|
||||
}
|
||||
@ -68,7 +77,13 @@ impl Server<Owned> {
|
||||
options.master_key = Some("MASTER_KEY".to_string());
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
let service = Service {
|
||||
index_scheduler,
|
||||
auth,
|
||||
search_queue: Self::new_search_queue(&options),
|
||||
options,
|
||||
api_key: None,
|
||||
};
|
||||
|
||||
Server { service, _dir: Some(dir), _marker: PhantomData }
|
||||
}
|
||||
@ -81,7 +96,13 @@ impl Server<Owned> {
|
||||
|
||||
pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options)?;
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
let service = Service {
|
||||
index_scheduler,
|
||||
auth,
|
||||
search_queue: Self::new_search_queue(&options),
|
||||
options,
|
||||
api_key: None,
|
||||
};
|
||||
|
||||
Ok(Server { service, _dir: None, _marker: PhantomData })
|
||||
}
|
||||
@ -256,6 +277,12 @@ impl Server<Shared> {
|
||||
}
|
||||
|
||||
impl<State> Server<State> {
|
||||
fn new_search_queue(options: &Opt) -> Arc<SearchQueue> {
|
||||
let search_queue =
|
||||
SearchQueue::new(options.experimental_search_queue_size, NonZeroUsize::new(1).unwrap());
|
||||
Arc::new(search_queue)
|
||||
}
|
||||
|
||||
pub async fn init_web_app(
|
||||
&self,
|
||||
) -> impl actix_web::dev::Service<
|
||||
@ -279,6 +306,7 @@ impl<State> Server<State> {
|
||||
actix_web::test::init_service(create_app(
|
||||
self.service.index_scheduler.clone().into(),
|
||||
self.service.auth.clone().into(),
|
||||
self.service.search_queue.clone().into(),
|
||||
self.service.options.clone(),
|
||||
(route_layer_handle, stderr_layer_handle),
|
||||
analytics::MockAnalytics::new(&self.service.options),
|
||||
|
@ -5,6 +5,7 @@ use actix_web::http::StatusCode;
|
||||
use actix_web::test;
|
||||
use actix_web::test::TestRequest;
|
||||
use index_scheduler::IndexScheduler;
|
||||
use meilisearch::search_queue::SearchQueue;
|
||||
use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer};
|
||||
use meilisearch_auth::AuthController;
|
||||
use tracing::level_filters::LevelFilter;
|
||||
@ -16,6 +17,7 @@ use crate::common::Value;
|
||||
pub struct Service {
|
||||
pub index_scheduler: Arc<IndexScheduler>,
|
||||
pub auth: Arc<AuthController>,
|
||||
pub search_queue: Arc<SearchQueue>,
|
||||
pub options: Opt,
|
||||
pub api_key: Option<String>,
|
||||
}
|
||||
@ -123,6 +125,7 @@ impl Service {
|
||||
let app = test::init_service(create_app(
|
||||
self.index_scheduler.clone().into(),
|
||||
self.auth.clone().into(),
|
||||
self.search_queue.clone().into(),
|
||||
self.options.clone(),
|
||||
(route_layer_handle, stderr_layer_handle),
|
||||
analytics::MockAnalytics::new(&self.options),
|
||||
|
@ -44,6 +44,7 @@ async fn basic_test_log_stream_route() {
|
||||
let app = actix_web::test::init_service(create_app(
|
||||
server.service.index_scheduler.clone().into(),
|
||||
server.service.auth.clone().into(),
|
||||
server.service.search_queue.clone().into(),
|
||||
server.service.options.clone(),
|
||||
(route_layer_handle, stderr_layer_handle),
|
||||
analytics::MockAnalytics::new(&server.service.options),
|
||||
|
Loading…
Reference in New Issue
Block a user