Replace stats_alloc with procfs

This commit is contained in:
Clément Renault 2024-02-06 14:41:14 +01:00 committed by Louis Dureuil
parent e773dfa9ba
commit b393823f36
No known key found for this signature in database
9 changed files with 99 additions and 192 deletions

33
Cargo.lock generated
View File

@ -3667,7 +3667,6 @@ dependencies = [
"siphasher 1.0.0", "siphasher 1.0.0",
"slice-group-by", "slice-group-by",
"static-files", "static-files",
"stats_alloc",
"sysinfo", "sysinfo",
"tar", "tar",
"temp-env", "temp-env",
@ -4433,6 +4432,29 @@ dependencies = [
"rustix 0.36.16", "rustix 0.36.16",
] ]
[[package]]
name = "procfs"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4"
dependencies = [
"bitflags 2.4.1",
"hex",
"lazy_static",
"procfs-core",
"rustix 0.38.26",
]
[[package]]
name = "procfs-core"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29"
dependencies = [
"bitflags 2.4.1",
"hex",
]
[[package]] [[package]]
name = "prometheus" name = "prometheus"
version = "0.13.3" version = "0.13.3"
@ -4445,7 +4467,7 @@ dependencies = [
"libc", "libc",
"memchr", "memchr",
"parking_lot", "parking_lot",
"procfs", "procfs 0.14.2",
"protobuf", "protobuf",
"thiserror", "thiserror",
] ]
@ -5222,11 +5244,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stats_alloc"
version = "0.1.10"
source = "git+https://github.com/Kerollmops/stats_alloc?branch=stable-const-fn-trait#6f83c52160c7d0550fdf770e1f73d239b0ff9a97"
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.10.0" version = "0.10.0"
@ -5705,9 +5722,9 @@ dependencies = [
"byte-unit", "byte-unit",
"color-spantrace", "color-spantrace",
"fxprof-processed-profile", "fxprof-processed-profile",
"procfs 0.16.0",
"serde", "serde",
"serde_json", "serde_json",
"stats_alloc",
"tokio", "tokio",
"tracing", "tracing",
"tracing-error", "tracing-error",

View File

@ -107,7 +107,6 @@ url = { version = "2.5.0", features = ["serde"] }
tracing = "0.1.40" tracing = "0.1.40"
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"
tracing-trace = { version = "0.1.0", path = "../tracing-trace" } tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
stats_alloc = { git = "https://github.com/Kerollmops/stats_alloc", branch = "stable-const-fn-trait", optional = true }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.9.0" actix-rt = "2.9.0"

View File

@ -20,14 +20,9 @@ use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt as _; use tracing_subscriber::layer::SubscriberExt as _;
use tracing_subscriber::Layer; use tracing_subscriber::Layer;
#[cfg(not(feature = "stats_alloc"))]
#[global_allocator] #[global_allocator]
static ALLOC: MiMalloc = MiMalloc; static ALLOC: MiMalloc = MiMalloc;
#[cfg(feature = "stats_alloc")]
#[global_allocator]
static ALLOC: stats_alloc::StatsAlloc<MiMalloc> = stats_alloc::StatsAlloc::new(MiMalloc);
fn default_layer() -> LogRouteType { fn default_layer() -> LogRouteType {
None.with_filter(tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF)) None.with_filter(tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF))
} }

View File

@ -14,7 +14,7 @@ use index_scheduler::IndexScheduler;
use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::deserr_codes::*;
use meilisearch_types::error::{Code, ResponseError}; use meilisearch_types::error::{Code, ResponseError};
use tokio::sync::mpsc::{self}; use tokio::sync::mpsc;
use tracing_subscriber::filter::Targets; use tracing_subscriber::filter::Targets;
use tracing_subscriber::Layer; use tracing_subscriber::Layer;

View File

