update typed chunks

ManyTheFish 2023-10-31 17:44:57 +01:00
parent 4b64c33aa2
commit 1b4ff991c0
4 changed files with 22 additions and 54 deletions

View File

@@ -74,10 +74,6 @@ impl ExternalDocumentsIds {
         for DocumentOperation { external_id, internal_id, kind } in operations {
             match kind {
                 DocumentOperationKind::Create => {
-                    // TODO should we get before insert to be able to detect bugs?
-                    // if matches!(kind, DocumentOperationKind::Create) {
-                    //     panic!("Attempting to create an already-existing document");
-                    // }
                     self.0.put(wtxn, &external_id, &BEU32::new(internal_id))?;
                 }
                 DocumentOperationKind::Delete => {
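
For context, a minimal in-memory sketch of what the apply loop above maintains: a mapping from external document ids to internal ids. The `Operation`/`Kind` types and the `HashMap` below are hypothetical stand-ins for `DocumentOperation`/`DocumentOperationKind` and the heed database written through `self.0.put`/`delete`:

use std::collections::HashMap;

// Simplified mirror of DocumentOperation / DocumentOperationKind.
enum Kind {
    Create,
    Delete,
}

struct Operation {
    external_id: String,
    internal_id: u32,
    kind: Kind,
}

// Apply operations to a plain map instead of the external-documents-ids database.
fn apply(map: &mut HashMap<String, u32>, operations: Vec<Operation>) {
    for Operation { external_id, internal_id, kind } in operations {
        match kind {
            // Create: map the external id to the internal id (no pre-existence
            // check, matching the TODO removed by this commit).
            Kind::Create => {
                map.insert(external_id, internal_id);
            }
            // Delete: drop the mapping for this external id.
            Kind::Delete => {
                map.remove(&external_id);
            }
        }
    }
}

fn main() {
    let mut map = HashMap::new();
    apply(
        &mut map,
        vec![
            Operation { external_id: "movie-42".into(), internal_id: 42, kind: Kind::Create },
            Operation { external_id: "movie-7".into(), internal_id: 7, kind: Kind::Create },
            Operation { external_id: "movie-7".into(), internal_id: 7, kind: Kind::Delete },
        ],
    );
    assert_eq!(map.get("movie-42"), Some(&42));
    assert!(!map.contains_key("movie-7"));
}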

View File

@@ -17,7 +17,6 @@ use crate::{DocumentId, FieldId, InternalError, Result, VectorOrArrayOfVectors};
 pub fn extract_vector_points<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
-    primary_key_id: FieldId,
     vectors_fid: FieldId,
 ) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();

View File

@@ -63,7 +63,6 @@ pub(crate) fn data_from_obkv_documents(
                     indexer,
                     lmdb_writer_sx.clone(),
                     vectors_field_id,
-                    primary_key_id,
                 )
             })
             .collect::<Result<()>>()?;
@@ -274,7 +273,6 @@ fn send_original_documents_data(
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     vectors_field_id: Option<FieldId>,
-    primary_key_id: FieldId,
 ) -> Result<()> {
     let original_documents_chunk =
         original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
@@ -283,12 +281,7 @@ fn send_original_documents_data(
         let documents_chunk_cloned = original_documents_chunk.clone();
         let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
         rayon::spawn(move || {
-            let result = extract_vector_points(
-                documents_chunk_cloned,
-                indexer,
-                primary_key_id,
-                vectors_field_id,
-            );
+            let result = extract_vector_points(documents_chunk_cloned, indexer, vectors_field_id);
             let _ = match result {
                 Ok(vector_points) => {
                     lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints(vector_points)))

View File

@@ -17,6 +17,7 @@ use crate::distance::NDotProductPoint;
 use crate::error::UserError;
 use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
 use crate::facet::FacetType;
+use crate::index::db_name::DOCUMENTS;
 use crate::index::Hnsw;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd};
 use crate::update::facet::FacetsUpdate;
@@ -24,7 +25,7 @@ use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at};
 use crate::update::index_documents::validate_document_id_value;
 use crate::{
     lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
-    Result, BEU32,
+    Result, SerializationError, BEU32,
 };

 pub(crate) enum TypedChunk {
@@ -124,13 +125,15 @@ pub(crate) fn write_typed_chunk_into_index(
             let mut operations: Vec<DocumentOperation> = Default::default();

             let mut docids = index.documents_ids(wtxn)?;
-            let primary_key = index.primary_key(wtxn)?.unwrap();
-            let primary_key = index.fields_ids_map(wtxn)?.id(primary_key).unwrap();
             let mut cursor = obkv_documents_iter.into_cursor()?;
-            while let Some((docid, reader)) = cursor.move_on_next()? {
+            while let Some((key, reader)) = cursor.move_on_next()? {
                 let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
                 let reader: KvReader<FieldId> = KvReader::new(reader);
-                let docid = docid.try_into().map(DocumentId::from_be_bytes).unwrap();
+
+                let (document_id_bytes, external_id_bytes) = try_split_array_at(key)
+                    .ok_or(SerializationError::Decoding { db_name: Some(DOCUMENTS) })?;
+                let docid = DocumentId::from_be_bytes(document_id_bytes);
+                let external_id = std::str::from_utf8(external_id_bytes)?;

                 for (field_id, value) in reader.iter() {
                     let del_add_reader = KvReaderDelAdd::new(value);
@@ -140,45 +143,10 @@ pub(crate) fn write_typed_chunk_into_index(
                     ) {
                         (None, None) => {}
                         (None, Some(value)) => {
-                            // if primary key, new document
-                            if field_id == primary_key {
-                                // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk
-                                // rather than re-extract it here
-                                // FIXME: unwraps
-                                let document_id = serde_json::from_slice(value)
-                                    .map_err(InternalError::SerdeJson)
-                                    .unwrap();
-                                let external_id =
-                                    validate_document_id_value(document_id).unwrap().unwrap();
-                                operations.push(DocumentOperation {
-                                    external_id,
-                                    internal_id: docid,
-                                    kind: DocumentOperationKind::Create,
-                                });
-                                docids.insert(docid);
-                            }
                             // anyway, write
                             writer.insert(field_id, value)?;
                         }
-                        (Some(value), None) => {
-                            // if primary key, deleted document
-                            if field_id == primary_key {
-                                // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk
-                                // rather than re-extract it here
-                                // FIXME: unwraps
-                                let document_id = serde_json::from_slice(value)
-                                    .map_err(InternalError::SerdeJson)
-                                    .unwrap();
-                                let external_id =
-                                    validate_document_id_value(document_id).unwrap().unwrap();
-                                operations.push(DocumentOperation {
-                                    external_id,
-                                    internal_id: docid,
-                                    kind: DocumentOperationKind::Delete,
-                                });
-                                docids.remove(docid);
-                            }
-                        }
+                        (Some(_), None) => {}
                         (Some(_), Some(value)) => {
                             // updated field, write
                             writer.insert(field_id, value)?;
@@ -190,8 +158,20 @@ pub(crate) fn write_typed_chunk_into_index(
                 if !writer.is_empty() {
                     db.put(wtxn, &BEU32::new(docid), &writer.into_inner().unwrap())?;
+                    operations.push(DocumentOperation {
+                        external_id: external_id.to_string(),
+                        internal_id: docid,
+                        kind: DocumentOperationKind::Create,
+                    });
+                    docids.insert(docid);
                 } else {
                     db.delete(wtxn, &BEU32::new(docid))?;
+                    operations.push(DocumentOperation {
+                        external_id: external_id.to_string(),
+                        internal_id: docid,
+                        kind: DocumentOperationKind::Delete,
+                    });
+                    docids.remove(docid);
                 }
             }

             let external_documents_docids = index.external_documents_ids();
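
The changes above rely on the documents chunk key carrying both ids, so the typed-chunk writer no longer has to re-read the primary-key field to recover the external document id. A minimal standalone sketch of that assumed key layout (4-byte big-endian internal docid followed by the UTF-8 external id); `split_docid_key` is a hypothetical helper standing in for the `try_split_array_at` call used in the diff:

// Standalone sketch of the assumed key layout: a 4-byte big-endian internal
// docid prefix followed by the UTF-8 external document id.
fn split_docid_key(key: &[u8]) -> Option<(u32, &str)> {
    // Need at least the 4-byte internal id prefix.
    if key.len() < 4 {
        return None;
    }
    let (docid_bytes, external_id_bytes) = key.split_at(4);
    let docid = u32::from_be_bytes(docid_bytes.try_into().ok()?);
    let external_id = std::str::from_utf8(external_id_bytes).ok()?;
    Some((docid, external_id))
}

fn main() {
    // Build a key the way the extraction side is assumed to: docid prefix + external id.
    let mut key = 42u32.to_be_bytes().to_vec();
    key.extend_from_slice(b"movie-42");

    let (docid, external_id) = split_docid_key(&key).expect("well-formed key");
    assert_eq!(docid, 42);
    assert_eq!(external_id, "movie-42");
}

Carrying the external id in the key is what lets the Create/Delete DocumentOperations be pushed once per document, instead of only when the primary-key field happens to be added or deleted.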