mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
Support Events in trace layer
This commit is contained in:
parent
ef49ae7d6f
commit
eb5c725931
@ -33,6 +33,9 @@ pub enum Entry {
|
||||
|
||||
/// A call ended
|
||||
SpanClose(SpanClose),
|
||||
|
||||
/// An event occurred
|
||||
Event(Event),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
@ -74,6 +77,15 @@ pub struct SpanExit {
|
||||
pub memory: Option<MemoryStats>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct Event {
|
||||
pub call_id: ResourceId,
|
||||
pub thread_id: ResourceId,
|
||||
pub parent_id: Option<SpanId>,
|
||||
pub time: std::time::Duration,
|
||||
pub memory: Option<MemoryStats>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct NewSpan {
|
||||
pub id: SpanId,
|
||||
|
@ -12,7 +12,8 @@ use tracing_subscriber::layer::Context;
|
||||
use tracing_subscriber::Layer;
|
||||
|
||||
use crate::entry::{
|
||||
Entry, NewCallsite, NewSpan, NewThread, ResourceId, SpanClose, SpanEnter, SpanExit, SpanId,
|
||||
Entry, Event, MemoryStats, NewCallsite, NewSpan, NewThread, ResourceId, SpanClose, SpanEnter,
|
||||
SpanExit, SpanId,
|
||||
};
|
||||
use crate::{Error, Trace};
|
||||
|
||||
@ -98,6 +99,10 @@ impl<A: GlobalAlloc> TraceLayer<A> {
|
||||
self.start_time.elapsed()
|
||||
}
|
||||
|
||||
fn memory_stats(&self) -> Option<MemoryStats> {
|
||||
self.memory_allocator.map(|ma| ma.stats().into())
|
||||
}
|
||||
|
||||
fn send(&self, entry: Entry) {
|
||||
// we never care that the other end hanged on us
|
||||
let _ = self.sender.send(entry);
|
||||
@ -159,7 +164,7 @@ where
|
||||
self.send(Entry::SpanEnter(SpanEnter {
|
||||
id: id.into(),
|
||||
time: self.elapsed(),
|
||||
memory: self.memory_allocator.map(|ma| ma.stats().into()),
|
||||
memory: self.memory_stats(),
|
||||
}))
|
||||
}
|
||||
|
||||
@ -167,7 +172,31 @@ where
|
||||
self.send(Entry::SpanExit(SpanExit {
|
||||
id: id.into(),
|
||||
time: self.elapsed(),
|
||||
memory: self.memory_allocator.map(|ma| ma.stats().into()),
|
||||
memory: self.memory_stats(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
|
||||
let call_id = self
|
||||
.resource_id(OpaqueIdentifier::Call(event.metadata().callsite()))
|
||||
.unwrap_or_else(|| self.register_callsite(event.metadata()));
|
||||
|
||||
let thread_id = self
|
||||
.resource_id(OpaqueIdentifier::Thread(std::thread::current().id()))
|
||||
.unwrap_or_else(|| self.register_thread());
|
||||
|
||||
let parent_id = event
|
||||
.parent()
|
||||
.cloned()
|
||||
.or_else(|| tracing::Span::current().id())
|
||||
.map(|id| SpanId::from(&id));
|
||||
|
||||
self.send(Entry::Event(Event {
|
||||
call_id,
|
||||
thread_id,
|
||||
parent_id,
|
||||
time: self.elapsed(),
|
||||
memory: self.memory_stats(),
|
||||
}))
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,8 @@ use fxprof_processed_profile::{
|
||||
use serde_json::json;
|
||||
|
||||
use crate::entry::{
|
||||
Entry, MemoryStats, NewCallsite, NewSpan, ResourceId, SpanClose, SpanEnter, SpanExit, SpanId,
|
||||
Entry, Event, MemoryStats, NewCallsite, NewSpan, ResourceId, SpanClose, SpanEnter, SpanExit,
|
||||
SpanId,
|
||||
};
|
||||
use crate::{Error, TraceReader};
|
||||
|
||||
@ -32,7 +33,7 @@ pub fn to_firefox_profile<R: std::io::Read>(
|
||||
let category = profile.add_category("general", fxprof_processed_profile::CategoryColor::Blue);
|
||||
let subcategory = profile.add_subcategory(category, "subcategory");
|
||||
|
||||
let mut current_memory = MemoryStats::default();
|
||||
let mut last_memory = MemoryStats::default();
|
||||
|
||||
let mut memory_counters = None;
|
||||
|
||||
@ -75,7 +76,7 @@ pub fn to_firefox_profile<R: std::io::Read>(
|
||||
memory,
|
||||
last_timestamp,
|
||||
&mut memory_counters,
|
||||
&mut current_memory,
|
||||
&mut last_memory,
|
||||
);
|
||||
}
|
||||
Entry::SpanExit(SpanExit { id, time, memory }) => {
|
||||
@ -117,7 +118,7 @@ pub fn to_firefox_profile<R: std::io::Read>(
|
||||
memory,
|
||||
last_timestamp,
|
||||
&mut memory_counters,
|
||||
&mut current_memory,
|
||||
&mut last_memory,
|
||||
);
|
||||
|
||||
let (callsite, _) = calls.get(&span.call_id).unwrap();
|
||||
@ -137,9 +138,58 @@ pub fn to_firefox_profile<R: std::io::Read>(
|
||||
frames.iter().rev().cloned(),
|
||||
)
|
||||
}
|
||||
Entry::Event(event) => {
|
||||
let span = event
|
||||
.parent_id
|
||||
.as_ref()
|
||||
.and_then(|parent_id| spans.get(parent_id))
|
||||
.and_then(|(span, status)| match status {
|
||||
SpanStatus::Outside => None,
|
||||
SpanStatus::Inside { .. } => Some(span),
|
||||
})
|
||||
.copied();
|
||||
let timestamp = to_timestamp(event.time);
|
||||
|
||||
let thread_handle = threads.get(&event.thread_id).unwrap();
|
||||
|
||||
let frames = span
|
||||
.map(|span| make_frames(span, &spans, &calls, subcategory))
|
||||
.unwrap_or_default();
|
||||
|
||||
profile.add_sample(
|
||||
*thread_handle,
|
||||
timestamp,
|
||||
frames.iter().rev().cloned(),
|
||||
CpuDelta::ZERO,
|
||||
1,
|
||||
);
|
||||
|
||||
let memory_delta = add_memory_samples(
|
||||
&mut profile,
|
||||
main,
|
||||
event.memory,
|
||||
last_timestamp,
|
||||
&mut memory_counters,
|
||||
&mut last_memory,
|
||||
);
|
||||
|
||||
let (callsite, _) = calls.get(&event.call_id).unwrap();
|
||||
|
||||
let marker = EventMarker { callsite, event: &event, memory_delta };
|
||||
|
||||
profile.add_marker_with_stack(
|
||||
*thread_handle,
|
||||
&callsite.name,
|
||||
marker,
|
||||
fxprof_processed_profile::MarkerTiming::Instant(timestamp),
|
||||
frames.iter().rev().cloned(),
|
||||
);
|
||||
|
||||
last_timestamp = timestamp;
|
||||
}
|
||||
Entry::SpanClose(SpanClose { id, time }) => {
|
||||
spans.remove(&id);
|
||||
last_timestamp = Timestamp::from_nanos_since_reference(time.as_nanos() as u64);
|
||||
last_timestamp = to_timestamp(time);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -166,9 +216,9 @@ fn add_memory_samples(
|
||||
last_timestamp: Timestamp,
|
||||
memory_counters: &mut Option<MemoryCounterHandles>,
|
||||
last_memory: &mut MemoryStats,
|
||||
) {
|
||||
) -> Option<MemoryStats> {
|
||||
let Some(stats) = memory else {
|
||||
return;
|
||||
return None;
|
||||
};
|
||||
|
||||
let memory_counters =
|
||||
@ -181,7 +231,9 @@ fn add_memory_samples(
|
||||
stats.operations().checked_sub(last_memory.operations()).unwrap_or_default() as u32,
|
||||
);
|
||||
|
||||
let delta = stats.checked_sub(*last_memory);
|
||||
*last_memory = stats;
|
||||
delta
|
||||
}
|
||||
|
||||
fn to_timestamp(time: std::time::Duration) -> Timestamp {
|
||||
@ -352,3 +404,133 @@ impl<'a> ProfilerMarker for SpanMarker<'a> {
|
||||
value
|
||||
}
|
||||
}
|
||||
|
||||
struct EventMarker<'a> {
|
||||
event: &'a Event,
|
||||
callsite: &'a NewCallsite,
|
||||
memory_delta: Option<MemoryStats>,
|
||||
}
|
||||
|
||||
impl<'a> ProfilerMarker for EventMarker<'a> {
|
||||
const MARKER_TYPE_NAME: &'static str = "tracing-event";
|
||||
|
||||
fn schema() -> MarkerSchema {
|
||||
let fields = vec![
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "filename",
|
||||
label: "File name",
|
||||
format: MarkerFieldFormat::FilePath,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "line",
|
||||
label: "Line",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "module_path",
|
||||
label: "Module path",
|
||||
format: MarkerFieldFormat::String,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "parent_span_id",
|
||||
label: "Parent Span ID",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "thread_id",
|
||||
label: "Thread ID",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: true,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "allocations",
|
||||
label: "Number of allocation operations since last measure",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: false,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "deallocations",
|
||||
label: "Number of deallocation operations since last measure",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: false,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "reallocations",
|
||||
label: "Number of reallocation operations since last measure",
|
||||
format: MarkerFieldFormat::Integer,
|
||||
searchable: false,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "allocated_bytes",
|
||||
label: "Number of allocated bytes since last measure",
|
||||
format: MarkerFieldFormat::Bytes,
|
||||
searchable: false,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "deallocated_bytes",
|
||||
label: "Number of deallocated bytes since last measure",
|
||||
format: MarkerFieldFormat::Bytes,
|
||||
searchable: false,
|
||||
}),
|
||||
MarkerSchemaField::Dynamic(MarkerDynamicField {
|
||||
key: "reallocated_bytes",
|
||||
label: "Number of reallocated bytes since last measure",
|
||||
format: MarkerFieldFormat::Bytes,
|
||||
searchable: false,
|
||||
}),
|
||||
];
|
||||
|
||||
MarkerSchema {
|
||||
type_name: Self::MARKER_TYPE_NAME,
|
||||
locations: vec![
|
||||
MarkerLocation::MarkerTable,
|
||||
MarkerLocation::MarkerChart,
|
||||
MarkerLocation::TimelineOverview,
|
||||
],
|
||||
chart_label: None,
|
||||
tooltip_label: Some("{marker.name} - {marker.data.filename}:{marker.data.line}"),
|
||||
table_label: Some("{marker.data.filename}:{marker.data.line}"),
|
||||
fields,
|
||||
}
|
||||
}
|
||||
|
||||
fn json_marker_data(&self) -> serde_json::Value {
|
||||
let filename = self.callsite.file.as_deref();
|
||||
let line = self.callsite.line;
|
||||
let module_path = self.callsite.module_path.as_deref();
|
||||
let span_id = self.event.parent_id;
|
||||
let thread_id = self.event.thread_id;
|
||||
|
||||
let mut value = json!({
|
||||
"type": Self::MARKER_TYPE_NAME,
|
||||
"filename": filename,
|
||||
"line": line,
|
||||
"module_path": module_path,
|
||||
"parent_span_id": span_id,
|
||||
"thread_id": thread_id,
|
||||
});
|
||||
|
||||
if let Some(MemoryStats {
|
||||
allocations,
|
||||
deallocations,
|
||||
reallocations,
|
||||
bytes_allocated,
|
||||
bytes_deallocated,
|
||||
bytes_reallocated,
|
||||
}) = self.memory_delta
|
||||
{
|
||||
value["allocations"] = json!(allocations);
|
||||
value["deallocations"] = json!(deallocations);
|
||||
value["reallocations"] = json!(reallocations);
|
||||
value["allocated_bytes"] = json!(bytes_allocated);
|
||||
value["deallocated_bytes"] = json!(bytes_deallocated);
|
||||
value["reallocated_bytes"] = json!(bytes_reallocated);
|
||||
}
|
||||
|
||||
value
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
use std::io::Read;
|
||||
|
||||
use crate::entry::{
|
||||
Entry, MemoryStats, NewCallsite, NewSpan, NewThread, ResourceId, SpanClose, SpanEnter,
|
||||
Entry, Event, MemoryStats, NewCallsite, NewSpan, NewThread, ResourceId, SpanClose, SpanEnter,
|
||||
SpanExit, SpanId,
|
||||
};
|
||||
use crate::{Error, TraceReader};
|
||||
@ -89,6 +89,42 @@ pub fn print_trace<R: Read>(trace: TraceReader<R>) -> Result<(), Error> {
|
||||
Entry::SpanClose(SpanClose { id, time: _ }) => {
|
||||
spans.remove(&id);
|
||||
}
|
||||
Entry::Event(Event { call_id, thread_id, parent_id, time: _, memory }) => {
|
||||
let parent_span = parent_id.and_then(|parent_id| spans.get(&parent_id)).and_then(
|
||||
|(span, status)| match status {
|
||||
SpanStatus::Outside => None,
|
||||
SpanStatus::Inside(_) => Some(span),
|
||||
},
|
||||
);
|
||||
match (parent_span, memory) {
|
||||
(Some(parent_span), Some(stats)) => println!(
|
||||
"[{}]{}::{} ({}) event: {}",
|
||||
print_thread(&threads, thread_id),
|
||||
print_backtrace(&spans, &calls, parent_span),
|
||||
print_span(&calls, parent_span),
|
||||
print_memory(stats),
|
||||
print_call(&calls, call_id)
|
||||
),
|
||||
(Some(parent_span), None) => println!(
|
||||
"[{}]{}::{} event: {}",
|
||||
print_thread(&threads, thread_id),
|
||||
print_backtrace(&spans, &calls, parent_span),
|
||||
print_span(&calls, parent_span),
|
||||
print_call(&calls, call_id)
|
||||
),
|
||||
(None, None) => println!(
|
||||
"[{}] event: {}",
|
||||
print_thread(&threads, thread_id),
|
||||
print_call(&calls, call_id)
|
||||
),
|
||||
(None, Some(stats)) => println!(
|
||||
"[{}] ({}) event: {}",
|
||||
print_thread(&threads, thread_id),
|
||||
print_memory(stats),
|
||||
print_call(&calls, call_id)
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@ -121,7 +157,11 @@ fn print_backtrace(
|
||||
}
|
||||
|
||||
fn print_span(calls: &HashMap<ResourceId, NewCallsite>, span: &NewSpan) -> String {
|
||||
let callsite = calls.get(&span.call_id).unwrap();
|
||||
print_call(calls, span.call_id)
|
||||
}
|
||||
|
||||
fn print_call(calls: &HashMap<ResourceId, NewCallsite>, call_id: ResourceId) -> String {
|
||||
let callsite = calls.get(&call_id).unwrap();
|
||||
match (callsite.file.clone(), callsite.line) {
|
||||
(Some(file), None) => format!("{} ({})", callsite.name, file),
|
||||
(Some(file), Some(line)) => format!("{} ({}:{})", callsite.name, file, line),
|
||||
|
Loading…
Reference in New Issue
Block a user