use std::collections::BTreeSet; use bumpalo::Bump; use deserr::{Deserr, IntoValue}; use heed::RoTxn; use raw_collections::RawMap; use serde::Serialize; use serde_json::value::RawValue; use super::document::{Document, DocumentFromDb, DocumentFromVersions, Versions}; use super::indexer::de::DeserrRawValue; use crate::documents::FieldIdMapper; use crate::index::IndexEmbeddingConfig; use crate::vector::parsed_vectors::{ RawVectors, RawVectorsError, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, }; use crate::vector::{ArroyWrapper, Embedding, EmbeddingConfigs}; use crate::{DocumentId, Index, InternalError, Result, UserError}; #[derive(Serialize)] #[serde(untagged)] pub enum Embeddings<'doc> { FromJsonExplicit(&'doc RawValue), FromJsonImplicityUserProvided(&'doc RawValue), FromDb(Vec), } impl<'doc> Embeddings<'doc> { pub fn into_vec( self, doc_alloc: &'doc Bump, embedder_name: &str, ) -> std::result::Result, deserr::errors::JsonError> { match self { Embeddings::FromJsonExplicit(value) => { let vectors_ref = deserr::ValuePointerRef::Key { key: RESERVED_VECTORS_FIELD_NAME, prev: &deserr::ValuePointerRef::Origin, }; let embedders_ref = deserr::ValuePointerRef::Key { key: embedder_name, prev: &vectors_ref }; let embeddings_ref = deserr::ValuePointerRef::Key { key: "embeddings", prev: &embedders_ref }; let v: VectorOrArrayOfVectors = VectorOrArrayOfVectors::deserialize_from_value( DeserrRawValue::new_in(value, doc_alloc).into_value(), embeddings_ref, )?; Ok(v.into_array_of_vectors().unwrap_or_default()) } Embeddings::FromJsonImplicityUserProvided(value) => { let vectors_ref = deserr::ValuePointerRef::Key { key: RESERVED_VECTORS_FIELD_NAME, prev: &deserr::ValuePointerRef::Origin, }; let embedders_ref = deserr::ValuePointerRef::Key { key: embedder_name, prev: &vectors_ref }; let v: VectorOrArrayOfVectors = VectorOrArrayOfVectors::deserialize_from_value( DeserrRawValue::new_in(value, doc_alloc).into_value(), embedders_ref, )?; Ok(v.into_array_of_vectors().unwrap_or_default()) } Embeddings::FromDb(vec) => Ok(vec), } } } pub struct VectorEntry<'doc> { pub has_configured_embedder: bool, pub embeddings: Option>, pub regenerate: bool, pub implicit: bool, } pub trait VectorDocument<'doc> { fn iter_vectors(&self) -> impl Iterator)>>; fn vectors_for_key(&self, key: &str) -> Result>>; } pub struct VectorDocumentFromDb<'t> { docid: DocumentId, embedding_config: Vec, index: &'t Index, vectors_field: Option>, rtxn: &'t RoTxn<'t>, doc_alloc: &'t Bump, } impl<'t> VectorDocumentFromDb<'t> { pub fn new( docid: DocumentId, index: &'t Index, rtxn: &'t RoTxn, db_fields_ids_map: &'t Mapper, doc_alloc: &'t Bump, ) -> Result> { let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)? else { return Ok(None); }; let vectors = document.vectors_field()?; let vectors_field = match vectors { Some(vectors) => { Some(RawMap::from_raw_value(vectors, doc_alloc).map_err(InternalError::SerdeJson)?) } None => None, }; let embedding_config = index.embedding_configs(rtxn)?; Ok(Some(Self { docid, embedding_config, index, vectors_field, rtxn, doc_alloc })) } fn entry_from_db( &self, embedder_id: u8, config: &IndexEmbeddingConfig, ) -> Result> { let reader = ArroyWrapper::new(self.index.vector_arroy, embedder_id, config.config.quantized()); let vectors = reader.item_vectors(self.rtxn, self.docid)?; Ok(VectorEntry { has_configured_embedder: true, embeddings: Some(Embeddings::FromDb(vectors)), regenerate: !config.user_provided.contains(self.docid), implicit: false, }) } } impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> { fn iter_vectors(&self) -> impl Iterator)>> { self.embedding_config .iter() .map(|config| { let embedder_id = self.index.embedder_category_id.get(self.rtxn, &config.name)?.unwrap(); let entry = self.entry_from_db(embedder_id, config)?; let config_name = self.doc_alloc.alloc_str(config.name.as_str()); Ok((&*config_name, entry)) }) .chain(self.vectors_field.iter().flat_map(|map| map.iter()).map(|(name, value)| { Ok(( name, entry_from_raw_value(value, false).map_err(|_| { InternalError::Serialization(crate::SerializationError::Decoding { db_name: Some(crate::index::db_name::VECTOR_ARROY), }) })?, )) })) } fn vectors_for_key(&self, key: &str) -> Result>> { Ok(match self.index.embedder_category_id.get(self.rtxn, key)? { Some(embedder_id) => { let config = self.embedding_config.iter().find(|config| config.name == key).unwrap(); Some(self.entry_from_db(embedder_id, config)?) } None => match self.vectors_field.as_ref().and_then(|obkv| obkv.get(key)) { Some(embedding_from_doc) => { Some(entry_from_raw_value(embedding_from_doc, false).map_err(|_| { InternalError::Serialization(crate::SerializationError::Decoding { db_name: Some(crate::index::db_name::VECTOR_ARROY), }) })?) } None => None, }, }) } } fn entry_from_raw_value_user<'doc>( external_docid: &str, embedder_name: &str, value: &'doc RawValue, has_configured_embedder: bool, ) -> Result> { entry_from_raw_value(value, has_configured_embedder).map_err(|error| { UserError::InvalidVectorsEmbedderConf { document_id: external_docid.to_string(), error: error.msg(embedder_name), } .into() }) } fn entry_from_raw_value( value: &RawValue, has_configured_embedder: bool, ) -> std::result::Result, RawVectorsError> { let value: RawVectors = RawVectors::from_raw_value(value)?; Ok(match value { RawVectors::Explicit(raw_explicit_vectors) => VectorEntry { has_configured_embedder, embeddings: raw_explicit_vectors.embeddings.map(Embeddings::FromJsonExplicit), regenerate: raw_explicit_vectors.regenerate, implicit: false, }, RawVectors::ImplicitlyUserProvided(value) => VectorEntry { has_configured_embedder, // implicitly user provided always provide embeddings // `None` here means that there are no embeddings embeddings: Some( value .map(Embeddings::FromJsonImplicityUserProvided) .unwrap_or(Embeddings::FromDb(Default::default())), ), regenerate: false, implicit: true, }, }) } pub struct VectorDocumentFromVersions<'doc> { external_document_id: &'doc str, vectors: RawMap<'doc>, embedders: &'doc EmbeddingConfigs, } impl<'doc> VectorDocumentFromVersions<'doc> { pub fn new( external_document_id: &'doc str, versions: &Versions<'doc>, bump: &'doc Bump, embedders: &'doc EmbeddingConfigs, ) -> Result> { let document = DocumentFromVersions::new(versions); if let Some(vectors_field) = document.vectors_field()? { let vectors = RawMap::from_raw_value(vectors_field, bump).map_err(UserError::SerdeJson)?; Ok(Some(Self { external_document_id, vectors, embedders })) } else { Ok(None) } } } impl<'doc> VectorDocument<'doc> for VectorDocumentFromVersions<'doc> { fn iter_vectors(&self) -> impl Iterator)>> { self.vectors.iter().map(|(embedder, vectors)| { let vectors = entry_from_raw_value_user( self.external_document_id, embedder, vectors, self.embedders.contains(embedder), )?; Ok((embedder, vectors)) }) } fn vectors_for_key(&self, key: &str) -> Result>> { let Some(vectors) = self.vectors.get(key) else { return Ok(None) }; let vectors = entry_from_raw_value_user( self.external_document_id, key, vectors, self.embedders.contains(key), )?; Ok(Some(vectors)) } } pub struct MergedVectorDocument<'doc> { new_doc: Option>, db: Option>, } impl<'doc> MergedVectorDocument<'doc> { #[allow(clippy::too_many_arguments)] pub fn with_db( docid: DocumentId, external_document_id: &'doc str, index: &'doc Index, rtxn: &'doc RoTxn, db_fields_ids_map: &'doc Mapper, versions: &Versions<'doc>, doc_alloc: &'doc Bump, embedders: &'doc EmbeddingConfigs, ) -> Result> { let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?; let new_doc = VectorDocumentFromVersions::new(&external_document_id, versions, doc_alloc, embedders)?; Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) }) } pub fn without_db( external_document_id: &'doc str, versions: &Versions<'doc>, doc_alloc: &'doc Bump, embedders: &'doc EmbeddingConfigs, ) -> Result> { let Some(new_doc) = VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)? else { return Ok(None); }; Ok(Some(Self { new_doc: Some(new_doc), db: None })) } } impl<'doc> VectorDocument<'doc> for MergedVectorDocument<'doc> { fn iter_vectors(&self) -> impl Iterator)>> { let mut new_doc_it = self.new_doc.iter().flat_map(|new_doc| new_doc.iter_vectors()); let mut db_it = self.db.iter().flat_map(|db| db.iter_vectors()); let mut seen_fields = BTreeSet::new(); std::iter::from_fn(move || { if let Some(next) = new_doc_it.next() { if let Ok((name, _)) = next { seen_fields.insert(name); } return Some(next); } loop { match db_it.next()? { Ok((name, value)) => { if seen_fields.contains(name) { continue; } return Some(Ok((name, value))); } Err(err) => return Some(Err(err)), } } }) } fn vectors_for_key(&self, key: &str) -> Result>> { if let Some(new_doc) = &self.new_doc { if let Some(entry) = new_doc.vectors_for_key(key)? { return Ok(Some(entry)); } } let Some(db) = self.db.as_ref() else { return Ok(None) }; db.vectors_for_key(key) } }