mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-19 01:18:31 +08:00
Switch to tokio channel
This commit is contained in:
parent
eb5c725931
commit
07263bc0d7
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -5716,6 +5716,7 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"stats_alloc",
|
"stats_alloc",
|
||||||
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-error",
|
"tracing-error",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
@ -18,4 +18,5 @@ byte-unit = { version = "4.0.19", default-features = false, features = [
|
|||||||
"std",
|
"std",
|
||||||
"serde",
|
"serde",
|
||||||
] }
|
] }
|
||||||
|
tokio = { version = "1.35.1", features = ["sync"] }
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ use crate::{Error, Trace};
|
|||||||
|
|
||||||
/// Layer that measures the time spent in spans.
|
/// Layer that measures the time spent in spans.
|
||||||
pub struct TraceLayer<A: GlobalAlloc + 'static = System> {
|
pub struct TraceLayer<A: GlobalAlloc + 'static = System> {
|
||||||
sender: std::sync::mpsc::Sender<Entry>,
|
sender: tokio::sync::mpsc::UnboundedSender<Entry>,
|
||||||
callsites: RwLock<HashMap<OpaqueIdentifier, ResourceId>>,
|
callsites: RwLock<HashMap<OpaqueIdentifier, ResourceId>>,
|
||||||
start_time: std::time::Instant,
|
start_time: std::time::Instant,
|
||||||
memory_allocator: Option<&'static StatsAlloc<A>>,
|
memory_allocator: Option<&'static StatsAlloc<A>>,
|
||||||
@ -27,7 +27,7 @@ pub struct TraceLayer<A: GlobalAlloc + 'static = System> {
|
|||||||
|
|
||||||
impl<W: Write> Trace<W> {
|
impl<W: Write> Trace<W> {
|
||||||
pub fn new(writer: W) -> (Self, TraceLayer<System>) {
|
pub fn new(writer: W) -> (Self, TraceLayer<System>) {
|
||||||
let (sender, receiver) = std::sync::mpsc::channel();
|
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
|
||||||
let trace = Trace { writer, receiver };
|
let trace = Trace { writer, receiver };
|
||||||
let layer = TraceLayer {
|
let layer = TraceLayer {
|
||||||
sender,
|
sender,
|
||||||
@ -42,7 +42,7 @@ impl<W: Write> Trace<W> {
|
|||||||
writer: W,
|
writer: W,
|
||||||
stats_alloc: &'static StatsAlloc<A>,
|
stats_alloc: &'static StatsAlloc<A>,
|
||||||
) -> (Self, TraceLayer<A>) {
|
) -> (Self, TraceLayer<A>) {
|
||||||
let (sender, receiver) = std::sync::mpsc::channel();
|
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
|
||||||
let trace = Trace { writer, receiver };
|
let trace = Trace { writer, receiver };
|
||||||
let layer = TraceLayer {
|
let layer = TraceLayer {
|
||||||
sender,
|
sender,
|
||||||
@ -53,8 +53,17 @@ impl<W: Write> Trace<W> {
|
|||||||
(trace, layer)
|
(trace, layer)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn receive(&mut self) -> Result<ControlFlow<(), ()>, Error> {
|
pub async fn receive(&mut self) -> Result<ControlFlow<(), ()>, Error> {
|
||||||
let Ok(entry) = self.receiver.recv() else {
|
let Some(entry) = self.receiver.recv().await else {
|
||||||
|
return Ok(ControlFlow::Break(()));
|
||||||
|
};
|
||||||
|
self.write(entry)?;
|
||||||
|
Ok(ControlFlow::Continue(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Panics if called from an asynchronous context
|
||||||
|
pub fn blocking_receive(&mut self) -> Result<ControlFlow<(), ()>, Error> {
|
||||||
|
let Some(entry) = self.receiver.blocking_recv() else {
|
||||||
return Ok(ControlFlow::Break(()));
|
return Ok(ControlFlow::Break(()));
|
||||||
};
|
};
|
||||||
self.write(entry)?;
|
self.write(entry)?;
|
||||||
|
@ -11,7 +11,7 @@ pub use error::Error;
|
|||||||
|
|
||||||
pub struct Trace<W: Write> {
|
pub struct Trace<W: Write> {
|
||||||
writer: W,
|
writer: W,
|
||||||
receiver: std::sync::mpsc::Receiver<Entry>,
|
receiver: tokio::sync::mpsc::UnboundedReceiver<Entry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TraceReader<R: Read> {
|
pub struct TraceReader<R: Read> {
|
||||||
|
Loading…
Reference in New Issue
Block a user