diff --git a/Cargo.lock b/Cargo.lock index 3f9171edc..9e2cbbb31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -961,6 +961,18 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" +[[package]] +name = "color-spantrace" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd6be1b2a7e382e2b98b43b2adcca6bb0e465af0bdd38123873ae61eb17a72c2" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -1286,6 +1298,15 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "debugid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +dependencies = [ + "uuid", +] + [[package]] name = "deduplicating_array" version = "0.1.5" @@ -1911,6 +1932,19 @@ dependencies = [ "byteorder", ] +[[package]] +name = "fxprof-processed-profile" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27d12c0aed7f1e24276a241aadc4cb8ea9f83000f34bc062b7cc2d51e3b0fabd" +dependencies = [ + "bitflags 2.4.1", + "debugid", + "fxhash", + "serde", + "serde_json", +] + [[package]] name = "gemm" version = "0.17.0" @@ -3916,6 +3950,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.3" @@ -4037,6 +4081,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "page_size" version = "0.5.0" @@ -4994,6 +5050,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -5326,6 +5391,16 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tiktoken-rs" version = "0.5.8" @@ -5554,11 +5629,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -5578,11 +5652,60 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-error" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" +dependencies = [ + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "tracing-trace" +version = "0.1.0" +dependencies = [ + "color-spantrace", + "fxprof-processed-profile", + "serde", + "serde_json", + "tracing", + "tracing-error", + "tracing-subscriber", ] [[package]] @@ -5758,6 +5881,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index a0c6c3ac9..7f6a8088e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,12 +16,16 @@ members = [ "json-depth-checker", "benchmarks", "fuzzers", + "tracing-trace", "xtask", ] [workspace.package] version = "1.6.1" -authors = ["Quentin de Quelen ", "Clément Renault "] +authors = [ + "Quentin de Quelen ", + "Clément Renault ", +] description = "Meilisearch HTTP server" homepage = "https://meilisearch.com" readme = "README.md" diff --git a/tracing-trace/.gitignore b/tracing-trace/.gitignore new file mode 100644 index 000000000..ea8c4bf7f --- /dev/null +++ b/tracing-trace/.gitignore @@ -0,0 +1 @@ +/target diff --git a/tracing-trace/Cargo.toml b/tracing-trace/Cargo.toml new file mode 100644 index 000000000..9215fdfd0 --- /dev/null +++ b/tracing-trace/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tracing-trace" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +color-spantrace = "0.2.1" +fxprof-processed-profile = "0.6.0" +serde = { version = "1.0.195", features = ["derive"] } +serde_json = "1.0.111" +tracing = "0.1.40" +tracing-error = "0.2.0" +tracing-subscriber = "0.3.18" diff --git a/tracing-trace/src/bin/trace-to-firefox.rs b/tracing-trace/src/bin/trace-to-firefox.rs new file mode 100644 index 000000000..21adff41d --- /dev/null +++ b/tracing-trace/src/bin/trace-to-firefox.rs @@ -0,0 +1,18 @@ +use std::ffi::OsString; +use std::io::Write; + +fn main() { + let input_file = std::env::args_os().nth(1).expect("missing file"); + let input = + std::io::BufReader::new(std::fs::File::open(&input_file).expect("could not open ")); + let trace = tracing_trace::TraceReader::new(input); + let profile = + tracing_trace::processor::firefox_profiler::to_firefox_profile(trace, "Meilisearch") + .unwrap(); + let mut output_file = OsString::new(); + output_file.push("firefox-"); + output_file.push(input_file); + let mut output_file = std::io::BufWriter::new(std::fs::File::create(output_file).unwrap()); + serde_json::to_writer(&mut output_file, &profile).unwrap(); + output_file.flush().unwrap(); +} diff --git a/tracing-trace/src/entry.rs b/tracing-trace/src/entry.rs new file mode 100644 index 000000000..dd91a2a61 --- /dev/null +++ b/tracing-trace/src/entry.rs @@ -0,0 +1,96 @@ +use std::borrow::Cow; + +use serde::{Deserialize, Serialize}; +use tracing::span::Id as TracingId; + +#[derive(Debug, Serialize, Deserialize)] +pub enum Entry { + /// A code location was accessed for the first time + NewCallsite(NewCallsite), + + /// A new thread was accessed + NewThread(NewThread), + + /// A new call started + NewSpan(NewSpan), + + /// An already in-flight call started doing work. + /// + /// For synchronous functions, open should always be followed immediately by enter, exit and close, + /// but for asynchronous functions, work can suspend (exiting the span without closing it), and then + /// later resume (entering the span again without opening it). + /// + /// The timer for a span only starts when the span is entered. + SpanEnter(SpanEnter), + + /// An in-flight call suspended and paused work. + /// + /// For synchronous functions, exit should always be followed immediately by close, + /// but for asynchronous functions, work can suspend and then later resume. + /// + /// The timer for a span pauses when the span is exited. + SpanExit(SpanExit), + + /// A call ended + SpanClose(SpanClose), +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct SpanId(u64); + +impl From<&TracingId> for SpanId { + fn from(value: &TracingId) -> Self { + Self(value.into_u64()) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct NewCallsite { + pub call_id: ResourceId, + pub name: Cow<'static, str>, + pub module_path: Option>, + pub file: Option>, + pub line: Option, + pub target: Cow<'static, str>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct NewThread { + pub thread_id: ResourceId, + pub name: Option, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct SpanEnter { + pub id: SpanId, + pub time: std::time::Duration, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct SpanExit { + pub id: SpanId, + pub time: std::time::Duration, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct NewSpan { + pub id: SpanId, + pub call_id: ResourceId, + pub parent_id: Option, + pub thread_id: ResourceId, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct SpanClose { + pub id: SpanId, + pub time: std::time::Duration, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct ResourceId(pub(crate) usize); + +impl ResourceId { + pub fn to_usize(self) -> usize { + self.0 + } +} diff --git a/tracing-trace/src/error.rs b/tracing-trace/src/error.rs new file mode 100644 index 000000000..cce13f85c --- /dev/null +++ b/tracing-trace/src/error.rs @@ -0,0 +1,19 @@ +#[derive(Debug)] +pub enum Error { + Json(serde_json::Error), +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("error de/serializing trace entry:")?; + match self { + Error::Json(error) => std::fmt::Display::fmt(&error, f), + } + } +} + +impl From for Error { + fn from(value: serde_json::Error) -> Self { + Self::Json(value) + } +} diff --git a/tracing-trace/src/layer.rs b/tracing-trace/src/layer.rs new file mode 100644 index 000000000..cbc5cf6b2 --- /dev/null +++ b/tracing-trace/src/layer.rs @@ -0,0 +1,152 @@ +use std::borrow::Cow; +use std::collections::HashMap; +use std::io::Write; +use std::ops::ControlFlow; +use std::sync::RwLock; + +use tracing::span::{Attributes, Id as TracingId}; +use tracing::{Metadata, Subscriber}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::Layer; + +use crate::entry::{ + Entry, NewCallsite, NewSpan, NewThread, ResourceId, SpanClose, SpanEnter, SpanExit, SpanId, +}; +use crate::{Error, Trace}; + +/// Layer that measures the time spent in spans. +pub struct TraceLayer { + sender: std::sync::mpsc::Sender, + callsites: RwLock>, + start_time: std::time::Instant, + // TODO: kero add handle to allocator stats here +} + +impl Trace { + pub fn new(writer: W) -> (Self, TraceLayer) { + let (sender, receiver) = std::sync::mpsc::channel(); + let trace = Trace { writer, receiver }; + let layer = TraceLayer { + sender, + callsites: Default::default(), + start_time: std::time::Instant::now(), + }; + (trace, layer) + } + + pub fn receive(&mut self) -> Result, Error> { + let Ok(entry) = self.receiver.recv() else { + return Ok(ControlFlow::Break(())); + }; + self.write(entry)?; + Ok(ControlFlow::Continue(())) + } + + pub fn write(&mut self, entry: Entry) -> Result<(), Error> { + Ok(serde_json::ser::to_writer(&mut self.writer, &entry)?) + } + + pub fn try_receive(&mut self) -> Result, Error> { + let Ok(entry) = self.receiver.try_recv() else { + return Ok(ControlFlow::Break(())); + }; + self.write(entry)?; + Ok(ControlFlow::Continue(())) + } + + pub fn flush(&mut self) -> Result<(), std::io::Error> { + self.writer.flush() + } +} + +#[derive(PartialEq, Eq, Hash)] +enum OpaqueIdentifier { + Thread(std::thread::ThreadId), + Call(tracing::callsite::Identifier), +} + +impl TraceLayer { + fn resource_id(&self, opaque: OpaqueIdentifier) -> Option { + self.callsites.read().unwrap().get(&opaque).copied() + } + + fn register_resource_id(&self, opaque: OpaqueIdentifier) -> ResourceId { + let mut map = self.callsites.write().unwrap(); + let len = map.len(); + *map.entry(opaque).or_insert(ResourceId(len)) + } + + fn elapsed(&self) -> std::time::Duration { + self.start_time.elapsed() + } + + fn send(&self, entry: Entry) { + // we never care that the other end hanged on us + let _ = self.sender.send(entry); + } + + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> ResourceId { + let call_id = self.register_resource_id(OpaqueIdentifier::Call(metadata.callsite())); + + let module_path = metadata.module_path(); + let file = metadata.file(); + let line = metadata.line(); + let name = metadata.name(); + let target = metadata.target(); + + self.send(Entry::NewCallsite(NewCallsite { + call_id, + module_path: module_path.map(Cow::Borrowed), + file: file.map(Cow::Borrowed), + line, + name: Cow::Borrowed(name), + target: Cow::Borrowed(target), + })); + call_id + } + + fn register_thread(&self) -> ResourceId { + let thread_id = std::thread::current().id(); + let name = std::thread::current().name().map(ToOwned::to_owned); + let thread_id = self.register_resource_id(OpaqueIdentifier::Thread(thread_id)); + self.send(Entry::NewThread(NewThread { thread_id, name })); + thread_id + } +} + +impl Layer for TraceLayer +where + S: Subscriber, +{ + fn on_new_span(&self, attrs: &Attributes<'_>, id: &TracingId, _ctx: Context<'_, S>) { + let call_id = self + .resource_id(OpaqueIdentifier::Call(attrs.metadata().callsite())) + .unwrap_or_else(|| self.register_callsite(attrs.metadata())); + + let thread_id = self + .resource_id(OpaqueIdentifier::Thread(std::thread::current().id())) + .unwrap_or_else(|| self.register_thread()); + + let parent_id = attrs + .parent() + .cloned() + .or_else(|| tracing::Span::current().id()) + .map(|id| SpanId::from(&id)); + + self.send(Entry::NewSpan(NewSpan { id: id.into(), call_id, parent_id, thread_id })); + } + + fn on_enter(&self, id: &TracingId, _ctx: Context<'_, S>) { + // TODO kero: add memory here + self.send(Entry::SpanEnter(SpanEnter { id: id.into(), time: self.elapsed() })) + } + + fn on_exit(&self, id: &TracingId, _ctx: Context<'_, S>) { + // TODO kero: add memory here + self.send(Entry::SpanExit(SpanExit { id: id.into(), time: self.elapsed() })) + } + + fn on_close(&self, id: TracingId, _ctx: Context<'_, S>) { + self.send(Entry::SpanClose(SpanClose { id: Into::into(&id), time: self.elapsed() })) + } +} diff --git a/tracing-trace/src/lib.rs b/tracing-trace/src/lib.rs new file mode 100644 index 000000000..5e0f46d47 --- /dev/null +++ b/tracing-trace/src/lib.rs @@ -0,0 +1,40 @@ +use std::io::{Read, Write}; + +use entry::Entry; + +pub mod entry; +mod error; +pub mod layer; +pub mod processor; + +pub use error::Error; + +pub struct Trace { + writer: W, + receiver: std::sync::mpsc::Receiver, +} + +pub struct TraceReader { + reader: R, +} + +impl TraceReader { + pub fn new(reader: R) -> Self { + Self { reader } + } + + fn read(&mut self) -> Option> { + serde_json::Deserializer::from_reader(&mut self.reader) + .into_iter() + .next() + .map(|res| res.map_err(Into::into)) + } +} + +impl Iterator for TraceReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.read() + } +} diff --git a/tracing-trace/src/main.rs b/tracing-trace/src/main.rs new file mode 100644 index 000000000..f9f665861 --- /dev/null +++ b/tracing-trace/src/main.rs @@ -0,0 +1,133 @@ +use tracing::{instrument, Span}; +use tracing_error::{ErrorLayer, InstrumentResult, SpanTrace, TracedError}; + +#[instrument(level = "trace", target = "profile::indexing")] +fn foo() -> Result<(), TracedError> { + let _ = bar(40, 2); + bar(40, 2) +} + +#[derive(Debug)] +pub enum Error { + XTooBig, +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("x too big") + } +} + +impl std::error::Error for Error {} + +#[instrument(level = "trace", target = "profile::indexing")] +fn bar(x: u32, y: u32) -> Result<(), TracedError> { + let handle_ok = spawn_in_current_scope(move || baz(y)); + let handle = spawn_in_current_scope(move || baz(x + y)); + handle_ok.join().unwrap().and(handle.join().unwrap()) +} + +pub fn spawn_in_current_scope(f: F) -> std::thread::JoinHandle +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let current = Span::current(); + std::thread::spawn(move || { + let span = tracing::trace_span!(parent: ¤t, "thread_spawn", id = ?std::thread::current().id(), name = tracing::field::Empty); + if let Some(name) = std::thread::current().name() { + span.record("name", name); + } + span.in_scope(f) + }) +} + +#[instrument(level = "trace", target = "profile::indexing")] +fn baz(x: u32) -> Result<(), TracedError> { + if x > 10 { + fibo_recursive(10); + return Err(Error::XTooBig).in_current_span(); + } + Ok(()) +} + +#[instrument(level = "trace", target = "profile::indexing")] +fn fibo_recursive(n: u32) -> u32 { + if n == 0 { + return 1; + } + if n == 1 { + return 2; + } + return fibo_recursive(n - 1) - fibo_recursive(n - 2); +} + +use tracing_error::ExtractSpanTrace as _; +use tracing_subscriber::layer::SubscriberExt as _; +use tracing_trace::processor; + +fn on_panic(info: &std::panic::PanicInfo) { + let info = info.to_string(); + let trace = SpanTrace::capture(); + tracing::error!(%info, %trace); +} + +fn main() { + let (mut trace, profiling_layer) = + tracing_trace::Trace::new(std::fs::File::create("trace.json").unwrap()); + + let subscriber = tracing_subscriber::registry() + // any number of other subscriber layers may be added before or + // after the `ErrorLayer`... + .with(ErrorLayer::default()) + .with(profiling_layer) + /*.with( + tracing_subscriber::fmt::layer() + .with_line_number(true) + .with_span_events(FmtSpan::FULL), /*.with_filter( + tracing_subscriber::filter::LevelFilter::from_level(tracing::Level::TRACE).and( + tracing_subscriber::filter::Targets::new() + .with_target("profile", tracing::Level::TRACE) + .not(), + ), + )*/ + )*/; + + // set the subscriber as the default for the application + tracing::subscriber::set_global_default(subscriber).unwrap(); + + std::panic::set_hook(Box::new(on_panic)); + + let res = foo(); + + if let Err(error) = res { + print_extracted_spantraces(&error) + } + + while trace.try_receive().unwrap().is_continue() {} + + trace.flush().unwrap(); + + let trace = tracing_trace::TraceReader::new(std::fs::File::open("trace.json").unwrap()); + + let profile = processor::firefox_profiler::to_firefox_profile(trace, "test").unwrap(); + serde_json::to_writer(std::fs::File::create("processed.json").unwrap(), &profile).unwrap(); +} + +fn print_extracted_spantraces(error: &(dyn std::error::Error + 'static)) { + let mut error = Some(error); + let mut ind = 0; + + eprintln!("Error:"); + + while let Some(err) = error { + if let Some(spantrace) = err.span_trace() { + eprintln!("found a spantrace:\n{}", color_spantrace::colorize(spantrace)); + } else { + eprintln!("{:>4}: {}", ind, err); + } + + error = err.source(); + ind += 1; + } +} diff --git a/tracing-trace/src/processor/firefox_profiler.rs b/tracing-trace/src/processor/firefox_profiler.rs new file mode 100644 index 000000000..d3ac495d8 --- /dev/null +++ b/tracing-trace/src/processor/firefox_profiler.rs @@ -0,0 +1,255 @@ +use std::collections::HashMap; + +use fxprof_processed_profile::{ + CategoryPairHandle, CpuDelta, Frame, FrameFlags, FrameInfo, MarkerDynamicField, + MarkerFieldFormat, MarkerLocation, MarkerSchema, MarkerSchemaField, Profile, ProfilerMarker, + ReferenceTimestamp, SamplingInterval, StringHandle, Timestamp, +}; +use serde_json::json; + +use crate::entry::{ + Entry, NewCallsite, NewSpan, ResourceId, SpanClose, SpanEnter, SpanExit, SpanId, +}; +use crate::{Error, TraceReader}; + +pub fn to_firefox_profile( + trace: TraceReader, + app: &str, +) -> Result { + let mut profile = Profile::new( + app, + ReferenceTimestamp::from_millis_since_unix_epoch(0.0), + SamplingInterval::from_nanos(15), + ); + + let mut last_timestamp = Timestamp::from_nanos_since_reference(0); + let main = profile.add_process(app, 0, last_timestamp); + + let mut calls = HashMap::new(); + let mut threads = HashMap::new(); + let mut spans = HashMap::new(); + + let category = profile.add_category("general", fxprof_processed_profile::CategoryColor::Blue); + let subcategory = profile.add_subcategory(category, "subcategory"); + + // TODO kero: add counters profile.add_counters + last_memory_value + + for entry in trace { + let entry = entry?; + match entry { + Entry::NewCallsite(callsite) => { + let string_handle = profile.intern_string(callsite.name.as_ref()); + calls.insert(callsite.call_id, (callsite, string_handle)); + } + Entry::NewThread(thread) => { + let thread_handle = profile.add_thread( + main, + thread.thread_id.to_usize() as u32, + last_timestamp, + threads.is_empty(), + ); + if let Some(name) = &thread.name { + profile.set_thread_name(thread_handle, name) + } + threads.insert(thread.thread_id, thread_handle); + } + Entry::NewSpan(span) => { + spans.insert(span.id, (span, SpanStatus::Outside)); + } + Entry::SpanEnter(SpanEnter { id, time }) => { + let (_span, status) = spans.get_mut(&id).unwrap(); + + let SpanStatus::Outside = status else { + continue; + }; + + *status = SpanStatus::Inside(time); + + last_timestamp = Timestamp::from_nanos_since_reference(time.as_nanos() as u64); + + /* TODO kero: compute delta and update them + profile.add_counter_sample( + counter, + timestamp, + value_delta, + number_of_operations_delta, + ) + */ + } + Entry::SpanExit(SpanExit { id, time }) => { + let (span, status) = spans.get_mut(&id).unwrap(); + + let SpanStatus::Inside(begin) = status else { + continue; + }; + last_timestamp = Timestamp::from_nanos_since_reference(time.as_nanos() as u64); + + let begin = *begin; + + *status = SpanStatus::Outside; + + let span = *span; + let thread_handle = threads.get(&span.thread_id).unwrap(); + + let frames = make_frames(span, &spans, &calls, subcategory); + + profile.add_sample( + *thread_handle, + to_timestamp(begin), + frames.iter().rev().cloned(), + CpuDelta::ZERO, + 1, + ); + profile.add_sample( + *thread_handle, + to_timestamp(time), + frames.iter().rev().cloned(), + CpuDelta::from_nanos((time - begin).as_nanos() as u64), + 1, + ); + + /* TODO kero: compute delta and update them + profile.add_counter_sample( + counter, + timestamp, + value_delta, + number_of_operations_delta, + ) + */ + + let (callsite, _) = calls.get(&span.call_id).unwrap(); + + let marker = SpanMarker { callsite, span: &span }; + + profile.add_marker_with_stack( + *thread_handle, + &callsite.name, + marker, + fxprof_processed_profile::MarkerTiming::Interval( + to_timestamp(begin), + to_timestamp(time), + ), + frames.iter().rev().cloned(), + ) + } + Entry::SpanClose(SpanClose { id, time }) => { + spans.remove(&id); + last_timestamp = Timestamp::from_nanos_since_reference(time.as_nanos() as u64); + } + } + } + + Ok(profile) +} + +fn to_timestamp(time: std::time::Duration) -> Timestamp { + Timestamp::from_nanos_since_reference(time.as_nanos() as u64) +} + +fn make_frames( + span: NewSpan, + spans: &HashMap, + calls: &HashMap, + subcategory: CategoryPairHandle, +) -> Vec { + let mut frames = Vec::new(); + let mut current_span = span; + loop { + let frame = make_frame(current_span, calls, subcategory); + frames.push(frame); + if let Some(parent) = current_span.parent_id { + current_span = spans.get(&parent).unwrap().0; + } else { + break; + } + } + frames +} + +fn make_frame( + span: NewSpan, + calls: &HashMap, + subcategory: CategoryPairHandle, +) -> FrameInfo { + let (_, call) = calls.get(&span.call_id).unwrap(); + FrameInfo { frame: Frame::Label(*call), category_pair: subcategory, flags: FrameFlags::empty() } +} + +#[derive(Debug, Clone, Copy)] +enum SpanStatus { + Outside, + Inside(std::time::Duration), +} + +struct SpanMarker<'a> { + span: &'a NewSpan, + callsite: &'a NewCallsite, +} + +impl<'a> ProfilerMarker for SpanMarker<'a> { + const MARKER_TYPE_NAME: &'static str = "span"; + + fn schema() -> MarkerSchema { + let fields = vec![ + MarkerSchemaField::Dynamic(MarkerDynamicField { + key: "filename", + label: "File name", + format: MarkerFieldFormat::FilePath, + searchable: true, + }), + MarkerSchemaField::Dynamic(MarkerDynamicField { + key: "line", + label: "Line", + format: MarkerFieldFormat::Integer, + searchable: true, + }), + MarkerSchemaField::Dynamic(MarkerDynamicField { + key: "module_path", + label: "Module path", + format: MarkerFieldFormat::String, + searchable: true, + }), + MarkerSchemaField::Dynamic(MarkerDynamicField { + key: "span_id", + label: "Span ID", + format: MarkerFieldFormat::Integer, + searchable: true, + }), + MarkerSchemaField::Dynamic(MarkerDynamicField { + key: "thread_id", + label: "Thread ID", + format: MarkerFieldFormat::Integer, + searchable: true, + }), + ]; + + MarkerSchema { + type_name: Self::MARKER_TYPE_NAME, + locations: vec![ + MarkerLocation::MarkerTable, + MarkerLocation::MarkerChart, + MarkerLocation::TimelineOverview, + ], + chart_label: None, + tooltip_label: Some("{marker.name} - {marker.data.filename}:{marker.data.line}"), + table_label: Some("{marker.data.filename}:{marker.data.line}"), + fields, + } + } + + fn json_marker_data(&self) -> serde_json::Value { + let filename = self.callsite.file.as_deref(); + let line = self.callsite.line; + let module_path = self.callsite.module_path.as_deref(); + let span_id = self.span.id; + let thread_id = self.span.thread_id; + json!({ + "type": Self::MARKER_TYPE_NAME, + "filename": filename, + "line": line, + "module_path": module_path, + "span_id": span_id, + "thread_id": thread_id, + }) + } +} diff --git a/tracing-trace/src/processor/fmt.rs b/tracing-trace/src/processor/fmt.rs new file mode 100644 index 000000000..a9356ba26 --- /dev/null +++ b/tracing-trace/src/processor/fmt.rs @@ -0,0 +1,128 @@ +use std::collections::HashMap; +use std::io::Read; + +use crate::entry::{ + Entry, NewCallsite, NewSpan, NewThread, ResourceId, SpanClose, SpanEnter, SpanExit, SpanId, +}; +use crate::{Error, TraceReader}; + +#[derive(Debug, Clone, Copy)] +enum SpanStatus { + Outside, + Inside(std::time::Duration), +} + +pub fn print_trace(trace: TraceReader) -> Result<(), Error> { + let mut calls = HashMap::new(); + let mut threads = HashMap::new(); + let mut spans = HashMap::new(); + for entry in trace { + let entry = entry?; + match entry { + Entry::NewCallsite(callsite) => { + calls.insert(callsite.call_id, callsite); + } + Entry::NewThread(NewThread { thread_id, name }) => { + threads.insert(thread_id, name); + } + Entry::NewSpan(span) => { + spans.insert(span.id, (span, SpanStatus::Outside)); + } + Entry::SpanEnter(SpanEnter { id, time }) => { + let (span, status) = spans.get_mut(&id).unwrap(); + + let SpanStatus::Outside = status else { + continue; + }; + + *status = SpanStatus::Inside(time); + + let span = *span; + + println!( + "[{}]{}::{} <-", + print_thread(&threads, span.thread_id), + print_backtrace(&spans, &calls, &span), + print_span(&calls, &span) + ); + } + Entry::SpanExit(SpanExit { id, time }) => { + let (span, status) = spans.get_mut(&id).unwrap(); + + let SpanStatus::Inside(begin) = status else { + continue; + }; + let begin = *begin; + + *status = SpanStatus::Outside; + + let span = *span; + + println!( + "[{}]{}::{} -> {}", + print_thread(&threads, span.thread_id), + print_backtrace(&spans, &calls, &span), + print_span(&calls, &span), + print_duration(time - begin), + ) + } + Entry::SpanClose(SpanClose { id, time: _ }) => { + spans.remove(&id); + } + } + } + Ok(()) +} + +fn print_thread(threads: &HashMap>, thread_id: ResourceId) -> String { + let thread = threads.get(&thread_id).unwrap(); + let thread = + thread.as_ref().cloned().unwrap_or_else(|| format!("ThreadId({})", thread_id.to_usize())); + thread +} + +fn print_backtrace( + spans: &HashMap, + calls: &HashMap, + span: &NewSpan, +) -> String { + let mut parents = Vec::new(); + let mut current = span.parent_id; + while let Some(current_id) = ¤t { + let (span, _) = spans.get(current_id).unwrap(); + let callsite = calls.get(&span.call_id).unwrap(); + parents.push(callsite.name.clone()); + + current = span.parent_id; + } + + let x: Vec = parents.into_iter().rev().map(|x| x.to_string()).collect(); + x.join("::") +} + +fn print_span(calls: &HashMap, span: &NewSpan) -> String { + let callsite = calls.get(&span.call_id).unwrap(); + match (callsite.file.clone(), callsite.line) { + (Some(file), None) => format!("{} ({})", callsite.name, file), + (Some(file), Some(line)) => format!("{} ({}:{})", callsite.name, file, line), + _ => callsite.name.to_string(), + } +} + +fn print_duration(duration: std::time::Duration) -> String { + if duration.as_nanos() < 1000 { + format!("{}ns", duration.as_nanos()) + } else if duration.as_micros() < 1000 { + format!("{}μs", duration.as_micros()) + } else if duration.as_millis() < 1000 { + format!("{}ms", duration.as_millis()) + } else if duration.as_secs() < 120 { + format!("{}s", duration.as_secs()) + } else if duration.as_secs_f64() / 60.0 < 60.0 { + format!("{}min", duration.as_secs_f64() / 60.0) + } else if duration.as_secs_f64() / 3600.0 < 8.0 { + format!("{}h", duration.as_secs_f64() / 3600.0) + } else { + format!("{}d", duration.as_secs_f64() / 3600.0 / 24.0) + } +} diff --git a/tracing-trace/src/processor/mod.rs b/tracing-trace/src/processor/mod.rs new file mode 100644 index 000000000..a84cb3b63 --- /dev/null +++ b/tracing-trace/src/processor/mod.rs @@ -0,0 +1,2 @@ +pub mod firefox_profiler; +pub mod fmt;