diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 0f83ff79f..81596f3fe 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -57,6 +57,7 @@ mod steps { "extracting word proximity", "extracting embeddings", "writing to database", + "writing embeddings to database", "post-processing facets", "post-processing words", "finalizing", @@ -94,15 +95,19 @@ mod steps { step(5) } - pub const fn post_processing_facets() -> (u16, &'static str) { + pub const fn write_embedding_db() -> (u16, &'static str) { step(6) } - pub const fn post_processing_words() -> (u16, &'static str) { + + pub const fn post_processing_facets() -> (u16, &'static str) { step(7) } + pub const fn post_processing_words() -> (u16, &'static str) { + step(8) + } pub const fn finalizing() -> (u16, &'static str) { - step(8) + step(9) } } @@ -239,6 +244,7 @@ where index.word_docids.remap_types(), index, extractor_sender.docids::(), + &indexing_context.must_stop_processing, )?; } @@ -251,7 +257,8 @@ where word_fid_docids, index.word_fid_docids.remap_types(), index, - extractor_sender.docids::() + extractor_sender.docids::(), + &indexing_context.must_stop_processing, )?; } @@ -265,6 +272,7 @@ where index.exact_word_docids.remap_types(), index, extractor_sender.docids::(), + &indexing_context.must_stop_processing, )?; } @@ -278,6 +286,7 @@ where index.word_position_docids.remap_types(), index, extractor_sender.docids::(), + &indexing_context.must_stop_processing, )?; } @@ -291,6 +300,7 @@ where index.field_id_word_count_docids.remap_types(), index, extractor_sender.docids::(), + &indexing_context.must_stop_processing, )?; } } @@ -310,6 +320,7 @@ where index.word_pair_proximity_docids.remap_types(), index, extractor_sender.docids::(), + &indexing_context.must_stop_processing, )?; } @@ -376,8 +387,6 @@ where let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); - let indexer_span = tracing::Span::current(); - let vector_arroy = index.vector_arroy; let mut rng = rand::rngs::StdRng::seed_from_u64(42); let indexer_span = tracing::Span::current(); @@ -450,6 +459,15 @@ where ArroyOperation::Finish { mut user_provided } => { let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); let _entered = span.enter(); + + let (finished_steps, step_name) = steps::write_embedding_db(); + (indexing_context.send_progress)(Progress { + finished_steps, + total_steps, + step_name, + finished_total_documents: None, + }); + for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers { diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index b1c5c5fd9..4eca113ea 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -12,7 +12,10 @@ use super::extract::{ merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, }; use super::DocumentChange; -use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result}; +use crate::{ + CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, InternalError, + Result, +}; pub struct GeoExtractor { rtree: Option>, @@ -63,15 +66,22 @@ impl GeoExtractor { } #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] -pub fn merge_and_send_docids<'extractor>( +pub fn merge_and_send_docids<'extractor, MSP>( mut caches: Vec>, database: Database, index: &Index, docids_sender: impl DocidsSender + Sync, -) -> Result<()> { + must_stop_processing: &MSP, +) -> Result<()> +where + MSP: Fn() -> bool + Sync, +{ transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| { let rtxn = index.read_txn()?; let mut buffer = Vec::new(); + if must_stop_processing() { + return Err(InternalError::AbortedIndexation.into()); + } merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? {