From 3658f57f935dd63755df5d90467de78b2d5339b5 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 4 Nov 2024 15:10:40 +0100 Subject: [PATCH] Add progress --- index-scheduler/src/batch.rs | 44 ++++++ .../new/extract/faceted/extract_facets.rs | 22 ++- milli/src/update/new/extract/mod.rs | 16 +- .../extract/searchable/extract_word_docids.rs | 22 ++- .../src/update/new/extract/searchable/mod.rs | 38 +++-- .../update/new/indexer/document_changes.rs | 111 ++++++++++++- .../update/new/indexer/document_deletion.rs | 14 +- .../update/new/indexer/document_operation.rs | 4 + milli/src/update/new/indexer/mod.rs | 147 ++++++++++++++++-- milli/src/update/new/indexer/partial_dump.rs | 4 + .../update/new/indexer/update_by_function.rs | 6 +- 11 files changed, 380 insertions(+), 48 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 60393e51d..740528555 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -22,6 +22,7 @@ use std::ffi::OsStr; use std::fmt; use std::fs::{self, File}; use std::io::BufWriter; +use std::sync::atomic::{self, AtomicU16, AtomicU32}; use bumpalo::collections::CollectIn; use bumpalo::Bump; @@ -30,6 +31,7 @@ use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; use meilisearch_types::milli::heed::CompactionOption; +use meilisearch_types::milli::update::new::indexer::document_changes::Progress; use meilisearch_types::milli::update::new::indexer::{ self, retrieve_or_guess_primary_key, UpdateByFunction, }; @@ -1221,6 +1223,40 @@ impl IndexScheduler { ) -> Result> { let indexer_alloc = Bump::new(); + let last_finished_steps = AtomicU16::new(0); + let last_finished_documents = AtomicU32::new(0); + + let send_progress = + |Progress { finished_steps, total_steps, step_name, finished_total_documents }| { + /* + let current = rayon::current_thread_index(); + + let last_finished_steps = + last_finished_steps.fetch_max(finished_steps, atomic::Ordering::Relaxed); + + if last_finished_steps > finished_steps { + return; + } + + if let Some((finished_documents, total_documents)) = finished_total_documents { + if last_finished_steps < finished_steps { + last_finished_documents.store(finished_documents, atomic::Ordering::Relaxed); + } else { + let last_finished_documents = last_finished_documents + .fetch_max(finished_documents, atomic::Ordering::Relaxed); + if last_finished_documents > finished_documents { + return; + } + } + tracing::warn!("Progress from {current:?}: {step_name} ({finished_steps}/{total_steps}), document {finished_documents}/{total_documents}") + } else { + tracing::warn!( + "Progress from {current:?}: {step_name} ({finished_steps}/{total_steps})" + ) + } + */ + }; + match operation { IndexOperation::DocumentClear { mut tasks, .. 
} => { let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?; @@ -1377,6 +1413,8 @@ impl IndexScheduler { &pool, &document_changes, embedders, + &|| must_stop_processing.get(), + &send_progress, )?; // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); @@ -1465,6 +1503,7 @@ impl IndexScheduler { let document_changes = indexer.into_changes(&primary_key)?; let embedders = index.embedding_configs(index_wtxn)?; let embedders = self.embedders(embedders)?; + let must_stop_processing = &self.must_stop_processing; indexer::index( index_wtxn, @@ -1475,6 +1514,8 @@ impl IndexScheduler { &pool, &document_changes, embedders, + &|| must_stop_processing.get(), + &send_progress, )?; // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); @@ -1604,6 +1645,7 @@ impl IndexScheduler { let document_changes = indexer.into_changes(&indexer_alloc, primary_key); let embedders = index.embedding_configs(index_wtxn)?; let embedders = self.embedders(embedders)?; + let must_stop_processing = &self.must_stop_processing; indexer::index( index_wtxn, @@ -1614,6 +1656,8 @@ impl IndexScheduler { &pool, &document_changes, embedders, + &|| must_stop_processing.get(), + &send_progress, )?; // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index f2cbad6ff..2d740f1a3 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -16,8 +16,8 @@ use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::extract::DocidsExtractor; use crate::update::new::indexer::document_changes::{ - for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, - IndexingContext, RefCellExt, ThreadLocal, + extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext, + Progress, RefCellExt, ThreadLocal, }; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -250,12 +250,19 @@ fn truncate_str(s: &str) -> &str { impl DocidsExtractor for FacetedDocidsExtractor { #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] - fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &mut ThreadLocal>>, - ) -> Result> { + finished_steps: u16, + total_steps: u16, + step_name: &'static str, + ) -> Result> + where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, + { let max_memory = grenad_parameters.max_memory_by_thread(); let index = indexing_context.index; @@ -276,12 +283,15 @@ impl DocidsExtractor for FacetedDocidsExtractor { grenad_parameters, max_memory, }; - for_each_document_change( + extract( document_changes, &extractor, indexing_context, extractor_allocs, &datastore, + finished_steps, + total_steps, + step_name, )?; } { diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index 
8a18eb074..fb02b2c93 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -13,17 +13,25 @@ use grenad::Merger; pub use searchable::*; pub use vectors::EmbeddingExtractor; -use super::indexer::document_changes::{DocumentChanges, FullySend, IndexingContext, ThreadLocal}; +use super::indexer::document_changes::{ + DocumentChanges, FullySend, IndexingContext, Progress, ThreadLocal, +}; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::Result; pub trait DocidsExtractor { - fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &mut ThreadLocal>>, - ) -> Result>; + finished_steps: u16, + total_steps: u16, + step_name: &'static str, + ) -> Result> + where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync; } /// TODO move in permissive json pointer diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 80f36b01d..b9e4803c7 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -12,8 +12,8 @@ use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::indexer::document_changes::{ - for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, - IndexingContext, RefCellExt, ThreadLocal, + extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext, + Progress, RefCellExt, ThreadLocal, }; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -341,12 +341,19 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { pub struct WordDocidsExtractors; impl WordDocidsExtractors { - pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &mut ThreadLocal>>, - ) -> Result { + finished_steps: u16, + total_steps: u16, + step_name: &'static str, + ) -> Result + where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, + { let max_memory = grenad_parameters.max_memory_by_thread(); let index = indexing_context.index; @@ -391,12 +398,15 @@ impl WordDocidsExtractors { max_memory, }; - for_each_document_change( + extract( document_changes, &extractor, indexing_context, extractor_allocs, &datastore, + finished_steps, + total_steps, + step_name, )?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index dc429b1ba..e16e83167 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -17,8 +17,8 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; use 
crate::update::new::indexer::document_changes::{ - for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, - IndexingContext, ThreadLocal, + extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext, + Progress, ThreadLocal, }; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -69,12 +69,19 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> } pub trait SearchableExtractor: Sized + Sync { - fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &mut ThreadLocal>>, - ) -> Result> { + finished_steps: u16, + total_steps: u16, + step_name: &'static str, + ) -> Result> + where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, + { let max_memory = grenad_parameters.max_memory_by_thread(); let rtxn = indexing_context.index.read_txn()?; @@ -118,12 +125,15 @@ pub trait SearchableExtractor: Sized + Sync { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - for_each_document_change( + extract( document_changes, &extractor_data, indexing_context, extractor_allocs, &datastore, + finished_steps, + total_steps, + step_name, )?; } { @@ -168,17 +178,27 @@ pub trait SearchableExtractor: Sized + Sync { } impl DocidsExtractor for T { - fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>( grenad_parameters: GrenadParameters, document_changes: &DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &mut ThreadLocal>>, - ) -> Result> { + finished_steps: u16, + total_steps: u16, + step_name: &'static str, + ) -> Result> + where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, + { Self::run_extraction( grenad_parameters, document_changes, indexing_context, extractor_allocs, + finished_steps, + total_steps, + step_name, ) } } diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs index fd16137b9..aad190269 100644 --- a/milli/src/update/new/indexer/document_changes.rs +++ b/milli/src/update/new/indexer/document_changes.rs @@ -9,7 +9,7 @@ use rayon::iter::IndexedParallelIterator; use super::super::document_change::DocumentChange; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; -use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result}; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; pub trait RefCellExt { fn try_borrow_or_yield(&self) -> std::result::Result, std::cell::BorrowError>; @@ -335,6 +335,12 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator>; + fn len(&self) -> usize; + + fn is_empty(&self) -> bool { + self.len() == 0 + } + fn item_to_document_change<'doc, // lifetime of a single `process` call T: MostlySend>( &'doc self, @@ -344,22 +350,72 @@ pub trait DocumentChanges<'pl // lifetime of the underlying 
payload ; } -#[derive(Clone, Copy)] pub struct IndexingContext< 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index -> { + MSP, + SP, +> where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, +{ pub index: &'index Index, pub db_fields_ids_map: &'indexer FieldsIdsMap, pub new_fields_ids_map: &'fid RwLock, pub doc_allocs: &'indexer ThreadLocal>>, pub fields_ids_map_store: &'indexer ThreadLocal>>>, + pub must_stop_processing: &'indexer MSP, + pub send_progress: &'indexer SP, +} + +impl< + 'fid, // invariant lifetime of fields ids map + 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation + 'index, // covariant lifetime of the index + MSP, + SP, + > Copy + for IndexingContext< + 'fid, // invariant lifetime of fields ids map + 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation + 'index, // covariant lifetime of the index + MSP, + SP, + > +where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, +{ +} + +impl< + 'fid, // invariant lifetime of fields ids map + 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation + 'index, // covariant lifetime of the index + MSP, + SP, + > Clone + for IndexingContext< + 'fid, // invariant lifetime of fields ids map + 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation + 'index, // covariant lifetime of the index + MSP, + SP, + > +where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, +{ + fn clone(&self) -> Self { + *self + } } const CHUNK_SIZE: usize = 100; -pub fn for_each_document_change< +#[allow(clippy::too_many_arguments)] +pub fn extract< 'pl, // covariant lifetime of the underlying payload 'extractor, // invariant lifetime of extractor_alloc 'fid, // invariant lifetime of fields ids map @@ -368,6 +424,8 @@ pub fn for_each_document_change< 'index, // covariant lifetime of the index EX, DC: DocumentChanges<'pl>, + MSP, + SP, >( document_changes: &DC, extractor: &EX, @@ -377,20 +435,29 @@ pub fn for_each_document_change< new_fields_ids_map, doc_allocs, fields_ids_map_store, - }: IndexingContext<'fid, 'indexer, 'index>, + must_stop_processing, + send_progress, + }: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &'extractor mut ThreadLocal>>, datastore: &'data ThreadLocal, + finished_steps: u16, + total_steps: u16, + step_name: &'static str, ) -> Result<()> where EX: Extractor<'extractor>, + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, { // Clean up and reuse the extractor allocs for extractor_alloc in extractor_allocs.iter_mut() { extractor_alloc.0.get_mut().reset(); } + let total_documents = document_changes.len(); + let pi = document_changes.iter(CHUNK_SIZE); - pi.try_arc_for_each_try_init( + pi.enumerate().try_arc_for_each_try_init( || { DocumentChangeContext::new( index, @@ -403,7 +470,19 @@ where move |index_alloc| extractor.init_data(index_alloc), ) }, - |context, items| { + |context, (finished_documents, items)| { + if (must_stop_processing)() { + return Err(Arc::new(InternalError::AbortedIndexation.into())); + } + let finished_documents = finished_documents * CHUNK_SIZE; + + (send_progress)(Progress { + finished_steps, + total_steps, + step_name, + finished_total_documents: Some((finished_documents as u32, total_documents as u32)), + }); + // Clean up and reuse the document-specific allocator 
context.doc_alloc.reset(); @@ -419,5 +498,21 @@ where res }, - ) + )?; + + (send_progress)(Progress { + finished_steps, + total_steps, + step_name, + finished_total_documents: Some((total_documents as u32, total_documents as u32)), + }); + + Ok(()) +} + +pub struct Progress { + pub finished_steps: u16, + pub total_steps: u16, + pub step_name: &'static str, + pub finished_total_documents: Option<(u32, u32)>, } diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index d193b65fa..130560a44 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -75,6 +75,10 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id)))) } + + fn len(&self) -> usize { + self.to_delete.len() + } } #[cfg(test)] @@ -89,8 +93,7 @@ mod test { use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::index::tests::TempIndex; use crate::update::new::indexer::document_changes::{ - for_each_document_change, DocumentChangeContext, Extractor, IndexingContext, MostlySend, - ThreadLocal, + extract, DocumentChangeContext, Extractor, IndexingContext, MostlySend, ThreadLocal, }; use crate::update::new::indexer::DocumentDeletion; use crate::update::new::DocumentChange; @@ -165,17 +168,22 @@ mod test { new_fields_ids_map: &fields_ids_map, doc_allocs: &doc_allocs, fields_ids_map_store: &fields_ids_map_store, + must_stop_processing: &(|| false), + send_progress: &(|_progress| {}), }; for _ in 0..3 { let datastore = ThreadLocal::new(); - for_each_document_change( + extract( &changes, &deletion_tracker, context, &mut extractor_allocs, &datastore, + 0, + 1, + "test", ) .unwrap(); diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 9ba74c69e..c0f1ffbdd 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -241,6 +241,10 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> { )?; Ok(change) } + + fn len(&self) -> usize { + self.docids_version_offsets.len() + } } trait MergeChanges { diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 62ad05813..3bee9904f 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -5,8 +5,8 @@ use std::thread::{self, Builder}; use big_s::S; use bumpalo::Bump; use document_changes::{ - for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, - IndexingContext, RefCellExt, ThreadLocal, + extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext, + Progress, RefCellExt, ThreadLocal, }; pub use document_deletion::DocumentDeletion; pub use document_operation::DocumentOperation; @@ -72,7 +72,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { context: &DocumentChangeContext, ) -> Result<()> { let mut document_buffer = Vec::new(); - let mut field_distribution_delta = context.data.0.borrow_mut(); + let mut field_distribution_delta = context.data.0.borrow_mut_or_yield(); let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); @@ -155,13 +155,70 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { } } +mod steps { + pub const STEPS: &[&str] = &[ + "extracting documents", + "extracting facets", + "extracting words", 
+ "extracting word proximity", + "extracting embeddings", + "writing to database", + "post-processing facets", + "post-processing words", + "finalizing", + ]; + + const fn step(step: u16) -> (u16, &'static str) { + (step, STEPS[step as usize]) + } + + pub const fn total_steps() -> u16 { + STEPS.len() as u16 + } + + pub const fn extract_documents() -> (u16, &'static str) { + step(0) + } + + pub const fn extract_facets() -> (u16, &'static str) { + step(1) + } + + pub const fn extract_words() -> (u16, &'static str) { + step(2) + } + + pub const fn extract_word_proximity() -> (u16, &'static str) { + step(3) + } + + pub const fn extract_embeddings() -> (u16, &'static str) { + step(4) + } + + pub const fn write_db() -> (u16, &'static str) { + step(5) + } + + pub const fn post_processing_facets() -> (u16, &'static str) { + step(6) + } + pub const fn post_processing_words() -> (u16, &'static str) { + step(7) + } + + pub const fn finalizing() -> (u16, &'static str) { + step(8) + } +} + /// This is the main function of this crate. /// /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. /// /// TODO return stats #[allow(clippy::too_many_arguments)] // clippy: 😝 -pub fn index<'pl, 'indexer, 'index, DC>( +pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>( wtxn: &mut RwTxn, index: &'index Index, db_fields_ids_map: &'indexer FieldsIdsMap, @@ -170,9 +227,13 @@ pub fn index<'pl, 'indexer, 'index, DC>( pool: &ThreadPool, document_changes: &DC, embedders: EmbeddingConfigs, + must_stop_processing: &'indexer MSP, + send_progress: &'indexer SP, ) -> Result<()> where DC: DocumentChanges<'pl>, + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, { let (merger_sender, writer_receiver) = merger_writer_channel(10_000); // This channel acts as a rendezvous point to ensure that we are one task ahead @@ -194,8 +255,12 @@ where new_fields_ids_map: &new_fields_ids_map, doc_allocs: &doc_allocs, fields_ids_map_store: &fields_ids_map_store, + must_stop_processing, + send_progress, }; + let total_steps = steps::total_steps(); + let mut field_distribution = index.field_distribution(wtxn)?; thread::scope(|s| -> Result<()> { @@ -213,7 +278,8 @@ where let document_sender = extractor_sender.document_sender(); let document_extractor = DocumentExtractor { document_sender: &document_sender, embedders }; let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); - for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?; + let (finished_steps, step_name) = steps::extract_documents(); + extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; for field_distribution_delta in datastore { let field_distribution_delta = field_distribution_delta.0.into_inner(); @@ -238,22 +304,29 @@ where { let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); let _entered = span.enter(); + let (finished_steps, step_name) = steps::extract_facets(); extract_and_send_docids::< _, FacetedDocidsExtractor, FacetDocids, + _, + _ >( grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, &extractor_sender, + finished_steps, + total_steps, + step_name )?; } { let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); let _entered = span.enter(); + let (finished_steps, step_name) = steps::extract_words(); let WordDocidsMergers { word_fid_docids, @@ -261,7 +334,7 
@@ where exact_word_docids, word_position_docids, fid_word_count_docids, - } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?; + } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?; extractor_sender.send_searchable::(word_docids).unwrap(); extractor_sender.send_searchable::(word_fid_docids).unwrap(); extractor_sender.send_searchable::(exact_word_docids).unwrap(); @@ -276,16 +349,24 @@ where if proximity_precision == ProximityPrecision::ByWord { let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); let _entered = span.enter(); + let (finished_steps, step_name) = steps::extract_word_proximity(); + + extract_and_send_docids::< _, WordPairProximityDocidsExtractor, WordPairProximityDocids, + _, + _ >( grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, &extractor_sender, + finished_steps, + total_steps, + step_name, )?; } @@ -301,8 +382,10 @@ where let embedding_sender = todo!(); let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, &field_distribution, request_threads()); let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); + let (finished_steps, step_name) = steps::extract_embeddings(); - for_each_document_change(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore)?; + + extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; let mut user_provided = HashMap::new(); @@ -325,6 +408,8 @@ where { let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); let _entered = span.enter(); + let (finished_steps, step_name) = steps::write_db(); + (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None }); } // TODO THIS IS TOO MUCH @@ -501,15 +586,38 @@ where /// TODO handle the panicking threads handle.join().unwrap()?; let merger_result = merger_thread.join().unwrap()?; + let (finished_steps, step_name) = steps::post_processing_facets(); + (indexing_context.send_progress)(Progress { + finished_steps, + total_steps, + step_name, + finished_total_documents: None, + }); if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta { compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; } + let (finished_steps, step_name) = steps::post_processing_words(); + (indexing_context.send_progress)(Progress { + finished_steps, + total_steps, + step_name, + finished_total_documents: None, + }); + if let Some(prefix_delta) = merger_result.prefix_delta { compute_prefix_database(index, wtxn, prefix_delta)?; } + let (finished_steps, step_name) = steps::finalizing(); + (indexing_context.send_progress)(Progress { + finished_steps, + total_steps, + step_name, + finished_total_documents: None, + }); + Ok(()) as Result<_> })?; @@ -585,6 +693,7 @@ fn compute_facet_level_database( /// TODO: GrenadParameters::default() should be removed in favor a passed parameter /// TODO: manage the errors correctly /// TODO: we must have a single trait that also gives the extractor type +#[allow(clippy::too_many_arguments)] fn extract_and_send_docids< 'pl, 'fid, @@ -593,15 +702,31 @@ fn extract_and_send_docids< DC: DocumentChanges<'pl>, E: DocidsExtractor, D: MergerOperationType, + MSP, + SP, >( grenad_parameters: GrenadParameters, document_changes: 
&DC, - indexing_context: IndexingContext<'fid, 'indexer, 'index>, + indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &mut ThreadLocal>>, sender: &ExtractorSender, -) -> Result<()> { - let merger = - E::run_extraction(grenad_parameters, document_changes, indexing_context, extractor_allocs)?; + finished_steps: u16, + total_steps: u16, + step_name: &'static str, +) -> Result<()> +where + MSP: Fn() -> bool + Sync, + SP: Fn(Progress) + Sync, +{ + let merger = E::run_extraction( + grenad_parameters, + document_changes, + indexing_context, + extractor_allocs, + finished_steps, + total_steps, + step_name, + )?; sender.send_searchable::(merger).unwrap(); Ok(()) } diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 3913098ec..e58141af7 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -79,4 +79,8 @@ where let insertion = Insertion::create(docid, external_document_id, Versions::single(document)); Ok(Some(DocumentChange::Insertion(insertion))) } + + fn len(&self) -> usize { + self.iter.len() + } } diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index b08f8c380..3eb0cc306 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -9,7 +9,7 @@ use super::DocumentChanges; use crate::documents::Error::InvalidDocumentFormat; use crate::documents::PrimaryKey; use crate::error::{FieldIdMapMissingEntry, InternalError}; -use crate::update::new::document::{DocumentFromVersions, Versions}; +use crate::update::new::document::Versions; use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; @@ -176,6 +176,10 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { }, } } + + fn len(&self) -> usize { + self.documents.len() + } } fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result {
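
The hunks above thread two new callbacks through `indexer::index`: a cancellation check bounded by `MSP: Fn() -> bool + Sync` and a progress sink bounded by `SP: Fn(Progress) + Sync`, with `finished_steps`, `total_steps`, and `step_name` forwarded to every extractor. The sketch below shows how a caller might implement both closures. It is a minimal, hypothetical example and not part of the patch: the `Progress` struct is copied from `milli/src/update/new/indexer/document_changes.rs` above, the atomic high-water-mark filtering mirrors the closure left commented out in `index-scheduler/src/batch.rs`, and names such as `stop_flag` and the `println!` reporting are assumptions used only for illustration.

```rust
// Hypothetical consumer sketch. `Progress` mirrors the struct added by this
// patch in milli/src/update/new/indexer/document_changes.rs.
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering};

pub struct Progress {
    pub finished_steps: u16,
    pub total_steps: u16,
    pub step_name: &'static str,
    pub finished_total_documents: Option<(u32, u32)>,
}

fn main() {
    // `must_stop_processing`: the indexer polls this between document chunks
    // and aborts with `InternalError::AbortedIndexation` when it returns true.
    let stop_flag = AtomicBool::new(false);
    let must_stop_processing = || stop_flag.load(Ordering::Relaxed);

    // Progress callbacks can arrive out of order from rayon worker threads,
    // so keep monotonic high-water marks before reporting anything.
    let last_step = AtomicU16::new(0);
    let last_docs = AtomicU32::new(0);
    let send_progress = |p: Progress| {
        let prev_step = last_step.fetch_max(p.finished_steps, Ordering::Relaxed);
        if prev_step > p.finished_steps {
            return; // a later step already reported; drop the stale update
        }
        match p.finished_total_documents {
            Some((done, total)) => {
                if prev_step < p.finished_steps {
                    // entering a new step: reset the per-step document counter
                    last_docs.store(done, Ordering::Relaxed);
                } else if last_docs.fetch_max(done, Ordering::Relaxed) > done {
                    return; // stale document count within the same step
                }
                println!(
                    "step {}/{} ({}): {done}/{total} documents",
                    p.finished_steps + 1,
                    p.total_steps,
                    p.step_name
                );
            }
            None => println!(
                "step {}/{} ({})",
                p.finished_steps + 1,
                p.total_steps,
                p.step_name
            ),
        }
    };

    // These two closures satisfy the `Fn() -> bool + Sync` and
    // `Fn(Progress) + Sync` bounds that `indexer::index` now expects as
    // `&must_stop_processing` and `&send_progress`.
    assert!(!must_stop_processing());
    send_progress(Progress {
        finished_steps: 0,
        total_steps: 9, // steps::total_steps() with the STEPS array above
        step_name: "extracting documents",
        finished_total_documents: Some((100, 1000)),
    });
}
```

Because `extract` enumerates rayon chunks of `CHUNK_SIZE` (100) documents, `finished_documents` arrives in multiples of 100 and possibly out of order across worker threads; the per-step monotonic maxima in the sketch keep the reported counts from going backwards, which is the same concern the commented-out closure in `batch.rs` addresses.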