Accumulate step durations from the progress system

This commit is contained in:
Kerollmops 2025-02-18 18:33:19 +01:00
parent 0f1aeb8eaa
commit 11a11fc870
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 55 additions and 7 deletions

View File

@ -339,6 +339,7 @@ impl IndexScheduler {
// We must re-add the canceled task so they're part of the same batch.
ids |= canceled;
eprintln!("{:#?}", progress.accumulated_durations());
self.queue.write_batch(&mut wtxn, processing_batch, &ids)?;
#[cfg(test)]

View File

@ -3,7 +3,9 @@ use std::borrow::Cow;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use indexmap::IndexMap;
use serde::Serialize;
use utoipa::ToSchema;
@ -15,28 +17,60 @@ pub trait Step: 'static + Send + Sync {
#[derive(Clone, Default)]
pub struct Progress {
steps: Arc<RwLock<Vec<(TypeId, Box<dyn Step>)>>>,
steps: Arc<RwLock<InnerProgress>>,
}
#[derive(Default)]
struct InnerProgress {
/// The hierarchy of progress steps.
steps: Vec<(TypeId, Box<dyn Step>, Instant)>,
/// The durations associated to the top level steps (*first*).
durations: Vec<(String, Duration)>,
}
fn name_from_steps<'a, I>(steps: I) -> String
where
I: Iterator<Item = &'a Box<dyn Step>> + ExactSizeIterator,
{
let len = steps.len();
let mut name = String::new();
for (i, step) in steps.into_iter().enumerate() {
name.push_str(&step.name());
if i + 1 < len {
name.push_str(" > ");
}
}
name
}
impl Progress {
pub fn update_progress<P: Step>(&self, sub_progress: P) {
let mut steps = self.steps.write().unwrap();
let mut inner = self.steps.write().unwrap();
let InnerProgress { steps, durations } = &mut *inner;
let now = Instant::now();
let step_type = TypeId::of::<P>();
if let Some(idx) = steps.iter().position(|(id, _)| *id == step_type) {
if let Some(idx) = steps.iter().position(|(id, _, _)| *id == step_type) {
for (i, (_, _, started_at)) in steps[idx..].iter().enumerate() {
let full_name = name_from_steps(steps.iter().take(idx + i + 1).map(|(_, s, _)| s));
durations.push((full_name, now.duration_since(*started_at)));
}
steps.truncate(idx);
}
steps.push((step_type, Box::new(sub_progress)));
steps.push((step_type, Box::new(sub_progress), now));
}
// TODO: This code should be in meilisearch_types but cannot because milli can't depend on meilisearch_types
pub fn as_progress_view(&self) -> ProgressView {
let steps = self.steps.read().unwrap();
let inner = self.steps.read().unwrap();
let InnerProgress { steps, .. } = &*inner;
let mut percentage = 0.0;
let mut prev_factors = 1.0;
let mut step_view = Vec::with_capacity(steps.len());
for (_, step) in steps.iter() {
for (_, step, _) in steps.iter() {
prev_factors *= step.total() as f32;
percentage += step.current() as f32 / prev_factors;
@ -49,6 +83,19 @@ impl Progress {
ProgressView { steps: step_view, percentage: percentage * 100.0 }
}
pub fn accumulated_durations(&self) -> IndexMap<String, String> {
let mut inner = self.steps.write().unwrap();
let InnerProgress { steps, durations, .. } = &mut *inner;
let now = Instant::now();
for (i, (_, _, started_at)) in steps.iter().enumerate() {
let full_name = name_from_steps(steps.iter().take(i + 1).map(|(_, s, _)| s));
durations.push((full_name, now.duration_since(*started_at)));
}
durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect()
}
}
/// This trait lets you use the AtomicSubStep defined right below.