From 33b1f54b41ac85c5fa89ea9f49c80be323527183 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 5 Nov 2024 16:23:02 +0100 Subject: [PATCH] Progress, in the task queue --- index-scheduler/src/batch.rs | 59 ++++++++++----------------- index-scheduler/src/insta_snapshot.rs | 1 + index-scheduler/src/lib.rs | 28 +++++++++++-- index-scheduler/src/utils.rs | 2 + meilisearch-types/src/task_view.rs | 7 +++- meilisearch-types/src/tasks.rs | 56 +++++++++++++++++++++++++ 6 files changed, 111 insertions(+), 42 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 740528555..bd307b19e 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -22,7 +22,8 @@ use std::ffi::OsStr; use std::fmt; use std::fs::{self, File}; use std::io::BufWriter; -use std::sync::atomic::{self, AtomicU16, AtomicU32}; +use std::sync::atomic::{self, AtomicU64}; +use std::time::Duration; use bumpalo::collections::CollectIn; use bumpalo::Bump; @@ -31,7 +32,6 @@ use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; use meilisearch_types::milli::heed::CompactionOption; -use meilisearch_types::milli::update::new::indexer::document_changes::Progress; use meilisearch_types::milli::update::new::indexer::{ self, retrieve_or_guess_primary_key, UpdateByFunction, }; @@ -531,7 +531,7 @@ impl IndexScheduler { if let Some(task_id) = to_cancel.max() { // We retrieve the tasks that were processing before this tasks cancelation started. // We must *not* reset the processing tasks before calling this method. - let ProcessingTasks { started_at, processing } = + let ProcessingTasks { started_at, processing, progress: _ } = &*self.processing_tasks.read().unwrap(); return Ok(Some(Batch::TaskCancelation { task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?, @@ -1223,39 +1223,29 @@ impl IndexScheduler { ) -> Result> { let indexer_alloc = Bump::new(); - let last_finished_steps = AtomicU16::new(0); - let last_finished_documents = AtomicU32::new(0); + let started_processing_at = std::time::Instant::now(); + let secs_since_started_processing_at = AtomicU64::new(0); + const PRINT_SECS_DELTA: u64 = 1; - let send_progress = - |Progress { finished_steps, total_steps, step_name, finished_total_documents }| { - /* - let current = rayon::current_thread_index(); + let processing_tasks = self.processing_tasks.clone(); - let last_finished_steps = - last_finished_steps.fetch_max(finished_steps, atomic::Ordering::Relaxed); + let must_stop_processing = self.must_stop_processing.clone(); - if last_finished_steps > finished_steps { - return; - } + let send_progress = |progress| { + let now = std::time::Instant::now(); + let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed); + let previous = started_processing_at + Duration::from_secs(elapsed); + let elapsed = now - previous; - if let Some((finished_documents, total_documents)) = finished_total_documents { - if last_finished_steps < finished_steps { - last_finished_documents.store(finished_documents, atomic::Ordering::Relaxed); - } else { - let last_finished_documents = last_finished_documents - .fetch_max(finished_documents, atomic::Ordering::Relaxed); - if last_finished_documents > finished_documents { - return; - } - } - tracing::warn!("Progress from {current:?}: {step_name} ({finished_steps}/{total_steps}), document {finished_documents}/{total_documents}") - } else { - tracing::warn!( - "Progress from {current:?}: {step_name} ({finished_steps}/{total_steps})" - ) - } - */ - }; + if elapsed.as_secs() < PRINT_SECS_DELTA { + return; + } + + secs_since_started_processing_at + .store((now - started_processing_at).as_secs(), atomic::Ordering::Relaxed); + + processing_tasks.write().unwrap().update_progress(progress); + }; match operation { IndexOperation::DocumentClear { mut tasks, .. } => { @@ -1286,8 +1276,6 @@ impl IndexScheduler { operations, mut tasks, } => { - let started_processing_at = std::time::Instant::now(); - let must_stop_processing = self.must_stop_processing.clone(); let indexer_config = self.index_mapper.indexer_config(); // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. // this is made difficult by the fact we're doing private clones of the index scheduler and sending it @@ -1503,7 +1491,6 @@ impl IndexScheduler { let document_changes = indexer.into_changes(&primary_key)?; let embedders = index.embedding_configs(index_wtxn)?; let embedders = self.embedders(embedders)?; - let must_stop_processing = &self.must_stop_processing; indexer::index( index_wtxn, @@ -1645,7 +1632,6 @@ impl IndexScheduler { let document_changes = indexer.into_changes(&indexer_alloc, primary_key); let embedders = index.embedding_configs(index_wtxn)?; let embedders = self.embedders(embedders)?; - let must_stop_processing = &self.must_stop_processing; indexer::index( index_wtxn, @@ -1679,7 +1665,6 @@ impl IndexScheduler { task.status = Status::Succeeded; } - let must_stop_processing = self.must_stop_processing.clone(); builder.execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index f295e35b6..f63a289eb 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -148,6 +148,7 @@ pub fn snapshot_task(task: &Task) -> String { enqueued_at: _, started_at: _, finished_at: _, + progress: _, error, canceled_by, details, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index fe8244f9b..16b4a5897 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -55,11 +55,12 @@ use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128}; use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn}; use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::index::IndexEmbeddingConfig; +use meilisearch_types::milli::update::new::indexer::document_changes::Progress; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; use meilisearch_types::task_view::TaskView; -use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; +use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task, TaskProgress}; use rayon::current_num_threads; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; use roaring::RoaringBitmap; @@ -161,12 +162,18 @@ struct ProcessingTasks { started_at: OffsetDateTime, /// The list of tasks ids that are currently running. processing: RoaringBitmap, + /// The progress on processing tasks + progress: Option, } impl ProcessingTasks { /// Creates an empty `ProcessingAt` struct. fn new() -> ProcessingTasks { - ProcessingTasks { started_at: OffsetDateTime::now_utc(), processing: RoaringBitmap::new() } + ProcessingTasks { + started_at: OffsetDateTime::now_utc(), + processing: RoaringBitmap::new(), + progress: None, + } } /// Stores the currently processing tasks, and the date time at which it started. @@ -175,8 +182,13 @@ impl ProcessingTasks { self.processing = processing; } + fn update_progress(&mut self, progress: Progress) { + self.progress.get_or_insert_with(TaskProgress::default).update(progress); + } + /// Set the processing tasks to an empty list fn stop_processing(&mut self) -> RoaringBitmap { + self.progress = None; std::mem::take(&mut self.processing) } @@ -956,7 +968,7 @@ impl IndexScheduler { tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), )?; - let ProcessingTasks { started_at, processing, .. } = + let ProcessingTasks { started_at, processing, progress, .. } = self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); let ret = tasks.into_iter(); @@ -966,7 +978,12 @@ impl IndexScheduler { Ok(( ret.map(|task| { if processing.contains(task.uid) { - Task { status: Status::Processing, started_at: Some(started_at), ..task } + Task { + status: Status::Processing, + progress: progress.clone(), + started_at: Some(started_at), + ..task + } } else { task } @@ -1008,6 +1025,7 @@ impl IndexScheduler { enqueued_at: OffsetDateTime::now_utc(), started_at: None, finished_at: None, + progress: None, error: None, canceled_by: None, details: kind.default_details(), @@ -1588,6 +1606,8 @@ impl<'a> Dump<'a> { enqueued_at: task.enqueued_at, started_at: task.started_at, finished_at: task.finished_at, + /// FIXME: should we update dump to contain progress information? 🤔 + progress: None, error: task.error, canceled_by: task.canceled_by, details: task.details, diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 788a70fb8..7ae419495 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -345,6 +345,8 @@ impl IndexScheduler { enqueued_at, started_at, finished_at, + /// FIXME: assert something here? ask tamo 🤔 + progress: _, error: _, canceled_by, details, diff --git a/meilisearch-types/src/task_view.rs b/meilisearch-types/src/task_view.rs index 3075fa899..fd9367bf4 100644 --- a/meilisearch-types/src/task_view.rs +++ b/meilisearch-types/src/task_view.rs @@ -4,7 +4,9 @@ use time::{Duration, OffsetDateTime}; use crate::error::ResponseError; use crate::settings::{Settings, Unchecked}; -use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId}; +use crate::tasks::{ + serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId, TaskProgress, +}; #[derive(Debug, Clone, PartialEq, Eq, Serialize)] #[serde(rename_all = "camelCase")] @@ -27,6 +29,8 @@ pub struct TaskView { pub started_at: Option, #[serde(with = "time::serde::rfc3339::option", default)] pub finished_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub progress: Option, } impl TaskView { @@ -43,6 +47,7 @@ impl TaskView { enqueued_at: task.enqueued_at, started_at: task.started_at, finished_at: task.finished_at, + progress: task.progress.clone(), } } } diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index 1dd6d3fbf..56d839432 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -4,6 +4,7 @@ use std::fmt::{Display, Write}; use std::str::FromStr; use enum_iterator::Sequence; +use milli::update::new::indexer::document_changes::Progress; use milli::update::IndexDocumentsMethod; use milli::Object; use roaring::RoaringBitmap; @@ -30,6 +31,8 @@ pub struct Task { #[serde(with = "time::serde::rfc3339::option")] pub finished_at: Option, + pub progress: Option, + pub error: Option, pub canceled_by: Option, pub details: Option
, @@ -38,6 +41,59 @@ pub struct Task { pub kind: KindWithContent, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskProgress { + pub current_step: String, + pub finished_steps: u16, + pub total_steps: u16, + pub finished_documents: Option, + pub total_documents: Option, +} + +impl Default for TaskProgress { + fn default() -> Self { + Self::new() + } +} + +impl TaskProgress { + pub fn new() -> Self { + Self { + current_step: String::new(), + finished_steps: 0, + total_steps: 1, + finished_documents: None, + total_documents: None, + } + } + + pub fn update(&mut self, progress: Progress) { + if self.current_step != progress.step_name { + self.current_step.clear(); + self.current_step.push_str(progress.step_name); + } + self.total_steps = progress.total_steps; + if self.finished_steps > progress.finished_steps { + return; + } + if self.finished_steps < progress.finished_steps { + self.finished_documents = None; + self.total_documents = None; + } + self.finished_steps = progress.finished_steps; + if let Some((finished_documents, total_documents)) = progress.finished_total_documents { + if let Some(task_finished_documents) = self.finished_documents { + if task_finished_documents > finished_documents { + return; + } + } + self.finished_documents = Some(finished_documents); + self.total_documents = Some(total_documents); + } + } +} + impl Task { pub fn index_uid(&self) -> Option<&str> { use KindWithContent::*;