diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index d2681c915..d1d64814e 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -93,6 +93,7 @@ pub struct WriterBbqueueReceiver<'a> { } /// The action to perform on the receiver/writer side. +#[derive(Debug)] pub enum ReceiverAction { /// Wake up, you have frames to read for the BBQueue buffers. WakeUp, @@ -599,6 +600,7 @@ impl DatabaseType for WordPositionDocids { const DATABASE: Database = Database::WordPositionDocids; } +#[derive(Clone, Copy)] pub struct WordDocidsSender<'a, 'b, D> { sender: &'a ExtractorBbqueueSender<'b>, _marker: PhantomData, @@ -621,6 +623,7 @@ impl WordDocidsSender<'_, '_, D> { } } +#[derive(Clone, Copy)] pub struct FacetDocidsSender<'a, 'b> { sender: &'a ExtractorBbqueueSender<'b>, } @@ -667,6 +670,7 @@ impl FacetDocidsSender<'_, '_> { } } +#[derive(Clone, Copy)] pub struct FieldIdDocidFacetSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>); impl FieldIdDocidFacetSender<'_, '_> { @@ -691,6 +695,7 @@ impl FieldIdDocidFacetSender<'_, '_> { } } +#[derive(Clone, Copy)] pub struct DocumentsSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>); impl DocumentsSender<'_, '_> { @@ -716,6 +721,7 @@ impl DocumentsSender<'_, '_> { } } +#[derive(Clone, Copy)] pub struct EmbeddingSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>); impl EmbeddingSender<'_, '_> { @@ -741,6 +747,7 @@ impl EmbeddingSender<'_, '_> { } } +#[derive(Clone, Copy)] pub struct GeoSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>); impl GeoSender<'_, '_> { diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index 9ad37d52c..490dada65 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -25,14 +25,14 @@ use crate::update::new::DocumentChange; use crate::update::GrenadParameters; use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH}; -pub struct FacetedExtractorData<'a> { +pub struct FacetedExtractorData<'a, 'b> { attributes_to_extract: &'a [&'a str], - sender: &'a FieldIdDocidFacetSender<'a>, + sender: &'a FieldIdDocidFacetSender<'a, 'b>, grenad_parameters: GrenadParameters, buckets: usize, } -impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> { +impl<'a, 'b, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a, 'b> { type Data = RefCell>; fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result { diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 42278d443..1110432fa 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -18,17 +18,17 @@ use crate::vector::error::{ use crate::vector::{Embedder, Embedding, EmbeddingConfigs}; use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError}; -pub struct EmbeddingExtractor<'a> { +pub struct EmbeddingExtractor<'a, 'b> { embedders: &'a EmbeddingConfigs, - sender: EmbeddingSender<'a>, + sender: EmbeddingSender<'a, 'b>, possible_embedding_mistakes: PossibleEmbeddingMistakes, threads: &'a ThreadPoolNoAbort, } -impl<'a> EmbeddingExtractor<'a> { +impl<'a, 'b> EmbeddingExtractor<'a, 'b> { pub fn new( embedders: &'a EmbeddingConfigs, - sender: EmbeddingSender<'a>, + sender: EmbeddingSender<'a, 'b>, field_distribution: &'a FieldDistribution, threads: &'a ThreadPoolNoAbort, ) -> Self { @@ -43,7 +43,7 @@ pub struct EmbeddingExtractorData<'extractor>( unsafe impl MostlySend for EmbeddingExtractorData<'_> {} -impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { +impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> { type Data = RefCell>; fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result { @@ -76,7 +76,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { context.data, &self.possible_embedding_mistakes, self.threads, - &self.sender, + self.sender, &context.doc_alloc, )) } @@ -259,7 +259,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { // Currently this is the case as: // 1. BVec are inside of the bumaplo // 2. All other fields are either trivial (u8) or references. -struct Chunks<'a, 'extractor> { +struct Chunks<'a, 'b, 'extractor> { texts: BVec<'a, &'a str>, ids: BVec<'a, DocumentId>, @@ -270,11 +270,11 @@ struct Chunks<'a, 'extractor> { possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, user_provided: &'a RefCell>, threads: &'a ThreadPoolNoAbort, - sender: &'a EmbeddingSender<'a>, + sender: EmbeddingSender<'a, 'b>, has_manual_generation: Option<&'a str>, } -impl<'a, 'extractor> Chunks<'a, 'extractor> { +impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { #[allow(clippy::too_many_arguments)] pub fn new( embedder: &'a Embedder, @@ -284,7 +284,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> { user_provided: &'a RefCell>, possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, threads: &'a ThreadPoolNoAbort, - sender: &'a EmbeddingSender<'a>, + sender: EmbeddingSender<'a, 'b>, doc_alloc: &'a Bump, ) -> Self { let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint(); @@ -368,7 +368,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> { possible_embedding_mistakes: &PossibleEmbeddingMistakes, unused_vectors_distribution: &UnusedVectorsDistributionBump, threads: &ThreadPoolNoAbort, - sender: EmbeddingSender<'a>, + sender: EmbeddingSender<'a, 'b>, has_manual_generation: Option<&'a str>, ) -> Result<()> { if let Some(external_docid) = has_manual_generation { diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 1fd60b610..982868d93 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -80,7 +80,7 @@ where let bbbuffers: Vec<_> = (0..rayon::current_num_threads()) .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB by thread .collect(); - let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers, 1000); + let (extractor_sender, mut writer_receiver) = extractor_writer_bbqueue(&bbbuffers, 1000); let finished_extraction = AtomicBool::new(false); let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; @@ -302,7 +302,7 @@ where } let embedding_sender = extractor_sender.embeddings(); - let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); + let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads()); let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); { let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); @@ -363,7 +363,6 @@ where let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); let vector_arroy = index.vector_arroy; - let mut rng = rand::rngs::StdRng::seed_from_u64(42); let indexer_span = tracing::Span::current(); let arroy_writers: Result> = embedders .inner_as_ref() @@ -490,6 +489,7 @@ where Step::WritingEmbeddingsToDatabase, )); + let mut rng = rand::rngs::StdRng::seed_from_u64(42); for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers { let dimensions = *dimensions; writer.build_and_quantize(