From 894c92cd5aef826ca1af7eeac2826937f2e7a6ca Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 29 Jan 2024 17:56:43 +0100 Subject: [PATCH] make it compile and runtime error --- meilisearch-types/src/error.rs | 2 + meilisearch/src/error.rs | 4 ++ meilisearch/src/lib.rs | 10 +++++ meilisearch/src/main.rs | 17 ++++---- meilisearch/src/routes/logs.rs | 71 ++++++++++++++++++++++++---------- 5 files changed, 75 insertions(+), 29 deletions(-) diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 2182b1836..80643008e 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -310,6 +310,8 @@ TooManyVectors , InvalidRequest , BAD_REQUEST ; UnretrievableDocument , Internal , BAD_REQUEST ; UnretrievableErrorCode , InvalidRequest , BAD_REQUEST ; UnsupportedMediaType , InvalidRequest , UNSUPPORTED_MEDIA_TYPE ; + +// Experimental features VectorEmbeddingError , InvalidRequest , BAD_REQUEST } diff --git a/meilisearch/src/error.rs b/meilisearch/src/error.rs index 3bd8f3edd..ee54cf831 100644 --- a/meilisearch/src/error.rs +++ b/meilisearch/src/error.rs @@ -12,6 +12,8 @@ pub enum MeilisearchHttpError { #[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}", .0.iter().map(|s| format!("`{}`", s)).collect::>().join(", "))] MissingContentType(Vec), + #[error("Log route is currently used by someone else.")] + AlreadyUsedLogRoute, #[error("The Content-Type `{0}` does not support the use of a csv delimiter. The csv delimiter can only be used with the Content-Type `text/csv`.")] CsvDelimiterWithWrongContentType(String), #[error( @@ -59,6 +61,8 @@ impl ErrorCode for MeilisearchHttpError { fn error_code(&self) -> Code { match self { MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType, + /// TODO: TAMO: create a new error code + MeilisearchHttpError::AlreadyUsedLogRoute => Code::BadRequest, MeilisearchHttpError::CsvDelimiterWithWrongContentType(_) => Code::InvalidContentType, MeilisearchHttpError::MissingPayload(_) => Code::MissingPayload, MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType, diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index f1111962c..fb2874472 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -86,10 +86,17 @@ fn is_empty_db(db_path: impl AsRef) -> bool { } } +/// The handle used to update the logs at runtime. Must be accessible from the `main.rs` and the `route/logs.rs`. +pub type LogRouteHandle = + tracing_subscriber::reload::Handle, tracing_subscriber::Registry>; +pub type LogRouteType = + Box + Sync + Send>; + pub fn create_app( index_scheduler: Data, auth_controller: Data, opt: Opt, + logs: LogRouteHandle, analytics: Arc, enable_dashboard: bool, ) -> actix_web::App< @@ -108,6 +115,7 @@ pub fn create_app( index_scheduler.clone(), auth_controller.clone(), &opt, + logs, analytics.clone(), ) }) @@ -391,6 +399,7 @@ pub fn configure_data( index_scheduler: Data, auth: Data, opt: &Opt, + logs: LogRouteHandle, analytics: Arc, ) { let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize; @@ -398,6 +407,7 @@ pub fn configure_data( .app_data(index_scheduler) .app_data(auth) .app_data(web::Data::from(analytics)) + .app_data(web::Data::new(logs)) .app_data( web::JsonConfig::default() .limit(http_payload_size_limit) diff --git a/meilisearch/src/main.rs b/meilisearch/src/main.rs index 863db22d0..5750e222d 100644 --- a/meilisearch/src/main.rs +++ b/meilisearch/src/main.rs @@ -12,7 +12,7 @@ use anyhow::Context; use index_scheduler::IndexScheduler; use is_terminal::IsTerminal; use meilisearch::analytics::Analytics; -use meilisearch::{analytics, create_app, prototype_name, setup_meilisearch, Opt}; +use meilisearch::{analytics, create_app, prototype_name, setup_meilisearch, LogRouteHandle, Opt}; use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE}; use mimalloc::MiMalloc; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; @@ -27,12 +27,12 @@ static ALLOC: MiMalloc = MiMalloc; #[global_allocator] static ALLOC: stats_alloc::StatsAlloc = stats_alloc::StatsAlloc::new(MiMalloc); -fn f() -> Option>> { +fn f() -> Option + Send + Sync>> { None } /// does all the setup before meilisearch is launched -fn setup(opt: &Opt) -> anyhow::Result<()> { +fn setup(opt: &Opt) -> anyhow::Result { let now = time::OffsetDateTime::now_utc(); let format = time::format_description::parse("[year]-[month]-[day]_[hour]:[minute]:[second]")?; let trace_file = format!("{}-indexing-trace.json", now.format(&format)?); @@ -44,12 +44,11 @@ fn setup(opt: &Opt) -> anyhow::Result<()> { #[cfg(feature = "stats_alloc")] let (mut trace, layer) = tracing_trace::Trace::with_stats_alloc(file, &ALLOC); - // let (route_layer, route_layer_handle) = tracing_subscriber::reload::Layer::new(vec![]); let (route_layer, route_layer_handle) = tracing_subscriber::reload::Layer::new(f()); let route_layer: tracing_subscriber::reload::Layer<_, _> = route_layer; let subscriber = tracing_subscriber::registry() - .with(route_layer) + .with(route_layer.boxed()) .with( tracing_subscriber::fmt::layer() .with_line_number(true) @@ -82,7 +81,7 @@ fn setup(opt: &Opt) -> anyhow::Result<()> { // set the subscriber as the default for the application tracing::subscriber::set_global_default(subscriber).unwrap(); - Ok(()) + Ok(route_layer_handle) } #[actix_web::main] @@ -94,7 +93,7 @@ async fn main() -> anyhow::Result<()> { "The `experimental-reduce-indexing-memory-usage` flag is not supported on Windows" ); - setup(&opt)?; + let log_handle = setup(&opt)?; match (opt.env.as_ref(), &opt.master_key) { ("production", Some(master_key)) if master_key.len() < MASTER_KEY_MIN_SIZE => { @@ -132,7 +131,7 @@ async fn main() -> anyhow::Result<()> { print_launch_resume(&opt, analytics.clone(), config_read_from); - run_http(index_scheduler, auth_controller, opt, analytics).await?; + run_http(index_scheduler, auth_controller, opt, log_handle, analytics).await?; Ok(()) } @@ -141,6 +140,7 @@ async fn run_http( index_scheduler: Arc, auth_controller: Arc, opt: Opt, + logs: LogRouteHandle, analytics: Arc, ) -> anyhow::Result<()> { let enable_dashboard = &opt.env == "development"; @@ -153,6 +153,7 @@ async fn run_http( index_scheduler.clone(), auth_controller.clone(), opt.clone(), + logs.clone(), analytics.clone(), enable_dashboard, ) diff --git a/meilisearch/src/routes/logs.rs b/meilisearch/src/routes/logs.rs index a20dac8bb..e5f5ae091 100644 --- a/meilisearch/src/routes/logs.rs +++ b/meilisearch/src/routes/logs.rs @@ -13,14 +13,15 @@ use meilisearch_auth::AuthController; use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::ResponseError; -use tokio::pin; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, UnboundedSender}; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Layer; +use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; +use crate::{LogRouteHandle, LogRouteType}; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(get_logs)))); @@ -73,9 +74,6 @@ impl Write for LogWriter { struct LogStreamer { receiver: mpsc::UnboundedReceiver>, - // We just need to hold the guard until the struct is dropped - #[allow(unused)] - subscriber: tracing::subscriber::DefaultGuard, } impl futures_util::Stream for LogStreamer { @@ -101,8 +99,27 @@ impl futures_util::Stream for LogStreamer { } } +pub fn make_subscriber< + S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, +>( + opt: &GetLogs, + sender: UnboundedSender>, +) -> Box + Send + Sync> { + let fmt_layer = tracing_subscriber::fmt::layer() + .with_line_number(true) + .with_writer(move || LogWriter { sender: sender.clone() }) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE) + .with_filter( + tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()).unwrap(), + ); + // let subscriber = tracing_subscriber::registry().with(fmt_layer); + + Box::new(fmt_layer) as Box + Send + Sync> +} + pub async fn get_logs( _auth_controller: GuardedData, Data>, + logs: Data, body: AwebJson, _req: HttpRequest, ) -> Result { @@ -115,23 +132,35 @@ pub async fn get_logs( let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - let layer = tracing_subscriber::fmt::layer() - .with_line_number(true) - .with_writer(move || LogWriter { sender: sender.clone() }) - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE) - .with_filter( - tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()).unwrap(), - ); + // let fmt_layer = tracing_subscriber::fmt::layer() + // .with_line_number(true) + // .with_writer(move || LogWriter { sender: sender.clone() }) + // .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE) + // .with_filter( + // tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()).unwrap(), + // ); + // let subscriber = tracing_subscriber::registry().with(fmt_layer); + // let subscriber = Box::new(subscriber) as Box + Send + Sync>; - let subscriber = tracing_subscriber::registry().with(layer); - // .with( - // layer.with_filter( - // tracing_subscriber::filter::Targets::new() - // .with_target("indexing::", tracing::Level::TRACE), - // ), - // ); + let mut was_available = false; - let subscriber = tracing::subscriber::set_default(subscriber); + logs.modify(|layer| match layer { + None => { + was_available = true; + // there is already someone getting logs + let subscriber = make_subscriber(&opt, sender); + *layer = Some(subscriber) + } + Some(_) => { + // there is already someone getting logs + was_available = false; + } + }) + .unwrap(); - Ok(HttpResponse::Ok().streaming(LogStreamer { receiver, subscriber })) + if was_available { + Ok(HttpResponse::Ok().streaming(LogStreamer { receiver })) + } else { + Err(MeilisearchHttpError::AlreadyUsedLogRoute.into()) + } }