start exposing the profiling layer

This commit is contained in:
Tamo 2024-01-30 14:19:46 +01:00
parent 8b3e0b3826
commit a144ae10d2
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69

View File

@ -1,34 +1,32 @@
use std::fmt; use std::fmt;
use std::io::Write; use std::io::Write;
use std::pin::Pin; use std::ops::ControlFlow;
use std::str::FromStr; use std::str::FromStr;
use std::task::Poll;
use actix_web::web::{Bytes, Data}; use actix_web::web::{Bytes, Data};
use actix_web::{web, HttpRequest, HttpResponse}; use actix_web::{web, HttpRequest, HttpResponse};
use deserr::actix_web::AwebJson; use deserr::actix_web::AwebJson;
use deserr::Deserr; use deserr::Deserr;
use futures_util::{pin_mut, FutureExt};
use meilisearch_auth::AuthController; use meilisearch_auth::AuthController;
use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::deserr_codes::*;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use tokio::sync::mpsc::{self, UnboundedSender}; use tokio::sync::mpsc::{self, UnboundedSender};
use tracing_subscriber::layer::SubscriberExt; use tracing::instrument::WithSubscriber;
use tracing_subscriber::Layer; use tracing_subscriber::Layer;
use crate::error::MeilisearchHttpError; use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::{LogRouteHandle, LogRouteType}; use crate::LogRouteHandle;
pub fn configure(cfg: &mut web::ServiceConfig) { pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(get_logs)))); cfg.service(web::resource("").route(web::post().to(SeqHandler(get_logs))));
} }
#[derive(Debug, Default, Clone, Copy, Deserr)] #[derive(Debug, Default, Clone, Copy, Deserr)]
#[serde(rename_all = "lowercase")] #[deserr(rename_all = lowercase)]
pub enum LogLevel { pub enum LogLevel {
Error, Error,
Warn, Warn,
@ -38,11 +36,22 @@ pub enum LogLevel {
Trace, Trace,
} }
#[derive(Debug, Default, Clone, Copy, Deserr)]
#[deserr(rename_all = lowercase)]
pub enum LogMode {
#[default]
Fmt,
Profile,
}
#[derive(Debug, Deserr)] #[derive(Debug, Deserr)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct GetLogs { pub struct GetLogs {
#[deserr(default, error = DeserrJsonError<BadRequest>)] #[deserr(default, error = DeserrJsonError<BadRequest>)]
pub level: LogLevel, pub level: LogLevel,
#[deserr(default, error = DeserrJsonError<BadRequest>)]
pub mode: LogMode,
} }
impl fmt::Display for LogLevel { impl fmt::Display for LogLevel {
@ -76,29 +85,6 @@ struct LogStreamer {
receiver: mpsc::UnboundedReceiver<Vec<u8>>, receiver: mpsc::UnboundedReceiver<Vec<u8>>,
} }
impl futures_util::Stream for LogStreamer {
type Item = Result<Bytes, ResponseError>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let future = self.get_mut().receiver.recv();
pin_mut!(future);
match future.poll_unpin(cx) {
std::task::Poll::Ready(recv) => match recv {
Some(buf) => {
// let bytes = Bytes::copy_from_slice(buf.as_slice());
Poll::Ready(Some(Ok(buf.into())))
}
None => Poll::Ready(None),
},
Poll::Pending => Poll::Pending,
}
}
}
impl LogStreamer { impl LogStreamer {
pub fn into_stream(self) -> impl futures_util::Stream<Item = Result<Bytes, ResponseError>> { pub fn into_stream(self) -> impl futures_util::Stream<Item = Result<Bytes, ResponseError>> {
futures_util::stream::unfold(self, move |mut this| async move { futures_util::stream::unfold(self, move |mut this| async move {
@ -115,13 +101,38 @@ pub fn make_layer<
opt: &GetLogs, opt: &GetLogs,
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
) -> Box<dyn Layer<S> + Send + Sync> { ) -> Box<dyn Layer<S> + Send + Sync> {
let fmt_layer = tracing_subscriber::fmt::layer() match opt.mode {
.with_line_number(true) LogMode::Fmt => {
.with_writer(move || LogWriter { sender: sender.clone() }) let fmt_layer = tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE); .with_line_number(true)
// let subscriber = tracing_subscriber::registry().with(fmt_layer); .with_writer(move || LogWriter { sender: sender.clone() })
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE);
Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync> Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync>
}
LogMode::Profile => {
let (mut trace, layer) =
tracing_trace::Trace::new(LogWriter { sender: sender.clone() });
tokio::task::spawn(async move {
loop {
match tokio::time::timeout(std::time::Duration::from_secs(1), trace.receive())
.await
{
Ok(Ok(ControlFlow::Continue(()))) => continue,
Ok(Ok(ControlFlow::Break(_))) => break,
// the other half of the channel was dropped
Ok(Err(_)) => break,
Err(_) => trace.flush().unwrap(),
}
}
while trace.try_receive().is_ok() {}
trace.flush().unwrap();
});
Box::new(layer) as Box<dyn Layer<S> + Send + Sync>
}
}
} }
pub async fn get_logs( pub async fn get_logs(
@ -153,10 +164,28 @@ pub async fn get_logs(
logs.modify(|layer| match layer.inner_mut() { logs.modify(|layer| match layer.inner_mut() {
None => { None => {
was_available = true;
*layer.filter_mut() =
tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string()).unwrap();
// there is no one getting logs // there is no one getting logs
was_available = true;
match opt.mode {
LogMode::Fmt => {
*layer.filter_mut() =
tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string())
.unwrap();
}
LogMode::Profile => {
*layer.filter_mut() =
tracing_subscriber::filter::LevelFilter::from_str(&opt.level.to_string())
.unwrap();
// *layer.filter_mut() = tracing_subscriber::filter::Targets::new()
// .with_target("indexing::", tracing::Level::TRACE)
// .with_filter(
// tracing_subscriber::filter::LevelFilter::from_str(
// &opt.level.to_string(),
// )
// .unwrap(),
// )
}
}
let new_layer = make_layer(&opt, sender); let new_layer = make_layer(&opt, sender);
*layer.inner_mut() = Some(new_layer) *layer.inner_mut() = Some(new_layer)