meilisearch: logs route disconnects in profile mode

This commit is contained in:
Louis Dureuil 2024-01-31 17:47:30 +01:00
parent afc0585c1c
commit 38e1c40f38
No known key found for this signature in database

View File

@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
use std::io::Write; use std::io::Write;
use std::ops::ControlFlow; use std::pin::Pin;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
@ -8,11 +8,12 @@ use actix_web::web::{Bytes, Data};
use actix_web::{web, HttpRequest, HttpResponse}; use actix_web::{web, HttpRequest, HttpResponse};
use deserr::actix_web::AwebJson; use deserr::actix_web::AwebJson;
use deserr::Deserr; use deserr::Deserr;
use futures_util::Stream;
use meilisearch_auth::AuthController; use meilisearch_auth::AuthController;
use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::deserr_codes::*;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::{Code, ResponseError};
use tokio::sync::mpsc::{self, UnboundedSender}; use tokio::sync::mpsc::{self};
use tracing_subscriber::Layer; use tracing_subscriber::Layer;
use crate::error::MeilisearchHttpError; use crate::error::MeilisearchHttpError;
@ -91,13 +92,12 @@ impl Write for LogWriter {
} }
} }
struct LogStreamer { struct HandleGuard {
receiver: mpsc::UnboundedReceiver<Vec<u8>>,
/// We need to keep an handle on the logs to make it available again when the streamer is dropped /// We need to keep an handle on the logs to make it available again when the streamer is dropped
logs: Arc<LogRouteHandle>, logs: Arc<LogRouteHandle>,
} }
impl Drop for LogStreamer { impl Drop for HandleGuard {
fn drop(&mut self) { fn drop(&mut self) {
println!("log streamer being dropped"); println!("log streamer being dropped");
if let Err(e) = self.logs.modify(|layer| *layer.inner_mut() = None) { if let Err(e) = self.logs.modify(|layer| *layer.inner_mut() = None) {
@ -106,56 +106,99 @@ impl Drop for LogStreamer {
} }
} }
impl LogStreamer { fn byte_stream(
pub fn into_stream(self) -> impl futures_util::Stream<Item = Result<Bytes, ResponseError>> { receiver: mpsc::UnboundedReceiver<Vec<u8>>,
futures_util::stream::unfold(self, move |mut this| async move { guard: HandleGuard,
let vec = this.receiver.recv().await; ) -> impl futures_util::Stream<Item = Result<Bytes, ResponseError>> {
futures_util::stream::unfold((receiver, guard), move |(mut receiver, guard)| async move {
let vec = receiver.recv().await;
vec.map(From::from).map(Ok).map(|a| (a, this)) vec.map(From::from).map(Ok).map(|a| (a, (receiver, guard)))
}) })
}
} }
pub fn make_layer< type PinnedByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, ResponseError>>>>;
fn make_layer<
S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
>( >(
opt: &GetLogs, opt: &GetLogs,
sender: UnboundedSender<Vec<u8>>, logs: Data<LogRouteHandle>,
) -> Box<dyn Layer<S> + Send + Sync> { ) -> (Box<dyn Layer<S> + Send + Sync>, PinnedByteStream) {
let guard = HandleGuard { logs: logs.into_inner() };
match opt.mode { match opt.mode {
LogMode::Fmt => { LogMode::Fmt => {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let fmt_layer = tracing_subscriber::fmt::layer() let fmt_layer = tracing_subscriber::fmt::layer()
.with_line_number(true) .with_line_number(true)
.with_writer(move || LogWriter { sender: sender.clone() }) .with_writer(move || LogWriter { sender: sender.clone() })
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE); .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE);
Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync> let stream = byte_stream(receiver, guard);
(Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync>, Box::pin(stream))
} }
LogMode::Profile => { LogMode::Profile => {
let (mut trace, layer) = let (trace, layer) = tracing_trace::Trace::new();
tracing_trace::Trace::new(LogWriter { sender: sender.clone() });
tokio::task::spawn(async move { let stream = entry_stream(trace, guard);
loop {
match tokio::time::timeout(std::time::Duration::from_secs(1), trace.receive())
.await
{
Ok(Ok(ControlFlow::Continue(()))) => continue,
Ok(Ok(ControlFlow::Break(_))) => break,
// the other half of the channel was dropped
Ok(Err(_)) => break,
Err(_) => trace.flush().unwrap(),
}
}
while trace.try_receive().is_ok() {}
trace.flush().unwrap();
});
Box::new(layer) as Box<dyn Layer<S> + Send + Sync> (Box::new(layer) as Box<dyn Layer<S> + Send + Sync>, Box::pin(stream))
} }
} }
} }
fn entry_stream(
trace: tracing_trace::Trace,
guard: HandleGuard,
) -> impl Stream<Item = Result<Bytes, ResponseError>> {
let receiver = trace.into_receiver();
let entry_buf = Vec::new();
futures_util::stream::unfold(
(receiver, entry_buf, guard),
move |(mut receiver, mut entry_buf, guard)| async move {
let mut bytes = Vec::new();
while bytes.len() < 8192 {
entry_buf.clear();
let Ok(count) = tokio::time::timeout(
std::time::Duration::from_secs(1),
receiver.recv_many(&mut entry_buf, 100),
)
.await
else {
break;
};
if count == 0 {
// channel closed, exit
return None;
}
for entry in &entry_buf {
if let Err(error) = serde_json::to_writer(&mut bytes, entry) {
tracing::error!(
error = &error as &dyn std::error::Error,
"deserializing entry"
);
return Some((
Err(ResponseError::from_msg(
format!("error deserializing entry: {error}"),
Code::Internal,
)),
(receiver, entry_buf, guard),
));
}
}
}
Some((Ok(bytes.into()), (receiver, entry_buf, guard)))
},
)
}
pub async fn get_logs( pub async fn get_logs(
_auth_controller: GuardedData<ActionPolicy<{ actions::METRICS_ALL }>, Data<AuthController>>, _auth_controller: GuardedData<ActionPolicy<{ actions::METRICS_ALL }>, Data<AuthController>>,
logs: Data<LogRouteHandle>, logs: Data<LogRouteHandle>,
@ -163,30 +206,27 @@ pub async fn get_logs(
_req: HttpRequest, _req: HttpRequest,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let opt = body.into_inner(); let opt = body.into_inner();
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let mut was_available = false; let mut stream = None;
logs.modify(|layer| match layer.inner_mut() { logs.modify(|layer| match layer.inner_mut() {
None => { None => {
// there is no one getting logs // there is no one getting logs
was_available = true;
*layer.filter_mut() = *layer.filter_mut() =
tracing_subscriber::filter::Targets::from_str(&opt.target).unwrap(); tracing_subscriber::filter::Targets::from_str(&opt.target).unwrap();
let new_layer = make_layer(&opt, sender); let (new_layer, new_stream) = make_layer(&opt, logs.clone());
*layer.inner_mut() = Some(new_layer) *layer.inner_mut() = Some(new_layer);
stream = Some(new_stream);
} }
Some(_) => { Some(_) => {
// there is already someone getting logs // there is already someone getting logs
was_available = false;
} }
}) })
.unwrap(); .unwrap();
if was_available { if let Some(stream) = stream {
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok().streaming(stream))
.streaming(LogStreamer { receiver, logs: logs.into_inner() }.into_stream()))
} else { } else {
Err(MeilisearchHttpError::AlreadyUsedLogRoute.into()) Err(MeilisearchHttpError::AlreadyUsedLogRoute.into())
} }