diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index a4896ee3f..5b91ae77f 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -2,12 +2,11 @@ use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; -use hashbrown::HashMap; use heed::types::Bytes; -use roaring::RoaringBitmap; use super::extract::FacetKind; use super::StdResult; +use crate::index::IndexEmbeddingConfig; use crate::update::new::KvReaderFieldId; use crate::vector::Embedding; use crate::{DocumentId, Index}; @@ -87,7 +86,7 @@ pub enum ArroyOperation { embedding: Embedding, }, Finish { - user_provided: HashMap, + configs: Vec, }, } @@ -418,12 +417,9 @@ impl EmbeddingSender<'_> { } /// Marks all embedders as "to be built" - pub fn finish( - self, - user_provided: HashMap, - ) -> StdResult<(), SendError<()>> { + pub fn finish(self, configs: Vec) -> StdResult<(), SendError<()>> { self.0 - .send(WriterOperation::ArroyOperation(ArroyOperation::Finish { user_provided })) + .send(WriterOperation::ArroyOperation(ArroyOperation::Finish { configs })) .map_err(|_| SendError(())) } } diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 55121fb14..df8e2ed09 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -85,8 +85,13 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { for change in changes { let change = change?; match change { - DocumentChange::Deletion(_deletion) => { - // handled by document sender + DocumentChange::Deletion(deletion) => { + // vector deletion is handled by document sender, + // we still need to accomodate deletion from user_provided + for chunks in &mut all_chunks { + // regenerate: true means we delete from user_provided + chunks.set_regenerate(deletion.docid(), true); + } } DocumentChange::Update(update) => { let old_vectors = update.current_vectors( @@ -423,9 +428,9 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> { let user_provided = user_provided.0.entry_ref(self.embedder_name).or_default(); if regenerate { // regenerate == !user_provided - user_provided.del.get_or_insert(Default::default()).insert(docid); + user_provided.insert_del_u32(docid); } else { - user_provided.add.get_or_insert(Default::default()).insert(docid); + user_provided.insert_add_u32(docid); } } diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index ca61a9b7b..6d1d0eea8 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -342,35 +342,28 @@ where let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); let _entered = span.enter(); - let index_embeddings = index.embedding_configs(&rtxn)?; + let mut index_embeddings = index.embedding_configs(&rtxn)?; if index_embeddings.is_empty() { break 'vectors; } let embedding_sender = extractor_sender.embeddings(); let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); - let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); + let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let (finished_steps, step_name) = steps::extract_embeddings(); extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; - - let mut user_provided = HashMap::new(); - for data in datastore { - let data = data.into_inner().0; - for (embedder, deladd) in data.into_iter() { - let user_provided = user_provided.entry(embedder).or_insert(Default::default()); - if let Some(del) = deladd.del { - *user_provided -= del; - } - if let Some(add) = deladd.add { - *user_provided |= add; - } + for config in &mut index_embeddings { + 'data: for data in datastore.iter_mut() { + let data = &mut data.get_mut().0; + let Some(deladd) = data.remove(&config.name) else { continue 'data; }; + deladd.apply_to(&mut config.user_provided); } } - embedding_sender.finish(user_provided).unwrap(); + embedding_sender.finish(index_embeddings).unwrap(); } // TODO THIS IS TOO MUCH @@ -472,7 +465,7 @@ where writer.del_items(wtxn, *dimensions, docid)?; writer.add_item(wtxn, docid, &embedding)?; } - ArroyOperation::Finish { mut user_provided } => { + ArroyOperation::Finish { configs } => { let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); let _entered = span.enter(); @@ -497,14 +490,6 @@ where )?; } - let mut configs = index.embedding_configs(wtxn)?; - - for config in &mut configs { - if let Some(user_provided) = user_provided.remove(&config.name) { - config.user_provided = user_provided; - } - } - index.put_embedding_configs(wtxn, configs)?; } },