diff --git a/milli/src/update/new/extract/vectors/mod.rs b/milli/src/update/new/extract/vectors/mod.rs index facec10f6..92c355710 100644 --- a/milli/src/update/new/extract/vectors/mod.rs +++ b/milli/src/update/new/extract/vectors/mod.rs @@ -11,13 +11,16 @@ use crate::update::new::channel::EmbeddingSender; use crate::update::new::indexer::document_changes::{Extractor, FullySend}; use crate::update::new::vector_document::VectorDocument; use crate::update::new::DocumentChange; -use crate::vector::error::EmbedErrorKind; +use crate::vector::error::{ + EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistributionBump, +}; use crate::vector::{Embedder, Embedding, EmbeddingConfigs}; -use crate::{DocumentId, InternalError, Result, ThreadPoolNoAbort, UserError}; +use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError}; pub struct EmbeddingExtractor<'a> { embedders: &'a EmbeddingConfigs, sender: &'a EmbeddingSender<'a>, + possible_embedding_mistakes: PossibleEmbeddingMistakes, threads: &'a ThreadPoolNoAbort, } @@ -25,9 +28,11 @@ impl<'a> EmbeddingExtractor<'a> { pub fn new( embedders: &'a EmbeddingConfigs, sender: &'a EmbeddingSender<'a>, + field_distribution: &'a FieldDistribution, threads: &'a ThreadPoolNoAbort, ) -> Self { - Self { embedders, sender, threads } + let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution); + Self { embedders, sender, threads, possible_embedding_mistakes } } } @@ -50,6 +55,8 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { >, ) -> crate::Result<()> { let embedders = self.embedders.inner_as_ref(); + let mut unused_vectors_distribution = + UnusedVectorsDistributionBump::new_in(&context.doc_alloc); let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc); for (embedder_name, (embedder, prompt, _is_quantized)) in embedders { @@ -66,6 +73,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { embedder_name, prompt, &context.data.0, + &self.possible_embedding_mistakes, self.threads, self.sender, &context.doc_alloc, @@ -87,6 +95,10 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { )?; let new_vectors = update.updated_vectors(&context.doc_alloc)?; + if let Some(new_vectors) = &new_vectors { + unused_vectors_distribution.append(new_vectors); + } + for chunks in &mut all_chunks { let embedder_name = chunks.embedder_name(); let prompt = chunks.prompt(); @@ -128,7 +140,11 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { &context.doc_alloc, )?; if new_rendered != old_rendered { - chunks.set_autogenerated(update.docid(), new_rendered)?; + chunks.set_autogenerated( + update.docid(), + new_rendered, + &unused_vectors_distribution, + )?; } } } else if old_vectors.regenerate { @@ -151,17 +167,25 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { &context.doc_alloc, )?; if new_rendered != old_rendered { - chunks.set_autogenerated(update.docid(), new_rendered)?; + chunks.set_autogenerated( + update.docid(), + new_rendered, + &unused_vectors_distribution, + )?; } } } } DocumentChange::Insertion(insertion) => { + let new_vectors = insertion.inserted_vectors(&context.doc_alloc)?; + if let Some(new_vectors) = &new_vectors { + unused_vectors_distribution.append(new_vectors); + } + for chunks in &mut all_chunks { let embedder_name = chunks.embedder_name(); let prompt = chunks.prompt(); // if no inserted vectors, then regenerate: true + no embeddings => autogenerate - let new_vectors = insertion.inserted_vectors(&context.doc_alloc)?; if let Some(new_vectors) = new_vectors.as_ref().and_then(|new_vectors| { new_vectors.vectors_for_key(embedder_name).transpose() }) { @@ -178,7 +202,11 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { context.new_fields_ids_map, &context.doc_alloc, )?; - chunks.set_autogenerated(insertion.docid(), rendered)?; + chunks.set_autogenerated( + insertion.docid(), + rendered, + &unused_vectors_distribution, + )?; } } else { let rendered = prompt.render_document( @@ -186,7 +214,11 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { context.new_fields_ids_map, &context.doc_alloc, )?; - chunks.set_autogenerated(insertion.docid(), rendered)?; + chunks.set_autogenerated( + insertion.docid(), + rendered, + &unused_vectors_distribution, + )?; } } } @@ -194,7 +226,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { } for chunk in all_chunks { - chunk.drain()?; + chunk.drain(&unused_vectors_distribution)?; } Ok(()) } @@ -215,7 +247,7 @@ struct Chunks<'a> { embedder_id: u8, embedder_name: &'a str, prompt: &'a Prompt, - + possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, user_provided: &'a RefCell>, threads: &'a ThreadPoolNoAbort, sender: &'a EmbeddingSender<'a>, @@ -229,6 +261,7 @@ impl<'a> Chunks<'a> { embedder_name: &'a str, prompt: &'a Prompt, user_provided: &'a RefCell>, + possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, threads: &'a ThreadPoolNoAbort, sender: &'a EmbeddingSender<'a>, doc_alloc: &'a Bump, @@ -241,6 +274,7 @@ impl<'a> Chunks<'a> { ids, embedder, prompt, + possible_embedding_mistakes, threads, sender, embedder_id, @@ -249,7 +283,12 @@ impl<'a> Chunks<'a> { } } - pub fn set_autogenerated(&mut self, docid: DocumentId, rendered: &'a str) -> Result<()> { + pub fn set_autogenerated( + &mut self, + docid: DocumentId, + rendered: &'a str, + unused_vectors_distribution: &UnusedVectorsDistributionBump, + ) -> Result<()> { if self.texts.len() < self.texts.capacity() { self.texts.push(rendered); self.ids.push(docid); @@ -262,18 +301,25 @@ impl<'a> Chunks<'a> { self.embedder, self.embedder_id, self.embedder_name, + self.possible_embedding_mistakes, + unused_vectors_distribution, self.threads, self.sender, ) } - pub fn drain(mut self) -> Result<()> { + pub fn drain( + mut self, + unused_vectors_distribution: &UnusedVectorsDistributionBump, + ) -> Result<()> { let res = Self::embed_chunks( &mut self.texts, &mut self.ids, self.embedder, self.embedder_id, self.embedder_name, + self.possible_embedding_mistakes, + unused_vectors_distribution, self.threads, self.sender, ); @@ -285,11 +331,13 @@ impl<'a> Chunks<'a> { pub fn embed_chunks( texts: &mut BVec<'a, &'a str>, ids: &mut BVec<'a, DocumentId>, - embedder: &'a Embedder, + embedder: &Embedder, embedder_id: u8, embedder_name: &str, - threads: &'a ThreadPoolNoAbort, - sender: &'a EmbeddingSender<'a>, + possible_embedding_mistakes: &PossibleEmbeddingMistakes, + unused_vectors_distribution: &UnusedVectorsDistributionBump, + threads: &ThreadPoolNoAbort, + sender: &EmbeddingSender<'a>, ) -> Result<()> { let res = match embedder.embed_chunks_ref(texts.as_slice(), threads) { Ok(embeddings) => { @@ -312,25 +360,23 @@ impl<'a> Chunks<'a> { msg += &format!("\n- Note: `{embedder_name}` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.{embedder_name}`."); } - /// FIXME: reintroduce possible_embedding_mistakes and possible_embedding_mistakes let mut hint_count = 0; - /* - for (vector_misspelling, count) in - possible_embedding_mistakes.vector_mistakes().take(2) - { - msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s)."); - hint_count += 1; - } + for (vector_misspelling, count) in + possible_embedding_mistakes.vector_mistakes().take(2) + { + msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s)."); + hint_count += 1; + } + + for (embedder_misspelling, count) in possible_embedding_mistakes + .embedder_mistakes_bump(embedder_name, unused_vectors_distribution) + .take(2) + { + msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s)."); + hint_count += 1; + } - for (embedder_misspelling, count) in possible_embedding_mistakes - .embedder_mistakes(embedder_name, unused_vectors_distribution) - .take(2) - { - msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s)."); - hint_count += 1; - } - */ if hint_count == 0 { if let EmbedErrorKind::ManualEmbed(_) = &error.kind { msg += &format!( diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index dd0ff781d..f6a50ef1c 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -296,7 +296,7 @@ where } /// FIXME: need access to `merger_sender` let embedding_sender = todo!(); - let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, request_threads()); + let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, &field_distribution, request_threads()); let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); for_each_document_change(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore)?; diff --git a/milli/src/vector/error.rs b/milli/src/vector/error.rs index 3c8cb4b06..d5e0697d6 100644 --- a/milli/src/vector/error.rs +++ b/milli/src/vector/error.rs @@ -1,11 +1,13 @@ use std::collections::BTreeMap; use std::path::PathBuf; +use bumpalo::Bump; use hf_hub::api::sync::ApiError; use super::parsed_vectors::ParsedVectorsDiff; use super::rest::ConfigurationSource; use crate::error::FaultSource; +use crate::update::new::vector_document::VectorDocument; use crate::{FieldDistribution, PanicCatched}; #[derive(Debug, thiserror::Error)] @@ -417,6 +419,23 @@ impl PossibleEmbeddingMistakes { } }) } + + pub fn embedder_mistakes_bump<'a, 'doc: 'a>( + &'a self, + embedder_name: &'a str, + unused_vectors_distribution: &'a UnusedVectorsDistributionBump<'doc>, + ) -> impl Iterator + 'a { + let builder = levenshtein_automata::LevenshteinAutomatonBuilder::new(2, true); + let automata = builder.build_dfa(embedder_name); + + unused_vectors_distribution.0.iter().filter_map(move |(field, count)| { + match automata.eval(field) { + levenshtein_automata::Distance::Exact(0) => None, + levenshtein_automata::Distance::Exact(_) => Some((*field, *count)), + levenshtein_automata::Distance::AtLeast(_) => None, + } + }) + } } #[derive(Default)] @@ -433,3 +452,23 @@ impl UnusedVectorsDistribution { } } } + +pub struct UnusedVectorsDistributionBump<'doc>( + hashbrown::HashMap<&'doc str, u64, hashbrown::hash_map::DefaultHashBuilder, &'doc Bump>, +); + +impl<'doc> UnusedVectorsDistributionBump<'doc> { + pub fn new_in(doc_alloc: &'doc Bump) -> Self { + Self(hashbrown::HashMap::new_in(doc_alloc)) + } + + pub fn append(&mut self, vectors: &impl VectorDocument<'doc>) -> Result<(), crate::Error> { + for res in vectors.iter_vectors() { + let (embedder_name, entry) = res?; + if !entry.has_configured_embedder { + *self.0.entry(embedder_name).or_default() += 1; + } + } + Ok(()) + } +}