From 794ebcd5826008f42867986e7589777af5fff83b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Fri, 30 Aug 2024 11:49:47 +0200 Subject: [PATCH] Replace grenad with the new grenad various-improvement branch --- Cargo.lock | 19 +- index-scheduler/src/batch.rs | 6 +- meilisearch/src/search/mod.rs | 2 +- meilitool/src/main.rs | 2 +- milli/Cargo.toml | 5 +- milli/src/index.rs | 2 +- milli/src/lib.rs | 2 +- milli/src/search/new/db_cache.rs | 45 ++-- milli/src/update/facet/bulk.rs | 8 +- milli/src/update/facet/incremental.rs | 6 +- milli/src/update/facet/mod.rs | 19 +- milli/src/update/index_documents/enrich.rs | 4 +- .../extract/extract_docid_word_positions.rs | 14 +- .../extract/extract_facet_number_docids.rs | 4 +- .../extract/extract_facet_string_docids.rs | 10 +- .../extract/extract_fid_docid_facet_values.rs | 28 +- .../extract/extract_fid_word_count_docids.rs | 6 +- .../extract/extract_geo_points.rs | 6 +- .../extract/extract_word_docids.rs | 13 +- .../extract_word_pair_proximity_docids.rs | 8 +- .../extract/extract_word_position_docids.rs | 9 +- .../index_documents/helpers/grenad_helpers.rs | 32 +-- .../helpers/merge_functions.rs | 247 +++++++++++------- .../src/update/index_documents/helpers/mod.rs | 13 +- milli/src/update/index_documents/mod.rs | 26 +- milli/src/update/index_documents/transform.rs | 51 ++-- .../src/update/index_documents/typed_chunk.rs | 77 +++--- milli/src/update/mod.rs | 5 +- milli/src/update/new/channel.rs | 2 +- .../merge/del_add_roaring_bitmap_merger.rs | 2 +- milli/src/update/new/mod.rs | 12 +- milli/src/update/word_prefix_docids.rs | 11 +- .../src/update/words_prefix_integer_docids.rs | 11 +- 33 files changed, 367 insertions(+), 340 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18f6838ed..281c0ab9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2221,25 +2221,15 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "grenad" version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "350d89047298d3b1b40050acd11ab76e487b854a104b760ebc5a7f375093de77" +source = "git+https://github.com/meilisearch/grenad?branch=various-improvements#58ac87d852413571102f44c5e55ca13509a3f1a0" dependencies = [ "bytemuck", "byteorder", + "either", "rayon", "tempfile", ] -[[package]] -name = "grenad" -version = "0.4.7" -source = "git+https://github.com/meilisearch/grenad?branch=various-improvements#d7512aedb854c247acc7cd18d0bfa148d3779923" -dependencies = [ - "bytemuck", - "byteorder", - "tempfile", -] - [[package]] name = "h2" version = "0.3.26" @@ -2848,7 +2838,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d" dependencies = [ "cfg-if", - "windows-targets 0.52.4", + "windows-targets 0.48.1", ] [[package]] @@ -3584,8 +3574,7 @@ dependencies = [ "fst", "fxhash", "geoutils", - "grenad 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", - "grenad 0.4.7 (git+https://github.com/meilisearch/grenad?branch=various-improvements)", + "grenad", "heed", "hf-hub", "indexmap", diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 3e6e78614..1a056dde9 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -909,10 +909,8 @@ impl IndexScheduler { while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? 
{ - dump_content_file.push_document(&obkv_to_object( - &doc, - &documents_batch_index, - )?)?; + dump_content_file + .push_document(&obkv_to_object(doc, &documents_batch_index)?)?; } dump_content_file.flush()?; } diff --git a/meilisearch/src/search/mod.rs b/meilisearch/src/search/mod.rs index 4ada47ff1..54d0c4823 100644 --- a/meilisearch/src/search/mod.rs +++ b/meilisearch/src/search/mod.rs @@ -1642,7 +1642,7 @@ fn add_non_formatted_ids_to_formatted_options( fn make_document( displayed_attributes: &BTreeSet, field_ids_map: &FieldsIdsMap, - obkv: obkv::KvReaderU16, + obkv: &obkv::KvReaderU16, ) -> Result { let mut document = serde_json::Map::new(); diff --git a/meilitool/src/main.rs b/meilitool/src/main.rs index 06c4890a5..f908dc4b0 100644 --- a/meilitool/src/main.rs +++ b/meilitool/src/main.rs @@ -244,7 +244,7 @@ fn export_a_dump( format!("While iterating on content file {:?}", content_file_uuid) })? { dump_content_file - .push_document(&obkv_to_object(&doc, &documents_batch_index)?)?; + .push_document(&obkv_to_object(doc, &documents_batch_index)?)?; } dump_content_file.flush()?; count += 1; diff --git a/milli/Cargo.toml b/milli/Cargo.toml index b15f72f15..7059ed7f5 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -28,10 +28,7 @@ fst = "0.4.7" fxhash = "0.2.1" geoutils = "0.5.1" grenad = { version = "0.4.7", default-features = false, features = [ - "rayon", - "tempfile", -] } -grenad2 = { package = "grenad", version = "0.4.7", default-features = false, features = [ + "rayon", # TODO Should we keep this feature "tempfile" ], git = "https://github.com/meilisearch/grenad", branch = "various-improvements" } heed = { version = "0.20.3", default-features = false, features = [ diff --git a/milli/src/index.rs b/milli/src/index.rs index 9c582b97a..58b3a6bf4 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -1311,7 +1311,7 @@ impl Index { })?; Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> { let (_docid, obkv) = entry?; - match primary_key.document_id(&obkv, &fields)? { + match primary_key.document_id(obkv, &fields)? 
{ Ok(document_id) => Ok(document_id), Err(_) => Err(InternalError::DocumentsError( crate::documents::Error::InvalidDocumentFormat, diff --git a/milli/src/lib.rs b/milli/src/lib.rs index bb8325791..8b2468bea 100644 --- a/milli/src/lib.rs +++ b/milli/src/lib.rs @@ -431,7 +431,7 @@ mod tests { writer.insert(id1, b"1234").unwrap(); writer.insert(id2, b"4321").unwrap(); let contents = writer.into_inner().unwrap(); - let obkv = obkv::KvReaderU16::new(&contents); + let obkv = obkv::KvReaderU16::from_slice(&contents); let expected = json!({ "field1": 1234, diff --git a/milli/src/search/new/db_cache.rs b/milli/src/search/new/db_cache.rs index d1d9d6d9a..243303ba2 100644 --- a/milli/src/search/new/db_cache.rs +++ b/milli/src/search/new/db_cache.rs @@ -3,6 +3,7 @@ use std::collections::hash_map::Entry; use std::hash::Hash; use fxhash::FxHashMap; +use grenad::MergeFunction; use heed::types::Bytes; use heed::{BytesEncode, Database, RoTxn}; use roaring::RoaringBitmap; @@ -11,7 +12,7 @@ use super::interner::Interned; use super::Word; use crate::heed_codec::{BytesDecodeOwned, StrBEU16Codec}; use crate::proximity::ProximityPrecision; -use crate::update::{merge_cbo_roaring_bitmaps, MergeFn}; +use crate::update::MergeCboRoaringBitmaps; use crate::{ CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, Result, SearchContext, U8StrStrCodec, }; @@ -110,19 +111,21 @@ impl<'ctx> DatabaseCache<'ctx> { .map_err(Into::into) } - fn get_value_from_keys<'v, K1, KC>( + fn get_value_from_keys<'v, K1, KC, MF>( txn: &'ctx RoTxn<'_>, cache_key: K1, db_keys: &'v [KC::EItem], cache: &mut FxHashMap>>, db: Database, universe: Option<&RoaringBitmap>, - merger: MergeFn, + merger: MF, ) -> Result> where K1: Copy + Eq + Hash, KC: BytesEncode<'v>, KC::EItem: Sized, + MF: MergeFunction, + crate::Error: From, { if let Entry::Vacant(entry) = cache.entry(cache_key) { let bitmap_ptr: Option> = match db_keys { @@ -138,7 +141,7 @@ impl<'ctx> DatabaseCache<'ctx> { if bitmaps.is_empty() { None } else { - Some(merger(&[], &bitmaps[..])?) + Some(merger.merge(&[], &bitmaps[..])?) 
} } }; @@ -213,17 +216,17 @@ impl<'ctx> SearchContext<'ctx> { let keys: Vec<_> = restricted_fids.tolerant.iter().map(|(fid, _)| (interned, *fid)).collect(); - DatabaseCache::get_value_from_keys::<_, _>( + DatabaseCache::get_value_from_keys( self.txn, word, &keys[..], &mut self.db_cache.word_docids, self.index.word_fid_docids.remap_data_type::(), universe, - merge_cbo_roaring_bitmaps, + MergeCboRoaringBitmaps, ) } - None => DatabaseCache::get_value::<_, _>( + None => DatabaseCache::get_value( self.txn, word, self.word_interner.get(word).as_str(), @@ -245,17 +248,17 @@ impl<'ctx> SearchContext<'ctx> { let keys: Vec<_> = restricted_fids.exact.iter().map(|(fid, _)| (interned, *fid)).collect(); - DatabaseCache::get_value_from_keys::<_, _>( + DatabaseCache::get_value_from_keys( self.txn, word, &keys[..], &mut self.db_cache.exact_word_docids, self.index.word_fid_docids.remap_data_type::(), universe, - merge_cbo_roaring_bitmaps, + MergeCboRoaringBitmaps, ) } - None => DatabaseCache::get_value::<_, _>( + None => DatabaseCache::get_value( self.txn, word, self.word_interner.get(word).as_str(), @@ -302,17 +305,17 @@ impl<'ctx> SearchContext<'ctx> { let keys: Vec<_> = restricted_fids.tolerant.iter().map(|(fid, _)| (interned, *fid)).collect(); - DatabaseCache::get_value_from_keys::<_, _>( + DatabaseCache::get_value_from_keys( self.txn, prefix, &keys[..], &mut self.db_cache.word_prefix_docids, self.index.word_prefix_fid_docids.remap_data_type::(), universe, - merge_cbo_roaring_bitmaps, + MergeCboRoaringBitmaps, ) } - None => DatabaseCache::get_value::<_, _>( + None => DatabaseCache::get_value( self.txn, prefix, self.word_interner.get(prefix).as_str(), @@ -334,17 +337,17 @@ impl<'ctx> SearchContext<'ctx> { let keys: Vec<_> = restricted_fids.exact.iter().map(|(fid, _)| (interned, *fid)).collect(); - DatabaseCache::get_value_from_keys::<_, _>( + DatabaseCache::get_value_from_keys( self.txn, prefix, &keys[..], &mut self.db_cache.exact_word_prefix_docids, self.index.word_prefix_fid_docids.remap_data_type::(), universe, - merge_cbo_roaring_bitmaps, + MergeCboRoaringBitmaps, ) } - None => DatabaseCache::get_value::<_, _>( + None => DatabaseCache::get_value( self.txn, prefix, self.word_interner.get(prefix).as_str(), @@ -405,7 +408,7 @@ impl<'ctx> SearchContext<'ctx> { Ok(docids) } - ProximityPrecision::ByWord => DatabaseCache::get_value::<_, _>( + ProximityPrecision::ByWord => DatabaseCache::get_value( self.txn, (proximity, word1, word2), &( @@ -538,7 +541,7 @@ impl<'ctx> SearchContext<'ctx> { return Ok(None); } - DatabaseCache::get_value::<_, _>( + DatabaseCache::get_value( self.txn, (word, fid), &(self.word_interner.get(word).as_str(), fid), @@ -559,7 +562,7 @@ impl<'ctx> SearchContext<'ctx> { return Ok(None); } - DatabaseCache::get_value::<_, _>( + DatabaseCache::get_value( self.txn, (word_prefix, fid), &(self.word_interner.get(word_prefix).as_str(), fid), @@ -629,7 +632,7 @@ impl<'ctx> SearchContext<'ctx> { word: Interned, position: u16, ) -> Result> { - DatabaseCache::get_value::<_, _>( + DatabaseCache::get_value( self.txn, (word, position), &(self.word_interner.get(word).as_str(), position), @@ -645,7 +648,7 @@ impl<'ctx> SearchContext<'ctx> { word_prefix: Interned, position: u16, ) -> Result> { - DatabaseCache::get_value::<_, _>( + DatabaseCache::get_value( self.txn, (word_prefix, position), &(self.word_interner.get(word_prefix).as_str(), position), diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs index 27de6e777..19dfc310b 100644 --- a/milli/src/update/facet/bulk.rs +++ 
b/milli/src/update/facet/bulk.rs @@ -14,7 +14,7 @@ use crate::heed_codec::facet::{ use crate::heed_codec::BytesRefCodec; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader}; -use crate::update::MergeFn; +use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldId, Index, Result}; /// Algorithm to insert elememts into the `facet_id_(string/f64)_docids` databases @@ -29,7 +29,7 @@ pub struct FacetsUpdateBulk<'i> { facet_type: FacetType, field_ids: Vec, // None if level 0 does not need to be updated - delta_data: Option, MergeFn>>, + delta_data: Option, MergeDeladdCboRoaringBitmaps>>, } impl<'i> FacetsUpdateBulk<'i> { @@ -37,7 +37,7 @@ impl<'i> FacetsUpdateBulk<'i> { index: &'i Index, field_ids: Vec, facet_type: FacetType, - delta_data: Merger, MergeFn>, + delta_data: Merger, MergeDeladdCboRoaringBitmaps>, group_size: u8, min_level_size: u8, ) -> FacetsUpdateBulk<'i> { @@ -90,7 +90,7 @@ impl<'i> FacetsUpdateBulk<'i> { /// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type pub(crate) struct FacetsUpdateBulkInner { pub db: heed::Database, FacetGroupValueCodec>, - pub delta_data: Option>, + pub delta_data: Option>, pub group_size: u8, pub min_level_size: u8, } diff --git a/milli/src/update/facet/incremental.rs b/milli/src/update/facet/incremental.rs index 637f84986..a1fa07fe3 100644 --- a/milli/src/update/facet/incremental.rs +++ b/milli/src/update/facet/incremental.rs @@ -15,7 +15,7 @@ use crate::heed_codec::BytesRefCodec; use crate::search::facet::get_highest_level; use crate::update::del_add::DelAdd; use crate::update::index_documents::valid_lmdb_key; -use crate::update::MergeFn; +use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{CboRoaringBitmapCodec, Index, Result}; /// Enum used as a return value for the facet incremental indexing. @@ -57,14 +57,14 @@ enum ModificationResult { /// `facet_id_(string/f64)_docids` databases. 
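// [Editor's note: illustrative sketch, not part of this patch.] The facet updaters now
// carry a `Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>` instead of a merger
// parameterised by the old `MergeFn` pointer type. Below is a minimal sketch of building
// and draining such a merger, kept generic over the merge function so it stays
// self-contained; `drain_merger` is a hypothetical helper, and the grenad calls mirror
// the ones used in `test_helpers` and `index_facet_search` later in this diff.
use std::io::{Read, Seek};
use grenad::{MergeFunction, Merger, MergerBuilder, Reader};

fn drain_merger<R, MF>(readers: Vec<Reader<R>>, merge: MF) -> usize
where
    R: Read + Seek,
    MF: MergeFunction,
    MF::Error: std::fmt::Debug,
{
    // The merge function is a value handed to the builder, exactly like the sorters.
    let mut builder = MergerBuilder::new(merge);
    for reader in readers {
        builder.push(reader.into_cursor().unwrap());
    }
    let merger: Merger<R, MF> = builder.build();

    // Stream over the merged entries; duplicate keys have already been merged.
    let mut count = 0;
    let mut iter = merger.into_stream_merger_iter().unwrap();
    while let Some((_key, _value)) = iter.next().unwrap() {
        count += 1;
    }
    count
}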
pub struct FacetsUpdateIncremental { inner: FacetsUpdateIncrementalInner, - delta_data: Merger, MergeFn>, + delta_data: Merger, MergeDeladdCboRoaringBitmaps>, } impl FacetsUpdateIncremental { pub fn new( index: &Index, facet_type: FacetType, - delta_data: Merger, MergeFn>, + delta_data: Merger, MergeDeladdCboRoaringBitmaps>, group_size: u8, min_level_size: u8, max_group_size: u8, diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index bccfdff12..2e592519b 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -86,12 +86,11 @@ use time::OffsetDateTime; use tracing::debug; use self::incremental::FacetsUpdateIncremental; -use super::FacetsUpdateBulk; +use super::{FacetsUpdateBulk, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps}; use crate::facet::FacetType; use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec}; use crate::heed_codec::BytesRefCodec; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; -use crate::update::MergeFn; use crate::{try_split_array_at, FieldId, Index, Result}; pub mod bulk; @@ -105,8 +104,8 @@ pub struct FacetsUpdate<'i> { index: &'i Index, database: heed::Database, FacetGroupValueCodec>, facet_type: FacetType, - delta_data: Merger, MergeFn>, - normalized_delta_data: Option, MergeFn>>, + delta_data: Merger, MergeDeladdCboRoaringBitmaps>, + normalized_delta_data: Option, MergeDeladdBtreesetString>>, group_size: u8, max_group_size: u8, min_level_size: u8, @@ -116,8 +115,8 @@ impl<'i> FacetsUpdate<'i> { pub fn new( index: &'i Index, facet_type: FacetType, - delta_data: Merger, MergeFn>, - normalized_delta_data: Option, MergeFn>>, + delta_data: Merger, MergeDeladdCboRoaringBitmaps>, + normalized_delta_data: Option, MergeDeladdBtreesetString>>, data_size: u64, ) -> Self { let database = match facet_type { @@ -182,7 +181,7 @@ impl<'i> FacetsUpdate<'i> { fn index_facet_search( wtxn: &mut heed::RwTxn<'_>, - normalized_delta_data: Merger, MergeFn>, + normalized_delta_data: Merger, MergeDeladdBtreesetString>, index: &Index, ) -> Result<()> { let mut iter = normalized_delta_data.into_stream_merger_iter()?; @@ -298,8 +297,8 @@ pub(crate) mod test_helpers { use crate::search::facet::get_highest_level; use crate::snapshot_tests::display_bitmap; use crate::update::del_add::{DelAdd, KvWriterDelAdd}; - use crate::update::index_documents::merge_deladd_cbo_roaring_bitmaps; - use crate::update::{FacetsUpdateIncrementalInner, MergeFn}; + use crate::update::index_documents::MergeDeladdCboRoaringBitmaps; + use crate::update::FacetsUpdateIncrementalInner; use crate::CboRoaringBitmapCodec; /// Utility function to generate a string whose position in a lexicographically @@ -484,7 +483,7 @@ pub(crate) mod test_helpers { } writer.finish().unwrap(); let reader = grenad::Reader::new(std::io::Cursor::new(new_data)).unwrap(); - let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); builder.push(reader.into_cursor().unwrap()); let merger = builder.build(); diff --git a/milli/src/update/index_documents/enrich.rs b/milli/src/update/index_documents/enrich.rs index a93d6f9f1..85f871830 100644 --- a/milli/src/update/index_documents/enrich.rs +++ b/milli/src/update/index_documents/enrich.rs @@ -47,7 +47,7 @@ pub fn enrich_documents_batch( return match cursor.next_document()? 
{ Some(first_document) => Ok(Err(UserError::MissingDocumentId { primary_key: primary_key.to_string(), - document: obkv_to_object(&first_document, &documents_batch_index)?, + document: obkv_to_object(first_document, &documents_batch_index)?, })), None => unreachable!("Called with reader.is_empty()"), }; @@ -106,7 +106,7 @@ pub fn enrich_documents_batch( let mut count = 0; while let Some(document) = cursor.next_document()? { let document_id = match fetch_or_generate_document_id( - &document, + document, &documents_batch_index, primary_key, autogenerate_docids, diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index a939827d5..716e4dd6b 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -8,7 +8,7 @@ use obkv::{KvReader, KvWriterU16}; use roaring::RoaringBitmap; use serde_json::Value; -use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters}; +use super::helpers::{create_sorter, sorter_into_reader, GrenadParameters, KeepLatestObkv}; use crate::error::{InternalError, SerializationError}; use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd}; use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; @@ -35,7 +35,7 @@ pub fn extract_docid_word_positions( let mut documents_ids = RoaringBitmap::new(); let mut docid_word_positions_sorter = create_sorter( grenad::SortAlgorithm::Stable, - keep_latest_obkv, + KeepLatestObkv, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -83,7 +83,7 @@ pub fn extract_docid_word_positions( let obkv = KvReader::::from_slice(value); // if the searchable fields didn't change, skip the searchable indexing for this document. 
- if !force_reindexing && !searchable_fields_changed(&obkv, settings_diff) { + if !force_reindexing && !searchable_fields_changed(obkv, settings_diff) { continue; } @@ -98,7 +98,7 @@ pub fn extract_docid_word_positions( || { // deletions tokens_from_document( - &obkv, + obkv, &settings_diff.old, &del_tokenizer, max_positions_per_attributes, @@ -109,7 +109,7 @@ pub fn extract_docid_word_positions( || { // additions tokens_from_document( - &obkv, + obkv, &settings_diff.new, &add_tokenizer, max_positions_per_attributes, @@ -126,8 +126,8 @@ pub fn extract_docid_word_positions( // transforming two KV> into one KV>> value_buffer.clear(); del_add_from_two_obkvs( - &KvReader::::from_slice(del_obkv), - &KvReader::::from_slice(add_obkv), + KvReader::::from_slice(del_obkv), + KvReader::::from_slice(add_obkv), &mut value_buffer, )?; diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index 478631dea..8a5a93270 100644 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -4,7 +4,7 @@ use std::io::{self, BufReader}; use heed::{BytesDecode, BytesEncode}; use super::helpers::{ - create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters, + create_sorter, sorter_into_reader, GrenadParameters, MergeDeladdCboRoaringBitmaps, }; use crate::heed_codec::facet::{ FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec, @@ -27,7 +27,7 @@ pub fn extract_facet_number_docids( let mut facet_number_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index 7565b1ad1..f7bdcbb56 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -15,7 +15,7 @@ use crate::heed_codec::{BEU16StrCodec, StrRefCodec}; use crate::localized_attributes_rules::LocalizedFieldIds; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::index_documents::helpers::{ - merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, + MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps, }; use crate::update::settings::InnerIndexSettingsDiff; use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; @@ -56,7 +56,7 @@ fn extract_facet_string_docids_document_update( let mut facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -65,7 +65,7 @@ fn extract_facet_string_docids_document_update( let mut normalized_facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, - merge_deladd_btreeset_string, + MergeDeladdBtreesetString, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -144,7 +144,7 @@ fn extract_facet_string_docids_settings( let mut facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, 
indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -153,7 +153,7 @@ fn extract_facet_string_docids_settings( let mut normalized_facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, - merge_deladd_btreeset_string, + MergeDeladdBtreesetString, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 7678e1edf..f7f447ca9 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -1,10 +1,8 @@ -use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet}; use std::convert::TryInto; use std::fs::File; use std::io::{self, BufReader}; use std::mem::size_of; -use std::result::Result as StdResult; use bytemuck::bytes_of; use grenad::Sorter; @@ -15,13 +13,13 @@ use roaring::RoaringBitmap; use serde_json::{from_slice, Value}; use FilterableValues::{Empty, Null, Values}; -use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; +use super::helpers::{create_sorter, sorter_into_reader, GrenadParameters, KeepFirst}; use crate::error::InternalError; use crate::facet::value_encoding::f64_into_bytes; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::index_documents::{create_writer, writer_into_reader}; use crate::update::settings::InnerIndexSettingsDiff; -use crate::{CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, MAX_FACET_VALUE_LENGTH}; +use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, MAX_FACET_VALUE_LENGTH}; /// The length of the elements that are always in the buffer when inserting new values. const TRUNCATE_SIZE: usize = size_of::() + size_of::(); @@ -50,7 +48,7 @@ pub fn extract_fid_docid_facet_values( let mut fid_docid_facet_numbers_sorter = create_sorter( grenad::SortAlgorithm::Stable, - keep_first, + KeepFirst, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -59,7 +57,7 @@ pub fn extract_fid_docid_facet_values( let mut fid_docid_facet_strings_sorter = create_sorter( grenad::SortAlgorithm::Stable, - keep_first, + KeepFirst, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -330,15 +328,12 @@ fn truncate_str(s: &str) -> &str { /// Computes the diff between both Del and Add numbers and /// only inserts the parts that differ in the sorter. -fn insert_numbers_diff( - fid_docid_facet_numbers_sorter: &mut Sorter, +fn insert_numbers_diff( + fid_docid_facet_numbers_sorter: &mut Sorter, key_buffer: &mut Vec, mut del_numbers: Vec, mut add_numbers: Vec, -) -> Result<()> -where - MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, -{ +) -> Result<()> { // We sort and dedup the float numbers del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); @@ -390,15 +385,12 @@ where /// Computes the diff between both Del and Add strings and /// only inserts the parts that differ in the sorter. 
-fn insert_strings_diff( - fid_docid_facet_strings_sorter: &mut Sorter, +fn insert_strings_diff( + fid_docid_facet_strings_sorter: &mut Sorter, key_buffer: &mut Vec, mut del_strings: Vec<(String, String)>, mut add_strings: Vec<(String, String)>, -) -> Result<()> -where - MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, -{ +) -> Result<()> { // We sort and dedup the normalized and original strings del_strings.sort_unstable(); add_strings.sort_unstable(); diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs index 291dcc014..784de5d94 100644 --- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs @@ -4,8 +4,8 @@ use std::io::{self, BufReader}; use obkv::KvReaderU16; use super::helpers::{ - create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, - GrenadParameters, + create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters, + MergeDeladdCboRoaringBitmaps, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; @@ -30,7 +30,7 @@ pub fn extract_fid_word_count_docids( let mut fid_word_count_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract/extract_geo_points.rs index fcf102eeb..84f5e556b 100644 --- a/milli/src/update/index_documents/extract/extract_geo_points.rs +++ b/milli/src/update/index_documents/extract/extract_geo_points.rs @@ -40,11 +40,9 @@ pub fn extract_geo_points( }; // extract old version - let del_lat_lng = - extract_lat_lng(&obkv, &settings_diff.old, DelAdd::Deletion, document_id)?; + let del_lat_lng = extract_lat_lng(obkv, &settings_diff.old, DelAdd::Deletion, document_id)?; // extract new version - let add_lat_lng = - extract_lat_lng(&obkv, &settings_diff.new, DelAdd::Addition, document_id)?; + let add_lat_lng = extract_lat_lng(obkv, &settings_diff.new, DelAdd::Addition, document_id)?; if del_lat_lng != add_lat_lng { let mut obkv = KvWriterDelAdd::memory(); diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index a14f39e01..70db9d759 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -7,8 +7,8 @@ use obkv::KvReaderU16; use roaring::RoaringBitmap; use super::helpers::{ - create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, - writer_into_reader, GrenadParameters, + create_sorter, create_writer, try_split_array_at, writer_into_reader, GrenadParameters, + MergeDeladdCboRoaringBitmaps, }; use crate::error::SerializationError; use crate::heed_codec::StrBEU16Codec; @@ -16,7 +16,6 @@ use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::index_documents::helpers::sorter_into_reader; use crate::update::settings::InnerIndexSettingsDiff; -use crate::update::MergeFn; use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result}; /// Extracts the word and the documents ids where this 
word appear. @@ -40,7 +39,7 @@ pub fn extract_word_docids( let mut word_fid_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -94,7 +93,7 @@ pub fn extract_word_docids( let mut word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -103,7 +102,7 @@ pub fn extract_word_docids( let mut exact_word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -163,7 +162,7 @@ fn words_into_sorter( key_buffer: &mut Vec, del_words: &BTreeSet>, add_words: &BTreeSet>, - word_fid_docids_sorter: &mut grenad::Sorter, + word_fid_docids_sorter: &mut grenad::Sorter, ) -> Result<()> { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 01344563f..705a5c96f 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -6,8 +6,8 @@ use std::{cmp, io}; use obkv::KvReaderU16; use super::helpers::{ - create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, - writer_into_reader, GrenadParameters, MergeFn, + create_sorter, create_writer, try_split_array_at, writer_into_reader, GrenadParameters, + MergeDeladdCboRoaringBitmaps, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; @@ -44,7 +44,7 @@ pub fn extract_word_pair_proximity_docids( .map(|_| { create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -197,7 +197,7 @@ fn document_word_positions_into_sorter( document_id: DocumentId, del_word_pair_proximity: &BTreeMap<(String, String), u8>, add_word_pair_proximity: &BTreeMap<(String, String), u8>, - word_pair_proximity_docids_sorters: &mut [grenad::Sorter], + word_pair_proximity_docids_sorters: &mut [grenad::Sorter], ) -> Result<()> { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs index 7f14d6075..bee510bfb 100644 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -5,14 +5,13 @@ use std::io::{self, BufReader}; use obkv::KvReaderU16; use super::helpers::{ - create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, - GrenadParameters, + create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters, + MergeDeladdCboRoaringBitmaps, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::settings::InnerIndexSettingsDiff; -use 
crate::update::MergeFn; use crate::{bucketed_position, DocumentId, Result}; /// Extracts the word positions and the documents ids where this word appear. @@ -29,7 +28,7 @@ pub fn extract_word_position_docids( let mut word_position_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -100,7 +99,7 @@ fn words_position_into_sorter( key_buffer: &mut Vec, del_word_positions: &BTreeSet<(u16, Vec)>, add_word_positions: &BTreeSet<(u16, Vec)>, - word_position_docids_sorter: &mut grenad::Sorter, + word_position_docids_sorter: &mut grenad::Sorter, ) -> Result<()> { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index 44009f2fa..1f8f7eddf 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -1,11 +1,10 @@ -use std::borrow::Cow; use std::fs::File; use std::io::{self, BufReader, BufWriter, Seek}; -use grenad::{CompressionType, Sorter}; +use grenad::{CompressionType, MergeFunction, Sorter}; use heed::types::Bytes; -use super::{ClonableMmap, MergeFn}; +use super::ClonableMmap; use crate::update::index_documents::valid_lmdb_key; use crate::Result; @@ -31,14 +30,14 @@ pub fn create_writer( /// A helper function that creates a grenad sorter /// with the given parameters. The max memory is /// clamped to something reasonable. -pub fn create_sorter( +pub fn create_sorter( sort_algorithm: grenad::SortAlgorithm, - merge: MergeFn, + merge: MF, chunk_compression_type: grenad::CompressionType, chunk_compression_level: Option, max_nb_chunks: Option, max_memory: Option, -) -> grenad::Sorter { +) -> grenad::Sorter { let mut builder = grenad::Sorter::builder(merge); builder.chunk_compression_type(chunk_compression_type); if let Some(level) = chunk_compression_level { @@ -57,10 +56,14 @@ pub fn create_sorter( } #[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")] -pub fn sorter_into_reader( - sorter: grenad::Sorter, +pub fn sorter_into_reader( + sorter: grenad::Sorter, indexer: GrenadParameters, -) -> Result>> { +) -> Result>> +where + MF: MergeFunction, + crate::Error: From, +{ let mut writer = create_writer( indexer.chunk_compression_type, indexer.chunk_compression_level, @@ -169,8 +172,8 @@ pub fn grenad_obkv_into_chunks( /// Write provided sorter in database using serialize_value function. /// merge_values function is used if an entry already exist in the database. #[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")] -pub fn write_sorter_into_database( - sorter: Sorter, +pub fn write_sorter_into_database( + sorter: Sorter, database: &heed::Database, wtxn: &mut heed::RwTxn<'_>, index_is_empty: bool, @@ -180,6 +183,8 @@ pub fn write_sorter_into_database( where FS: for<'a> Fn(&'a [u8], &'a mut Vec) -> Result<&'a [u8]>, FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec) -> Result>, + MF: MergeFunction, + crate::Error: From, { let mut buffer = Vec::new(); let database = database.remap_types::(); @@ -207,8 +212,3 @@ where Ok(()) } - -/// Used when trying to merge readers, but you don't actually care about the values. 
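// [Editor's note: illustrative sketch, not part of this patch.] The helpers above
// (`create_sorter`, `sorter_into_reader`, `write_sorter_into_database`) are now generic
// over any `MF: MergeFunction` whose error converts into `crate::Error`. A minimal,
// hypothetical caller written against the same kind of bound; `AppError` and
// `merge_values` are names invented for this sketch, not part of the codebase.
use std::borrow::Cow;
use grenad::MergeFunction;

#[derive(Debug)]
struct AppError(String);

// Lets `?` lift an infallible merge error into the application error type.
impl From<std::convert::Infallible> for AppError {
    fn from(never: std::convert::Infallible) -> Self {
        match never {}
    }
}

/// Merges a slice of values in memory with any merge function, mirroring the
/// `MF: MergeFunction, crate::Error: From<MF::Error>` bound used by the helpers above.
fn merge_values<MF>(merger: &MF, key: &[u8], values: &[Cow<'_, [u8]>]) -> Result<Vec<u8>, AppError>
where
    MF: MergeFunction,
    AppError: From<MF::Error>,
{
    let merged = merger.merge(key, values)?;
    Ok(merged.into_owned())
}
// In milli the bound is satisfied trivially because every merge function in this patch
// declares `type Error = crate::Error;`, so no extra conversion impls are needed.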
-pub fn merge_ignore_values<'a>(_key: &[u8], _values: &[Cow<'a, [u8]>]) -> Result> { - Ok(Cow::Owned(Vec::new())) -} diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index 51fa4e086..ab8a09a60 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -3,6 +3,8 @@ use std::collections::BTreeSet; use std::io; use std::result::Result as StdResult; +use either::Either; +use grenad::MergeFunction; use roaring::RoaringBitmap; use crate::heed_codec::CboRoaringBitmapCodec; @@ -10,7 +12,8 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::index_documents::transform::Operation; use crate::Result; -pub type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> Result>; +pub type EitherObkvMerge = + Either; pub fn serialize_roaring_bitmap(bitmap: &RoaringBitmap, buffer: &mut Vec) -> io::Result<()> { buffer.clear(); @@ -18,30 +21,48 @@ pub fn serialize_roaring_bitmap(bitmap: &RoaringBitmap, buffer: &mut Vec) -> bitmap.serialize_into(buffer) } -pub fn merge_roaring_bitmaps<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { - if values.len() == 1 { - Ok(values[0].clone()) - } else { - let merged = values - .iter() - .map(AsRef::as_ref) - .map(RoaringBitmap::deserialize_from) - .map(StdResult::unwrap) - .reduce(|a, b| a | b) - .unwrap(); - let mut buffer = Vec::new(); - serialize_roaring_bitmap(&merged, &mut buffer)?; - Ok(Cow::Owned(buffer)) +pub struct MergeRoaringBitmaps; + +impl MergeFunction for MergeRoaringBitmaps { + type Error = crate::Error; + + fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { + if values.len() == 1 { + Ok(values[0].clone()) + } else { + let merged = values + .iter() + .map(AsRef::as_ref) + .map(RoaringBitmap::deserialize_from) + .map(StdResult::unwrap) + .reduce(|a, b| a | b) + .unwrap(); + let mut buffer = Vec::new(); + serialize_roaring_bitmap(&merged, &mut buffer)?; + Ok(Cow::Owned(buffer)) + } } } -pub fn keep_first<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { - Ok(values[0].clone()) +pub struct KeepFirst; + +impl MergeFunction for KeepFirst { + type Error = crate::Error; + + fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { + Ok(values[0].clone()) + } } /// Only the last value associated with an id is kept. -pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result> { - Ok(obkvs.last().unwrap().clone()) +pub struct KeepLatestObkv; + +impl MergeFunction for KeepLatestObkv { + type Error = crate::Error; + + fn merge<'a>(&self, _key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result> { + Ok(obkvs.last().unwrap().clone()) + } } pub fn merge_two_del_add_obkvs( @@ -145,65 +166,79 @@ fn inner_merge_del_add_obkvs<'a>( } /// Merge all the obkvs from the newest to the oldest. -pub fn obkvs_merge_additions_and_deletions<'a>( - _key: &[u8], - obkvs: &[Cow<'a, [u8]>], -) -> Result> { - inner_merge_del_add_obkvs(obkvs, true) +#[derive(Copy, Clone)] +pub struct ObkvsMergeAdditionsAndDeletions; + +impl MergeFunction for ObkvsMergeAdditionsAndDeletions { + type Error = crate::Error; + + fn merge<'a>(&self, _key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result> { + inner_merge_del_add_obkvs(obkvs, true) + } } /// Merge all the obkvs deletions from the newest to the oldest and keep only the newest additions. 
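// [Editor's note: illustrative sketch, not part of this patch.] Merge strategies are now
// unit structs implementing `grenad::MergeFunction` (like `KeepFirst` and `KeepLatestObkv`
// above) rather than `MergeFn` function pointers, so the strategy is passed to the sorter
// builder as a value. `TakeFirst` and `demo` are hypothetical names introduced only for
// this sketch; the builder and insert calls mirror `create_sorter` earlier in this diff.
use std::borrow::Cow;
use grenad::{MergeFunction, Sorter};

struct TakeFirst;

impl MergeFunction for TakeFirst {
    type Error = std::convert::Infallible;

    fn merge<'a>(
        &self,
        _key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
        // Keep the first value that was inserted for a given key.
        Ok(values[0].clone())
    }
}

fn demo() {
    // The strategy is a value, so it could also carry configuration or state.
    let mut sorter = Sorter::builder(TakeFirst).build();
    sorter.insert(b"doc-1", b"first").unwrap();
    sorter.insert(b"doc-1", b"second").unwrap();

    // In the indexing pipeline the sorter is later drained through helpers such as
    // `sorter_into_reader` or `write_sorter_into_database`; calling the strategy
    // directly shows what happens to duplicate keys.
    let values = [Cow::Borrowed(&b"first"[..]), Cow::Borrowed(&b"second"[..])];
    let merged = TakeFirst.merge(b"doc-1", &values).unwrap();
    assert_eq!(&merged[..], &b"first"[..]);
}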
-pub fn obkvs_keep_last_addition_merge_deletions<'a>( - _key: &[u8], - obkvs: &[Cow<'a, [u8]>], -) -> Result> { - inner_merge_del_add_obkvs(obkvs, false) +#[derive(Copy, Clone)] +pub struct ObkvsKeepLastAdditionMergeDeletions; + +impl MergeFunction for ObkvsKeepLastAdditionMergeDeletions { + type Error = crate::Error; + + fn merge<'a>(&self, _key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result> { + inner_merge_del_add_obkvs(obkvs, false) + } } /// Do a union of all the CboRoaringBitmaps in the values. -pub fn merge_cbo_roaring_bitmaps<'a>( - _key: &[u8], - values: &[Cow<'a, [u8]>], -) -> Result> { - if values.len() == 1 { - Ok(values[0].clone()) - } else { - let mut vec = Vec::new(); - CboRoaringBitmapCodec::merge_into(values, &mut vec)?; - Ok(Cow::from(vec)) +pub struct MergeCboRoaringBitmaps; + +impl MergeFunction for MergeCboRoaringBitmaps { + type Error = crate::Error; + + fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { + if values.len() == 1 { + Ok(values[0].clone()) + } else { + let mut vec = Vec::new(); + CboRoaringBitmapCodec::merge_into(values, &mut vec)?; + Ok(Cow::from(vec)) + } } } /// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv /// separately and outputs a new DelAdd with both unions. -pub fn merge_deladd_cbo_roaring_bitmaps<'a>( - _key: &[u8], - values: &[Cow<'a, [u8]>], -) -> Result> { - if values.len() == 1 { - Ok(values[0].clone()) - } else { - // Retrieve the bitmaps from both sides - let mut del_bitmaps_bytes = Vec::new(); - let mut add_bitmaps_bytes = Vec::new(); - for value in values { - let obkv = KvReaderDelAdd::from_slice(value); - if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) { - del_bitmaps_bytes.push(bitmap_bytes); - } - if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) { - add_bitmaps_bytes.push(bitmap_bytes); - } - } +pub struct MergeDeladdCboRoaringBitmaps; - let mut output_deladd_obkv = KvWriterDelAdd::memory(); - let mut buffer = Vec::new(); - CboRoaringBitmapCodec::merge_into(del_bitmaps_bytes, &mut buffer)?; - output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?; - buffer.clear(); - CboRoaringBitmapCodec::merge_into(add_bitmaps_bytes, &mut buffer)?; - output_deladd_obkv.insert(DelAdd::Addition, &buffer)?; - output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) +impl MergeFunction for MergeDeladdCboRoaringBitmaps { + type Error = crate::Error; + + fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { + if values.len() == 1 { + Ok(values[0].clone()) + } else { + // Retrieve the bitmaps from both sides + let mut del_bitmaps_bytes = Vec::new(); + let mut add_bitmaps_bytes = Vec::new(); + for value in values { + let obkv = KvReaderDelAdd::from_slice(value); + if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) { + del_bitmaps_bytes.push(bitmap_bytes); + } + if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) { + add_bitmaps_bytes.push(bitmap_bytes); + } + } + + let mut output_deladd_obkv = KvWriterDelAdd::memory(); + let mut buffer = Vec::new(); + CboRoaringBitmapCodec::merge_into(del_bitmaps_bytes, &mut buffer)?; + output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?; + buffer.clear(); + CboRoaringBitmapCodec::merge_into(add_bitmaps_bytes, &mut buffer)?; + output_deladd_obkv.insert(DelAdd::Addition, &buffer)?; + output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) + } } } @@ -225,37 +260,55 @@ pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>( /// Do a union of BtreeSet on both sides of a DelAdd obkv /// separately and outputs a new DelAdd 
with both unions. -pub fn merge_deladd_btreeset_string<'a>( - _key: &[u8], - values: &[Cow<'a, [u8]>], -) -> Result> { - if values.len() == 1 { - Ok(values[0].clone()) - } else { - // Retrieve the bitmaps from both sides - let mut del_set = BTreeSet::new(); - let mut add_set = BTreeSet::new(); - for value in values { - let obkv = KvReaderDelAdd::from_slice(value); - if let Some(bytes) = obkv.get(DelAdd::Deletion) { - let set = serde_json::from_slice::>(bytes).unwrap(); - for value in set { - del_set.insert(value); - } - } - if let Some(bytes) = obkv.get(DelAdd::Addition) { - let set = serde_json::from_slice::>(bytes).unwrap(); - for value in set { - add_set.insert(value); - } - } - } +pub struct MergeDeladdBtreesetString; - let mut output_deladd_obkv = KvWriterDelAdd::memory(); - let del = serde_json::to_vec(&del_set).unwrap(); - output_deladd_obkv.insert(DelAdd::Deletion, &del)?; - let add = serde_json::to_vec(&add_set).unwrap(); - output_deladd_obkv.insert(DelAdd::Addition, &add)?; - output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) +impl MergeFunction for MergeDeladdBtreesetString { + type Error = crate::Error; + + fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { + if values.len() == 1 { + Ok(values[0].clone()) + } else { + // Retrieve the bitmaps from both sides + let mut del_set = BTreeSet::new(); + let mut add_set = BTreeSet::new(); + for value in values { + let obkv = KvReaderDelAdd::from_slice(value); + if let Some(bytes) = obkv.get(DelAdd::Deletion) { + let set = serde_json::from_slice::>(bytes).unwrap(); + for value in set { + del_set.insert(value); + } + } + if let Some(bytes) = obkv.get(DelAdd::Addition) { + let set = serde_json::from_slice::>(bytes).unwrap(); + for value in set { + add_set.insert(value); + } + } + } + + let mut output_deladd_obkv = KvWriterDelAdd::memory(); + let del = serde_json::to_vec(&del_set).unwrap(); + output_deladd_obkv.insert(DelAdd::Deletion, &del)?; + let add = serde_json::to_vec(&add_set).unwrap(); + output_deladd_obkv.insert(DelAdd::Addition, &add)?; + output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) + } + } +} + +/// Used when trying to merge readers, but you don't actually care about the values. 
+pub struct MergeIgnoreValues; + +impl MergeFunction for MergeIgnoreValues { + type Error = crate::Error; + + fn merge<'a>( + &self, + _key: &[u8], + _values: &[Cow<'a, [u8]>], + ) -> std::result::Result, Self::Error> { + Ok(Cow::Owned(Vec::new())) } } diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 5d8f16fae..c188e324d 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -7,17 +7,8 @@ use std::convert::{TryFrom, TryInto}; pub use clonable_mmap::{ClonableMmap, CursorClonableMmap}; use fst::{IntoStreamer, Streamer}; -pub use grenad_helpers::{ - as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks, - merge_ignore_values, sorter_into_reader, write_sorter_into_database, writer_into_reader, - GrenadParameters, -}; -pub use merge_functions::{ - keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, merge_deladd_btreeset_string, - merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, - merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions, - obkvs_merge_additions_and_deletions, MergeFn, -}; +pub use grenad_helpers::*; +pub use merge_functions::*; use crate::MAX_WORD_LENGTH; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 87c6bc6db..0cee93bdc 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -27,13 +27,7 @@ use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk}; use self::enrich::enrich_documents_batch; pub use self::enrich::{extract_finite_float_from_value, DocumentId}; -pub use self::helpers::{ - as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, - fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, - merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps, - valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn, -}; -use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; +pub use self::helpers::*; pub use self::transform::{Transform, TransformOutput}; use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; @@ -605,7 +599,7 @@ where let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? }; let word_docids = word_docids.get_or_insert_with(|| { - MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn) + MergerBuilder::new(MergeDeladdCboRoaringBitmaps) }); word_docids.push(cloneable_chunk.into_cursor()?); let cloneable_chunk = @@ -613,14 +607,14 @@ where let exact_word_docids = exact_word_docids.get_or_insert_with(|| { MergerBuilder::new( - merge_deladd_cbo_roaring_bitmaps as MergeFn, + MergeDeladdCboRoaringBitmaps, ) }); exact_word_docids.push(cloneable_chunk.into_cursor()?); let cloneable_chunk = unsafe { as_cloneable_grenad(&word_fid_docids_reader)? 
}; let word_fid_docids = word_fid_docids.get_or_insert_with(|| { - MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn) + MergerBuilder::new(MergeDeladdCboRoaringBitmaps) }); word_fid_docids.push(cloneable_chunk.into_cursor()?); TypedChunk::WordDocids { @@ -634,7 +628,7 @@ where let word_position_docids = word_position_docids.get_or_insert_with(|| { MergerBuilder::new( - merge_deladd_cbo_roaring_bitmaps as MergeFn, + MergeDeladdCboRoaringBitmaps, ) }); word_position_docids.push(cloneable_chunk.into_cursor()?); @@ -719,10 +713,10 @@ where )] pub fn execute_prefix_databases( self, - word_docids: Option>, - exact_word_docids: Option>, - word_position_docids: Option>, - word_fid_docids: Option>, + word_docids: Option>, + exact_word_docids: Option>, + word_position_docids: Option>, + word_fid_docids: Option>, ) -> Result<()> where FP: Fn(UpdateIndexingStep) + Sync, @@ -902,7 +896,7 @@ where )] fn execute_word_prefix_docids( txn: &mut heed::RwTxn<'_>, - merger: Merger, + merger: Merger, word_docids_db: Database, word_prefix_docids_db: Database, indexer_config: &IndexerConfig, diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index b9541e649..c3c48a6eb 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -5,6 +5,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs::File; use std::io::{Read, Seek}; +use either::Either; use fxhash::FxHashMap; use itertools::Itertools; use obkv::{KvReader, KvReaderU16, KvWriter}; @@ -13,10 +14,10 @@ use serde_json::Value; use smartstring::SmartString; use super::helpers::{ - create_sorter, create_writer, keep_first, obkvs_keep_last_addition_merge_deletions, - obkvs_merge_additions_and_deletions, sorter_into_reader, MergeFn, + create_sorter, create_writer, sorter_into_reader, EitherObkvMerge, + ObkvsKeepLastAdditionMergeDeletions, ObkvsMergeAdditionsAndDeletions, }; -use super::{IndexDocumentsMethod, IndexerConfig}; +use super::{IndexDocumentsMethod, IndexerConfig, KeepFirst}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; use crate::index::{db_name, main_key}; @@ -59,8 +60,8 @@ pub struct Transform<'a, 'i> { // Both grenad follows the same format: // key | value // u32 | 1 byte for the Operation byte, the rest is the obkv of the document stored - original_sorter: grenad::Sorter, - flattened_sorter: grenad::Sorter, + original_sorter: grenad::Sorter, + flattened_sorter: grenad::Sorter, replaced_documents_ids: RoaringBitmap, new_documents_ids: RoaringBitmap, @@ -108,17 +109,19 @@ impl<'a, 'i> Transform<'a, 'i> { index_documents_method: IndexDocumentsMethod, _autogenerate_docids: bool, ) -> Result { + use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments}; + // We must choose the appropriate merge function for when two or more documents // with the same user id must be merged or fully replaced in the same batch. let merge_function = match index_documents_method { - IndexDocumentsMethod::ReplaceDocuments => obkvs_keep_last_addition_merge_deletions, - IndexDocumentsMethod::UpdateDocuments => obkvs_merge_additions_and_deletions, + ReplaceDocuments => Either::Left(ObkvsKeepLastAdditionMergeDeletions), + UpdateDocuments => Either::Right(ObkvsMergeAdditionsAndDeletions), }; // We initialize the sorter with the user indexing settings. 
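// [Editor's note: illustrative sketch, not part of this patch.] `Transform` now selects
// between two concrete strategies with `either::Either` (presumably why grenad gained an
// `either` dependency in the Cargo.lock hunk above, so that `Either<A, B>` itself
// implements `MergeFunction` when both sides share an error type), keeping a single
// sorter type, `EitherObkvMerge`. The same dispatch can be hand-rolled; `FirstOrLast`,
// `KeepFirstValue`, and `KeepLastValue` are hypothetical names used only for this sketch.
use std::borrow::Cow;
use grenad::MergeFunction;

struct KeepFirstValue;
struct KeepLastValue;

impl MergeFunction for KeepFirstValue {
    type Error = std::convert::Infallible;
    fn merge<'a>(
        &self,
        _key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
        Ok(values[0].clone())
    }
}

impl MergeFunction for KeepLastValue {
    type Error = std::convert::Infallible;
    fn merge<'a>(
        &self,
        _key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
        Ok(values.last().unwrap().clone())
    }
}

// One concrete type that dispatches to either strategy at runtime, so a single
// `Sorter<FirstOrLast>` can be chosen per indexing method, as in the match above.
enum FirstOrLast {
    First(KeepFirstValue),
    Last(KeepLastValue),
}

impl MergeFunction for FirstOrLast {
    type Error = std::convert::Infallible;
    fn merge<'a>(
        &self,
        key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
        match self {
            FirstOrLast::First(mf) => mf.merge(key, values),
            FirstOrLast::Last(mf) => mf.merge(key, values),
        }
    }
}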
let original_sorter = create_sorter( grenad::SortAlgorithm::Stable, - merge_function, + merge_function.clone(), indexer_settings.chunk_compression_type, indexer_settings.chunk_compression_level, indexer_settings.max_nb_chunks, @@ -979,7 +982,7 @@ impl<'a, 'i> Transform<'a, 'i> { let mut original_sorter = if settings_diff.reindex_vectors() { Some(create_sorter( grenad::SortAlgorithm::Stable, - keep_first, + KeepFirst, self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_level, self.indexer_settings.max_nb_chunks, @@ -1023,7 +1026,7 @@ impl<'a, 'i> Transform<'a, 'i> { if settings_diff.reindex_searchable() || settings_diff.reindex_facets() { Some(create_sorter( grenad::SortAlgorithm::Stable, - keep_first, + KeepFirst, self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_level, self.indexer_settings.max_nb_chunks, @@ -1162,6 +1165,8 @@ fn drop_and_reuse(mut vec: Vec) -> Vec { #[cfg(test)] mod test { + use grenad::MergeFunction; + use super::*; #[test] @@ -1219,25 +1224,32 @@ mod test { .unwrap(); additive_doc_0_1.insert(0, Operation::Addition as u8); - let ret = obkvs_merge_additions_and_deletions(&[], &[Cow::from(additive_doc_0.as_slice())]) - .unwrap(); + let ret = MergeFunction::merge( + &ObkvsMergeAdditionsAndDeletions, + &[], + &[Cow::from(additive_doc_0.as_slice())], + ) + .unwrap(); assert_eq!(*ret, additive_doc_0); - let ret = obkvs_merge_additions_and_deletions( + let ret = MergeFunction::merge( + &ObkvsMergeAdditionsAndDeletions, &[], &[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())], ) .unwrap(); assert_eq!(*ret, del_add_doc_0); - let ret = obkvs_merge_additions_and_deletions( + let ret = MergeFunction::merge( + &ObkvsMergeAdditionsAndDeletions, &[], &[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())], ) .unwrap(); assert_eq!(*ret, deletive_doc_0); - let ret = obkvs_merge_additions_and_deletions( + let ret = MergeFunction::merge( + &ObkvsMergeAdditionsAndDeletions, &[], &[ Cow::from(additive_doc_1.as_slice()), @@ -1248,21 +1260,24 @@ mod test { .unwrap(); assert_eq!(*ret, del_add_doc_0); - let ret = obkvs_merge_additions_and_deletions( + let ret = MergeFunction::merge( + &ObkvsMergeAdditionsAndDeletions, &[], &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())], ) .unwrap(); assert_eq!(*ret, additive_doc_0_1); - let ret = obkvs_keep_last_addition_merge_deletions( + let ret = MergeFunction::merge( + &ObkvsKeepLastAdditionMergeDeletions, &[], &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())], ) .unwrap(); assert_eq!(*ret, additive_doc_0); - let ret = obkvs_keep_last_addition_merge_deletions( + let ret = MergeFunction::merge( + &ObkvsKeepLastAdditionMergeDeletions, &[], &[ Cow::from(deletive_doc_0.as_slice()), diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 9fe152348..592ace80f 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -4,18 +4,17 @@ use std::fs::File; use std::io::{self, BufReader}; use bytemuck::allocation::pod_collect_to_vec; -use grenad::{Merger, MergerBuilder}; +use grenad::{MergeFunction, Merger, MergerBuilder}; use heed::types::Bytes; use heed::{BytesDecode, RwTxn}; use obkv::{KvReader, KvWriter}; use roaring::RoaringBitmap; use super::helpers::{ - self, keep_first, merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, - 
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_ignore_values, valid_lmdb_key, - CursorClonableMmap, + self, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, + CursorClonableMmap, KeepFirst, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps, + MergeIgnoreValues, }; -use super::MergeFn; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; use crate::index::db_name::DOCUMENTS; @@ -24,7 +23,7 @@ use crate::proximity::MAX_DISTANCE; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd}; use crate::update::facet::FacetsUpdate; use crate::update::index_documents::helpers::{ - as_cloneable_grenad, keep_latest_obkv, try_split_array_at, + as_cloneable_grenad, try_split_array_at, KeepLatestObkv, }; use crate::update::settings::InnerIndexSettingsDiff; use crate::{ @@ -140,7 +139,7 @@ pub(crate) fn write_typed_chunk_into_index( let vectors_fid = fields_ids_map.id(crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME); - let mut builder = MergerBuilder::new(keep_latest_obkv as MergeFn); + let mut builder = MergerBuilder::new(KeepLatestObkv); for typed_chunk in typed_chunks { let TypedChunk::Documents(chunk) = typed_chunk else { unreachable!(); @@ -234,7 +233,7 @@ pub(crate) fn write_typed_chunk_into_index( tracing::trace_span!(target: "indexing::write_db", "field_id_word_count_docids"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); for typed_chunk in typed_chunks { let TypedChunk::FieldIdWordCountDocids(chunk) = typed_chunk else { unreachable!(); @@ -257,13 +256,10 @@ pub(crate) fn write_typed_chunk_into_index( let span = tracing::trace_span!(target: "indexing::write_db", "word_docids"); let _entered = span.enter(); - let mut word_docids_builder = - MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); - let mut exact_word_docids_builder = - MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); - let mut word_fid_docids_builder = - MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); - let mut fst_merger_builder = MergerBuilder::new(merge_ignore_values as MergeFn); + let mut word_docids_builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); + let mut exact_word_docids_builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); + let mut word_fid_docids_builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); + let mut fst_merger_builder = MergerBuilder::new(MergeIgnoreValues); for typed_chunk in typed_chunks { let TypedChunk::WordDocids { word_docids_reader, @@ -328,7 +324,7 @@ pub(crate) fn write_typed_chunk_into_index( let span = tracing::trace_span!(target: "indexing::write_db", "word_position_docids"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); for typed_chunk in typed_chunks { let TypedChunk::WordPositionDocids(chunk) = typed_chunk else { unreachable!(); @@ -352,7 +348,7 @@ pub(crate) fn write_typed_chunk_into_index( tracing::trace_span!(target: "indexing::write_db","field_id_facet_number_docids"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); let mut data_size = 0; for typed_chunk in typed_chunks { let 
TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids) = typed_chunk @@ -374,10 +370,9 @@ pub(crate) fn write_typed_chunk_into_index( tracing::trace_span!(target: "indexing::write_db", "field_id_facet_string_docids"); let _entered = span.enter(); - let mut facet_id_string_builder = - MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut facet_id_string_builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); let mut normalized_facet_id_string_builder = - MergerBuilder::new(merge_deladd_btreeset_string as MergeFn); + MergerBuilder::new(MergeDeladdBtreesetString); let mut data_size = 0; for typed_chunk in typed_chunks { let TypedChunk::FieldIdFacetStringDocids(( @@ -411,7 +406,7 @@ pub(crate) fn write_typed_chunk_into_index( tracing::trace_span!(target: "indexing::write_db", "field_id_facet_exists_docids"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); for typed_chunk in typed_chunks { let TypedChunk::FieldIdFacetExistsDocids(chunk) = typed_chunk else { unreachable!(); @@ -435,7 +430,7 @@ pub(crate) fn write_typed_chunk_into_index( tracing::trace_span!(target: "indexing::write_db", "field_id_facet_is_null_docids"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); for typed_chunk in typed_chunks { let TypedChunk::FieldIdFacetIsNullDocids(chunk) = typed_chunk else { unreachable!(); @@ -458,7 +453,7 @@ pub(crate) fn write_typed_chunk_into_index( let span = tracing::trace_span!(target: "indexing::write_db", "field_id_facet_is_empty_docids"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); for typed_chunk in typed_chunks { let TypedChunk::FieldIdFacetIsEmptyDocids(chunk) = typed_chunk else { unreachable!(); @@ -482,7 +477,7 @@ pub(crate) fn write_typed_chunk_into_index( tracing::trace_span!(target: "indexing::write_db", "word_pair_proximity_docids"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps); for typed_chunk in typed_chunks { let TypedChunk::WordPairProximityDocids(chunk) = typed_chunk else { unreachable!(); @@ -515,7 +510,7 @@ pub(crate) fn write_typed_chunk_into_index( tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_numbers"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(keep_first as MergeFn); + let mut builder = MergerBuilder::new(KeepFirst); for typed_chunk in typed_chunks { let TypedChunk::FieldIdDocidFacetNumbers(chunk) = typed_chunk else { unreachable!(); @@ -549,7 +544,7 @@ pub(crate) fn write_typed_chunk_into_index( tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_strings"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(keep_first as MergeFn); + let mut builder = MergerBuilder::new(KeepFirst); for typed_chunk in typed_chunks { let TypedChunk::FieldIdDocidFacetStrings(chunk) = typed_chunk else { unreachable!(); @@ -582,7 +577,7 @@ pub(crate) fn write_typed_chunk_into_index( let span = tracing::trace_span!(target: "indexing::write_db", "geo_points"); let _entered = span.enter(); - let mut builder = MergerBuilder::new(keep_first 
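
// Editor's sketch (not part of this patch): most of the facet and word databases above
// now merge with `MergeDeladdCboRoaringBitmaps`, which conceptually unions the Del
// sides of all values together and the Add sides together. A simplified standalone
// illustration using plain roaring serialization and a made-up, length-prefixed value
// layout; the real merger works on milli's `KvReaderDelAdd` obkvs and the
// `CboRoaringBitmapCodec`, so `encode` and `merge_deladd` here are hypothetical.
use roaring::RoaringBitmap;

/// Hypothetical value layout: [u32 LE del_len][del bitmap bytes][add bitmap bytes].
fn encode(del: &RoaringBitmap, add: &RoaringBitmap) -> std::io::Result<Vec<u8>> {
    let mut del_bytes = Vec::new();
    del.serialize_into(&mut del_bytes)?;
    let mut out = Vec::new();
    out.extend_from_slice(&(del_bytes.len() as u32).to_le_bytes());
    out.extend_from_slice(&del_bytes);
    add.serialize_into(&mut out)?;
    Ok(out)
}

/// Union the del sides together and the add sides together across all values of a key.
fn merge_deladd(values: &[Vec<u8>]) -> std::io::Result<Vec<u8>> {
    let (mut del_union, mut add_union) = (RoaringBitmap::new(), RoaringBitmap::new());
    for value in values {
        let del_len = u32::from_le_bytes(value[..4].try_into().unwrap()) as usize;
        del_union |= RoaringBitmap::deserialize_from(&value[4..4 + del_len])?;
        add_union |= RoaringBitmap::deserialize_from(&value[4 + del_len..])?;
    }
    encode(&del_union, &add_union)
}
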
as MergeFn); + let mut builder = MergerBuilder::new(KeepFirst); for typed_chunk in typed_chunks { let TypedChunk::GeoPoints(chunk) = typed_chunk else { unreachable!(); @@ -619,9 +614,9 @@ pub(crate) fn write_typed_chunk_into_index( let span = tracing::trace_span!(target: "indexing::write_db", "vector_points"); let _entered = span.enter(); - let mut remove_vectors_builder = MergerBuilder::new(keep_first as MergeFn); - let mut manual_vectors_builder = MergerBuilder::new(keep_first as MergeFn); - let mut embeddings_builder = MergerBuilder::new(keep_first as MergeFn); + let mut remove_vectors_builder = MergerBuilder::new(KeepFirst); + let mut manual_vectors_builder = MergerBuilder::new(KeepFirst); + let mut embeddings_builder = MergerBuilder::new(KeepFirst); let mut add_to_user_provided = RoaringBitmap::new(); let mut remove_from_user_provided = RoaringBitmap::new(); let mut params = None; @@ -786,9 +781,13 @@ fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint { GeoPoint::new(xyz_point, (docid, point)) } -fn merge_word_docids_reader_into_fst( - merger: Merger, -) -> Result>> { +fn merge_word_docids_reader_into_fst( + merger: Merger, +) -> Result>> +where + MF: MergeFunction, + crate::Error: From, +{ let mut iter = merger.into_stream_merger_iter()?; let mut builder = fst::SetBuilder::memory(); @@ -802,8 +801,8 @@ fn merge_word_docids_reader_into_fst( /// Write provided entries in database using serialize_value function. /// merge_values function is used if an entry already exist in the database. #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] -fn write_entries_into_database( - merger: Merger, +fn write_entries_into_database( + merger: Merger, database: &heed::Database, wtxn: &mut RwTxn<'_>, serialize_value: FS, @@ -813,6 +812,8 @@ where R: io::Read + io::Seek, FS: for<'a> Fn(&'a [u8], &'a mut Vec) -> Result<&'a [u8]>, FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec) -> Result>, + MF: MergeFunction, + crate::Error: From, { let mut buffer = Vec::new(); let database = database.remap_types::(); @@ -839,13 +840,15 @@ where /// Akin to the `write_entries_into_database` function but specialized /// for the case when we only index additional searchable fields only. #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] -fn write_proximity_entries_into_database_additional_searchables( - merger: Merger, +fn write_proximity_entries_into_database_additional_searchables( + merger: Merger, database: &heed::Database, wtxn: &mut RwTxn<'_>, ) -> Result<()> where R: io::Read + io::Seek, + MF: MergeFunction, + crate::Error: From, { let mut iter = merger.into_stream_merger_iter()?; while let Some((key, value)) = iter.next()? 
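
// Editor's sketch (not part of this patch): the helpers above gained
// `MF: MergeFunction` plus `crate::Error: From<MF::Error>` bounds because the merge
// error is now an associated type, and `?` needs a conversion into the caller's error.
// A standalone illustration of that pattern with a stand-in error type; `SketchError`
// and `merge_into_owned` are hypothetical, and milli's real helpers additionally drain
// a grenad merger, which is omitted here.
use std::borrow::Cow;
use std::convert::Infallible;

use grenad::MergeFunction;

#[derive(Debug)]
struct SketchError(String);

// How the bound is discharged for an infallible merger such as the toy `ConcatValues`
// from the first sketch.
impl From<Infallible> for SketchError {
    fn from(err: Infallible) -> Self {
        match err {}
    }
}

fn merge_into_owned<MF>(
    merge_function: &MF,
    key: &[u8],
    values: &[Cow<'_, [u8]>],
) -> Result<Vec<u8>, SketchError>
where
    MF: MergeFunction,
    SketchError: From<MF::Error>, // mirrors `crate::Error: From<MF::Error>` above
{
    // Without the `From` bound this `?` would not compile.
    let merged = merge_function.merge(key, values)?;
    Ok(merged.into_owned())
}
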
{ diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs index adfc85174..9842002a4 100644 --- a/milli/src/update/mod.rs +++ b/milli/src/update/mod.rs @@ -2,10 +2,7 @@ pub use self::available_documents_ids::AvailableDocumentsIds; pub use self::clear_documents::ClearDocuments; pub use self::facet::bulk::FacetsUpdateBulk; pub use self::facet::incremental::FacetsUpdateIncrementalInner; -pub use self::index_documents::{ - merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, DocumentAdditionResult, DocumentId, - IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, MergeFn, -}; +pub use self::index_documents::*; pub use self::indexer_config::IndexerConfig; pub use self::settings::{validate_embedding_settings, Setting, Settings}; pub use self::update_step::UpdateIndexingStep; diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 15239aa3e..0aeabcfa7 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -159,7 +159,7 @@ impl DocumentSender { } pub enum MergerOperation { - WordDocidsCursors(Vec>), + WordDocidsCursors(Vec>), } pub struct MergerReceiver(Receiver); diff --git a/milli/src/update/new/merge/del_add_roaring_bitmap_merger.rs b/milli/src/update/new/merge/del_add_roaring_bitmap_merger.rs index 5e6310170..5aa2c31f8 100644 --- a/milli/src/update/new/merge/del_add_roaring_bitmap_merger.rs +++ b/milli/src/update/new/merge/del_add_roaring_bitmap_merger.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::io; -use grenad2::MergeFunction; +use grenad::MergeFunction; use roaring::RoaringBitmap; use crate::update::del_add::DelAdd; diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index e32290c2b..fc587cb2a 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -198,7 +198,7 @@ mod indexer { } let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); - Ok(docids_version_offsets.into_par_iter().map_with( + docids_version_offsets.into_par_iter().map_with( items, |context_pool, (external_docid, (internal_docid, operations))| { context_pool.with(|rtxn| match self.method { @@ -221,7 +221,9 @@ mod indexer { ), }) }, - )) + ); + + Ok(vec![].into_par_iter()) } } @@ -334,13 +336,13 @@ mod indexer { thread::scope(|s| { thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { document_changes.into_par_iter().for_each(|_dc| ()); - }); + })?; // TODO manage the errors correctly thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || { let rtxn = index.read_txn().unwrap(); merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index).unwrap() - }); + })?; // TODO Split this code into another function for operation in writer_receiver { @@ -426,7 +428,7 @@ mod indexer { let sender = sender.word_docids(); let database = index.word_docids.remap_types::(); - let mut builder = grenad2::MergerBuilder::new(merge::DelAddRoaringBitmapMerger); + let mut builder = grenad::MergerBuilder::new(merge::DelAddRoaringBitmapMerger); builder.extend(cursors); /// TODO manage the error correctly let mut merger_iter = builder.build().into_stream_merger_iter().unwrap(); diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 925635f80..f683146cf 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -6,9 +6,8 @@ use heed::Database; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{ - create_sorter, 
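
// Editor's sketch (not part of this patch): in the new indexer above, the results of
// `thread::Builder::spawn_scoped` are now propagated with `?` instead of being dropped.
// A standalone, std-only illustration of that pattern; `run_pipeline` and the empty
// closure bodies are placeholders for the extractor and merger threads.
use std::io;
use std::thread;

fn run_pipeline() -> io::Result<()> {
    thread::scope(|s| -> io::Result<()> {
        let extractor = thread::Builder::new()
            .name("indexer-extractors".into())
            .spawn_scoped(s, || {
                // extraction work...
            })?; // spawn failures now bubble up instead of being silently ignored

        let merger = thread::Builder::new()
            .name("indexer-merger".into())
            .spawn_scoped(s, || {
                // merging work...
            })?;

        // Joining explicitly is optional: the scope joins remaining threads on exit.
        extractor.join().expect("extractor thread panicked");
        merger.join().expect("merger thread panicked");
        Ok(())
    })
}
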
merge_deladd_cbo_roaring_bitmaps, - merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, - write_sorter_into_database, CursorClonableMmap, MergeFn, + create_sorter, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, + write_sorter_into_database, CursorClonableMmap, MergeDeladdCboRoaringBitmaps, }; use crate::{CboRoaringBitmapCodec, Result}; @@ -47,7 +46,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { )] pub fn execute( self, - new_word_docids: grenad::Merger, + new_word_docids: grenad::Merger, new_prefix_fst_words: &[String], common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, @@ -56,7 +55,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { // and write into it at the same time, therefore we write into another file. let mut prefix_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, self.chunk_compression_type, self.chunk_compression_level, self.max_nb_chunks, @@ -139,7 +138,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { fn write_prefixes_in_sorter( prefixes: &mut HashMap, Vec>>, - sorter: &mut grenad::Sorter, + sorter: &mut grenad::Sorter, ) -> Result<()> { for (key, data_slices) in prefixes.drain() { for data in data_slices { diff --git a/milli/src/update/words_prefix_integer_docids.rs b/milli/src/update/words_prefix_integer_docids.rs index 9b6aa21ae..28b9b1523 100644 --- a/milli/src/update/words_prefix_integer_docids.rs +++ b/milli/src/update/words_prefix_integer_docids.rs @@ -11,9 +11,8 @@ use crate::heed_codec::StrBEU16Codec; use crate::index::main_key::WORDS_PREFIXES_FST_KEY; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{ - create_sorter, merge_deladd_cbo_roaring_bitmaps, - merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, - write_sorter_into_database, CursorClonableMmap, MergeFn, + create_sorter, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, + write_sorter_into_database, CursorClonableMmap, MergeDeladdCboRoaringBitmaps, }; use crate::{CboRoaringBitmapCodec, Result}; @@ -52,7 +51,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { )] pub fn execute( self, - new_word_integer_docids: grenad::Merger, + new_word_integer_docids: grenad::Merger, new_prefix_fst_words: &[String], common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, @@ -61,7 +60,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { let mut prefix_integer_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_deladd_cbo_roaring_bitmaps, + MergeDeladdCboRoaringBitmaps, self.chunk_compression_type, self.chunk_compression_level, self.max_nb_chunks, @@ -173,7 +172,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { fn write_prefixes_in_sorter( prefixes: &mut HashMap, Vec>>, - sorter: &mut grenad::Sorter, + sorter: &mut grenad::Sorter, ) -> Result<()> { // TODO: Merge before insertion. for (key, data_slices) in prefixes.drain() {
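
// Editor's sketch (not part of this patch): the word-prefix sorters above are now
// created directly with `MergeDeladdCboRoaringBitmaps`, so duplicate prefix keys are
// combined by a typed merge function instead of a `MergeFn` pointer. A rough
// illustration of that pattern with the hypothetical `ConcatValues` merger from the
// first sketch; it assumes `grenad::Sorter::builder`, `Sorter::insert`, and
// `into_stream_merger_iter` keep their 0.4 shape on this branch, and
// `prefix_sorter_demo` is an illustrative name.
fn prefix_sorter_demo() {
    let mut sorter = grenad::Sorter::builder(ConcatValues).build();

    // The same prefix key may be inserted once per matching word; the merge function
    // combines the duplicates when the sorter dumps its chunks or is iterated.
    sorter.insert(b"wor", b"1").unwrap();
    sorter.insert(b"wor", b"2").unwrap();
    sorter.insert(b"wot", b"3").unwrap();

    let mut iter = sorter.into_stream_merger_iter().unwrap();
    while let Some((key, value)) = iter.next().unwrap() {
        // With the toy merger: b"wor" => b"12", b"wot" => b"3".
        println!("{:?} => {:?}", key, value);
    }
}
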