make it compile and runtime error

This commit is contained in:
Tamo 2024-01-29 17:56:43 +01:00
parent 279c56a665
commit 894c92cd5a
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
5 changed files with 75 additions and 29 deletions

View File

@ -310,6 +310,8 @@ TooManyVectors , InvalidRequest , BAD_REQUEST ;
UnretrievableDocument , Internal , BAD_REQUEST ; UnretrievableDocument , Internal , BAD_REQUEST ;
UnretrievableErrorCode , InvalidRequest , BAD_REQUEST ; UnretrievableErrorCode , InvalidRequest , BAD_REQUEST ;
UnsupportedMediaType , InvalidRequest , UNSUPPORTED_MEDIA_TYPE ; UnsupportedMediaType , InvalidRequest , UNSUPPORTED_MEDIA_TYPE ;
// Experimental features
VectorEmbeddingError , InvalidRequest , BAD_REQUEST VectorEmbeddingError , InvalidRequest , BAD_REQUEST
} }

View File

@ -12,6 +12,8 @@ pub enum MeilisearchHttpError {
#[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}", #[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}",
.0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", "))] .0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", "))]
MissingContentType(Vec<String>), MissingContentType(Vec<String>),
#[error("Log route is currently used by someone else.")]
AlreadyUsedLogRoute,
#[error("The Content-Type `{0}` does not support the use of a csv delimiter. The csv delimiter can only be used with the Content-Type `text/csv`.")] #[error("The Content-Type `{0}` does not support the use of a csv delimiter. The csv delimiter can only be used with the Content-Type `text/csv`.")]
CsvDelimiterWithWrongContentType(String), CsvDelimiterWithWrongContentType(String),
#[error( #[error(
@ -59,6 +61,8 @@ impl ErrorCode for MeilisearchHttpError {
fn error_code(&self) -> Code { fn error_code(&self) -> Code {
match self { match self {
MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType, MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType,
/// TODO: TAMO: create a new error code
MeilisearchHttpError::AlreadyUsedLogRoute => Code::BadRequest,
MeilisearchHttpError::CsvDelimiterWithWrongContentType(_) => Code::InvalidContentType, MeilisearchHttpError::CsvDelimiterWithWrongContentType(_) => Code::InvalidContentType,
MeilisearchHttpError::MissingPayload(_) => Code::MissingPayload, MeilisearchHttpError::MissingPayload(_) => Code::MissingPayload,
MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType, MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,

View File

@ -86,10 +86,17 @@ fn is_empty_db(db_path: impl AsRef<Path>) -> bool {
} }
} }
/// The handle used to update the logs at runtime. Must be accessible from the `main.rs` and the `route/logs.rs`.
pub type LogRouteHandle =
tracing_subscriber::reload::Handle<Option<LogRouteType>, tracing_subscriber::Registry>;
pub type LogRouteType =
Box<dyn tracing_subscriber::Layer<tracing_subscriber::Registry> + Sync + Send>;
pub fn create_app( pub fn create_app(
index_scheduler: Data<IndexScheduler>, index_scheduler: Data<IndexScheduler>,
auth_controller: Data<AuthController>, auth_controller: Data<AuthController>,
opt: Opt, opt: Opt,
logs: LogRouteHandle,
analytics: Arc<dyn Analytics>, analytics: Arc<dyn Analytics>,
enable_dashboard: bool, enable_dashboard: bool,
) -> actix_web::App< ) -> actix_web::App<
@ -108,6 +115,7 @@ pub fn create_app(
index_scheduler.clone(), index_scheduler.clone(),
auth_controller.clone(), auth_controller.clone(),
&opt, &opt,
logs,
analytics.clone(), analytics.clone(),
) )
}) })
@ -391,6 +399,7 @@ pub fn configure_data(
index_scheduler: Data<IndexScheduler>, index_scheduler: Data<IndexScheduler>,
auth: Data<AuthController>, auth: Data<AuthController>,
opt: &Opt, opt: &Opt,
logs: LogRouteHandle,
analytics: Arc<dyn Analytics>, analytics: Arc<dyn Analytics>,
) { ) {
let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize; let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
@ -398,6 +407,7 @@ pub fn configure_data(
.app_data(index_scheduler) .app_data(index_scheduler)
.app_data(auth) .app_data(auth)
.app_data(web::Data::from(analytics)) .app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs))
.app_data( .app_data(
web::JsonConfig::default() web::JsonConfig::default()
.limit(http_payload_size_limit) .limit(http_payload_size_limit)

View File

@ -12,7 +12,7 @@ use anyhow::Context;
use index_scheduler::IndexScheduler; use index_scheduler::IndexScheduler;
use is_terminal::IsTerminal; use is_terminal::IsTerminal;
use meilisearch::analytics::Analytics; use meilisearch::analytics::Analytics;
use meilisearch::{analytics, create_app, prototype_name, setup_meilisearch, Opt}; use meilisearch::{analytics, create_app, prototype_name, setup_meilisearch, LogRouteHandle, Opt};
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE}; use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
use mimalloc::MiMalloc; use mimalloc::MiMalloc;
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
@ -27,12 +27,12 @@ static ALLOC: MiMalloc = MiMalloc;
#[global_allocator] #[global_allocator]
static ALLOC: stats_alloc::StatsAlloc<MiMalloc> = stats_alloc::StatsAlloc::new(MiMalloc); static ALLOC: stats_alloc::StatsAlloc<MiMalloc> = stats_alloc::StatsAlloc::new(MiMalloc);
fn f<S: Sized>() -> Option<Box<dyn Layer<S>>> { fn f<S>() -> Option<Box<dyn Layer<S> + Send + Sync>> {
None None
} }
/// does all the setup before meilisearch is launched /// does all the setup before meilisearch is launched
fn setup(opt: &Opt) -> anyhow::Result<()> { fn setup(opt: &Opt) -> anyhow::Result<LogRouteHandle> {
let now = time::OffsetDateTime::now_utc(); let now = time::OffsetDateTime::now_utc();
let format = time::format_description::parse("[year]-[month]-[day]_[hour]:[minute]:[second]")?; let format = time::format_description::parse("[year]-[month]-[day]_[hour]:[minute]:[second]")?;
let trace_file = format!("{}-indexing-trace.json", now.format(&format)?); let trace_file = format!("{}-indexing-trace.json", now.format(&format)?);
@ -44,12 +44,11 @@ fn setup(opt: &Opt) -> anyhow::Result<()> {
#[cfg(feature = "stats_alloc")] #[cfg(feature = "stats_alloc")]
let (mut trace, layer) = tracing_trace::Trace::with_stats_alloc(file, &ALLOC); let (mut trace, layer) = tracing_trace::Trace::with_stats_alloc(file, &ALLOC);
// let (route_layer, route_layer_handle) = tracing_subscriber::reload::Layer::new(vec![]);
let (route_layer, route_layer_handle) = tracing_subscriber::reload::Layer::new(f()); let (route_layer, route_layer_handle) = tracing_subscriber::reload::Layer::new(f());
let route_layer: tracing_subscriber::reload::Layer<_, _> = route_layer; let route_layer: tracing_subscriber::reload::Layer<_, _> = route_layer;
let subscriber = tracing_subscriber::registry() let subscriber = tracing_subscriber::registry()
.with(route_layer) .with(route_layer.boxed())
.with( .with(
tracing_subscriber::fmt::layer() tracing_subscriber::fmt::layer()
.with_line_number(true) .with_line_number(true)
@ -82,7 +81,7 @@ fn setup(opt: &Opt) -> anyhow::Result<()> {
// set the subscriber as the default for the application // set the subscriber as the default for the application
tracing::subscriber::set_global_default(subscriber).unwrap(); tracing::subscriber::set_global_default(subscriber).unwrap();
Ok(()) Ok(route_layer_handle)
} }
#[actix_web::main] #[actix_web::main]
@ -94,7 +93,7 @@ async fn main() -> anyhow::Result<()> {
"The `experimental-reduce-indexing-memory-usage` flag is not supported on Windows" "The `experimental-reduce-indexing-memory-usage` flag is not supported on Windows"
); );
setup(&opt)?; let log_handle = setup(&opt)?;
match (opt.env.as_ref(), &opt.master_key) { match (opt.env.as_ref(), &opt.master_key) {
("production", Some(master_key)) if master_key.len() < MASTER_KEY_MIN_SIZE => { ("production", Some(master_key)) if master_key.len() < MASTER_KEY_MIN_SIZE => {
@ -132,7 +131,7 @@ async fn main() -> anyhow::Result<()> {
print_launch_resume(&opt, analytics.clone(), config_read_from); print_launch_resume(&opt, analytics.clone(), config_read_from);
run_http(index_scheduler, auth_controller, opt, analytics).await?; run_http(index_scheduler, auth_controller, opt, log_handle, analytics).await?;
Ok(()) Ok(())
} }
@ -141,6 +140,7 @@ async fn run_http(
index_scheduler: Arc<IndexScheduler>, index_scheduler: Arc<IndexScheduler>,
auth_controller: Arc<AuthController>, auth_controller: Arc<AuthController>,
opt: Opt, opt: Opt,
logs: LogRouteHandle,
analytics: Arc<dyn Analytics>, analytics: Arc<dyn Analytics>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let enable_dashboard = &opt.env == "development"; let enable_dashboard = &opt.env == "development";
@ -153,6 +153,7 @@ async fn run_http(
index_scheduler.clone(), index_scheduler.clone(),
auth_controller.clone(), auth_controller.clone(),
opt.clone(), opt.clone(),
logs.clone(),
analytics.clone(), analytics.clone(),
enable_dashboard, enable_dashboard,
) )

View File

@ -13,14 +13,15 @@ use meilisearch_auth::AuthController;
use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::deserr_codes::*;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use tokio::pin; use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::mpsc;
use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Layer; use tracing_subscriber::Layer;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::{LogRouteHandle, LogRouteType};
pub fn configure(cfg: &mut web::ServiceConfig) { pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(get_logs)))); cfg.service(web::resource("").route(web::post().to(SeqHandler(get_logs))));
@ -73,9 +74,6 @@ impl Write for LogWriter {
struct LogStreamer { struct LogStreamer {
receiver: mpsc::UnboundedReceiver<Vec<u8>>, receiver: mpsc::UnboundedReceiver<Vec<u8>>,
// We just need to hold the guard until the struct is dropped
#[allow(unused)]
subscriber: tracing::subscriber::DefaultGuard,
} }
impl futures_util::Stream for LogStreamer { impl futures_util::Stream for LogStreamer {
@ -101,8 +99,27 @@ impl futures_util::Stream for LogStreamer {
} }
} }
pub fn make_subscriber<
S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
>(
opt: &GetLogs,
sender: UnboundedSender<Vec<u8>>,
) -> Box<dyn Layer<S> + Send + Sync> {
let fmt_layer = tracing_subscriber::fmt::layer()
.with_line_number(true)
.with_writer(move || LogWriter { sender: sender.clone() })
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE)
.with_filter(
tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()).unwrap(),
);
// let subscriber = tracing_subscriber::registry().with(fmt_layer);
Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync>
}
pub async fn get_logs( pub async fn get_logs(
_auth_controller: GuardedData<ActionPolicy<{ actions::METRICS_ALL }>, Data<AuthController>>, _auth_controller: GuardedData<ActionPolicy<{ actions::METRICS_ALL }>, Data<AuthController>>,
logs: Data<LogRouteHandle>,
body: AwebJson<GetLogs, DeserrJsonError>, body: AwebJson<GetLogs, DeserrJsonError>,
_req: HttpRequest, _req: HttpRequest,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
@ -115,23 +132,35 @@ pub async fn get_logs(
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let layer = tracing_subscriber::fmt::layer() // let fmt_layer = tracing_subscriber::fmt::layer()
.with_line_number(true) // .with_line_number(true)
.with_writer(move || LogWriter { sender: sender.clone() }) // .with_writer(move || LogWriter { sender: sender.clone() })
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE) // .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE)
.with_filter( // .with_filter(
tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()).unwrap(), // tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()).unwrap(),
); // );
// let subscriber = tracing_subscriber::registry().with(fmt_layer);
// let subscriber = Box::new(subscriber) as Box<dyn Layer<S> + Send + Sync>;
let subscriber = tracing_subscriber::registry().with(layer); let mut was_available = false;
// .with(
// layer.with_filter(
// tracing_subscriber::filter::Targets::new()
// .with_target("indexing::", tracing::Level::TRACE),
// ),
// );
let subscriber = tracing::subscriber::set_default(subscriber); logs.modify(|layer| match layer {
None => {
was_available = true;
// there is already someone getting logs
let subscriber = make_subscriber(&opt, sender);
*layer = Some(subscriber)
}
Some(_) => {
// there is already someone getting logs
was_available = false;
}
})
.unwrap();
Ok(HttpResponse::Ok().streaming(LogStreamer { receiver, subscriber })) if was_available {
Ok(HttpResponse::Ok().streaming(LogStreamer { receiver }))
} else {
Err(MeilisearchHttpError::AlreadyUsedLogRoute.into())
}
} }