diff --git a/crates/benchmarks/benches/indexing.rs b/crates/benchmarks/benches/indexing.rs index 870e56686..4acd7b22a 100644 --- a/crates/benchmarks/benches/indexing.rs +++ b/crates/benchmarks/benches/indexing.rs @@ -8,6 +8,7 @@ use bumpalo::Bump; use criterion::{criterion_group, criterion_main, Criterion}; use milli::documents::PrimaryKey; use milli::heed::{EnvOpenOptions, RwTxn}; +use milli::progress::Progress; use milli::update::new::indexer; use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; use milli::vector::EmbeddingConfigs; @@ -151,7 +152,7 @@ fn indexing_songs_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -166,7 +167,7 @@ fn indexing_songs_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -218,7 +219,7 @@ fn reindexing_songs_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -233,7 +234,7 @@ fn reindexing_songs_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -263,7 +264,7 @@ fn reindexing_songs_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -278,7 +279,7 @@ fn reindexing_songs_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -332,7 +333,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -347,7 +348,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -409,7 +410,7 @@ fn 
indexing_songs_in_three_batches_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -424,7 +425,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -454,7 +455,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -469,7 +470,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -495,7 +496,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -510,7 +511,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -563,7 +564,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -578,7 +579,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -630,7 +631,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -645,7 +646,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -697,7 +698,7 @@ fn indexing_wiki(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -712,7 
+713,7 @@ fn indexing_wiki(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -763,7 +764,7 @@ fn reindexing_wiki(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -778,7 +779,7 @@ fn reindexing_wiki(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -808,7 +809,7 @@ fn reindexing_wiki(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -823,7 +824,7 @@ fn reindexing_wiki(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -876,7 +877,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -891,7 +892,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -953,7 +954,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -968,7 +969,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -999,7 +1000,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -1014,7 +1015,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -1041,7 +1042,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - 
&|_progress| (), + Progress::default(), ) .unwrap(); @@ -1056,7 +1057,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -1108,7 +1109,7 @@ fn indexing_movies_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -1123,7 +1124,7 @@ fn indexing_movies_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -1174,7 +1175,7 @@ fn reindexing_movies_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -1189,7 +1190,7 @@ fn reindexing_movies_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -1219,7 +1220,7 @@ fn reindexing_movies_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -1234,7 +1235,7 @@ fn reindexing_movies_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -1287,7 +1288,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -1302,7 +1303,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); @@ -1350,7 +1351,7 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec Index { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -125,7 +126,7 @@ pub fn base_setup(conf: &Conf) -> Index { &document_changes, EmbeddingConfigs::default(), &|| false, - &|_| (), + &Progress::default(), ) 
.unwrap(); diff --git a/crates/fuzzers/src/bin/fuzz-indexing.rs b/crates/fuzzers/src/bin/fuzz-indexing.rs index ee927940f..08711e5e3 100644 --- a/crates/fuzzers/src/bin/fuzz-indexing.rs +++ b/crates/fuzzers/src/bin/fuzz-indexing.rs @@ -10,6 +10,7 @@ use either::Either; use fuzzers::Operation; use milli::documents::mmap_from_objects; use milli::heed::EnvOpenOptions; +use milli::progress::Progress; use milli::update::new::indexer; use milli::update::{IndexDocumentsMethod, IndexerConfig}; use milli::vector::EmbeddingConfigs; @@ -128,7 +129,7 @@ fn main() { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -143,7 +144,7 @@ fn main() { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 93e9a1404..1bfa7f53b 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -22,8 +22,6 @@ use std::ffi::OsStr; use std::fmt; use std::fs::{self, File}; use std::io::BufWriter; -use std::sync::atomic::{self, AtomicU64}; -use std::time::Duration; use bumpalo::collections::CollectIn; use bumpalo::Bump; @@ -32,6 +30,7 @@ use meilisearch_types::batches::BatchId; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; use meilisearch_types::milli::heed::CompactionOption; +use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; use meilisearch_types::milli::update::{ DocumentAdditionResult, IndexDocumentsMethod, Settings as MilliSettings, @@ -41,9 +40,7 @@ use meilisearch_types::milli::vector::parsed_vectors::{ }; use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; -use meilisearch_types::tasks::{ - Details, IndexSwap, Kind, 
KindWithContent, Status, Task, TaskProgress, -}; +use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; use roaring::RoaringBitmap; use time::macros::format_description; @@ -561,11 +558,12 @@ impl IndexScheduler { /// The list of tasks that were processed. The metadata of each task in the returned /// list is updated accordingly, with the exception of the its date fields /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at). - #[tracing::instrument(level = "trace", skip(self, batch), target = "indexing::scheduler", fields(batch=batch.to_string()))] + #[tracing::instrument(level = "trace", skip(self, batch, progress), target = "indexing::scheduler", fields(batch=batch.to_string()))] pub(crate) fn process_batch( &self, batch: Batch, current_batch: &mut ProcessingBatch, + progress: Progress, ) -> Result> { #[cfg(test)] { @@ -953,7 +951,7 @@ impl IndexScheduler { .set_currently_updating_index(Some((index_uid.clone(), index.clone()))); let mut index_wtxn = index.write_txn()?; - let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; + let tasks = self.apply_index_operation(&mut index_wtxn, &index, op, progress)?; { let span = tracing::trace_span!(target: "indexing::scheduler", "commit"); @@ -996,6 +994,7 @@ impl IndexScheduler { self.process_batch( Batch::IndexUpdate { index_uid, primary_key, task }, current_batch, + progress, ) } Batch::IndexUpdate { index_uid, primary_key, mut task } => { @@ -1168,7 +1167,7 @@ impl IndexScheduler { /// The list of processed tasks. 
#[tracing::instrument( level = "trace", - skip(self, index_wtxn, index), + skip(self, index_wtxn, index, progress), target = "indexing::scheduler" )] fn apply_index_operation<'i>( @@ -1176,44 +1175,12 @@ impl IndexScheduler { index_wtxn: &mut RwTxn<'i>, index: &'i Index, operation: IndexOperation, + progress: Progress, ) -> Result> { let indexer_alloc = Bump::new(); let started_processing_at = std::time::Instant::now(); - let secs_since_started_processing_at = AtomicU64::new(0); - const PRINT_SECS_DELTA: u64 = 5; - - let processing_tasks = self.processing_tasks.clone(); let must_stop_processing = self.must_stop_processing.clone(); - let send_progress = |progress| { - let now = std::time::Instant::now(); - let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed); - let previous = started_processing_at + Duration::from_secs(elapsed); - let elapsed = now - previous; - - if elapsed.as_secs() < PRINT_SECS_DELTA { - return; - } - - secs_since_started_processing_at - .store((now - started_processing_at).as_secs(), atomic::Ordering::Relaxed); - - let TaskProgress { - current_step, - finished_steps, - total_steps, - finished_substeps, - total_substeps, - } = processing_tasks.write().unwrap().update_progress(progress); - - tracing::info!( - current_step, - finished_steps, - total_steps, - finished_substeps, - total_substeps - ); - }; match operation { IndexOperation::DocumentClear { index_uid, mut tasks } => { @@ -1308,7 +1275,7 @@ impl IndexScheduler { primary_key.as_deref(), &mut new_fields_ids_map, &|| must_stop_processing.get(), - &send_progress, + progress.clone(), ) .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; @@ -1356,7 +1323,7 @@ impl IndexScheduler { &document_changes, embedders, &|| must_stop_processing.get(), - &send_progress, + &progress, ) .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; @@ -1470,7 +1437,7 @@ impl IndexScheduler { &document_changes, embedders, &|| must_stop_processing.get(), - &send_progress, + 
&progress, ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; @@ -1621,7 +1588,7 @@ impl IndexScheduler { &document_changes, embedders, &|| must_stop_processing.get(), - &send_progress, + &progress, ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; @@ -1673,12 +1640,14 @@ impl IndexScheduler { index_uid: index_uid.clone(), tasks: cleared_tasks, }, + progress.clone(), )?; let settings_tasks = self.apply_index_operation( index_wtxn, index, IndexOperation::Settings { index_uid, settings, tasks: settings_tasks }, + progress, )?; let mut tasks = settings_tasks; @@ -1702,8 +1671,8 @@ impl IndexScheduler { let all_task_ids = self.all_task_ids(wtxn)?; let mut to_delete_tasks = all_task_ids & matched_tasks; - to_delete_tasks -= processing_tasks; - to_delete_tasks -= enqueued_tasks; + to_delete_tasks -= &**processing_tasks; + to_delete_tasks -= &enqueued_tasks; // 2. We now have a list of tasks to delete, delete them diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index bcd5966b5..67627d8c1 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -353,7 +353,7 @@ pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database String { let mut snap = String::new(); - let Batch { uid, details, stats, started_at, finished_at } = batch; + let Batch { uid, details, stats, started_at, finished_at, progress: _ } = batch; if let Some(finished_at) = finished_at { assert!(finished_at > started_at); } diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index e780b21a1..f5f73087d 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -26,6 +26,7 @@ mod index_mapper; #[cfg(test)] mod insta_snapshot; mod lru; +mod processing; mod utils; pub mod uuid_codec; @@ -56,12 +57,12 @@ use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128}; use meilisearch_types::heed::{self, 
Database, Env, PutFlags, RoTxn, RwTxn}; use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::index::IndexEmbeddingConfig; -use meilisearch_types::milli::update::new::indexer::document_changes::Progress; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; use meilisearch_types::task_view::TaskView; -use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task, TaskProgress}; +use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; +use processing::ProcessingTasks; use rayon::current_num_threads; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; use roaring::RoaringBitmap; @@ -72,7 +73,8 @@ use utils::{filter_out_references_to_newer_tasks, keep_ids_within_datetimes, map use uuid::Uuid; use crate::index_mapper::IndexMapper; -use crate::utils::{check_index_swap_validity, clamp_to_page_size, ProcessingBatch}; +use crate::processing::{AtomicTaskStep, BatchProgress}; +use crate::utils::{check_index_swap_validity, clamp_to_page_size}; pub(crate) type BEI128 = I128; @@ -163,48 +165,6 @@ impl Query { } } -#[derive(Debug, Clone)] -pub struct ProcessingTasks { - batch: Option, - /// The list of tasks ids that are currently running. - processing: RoaringBitmap, - /// The progress on processing tasks - progress: Option, -} - -impl ProcessingTasks { - /// Creates an empty `ProcessingAt` struct. - fn new() -> ProcessingTasks { - ProcessingTasks { batch: None, processing: RoaringBitmap::new(), progress: None } - } - - /// Stores the currently processing tasks, and the date time at which it started. 
- fn start_processing(&mut self, processing_batch: ProcessingBatch, processing: RoaringBitmap) { - self.batch = Some(processing_batch); - self.processing = processing; - } - - fn update_progress(&mut self, progress: Progress) -> TaskProgress { - self.progress.get_or_insert_with(TaskProgress::default).update(progress) - } - - /// Set the processing tasks to an empty list - fn stop_processing(&mut self) -> Self { - self.progress = None; - - Self { - batch: std::mem::take(&mut self.batch), - processing: std::mem::take(&mut self.processing), - progress: None, - } - } - - /// Returns `true` if there, at least, is one task that is currently processing that we must stop. - fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { - !self.processing.is_disjoint(canceled_tasks) - } -} - #[derive(Default, Clone, Debug)] struct MustStopProcessing(Arc); @@ -813,7 +773,7 @@ impl IndexScheduler { let mut batch_tasks = RoaringBitmap::new(); for batch_uid in batch_uids { if processing_batch.as_ref().map_or(false, |batch| batch.uid == *batch_uid) { - batch_tasks |= &processing_tasks; + batch_tasks |= &*processing_tasks; } else { batch_tasks |= self.tasks_in_batch(rtxn, *batch_uid)?; } @@ -827,13 +787,13 @@ impl IndexScheduler { match status { // special case for Processing tasks Status::Processing => { - status_tasks |= &processing_tasks; + status_tasks |= &*processing_tasks; } status => status_tasks |= &self.get_status(rtxn, *status)?, }; } if !status.contains(&Status::Processing) { - tasks -= &processing_tasks; + tasks -= &*processing_tasks; } tasks &= status_tasks; } @@ -882,7 +842,7 @@ impl IndexScheduler { // Once we have filtered the two subsets, we put them back together and assign it back to `tasks`. 
tasks = { let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) = - (&tasks - &processing_tasks, &tasks & &processing_tasks); + (&tasks - &*processing_tasks, &tasks & &*processing_tasks); // special case for Processing tasks // A closure that clears the filtered_processing_tasks if their started_at date falls outside the given bounds @@ -1090,7 +1050,7 @@ impl IndexScheduler { // Once we have filtered the two subsets, we put them back together and assign it back to `batches`. batches = { let (mut filtered_non_processing_batches, mut filtered_processing_batches) = - (&batches - &processing.processing, &batches & &processing.processing); + (&batches - &*processing.processing, &batches & &*processing.processing); // special case for Processing batches // A closure that clears the filtered_processing_batches if their started_at date falls outside the given bounds @@ -1606,7 +1566,8 @@ impl IndexScheduler { // We reset the must_stop flag to be sure that we don't stop processing tasks self.must_stop_processing.reset(); - self.processing_tasks + let progress = self + .processing_tasks .write() .unwrap() // We can clone the processing batch here because we don't want its modification to affect the view of the processing batches @@ -1619,11 +1580,12 @@ impl IndexScheduler { let res = { let cloned_index_scheduler = self.private_clone(); let processing_batch = &mut processing_batch; + let progress = progress.clone(); std::thread::scope(|s| { let handle = std::thread::Builder::new() .name(String::from("batch-operation")) .spawn_scoped(s, move || { - cloned_index_scheduler.process_batch(batch, processing_batch) + cloned_index_scheduler.process_batch(batch, processing_batch, progress) }) .unwrap(); handle.join().unwrap_or(Err(Error::ProcessBatchPanicked)) @@ -1636,6 +1598,7 @@ impl IndexScheduler { #[cfg(test)] self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?; + progress.update_progress(BatchProgress::WritingTasksToDisk); processing_batch.finished(); let 
mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; let mut canceled = RoaringBitmap::new(); @@ -1645,12 +1608,15 @@ impl IndexScheduler { #[cfg(test)] self.breakpoint(Breakpoint::ProcessBatchSucceeded); + let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32); + progress.update_progress(task_progress_obj); let mut success = 0; let mut failure = 0; let mut canceled_by = None; #[allow(unused_variables)] for (i, mut task) in tasks.into_iter().enumerate() { + task_progress.fetch_add(1, Ordering::Relaxed); processing_batch.update(&mut task); if task.status == Status::Canceled { canceled.insert(task.uid); @@ -1718,8 +1684,12 @@ impl IndexScheduler { Err(err) => { #[cfg(test)] self.breakpoint(Breakpoint::ProcessBatchFailed); + let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32); + progress.update_progress(task_progress_obj); + let error: ResponseError = err.into(); for id in ids.iter() { + task_progress.fetch_add(1, Ordering::Relaxed); let mut task = self .get_task(&wtxn, id) .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs new file mode 100644 index 000000000..e5e892927 --- /dev/null +++ b/crates/index-scheduler/src/processing.rs @@ -0,0 +1,205 @@ +use crate::utils::ProcessingBatch; +use meilisearch_types::milli::progress::{AtomicSubStep, NamedStep, Progress, ProgressView, Step}; +use roaring::RoaringBitmap; +use std::{borrow::Cow, sync::Arc}; + +#[derive(Clone)] +pub struct ProcessingTasks { + pub batch: Option>, + /// The list of tasks ids that are currently running. + pub processing: Arc, + /// The progress on processing tasks + pub progress: Option, +} + +impl ProcessingTasks { + /// Creates an empty `ProcessingAt` struct. 
+ pub fn new() -> ProcessingTasks { + ProcessingTasks { batch: None, processing: Arc::new(RoaringBitmap::new()), progress: None } + } + + pub fn get_progress_view(&self) -> Option<ProgressView> { + Some(self.progress.as_ref()?.as_progress_view()) + } + + /// Stores the batch and task ids that start processing now, and returns the `Progress` handle that tracks them. + pub fn start_processing( + &mut self, + processing_batch: ProcessingBatch, + processing: RoaringBitmap, + ) -> Progress { + self.batch = Some(Arc::new(processing_batch)); + self.processing = Arc::new(processing); + let progress = Progress::default(); + progress.update_progress(BatchProgress::ProcessingTasks); + self.progress = Some(progress.clone()); + + progress + } + + /// Resets the processing state to empty and returns the batch and task ids that were being processed. + pub fn stop_processing(&mut self) -> Self { + self.progress = None; + + Self { + batch: std::mem::take(&mut self.batch), + processing: std::mem::take(&mut self.processing), + progress: None, + } + } + + /// Returns `true` if at least one currently processing task is in `canceled_tasks` and must therefore be stopped. 
+ pub fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { + !self.processing.is_disjoint(canceled_tasks) + } +} + +#[repr(u8)] +#[derive(Copy, Clone)] +pub enum BatchProgress { + ProcessingTasks, + WritingTasksToDisk, +} + +impl Step for BatchProgress { + fn name(&self) -> Cow<'static, str> { + match self { + BatchProgress::ProcessingTasks => Cow::Borrowed("processing tasks"), + BatchProgress::WritingTasksToDisk => Cow::Borrowed("writing tasks to disk"), + } + } + + fn current(&self) -> u32 { + *self as u8 as u32 + } + + fn total(&self) -> u32 { + 2 + } +} + +#[derive(Default)] +pub struct Task {} + +impl NamedStep for Task { + fn name(&self) -> &'static str { + "task" + } +} +pub type AtomicTaskStep = AtomicSubStep; + +#[cfg(test)] +mod test { + use std::sync::atomic::Ordering; + + use meili_snap::{json_string, snapshot}; + + use super::*; + + #[test] + fn one_level() { + let mut processing = ProcessingTasks::new(); + processing.start_processing(ProcessingBatch::new(0), RoaringBitmap::new()); + snapshot!(json_string!(processing.get_progress_view()), @r#" + { + "steps": [ + { + "name": "processing tasks", + "finished": 0, + "total": 2 + } + ], + "percentage": 0.0 + } + "#); + processing.progress.as_ref().unwrap().update_progress(BatchProgress::WritingTasksToDisk); + snapshot!(json_string!(processing.get_progress_view()), @r#" + { + "steps": [ + { + "name": "writing tasks to disk", + "finished": 1, + "total": 2 + } + ], + "percentage": 50.0 + } + "#); + } + + #[test] + fn task_progress() { + let mut processing = ProcessingTasks::new(); + processing.start_processing(ProcessingBatch::new(0), RoaringBitmap::new()); + let (atomic, tasks) = AtomicTaskStep::new(10); + processing.progress.as_ref().unwrap().update_progress(tasks); + snapshot!(json_string!(processing.get_progress_view()), @r#" + { + "steps": [ + { + "name": "processing tasks", + "finished": 0, + "total": 2 + }, + { + "name": "task", + "finished": 0, + "total": 10 + } + ], + 
"percentage": 0.0 + } + "#); + atomic.fetch_add(6, Ordering::Relaxed); + snapshot!(json_string!(processing.get_progress_view()), @r#" + { + "steps": [ + { + "name": "processing tasks", + "finished": 0, + "total": 2 + }, + { + "name": "task", + "finished": 6, + "total": 10 + } + ], + "percentage": 30.000002 + } + "#); + processing.progress.as_ref().unwrap().update_progress(BatchProgress::WritingTasksToDisk); + snapshot!(json_string!(processing.get_progress_view()), @r#" + { + "steps": [ + { + "name": "writing tasks to disk", + "finished": 1, + "total": 2 + } + ], + "percentage": 50.0 + } + "#); + let (atomic, tasks) = AtomicTaskStep::new(5); + processing.progress.as_ref().unwrap().update_progress(tasks); + atomic.fetch_add(4, Ordering::Relaxed); + snapshot!(json_string!(processing.get_progress_view()), @r#" + { + "steps": [ + { + "name": "writing tasks to disk", + "finished": 1, + "total": 2 + }, + { + "name": "task", + "finished": 4, + "total": 5 + } + ], + "percentage": 90.0 + } + "#); + } +} diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 356d77b35..3718c69ca 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -134,6 +134,7 @@ impl ProcessingBatch { pub fn to_batch(&self) -> Batch { Batch { uid: self.uid, + progress: None, details: self.details.clone(), stats: self.stats.clone(), started_at: self.started_at, @@ -187,6 +188,7 @@ impl IndexScheduler { &batch.uid, &Batch { uid: batch.uid, + progress: None, details: batch.details, stats: batch.stats, started_at: batch.started_at, @@ -273,7 +275,9 @@ impl IndexScheduler { .into_iter() .map(|batch_id| { if Some(batch_id) == processing.batch.as_ref().map(|batch| batch.uid) { - Ok(processing.batch.as_ref().unwrap().to_batch()) + let mut batch = processing.batch.as_ref().unwrap().to_batch(); + batch.progress = processing.get_progress_view(); + Ok(batch) } else { 
self.get_batch(rtxn, batch_id) .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) diff --git a/crates/meilisearch-types/src/batch_view.rs b/crates/meilisearch-types/src/batch_view.rs index 5d800d897..a3d7f834f 100644 --- a/crates/meilisearch-types/src/batch_view.rs +++ b/crates/meilisearch-types/src/batch_view.rs @@ -1,3 +1,4 @@ +use milli::progress::ProgressView; use serde::Serialize; use time::{Duration, OffsetDateTime}; @@ -11,6 +12,7 @@ use crate::{ #[serde(rename_all = "camelCase")] pub struct BatchView { pub uid: BatchId, + pub progress: Option, pub details: DetailsView, pub stats: BatchStats, #[serde(serialize_with = "serialize_duration", default)] @@ -25,6 +27,7 @@ impl BatchView { pub fn from_batch(batch: &Batch) -> Self { Self { uid: batch.uid, + progress: batch.progress.clone(), details: batch.details.clone(), stats: batch.stats.clone(), duration: batch.finished_at.map(|finished_at| finished_at - batch.started_at), diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs index a60386e52..57c609320 100644 --- a/crates/meilisearch-types/src/batches.rs +++ b/crates/meilisearch-types/src/batches.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use milli::progress::ProgressView; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; @@ -15,6 +16,8 @@ pub type BatchId = u32; pub struct Batch { pub uid: BatchId, + #[serde(skip_deserializing)] + pub progress: Option, pub details: DetailsView, pub stats: BatchStats, diff --git a/crates/meilisearch-types/src/tasks.rs b/crates/meilisearch-types/src/tasks.rs index ebd28f526..c62f550ae 100644 --- a/crates/meilisearch-types/src/tasks.rs +++ b/crates/meilisearch-types/src/tasks.rs @@ -4,7 +4,6 @@ use std::fmt::{Display, Write}; use std::str::FromStr; use enum_iterator::Sequence; -use milli::update::new::indexer::document_changes::Progress; use milli::update::IndexDocumentsMethod; use milli::Object; use roaring::RoaringBitmap; @@ -41,62 +40,6 @@ pub struct Task { pub 
kind: KindWithContent, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct TaskProgress { - pub current_step: &'static str, - pub finished_steps: u16, - pub total_steps: u16, - pub finished_substeps: Option, - pub total_substeps: Option, -} - -impl Default for TaskProgress { - fn default() -> Self { - Self::new() - } -} - -impl TaskProgress { - pub fn new() -> Self { - Self { - current_step: "start", - finished_steps: 0, - total_steps: 1, - finished_substeps: None, - total_substeps: None, - } - } - - pub fn update(&mut self, progress: Progress) -> TaskProgress { - if self.finished_steps > progress.finished_steps { - return *self; - } - - if self.current_step != progress.step_name { - self.current_step = progress.step_name - } - - self.total_steps = progress.total_steps; - - if self.finished_steps < progress.finished_steps { - self.finished_substeps = None; - self.total_substeps = None; - } - self.finished_steps = progress.finished_steps; - if let Some((finished_substeps, total_substeps)) = progress.finished_total_substep { - if let Some(task_finished_substeps) = self.finished_substeps { - if task_finished_substeps > finished_substeps { - return *self; - } - } - self.finished_substeps = Some(finished_substeps); - self.total_substeps = Some(total_substeps); - } - *self - } -} - impl Task { pub fn index_uid(&self) -> Option<&str> { use KindWithContent::*; diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 268d33cd9..f60b59c72 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1734,6 +1734,7 @@ pub(crate) mod tests { use crate::error::{Error, InternalError}; use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS}; + use crate::progress::Progress; use crate::update::new::indexer; use crate::update::settings::InnerIndexSettings; use crate::update::{ @@ -1810,7 +1811,7 @@ pub(crate) mod tests { None, &mut new_fields_ids_map, &|| 
false, - &|_progress| (), + Progress::default(), )?; if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) { @@ -1829,7 +1830,7 @@ pub(crate) mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) }) .unwrap()?; @@ -1901,7 +1902,7 @@ pub(crate) mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), )?; if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) { @@ -1920,7 +1921,7 @@ pub(crate) mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) }) .unwrap()?; @@ -1982,7 +1983,7 @@ pub(crate) mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2001,7 +2002,7 @@ pub(crate) mod tests { &document_changes, embedders, &|| should_abort.load(Relaxed), - &|_| (), + &Progress::default(), ) }) .unwrap() diff --git a/crates/milli/src/lib.rs b/crates/milli/src/lib.rs index 1fc876f79..3ae0bfdb9 100644 --- a/crates/milli/src/lib.rs +++ b/crates/milli/src/lib.rs @@ -31,6 +31,7 @@ pub mod vector; #[macro_use] pub mod snapshot_tests; mod fieldids_weights_map; +pub mod progress; use std::collections::{BTreeMap, HashMap}; use std::convert::{TryFrom, TryInto}; diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs new file mode 100644 index 000000000..63f0fbef8 --- /dev/null +++ b/crates/milli/src/progress.rs @@ -0,0 +1,116 @@ +use std::{ + any::TypeId, + borrow::Cow, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, RwLock, + }, +}; + +use serde::Serialize; + +pub trait Step: 'static + Send + Sync { + fn name(&self) -> Cow<'static, str>; + fn current(&self) -> u32; + fn total(&self) -> u32; +} + +#[derive(Clone, Default)] +pub struct Progress { + steps: Arc)>>>, +} + +impl Progress { + pub fn update_progress(&self, sub_progress: P) { + let mut steps = self.steps.write().unwrap(); + let step_type = TypeId::of::

