From cc63802115d864ce169a3f86cf669ed356f8167d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Wed, 27 Nov 2024 14:58:03 +0100
Subject: [PATCH] Modify and return the IndexEmbeddings to write them later

---
 crates/milli/src/update/new/indexer/mod.rs | 25 +++++++++++++------------
 crates/milli/src/update/new/steps.rs       |  4 ++--
 2 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 835ee240b..89c1b850d 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -117,7 +117,6 @@ where
 
             let rtxn = index.read_txn()?;
 
-            // document but we need to create a function that collects and compresses documents.
             let document_sender = extractor_sender.documents();
             let document_extractor = DocumentsExtractor::new(document_sender, embedders);
 
@@ -180,10 +179,6 @@ where
             }
 
             {
-
-
-
-
                 let WordDocidsCaches {
                     word_docids,
                     word_fid_docids,
@@ -296,7 +291,6 @@ where
             }
 
             'vectors: {
-
                 if index_embeddings.is_empty() {
                     break 'vectors;
                 }
@@ -308,7 +302,14 @@ where
                     let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
                     let _entered = span.enter();
 
-                    extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?;
+                    extract(
+                        document_changes,
+                        &extractor,
+                        indexing_context,
+                        &mut extractor_allocs,
+                        &datastore,
+                        Step::ExtractingEmbeddings,
+                    )?;
                 }
                 {
                     let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
@@ -357,7 +358,7 @@ where
 
             finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
 
-            Result::Ok(facet_field_ids_delta)
+            Result::Ok((facet_field_ids_delta, index_embeddings))
         })?;
 
         let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
@@ -442,6 +443,10 @@ where
             )?;
         }
 
+        (indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors));
+
+        let (facet_field_ids_delta, index_embeddings) = extractor_handle.join().unwrap()?;
+
         'vectors: {
             let span =
                 tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
@@ -470,10 +475,6 @@ where
             index.put_embedding_configs(wtxn, index_embeddings)?;
         }
 
-        (indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors));
-
-        let facet_field_ids_delta = extractor_handle.join().unwrap()?;
-
         (indexing_context.send_progress)(Progress::from_step(Step::PostProcessingFacets));
 
         if index.facet_search(wtxn)? {
diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs
index 7c2441933..bee1be260 100644
--- a/crates/milli/src/update/new/steps.rs
+++ b/crates/milli/src/update/new/steps.rs
@@ -11,8 +11,8 @@ pub enum Step {
     ExtractingEmbeddings,
     WritingGeoPoints,
     WritingToDatabase,
-    WritingEmbeddingsToDatabase,
     WaitingForExtractors,
+    WritingEmbeddingsToDatabase,
     PostProcessingFacets,
     PostProcessingWords,
     Finalizing,
@@ -29,8 +29,8 @@ impl Step {
             Step::ExtractingEmbeddings => "extracting embeddings",
             Step::WritingGeoPoints => "writing geo points",
             Step::WritingToDatabase => "writing to database",
-            Step::WritingEmbeddingsToDatabase => "writing embeddings to database",
             Step::WaitingForExtractors => "waiting for extractors",
+            Step::WritingEmbeddingsToDatabase => "writing embeddings to database",
             Step::PostProcessingFacets => "post-processing facets",
             Step::PostProcessingWords => "post-processing words",
             Step::Finalizing => "finalizing",
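
Note (not part of the patch): the commit makes the extractor thread return the updated embedding configs together with the facet delta, so the main thread joins it (the WaitingForExtractors step) before writing the embeddings, which is also why the two Step variants are swapped. The sketch below shows that scoped-thread pattern in isolation; the placeholder types and values (FacetFieldIdsDelta with no fields, the "default" config name) are assumptions for illustration and not milli's actual API.

    use std::thread;

    // Placeholder types standing in for milli's real FacetFieldIdsDelta and
    // IndexEmbeddingConfig; their actual definitions live in the milli crate.
    #[derive(Debug, Default)]
    struct FacetFieldIdsDelta;

    #[derive(Debug, Clone)]
    struct IndexEmbeddingConfig {
        name: String,
    }

    fn main() {
        let (facet_field_ids_delta, index_embeddings) = thread::scope(|s| {
            // Extractor thread: builds both values and returns them as a tuple,
            // mirroring `Result::Ok((facet_field_ids_delta, index_embeddings))`.
            let extractor_handle = s.spawn(|| {
                let facet_field_ids_delta = FacetFieldIdsDelta;
                let index_embeddings =
                    vec![IndexEmbeddingConfig { name: "default".to_string() }];
                (facet_field_ids_delta, index_embeddings)
            });

            // ... the writer side would handle documents, geo points, etc. here ...

            // Waiting for the extractors happens first, hence the reordered Step.
            extractor_handle.join().expect("extractor thread panicked")
        });

        // Only now can the embeddings be persisted
        // ("writing embeddings to database" in steps.rs).
        let _ = facet_field_ids_delta;
        println!("{} embedding config(s) ready to be written", index_embeddings.len());
    }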