diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 9268bf3e7..a7937ddd3 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -339,6 +339,7 @@ impl IndexScheduler { // We must re-add the canceled task so they're part of the same batch. ids |= canceled; + eprintln!("{:#?}", progress.accumulated_durations()); self.queue.write_batch(&mut wtxn, processing_batch, &ids)?; #[cfg(test)] diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index 3837e173a..884f49241 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -3,7 +3,9 @@ use std::borrow::Cow; use std::marker::PhantomData; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use indexmap::IndexMap; use serde::Serialize; use utoipa::ToSchema; @@ -15,28 +17,60 @@ pub trait Step: 'static + Send + Sync { #[derive(Clone, Default)] pub struct Progress { - steps: Arc)>>>, + steps: Arc>, +} + +#[derive(Default)] +struct InnerProgress { + /// The hierarchy of progress steps. + steps: Vec<(TypeId, Box, Instant)>, + /// The durations associated to the top level steps (*first*). + durations: Vec<(String, Duration)>, +} + +fn name_from_steps<'a, I>(steps: I) -> String +where + I: Iterator> + ExactSizeIterator, +{ + let len = steps.len(); + let mut name = String::new(); + for (i, step) in steps.into_iter().enumerate() { + name.push_str(&step.name()); + if i + 1 < len { + name.push_str(" > "); + } + } + name } impl Progress { pub fn update_progress(&self, sub_progress: P) { - let mut steps = self.steps.write().unwrap(); + let mut inner = self.steps.write().unwrap(); + let InnerProgress { steps, durations } = &mut *inner; + + let now = Instant::now(); let step_type = TypeId::of::

(); - if let Some(idx) = steps.iter().position(|(id, _)| *id == step_type) { + if let Some(idx) = steps.iter().position(|(id, _, _)| *id == step_type) { + for (i, (_, _, started_at)) in steps[idx..].iter().enumerate() { + let full_name = name_from_steps(steps.iter().take(idx + i + 1).map(|(_, s, _)| s)); + durations.push((full_name, now.duration_since(*started_at))); + } steps.truncate(idx); } - steps.push((step_type, Box::new(sub_progress))); + + steps.push((step_type, Box::new(sub_progress), now)); } // TODO: This code should be in meilisearch_types but cannot because milli can't depend on meilisearch_types pub fn as_progress_view(&self) -> ProgressView { - let steps = self.steps.read().unwrap(); + let inner = self.steps.read().unwrap(); + let InnerProgress { steps, .. } = &*inner; let mut percentage = 0.0; let mut prev_factors = 1.0; let mut step_view = Vec::with_capacity(steps.len()); - for (_, step) in steps.iter() { + for (_, step, _) in steps.iter() { prev_factors *= step.total() as f32; percentage += step.current() as f32 / prev_factors; @@ -49,6 +83,19 @@ impl Progress { ProgressView { steps: step_view, percentage: percentage * 100.0 } } + + pub fn accumulated_durations(&self) -> IndexMap { + let mut inner = self.steps.write().unwrap(); + let InnerProgress { steps, durations, .. } = &mut *inner; + + let now = Instant::now(); + for (i, (_, _, started_at)) in steps.iter().enumerate() { + let full_name = name_from_steps(steps.iter().take(i + 1).map(|(_, s, _)| s)); + durations.push((full_name, now.duration_since(*started_at))); + } + + durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect() + } } /// This trait lets you use the AtomicSubStep defined right below. @@ -164,7 +211,7 @@ pub struct ProgressStepView { /// Used when the name can change but it's still the same step. /// To avoid conflicts on the `TypeId`, create a unique type every time you use this step: /// ```text -/// enum UpgradeVersion {} +/// enum UpgradeVersion {} /// /// progress.update_progress(VariableNameStep::::new( /// "v1 to v2",