(); + if let Some(idx) = steps.iter().position(|(id, _)| *id == step_type) { + steps.truncate(idx); + } + steps.push((step_type, Box::new(sub_progress))); + } + + // TODO: This code should be in meilisearch_types but cannot because milli can't depend on meilisearch_types + pub fn as_progress_view(&self) -> ProgressView { + let steps = self.steps.read().unwrap(); + + let mut percentage = 0.0; + let mut prev_factors = 1.0; + + let mut step_view = Vec::new(); + for (_, step) in steps.iter() { + prev_factors *= step.total() as f32; + percentage += step.current() as f32 / prev_factors; + + step_view.push(ProgressStepView { + name: step.name(), + finished: step.current(), + total: step.total(), + }); + } + + ProgressView { steps: step_view, percentage: percentage * 100.0 } + } +} + +/// This trait lets you use the AtomicSubStep defined right below. +/// The name must be a const that never changed but that can't be enforced by the type system because it make the trait non object-safe. +/// By forcing the Default trait + the &'static str we make it harder to miss-use the trait. +pub trait NamedStep: 'static + Send + Sync + Default { + fn name(&self) -> &'static str; +} + +/// Structure to quickly define steps that need very quick, lockless updating of their current step. 
+/// You can use this struct if: +/// - The name of the step doesn't change +/// - The total number of steps doesn't change +pub struct AtomicSubStep { + name: Name, + current: Arc, + total: u32, +} + +impl AtomicSubStep { + pub fn new(total: u32) -> (Arc, Self) { + let current = Arc::new(AtomicU32::new(0)); + (current.clone(), Self { current, total, name: Name::default() }) + } +} + +impl Step for AtomicSubStep { + fn name(&self) -> Cow<'static, str> { + self.name.name().into() + } + + fn current(&self) -> u32 { + self.current.load(Ordering::Relaxed) + } + + fn total(&self) -> u32 { + self.total + } +} + +#[derive(Default)] +pub struct Document {} + +impl NamedStep for Document { + fn name(&self) -> &'static str { + "document" + } +} + +pub type AtomicDocumentStep = AtomicSubStep; + +#[derive(Debug, Serialize, Clone)] +pub struct ProgressView { + steps: Vec, + percentage: f32, +} + +#[derive(Debug, Serialize, Clone)] +pub struct ProgressStepView { + name: Cow<'static, str>, + finished: u32, + total: u32, +} diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs index 5db5b400b..04d3b6667 100644 --- a/crates/milli/src/search/new/tests/integration.rs +++ b/crates/milli/src/search/new/tests/integration.rs @@ -5,6 +5,7 @@ use bumpalo::Bump; use heed::EnvOpenOptions; use maplit::{btreemap, hashset}; +use crate::progress::Progress; use crate::update::new::indexer; use crate::update::{IndexDocumentsMethod, IndexerConfig, Settings}; use crate::vector::EmbeddingConfigs; @@ -72,7 +73,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -91,7 +92,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs 
b/crates/milli/src/update/index_documents/mod.rs index 3988b311c..bae8e00b4 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -766,6 +766,7 @@ mod tests { use crate::documents::mmap_from_objects; use crate::index::tests::TempIndex; use crate::index::IndexEmbeddingConfig; + use crate::progress::Progress; use crate::search::TermsMatchingStrategy; use crate::update::new::indexer; use crate::update::Setting; @@ -1964,7 +1965,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2148,7 +2149,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2163,7 +2164,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2210,7 +2211,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2225,7 +2226,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2263,7 +2264,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2278,7 +2279,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2315,7 +2316,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2330,7 +2331,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2369,7 +2370,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2384,7 +2385,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ 
-2428,7 +2429,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2443,7 +2444,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2480,7 +2481,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2495,7 +2496,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2532,7 +2533,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2547,7 +2548,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2726,7 +2727,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2741,7 +2742,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2785,7 +2786,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2800,7 +2801,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2841,7 +2842,7 @@ mod tests { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -2856,7 +2857,7 @@ mod tests { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); wtxn.commit().unwrap(); diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index b865d0a35..66ed6cbfb 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -16,10 +16,10 @@ use 
crate::update::del_add::DelAdd; use crate::update::new::channel::FieldIdDocidFacetSender; use crate::update::new::extract::perm_json_p; use crate::update::new::indexer::document_changes::{ - extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, + extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, }; use crate::update::new::ref_cell_ext::RefCellExt as _; -use crate::update::new::steps::Step; +use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::DocumentChange; use crate::update::GrenadParameters; @@ -373,26 +373,16 @@ fn truncate_str(s: &str) -> &str { impl FacetedDocidsExtractor { #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] - pub fn run_extraction< - 'pl, - 'fid, - 'indexer, - 'index, - 'extractor, - DC: DocumentChanges<'pl>, - MSP, - SP, - >( + pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, sender: &FieldIdDocidFacetSender, - step: Step, + step: IndexingStep, ) -> Result>> where MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, { let index = indexing_context.index; let rtxn = index.read_txn()?; diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index 0bdf31635..4bcb918e4 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -15,23 +15,22 @@ pub use geo::*; pub use searchable::*; pub use vectors::EmbeddingExtractor; -use super::indexer::document_changes::{DocumentChanges, IndexingContext, Progress}; -use super::steps::Step; +use super::indexer::document_changes::{DocumentChanges, 
IndexingContext}; +use super::steps::IndexingStep; use super::thread_local::{FullySend, ThreadLocal}; use crate::update::GrenadParameters; use crate::Result; pub trait DocidsExtractor { - fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP, SP>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, - step: Step, + step: IndexingStep, ) -> Result>> where - MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync; + MSP: Fn() -> bool + Sync; } /// TODO move in permissive json pointer diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs index 5e85eb1c8..952ee91e4 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -11,10 +11,10 @@ use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::update::new::extract::cache::BalancedCaches; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::indexer::document_changes::{ - extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, + extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, }; use crate::update::new::ref_cell_ext::RefCellExt as _; -use crate::update::new::steps::Step; +use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::update::new::DocumentChange; use crate::update::GrenadParameters; @@ -239,25 +239,15 @@ impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> { pub struct WordDocidsExtractors; 
impl WordDocidsExtractors { - pub fn run_extraction< - 'pl, - 'fid, - 'indexer, - 'index, - 'extractor, - DC: DocumentChanges<'pl>, - MSP, - SP, - >( + pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, - step: Step, + step: IndexingStep, ) -> Result> where MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, { let index = indexing_context.index; let rtxn = index.read_txn()?; diff --git a/crates/milli/src/update/new/extract/searchable/mod.rs b/crates/milli/src/update/new/extract/searchable/mod.rs index 05d2406d9..c4240196a 100644 --- a/crates/milli/src/update/new/extract/searchable/mod.rs +++ b/crates/milli/src/update/new/extract/searchable/mod.rs @@ -14,9 +14,9 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::BalancedCaches; use super::DocidsExtractor; use crate::update::new::indexer::document_changes::{ - extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, + extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, }; -use crate::update::new::steps::Step; +use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::DocumentChange; use crate::update::GrenadParameters; @@ -56,16 +56,15 @@ impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> } pub trait SearchableExtractor: Sized + Sync { - fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP, SP>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, 
+ indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, - step: Step, + step: IndexingStep, ) -> Result>> where MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, { let rtxn = indexing_context.index.read_txn()?; let stop_words = indexing_context.index.stop_words(&rtxn)?; @@ -134,16 +133,15 @@ pub trait SearchableExtractor: Sized + Sync { } impl DocidsExtractor for T { - fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP, SP>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, - step: Step, + step: IndexingStep, ) -> Result>> where MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, { Self::run_extraction( grenad_parameters, diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index 2a5c25525..f2edfb1f3 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -1,4 +1,5 @@ use std::cell::{Cell, RefCell}; +use std::sync::atomic::Ordering; use std::sync::{Arc, RwLock}; use bumpalo::Bump; @@ -7,8 +8,9 @@ use rayon::iter::IndexedParallelIterator; use super::super::document_change::DocumentChange; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; +use crate::progress::{AtomicDocumentStep, Progress}; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; -use crate::update::new::steps::Step; +use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; @@ -133,10 +135,8 @@ pub struct 
IndexingContext< 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, - SP, > where MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, { pub index: &'index Index, pub db_fields_ids_map: &'indexer FieldsIdsMap, @@ -144,7 +144,8 @@ pub struct IndexingContext< pub doc_allocs: &'indexer ThreadLocal>>, pub fields_ids_map_store: &'indexer ThreadLocal>>>, pub must_stop_processing: &'indexer MSP, - pub send_progress: &'indexer SP, + // TODO: TAMO: Rename field to progress + pub send_progress: &'indexer Progress, } impl< @@ -152,18 +153,15 @@ impl< 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, - SP, > Copy for IndexingContext< 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, - SP, > where MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, { } @@ -172,18 +170,15 @@ impl< 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, - SP, > Clone for IndexingContext< 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, - SP, > where MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, { fn clone(&self) -> Self { *self @@ -202,7 +197,6 @@ pub fn extract< EX, DC: DocumentChanges<'pl>, MSP, - SP, >( document_changes: &DC, extractor: &EX, @@ -214,17 +208,17 @@ pub fn extract< fields_ids_map_store, must_stop_processing, send_progress, - }: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, + }: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, datastore: &'data ThreadLocal, - step: 
Step, + step: IndexingStep, ) -> Result<()> where EX: Extractor<'extractor>, MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, { tracing::trace!("We are resetting the extractor allocators"); + send_progress.update_progress(step); // Clean up and reuse the extractor allocs for extractor_alloc in extractor_allocs.iter_mut() { tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes()); @@ -232,6 +226,8 @@ where } let total_documents = document_changes.len() as u32; + let (step, progress_step) = AtomicDocumentStep::new(total_documents); + send_progress.update_progress(progress_step); let pi = document_changes.iter(CHUNK_SIZE); pi.enumerate().try_arc_for_each_try_init( @@ -253,7 +249,7 @@ where } let finished_documents = (finished_documents * CHUNK_SIZE) as u32; - (send_progress)(Progress::from_step_substep(step, finished_documents, total_documents)); + step.store(finished_documents, Ordering::Relaxed); // Clean up and reuse the document-specific allocator context.doc_alloc.reset(); @@ -271,32 +267,7 @@ where res }, )?; - - (send_progress)(Progress::from_step_substep(step, total_documents, total_documents)); + step.store(total_documents, Ordering::Relaxed); Ok(()) } - -pub struct Progress { - pub finished_steps: u16, - pub total_steps: u16, - pub step_name: &'static str, - pub finished_total_substep: Option<(u32, u32)>, -} - -impl Progress { - pub fn from_step(step: Step) -> Self { - Self { - finished_steps: step.finished_steps(), - total_steps: Step::total_steps(), - step_name: step.name(), - finished_total_substep: None, - } - } - pub fn from_step_substep(step: Step, finished_substep: u32, total_substep: u32) -> Self { - Self { - finished_total_substep: Some((finished_substep, total_substep)), - ..Progress::from_step(step) - } - } -} diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index 518786e6f..33e69e49c 100644 --- 
a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -92,11 +92,12 @@ mod test { use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::index::tests::TempIndex; + use crate::progress::Progress; use crate::update::new::indexer::document_changes::{ extract, DocumentChangeContext, Extractor, IndexingContext, }; use crate::update::new::indexer::DocumentDeletion; - use crate::update::new::steps::Step; + use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{MostlySend, ThreadLocal}; use crate::update::new::DocumentChange; use crate::DocumentId; @@ -164,7 +165,7 @@ mod test { doc_allocs: &doc_allocs, fields_ids_map_store: &fields_ids_map_store, must_stop_processing: &(|| false), - send_progress: &(|_progress| {}), + send_progress: &Progress::default(), }; for _ in 0..3 { @@ -176,7 +177,7 @@ mod test { context, &mut extractor_allocs, &datastore, - Step::ExtractingDocuments, + IndexingStep::ExtractingDocuments, ) .unwrap(); diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 0b7ec493e..0ce53d5d2 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::Ordering; + use bumpalo::collections::CollectIn; use bumpalo::Bump; use bumparaw_collections::RawMap; @@ -10,11 +12,12 @@ use serde_json::value::RawValue; use serde_json::Deserializer; use super::super::document_change::DocumentChange; -use super::document_changes::{DocumentChangeContext, DocumentChanges, Progress}; +use super::document_changes::{DocumentChangeContext, DocumentChanges}; use super::retrieve_or_guess_primary_key; use crate::documents::PrimaryKey; +use crate::progress::{AtomicSubStep, Progress}; use crate::update::new::document::Versions; -use crate::update::new::steps::Step; 
+use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::MostlySend; use crate::update::new::{Deletion, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; @@ -45,7 +48,7 @@ impl<'pl> DocumentOperation<'pl> { #[allow(clippy::too_many_arguments)] #[tracing::instrument(level = "trace", skip_all, target = "indexing::document_operation")] - pub fn into_changes( + pub fn into_changes( self, indexer: &'pl Bump, index: &Index, @@ -53,12 +56,12 @@ impl<'pl> DocumentOperation<'pl> { primary_key_from_op: Option<&'pl str>, new_fields_ids_map: &mut FieldsIdsMap, must_stop_processing: &MSP, - send_progress: &SP, + progress: Progress, ) -> Result<(DocumentOperationChanges<'pl>, Vec, Option>)> where MSP: Fn() -> bool, - SP: Fn(Progress), { + progress.update_progress(IndexingStep::PreparingPayloads); let Self { operations, method } = self; let documents_ids = index.documents_ids(rtxn)?; @@ -68,16 +71,15 @@ impl<'pl> DocumentOperation<'pl> { let mut primary_key = None; let payload_count = operations.len(); + let (step, progress_step) = + AtomicSubStep::::new(payload_count as u32); + progress.update_progress(progress_step); for (payload_index, operation) in operations.into_iter().enumerate() { if must_stop_processing() { return Err(InternalError::AbortedIndexation.into()); } - send_progress(Progress::from_step_substep( - Step::PreparingPayloads, - payload_index as u32, - payload_count as u32, - )); + step.store(payload_index as u32, Ordering::Relaxed); let mut bytes = 0; let result = match operation { @@ -118,12 +120,7 @@ impl<'pl> DocumentOperation<'pl> { }; operations_stats.push(PayloadStats { document_count, bytes, error }); } - - send_progress(Progress::from_step_substep( - Step::PreparingPayloads, - payload_count as u32, - payload_count as u32, - )); + step.store(payload_count as u32, Ordering::Relaxed); // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone let mut docids_version_offsets: 
bumpalo::collections::vec::Vec<_> = diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 601645385..79416bcd5 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -5,7 +5,7 @@ use std::thread::{self, Builder}; use big_s::S; use bumparaw_collections::RawMap; -use document_changes::{extract, DocumentChanges, IndexingContext, Progress}; +use document_changes::{extract, DocumentChanges, IndexingContext}; pub use document_deletion::DocumentDeletion; pub use document_operation::{DocumentOperation, PayloadStats}; use hashbrown::HashMap; @@ -22,7 +22,7 @@ use super::channel::*; use super::extract::*; use super::facet_search_builder::FacetSearchBuilder; use super::merger::FacetFieldIdsDelta; -use super::steps::Step; +use super::steps::IndexingStep; use super::thread_local::ThreadLocal; use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use super::words_prefix_docids::{ @@ -33,6 +33,7 @@ use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::facet::FacetType; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; +use crate::progress::Progress; use crate::proximity::ProximityPrecision; use crate::update::del_add::DelAdd; use crate::update::new::extract::EmbeddingExtractor; @@ -60,7 +61,7 @@ mod update_by_function; /// /// TODO return stats #[allow(clippy::too_many_arguments)] // clippy: 😝 -pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>( +pub fn index<'pl, 'indexer, 'index, DC, MSP>( wtxn: &mut RwTxn, index: &'index Index, pool: &ThreadPoolNoAbort, @@ -71,12 +72,11 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>( document_changes: &DC, embedders: EmbeddingConfigs, must_stop_processing: &'indexer MSP, - send_progress: &'indexer SP, + send_progress: &'indexer Progress, ) -> Result<()> where DC: DocumentChanges<'pl>, MSP: Fn() -> bool + 
Sync, - SP: Fn(Progress) + Sync, { let mut bbbuffers = Vec::new(); let finished_extraction = AtomicBool::new(false); @@ -159,7 +159,7 @@ where indexing_context, &mut extractor_allocs, &datastore, - Step::ExtractingDocuments, + IndexingStep::ExtractingDocuments, )?; } { @@ -191,7 +191,7 @@ where indexing_context, &mut extractor_allocs, &extractor_sender.field_id_docid_facet_sender(), - Step::ExtractingFacets + IndexingStep::ExtractingFacets )? }; @@ -224,7 +224,7 @@ where document_changes, indexing_context, &mut extractor_allocs, - Step::ExtractingWords + IndexingStep::ExtractingWords )? }; @@ -302,7 +302,7 @@ where document_changes, indexing_context, &mut extractor_allocs, - Step::ExtractingWordProximity, + IndexingStep::ExtractingWordProximity, )? }; @@ -338,7 +338,7 @@ where indexing_context, &mut extractor_allocs, &datastore, - Step::ExtractingEmbeddings, + IndexingStep::ExtractingEmbeddings, )?; } { @@ -371,7 +371,7 @@ where indexing_context, &mut extractor_allocs, &datastore, - Step::WritingGeoPoints + IndexingStep::WritingGeoPoints )?; } @@ -383,9 +383,7 @@ where &indexing_context.must_stop_processing, )?; } - - (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase)); - + indexing_context.send_progress.update_progress(IndexingStep::WritingToDatabase); finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); Result::Ok((facet_field_ids_delta, index_embeddings)) @@ -485,7 +483,7 @@ where )?; } - (indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors)); + indexing_context.send_progress.update_progress(IndexingStep::WaitingForExtractors); let (facet_field_ids_delta, index_embeddings) = extractor_handle.join().unwrap()?; @@ -498,10 +496,9 @@ where break 'vectors; } - (indexing_context.send_progress)(Progress::from_step( - Step::WritingEmbeddingsToDatabase, - )); - + indexing_context + .send_progress + .update_progress(IndexingStep::WritingEmbeddingsToDatabase); let mut rng = 
rand::rngs::StdRng::seed_from_u64(42); for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers { let dimensions = *dimensions; @@ -517,21 +514,19 @@ where index.put_embedding_configs(wtxn, index_embeddings)?; } - (indexing_context.send_progress)(Progress::from_step(Step::PostProcessingFacets)); - + indexing_context.send_progress.update_progress(IndexingStep::PostProcessingFacets); if index.facet_search(wtxn)? { compute_facet_search_database(index, wtxn, global_fields_ids_map)?; } compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; - (indexing_context.send_progress)(Progress::from_step(Step::PostProcessingWords)); - + indexing_context.send_progress.update_progress(IndexingStep::PostProcessingWords); if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { compute_prefix_database(index, wtxn, prefix_delta, grenad_parameters)?; } - (indexing_context.send_progress)(Progress::from_step(Step::Finalizing)); + indexing_context.send_progress.update_progress(IndexingStep::Finalizing); Ok(()) as Result<_> })?; diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index bee1be260..9eb7d376d 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -1,8 +1,12 @@ +use std::borrow::Cow; + use enum_iterator::Sequence; +use crate::progress::Step; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)] -#[repr(u16)] -pub enum Step { +#[repr(u8)] +pub enum IndexingStep { PreparingPayloads, ExtractingDocuments, ExtractingFacets, @@ -18,30 +22,31 @@ pub enum Step { Finalizing, } -impl Step { - pub fn name(&self) -> &'static str { +impl Step for IndexingStep { + fn name(&self) -> Cow<'static, str> { match self { - Step::PreparingPayloads => "preparing update file", - Step::ExtractingDocuments => "extracting documents", - Step::ExtractingFacets => "extracting facets", - Step::ExtractingWords => "extracting words", - Step::ExtractingWordProximity => "extracting word 
proximity", - Step::ExtractingEmbeddings => "extracting embeddings", - Step::WritingGeoPoints => "writing geo points", - Step::WritingToDatabase => "writing to database", - Step::WaitingForExtractors => "waiting for extractors", - Step::WritingEmbeddingsToDatabase => "writing embeddings to database", - Step::PostProcessingFacets => "post-processing facets", - Step::PostProcessingWords => "post-processing words", - Step::Finalizing => "finalizing", + IndexingStep::PreparingPayloads => "preparing update file", + IndexingStep::ExtractingDocuments => "extracting documents", + IndexingStep::ExtractingFacets => "extracting facets", + IndexingStep::ExtractingWords => "extracting words", + IndexingStep::ExtractingWordProximity => "extracting word proximity", + IndexingStep::ExtractingEmbeddings => "extracting embeddings", + IndexingStep::WritingGeoPoints => "writing geo points", + IndexingStep::WritingToDatabase => "writing to database", + IndexingStep::WaitingForExtractors => "waiting for extractors", + IndexingStep::WritingEmbeddingsToDatabase => "writing embeddings to database", + IndexingStep::PostProcessingFacets => "post-processing facets", + IndexingStep::PostProcessingWords => "post-processing words", + IndexingStep::Finalizing => "finalizing", } + .into() } - pub fn finished_steps(self) -> u16 { - self as u16 + fn current(&self) -> u32 { + *self as u32 } - pub const fn total_steps() -> u16 { - Self::CARDINALITY as u16 + fn total(&self) -> u32 { + Self::CARDINALITY as u32 } } diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs index 418cdc356..ced81409d 100644 --- a/crates/milli/tests/search/facet_distribution.rs +++ b/crates/milli/tests/search/facet_distribution.rs @@ -3,6 +3,7 @@ use bumpalo::Bump; use heed::EnvOpenOptions; use maplit::hashset; use milli::documents::mmap_from_objects; +use milli::progress::Progress; use milli::update::new::indexer; use milli::update::{IndexDocumentsMethod, IndexerConfig, 
Settings}; use milli::vector::EmbeddingConfigs; @@ -57,7 +58,7 @@ fn test_facet_distribution_with_no_facet_values() { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -72,7 +73,7 @@ fn test_facet_distribution_with_no_facet_values() { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs index 08b22d7b6..30690969b 100644 --- a/crates/milli/tests/search/mod.rs +++ b/crates/milli/tests/search/mod.rs @@ -7,6 +7,7 @@ use bumpalo::Bump; use either::{Either, Left, Right}; use heed::EnvOpenOptions; use maplit::{btreemap, hashset}; +use milli::progress::Progress; use milli::update::new::indexer; use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; use milli::vector::EmbeddingConfigs; @@ -90,7 +91,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -109,7 +110,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap(); diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs index 8401f0444..304059915 100644 --- a/crates/milli/tests/search/query_criteria.rs +++ b/crates/milli/tests/search/query_criteria.rs @@ -5,6 +5,7 @@ use bumpalo::Bump; use heed::EnvOpenOptions; use itertools::Itertools; use maplit::hashset; +use milli::progress::Progress; use milli::update::new::indexer; use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; use milli::vector::EmbeddingConfigs; @@ -326,7 +327,7 @@ fn criteria_ascdesc() { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -341,7 +342,7 @@ fn criteria_ascdesc() { &document_changes, embedders, &|| false, - &|_| (), + 
&Progress::default(), ) .unwrap(); diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs index dbee296ee..d33d79e54 100644 --- a/crates/milli/tests/search/typo_tolerance.rs +++ b/crates/milli/tests/search/typo_tolerance.rs @@ -3,6 +3,7 @@ use std::collections::BTreeSet; use bumpalo::Bump; use heed::EnvOpenOptions; use milli::documents::mmap_from_objects; +use milli::progress::Progress; use milli::update::new::indexer; use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; use milli::vector::EmbeddingConfigs; @@ -135,7 +136,7 @@ fn test_typo_disabled_on_word() { None, &mut new_fields_ids_map, &|| false, - &|_progress| (), + Progress::default(), ) .unwrap(); @@ -150,7 +151,7 @@ fn test_typo_disabled_on_word() { &document_changes, embedders, &|| false, - &|_| (), + &Progress::default(), ) .unwrap();