From 5efd70c2518c892e5ba00ee419cc2f1839a368a7 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 28 Oct 2024 14:23:38 +0100 Subject: [PATCH] Allow random access to fields in documents --- .../extract/searchable/tokenize_document.rs | 9 +++- .../update/new/indexer/document_changes.rs | 5 +- .../update/new/indexer/document_deletion.rs | 5 +- .../update/new/indexer/document_operation.rs | 12 ++--- milli/src/update/new/indexer/mod.rs | 9 +++- milli/src/update/new/indexer/partial_dump.rs | 4 +- .../update/new/indexer/update_by_function.rs | 5 +- milli/src/update/new/vector_document.rs | 53 +++++++++++++++++-- 8 files changed, 77 insertions(+), 25 deletions(-) diff --git a/milli/src/update/new/extract/searchable/tokenize_document.rs b/milli/src/update/new/extract/searchable/tokenize_document.rs index 5428907f8..793e3a249 100644 --- a/milli/src/update/new/extract/searchable/tokenize_document.rs +++ b/milli/src/update/new/extract/searchable/tokenize_document.rs @@ -176,6 +176,7 @@ mod test { use serde_json::value::RawValue; use super::*; + use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::update::new::document::{DocumentFromVersions, Versions}; use crate::FieldsIdsMap; @@ -212,6 +213,11 @@ mod test { max_positions_per_attributes: 1000, }; + let fields_ids_map = FieldIdMapWithMetadata::new( + fields_ids_map, + MetadataBuilder::new(Default::default(), Default::default(), Default::default(), None), + ); + let fields_ids_map_lock = std::sync::RwLock::new(fields_ids_map); let mut global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock); @@ -223,7 +229,8 @@ mod test { let document: &RawValue = serde_json::from_str(&document).unwrap(); let document = RawMap::from_raw_value(document, &bump).unwrap(); - let document = DocumentFromVersions::new(Versions::single(document)); + let document = Versions::single(document); + let document = DocumentFromVersions::new(&document); document_tokenizer .tokenize_document( diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs index 91c65a6d1..fd16137b9 100644 --- a/milli/src/update/new/indexer/document_changes.rs +++ b/milli/src/update/new/indexer/document_changes.rs @@ -7,6 +7,7 @@ use raw_collections::alloc::RefBump; use rayon::iter::IndexedParallelIterator; use super::super::document_change::DocumentChange; +use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result}; @@ -278,7 +279,7 @@ impl< pub fn new( index: &'indexer Index, db_fields_ids_map: &'indexer FieldsIdsMap, - new_fields_ids_map: &'fid RwLock, + new_fields_ids_map: &'fid RwLock, extractor_allocs: &'extractor ThreadLocal>>, doc_allocs: &'doc ThreadLocal>>, datastore: &'data ThreadLocal, @@ -351,7 +352,7 @@ pub struct IndexingContext< > { pub index: &'index Index, pub db_fields_ids_map: &'indexer FieldsIdsMap, - pub new_fields_ids_map: &'fid RwLock, + pub new_fields_ids_map: &'fid RwLock, pub doc_allocs: &'indexer ThreadLocal>>, pub fields_ids_map_store: &'indexer ThreadLocal>>>, } diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index bbd2b11ac..d193b65fa 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -86,6 +86,7 @@ mod test { use bumpalo::Bump; use raw_collections::alloc::RefBump; + use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::index::tests::TempIndex; use crate::update::new::indexer::document_changes::{ for_each_document_change, DocumentChangeContext, Extractor, IndexingContext, MostlySend, @@ -144,7 +145,9 @@ mod test { let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let fields_ids_map = RwLock::new(db_fields_ids_map.clone()); + let metadata_builder = MetadataBuilder::from_index(&index, &rtxn).unwrap(); + let fields_ids_map = + RwLock::new(FieldIdMapWithMetadata::new(db_fields_ids_map.clone(), metadata_builder)); let fields_ids_map_store = ThreadLocal::new(); diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 007b56643..bc1634d75 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -289,19 +289,17 @@ impl MergeChanges for MergeDocumentForReplacement { let document = raw_collections::RawMap::from_raw_value(document, doc_alloc) .map_err(UserError::SerdeJson)?; - let document = DocumentFromVersions::new(Versions::single(document)); - if is_new { Ok(Some(DocumentChange::Insertion(Insertion::create( docid, external_doc, - document, + Versions::single(document), )))) } else { Ok(Some(DocumentChange::Update(Update::create( docid, external_doc, - document, + Versions::single(document), true, )))) } @@ -396,15 +394,13 @@ impl MergeChanges for MergeDocumentForUpdates { let Some(versions) = versions else { return Ok(None) }; - let document = DocumentFromVersions::new(versions); - if is_new { - Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, document)))) + Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, versions)))) } else { Ok(Some(DocumentChange::Update(Update::create( docid, external_docid, - document, + versions, has_deletion, )))) } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 0fc7940bb..dd2506ef9 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -28,6 +28,7 @@ use super::words_prefix_docids::{ use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::facet::FacetType; +use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::proximity::ProximityPrecision; use crate::update::new::channel::ExtractorSender; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; @@ -122,6 +123,10 @@ where // This channel acts as a rendezvous point to ensure that we are one task ahead let (extractor_sender, merger_receiver) = extractors_merger_channels(4); + let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; + + let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder); + let new_fields_ids_map = RwLock::new(new_fields_ids_map); let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads()); @@ -298,8 +303,8 @@ where // required to into_inner the new_fields_ids_map drop(fields_ids_map_store); - let fields_ids_map = new_fields_ids_map.into_inner().unwrap(); - index.put_fields_ids_map(wtxn, &fields_ids_map)?; + let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap(); + index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; if let Some(new_primary_key) = new_primary_key { index.put_primary_key(wtxn, new_primary_key.name())?; diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 60cb627e9..3913098ec 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -76,9 +76,7 @@ where let document = raw_collections::RawMap::from_raw_value(document, doc_alloc) .map_err(InternalError::SerdeJson)?; - let document = DocumentFromVersions::new(Versions::single(document)); - - let insertion = Insertion::create(docid, external_document_id, document); + let insertion = Insertion::create(docid, external_document_id, Versions::single(document)); Ok(Some(DocumentChange::Insertion(insertion))) } } diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index cff0e02fc..b08f8c380 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -160,12 +160,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { } else { let raw_new_doc = RawMap::from_raw_value(raw_new_doc, doc_alloc) .map_err(InternalError::SerdeJson)?; - let new_doc_version = - DocumentFromVersions::new(Versions::single(raw_new_doc)); + Ok(Some(DocumentChange::Update(Update::create( docid, new_document_id, - new_doc_version, + Versions::single(raw_new_doc), true, // It is like document replacement )))) } diff --git a/milli/src/update/new/vector_document.rs b/milli/src/update/new/vector_document.rs index 375d4f2ce..782076716 100644 --- a/milli/src/update/new/vector_document.rs +++ b/milli/src/update/new/vector_document.rs @@ -4,12 +4,12 @@ use raw_collections::RawMap; use serde::Serialize; use serde_json::value::RawValue; -use super::document::{Document, DocumentFromDb}; +use super::document::{Document, DocumentFromDb, DocumentFromVersions, Versions}; use crate::documents::FieldIdMapper; use crate::index::IndexEmbeddingConfig; use crate::vector::parsed_vectors::RawVectors; use crate::vector::Embedding; -use crate::{DocumentId, Index, InternalError, Result}; +use crate::{DocumentId, Index, InternalError, Result, UserError}; #[derive(Serialize)] #[serde(untagged)] @@ -17,6 +17,15 @@ pub enum Embeddings<'doc> { FromJson(&'doc RawValue), FromDb(Vec), } +impl<'doc> Embeddings<'doc> { + pub fn into_vec(self) -> std::result::Result, serde_json::Error> { + match self { + /// FIXME: this should be a VecOrArrayOfVec + Embeddings::FromJson(value) => serde_json::from_str(value.get()), + Embeddings::FromDb(vec) => Ok(vec), + } + } +} pub struct VectorEntry<'doc> { pub has_configured_embedder: bool, @@ -46,8 +55,10 @@ impl<'t> VectorDocumentFromDb<'t> { rtxn: &'t RoTxn, db_fields_ids_map: &'t Mapper, doc_alloc: &'t Bump, - ) -> Result { - let document = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?.unwrap(); + ) -> Result> { + let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)? else { + return Ok(None); + }; let vectors = document.vectors_field()?; let vectors_field = match vectors { Some(vectors) => { @@ -58,7 +69,7 @@ impl<'t> VectorDocumentFromDb<'t> { let embedding_config = index.embedding_configs(rtxn)?; - Ok(Self { docid, embedding_config, index, vectors_field, rtxn, doc_alloc }) + Ok(Some(Self { docid, embedding_config, index, vectors_field, rtxn, doc_alloc })) } fn entry_from_db( @@ -132,3 +143,35 @@ fn entry_from_raw_value( regenerate: value.must_regenerate(), }) } + +pub struct VectorDocumentFromVersions<'doc> { + vectors: RawMap<'doc>, +} + +impl<'doc> VectorDocumentFromVersions<'doc> { + pub fn new(versions: &Versions<'doc>, bump: &'doc Bump) -> Result> { + let document = DocumentFromVersions::new(versions); + if let Some(vectors_field) = document.vectors_field()? { + let vectors = + RawMap::from_raw_value(vectors_field, bump).map_err(UserError::SerdeJson)?; + Ok(Some(Self { vectors })) + } else { + Ok(None) + } + } +} + +impl<'doc> VectorDocument<'doc> for VectorDocumentFromVersions<'doc> { + fn iter_vectors(&self) -> impl Iterator)>> { + self.vectors.iter().map(|(embedder, vectors)| { + let vectors = entry_from_raw_value(vectors).map_err(UserError::SerdeJson)?; + Ok((embedder, vectors)) + }) + } + + fn vectors_for_key(&self, key: &str) -> Result>> { + let Some(vectors) = self.vectors.get(key) else { return Ok(None) }; + let vectors = entry_from_raw_value(vectors).map_err(UserError::SerdeJson)?; + Ok(Some(vectors)) + } +}