From 38e1c40f38174876221fa1c5ccc9c1dc7798785e Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 31 Jan 2024 17:47:30 +0100 Subject: [PATCH] meilisearch: logs route disconnects in profile mode --- meilisearch/src/routes/logs.rs | 128 +++++++++++++++++++++------------ 1 file changed, 84 insertions(+), 44 deletions(-) diff --git a/meilisearch/src/routes/logs.rs b/meilisearch/src/routes/logs.rs index ca1ca2d2f..b327acab1 100644 --- a/meilisearch/src/routes/logs.rs +++ b/meilisearch/src/routes/logs.rs @@ -1,6 +1,6 @@ use std::fmt; use std::io::Write; -use std::ops::ControlFlow; +use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; @@ -8,11 +8,12 @@ use actix_web::web::{Bytes, Data}; use actix_web::{web, HttpRequest, HttpResponse}; use deserr::actix_web::AwebJson; use deserr::Deserr; +use futures_util::Stream; use meilisearch_auth::AuthController; use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::error::deserr_codes::*; -use meilisearch_types::error::ResponseError; -use tokio::sync::mpsc::{self, UnboundedSender}; +use meilisearch_types::error::{Code, ResponseError}; +use tokio::sync::mpsc::{self}; use tracing_subscriber::Layer; use crate::error::MeilisearchHttpError; @@ -91,13 +92,12 @@ impl Write for LogWriter { } } -struct LogStreamer { - receiver: mpsc::UnboundedReceiver>, +struct HandleGuard { /// We need to keep an handle on the logs to make it available again when the streamer is dropped logs: Arc, } -impl Drop for LogStreamer { +impl Drop for HandleGuard { fn drop(&mut self) { println!("log streamer being dropped"); if let Err(e) = self.logs.modify(|layer| *layer.inner_mut() = None) { @@ -106,56 +106,99 @@ impl Drop for LogStreamer { } } -impl LogStreamer { - pub fn into_stream(self) -> impl futures_util::Stream> { - futures_util::stream::unfold(self, move |mut this| async move { - let vec = this.receiver.recv().await; +fn byte_stream( + receiver: mpsc::UnboundedReceiver>, + guard: HandleGuard, +) -> impl futures_util::Stream> { + futures_util::stream::unfold((receiver, guard), move |(mut receiver, guard)| async move { + let vec = receiver.recv().await; - vec.map(From::from).map(Ok).map(|a| (a, this)) - }) - } + vec.map(From::from).map(Ok).map(|a| (a, (receiver, guard))) + }) } -pub fn make_layer< +type PinnedByteStream = Pin>>>; + +fn make_layer< S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, >( opt: &GetLogs, - sender: UnboundedSender>, -) -> Box + Send + Sync> { + logs: Data, +) -> (Box + Send + Sync>, PinnedByteStream) { + let guard = HandleGuard { logs: logs.into_inner() }; match opt.mode { LogMode::Fmt => { + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let fmt_layer = tracing_subscriber::fmt::layer() .with_line_number(true) .with_writer(move || LogWriter { sender: sender.clone() }) .with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE); - Box::new(fmt_layer) as Box + Send + Sync> + let stream = byte_stream(receiver, guard); + (Box::new(fmt_layer) as Box + Send + Sync>, Box::pin(stream)) } LogMode::Profile => { - let (mut trace, layer) = - tracing_trace::Trace::new(LogWriter { sender: sender.clone() }); + let (trace, layer) = tracing_trace::Trace::new(); - tokio::task::spawn(async move { - loop { - match tokio::time::timeout(std::time::Duration::from_secs(1), trace.receive()) - .await - { - Ok(Ok(ControlFlow::Continue(()))) => continue, - Ok(Ok(ControlFlow::Break(_))) => break, - // the other half of the channel was dropped - Ok(Err(_)) => break, - Err(_) => trace.flush().unwrap(), - } - } - while trace.try_receive().is_ok() {} - trace.flush().unwrap(); - }); + let stream = entry_stream(trace, guard); - Box::new(layer) as Box + Send + Sync> + (Box::new(layer) as Box + Send + Sync>, Box::pin(stream)) } } } +fn entry_stream( + trace: tracing_trace::Trace, + guard: HandleGuard, +) -> impl Stream> { + let receiver = trace.into_receiver(); + let entry_buf = Vec::new(); + + futures_util::stream::unfold( + (receiver, entry_buf, guard), + move |(mut receiver, mut entry_buf, guard)| async move { + let mut bytes = Vec::new(); + + while bytes.len() < 8192 { + entry_buf.clear(); + + let Ok(count) = tokio::time::timeout( + std::time::Duration::from_secs(1), + receiver.recv_many(&mut entry_buf, 100), + ) + .await + else { + break; + }; + + if count == 0 { + // channel closed, exit + return None; + } + + for entry in &entry_buf { + if let Err(error) = serde_json::to_writer(&mut bytes, entry) { + tracing::error!( + error = &error as &dyn std::error::Error, + "deserializing entry" + ); + return Some(( + Err(ResponseError::from_msg( + format!("error deserializing entry: {error}"), + Code::Internal, + )), + (receiver, entry_buf, guard), + )); + } + } + } + + Some((Ok(bytes.into()), (receiver, entry_buf, guard))) + }, + ) +} + pub async fn get_logs( _auth_controller: GuardedData, Data>, logs: Data, @@ -163,30 +206,27 @@ pub async fn get_logs( _req: HttpRequest, ) -> Result { let opt = body.into_inner(); - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - let mut was_available = false; + let mut stream = None; logs.modify(|layer| match layer.inner_mut() { None => { // there is no one getting logs - was_available = true; *layer.filter_mut() = tracing_subscriber::filter::Targets::from_str(&opt.target).unwrap(); - let new_layer = make_layer(&opt, sender); + let (new_layer, new_stream) = make_layer(&opt, logs.clone()); - *layer.inner_mut() = Some(new_layer) + *layer.inner_mut() = Some(new_layer); + stream = Some(new_stream); } Some(_) => { // there is already someone getting logs - was_available = false; } }) .unwrap(); - if was_available { - Ok(HttpResponse::Ok() - .streaming(LogStreamer { receiver, logs: logs.into_inner() }.into_stream())) + if let Some(stream) = stream { + Ok(HttpResponse::Ok().streaming(stream)) } else { Err(MeilisearchHttpError::AlreadyUsedLogRoute.into()) }