Allow random access to fields in documents

This commit is contained in:
Louis Dureuil 2024-10-28 14:23:38 +01:00
parent 65470e26e0
commit 5efd70c251
No known key found for this signature in database
8 changed files with 77 additions and 25 deletions

View File

@ -176,6 +176,7 @@ mod test {
use serde_json::value::RawValue;
use super::*;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::update::new::document::{DocumentFromVersions, Versions};
use crate::FieldsIdsMap;
@ -212,6 +213,11 @@ mod test {
max_positions_per_attributes: 1000,
};
let fields_ids_map = FieldIdMapWithMetadata::new(
fields_ids_map,
MetadataBuilder::new(Default::default(), Default::default(), Default::default(), None),
);
let fields_ids_map_lock = std::sync::RwLock::new(fields_ids_map);
let mut global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock);
@ -223,7 +229,8 @@ mod test {
let document: &RawValue = serde_json::from_str(&document).unwrap();
let document = RawMap::from_raw_value(document, &bump).unwrap();
let document = DocumentFromVersions::new(Versions::single(document));
let document = Versions::single(document);
let document = DocumentFromVersions::new(&document);
document_tokenizer
.tokenize_document(

View File

@ -7,6 +7,7 @@ use raw_collections::alloc::RefBump;
use rayon::iter::IndexedParallelIterator;
use super::super::document_change::DocumentChange;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result};
@ -278,7 +279,7 @@ impl<
pub fn new<F>(
index: &'indexer Index,
db_fields_ids_map: &'indexer FieldsIdsMap,
new_fields_ids_map: &'fid RwLock<FieldsIdsMap>,
new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
extractor_allocs: &'extractor ThreadLocal<FullySend<RefCell<Bump>>>,
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
datastore: &'data ThreadLocal<T>,
@ -351,7 +352,7 @@ pub struct IndexingContext<
> {
pub index: &'index Index,
pub db_fields_ids_map: &'indexer FieldsIdsMap,
pub new_fields_ids_map: &'fid RwLock<FieldsIdsMap>,
pub new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
pub doc_allocs: &'indexer ThreadLocal<FullySend<Cell<Bump>>>,
pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
}

View File

@ -86,6 +86,7 @@ mod test {
use bumpalo::Bump;
use raw_collections::alloc::RefBump;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::index::tests::TempIndex;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, Extractor, IndexingContext, MostlySend,
@ -144,7 +145,9 @@ mod test {
let rtxn = index.read_txn().unwrap();
let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let fields_ids_map = RwLock::new(db_fields_ids_map.clone());
let metadata_builder = MetadataBuilder::from_index(&index, &rtxn).unwrap();
let fields_ids_map =
RwLock::new(FieldIdMapWithMetadata::new(db_fields_ids_map.clone(), metadata_builder));
let fields_ids_map_store = ThreadLocal::new();

View File

@ -289,19 +289,17 @@ impl MergeChanges for MergeDocumentForReplacement {
let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
.map_err(UserError::SerdeJson)?;
let document = DocumentFromVersions::new(Versions::single(document));
if is_new {
Ok(Some(DocumentChange::Insertion(Insertion::create(
docid,
external_doc,
document,
Versions::single(document),
))))
} else {
Ok(Some(DocumentChange::Update(Update::create(
docid,
external_doc,
document,
Versions::single(document),
true,
))))
}
@ -396,15 +394,13 @@ impl MergeChanges for MergeDocumentForUpdates {
let Some(versions) = versions else { return Ok(None) };
let document = DocumentFromVersions::new(versions);
if is_new {
Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, document))))
Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, versions))))
} else {
Ok(Some(DocumentChange::Update(Update::create(
docid,
external_docid,
document,
versions,
has_deletion,
))))
}

View File

@ -28,6 +28,7 @@ use super::words_prefix_docids::{
use super::{StdResult, TopLevelMap};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::proximity::ProximityPrecision;
use crate::update::new::channel::ExtractorSender;
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
@ -122,6 +123,10 @@ where
// This channel acts as a rendezvous point to ensure that we are one task ahead
let (extractor_sender, merger_receiver) = extractors_merger_channels(4);
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
@ -298,8 +303,8 @@ where
// required to into_inner the new_fields_ids_map
drop(fields_ids_map_store);
let fields_ids_map = new_fields_ids_map.into_inner().unwrap();
index.put_fields_ids_map(wtxn, &fields_ids_map)?;
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
if let Some(new_primary_key) = new_primary_key {
index.put_primary_key(wtxn, new_primary_key.name())?;

View File

@ -76,9 +76,7 @@ where
let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
.map_err(InternalError::SerdeJson)?;
let document = DocumentFromVersions::new(Versions::single(document));
let insertion = Insertion::create(docid, external_document_id, document);
let insertion = Insertion::create(docid, external_document_id, Versions::single(document));
Ok(Some(DocumentChange::Insertion(insertion)))
}
}

