Switch to tokio channel

This commit is contained in:
Louis Dureuil 2024-01-23 16:52:48 +01:00
parent b141c82a04
commit 8febbf64ce
No known key found for this signature in database
4 changed files with 17 additions and 6 deletions

1
Cargo.lock generated
View File

@ -5716,6 +5716,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"stats_alloc", "stats_alloc",
"tokio",
"tracing", "tracing",
"tracing-error", "tracing-error",
"tracing-subscriber", "tracing-subscriber",

View File

@ -18,4 +18,5 @@ byte-unit = { version = "4.0.19", default-features = false, features = [
"std", "std",
"serde", "serde",
] } ] }
tokio = { version = "1.35.1", features = ["sync"] }

View File

@ -19,7 +19,7 @@ use crate::{Error, Trace};
/// Layer that measures the time spent in spans. /// Layer that measures the time spent in spans.
pub struct TraceLayer<A: GlobalAlloc + 'static = System> { pub struct TraceLayer<A: GlobalAlloc + 'static = System> {
sender: std::sync::mpsc::Sender<Entry>, sender: tokio::sync::mpsc::UnboundedSender<Entry>,
callsites: RwLock<HashMap<OpaqueIdentifier, ResourceId>>, callsites: RwLock<HashMap<OpaqueIdentifier, ResourceId>>,
start_time: std::time::Instant, start_time: std::time::Instant,
memory_allocator: Option<&'static StatsAlloc<A>>, memory_allocator: Option<&'static StatsAlloc<A>>,
@ -27,7 +27,7 @@ pub struct TraceLayer<A: GlobalAlloc + 'static = System> {
impl<W: Write> Trace<W> { impl<W: Write> Trace<W> {
pub fn new(writer: W) -> (Self, TraceLayer<System>) { pub fn new(writer: W) -> (Self, TraceLayer<System>) {
let (sender, receiver) = std::sync::mpsc::channel(); let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let trace = Trace { writer, receiver }; let trace = Trace { writer, receiver };
let layer = TraceLayer { let layer = TraceLayer {
sender, sender,
@ -42,7 +42,7 @@ impl<W: Write> Trace<W> {
writer: W, writer: W,
stats_alloc: &'static StatsAlloc<A>, stats_alloc: &'static StatsAlloc<A>,
) -> (Self, TraceLayer<A>) { ) -> (Self, TraceLayer<A>) {
let (sender, receiver) = std::sync::mpsc::channel(); let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let trace = Trace { writer, receiver }; let trace = Trace { writer, receiver };
let layer = TraceLayer { let layer = TraceLayer {
sender, sender,
@ -53,8 +53,17 @@ impl<W: Write> Trace<W> {
(trace, layer) (trace, layer)
} }
pub fn receive(&mut self) -> Result<ControlFlow<(), ()>, Error> { pub async fn receive(&mut self) -> Result<ControlFlow<(), ()>, Error> {
let Ok(entry) = self.receiver.recv() else { let Some(entry) = self.receiver.recv().await else {
return Ok(ControlFlow::Break(()));
};
self.write(entry)?;
Ok(ControlFlow::Continue(()))
}
/// Panics if called from an asynchronous context
pub fn blocking_receive(&mut self) -> Result<ControlFlow<(), ()>, Error> {
let Some(entry) = self.receiver.blocking_recv() else {
return Ok(ControlFlow::Break(())); return Ok(ControlFlow::Break(()));
}; };
self.write(entry)?; self.write(entry)?;

View File

@ -11,7 +11,7 @@ pub use error::Error;
pub struct Trace<W: Write> { pub struct Trace<W: Write> {
writer: W, writer: W,
receiver: std::sync::mpsc::Receiver<Entry>, receiver: tokio::sync::mpsc::UnboundedReceiver<Entry>,
} }
pub struct TraceReader<R: Read> { pub struct TraceReader<R: Read> {