From 0c57cf7565c836e559cce63f03b77450207eb26f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 29 Aug 2024 19:20:10 +0200 Subject: [PATCH] Replace obkv with the temporary new version of it --- Cargo.lock | 13 +--- meilisearch/Cargo.toml | 2 +- meilisearch/src/search/mod.rs | 2 +- milli/Cargo.toml | 5 +- milli/src/documents/enriched.rs | 2 +- milli/src/documents/mod.rs | 4 +- milli/src/documents/primary_key.rs | 2 +- milli/src/documents/reader.rs | 12 ++-- milli/src/heed_codec/obkv_codec.rs | 4 +- .../cbo_roaring_bitmap_codec.rs | 2 +- milli/src/index.rs | 8 +-- milli/src/lib.rs | 7 +- milli/src/prompt/document.rs | 4 +- milli/src/prompt/mod.rs | 2 +- milli/src/update/del_add.rs | 32 ++------- milli/src/update/facet/bulk.rs | 4 +- milli/src/update/facet/incremental.rs | 2 +- milli/src/update/facet/mod.rs | 2 +- milli/src/update/index_documents/enrich.rs | 2 +- .../extract/extract_docid_word_positions.rs | 16 ++--- .../extract/extract_facet_number_docids.rs | 2 +- .../extract/extract_facet_string_docids.rs | 4 +- .../extract/extract_fid_docid_facet_values.rs | 4 +- .../extract/extract_fid_word_count_docids.rs | 10 ++- .../extract/extract_geo_points.rs | 12 ++-- .../extract/extract_vector_points.rs | 14 ++-- .../extract/extract_word_docids.rs | 10 +-- .../extract_word_pair_proximity_docids.rs | 8 +-- .../extract/extract_word_position_docids.rs | 6 +- .../helpers/merge_functions.rs | 20 +++--- milli/src/update/index_documents/parallel.rs | 4 +- milli/src/update/index_documents/transform.rs | 42 +++++------ .../src/update/index_documents/typed_chunk.rs | 14 ++-- milli/src/update/new/channel.rs | 5 +- milli/src/update/new/document_change.rs | 2 +- milli/src/update/new/mod.rs | 71 +++---------------- milli/src/vector/parsed_vectors.rs | 10 ++- 37 files changed, 142 insertions(+), 223 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a21cbc007..e0effa54d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3434,7 +3434,7 @@ dependencies = [ 
"mimalloc", "mime", "num_cpus", - "obkv 0.2.2", + "obkv", "once_cell", "ordered-float", "parking_lot", @@ -3601,8 +3601,7 @@ dependencies = [ "memchr", "memmap2", "mimalloc", - "obkv 0.2.2", - "obkv 0.3.0", + "obkv", "once_cell", "ordered-float", "rand", @@ -3849,16 +3848,10 @@ dependencies = [ "memchr", ] -[[package]] -name = "obkv" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2e27bcfe835a379d32352112f6b8dbae2d99d16a5fff42abe6e5ba5386c1e5a" - [[package]] name = "obkv" version = "0.3.0" -source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#5289a6658cd471f4212c1edc1a40b2a3c3d11fe0" +source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#9c2900d106fa84e7079b288e7f7c366ec7cae948" [[package]] name = "once_cell" diff --git a/meilisearch/Cargo.toml b/meilisearch/Cargo.toml index e614ecc6a..041d5d871 100644 --- a/meilisearch/Cargo.toml +++ b/meilisearch/Cargo.toml @@ -57,7 +57,7 @@ meilisearch-types = { path = "../meilisearch-types" } mimalloc = { version = "0.1.43", default-features = false } mime = "0.3.17" num_cpus = "1.16.0" -obkv = "0.2.2" +obkv = { git = "https://github.com/kerollmops/obkv", branch = "unsized-kvreader" } once_cell = "1.19.0" ordered-float = "4.2.1" parking_lot = "0.12.3" diff --git a/meilisearch/src/search/mod.rs b/meilisearch/src/search/mod.rs index 915505be0..4ada47ff1 100644 --- a/meilisearch/src/search/mod.rs +++ b/meilisearch/src/search/mod.rs @@ -1247,7 +1247,7 @@ impl<'a> HitMaker<'a> { self.index.iter_documents(self.rtxn, std::iter::once(id))?.next().unwrap()?; // First generate a document with all the displayed fields - let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, obkv)?; + let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, &obkv)?; let add_vectors_fid = self.vectors_fid.filter(|_fid| self.retrieve_vectors == RetrieveVectors::Retrieve); diff --git a/milli/Cargo.toml 
b/milli/Cargo.toml index 9fa270d46..b15f72f15 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -12,6 +12,7 @@ readme.workspace = true license.workspace = true [dependencies] +big_s = "1.0.2" bimap = { version = "0.6.3", features = ["serde"] } bincode = "1.3.3" bstr = "1.9.1" @@ -44,8 +45,7 @@ levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] } lru = "0.12.3" memchr = "2.5.0" memmap2 = "0.9.4" -obkv = "0.2.2" -obkv2 = { package = "obkv", git = "https://github.com/kerollmops/obkv", branch = "unsized-kvreader" } +obkv = { git = "https://github.com/kerollmops/obkv", branch = "unsized-kvreader" } once_cell = "1.19.0" ordered-float = "4.2.1" rayon = "1.10.0" @@ -94,7 +94,6 @@ rayon-par-bridge = "0.1.0" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } -big_s = "1.0.2" insta = "1.39.0" maplit = "1.0.2" md5 = "0.7.0" diff --git a/milli/src/documents/enriched.rs b/milli/src/documents/enriched.rs index 609765068..cede4d2f0 100644 --- a/milli/src/documents/enriched.rs +++ b/milli/src/documents/enriched.rs @@ -69,7 +69,7 @@ impl EnrichedDocumentsBatchReader { #[derive(Debug, Clone)] pub struct EnrichedDocument<'a> { - pub document: KvReader<'a, FieldId>, + pub document: &'a KvReader, pub document_id: DocumentId, } diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs index f4509256d..036981b65 100644 --- a/milli/src/documents/mod.rs +++ b/milli/src/documents/mod.rs @@ -27,7 +27,7 @@ use crate::{FieldId, Object, Result}; const DOCUMENTS_BATCH_INDEX_KEY: [u8; 8] = u64::MAX.to_be_bytes(); /// Helper function to convert an obkv reader into a JSON object. 
-pub fn obkv_to_object(obkv: &KvReader<'_, FieldId>, index: &DocumentsBatchIndex) -> Result { +pub fn obkv_to_object(obkv: &KvReader, index: &DocumentsBatchIndex) -> Result { obkv.iter() .map(|(field_id, value)| { let field_name = index @@ -76,7 +76,7 @@ impl DocumentsBatchIndex { self.0.get_by_right(name).cloned() } - pub fn recreate_json(&self, document: &obkv::KvReaderU16<'_>) -> Result { + pub fn recreate_json(&self, document: &obkv::KvReaderU16) -> Result { let mut map = Object::new(); for (k, v) in document.iter() { diff --git a/milli/src/documents/primary_key.rs b/milli/src/documents/primary_key.rs index 64131af40..22918f8fc 100644 --- a/milli/src/documents/primary_key.rs +++ b/milli/src/documents/primary_key.rs @@ -52,7 +52,7 @@ impl<'a> PrimaryKey<'a> { pub fn document_id( &self, - document: &obkv::KvReader<'_, FieldId>, + document: &obkv::KvReader, fields: &impl FieldIdMapper, ) -> Result> { match self { diff --git a/milli/src/documents/reader.rs b/milli/src/documents/reader.rs index ebdc514fd..20e932805 100644 --- a/milli/src/documents/reader.rs +++ b/milli/src/documents/reader.rs @@ -76,11 +76,9 @@ impl DocumentsBatchCursor { pub fn get( &mut self, offset: u32, - ) -> Result>, DocumentsBatchCursorError> { + ) -> Result>, DocumentsBatchCursorError> { match self.cursor.move_on_key_equal_to(offset.to_be_bytes())? { - Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => { - Ok(Some(KvReader::new(value))) - } + Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => Ok(Some(value.into())), _otherwise => Ok(None), } } @@ -89,11 +87,9 @@ impl DocumentsBatchCursor { /// `next_document` advance the document reader until all the documents have been read. pub fn next_document( &mut self, - ) -> Result>, DocumentsBatchCursorError> { + ) -> Result>, DocumentsBatchCursorError> { match self.cursor.move_on_next()? 
{ - Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => { - Ok(Some(KvReader::new(value))) - } + Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => Ok(Some(value.into())), _otherwise => Ok(None), } } diff --git a/milli/src/heed_codec/obkv_codec.rs b/milli/src/heed_codec/obkv_codec.rs index 390a57af3..447323571 100644 --- a/milli/src/heed_codec/obkv_codec.rs +++ b/milli/src/heed_codec/obkv_codec.rs @@ -6,10 +6,10 @@ use obkv::{KvReaderU16, KvWriterU16}; pub struct ObkvCodec; impl<'a> heed::BytesDecode<'a> for ObkvCodec { - type DItem = KvReaderU16<'a>; + type DItem = &'a KvReaderU16; fn bytes_decode(bytes: &'a [u8]) -> Result { - Ok(KvReaderU16::new(bytes)) + Ok(KvReaderU16::from_slice(bytes)) } } diff --git a/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs b/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs index fa65d5217..257d5bd0a 100644 --- a/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs +++ b/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs @@ -122,7 +122,7 @@ impl CboRoaringBitmapCodec { /// Merges a DelAdd delta into a CboRoaringBitmap. pub fn merge_deladd_into<'a>( - deladd: KvReaderDelAdd<'_>, + deladd: &KvReaderDelAdd, previous: &[u8], buffer: &'a mut Vec, ) -> io::Result> { diff --git a/milli/src/index.rs b/milli/src/index.rs index 5d651e144..9c582b97a 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -1252,7 +1252,7 @@ impl Index { /* documents */ /// Returns a document by using the document id. - pub fn document<'t>(&self, rtxn: &'t RoTxn, id: DocumentId) -> Result> { + pub fn document<'t>(&self, rtxn: &'t RoTxn, id: DocumentId) -> Result<&'t obkv::KvReaderU16> { self.documents .get(rtxn, &id)? 
.ok_or(UserError::UnknownInternalDocumentId { document_id: id }) @@ -1264,7 +1264,7 @@ impl Index { &'a self, rtxn: &'t RoTxn<'t>, ids: impl IntoIterator + 'a, - ) -> Result)>> + 'a> { + ) -> Result> + 'a> { Ok(ids.into_iter().map(move |id| { let kv = self .documents @@ -1279,7 +1279,7 @@ impl Index { &self, rtxn: &'t RoTxn<'t>, ids: impl IntoIterator, - ) -> Result)>> { + ) -> Result> { self.iter_documents(rtxn, ids)?.collect() } @@ -1287,7 +1287,7 @@ impl Index { pub fn all_documents<'a, 't: 'a>( &'a self, rtxn: &'t RoTxn<'t>, - ) -> Result)>> + 'a> { + ) -> Result> + 'a> { self.iter_documents(rtxn, self.documents_ids(rtxn)?) } diff --git a/milli/src/lib.rs b/milli/src/lib.rs index 8008b7bd1..bb8325791 100644 --- a/milli/src/lib.rs +++ b/milli/src/lib.rs @@ -214,7 +214,7 @@ pub fn bucketed_position(relative: u16) -> u16 { pub fn obkv_to_json( displayed_fields: &[FieldId], fields_ids_map: &FieldsIdsMap, - obkv: obkv::KvReaderU16<'_>, + obkv: &obkv::KvReaderU16, ) -> Result { displayed_fields .iter() @@ -232,10 +232,7 @@ pub fn obkv_to_json( } /// Transform every field of a raw obkv store into a JSON Object. 
-pub fn all_obkv_to_json( - obkv: obkv::KvReaderU16<'_>, - fields_ids_map: &FieldsIdsMap, -) -> Result { +pub fn all_obkv_to_json(obkv: &obkv::KvReaderU16, fields_ids_map: &FieldsIdsMap) -> Result { let all_keys = obkv.iter().map(|(k, _v)| k).collect::>(); obkv_to_json(all_keys.as_slice(), fields_ids_map, obkv) } diff --git a/milli/src/prompt/document.rs b/milli/src/prompt/document.rs index b5d43b5be..a809f58ce 100644 --- a/milli/src/prompt/document.rs +++ b/milli/src/prompt/document.rs @@ -30,13 +30,13 @@ impl ParsedValue { impl<'a> Document<'a> { pub fn new( - data: obkv::KvReaderU16<'a>, + data: &'a obkv::KvReaderU16, side: DelAdd, inverted_field_map: &'a FieldsIdsMap, ) -> Self { let mut out_data = BTreeMap::new(); for (fid, raw) in data { - let obkv = KvReaderDelAdd::new(raw); + let obkv = KvReaderDelAdd::from_slice(raw); let Some(raw) = obkv.get(side) else { continue; }; diff --git a/milli/src/prompt/mod.rs b/milli/src/prompt/mod.rs index 97ccbfb61..79e4eabbb 100644 --- a/milli/src/prompt/mod.rs +++ b/milli/src/prompt/mod.rs @@ -91,7 +91,7 @@ impl Prompt { pub fn render( &self, - document: obkv::KvReaderU16<'_>, + document: &obkv::KvReaderU16, side: DelAdd, field_id_map: &FieldsIdsMap, ) -> Result { diff --git a/milli/src/update/del_add.rs b/milli/src/update/del_add.rs index 790cdd028..97ff86f2a 100644 --- a/milli/src/update/del_add.rs +++ b/milli/src/update/del_add.rs @@ -1,7 +1,7 @@ use obkv::Key; pub type KvWriterDelAdd = obkv::KvWriter; -pub type KvReaderDelAdd<'a> = obkv::KvReader<'a, DelAdd>; +pub type KvReaderDelAdd = obkv::KvReader; /// DelAdd defines the new value to add in the database and old value to delete from the database. 
/// @@ -30,31 +30,13 @@ impl Key for DelAdd { } } -// TODO remove this implementation -impl obkv2::Key for DelAdd { - const BYTES_SIZE: usize = std::mem::size_of::(); - type BYTES = [u8; ::BYTES_SIZE]; - - fn to_be_bytes(&self) -> Self::BYTES { - u8::to_be_bytes(*self as u8) - } - - fn from_be_bytes(array: Self::BYTES) -> Self { - match u8::from_be_bytes(array) { - 0 => Self::Deletion, - 1 => Self::Addition, - otherwise => unreachable!("DelAdd has only 2 variants, unknown variant: {}", otherwise), - } - } -} - /// Creates a Kv> from Kv /// /// Deletion: put all the values under DelAdd::Deletion /// Addition: put all the values under DelAdd::Addition, /// DeletionAndAddition: put all the values under DelAdd::Deletion and DelAdd::Addition, pub fn into_del_add_obkv( - reader: obkv::KvReader<'_, K>, + reader: &obkv::KvReader, operation: DelAddOperation, buffer: &mut Vec, ) -> Result<(), std::io::Error> { @@ -64,7 +46,7 @@ pub fn into_del_add_obkv( /// Akin to the [into_del_add_obkv] function but lets you /// conditionally define the `DelAdd` variant based on the obkv key. 
pub fn into_del_add_obkv_conditional_operation( - reader: obkv::KvReader<'_, K>, + reader: &obkv::KvReader, buffer: &mut Vec, operation: F, ) -> std::io::Result<()> @@ -104,8 +86,8 @@ pub enum DelAddOperation { /// putting each deletion obkv's keys under an DelAdd::Deletion /// and putting each addition obkv's keys under an DelAdd::Addition pub fn del_add_from_two_obkvs( - deletion: &obkv::KvReader<'_, K>, - addition: &obkv::KvReader<'_, K>, + deletion: &obkv::KvReader, + addition: &obkv::KvReader, buffer: &mut Vec, ) -> Result<(), std::io::Error> { use itertools::merge_join_by; @@ -139,7 +121,7 @@ pub fn del_add_from_two_obkvs( writer.finish() } -pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd<'_>) -> bool { +pub fn is_noop_del_add_obkv(del_add: &KvReaderDelAdd) -> bool { del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition) } @@ -154,5 +136,5 @@ pub fn deladd_serialize_add_side<'a>( obkv: &'a [u8], _buffer: &mut Vec, ) -> crate::Result<&'a [u8]> { - Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default()) + Ok(KvReaderDelAdd::from_slice(obkv).get(DelAdd::Addition).unwrap_or_default()) } diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs index a63d59693..27de6e777 100644 --- a/milli/src/update/facet/bulk.rs +++ b/milli/src/update/facet/bulk.rs @@ -135,7 +135,7 @@ impl FacetsUpdateBulkInner { if !valid_lmdb_key(key) { continue; } - let value = KvReaderDelAdd::new(value); + let value = KvReaderDelAdd::from_slice(value); // DB is empty, it is safe to ignore Del operations let Some(value) = value.get(DelAdd::Addition) else { @@ -161,7 +161,7 @@ impl FacetsUpdateBulkInner { continue; } - let value = KvReaderDelAdd::new(value); + let value = KvReaderDelAdd::from_slice(value); // the value is a CboRoaringBitmap, but I still need to prepend the // group size for level 0 (= 1) to it diff --git a/milli/src/update/facet/incremental.rs b/milli/src/update/facet/incremental.rs index 0f0937855..637f84986 100644 --- 
a/milli/src/update/facet/incremental.rs +++ b/milli/src/update/facet/incremental.rs @@ -109,7 +109,7 @@ impl FacetsUpdateIncremental { } current_field_id = Some(key.field_id); - let value = KvReader::new(value); + let value = KvReader::from_slice(value); let docids_to_delete = value .get(DelAdd::Deletion) .map(CboRoaringBitmapCodec::bytes_decode) diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index ad3ddc38f..bccfdff12 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -187,7 +187,7 @@ fn index_facet_search( ) -> Result<()> { let mut iter = normalized_delta_data.into_stream_merger_iter()?; while let Some((key_bytes, delta_bytes)) = iter.next()? { - let deladd_reader = KvReaderDelAdd::new(delta_bytes); + let deladd_reader = KvReaderDelAdd::from_slice(delta_bytes); let database_set = index .facet_id_normalized_string_strings diff --git a/milli/src/update/index_documents/enrich.rs b/milli/src/update/index_documents/enrich.rs index 691b2b9d1..a93d6f9f1 100644 --- a/milli/src/update/index_documents/enrich.rs +++ b/milli/src/update/index_documents/enrich.rs @@ -145,7 +145,7 @@ pub fn enrich_documents_batch( #[tracing::instrument(level = "trace", skip(uuid_buffer, documents_batch_index, document) target = "indexing::documents")] fn fetch_or_generate_document_id( - document: &obkv::KvReader<'_, FieldId>, + document: &obkv::KvReader, documents_batch_index: &DocumentsBatchIndex, primary_key: PrimaryKey<'_>, autogenerate_docids: bool, diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index ba11ceeb3..a939827d5 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -80,7 +80,7 @@ pub fn extract_docid_word_positions( .try_into() .map(u32::from_be_bytes) .map_err(|_| 
SerializationError::InvalidNumberSerialization)?; - let obkv = KvReader::::new(value); + let obkv = KvReader::::from_slice(value); // if the searchable fields didn't change, skip the searchable indexing for this document. if !force_reindexing && !searchable_fields_changed(&obkv, settings_diff) { @@ -126,13 +126,13 @@ pub fn extract_docid_word_positions( // transforming two KV> into one KV>> value_buffer.clear(); del_add_from_two_obkvs( - &KvReader::::new(del_obkv), - &KvReader::::new(add_obkv), + &KvReader::::from_slice(del_obkv), + &KvReader::::from_slice(add_obkv), &mut value_buffer, )?; // write each KV> into the sorter, field by field. - let obkv = KvReader::::new(&value_buffer); + let obkv = KvReader::::from_slice(&value_buffer); for (field_id, value) in obkv.iter() { key_buffer.truncate(mem::size_of::()); key_buffer.extend_from_slice(&field_id.to_be_bytes()); @@ -146,13 +146,13 @@ pub fn extract_docid_word_positions( /// Check if any searchable fields of a document changed. fn searchable_fields_changed( - obkv: &KvReader<'_, FieldId>, + obkv: &KvReader, settings_diff: &InnerIndexSettingsDiff, ) -> bool { let searchable_fields = &settings_diff.new.searchable_fields_ids; for (field_id, field_bytes) in obkv.iter() { if searchable_fields.contains(&field_id) { - let del_add = KvReaderDelAdd::new(field_bytes); + let del_add = KvReaderDelAdd::from_slice(field_bytes); match (del_add.get(DelAdd::Deletion), del_add.get(DelAdd::Addition)) { // if both fields are None, check the next field. (None, None) => (), @@ -189,7 +189,7 @@ fn tokenizer_builder<'a>( /// Extract words mapped with their positions of a document. fn tokens_from_document<'a>( - obkv: &KvReader<'a, FieldId>, + obkv: &'a KvReader, settings: &InnerIndexSettings, tokenizer: &Tokenizer<'_>, max_positions_per_attributes: u32, @@ -202,7 +202,7 @@ fn tokens_from_document<'a>( // if field is searchable. if settings.searchable_fields_ids.contains(&field_id) { // extract deletion or addition only. 
- if let Some(field_bytes) = KvReaderDelAdd::new(field_bytes).get(del_add) { + if let Some(field_bytes) = KvReaderDelAdd::from_slice(field_bytes).get(del_add) { // parse json. let value = serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index bfd769604..478631dea 100644 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -45,7 +45,7 @@ pub fn extract_facet_number_docids( buffer.clear(); let mut obkv = KvWriterDelAdd::new(&mut buffer); - for (deladd_key, _) in KvReaderDelAdd::new(deladd_obkv_bytes).iter() { + for (deladd_key, _) in KvReaderDelAdd::from_slice(deladd_obkv_bytes).iter() { obkv.insert(deladd_key, document_id.to_ne_bytes())?; } obkv.finish()?; diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index 36dd20b15..7565b1ad1 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -75,7 +75,7 @@ fn extract_facet_string_docids_document_update( let mut buffer = Vec::new(); let mut cursor = docid_fid_facet_string.into_cursor()?; while let Some((key, deladd_original_value_bytes)) = cursor.move_on_next()? 
{ - let deladd_reader = KvReaderDelAdd::new(deladd_original_value_bytes); + let deladd_reader = KvReaderDelAdd::from_slice(deladd_original_value_bytes); let is_same_value = deladd_reader.get(DelAdd::Deletion).is_some() && deladd_reader.get(DelAdd::Addition).is_some(); @@ -163,7 +163,7 @@ fn extract_facet_string_docids_settings( let mut buffer = Vec::new(); let mut cursor = docid_fid_facet_string.into_cursor()?; while let Some((key, deladd_original_value_bytes)) = cursor.move_on_next()? { - let deladd_reader = KvReaderDelAdd::new(deladd_original_value_bytes); + let deladd_reader = KvReaderDelAdd::from_slice(deladd_original_value_bytes); let is_same_value = deladd_reader.get(DelAdd::Deletion).is_some() && deladd_reader.get(DelAdd::Addition).is_some(); diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 93c6ab408..7678e1edf 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -83,10 +83,10 @@ pub fn extract_fid_docid_facet_values( if !settings_diff.settings_update_only || old_faceted_fids != new_faceted_fids { let mut cursor = obkv_documents.into_cursor()?; while let Some((docid_bytes, value)) = cursor.move_on_next()? 
{ - let obkv = obkv::KvReader::new(value); + let obkv = obkv::KvReader::from_slice(value); let get_document_json_value = move |field_id, side| { obkv.get(field_id) - .map(KvReaderDelAdd::new) + .map(KvReaderDelAdd::from_slice) .and_then(|kv| kv.get(side)) .map(from_slice) .transpose() diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs index f252df1cd..291dcc014 100644 --- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs @@ -45,19 +45,23 @@ pub fn extract_fid_word_count_docids( .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); - let del_add_reader = KvReaderDelAdd::new(value); + let del_add_reader = KvReaderDelAdd::from_slice(value); let deletion = del_add_reader // get deleted words .get(DelAdd::Deletion) // count deleted words - .map(|deletion| KvReaderU16::new(deletion).iter().take(MAX_COUNTED_WORDS + 1).count()) + .map(|deletion| { + KvReaderU16::from_slice(deletion).iter().take(MAX_COUNTED_WORDS + 1).count() + }) // keep the count if under or equal to MAX_COUNTED_WORDS .filter(|&word_count| word_count <= MAX_COUNTED_WORDS); let addition = del_add_reader // get added words .get(DelAdd::Addition) // count added words - .map(|addition| KvReaderU16::new(addition).iter().take(MAX_COUNTED_WORDS + 1).count()) + .map(|addition| { + KvReaderU16::from_slice(addition).iter().take(MAX_COUNTED_WORDS + 1).count() + }) // keep the count if under or equal to MAX_COUNTED_WORDS .filter(|&word_count| word_count <= MAX_COUNTED_WORDS); diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract/extract_geo_points.rs index ac8b7abee..fcf102eeb 100644 --- a/milli/src/update/index_documents/extract/extract_geo_points.rs +++ 
b/milli/src/update/index_documents/extract/extract_geo_points.rs @@ -29,11 +29,11 @@ pub fn extract_geo_points( let mut cursor = obkv_documents.into_cursor()?; while let Some((docid_bytes, value)) = cursor.move_on_next()? { - let obkv = obkv::KvReader::new(value); + let obkv = obkv::KvReader::from_slice(value); // since we only need the primary key when we throw an error // we create this getter to lazily get it when needed let document_id = || -> Value { - let reader = KvReaderDelAdd::new(obkv.get(primary_key_id).unwrap()); + let reader = KvReaderDelAdd::from_slice(obkv.get(primary_key_id).unwrap()); let document_id = reader.get(DelAdd::Deletion).or(reader.get(DelAdd::Addition)).unwrap(); serde_json::from_slice(document_id).unwrap() @@ -68,15 +68,17 @@ pub fn extract_geo_points( /// Extract the finite floats lat and lng from two bytes slices. fn extract_lat_lng( - document: &obkv::KvReader<'_, FieldId>, + document: &obkv::KvReader, settings: &InnerIndexSettings, deladd: DelAdd, document_id: impl Fn() -> Value, ) -> Result> { match settings.geo_fields_ids { Some((lat_fid, lng_fid)) => { - let lat = document.get(lat_fid).map(KvReaderDelAdd::new).and_then(|r| r.get(deladd)); - let lng = document.get(lng_fid).map(KvReaderDelAdd::new).and_then(|r| r.get(deladd)); + let lat = + document.get(lat_fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd)); + let lng = + document.get(lng_fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd)); let (lat, lng) = match (lat, lng) { (Some(lat), Some(lng)) => (lat, lng), (Some(_), None) => { diff --git a/milli/src/update/index_documents/extract/extract_vector_points.rs b/milli/src/update/index_documents/extract/extract_vector_points.rs index f66c3fd46..6de555e4a 100644 --- a/milli/src/update/index_documents/extract/extract_vector_points.rs +++ b/milli/src/update/index_documents/extract/extract_vector_points.rs @@ -307,7 +307,7 @@ pub fn extract_vector_points( debug_assert!(from_utf8(external_id_bytes).is_ok()); 
let docid = DocumentId::from_be_bytes(docid_bytes); - let obkv = obkv::KvReader::new(value); + let obkv = obkv::KvReader::from_slice(value); key_buffer.clear(); key_buffer.extend_from_slice(docid_bytes.as_slice()); @@ -475,7 +475,7 @@ pub fn extract_vector_points( #[allow(clippy::too_many_arguments)] // feel free to find efficient way to factor arguments fn extract_vector_document_diff( docid: DocumentId, - obkv: obkv::KvReader<'_, FieldId>, + obkv: &obkv::KvReader, prompt: &Prompt, (add_to_user_provided, remove_from_user_provided): (&mut RoaringBitmap, &mut RoaringBitmap), (old, new): (VectorState, VectorState), @@ -517,7 +517,7 @@ fn extract_vector_document_diff( // Do we keep this document? let document_is_kept = obkv .iter() - .map(|(_, deladd)| KvReaderDelAdd::new(deladd)) + .map(|(_, deladd)| KvReaderDelAdd::from_slice(deladd)) .any(|deladd| deladd.get(DelAdd::Addition).is_some()); if document_is_kept { @@ -553,7 +553,7 @@ fn extract_vector_document_diff( // Do we keep this document? let document_is_kept = obkv .iter() - .map(|(_, deladd)| KvReaderDelAdd::new(deladd)) + .map(|(_, deladd)| KvReaderDelAdd::from_slice(deladd)) .any(|deladd| deladd.get(DelAdd::Addition).is_some()); if document_is_kept { if embedder_is_manual { @@ -579,7 +579,7 @@ fn extract_vector_document_diff( // Do we keep this document? 
let document_is_kept = obkv .iter() - .map(|(_, deladd)| KvReaderDelAdd::new(deladd)) + .map(|(_, deladd)| KvReaderDelAdd::from_slice(deladd)) .any(|deladd| deladd.get(DelAdd::Addition).is_some()); if document_is_kept { // if the new version of documents has the vectors in the DB, @@ -597,7 +597,7 @@ fn extract_vector_document_diff( } fn regenerate_if_prompt_changed( - obkv: obkv::KvReader<'_, FieldId>, + obkv: &obkv::KvReader, (old_prompt, new_prompt): (&Prompt, &Prompt), (old_fields_ids_map, new_fields_ids_map): (&FieldsIdsMap, &FieldsIdsMap), ) -> Result { @@ -612,7 +612,7 @@ fn regenerate_if_prompt_changed( } fn regenerate_prompt( - obkv: obkv::KvReader<'_, FieldId>, + obkv: &obkv::KvReader, prompt: &Prompt, new_fields_ids_map: &FieldsIdsMap, ) -> Result { diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index 457d2359e..a14f39e01 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -58,17 +58,17 @@ pub fn extract_word_docids( let document_id = u32::from_be_bytes(document_id_bytes); let fid = u16::from_be_bytes(fid_bytes); - let del_add_reader = KvReaderDelAdd::new(value); + let del_add_reader = KvReaderDelAdd::from_slice(value); // extract all unique words to remove. if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) { - for (_pos, word) in KvReaderU16::new(deletion).iter() { + for (_pos, word) in KvReaderU16::from_slice(deletion).iter() { del_words.insert(word.to_vec()); } } // extract all unique additional words. if let Some(addition) = del_add_reader.get(DelAdd::Addition) { - for (_pos, word) in KvReaderU16::new(addition).iter() { + for (_pos, word) in KvReaderU16::from_slice(addition).iter() { add_words.insert(word.to_vec()); } } @@ -115,7 +115,7 @@ pub fn extract_word_docids( // NOTE: replacing sorters by bitmap merging is less efficient, so, use sorters. 
while let Some((key, value)) = iter.next()? { // only keep the value if their is a change to apply in the DB. - if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) { + if !is_noop_del_add_obkv(KvReaderDelAdd::from_slice(value)) { word_fid_docids_writer.insert(key, value)?; } @@ -123,7 +123,7 @@ pub fn extract_word_docids( .map_err(|_| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; // merge all deletions - let obkv = KvReaderDelAdd::new(value); + let obkv = KvReaderDelAdd::from_slice(value); if let Some(value) = obkv.get(DelAdd::Deletion) { let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid); buffer.clear(); diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 5a9363942..01344563f 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -92,8 +92,8 @@ pub fn extract_word_pair_proximity_docids( } // deletions - if let Some(deletion) = KvReaderDelAdd::new(value).get(DelAdd::Deletion) { - for (position, word) in KvReaderU16::new(deletion).iter() { + if let Some(deletion) = KvReaderDelAdd::from_slice(value).get(DelAdd::Deletion) { + for (position, word) in KvReaderU16::from_slice(deletion).iter() { // drain the proximity window until the head word is considered close to the word we are inserting. 
while del_word_positions.front().map_or(false, |(_w, p)| { index_proximity(*p as u32, position as u32) >= MAX_DISTANCE @@ -125,8 +125,8 @@ pub fn extract_word_pair_proximity_docids( } // additions - if let Some(addition) = KvReaderDelAdd::new(value).get(DelAdd::Addition) { - for (position, word) in KvReaderU16::new(addition).iter() { + if let Some(addition) = KvReaderDelAdd::from_slice(value).get(DelAdd::Addition) { + for (position, word) in KvReaderU16::from_slice(addition).iter() { // drain the proximity window until the head word is considered close to the word we are inserting. while add_word_positions.front().map_or(false, |(_w, p)| { index_proximity(*p as u32, position as u32) >= MAX_DISTANCE diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs index 50b1617f9..7f14d6075 100644 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -60,10 +60,10 @@ pub fn extract_word_position_docids( current_document_id = Some(document_id); - let del_add_reader = KvReaderDelAdd::new(value); + let del_add_reader = KvReaderDelAdd::from_slice(value); // extract all unique words to remove. if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) { - for (position, word_bytes) in KvReaderU16::new(deletion).iter() { + for (position, word_bytes) in KvReaderU16::from_slice(deletion).iter() { let position = bucketed_position(position); del_word_positions.insert((position, word_bytes.to_vec())); } @@ -71,7 +71,7 @@ pub fn extract_word_position_docids( // extract all unique additional words. 
if let Some(addition) = del_add_reader.get(DelAdd::Addition) { - for (position, word_bytes) in KvReaderU16::new(addition).iter() { + for (position, word_bytes) in KvReaderU16::from_slice(addition).iter() { let position = bucketed_position(position); add_word_positions.insert((position, word_bytes.to_vec())); } diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index 42784048a..51fa4e086 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -45,8 +45,8 @@ pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result, - update: obkv::KvReaderU16<'_>, + base: &obkv::KvReaderU16, + update: &obkv::KvReaderU16, merge_additions: bool, buffer: &mut Vec, ) { @@ -66,7 +66,7 @@ pub fn merge_two_del_add_obkvs( // If merge_additions is false, recreate an obkv keeping the deletions only. value_buffer.clear(); let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); - let base_reader = KvReaderDelAdd::new(v); + let base_reader = KvReaderDelAdd::from_slice(v); if let Some(deletion) = base_reader.get(DelAdd::Deletion) { value_writer.insert(DelAdd::Deletion, deletion).unwrap(); @@ -80,8 +80,8 @@ pub fn merge_two_del_add_obkvs( // merge deletions and additions. value_buffer.clear(); let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); - let base_reader = KvReaderDelAdd::new(base); - let update_reader = KvReaderDelAdd::new(update); + let base_reader = KvReaderDelAdd::from_slice(base); + let update_reader = KvReaderDelAdd::from_slice(update); // keep newest deletion. 
if let Some(deletion) = update_reader @@ -131,8 +131,8 @@ fn inner_merge_del_add_obkvs<'a>( break; } - let newest = obkv::KvReader::new(&acc); - let oldest = obkv::KvReader::new(&current[1..]); + let newest = obkv::KvReader::from_slice(&acc); + let oldest = obkv::KvReader::from_slice(&current[1..]); merge_two_del_add_obkvs(oldest, newest, merge_additions, &mut buffer); // we want the result of the merge into our accumulator. @@ -187,7 +187,7 @@ pub fn merge_deladd_cbo_roaring_bitmaps<'a>( let mut del_bitmaps_bytes = Vec::new(); let mut add_bitmaps_bytes = Vec::new(); for value in values { - let obkv = KvReaderDelAdd::new(value); + let obkv = KvReaderDelAdd::from_slice(value); if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) { del_bitmaps_bytes.push(bitmap_bytes); } @@ -217,7 +217,7 @@ pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>( buffer: &'a mut Vec, ) -> Result> { Ok(CboRoaringBitmapCodec::merge_deladd_into( - KvReaderDelAdd::new(deladd_obkv), + KvReaderDelAdd::from_slice(deladd_obkv), previous, buffer, )?) @@ -236,7 +236,7 @@ pub fn merge_deladd_btreeset_string<'a>( let mut del_set = BTreeSet::new(); let mut add_set = BTreeSet::new(); for value in values { - let obkv = KvReaderDelAdd::new(value); + let obkv = KvReaderDelAdd::from_slice(value); if let Some(bytes) = obkv.get(DelAdd::Deletion) { let set = serde_json::from_slice::>(bytes).unwrap(); for value in set { diff --git a/milli/src/update/index_documents/parallel.rs b/milli/src/update/index_documents/parallel.rs index 52e72a378..2f6bf9caf 100644 --- a/milli/src/update/index_documents/parallel.rs +++ b/milli/src/update/index_documents/parallel.rs @@ -31,14 +31,14 @@ impl<'t> ImmutableObkvs<'t> { } /// Returns the OBKVs identified by the given ID.
- pub fn obkv(&self, docid: DocumentId) -> heed::Result>> { + pub fn obkv(&self, docid: DocumentId) -> heed::Result> { match self .ids .rank(docid) .checked_sub(1) .and_then(|offset| self.slices.get(offset as usize)) { - Some(bytes) => Ok(Some(KvReaderU16::new(bytes))), + Some(&bytes) => Ok(Some(bytes.into())), None => Ok(None), } } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 73fa3ca7b..b9541e649 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -278,13 +278,13 @@ impl<'a, 'i> Transform<'a, 'i> { document_sorter_value_buffer.clear(); document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( - KvReaderU16::new(base_obkv), + KvReaderU16::from_slice(base_obkv), deladd_operation, &mut document_sorter_value_buffer, )?; self.original_sorter .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; - let base_obkv = KvReader::new(base_obkv); + let base_obkv = KvReader::from_slice(base_obkv); if let Some(flattened_obkv) = Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)? 
{ @@ -292,7 +292,7 @@ impl<'a, 'i> Transform<'a, 'i> { document_sorter_value_buffer.clear(); document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), + KvReaderU16::from_slice(&flattened_obkv), deladd_operation, &mut document_sorter_value_buffer, )?; @@ -311,7 +311,7 @@ impl<'a, 'i> Transform<'a, 'i> { document_sorter_value_buffer.clear(); document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( - KvReaderU16::new(&obkv_buffer), + KvReaderU16::from_slice(&obkv_buffer), DelAddOperation::Addition, &mut document_sorter_value_buffer, )?; @@ -319,14 +319,14 @@ impl<'a, 'i> Transform<'a, 'i> { self.original_sorter .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; - let flattened_obkv = KvReader::new(&obkv_buffer); + let flattened_obkv = KvReader::from_slice(&obkv_buffer); if let Some(obkv) = Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)? { document_sorter_value_buffer.clear(); document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( - KvReaderU16::new(&obkv), + KvReaderU16::from_slice(&obkv), DelAddOperation::Addition, &mut document_sorter_value_buffer, )? @@ -519,14 +519,14 @@ impl<'a, 'i> Transform<'a, 'i> { document_sorter_value_buffer.clear(); document_sorter_value_buffer.push(Operation::Deletion as u8); into_del_add_obkv( - KvReaderU16::new(base_obkv), + KvReaderU16::from_slice(base_obkv), DelAddOperation::Deletion, document_sorter_value_buffer, )?; self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; // flatten it and push it as to delete in the flattened_sorter - let flattened_obkv = KvReader::new(base_obkv); + let flattened_obkv = KvReader::from_slice(base_obkv); if let Some(obkv) = Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)? 
{ @@ -534,7 +534,7 @@ impl<'a, 'i> Transform<'a, 'i> { document_sorter_value_buffer.clear(); document_sorter_value_buffer.push(Operation::Deletion as u8); into_del_add_obkv( - KvReaderU16::new(&obkv), + KvReaderU16::from_slice(&obkv), DelAddOperation::Deletion, document_sorter_value_buffer, )?; @@ -552,7 +552,7 @@ impl<'a, 'i> Transform<'a, 'i> { target = "indexing::transform" )] fn flatten_from_fields_ids_map( - obkv: &KvReader<'_, FieldId>, + obkv: &KvReader, fields_ids_map: &mut FieldsIdsMap, ) -> Result>> { if obkv @@ -720,10 +720,10 @@ impl<'a, 'i> Transform<'a, 'i> { total_documents: self.documents_count, }); - for (key, value) in KvReader::new(val) { - let reader = KvReaderDelAdd::new(value); + for (key, value) in KvReader::from_slice(val) { + let reader = KvReaderDelAdd::from_slice(value); match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { - (None, None) => {} + (None, None) => (), (None, Some(_)) => { // New field let name = self.fields_ids_map.name(key).ok_or( @@ -837,7 +837,7 @@ impl<'a, 'i> Transform<'a, 'i> { /// then fill the provided buffers with delta documents using KvWritterDelAdd. #[allow(clippy::too_many_arguments)] // need the vectors + fid, feel free to create a struct xo xo fn rebind_existing_document( - old_obkv: KvReader<'_, FieldId>, + old_obkv: &KvReader, settings_diff: &InnerIndexSettingsDiff, modified_faceted_fields: &HashSet, mut injected_vectors: serde_json::Map, @@ -925,7 +925,7 @@ impl<'a, 'i> Transform<'a, 'i> { } let data = obkv_writer.into_inner()?; - let obkv = KvReader::::new(&data); + let obkv = KvReader::::from_slice(&data); if let Some(original_obkv_buffer) = original_obkv_buffer { original_obkv_buffer.clear(); @@ -936,7 +936,7 @@ impl<'a, 'i> Transform<'a, 'i> { // take the non-flattened version if flatten_from_fields_ids_map returns None. 
let mut fields_ids_map = settings_diff.new.fields_ids_map.clone(); let flattened = Self::flatten_from_fields_ids_map(&obkv, &mut fields_ids_map)?; - let flattened = flattened.as_deref().map_or(obkv, KvReader::new); + let flattened = flattened.as_deref().map_or(obkv, KvReader::from_slice); flattened_obkv_buffer.clear(); into_del_add_obkv_conditional_operation(flattened, flattened_obkv_buffer, |id| { @@ -1173,21 +1173,21 @@ mod test { kv_writer.insert(0_u8, [0]).unwrap(); let buffer = kv_writer.into_inner().unwrap(); into_del_add_obkv( - KvReaderU16::new(&buffer), + KvReaderU16::from_slice(&buffer), DelAddOperation::Addition, &mut additive_doc_0, ) .unwrap(); additive_doc_0.insert(0, Operation::Addition as u8); into_del_add_obkv( - KvReaderU16::new(&buffer), + KvReaderU16::from_slice(&buffer), DelAddOperation::Deletion, &mut deletive_doc_0, ) .unwrap(); deletive_doc_0.insert(0, Operation::Deletion as u8); into_del_add_obkv( - KvReaderU16::new(&buffer), + KvReaderU16::from_slice(&buffer), DelAddOperation::DeletionAndAddition, &mut del_add_doc_0, ) @@ -1199,7 +1199,7 @@ mod test { kv_writer.insert(1_u8, [1]).unwrap(); let buffer = kv_writer.into_inner().unwrap(); into_del_add_obkv( - KvReaderU16::new(&buffer), + KvReaderU16::from_slice(&buffer), DelAddOperation::Addition, &mut additive_doc_1, ) @@ -1212,7 +1212,7 @@ mod test { kv_writer.insert(1_u8, [1]).unwrap(); let buffer = kv_writer.into_inner().unwrap(); into_del_add_obkv( - KvReaderU16::new(&buffer), + KvReaderU16::from_slice(&buffer), DelAddOperation::Addition, &mut additive_doc_0_1, ) diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 9de95778b..9fe152348 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -162,7 +162,7 @@ pub(crate) fn write_typed_chunk_into_index( let mut vectors_buffer = Vec::new(); while let Some((key, reader)) = iter.next()? 
{ let mut writer: KvWriter<_, FieldId> = KvWriter::memory(); - let reader: KvReader<'_, FieldId> = KvReader::new(reader); + let reader: &KvReader = reader.into(); let (document_id_bytes, external_id_bytes) = try_split_array_at(key) .ok_or(SerializationError::Decoding { db_name: Some(DOCUMENTS) })?; @@ -170,7 +170,7 @@ pub(crate) fn write_typed_chunk_into_index( let external_id = std::str::from_utf8(external_id_bytes)?; for (field_id, value) in reader.iter() { - let del_add_reader = KvReaderDelAdd::new(value); + let del_add_reader = KvReaderDelAdd::from_slice(value); if let Some(addition) = del_add_reader.get(DelAdd::Addition) { let addition = if vectors_fid == Some(field_id) { @@ -529,7 +529,7 @@ pub(crate) fn write_typed_chunk_into_index( index.field_id_docid_facet_f64s.remap_types::(); let mut iter = merger.into_stream_merger_iter()?; while let Some((key, value)) = iter.next()? { - let reader = KvReaderDelAdd::new(value); + let reader = KvReaderDelAdd::from_slice(value); if valid_lmdb_key(key) { match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { (None, None) => {} @@ -563,7 +563,7 @@ pub(crate) fn write_typed_chunk_into_index( index.field_id_docid_facet_strings.remap_types::(); let mut iter = merger.into_stream_merger_iter()?; while let Some((key, value)) = iter.next()? 
{ - let reader = KvReaderDelAdd::new(value); + let reader = KvReaderDelAdd::from_slice(value); if valid_lmdb_key(key) { match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { (None, None) => {} @@ -600,7 +600,7 @@ pub(crate) fn write_typed_chunk_into_index( // convert the key back to a u32 (4 bytes) let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); - let deladd_obkv = KvReaderDelAdd::new(value); + let deladd_obkv = KvReaderDelAdd::from_slice(value); if let Some(value) = deladd_obkv.get(DelAdd::Deletion) { let geopoint = extract_geo_point(value, docid); rtree.remove(&geopoint); @@ -723,7 +723,7 @@ pub(crate) fn write_typed_chunk_into_index( let (left, _index) = try_split_array_at(key).unwrap(); let docid = DocumentId::from_be_bytes(left); - let vector_deladd_obkv = KvReaderDelAdd::new(value); + let vector_deladd_obkv = KvReaderDelAdd::from_slice(value); if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) { let vector: Vec = pod_collect_to_vec(value); @@ -852,7 +852,7 @@ where if valid_lmdb_key(key) { let (proximity_to_insert, word1, word2) = U8StrStrCodec::bytes_decode(key).map_err(heed::Error::Decoding)?; - let data_to_insert = match KvReaderDelAdd::new(value).get(DelAdd::Addition) { + let data_to_insert = match KvReaderDelAdd::from_slice(value).get(DelAdd::Addition) { Some(value) => { CboRoaringBitmapCodec::bytes_decode(value).map_err(heed::Error::Decoding)? } diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 6780be72e..15239aa3e 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -1,4 +1,3 @@ -use core::slice::SlicePattern; use std::fs::File; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; @@ -44,11 +43,11 @@ impl KeyValueEntry { } pub fn key(&self) -> &[u8] { - &self.data.as_slice()[..self.key_length] + &self.data.as_ref()[..self.key_length] } pub fn value(&self) -> &[u8] { - &self.data.as_slice()[self.key_length..] 
+ &self.data.as_ref()[self.key_length..] } } diff --git a/milli/src/update/new/document_change.rs b/milli/src/update/new/document_change.rs index 311e22404..1764b6ee7 100644 --- a/milli/src/update/new/document_change.rs +++ b/milli/src/update/new/document_change.rs @@ -1,5 +1,5 @@ use heed::RoTxn; -use obkv2::KvReader; +use obkv::KvReader; use super::indexer::KvReaderFieldId; use crate::{DocumentId, FieldId}; diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index da76bdfee..92fcd6b0c 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -4,7 +4,8 @@ mod channel; mod items_pool; mod merge; -mod global_fields_ids_map; +/// TODO remove this +// mod global_fields_ids_map; pub type StdResult = std::result::Result; @@ -27,8 +28,7 @@ mod indexer { use super::channel::{ extractors_merger_channels, merger_writer_channels, EntryOperation, - ExtractorsMergerChannels, MergerReceiver, MergerSender, MergerWriterChannels, - WriterOperation, + ExtractorsMergerChannels, MergerReceiver, MergerSender, WriterOperation, }; use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::items_pool::ItemsPool; @@ -44,10 +44,10 @@ mod indexer { Result, UserError, }; - pub type KvReaderFieldId = obkv2::KvReader; - pub type KvReaderDelAdd = obkv2::KvReader; - pub type KvWriterFieldId = obkv2::KvWriter; - pub type KvWriterDelAdd = obkv2::KvWriter; + pub type KvReaderFieldId = obkv::KvReader; + pub type KvReaderDelAdd = obkv::KvReader; + pub type KvWriterFieldId = obkv::KvWriter; + pub type KvWriterDelAdd = obkv::KvWriter; pub struct DocumentOperationIndexer { operations: Vec, @@ -105,7 +105,7 @@ mod indexer { rtxn: &'a RoTxn, mut fields_ids_map: FieldsIdsMap, primary_key: &'a PrimaryKey<'a>, - ) -> Result + 'a> { + ) -> Result>> + 'a> { let documents_ids = index.documents_ids(rtxn)?; let mut available_docids = AvailableDocumentsIds::from_documents_ids(&documents_ids); let mut docids_version_offsets = HashMap::::new(); @@ 
-198,7 +198,7 @@ mod indexer { } let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); - docids_version_offsets.into_par_iter().map_with( + Ok(docids_version_offsets.into_par_iter().map_with( items, |context_pool, (external_docid, (internal_docid, operations))| { context_pool.with(|rtxn| match self.method { @@ -221,58 +221,7 @@ mod indexer { ), }) }, - ); - - Ok(vec![].into_par_iter()) - - // let mut file_count: usize = 0; - // for result in WalkDir::new(update_files_path) - // // TODO handle errors - // .sort_by_key(|entry| entry.metadata().unwrap().created().unwrap()) - // { - // let entry = result?; - // if !entry.file_type().is_file() { - // continue; - // } - - // let file = File::open(entry.path()) - // .with_context(|| format!("While opening {}", entry.path().display()))?; - // let content = unsafe { - // Mmap::map(&file) - // .map(Arc::new) - // .with_context(|| format!("While memory mapping {}", entry.path().display()))? - // }; - - // let reader = - // crate::documents::DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?; - // let (mut batch_cursor, batch_index) = reader.into_cursor_and_fields_index(); - // batch_index.iter().for_each(|(_, name)| { - // fields_ids_map.insert(name); - // }); - // let mut offset: u32 = 0; - // while let Some(document) = batch_cursor.next_document()? { - // let primary_key = batch_index.id(primary_key).unwrap(); - // let document_id = document.get(primary_key).unwrap(); - // let document_id = std::str::from_utf8(document_id).unwrap(); - - // let document_offset = DocumentOffset { content: content.clone(), offset }; - // match docids_version_offsets.get_mut(document_id) { - // None => { - // let docid = match maindb.external_documents_ids.get(rtxn, document_id)? 
{ - // Some(docid) => docid, - // None => sequential_docids.next().context("no more available docids")?, - // }; - // docids_version_offsets - // .insert(document_id.into(), (docid, smallvec![document_offset])); - // } - // Some((_, offsets)) => offsets.push(document_offset), - // } - // offset += 1; - // p.inc(1); - // } - - // file_count += 1; - // } + )) } } diff --git a/milli/src/vector/parsed_vectors.rs b/milli/src/vector/parsed_vectors.rs index 9dbf025e6..8e5ccf690 100644 --- a/milli/src/vector/parsed_vectors.rs +++ b/milli/src/vector/parsed_vectors.rs @@ -109,14 +109,13 @@ impl ParsedVectorsDiff { pub fn new( docid: DocumentId, embedders_configs: &[IndexEmbeddingConfig], - documents_diff: KvReader<'_, FieldId>, + documents_diff: &KvReader, old_vectors_fid: Option, new_vectors_fid: Option, ) -> Result { let mut old = match old_vectors_fid .and_then(|vectors_fid| documents_diff.get(vectors_fid)) - .map(KvReaderDelAdd::new) - .map(|obkv| to_vector_map(obkv, DelAdd::Deletion)) + .map(|bytes| to_vector_map(bytes.into(), DelAdd::Deletion)) .transpose() { Ok(del) => del, @@ -143,8 +142,7 @@ impl ParsedVectorsDiff { let Some(bytes) = documents_diff.get(new_vectors_fid) else { break 'new VectorsState::NoVectorsFieldInDocument; }; - let obkv = KvReaderDelAdd::new(bytes); - match to_vector_map(obkv, DelAdd::Addition)? { + match to_vector_map(bytes.into(), DelAdd::Addition)? { Some(new) => VectorsState::Vectors(new), None => VectorsState::NoVectorsFieldInDocument, } @@ -239,7 +237,7 @@ impl Error { } fn to_vector_map( - obkv: KvReaderDelAdd<'_>, + obkv: &KvReaderDelAdd, side: DelAdd, ) -> Result>, Error> { Ok(if let Some(value) = obkv.get(side) {