@ -13,10 +13,11 @@ serde_json = "1.0.111"
tracing = "0.1.40" tracing = "0.1.40"
tracing-error = "0.2.0" tracing-error = "0.2.0"
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"
stats_alloc = { git = "https://github.com/Kerollmops/stats_alloc", branch = "stable-const-fn-trait" }
byte-unit = { version = "4.0.19", default-features = false, features = [ byte-unit = { version = "4.0.19", default-features = false, features = [
"std", "std",
"serde", "serde",
] } ] }
tokio = { version = "1.35.1", features = ["sync"] } tokio = { version = "1.35.1", features = ["sync"] }
[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16.0", default-features = false }

View File

@ -101,58 +101,46 @@ pub struct SpanClose {
} }
/// A struct with a lot of memory allocation stats akin /// A struct with a lot of memory allocation stats akin
/// to the `stats_alloc::Stats` one but implements the /// to the `procfs::Process::StatsM` one plus the OOM score.
/// `Serialize/Deserialize` serde traits. ///
/// Note that all the values are in bytes not in pages.
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
pub struct MemoryStats { pub struct MemoryStats {
pub allocations: usize, /// Resident set size, measured in bytes.
pub deallocations: usize, /// (same as VmRSS in /proc/<pid>/status).
pub reallocations: usize, pub resident: u64,
pub bytes_allocated: usize, /// Number of resident shared bytes (i.e., backed by a file).
pub bytes_deallocated: usize, /// (same as RssFile+RssShmem in /proc/<pid>/status).
pub bytes_reallocated: isize, pub shared: u64,
} /// The current score that the kernel gives to this process
/// for the purpose of selecting a process for the OOM-killer
impl From<stats_alloc::Stats> for MemoryStats { ///
fn from(stats: stats_alloc::Stats) -> Self { /// A higher score means that the process is more likely to be selected
let stats_alloc::Stats { /// by the OOM-killer. The basis for this score is the amount of memory used
allocations, /// by the process, plus other factors.
deallocations, ///
reallocations, /// (Since linux 2.6.11)
bytes_allocated, pub oom_score: u32,
bytes_deallocated,
bytes_reallocated,
} = stats;
MemoryStats {
allocations,
deallocations,
reallocations,
bytes_allocated,
bytes_deallocated,
bytes_reallocated,
}
}
} }
impl MemoryStats { impl MemoryStats {
#[cfg(target_os = "linux")]
pub fn fetch() -> procfs::ProcResult<Self> {
let process = procfs::process::Process::myself().unwrap();
let procfs::process::StatM { resident, shared, .. } = process.statm()?;
let oom_score = process.oom_score()?;
let page_size = procfs::page_size();
Ok(MemoryStats { resident: resident * page_size, shared: shared * page_size, oom_score })
}
pub fn checked_sub(self, other: Self) -> Option<Self> { pub fn checked_sub(self, other: Self) -> Option<Self> {
Some(Self { Some(Self {
allocations: self.allocations.checked_sub(other.allocations)?, resident: self.resident.checked_sub(other.resident)?,
deallocations: self.deallocations.checked_sub(other.deallocations)?, shared: self.shared.checked_sub(other.shared)?,
reallocations: self.reallocations.checked_sub(other.reallocations)?, oom_score: self.oom_score.checked_sub(other.oom_score)?,
bytes_allocated: self.bytes_allocated.checked_sub(other.bytes_allocated)?,
bytes_deallocated: self.bytes_deallocated.checked_sub(other.bytes_deallocated)?,
bytes_reallocated: self.bytes_reallocated.checked_sub(other.bytes_reallocated)?,
}) })
} }
pub fn usage(&self) -> isize {
(self.bytes_allocated - self.bytes_deallocated) as isize + self.bytes_reallocated
}
pub fn operations(&self) -> usize {
self.allocations + self.deallocations + self.reallocations
}
} }
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]

View File

