diff --git a/milli/src/external_documents_ids.rs b/milli/src/external_documents_ids.rs index 1bf08396a..ee8d29ffc 100644 --- a/milli/src/external_documents_ids.rs +++ b/milli/src/external_documents_ids.rs @@ -74,10 +74,6 @@ impl ExternalDocumentsIds { for DocumentOperation { external_id, internal_id, kind } in operations { match kind { DocumentOperationKind::Create => { - // TODO should we get before insert to be able to detect bugs? - // if matches!(kind, DocumentOperationKind::Create) { - // panic!("Attempting to create an already-existing document"); - // } self.0.put(wtxn, &external_id, &BEU32::new(internal_id))?; } DocumentOperationKind::Delete => { diff --git a/milli/src/update/index_documents/extract/extract_vector_points.rs b/milli/src/update/index_documents/extract/extract_vector_points.rs index 9aed862ab..1f5edeeeb 100644 --- a/milli/src/update/index_documents/extract/extract_vector_points.rs +++ b/milli/src/update/index_documents/extract/extract_vector_points.rs @@ -17,7 +17,6 @@ use crate::{DocumentId, FieldId, InternalError, Result, VectorOrArrayOfVectors}; pub fn extract_vector_points<R: io::Read + io::Seek>( obkv_documents: grenad::Reader<R>, indexer: GrenadParameters, - primary_key_id: FieldId, vectors_fid: FieldId, ) -> Result<grenad::Reader<File>> { puffin::profile_function!(); diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 41722a53e..ee8713ee8 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -63,7 +63,6 @@ pub(crate) fn data_from_obkv_documents( indexer, lmdb_writer_sx.clone(), vectors_field_id, - primary_key_id, ) }) .collect::<Result<()>>()?; @@ -274,7 +273,6 @@ fn send_original_documents_data( indexer: GrenadParameters, lmdb_writer_sx: Sender<Result<TypedChunk>>, vectors_field_id: Option<FieldId>, - primary_key_id: FieldId, ) -> Result<()> { let original_documents_chunk = original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; @@ -283,12 +281,7 @@ fn 
send_original_documents_data( let documents_chunk_cloned = original_documents_chunk.clone(); let lmdb_writer_sx_cloned = lmdb_writer_sx.clone(); rayon::spawn(move || { - let result = extract_vector_points( - documents_chunk_cloned, - indexer, - primary_key_id, - vectors_field_id, - ); + let result = extract_vector_points(documents_chunk_cloned, indexer, vectors_field_id); let _ = match result { Ok(vector_points) => { lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints(vector_points))) diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 1b38be03b..7c3f587d2 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -17,6 +17,7 @@ use crate::distance::NDotProductPoint; use crate::error::UserError; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; +use crate::index::db_name::DOCUMENTS; use crate::index::Hnsw; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::update::facet::FacetsUpdate; @@ -24,7 +25,7 @@ use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_arr use crate::update::index_documents::validate_document_id_value; use crate::{ lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError, - Result, BEU32, + Result, SerializationError, BEU32, }; pub(crate) enum TypedChunk { @@ -124,13 +125,15 @@ pub(crate) fn write_typed_chunk_into_index( let mut operations: Vec<DocumentOperation> = Default::default(); let mut docids = index.documents_ids(wtxn)?; - let primary_key = index.primary_key(wtxn)?.unwrap(); - let primary_key = index.fields_ids_map(wtxn)?.id(primary_key).unwrap(); let mut cursor = obkv_documents_iter.into_cursor()?; - while let Some((docid, reader)) = cursor.move_on_next()? { + while let Some((key, reader)) = cursor.move_on_next()? 
{ let mut writer: KvWriter<_, FieldId> = KvWriter::memory(); let reader: KvReader<FieldId> = KvReader::new(reader); - let docid = docid.try_into().map(DocumentId::from_be_bytes).unwrap(); + + let (document_id_bytes, external_id_bytes) = try_split_array_at(key) + .ok_or(SerializationError::Decoding { db_name: Some(DOCUMENTS) })?; + let docid = DocumentId::from_be_bytes(document_id_bytes); + let external_id = std::str::from_utf8(external_id_bytes)?; for (field_id, value) in reader.iter() { let del_add_reader = KvReaderDelAdd::new(value); @@ -140,45 +143,10 @@ pub(crate) fn write_typed_chunk_into_index( ) { (None, None) => {} (None, Some(value)) => { - // if primary key, new document - if field_id == primary_key { - // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk - // rather than re-extract it here - // FIXME: unwraps - let document_id = serde_json::from_slice(value) - .map_err(InternalError::SerdeJson) - .unwrap(); - let external_id = - validate_document_id_value(document_id).unwrap().unwrap(); - operations.push(DocumentOperation { - external_id, - internal_id: docid, - kind: DocumentOperationKind::Create, - }); - docids.insert(docid); - } // anyway, write writer.insert(field_id, value)?; } - (Some(value), None) => { - // if primary key, deleted document - if field_id == primary_key { - // FIXME: we already extracted the external docid before. 
We should retrieve it in the typed chunk - // rather than re-extract it here - // FIXME: unwraps - let document_id = serde_json::from_slice(value) - .map_err(InternalError::SerdeJson) - .unwrap(); - let external_id = - validate_document_id_value(document_id).unwrap().unwrap(); - operations.push(DocumentOperation { - external_id, - internal_id: docid, - kind: DocumentOperationKind::Delete, - }); - docids.remove(docid); - } - } + (Some(_), None) => {} (Some(_), Some(value)) => { // updated field, write writer.insert(field_id, value)?; @@ -190,8 +158,20 @@ pub(crate) fn write_typed_chunk_into_index( if !writer.is_empty() { db.put(wtxn, &BEU32::new(docid), &writer.into_inner().unwrap())?; + operations.push(DocumentOperation { + external_id: external_id.to_string(), + internal_id: docid, + kind: DocumentOperationKind::Create, + }); + docids.insert(docid); } else { db.delete(wtxn, &BEU32::new(docid))?; + operations.push(DocumentOperation { + external_id: external_id.to_string(), + internal_id: docid, + kind: DocumentOperationKind::Delete, + }); + docids.remove(docid); } } let external_documents_docids = index.external_documents_ids();