View File

@ -160,12 +160,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
} else {
let raw_new_doc = RawMap::from_raw_value(raw_new_doc, doc_alloc)
.map_err(InternalError::SerdeJson)?;
let new_doc_version =
DocumentFromVersions::new(Versions::single(raw_new_doc));
Ok(Some(DocumentChange::Update(Update::create(
docid,
new_document_id,
new_doc_version,
Versions::single(raw_new_doc),
true, // It is like document replacement
))))
}

View File

@ -4,12 +4,12 @@ use raw_collections::RawMap;
use serde::Serialize;
use serde_json::value::RawValue;
use super::document::{Document, DocumentFromDb};
use super::document::{Document, DocumentFromDb, DocumentFromVersions, Versions};
use crate::documents::FieldIdMapper;
use crate::index::IndexEmbeddingConfig;
use crate::vector::parsed_vectors::RawVectors;
use crate::vector::Embedding;
use crate::{DocumentId, Index, InternalError, Result};
use crate::{DocumentId, Index, InternalError, Result, UserError};
#[derive(Serialize)]
#[serde(untagged)]
@ -17,6 +17,15 @@ pub enum Embeddings<'doc> {
FromJson(&'doc RawValue),
FromDb(Vec<Embedding>),
}
impl<'doc> Embeddings<'doc> {
pub fn into_vec(self) -> std::result::Result<Vec<Embedding>, serde_json::Error> {
match self {
/// FIXME: this should be a VecOrArrayOfVec
Embeddings::FromJson(value) => serde_json::from_str(value.get()),
Embeddings::FromDb(vec) => Ok(vec),
}
}
}
pub struct VectorEntry<'doc> {
pub has_configured_embedder: bool,
@ -46,8 +55,10 @@ impl<'t> VectorDocumentFromDb<'t> {
rtxn: &'t RoTxn,
db_fields_ids_map: &'t Mapper,
doc_alloc: &'t Bump,
) -> Result<Self> {
let document = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?.unwrap();
) -> Result<Option<Self>> {
let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)? else {
return Ok(None);
};
let vectors = document.vectors_field()?;
let vectors_field = match vectors {
Some(vectors) => {
@ -58,7 +69,7 @@ impl<'t> VectorDocumentFromDb<'t> {
let embedding_config = index.embedding_configs(rtxn)?;
Ok(Self { docid, embedding_config, index, vectors_field, rtxn, doc_alloc })
Ok(Some(Self { docid, embedding_config, index, vectors_field, rtxn, doc_alloc }))
}
fn entry_from_db(
@ -132,3 +143,35 @@ fn entry_from_raw_value(
regenerate: value.must_regenerate(),
})
}
pub struct VectorDocumentFromVersions<'doc> {
vectors: RawMap<'doc>,
}
impl<'doc> VectorDocumentFromVersions<'doc> {
pub fn new(versions: &Versions<'doc>, bump: &'doc Bump) -> Result<Option<Self>> {
let document = DocumentFromVersions::new(versions);
if let Some(vectors_field) = document.vectors_field()? {
let vectors =
RawMap::from_raw_value(vectors_field, bump).map_err(UserError::SerdeJson)?;
Ok(Some(Self { vectors }))
} else {
Ok(None)
}
}
}
impl<'doc> VectorDocument<'doc> for VectorDocumentFromVersions<'doc> {
fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'doc str, VectorEntry<'doc>)>> {
self.vectors.iter().map(|(embedder, vectors)| {
let vectors = entry_from_raw_value(vectors).map_err(UserError::SerdeJson)?;
Ok((embedder, vectors))
})
}
fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'doc>>> {
let Some(vectors) = self.vectors.get(key) else { return Ok(None) };
let vectors = entry_from_raw_value(vectors).map_err(UserError::SerdeJson)?;
Ok(Some(vectors))
}
}