@ -1,11 +1,9 @@
use std::alloc::{GlobalAlloc, System};
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use std::ops::ControlFlow; use std::ops::ControlFlow;
use std::sync::RwLock; use std::sync::RwLock;
use stats_alloc::StatsAlloc;
use tracing::span::{Attributes, Id as TracingId}; use tracing::span::{Attributes, Id as TracingId};
use tracing::{Metadata, Subscriber}; use tracing::{Metadata, Subscriber};
use tracing_subscriber::layer::Context; use tracing_subscriber::layer::Context;
@ -18,55 +16,31 @@ use crate::entry::{
use crate::{Error, Trace, TraceWriter}; use crate::{Error, Trace, TraceWriter};
/// Layer that measures the time spent in spans. /// Layer that measures the time spent in spans.
pub struct TraceLayer<A: GlobalAlloc + 'static = System> { pub struct TraceLayer {
sender: tokio::sync::mpsc::UnboundedSender<Entry>, sender: tokio::sync::mpsc::UnboundedSender<Entry>,
callsites: RwLock<HashMap<OpaqueIdentifier, ResourceId>>, callsites: RwLock<HashMap<OpaqueIdentifier, ResourceId>>,
start_time: std::time::Instant, start_time: std::time::Instant,
memory_allocator: Option<&'static StatsAlloc<A>>,
} }
impl Trace { impl Trace {
pub fn new() -> (Self, TraceLayer<System>) { pub fn new() -> (Self, TraceLayer) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let trace = Trace { receiver }; let trace = Trace { receiver };
let layer = TraceLayer { let layer = TraceLayer {
sender, sender,
callsites: Default::default(), callsites: Default::default(),
start_time: std::time::Instant::now(), start_time: std::time::Instant::now(),
memory_allocator: None,
};
(trace, layer)
}
pub fn with_stats_alloc<A: GlobalAlloc>(
stats_alloc: &'static StatsAlloc<A>,
) -> (Self, TraceLayer<A>) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let trace = Trace { receiver };
let layer = TraceLayer {
sender,
callsites: Default::default(),
start_time: std::time::Instant::now(),
memory_allocator: Some(stats_alloc),
}; };
(trace, layer) (trace, layer)
} }
} }
impl<W: Write> TraceWriter<W> { impl<W: Write> TraceWriter<W> {
pub fn new(writer: W) -> (Self, TraceLayer<System>) { pub fn new(writer: W) -> (Self, TraceLayer) {
let (trace, layer) = Trace::new(); let (trace, layer) = Trace::new();
(trace.into_writer(writer), layer) (trace.into_writer(writer), layer)
} }
pub fn with_stats_alloc<A: GlobalAlloc>(
writer: W,
stats_alloc: &'static StatsAlloc<A>,
) -> (Self, TraceLayer<A>) {
let (trace, layer) = Trace::with_stats_alloc(stats_alloc);
(trace.into_writer(writer), layer)
}
pub async fn receive(&mut self) -> Result<ControlFlow<(), ()>, Error> { pub async fn receive(&mut self) -> Result<ControlFlow<(), ()>, Error> {
let Some(entry) = self.receiver.recv().await else { let Some(entry) = self.receiver.recv().await else {
return Ok(ControlFlow::Break(())); return Ok(ControlFlow::Break(()));
@ -107,7 +81,7 @@ enum OpaqueIdentifier {
Call(tracing::callsite::Identifier), Call(tracing::callsite::Identifier),
} }
impl<A: GlobalAlloc> TraceLayer<A> { impl TraceLayer {
fn resource_id(&self, opaque: OpaqueIdentifier) -> Option<ResourceId> { fn resource_id(&self, opaque: OpaqueIdentifier) -> Option<ResourceId> {
self.callsites.read().unwrap().get(&opaque).copied() self.callsites.read().unwrap().get(&opaque).copied()
} }
@ -122,8 +96,14 @@ impl<A: GlobalAlloc> TraceLayer<A> {
self.start_time.elapsed() self.start_time.elapsed()
} }
#[cfg(target_os = "linux")]
fn memory_stats(&self) -> Option<MemoryStats> { fn memory_stats(&self) -> Option<MemoryStats> {
self.memory_allocator.map(|ma| ma.stats().into()) Some(MemoryStats::fetch().unwrap())
}
#[cfg(not(target_os = "linux"))]
fn memory_stats(&self) -> Option<MemoryStats> {
None
} }
fn send(&self, entry: Entry) { fn send(&self, entry: Entry) {
@ -160,10 +140,9 @@ impl<A: GlobalAlloc> TraceLayer<A> {
} }
} }
impl<S, A> Layer<S> for TraceLayer<A> impl<S> Layer<S> for TraceLayer
where where
S: Subscriber, S: Subscriber,
A: GlobalAlloc,
{ {
fn on_new_span(&self, attrs: &Attributes<'_>, id: &TracingId, _ctx: Context<'_, S>) { fn on_new_span(&self, attrs: &Attributes<'_>, id: &TracingId, _ctx: Context<'_, S>) {
let call_id = self let call_id = self

View File

@ -227,8 +227,8 @@ fn add_memory_samples(
profile.add_counter_sample( profile.add_counter_sample(
memory_counters.usage, memory_counters.usage,
last_timestamp, last_timestamp,
stats.usage() as f64 - last_memory.usage() as f64, stats.resident as f64 - last_memory.resident as f64,
stats.operations().checked_sub(last_memory.operations()).unwrap_or_default() as u32, 0,
); );
let delta = stats.checked_sub(*last_memory); let delta = stats.checked_sub(*last_memory);
@ -317,39 +317,21 @@ impl<'a> ProfilerMarker for SpanMarker<'a> {
searchable: true, searchable: true,
}), }),
MarkerSchemaField::Dynamic(MarkerDynamicField { MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "allocations", key: "resident",
label: "Number of allocation operations while this function was executing", label: "Resident set size, measured in bytes while this function was executing",
format: MarkerFieldFormat::Integer,
searchable: false,
}),
MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "deallocations",
label: "Number of deallocation operations while this function was executing",
format: MarkerFieldFormat::Integer,
searchable: false,
}),
MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "reallocations",
label: "Number of reallocation operations while this function was executing",
format: MarkerFieldFormat::Integer,
searchable: false,
}),
MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "allocated_bytes",
label: "Number of allocated bytes while this function was executing",
format: MarkerFieldFormat::Bytes, format: MarkerFieldFormat::Bytes,
searchable: false, searchable: false,
}), }),
MarkerSchemaField::Dynamic(MarkerDynamicField { MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "deallocated_bytes", key: "shared",
label: "Number of deallocated bytes while this function was executing", label: "Number of resident shared pages (i.e., backed by a file) while this function was executing",
format: MarkerFieldFormat::Bytes, format: MarkerFieldFormat::Bytes,
searchable: false, searchable: false,
}), }),
MarkerSchemaField::Dynamic(MarkerDynamicField { MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "reallocated_bytes", key: "oom_score",
label: "Number of reallocated bytes while this function was executing", label: "The current score that the kernel gives to this process for the purpose of selecting a process for the OOM-killer while this function was executing",
format: MarkerFieldFormat::Bytes, format: MarkerFieldFormat::Integer,
searchable: false, searchable: false,
}), }),
]; ];
@ -384,21 +366,10 @@ impl<'a> ProfilerMarker for SpanMarker<'a> {
"thread_id": thread_id, "thread_id": thread_id,
}); });
if let Some(MemoryStats { if let Some(MemoryStats { resident, shared, oom_score }) = self.memory_delta {
allocations, value["resident"] = json!(resident);
deallocations, value["shared"] = json!(shared);
reallocations, value["oom_score"] = json!(oom_score);
bytes_allocated,
bytes_deallocated,
bytes_reallocated,
}) = self.memory_delta
{
value["allocations"] = json!(allocations);
value["deallocations"] = json!(deallocations);
value["reallocations"] = json!(reallocations);
value["allocated_bytes"] = json!(bytes_allocated);
value["deallocated_bytes"] = json!(bytes_deallocated);
value["reallocated_bytes"] = json!(bytes_reallocated);
} }
value value
@ -447,39 +418,21 @@ impl<'a> ProfilerMarker for EventMarker<'a> {
searchable: true, searchable: true,
}), }),
MarkerSchemaField::Dynamic(MarkerDynamicField { MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "allocations", key: "resident",
label: "Number of allocation operations since last measure", label: "Resident set size, measured in bytes while this function was executing",
format: MarkerFieldFormat::Integer,
searchable: false,
}),
MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "deallocations",
label: "Number of deallocation operations since last measure",
format: MarkerFieldFormat::Integer,
searchable: false,
}),
MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "reallocations",
label: "Number of reallocation operations since last measure",
format: MarkerFieldFormat::Integer,
searchable: false,
}),
MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "allocated_bytes",
label: "Number of allocated bytes since last measure",
format: MarkerFieldFormat::Bytes, format: MarkerFieldFormat::Bytes,
searchable: false, searchable: false,
}), }),
MarkerSchemaField::Dynamic(MarkerDynamicField { MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "deallocated_bytes", key: "shared",
label: "Number of deallocated bytes since last measure", label: "Number of resident shared pages (i.e., backed by a file) while this function was executing",
format: MarkerFieldFormat::Bytes, format: MarkerFieldFormat::Bytes,
searchable: false, searchable: false,
}), }),
MarkerSchemaField::Dynamic(MarkerDynamicField { MarkerSchemaField::Dynamic(MarkerDynamicField {
key: "reallocated_bytes", key: "oom_score",
label: "Number of reallocated bytes since last measure", label: "The current score that the kernel gives to this process for the purpose of selecting a process for the OOM-killer while this function was executing",
format: MarkerFieldFormat::Bytes, format: MarkerFieldFormat::Integer,
searchable: false, searchable: false,
}), }),
]; ];
@ -514,21 +467,10 @@ impl<'a> ProfilerMarker for EventMarker<'a> {
"thread_id": thread_id, "thread_id": thread_id,
}); });
if let Some(MemoryStats { if let Some(MemoryStats { resident, shared, oom_score }) = self.memory_delta {
allocations, value["resident"] = json!(resident);
deallocations, value["shared"] = json!(shared);
reallocations, value["oom_score"] = json!(oom_score);
bytes_allocated,
bytes_deallocated,
bytes_reallocated,
}) = self.memory_delta
{
value["allocations"] = json!(allocations);
value["deallocations"] = json!(deallocations);
value["reallocations"] = json!(reallocations);
value["allocated_bytes"] = json!(bytes_allocated);
value["deallocated_bytes"] = json!(bytes_deallocated);
value["reallocated_bytes"] = json!(bytes_reallocated);
} }
value value

View File

@ -188,23 +188,9 @@ fn print_duration(duration: std::time::Duration) -> String {
} }
/// Format only the allocated bytes, deallocated bytes and reallocated bytes in GiB, MiB, KiB, Bytes. /// Format only the allocated bytes, deallocated bytes and reallocated bytes in GiB, MiB, KiB, Bytes.
fn print_memory(memory: MemoryStats) -> String { fn print_memory(MemoryStats { resident, shared, oom_score }: MemoryStats) -> String {
use byte_unit::Byte; use byte_unit::Byte;
let rss_bytes = Byte::from_bytes(resident).get_appropriate_unit(true);
let allocated_bytes = Byte::from_bytes(memory.bytes_allocated.try_into().unwrap()); let shared_bytes = Byte::from_bytes(shared).get_appropriate_unit(true);
let deallocated_bytes = Byte::from_bytes(memory.bytes_deallocated.try_into().unwrap()); format!("RSS {rss_bytes:.2}, Shared {shared_bytes:.2}, OOM score {oom_score}")
let reallocated_sign = if memory.bytes_reallocated < 0 { "-" } else { "" };
let reallocated_bytes =
Byte::from_bytes(memory.bytes_reallocated.abs_diff(0).try_into().unwrap());
let adjusted_allocated_bytes = allocated_bytes.get_appropriate_unit(true);
let adjusted_deallocated_bytes = deallocated_bytes.get_appropriate_unit(true);
let adjusted_reallocated_bytes = reallocated_bytes.get_appropriate_unit(true);
format!(
"Allocated {adjusted_allocated_bytes:.2}, \
Deallocated {adjusted_deallocated_bytes:.2}, \
Reallocated {reallocated_sign}{adjusted_reallocated_bytes:.2}"
)
} }