Replace grenad with the new grenad various-improvement branch

This commit is contained in:
Clément Renault 2024-08-30 11:49:47 +02:00
parent b7c77c7a39
commit 794ebcd582
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
33 changed files with 367 additions and 340 deletions

19
Cargo.lock generated
View File

@ -2221,25 +2221,15 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]] [[package]]
name = "grenad" name = "grenad"
version = "0.4.7" version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/meilisearch/grenad?branch=various-improvements#58ac87d852413571102f44c5e55ca13509a3f1a0"
checksum = "350d89047298d3b1b40050acd11ab76e487b854a104b760ebc5a7f375093de77"
dependencies = [ dependencies = [
"bytemuck", "bytemuck",
"byteorder", "byteorder",
"either",
"rayon", "rayon",
"tempfile", "tempfile",
] ]
[[package]]
name = "grenad"
version = "0.4.7"
source = "git+https://github.com/meilisearch/grenad?branch=various-improvements#d7512aedb854c247acc7cd18d0bfa148d3779923"
dependencies = [
"bytemuck",
"byteorder",
"tempfile",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.26" version = "0.3.26"
@ -2848,7 +2838,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d" checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"windows-targets 0.52.4", "windows-targets 0.48.1",
] ]
[[package]] [[package]]
@ -3584,8 +3574,7 @@ dependencies = [
"fst", "fst",
"fxhash", "fxhash",
"geoutils", "geoutils",
"grenad 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "grenad",
"grenad 0.4.7 (git+https://github.com/meilisearch/grenad?branch=various-improvements)",
"heed", "heed",
"hf-hub", "hf-hub",
"indexmap", "indexmap",

View File

@ -909,10 +909,8 @@ impl IndexScheduler {
while let Some(doc) = while let Some(doc) =
cursor.next_document().map_err(milli::Error::from)? cursor.next_document().map_err(milli::Error::from)?
{ {
dump_content_file.push_document(&obkv_to_object( dump_content_file
&doc, .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
&documents_batch_index,
)?)?;
} }
dump_content_file.flush()?; dump_content_file.flush()?;
} }

View File

@ -1642,7 +1642,7 @@ fn add_non_formatted_ids_to_formatted_options(
fn make_document( fn make_document(
displayed_attributes: &BTreeSet<FieldId>, displayed_attributes: &BTreeSet<FieldId>,
field_ids_map: &FieldsIdsMap, field_ids_map: &FieldsIdsMap,
obkv: obkv::KvReaderU16, obkv: &obkv::KvReaderU16,
) -> Result<Document, MeilisearchHttpError> { ) -> Result<Document, MeilisearchHttpError> {
let mut document = serde_json::Map::new(); let mut document = serde_json::Map::new();

View File

@ -244,7 +244,7 @@ fn export_a_dump(
format!("While iterating on content file {:?}", content_file_uuid) format!("While iterating on content file {:?}", content_file_uuid)
})? { })? {
dump_content_file dump_content_file
.push_document(&obkv_to_object(&doc, &documents_batch_index)?)?; .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
} }
dump_content_file.flush()?; dump_content_file.flush()?;
count += 1; count += 1;

View File

@ -28,10 +28,7 @@ fst = "0.4.7"
fxhash = "0.2.1" fxhash = "0.2.1"
geoutils = "0.5.1" geoutils = "0.5.1"
grenad = { version = "0.4.7", default-features = false, features = [ grenad = { version = "0.4.7", default-features = false, features = [
"rayon", "rayon", # TODO Should we keep this feature
"tempfile",
] }
grenad2 = { package = "grenad", version = "0.4.7", default-features = false, features = [
"tempfile" "tempfile"
], git = "https://github.com/meilisearch/grenad", branch = "various-improvements" } ], git = "https://github.com/meilisearch/grenad", branch = "various-improvements" }
heed = { version = "0.20.3", default-features = false, features = [ heed = { version = "0.20.3", default-features = false, features = [

View File

@ -1311,7 +1311,7 @@ impl Index {
})?; })?;
Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> { Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
let (_docid, obkv) = entry?; let (_docid, obkv) = entry?;
match primary_key.document_id(&obkv, &fields)? { match primary_key.document_id(obkv, &fields)? {
Ok(document_id) => Ok(document_id), Ok(document_id) => Ok(document_id),
Err(_) => Err(InternalError::DocumentsError( Err(_) => Err(InternalError::DocumentsError(
crate::documents::Error::InvalidDocumentFormat, crate::documents::Error::InvalidDocumentFormat,

View File

@ -431,7 +431,7 @@ mod tests {
writer.insert(id1, b"1234").unwrap(); writer.insert(id1, b"1234").unwrap();
writer.insert(id2, b"4321").unwrap(); writer.insert(id2, b"4321").unwrap();
let contents = writer.into_inner().unwrap(); let contents = writer.into_inner().unwrap();
let obkv = obkv::KvReaderU16::new(&contents); let obkv = obkv::KvReaderU16::from_slice(&contents);
let expected = json!({ let expected = json!({
"field1": 1234, "field1": 1234,

View File

@ -3,6 +3,7 @@ use std::collections::hash_map::Entry;
use std::hash::Hash; use std::hash::Hash;
use fxhash::FxHashMap; use fxhash::FxHashMap;
use grenad::MergeFunction;
use heed::types::Bytes; use heed::types::Bytes;
use heed::{BytesEncode, Database, RoTxn}; use heed::{BytesEncode, Database, RoTxn};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
@ -11,7 +12,7 @@ use super::interner::Interned;
use super::Word; use super::Word;
use crate::heed_codec::{BytesDecodeOwned, StrBEU16Codec}; use crate::heed_codec::{BytesDecodeOwned, StrBEU16Codec};
use crate::proximity::ProximityPrecision; use crate::proximity::ProximityPrecision;
use crate::update::{merge_cbo_roaring_bitmaps, MergeFn}; use crate::update::MergeCboRoaringBitmaps;
use crate::{ use crate::{
CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, Result, SearchContext, U8StrStrCodec, CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, Result, SearchContext, U8StrStrCodec,
}; };
@ -110,19 +111,21 @@ impl<'ctx> DatabaseCache<'ctx> {
.map_err(Into::into) .map_err(Into::into)
} }
fn get_value_from_keys<'v, K1, KC>( fn get_value_from_keys<'v, K1, KC, MF>(
txn: &'ctx RoTxn<'_>, txn: &'ctx RoTxn<'_>,
cache_key: K1, cache_key: K1,
db_keys: &'v [KC::EItem], db_keys: &'v [KC::EItem],
cache: &mut FxHashMap<K1, Option<Cow<'ctx, [u8]>>>, cache: &mut FxHashMap<K1, Option<Cow<'ctx, [u8]>>>,
db: Database<KC, Bytes>, db: Database<KC, Bytes>,
universe: Option<&RoaringBitmap>, universe: Option<&RoaringBitmap>,
merger: MergeFn, merger: MF,
) -> Result<Option<RoaringBitmap>> ) -> Result<Option<RoaringBitmap>>
where where
K1: Copy + Eq + Hash, K1: Copy + Eq + Hash,
KC: BytesEncode<'v>, KC: BytesEncode<'v>,
KC::EItem: Sized, KC::EItem: Sized,
MF: MergeFunction,
crate::Error: From<MF::Error>,
{ {
if let Entry::Vacant(entry) = cache.entry(cache_key) { if let Entry::Vacant(entry) = cache.entry(cache_key) {
let bitmap_ptr: Option<Cow<'ctx, [u8]>> = match db_keys { let bitmap_ptr: Option<Cow<'ctx, [u8]>> = match db_keys {
@ -138,7 +141,7 @@ impl<'ctx> DatabaseCache<'ctx> {
if bitmaps.is_empty() { if bitmaps.is_empty() {
None None
} else { } else {
Some(merger(&[], &bitmaps[..])?) Some(merger.merge(&[], &bitmaps[..])?)
} }
} }
}; };
@ -213,17 +216,17 @@ impl<'ctx> SearchContext<'ctx> {
let keys: Vec<_> = let keys: Vec<_> =
restricted_fids.tolerant.iter().map(|(fid, _)| (interned, *fid)).collect(); restricted_fids.tolerant.iter().map(|(fid, _)| (interned, *fid)).collect();
DatabaseCache::get_value_from_keys::<_, _>( DatabaseCache::get_value_from_keys(
self.txn, self.txn,
word, word,
&keys[..], &keys[..],
&mut self.db_cache.word_docids, &mut self.db_cache.word_docids,
self.index.word_fid_docids.remap_data_type::<Bytes>(), self.index.word_fid_docids.remap_data_type::<Bytes>(),
universe, universe,
merge_cbo_roaring_bitmaps, MergeCboRoaringBitmaps,
) )
} }
None => DatabaseCache::get_value::<_, _>( None => DatabaseCache::get_value(
self.txn, self.txn,
word, word,
self.word_interner.get(word).as_str(), self.word_interner.get(word).as_str(),
@ -245,17 +248,17 @@ impl<'ctx> SearchContext<'ctx> {
let keys: Vec<_> = let keys: Vec<_> =
restricted_fids.exact.iter().map(|(fid, _)| (interned, *fid)).collect(); restricted_fids.exact.iter().map(|(fid, _)| (interned, *fid)).collect();
DatabaseCache::get_value_from_keys::<_, _>( DatabaseCache::get_value_from_keys(
self.txn, self.txn,
word, word,
&keys[..], &keys[..],
&mut self.db_cache.exact_word_docids, &mut self.db_cache.exact_word_docids,
self.index.word_fid_docids.remap_data_type::<Bytes>(), self.index.word_fid_docids.remap_data_type::<Bytes>(),
universe, universe,
merge_cbo_roaring_bitmaps, MergeCboRoaringBitmaps,
) )
} }
None => DatabaseCache::get_value::<_, _>( None => DatabaseCache::get_value(
self.txn, self.txn,
word, word,
self.word_interner.get(word).as_str(), self.word_interner.get(word).as_str(),
@ -302,17 +305,17 @@ impl<'ctx> SearchContext<'ctx> {
let keys: Vec<_> = let keys: Vec<_> =
restricted_fids.tolerant.iter().map(|(fid, _)| (interned, *fid)).collect(); restricted_fids.tolerant.iter().map(|(fid, _)| (interned, *fid)).collect();
DatabaseCache::get_value_from_keys::<_, _>( DatabaseCache::get_value_from_keys(
self.txn, self.txn,
prefix, prefix,
&keys[..], &keys[..],
&mut self.db_cache.word_prefix_docids, &mut self.db_cache.word_prefix_docids,
self.index.word_prefix_fid_docids.remap_data_type::<Bytes>(), self.index.word_prefix_fid_docids.remap_data_type::<Bytes>(),
universe, universe,
merge_cbo_roaring_bitmaps, MergeCboRoaringBitmaps,
) )
} }
None => DatabaseCache::get_value::<_, _>( None => DatabaseCache::get_value(
self.txn, self.txn,
prefix, prefix,
self.word_interner.get(prefix).as_str(), self.word_interner.get(prefix).as_str(),
@ -334,17 +337,17 @@ impl<'ctx> SearchContext<'ctx> {
let keys: Vec<_> = let keys: Vec<_> =
restricted_fids.exact.iter().map(|(fid, _)| (interned, *fid)).collect(); restricted_fids.exact.iter().map(|(fid, _)| (interned, *fid)).collect();
DatabaseCache::get_value_from_keys::<_, _>( DatabaseCache::get_value_from_keys(
self.txn, self.txn,
prefix, prefix,
&keys[..], &keys[..],
&mut self.db_cache.exact_word_prefix_docids, &mut self.db_cache.exact_word_prefix_docids,
self.index.word_prefix_fid_docids.remap_data_type::<Bytes>(), self.index.word_prefix_fid_docids.remap_data_type::<Bytes>(),
universe, universe,
merge_cbo_roaring_bitmaps, MergeCboRoaringBitmaps,
) )
} }
None => DatabaseCache::get_value::<_, _>( None => DatabaseCache::get_value(
self.txn, self.txn,
prefix, prefix,
self.word_interner.get(prefix).as_str(), self.word_interner.get(prefix).as_str(),
@ -405,7 +408,7 @@ impl<'ctx> SearchContext<'ctx> {
Ok(docids) Ok(docids)
} }
ProximityPrecision::ByWord => DatabaseCache::get_value::<_, _>( ProximityPrecision::ByWord => DatabaseCache::get_value(
self.txn, self.txn,
(proximity, word1, word2), (proximity, word1, word2),
&( &(
@ -538,7 +541,7 @@ impl<'ctx> SearchContext<'ctx> {
return Ok(None); return Ok(None);
} }
DatabaseCache::get_value::<_, _>( DatabaseCache::get_value(
self.txn, self.txn,
(word, fid), (word, fid),
&(self.word_interner.get(word).as_str(), fid), &(self.word_interner.get(word).as_str(), fid),
@ -559,7 +562,7 @@ impl<'ctx> SearchContext<'ctx> {
return Ok(None); return Ok(None);
} }
DatabaseCache::get_value::<_, _>( DatabaseCache::get_value(
self.txn, self.txn,
(word_prefix, fid), (word_prefix, fid),
&(self.word_interner.get(word_prefix).as_str(), fid), &(self.word_interner.get(word_prefix).as_str(), fid),
@ -629,7 +632,7 @@ impl<'ctx> SearchContext<'ctx> {
word: Interned<String>, word: Interned<String>,
position: u16, position: u16,
) -> Result<Option<RoaringBitmap>> { ) -> Result<Option<RoaringBitmap>> {
DatabaseCache::get_value::<_, _>( DatabaseCache::get_value(
self.txn, self.txn,
(word, position), (word, position),
&(self.word_interner.get(word).as_str(), position), &(self.word_interner.get(word).as_str(), position),
@ -645,7 +648,7 @@ impl<'ctx> SearchContext<'ctx> {
word_prefix: Interned<String>, word_prefix: Interned<String>,
position: u16, position: u16,
) -> Result<Option<RoaringBitmap>> { ) -> Result<Option<RoaringBitmap>> {
DatabaseCache::get_value::<_, _>( DatabaseCache::get_value(
self.txn, self.txn,
(word_prefix, position), (word_prefix, position),
&(self.word_interner.get(word_prefix).as_str(), position), &(self.word_interner.get(word_prefix).as_str(), position),

View File

@ -14,7 +14,7 @@ use crate::heed_codec::facet::{
use crate::heed_codec::BytesRefCodec; use crate::heed_codec::BytesRefCodec;
use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader}; use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader};
use crate::update::MergeFn; use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldId, Index, Result}; use crate::{CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldId, Index, Result};
/// Algorithm to insert elememts into the `facet_id_(string/f64)_docids` databases /// Algorithm to insert elememts into the `facet_id_(string/f64)_docids` databases
@ -29,7 +29,7 @@ pub struct FacetsUpdateBulk<'i> {
facet_type: FacetType, facet_type: FacetType,
field_ids: Vec<FieldId>, field_ids: Vec<FieldId>,
// None if level 0 does not need to be updated // None if level 0 does not need to be updated
delta_data: Option<Merger<BufReader<File>, MergeFn>>, delta_data: Option<Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>>,
} }
impl<'i> FacetsUpdateBulk<'i> { impl<'i> FacetsUpdateBulk<'i> {
@ -37,7 +37,7 @@ impl<'i> FacetsUpdateBulk<'i> {
index: &'i Index, index: &'i Index,
field_ids: Vec<FieldId>, field_ids: Vec<FieldId>,
facet_type: FacetType, facet_type: FacetType,
delta_data: Merger<BufReader<File>, MergeFn>, delta_data: Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>,
group_size: u8, group_size: u8,
min_level_size: u8, min_level_size: u8,
) -> FacetsUpdateBulk<'i> { ) -> FacetsUpdateBulk<'i> {
@ -90,7 +90,7 @@ impl<'i> FacetsUpdateBulk<'i> {
/// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type /// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type
pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> { pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> {
pub db: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>, pub db: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
pub delta_data: Option<Merger<R, MergeFn>>, pub delta_data: Option<Merger<R, MergeDeladdCboRoaringBitmaps>>,
pub group_size: u8, pub group_size: u8,
pub min_level_size: u8, pub min_level_size: u8,
} }

View File

@ -15,7 +15,7 @@ use crate::heed_codec::BytesRefCodec;
use crate::search::facet::get_highest_level; use crate::search::facet::get_highest_level;
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;
use crate::update::index_documents::valid_lmdb_key; use crate::update::index_documents::valid_lmdb_key;
use crate::update::MergeFn; use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Index, Result}; use crate::{CboRoaringBitmapCodec, Index, Result};
/// Enum used as a return value for the facet incremental indexing. /// Enum used as a return value for the facet incremental indexing.
@ -57,14 +57,14 @@ enum ModificationResult {
/// `facet_id_(string/f64)_docids` databases. /// `facet_id_(string/f64)_docids` databases.
pub struct FacetsUpdateIncremental { pub struct FacetsUpdateIncremental {
inner: FacetsUpdateIncrementalInner, inner: FacetsUpdateIncrementalInner,
delta_data: Merger<BufReader<File>, MergeFn>, delta_data: Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>,
} }
impl FacetsUpdateIncremental { impl FacetsUpdateIncremental {
pub fn new( pub fn new(
index: &Index, index: &Index,
facet_type: FacetType, facet_type: FacetType,
delta_data: Merger<BufReader<File>, MergeFn>, delta_data: Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>,
group_size: u8, group_size: u8,
min_level_size: u8, min_level_size: u8,
max_group_size: u8, max_group_size: u8,

View File

@ -86,12 +86,11 @@ use time::OffsetDateTime;
use tracing::debug; use tracing::debug;
use self::incremental::FacetsUpdateIncremental; use self::incremental::FacetsUpdateIncremental;
use super::FacetsUpdateBulk; use super::{FacetsUpdateBulk, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps};
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec}; use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::BytesRefCodec; use crate::heed_codec::BytesRefCodec;
use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::MergeFn;
use crate::{try_split_array_at, FieldId, Index, Result}; use crate::{try_split_array_at, FieldId, Index, Result};
pub mod bulk; pub mod bulk;
@ -105,8 +104,8 @@ pub struct FacetsUpdate<'i> {
index: &'i Index, index: &'i Index,
database: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>, database: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
facet_type: FacetType, facet_type: FacetType,
delta_data: Merger<BufReader<File>, MergeFn>, delta_data: Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>,
normalized_delta_data: Option<Merger<BufReader<File>, MergeFn>>, normalized_delta_data: Option<Merger<BufReader<File>, MergeDeladdBtreesetString>>,
group_size: u8, group_size: u8,
max_group_size: u8, max_group_size: u8,
min_level_size: u8, min_level_size: u8,
@ -116,8 +115,8 @@ impl<'i> FacetsUpdate<'i> {
pub fn new( pub fn new(
index: &'i Index, index: &'i Index,
facet_type: FacetType, facet_type: FacetType,
delta_data: Merger<BufReader<File>, MergeFn>, delta_data: Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>,
normalized_delta_data: Option<Merger<BufReader<File>, MergeFn>>, normalized_delta_data: Option<Merger<BufReader<File>, MergeDeladdBtreesetString>>,
data_size: u64, data_size: u64,
) -> Self { ) -> Self {
let database = match facet_type { let database = match facet_type {
@ -182,7 +181,7 @@ impl<'i> FacetsUpdate<'i> {
fn index_facet_search( fn index_facet_search(
wtxn: &mut heed::RwTxn<'_>, wtxn: &mut heed::RwTxn<'_>,
normalized_delta_data: Merger<BufReader<File>, MergeFn>, normalized_delta_data: Merger<BufReader<File>, MergeDeladdBtreesetString>,
index: &Index, index: &Index,
) -> Result<()> { ) -> Result<()> {
let mut iter = normalized_delta_data.into_stream_merger_iter()?; let mut iter = normalized_delta_data.into_stream_merger_iter()?;
@ -298,8 +297,8 @@ pub(crate) mod test_helpers {
use crate::search::facet::get_highest_level; use crate::search::facet::get_highest_level;
use crate::snapshot_tests::display_bitmap; use crate::snapshot_tests::display_bitmap;
use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::update::index_documents::merge_deladd_cbo_roaring_bitmaps; use crate::update::index_documents::MergeDeladdCboRoaringBitmaps;
use crate::update::{FacetsUpdateIncrementalInner, MergeFn}; use crate::update::FacetsUpdateIncrementalInner;
use crate::CboRoaringBitmapCodec; use crate::CboRoaringBitmapCodec;
/// Utility function to generate a string whose position in a lexicographically /// Utility function to generate a string whose position in a lexicographically
@ -484,7 +483,7 @@ pub(crate) mod test_helpers {
} }
writer.finish().unwrap(); writer.finish().unwrap();
let reader = grenad::Reader::new(std::io::Cursor::new(new_data)).unwrap(); let reader = grenad::Reader::new(std::io::Cursor::new(new_data)).unwrap();
let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
builder.push(reader.into_cursor().unwrap()); builder.push(reader.into_cursor().unwrap());
let merger = builder.build(); let merger = builder.build();

View File

@ -47,7 +47,7 @@ pub fn enrich_documents_batch<R: Read + Seek>(
return match cursor.next_document()? { return match cursor.next_document()? {
Some(first_document) => Ok(Err(UserError::MissingDocumentId { Some(first_document) => Ok(Err(UserError::MissingDocumentId {
primary_key: primary_key.to_string(), primary_key: primary_key.to_string(),
document: obkv_to_object(&first_document, &documents_batch_index)?, document: obkv_to_object(first_document, &documents_batch_index)?,
})), })),
None => unreachable!("Called with reader.is_empty()"), None => unreachable!("Called with reader.is_empty()"),
}; };
@ -106,7 +106,7 @@ pub fn enrich_documents_batch<R: Read + Seek>(
let mut count = 0; let mut count = 0;
while let Some(document) = cursor.next_document()? { while let Some(document) = cursor.next_document()? {
let document_id = match fetch_or_generate_document_id( let document_id = match fetch_or_generate_document_id(
&document, document,
&documents_batch_index, &documents_batch_index,
primary_key, primary_key,
autogenerate_docids, autogenerate_docids,

View File

@ -8,7 +8,7 @@ use obkv::{KvReader, KvWriterU16};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::Value; use serde_json::Value;
use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters}; use super::helpers::{create_sorter, sorter_into_reader, GrenadParameters, KeepLatestObkv};
use crate::error::{InternalError, SerializationError}; use crate::error::{InternalError, SerializationError};
use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd}; use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd};
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
@ -35,7 +35,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
let mut documents_ids = RoaringBitmap::new(); let mut documents_ids = RoaringBitmap::new();
let mut docid_word_positions_sorter = create_sorter( let mut docid_word_positions_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
keep_latest_obkv, KeepLatestObkv,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -83,7 +83,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
let obkv = KvReader::<FieldId>::from_slice(value); let obkv = KvReader::<FieldId>::from_slice(value);
// if the searchable fields didn't change, skip the searchable indexing for this document. // if the searchable fields didn't change, skip the searchable indexing for this document.
if !force_reindexing && !searchable_fields_changed(&obkv, settings_diff) { if !force_reindexing && !searchable_fields_changed(obkv, settings_diff) {
continue; continue;
} }
@ -98,7 +98,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|| { || {
// deletions // deletions
tokens_from_document( tokens_from_document(
&obkv, obkv,
&settings_diff.old, &settings_diff.old,
&del_tokenizer, &del_tokenizer,
max_positions_per_attributes, max_positions_per_attributes,
@ -109,7 +109,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|| { || {
// additions // additions
tokens_from_document( tokens_from_document(
&obkv, obkv,
&settings_diff.new, &settings_diff.new,
&add_tokenizer, &add_tokenizer,
max_positions_per_attributes, max_positions_per_attributes,
@ -126,8 +126,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
// transforming two KV<FieldId, KV<u16, String>> into one KV<FieldId, KV<DelAdd, KV<u16, String>>> // transforming two KV<FieldId, KV<u16, String>> into one KV<FieldId, KV<DelAdd, KV<u16, String>>>
value_buffer.clear(); value_buffer.clear();
del_add_from_two_obkvs( del_add_from_two_obkvs(
&KvReader::<FieldId>::from_slice(del_obkv), KvReader::<FieldId>::from_slice(del_obkv),
&KvReader::<FieldId>::from_slice(add_obkv), KvReader::<FieldId>::from_slice(add_obkv),
&mut value_buffer, &mut value_buffer,
)?; )?;

View File

@ -4,7 +4,7 @@ use std::io::{self, BufReader};
use heed::{BytesDecode, BytesEncode}; use heed::{BytesDecode, BytesEncode};
use super::helpers::{ use super::helpers::{
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters, create_sorter, sorter_into_reader, GrenadParameters, MergeDeladdCboRoaringBitmaps,
}; };
use crate::heed_codec::facet::{ use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec, FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
@ -27,7 +27,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
let mut facet_number_docids_sorter = create_sorter( let mut facet_number_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,

View File

@ -15,7 +15,7 @@ use crate::heed_codec::{BEU16StrCodec, StrRefCodec};
use crate::localized_attributes_rules::LocalizedFieldIds; use crate::localized_attributes_rules::LocalizedFieldIds;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::helpers::{ use crate::update::index_documents::helpers::{
merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps,
}; };
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
@ -56,7 +56,7 @@ fn extract_facet_string_docids_document_update<R: io::Read + io::Seek>(
let mut facet_string_docids_sorter = create_sorter( let mut facet_string_docids_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -65,7 +65,7 @@ fn extract_facet_string_docids_document_update<R: io::Read + io::Seek>(
let mut normalized_facet_string_docids_sorter = create_sorter( let mut normalized_facet_string_docids_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
merge_deladd_btreeset_string, MergeDeladdBtreesetString,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -144,7 +144,7 @@ fn extract_facet_string_docids_settings<R: io::Read + io::Seek>(
let mut facet_string_docids_sorter = create_sorter( let mut facet_string_docids_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -153,7 +153,7 @@ fn extract_facet_string_docids_settings<R: io::Read + io::Seek>(
let mut normalized_facet_string_docids_sorter = create_sorter( let mut normalized_facet_string_docids_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
merge_deladd_btreeset_string, MergeDeladdBtreesetString,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,

View File

@ -1,10 +1,8 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::convert::TryInto; use std::convert::TryInto;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader}; use std::io::{self, BufReader};
use std::mem::size_of; use std::mem::size_of;
use std::result::Result as StdResult;
use bytemuck::bytes_of; use bytemuck::bytes_of;
use grenad::Sorter; use grenad::Sorter;
@ -15,13 +13,13 @@ use roaring::RoaringBitmap;
use serde_json::{from_slice, Value}; use serde_json::{from_slice, Value};
use FilterableValues::{Empty, Null, Values}; use FilterableValues::{Empty, Null, Values};
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; use super::helpers::{create_sorter, sorter_into_reader, GrenadParameters, KeepFirst};
use crate::error::InternalError; use crate::error::InternalError;
use crate::facet::value_encoding::f64_into_bytes; use crate::facet::value_encoding::f64_into_bytes;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::{create_writer, writer_into_reader}; use crate::update::index_documents::{create_writer, writer_into_reader};
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::{CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, MAX_FACET_VALUE_LENGTH}; use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, MAX_FACET_VALUE_LENGTH};
/// The length of the elements that are always in the buffer when inserting new values. /// The length of the elements that are always in the buffer when inserting new values.
const TRUNCATE_SIZE: usize = size_of::<FieldId>() + size_of::<DocumentId>(); const TRUNCATE_SIZE: usize = size_of::<FieldId>() + size_of::<DocumentId>();
@ -50,7 +48,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
let mut fid_docid_facet_numbers_sorter = create_sorter( let mut fid_docid_facet_numbers_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
keep_first, KeepFirst,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -59,7 +57,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
let mut fid_docid_facet_strings_sorter = create_sorter( let mut fid_docid_facet_strings_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
keep_first, KeepFirst,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -330,15 +328,12 @@ fn truncate_str(s: &str) -> &str {
/// Computes the diff between both Del and Add numbers and /// Computes the diff between both Del and Add numbers and
/// only inserts the parts that differ in the sorter. /// only inserts the parts that differ in the sorter.
fn insert_numbers_diff<MF>( fn insert_numbers_diff(
fid_docid_facet_numbers_sorter: &mut Sorter<MF>, fid_docid_facet_numbers_sorter: &mut Sorter<KeepFirst>,
key_buffer: &mut Vec<u8>, key_buffer: &mut Vec<u8>,
mut del_numbers: Vec<f64>, mut del_numbers: Vec<f64>,
mut add_numbers: Vec<f64>, mut add_numbers: Vec<f64>,
) -> Result<()> ) -> Result<()> {
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
{
// We sort and dedup the float numbers // We sort and dedup the float numbers
del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
@ -390,15 +385,12 @@ where
/// Computes the diff between both Del and Add strings and /// Computes the diff between both Del and Add strings and
/// only inserts the parts that differ in the sorter. /// only inserts the parts that differ in the sorter.
fn insert_strings_diff<MF>( fn insert_strings_diff(
fid_docid_facet_strings_sorter: &mut Sorter<MF>, fid_docid_facet_strings_sorter: &mut Sorter<KeepFirst>,
key_buffer: &mut Vec<u8>, key_buffer: &mut Vec<u8>,
mut del_strings: Vec<(String, String)>, mut del_strings: Vec<(String, String)>,
mut add_strings: Vec<(String, String)>, mut add_strings: Vec<(String, String)>,
) -> Result<()> ) -> Result<()> {
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
{
// We sort and dedup the normalized and original strings // We sort and dedup the normalized and original strings
del_strings.sort_unstable(); del_strings.sort_unstable();
add_strings.sort_unstable(); add_strings.sort_unstable();

View File

@ -4,8 +4,8 @@ use std::io::{self, BufReader};
use obkv::KvReaderU16; use obkv::KvReaderU16;
use super::helpers::{ use super::helpers::{
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters,
GrenadParameters, MergeDeladdCboRoaringBitmaps,
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
@ -30,7 +30,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
let mut fid_word_count_docids_sorter = create_sorter( let mut fid_word_count_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,

View File

@ -40,11 +40,9 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
}; };
// extract old version // extract old version
let del_lat_lng = let del_lat_lng = extract_lat_lng(obkv, &settings_diff.old, DelAdd::Deletion, document_id)?;
extract_lat_lng(&obkv, &settings_diff.old, DelAdd::Deletion, document_id)?;
// extract new version // extract new version
let add_lat_lng = let add_lat_lng = extract_lat_lng(obkv, &settings_diff.new, DelAdd::Addition, document_id)?;
extract_lat_lng(&obkv, &settings_diff.new, DelAdd::Addition, document_id)?;
if del_lat_lng != add_lat_lng { if del_lat_lng != add_lat_lng {
let mut obkv = KvWriterDelAdd::memory(); let mut obkv = KvWriterDelAdd::memory();

View File

@ -7,8 +7,8 @@ use obkv::KvReaderU16;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use super::helpers::{ use super::helpers::{
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, create_sorter, create_writer, try_split_array_at, writer_into_reader, GrenadParameters,
writer_into_reader, GrenadParameters, MergeDeladdCboRoaringBitmaps,
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::heed_codec::StrBEU16Codec; use crate::heed_codec::StrBEU16Codec;
@ -16,7 +16,6 @@ use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::helpers::sorter_into_reader; use crate::update::index_documents::helpers::sorter_into_reader;
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result}; use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result};
/// Extracts the word and the documents ids where this word appear. /// Extracts the word and the documents ids where this word appear.
@ -40,7 +39,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
let mut word_fid_docids_sorter = create_sorter( let mut word_fid_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -94,7 +93,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
let mut word_docids_sorter = create_sorter( let mut word_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -103,7 +102,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
let mut exact_word_docids_sorter = create_sorter( let mut exact_word_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -163,7 +162,7 @@ fn words_into_sorter(
key_buffer: &mut Vec<u8>, key_buffer: &mut Vec<u8>,
del_words: &BTreeSet<Vec<u8>>, del_words: &BTreeSet<Vec<u8>>,
add_words: &BTreeSet<Vec<u8>>, add_words: &BTreeSet<Vec<u8>>,
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>, word_fid_docids_sorter: &mut grenad::Sorter<MergeDeladdCboRoaringBitmaps>,
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};

View File

@ -6,8 +6,8 @@ use std::{cmp, io};
use obkv::KvReaderU16; use obkv::KvReaderU16;
use super::helpers::{ use super::helpers::{
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, create_sorter, create_writer, try_split_array_at, writer_into_reader, GrenadParameters,
writer_into_reader, GrenadParameters, MergeFn, MergeDeladdCboRoaringBitmaps,
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
@ -44,7 +44,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
.map(|_| { .map(|_| {
create_sorter( create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -197,7 +197,7 @@ fn document_word_positions_into_sorter(
document_id: DocumentId, document_id: DocumentId,
del_word_pair_proximity: &BTreeMap<(String, String), u8>, del_word_pair_proximity: &BTreeMap<(String, String), u8>,
add_word_pair_proximity: &BTreeMap<(String, String), u8>, add_word_pair_proximity: &BTreeMap<(String, String), u8>,
word_pair_proximity_docids_sorters: &mut [grenad::Sorter<MergeFn>], word_pair_proximity_docids_sorters: &mut [grenad::Sorter<MergeDeladdCboRoaringBitmaps>],
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};

View File

@ -5,14 +5,13 @@ use std::io::{self, BufReader};
use obkv::KvReaderU16; use obkv::KvReaderU16;
use super::helpers::{ use super::helpers::{
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters,
GrenadParameters, MergeDeladdCboRoaringBitmaps,
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::{bucketed_position, DocumentId, Result}; use crate::{bucketed_position, DocumentId, Result};
/// Extracts the word positions and the documents ids where this word appear. /// Extracts the word positions and the documents ids where this word appear.
@ -29,7 +28,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
let mut word_position_docids_sorter = create_sorter( let mut word_position_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -100,7 +99,7 @@ fn words_position_into_sorter(
key_buffer: &mut Vec<u8>, key_buffer: &mut Vec<u8>,
del_word_positions: &BTreeSet<(u16, Vec<u8>)>, del_word_positions: &BTreeSet<(u16, Vec<u8>)>,
add_word_positions: &BTreeSet<(u16, Vec<u8>)>, add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>, word_position_docids_sorter: &mut grenad::Sorter<MergeDeladdCboRoaringBitmaps>,
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};

View File

@ -1,11 +1,10 @@
use std::borrow::Cow;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader, BufWriter, Seek}; use std::io::{self, BufReader, BufWriter, Seek};
use grenad::{CompressionType, Sorter}; use grenad::{CompressionType, MergeFunction, Sorter};
use heed::types::Bytes; use heed::types::Bytes;
use super::{ClonableMmap, MergeFn}; use super::ClonableMmap;
use crate::update::index_documents::valid_lmdb_key; use crate::update::index_documents::valid_lmdb_key;
use crate::Result; use crate::Result;
@ -31,14 +30,14 @@ pub fn create_writer<R: io::Write>(
/// A helper function that creates a grenad sorter /// A helper function that creates a grenad sorter
/// with the given parameters. The max memory is /// with the given parameters. The max memory is
/// clamped to something reasonable. /// clamped to something reasonable.
pub fn create_sorter( pub fn create_sorter<MF: MergeFunction>(
sort_algorithm: grenad::SortAlgorithm, sort_algorithm: grenad::SortAlgorithm,
merge: MergeFn, merge: MF,
chunk_compression_type: grenad::CompressionType, chunk_compression_type: grenad::CompressionType,
chunk_compression_level: Option<u32>, chunk_compression_level: Option<u32>,
max_nb_chunks: Option<usize>, max_nb_chunks: Option<usize>,
max_memory: Option<usize>, max_memory: Option<usize>,
) -> grenad::Sorter<MergeFn> { ) -> grenad::Sorter<MF> {
let mut builder = grenad::Sorter::builder(merge); let mut builder = grenad::Sorter::builder(merge);
builder.chunk_compression_type(chunk_compression_type); builder.chunk_compression_type(chunk_compression_type);
if let Some(level) = chunk_compression_level { if let Some(level) = chunk_compression_level {
@ -57,10 +56,14 @@ pub fn create_sorter(
} }
#[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")]
pub fn sorter_into_reader( pub fn sorter_into_reader<MF>(
sorter: grenad::Sorter<MergeFn>, sorter: grenad::Sorter<MF>,
indexer: GrenadParameters, indexer: GrenadParameters,
) -> Result<grenad::Reader<BufReader<File>>> { ) -> Result<grenad::Reader<BufReader<File>>>
where
MF: MergeFunction,
crate::Error: From<MF::Error>,
{
let mut writer = create_writer( let mut writer = create_writer(
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
@ -169,8 +172,8 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
/// Write provided sorter in database using serialize_value function. /// Write provided sorter in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database. /// merge_values function is used if an entry already exist in the database.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")]
pub fn write_sorter_into_database<K, V, FS, FM>( pub fn write_sorter_into_database<K, V, FS, FM, MF>(
sorter: Sorter<MergeFn>, sorter: Sorter<MF>,
database: &heed::Database<K, V>, database: &heed::Database<K, V>,
wtxn: &mut heed::RwTxn<'_>, wtxn: &mut heed::RwTxn<'_>,
index_is_empty: bool, index_is_empty: bool,
@ -180,6 +183,8 @@ pub fn write_sorter_into_database<K, V, FS, FM>(
where where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>, FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>, FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
MF: MergeFunction,
crate::Error: From<MF::Error>,
{ {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let database = database.remap_types::<Bytes, Bytes>(); let database = database.remap_types::<Bytes, Bytes>();
@ -207,8 +212,3 @@ where
Ok(()) Ok(())
} }
/// Used when trying to merge readers, but you don't actually care about the values.
pub fn merge_ignore_values<'a>(_key: &[u8], _values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
Ok(Cow::Owned(Vec::new()))
}

View File

@ -3,6 +3,8 @@ use std::collections::BTreeSet;
use std::io; use std::io;
use std::result::Result as StdResult; use std::result::Result as StdResult;
use either::Either;
use grenad::MergeFunction;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use crate::heed_codec::CboRoaringBitmapCodec; use crate::heed_codec::CboRoaringBitmapCodec;
@ -10,7 +12,8 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::transform::Operation; use crate::update::index_documents::transform::Operation;
use crate::Result; use crate::Result;
pub type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>>; pub type EitherObkvMerge =
Either<ObkvsKeepLastAdditionMergeDeletions, ObkvsMergeAdditionsAndDeletions>;
pub fn serialize_roaring_bitmap(bitmap: &RoaringBitmap, buffer: &mut Vec<u8>) -> io::Result<()> { pub fn serialize_roaring_bitmap(bitmap: &RoaringBitmap, buffer: &mut Vec<u8>) -> io::Result<()> {
buffer.clear(); buffer.clear();
@ -18,7 +21,12 @@ pub fn serialize_roaring_bitmap(bitmap: &RoaringBitmap, buffer: &mut Vec<u8>) ->
bitmap.serialize_into(buffer) bitmap.serialize_into(buffer)
} }
pub fn merge_roaring_bitmaps<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> { pub struct MergeRoaringBitmaps;
impl MergeFunction for MergeRoaringBitmaps {
type Error = crate::Error;
fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 { if values.len() == 1 {
Ok(values[0].clone()) Ok(values[0].clone())
} else { } else {
@ -34,15 +42,28 @@ pub fn merge_roaring_bitmaps<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Resul
Ok(Cow::Owned(buffer)) Ok(Cow::Owned(buffer))
} }
} }
}
pub fn keep_first<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> { pub struct KeepFirst;
impl MergeFunction for KeepFirst {
type Error = crate::Error;
fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
Ok(values[0].clone()) Ok(values[0].clone())
} }
}
/// Only the last value associated with an id is kept. /// Only the last value associated with an id is kept.
pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> { pub struct KeepLatestObkv;
impl MergeFunction for KeepLatestObkv {
type Error = crate::Error;
fn merge<'a>(&self, _key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
Ok(obkvs.last().unwrap().clone()) Ok(obkvs.last().unwrap().clone())
} }
}
pub fn merge_two_del_add_obkvs( pub fn merge_two_del_add_obkvs(
base: &obkv::KvReaderU16, base: &obkv::KvReaderU16,
@ -145,26 +166,36 @@ fn inner_merge_del_add_obkvs<'a>(
} }
/// Merge all the obkvs from the newest to the oldest. /// Merge all the obkvs from the newest to the oldest.
pub fn obkvs_merge_additions_and_deletions<'a>( #[derive(Copy, Clone)]
_key: &[u8], pub struct ObkvsMergeAdditionsAndDeletions;
obkvs: &[Cow<'a, [u8]>],
) -> Result<Cow<'a, [u8]>> { impl MergeFunction for ObkvsMergeAdditionsAndDeletions {
type Error = crate::Error;
fn merge<'a>(&self, _key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
inner_merge_del_add_obkvs(obkvs, true) inner_merge_del_add_obkvs(obkvs, true)
} }
}
/// Merge all the obkvs deletions from the newest to the oldest and keep only the newest additions. /// Merge all the obkvs deletions from the newest to the oldest and keep only the newest additions.
pub fn obkvs_keep_last_addition_merge_deletions<'a>( #[derive(Copy, Clone)]
_key: &[u8], pub struct ObkvsKeepLastAdditionMergeDeletions;
obkvs: &[Cow<'a, [u8]>],
) -> Result<Cow<'a, [u8]>> { impl MergeFunction for ObkvsKeepLastAdditionMergeDeletions {
type Error = crate::Error;
fn merge<'a>(&self, _key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
inner_merge_del_add_obkvs(obkvs, false) inner_merge_del_add_obkvs(obkvs, false)
} }
}
/// Do a union of all the CboRoaringBitmaps in the values. /// Do a union of all the CboRoaringBitmaps in the values.
pub fn merge_cbo_roaring_bitmaps<'a>( pub struct MergeCboRoaringBitmaps;
_key: &[u8],
values: &[Cow<'a, [u8]>], impl MergeFunction for MergeCboRoaringBitmaps {
) -> Result<Cow<'a, [u8]>> { type Error = crate::Error;
fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 { if values.len() == 1 {
Ok(values[0].clone()) Ok(values[0].clone())
} else { } else {
@ -173,13 +204,16 @@ pub fn merge_cbo_roaring_bitmaps<'a>(
Ok(Cow::from(vec)) Ok(Cow::from(vec))
} }
} }
}
/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv /// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv
/// separately and outputs a new DelAdd with both unions. /// separately and outputs a new DelAdd with both unions.
pub fn merge_deladd_cbo_roaring_bitmaps<'a>( pub struct MergeDeladdCboRoaringBitmaps;
_key: &[u8],
values: &[Cow<'a, [u8]>], impl MergeFunction for MergeDeladdCboRoaringBitmaps {
) -> Result<Cow<'a, [u8]>> { type Error = crate::Error;
fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 { if values.len() == 1 {
Ok(values[0].clone()) Ok(values[0].clone())
} else { } else {
@ -206,6 +240,7 @@ pub fn merge_deladd_cbo_roaring_bitmaps<'a>(
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
} }
} }
}
/// A function that merges a DelAdd of bitmao into an already existing bitmap. /// A function that merges a DelAdd of bitmao into an already existing bitmap.
/// ///
@ -225,10 +260,12 @@ pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>(
/// Do a union of BtreeSet on both sides of a DelAdd obkv /// Do a union of BtreeSet on both sides of a DelAdd obkv
/// separately and outputs a new DelAdd with both unions. /// separately and outputs a new DelAdd with both unions.
pub fn merge_deladd_btreeset_string<'a>( pub struct MergeDeladdBtreesetString;
_key: &[u8],
values: &[Cow<'a, [u8]>], impl MergeFunction for MergeDeladdBtreesetString {
) -> Result<Cow<'a, [u8]>> { type Error = crate::Error;
fn merge<'a>(&self, _key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 { if values.len() == 1 {
Ok(values[0].clone()) Ok(values[0].clone())
} else { } else {
@ -259,3 +296,19 @@ pub fn merge_deladd_btreeset_string<'a>(
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
} }
} }
}
/// Used when trying to merge readers, but you don't actually care about the values.
pub struct MergeIgnoreValues;
impl MergeFunction for MergeIgnoreValues {
type Error = crate::Error;
fn merge<'a>(
&self,
_key: &[u8],
_values: &[Cow<'a, [u8]>],
) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
Ok(Cow::Owned(Vec::new()))
}
}

View File

@ -7,17 +7,8 @@ use std::convert::{TryFrom, TryInto};
pub use clonable_mmap::{ClonableMmap, CursorClonableMmap}; pub use clonable_mmap::{ClonableMmap, CursorClonableMmap};
use fst::{IntoStreamer, Streamer}; use fst::{IntoStreamer, Streamer};
pub use grenad_helpers::{ pub use grenad_helpers::*;
as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks, pub use merge_functions::*;
merge_ignore_values, sorter_into_reader, write_sorter_into_database, writer_into_reader,
GrenadParameters,
};
pub use merge_functions::{
keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, merge_deladd_btreeset_string,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions,
obkvs_merge_additions_and_deletions, MergeFn,
};
use crate::MAX_WORD_LENGTH; use crate::MAX_WORD_LENGTH;

View File

@ -27,13 +27,7 @@ use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk};
use self::enrich::enrich_documents_batch; use self::enrich::enrich_documents_batch;
pub use self::enrich::{extract_finite_float_from_value, DocumentId}; pub use self::enrich::{extract_finite_float_from_value, DocumentId};
pub use self::helpers::{ pub use self::helpers::*;
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps,
valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn,
};
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput}; pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchReader}; use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchReader};
use crate::error::{Error, InternalError, UserError}; use crate::error::{Error, InternalError, UserError};
@ -605,7 +599,7 @@ where
let cloneable_chunk = let cloneable_chunk =
unsafe { as_cloneable_grenad(&word_docids_reader)? }; unsafe { as_cloneable_grenad(&word_docids_reader)? };
let word_docids = word_docids.get_or_insert_with(|| { let word_docids = word_docids.get_or_insert_with(|| {
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn) MergerBuilder::new(MergeDeladdCboRoaringBitmaps)
}); });
word_docids.push(cloneable_chunk.into_cursor()?); word_docids.push(cloneable_chunk.into_cursor()?);
let cloneable_chunk = let cloneable_chunk =
@ -613,14 +607,14 @@ where
let exact_word_docids = let exact_word_docids =
exact_word_docids.get_or_insert_with(|| { exact_word_docids.get_or_insert_with(|| {
MergerBuilder::new( MergerBuilder::new(
merge_deladd_cbo_roaring_bitmaps as MergeFn, MergeDeladdCboRoaringBitmaps,
) )
}); });
exact_word_docids.push(cloneable_chunk.into_cursor()?); exact_word_docids.push(cloneable_chunk.into_cursor()?);
let cloneable_chunk = let cloneable_chunk =
unsafe { as_cloneable_grenad(&word_fid_docids_reader)? }; unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
let word_fid_docids = word_fid_docids.get_or_insert_with(|| { let word_fid_docids = word_fid_docids.get_or_insert_with(|| {
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn) MergerBuilder::new(MergeDeladdCboRoaringBitmaps)
}); });
word_fid_docids.push(cloneable_chunk.into_cursor()?); word_fid_docids.push(cloneable_chunk.into_cursor()?);
TypedChunk::WordDocids { TypedChunk::WordDocids {
@ -634,7 +628,7 @@ where
let word_position_docids = let word_position_docids =
word_position_docids.get_or_insert_with(|| { word_position_docids.get_or_insert_with(|| {
MergerBuilder::new( MergerBuilder::new(
merge_deladd_cbo_roaring_bitmaps as MergeFn, MergeDeladdCboRoaringBitmaps,
) )
}); });
word_position_docids.push(cloneable_chunk.into_cursor()?); word_position_docids.push(cloneable_chunk.into_cursor()?);
@ -719,10 +713,10 @@ where
)] )]
pub fn execute_prefix_databases( pub fn execute_prefix_databases(
self, self,
word_docids: Option<Merger<CursorClonableMmap, MergeFn>>, word_docids: Option<Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>>,
exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>, exact_word_docids: Option<Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>>,
word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>, word_position_docids: Option<Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>>,
word_fid_docids: Option<Merger<CursorClonableMmap, MergeFn>>, word_fid_docids: Option<Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>>,
) -> Result<()> ) -> Result<()>
where where
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
@ -902,7 +896,7 @@ where
)] )]
fn execute_word_prefix_docids( fn execute_word_prefix_docids(
txn: &mut heed::RwTxn<'_>, txn: &mut heed::RwTxn<'_>,
merger: Merger<CursorClonableMmap, MergeFn>, merger: Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>,
word_docids_db: Database<Str, CboRoaringBitmapCodec>, word_docids_db: Database<Str, CboRoaringBitmapCodec>,
word_prefix_docids_db: Database<Str, CboRoaringBitmapCodec>, word_prefix_docids_db: Database<Str, CboRoaringBitmapCodec>,
indexer_config: &IndexerConfig, indexer_config: &IndexerConfig,

View File

@ -5,6 +5,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File; use std::fs::File;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
use either::Either;
use fxhash::FxHashMap; use fxhash::FxHashMap;
use itertools::Itertools; use itertools::Itertools;
use obkv::{KvReader, KvReaderU16, KvWriter}; use obkv::{KvReader, KvReaderU16, KvWriter};
@ -13,10 +14,10 @@ use serde_json::Value;
use smartstring::SmartString; use smartstring::SmartString;
use super::helpers::{ use super::helpers::{
create_sorter, create_writer, keep_first, obkvs_keep_last_addition_merge_deletions, create_sorter, create_writer, sorter_into_reader, EitherObkvMerge,
obkvs_merge_additions_and_deletions, sorter_into_reader, MergeFn, ObkvsKeepLastAdditionMergeDeletions, ObkvsMergeAdditionsAndDeletions,
}; };
use super::{IndexDocumentsMethod, IndexerConfig}; use super::{IndexDocumentsMethod, IndexerConfig, KeepFirst};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError}; use crate::error::{Error, InternalError, UserError};
use crate::index::{db_name, main_key}; use crate::index::{db_name, main_key};
@ -59,8 +60,8 @@ pub struct Transform<'a, 'i> {
// Both grenad follows the same format: // Both grenad follows the same format:
// key | value // key | value
// u32 | 1 byte for the Operation byte, the rest is the obkv of the document stored // u32 | 1 byte for the Operation byte, the rest is the obkv of the document stored
original_sorter: grenad::Sorter<MergeFn>, original_sorter: grenad::Sorter<EitherObkvMerge>,
flattened_sorter: grenad::Sorter<MergeFn>, flattened_sorter: grenad::Sorter<EitherObkvMerge>,
replaced_documents_ids: RoaringBitmap, replaced_documents_ids: RoaringBitmap,
new_documents_ids: RoaringBitmap, new_documents_ids: RoaringBitmap,
@ -108,17 +109,19 @@ impl<'a, 'i> Transform<'a, 'i> {
index_documents_method: IndexDocumentsMethod, index_documents_method: IndexDocumentsMethod,
_autogenerate_docids: bool, _autogenerate_docids: bool,
) -> Result<Self> { ) -> Result<Self> {
use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
// We must choose the appropriate merge function for when two or more documents // We must choose the appropriate merge function for when two or more documents
// with the same user id must be merged or fully replaced in the same batch. // with the same user id must be merged or fully replaced in the same batch.
let merge_function = match index_documents_method { let merge_function = match index_documents_method {
IndexDocumentsMethod::ReplaceDocuments => obkvs_keep_last_addition_merge_deletions, ReplaceDocuments => Either::Left(ObkvsKeepLastAdditionMergeDeletions),
IndexDocumentsMethod::UpdateDocuments => obkvs_merge_additions_and_deletions, UpdateDocuments => Either::Right(ObkvsMergeAdditionsAndDeletions),
}; };
// We initialize the sorter with the user indexing settings. // We initialize the sorter with the user indexing settings.
let original_sorter = create_sorter( let original_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
merge_function, merge_function.clone(),
indexer_settings.chunk_compression_type, indexer_settings.chunk_compression_type,
indexer_settings.chunk_compression_level, indexer_settings.chunk_compression_level,
indexer_settings.max_nb_chunks, indexer_settings.max_nb_chunks,
@ -979,7 +982,7 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut original_sorter = if settings_diff.reindex_vectors() { let mut original_sorter = if settings_diff.reindex_vectors() {
Some(create_sorter( Some(create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
keep_first, KeepFirst,
self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level, self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks, self.indexer_settings.max_nb_chunks,
@ -1023,7 +1026,7 @@ impl<'a, 'i> Transform<'a, 'i> {
if settings_diff.reindex_searchable() || settings_diff.reindex_facets() { if settings_diff.reindex_searchable() || settings_diff.reindex_facets() {
Some(create_sorter( Some(create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
keep_first, KeepFirst,
self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level, self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks, self.indexer_settings.max_nb_chunks,
@ -1162,6 +1165,8 @@ fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use grenad::MergeFunction;
use super::*; use super::*;
#[test] #[test]
@ -1219,25 +1224,32 @@ mod test {
.unwrap(); .unwrap();
additive_doc_0_1.insert(0, Operation::Addition as u8); additive_doc_0_1.insert(0, Operation::Addition as u8);
let ret = obkvs_merge_additions_and_deletions(&[], &[Cow::from(additive_doc_0.as_slice())]) let ret = MergeFunction::merge(
&ObkvsMergeAdditionsAndDeletions,
&[],
&[Cow::from(additive_doc_0.as_slice())],
)
.unwrap(); .unwrap();
assert_eq!(*ret, additive_doc_0); assert_eq!(*ret, additive_doc_0);
let ret = obkvs_merge_additions_and_deletions( let ret = MergeFunction::merge(
&ObkvsMergeAdditionsAndDeletions,
&[], &[],
&[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())], &[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())],
) )
.unwrap(); .unwrap();
assert_eq!(*ret, del_add_doc_0); assert_eq!(*ret, del_add_doc_0);
let ret = obkvs_merge_additions_and_deletions( let ret = MergeFunction::merge(
&ObkvsMergeAdditionsAndDeletions,
&[], &[],
&[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())], &[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())],
) )
.unwrap(); .unwrap();
assert_eq!(*ret, deletive_doc_0); assert_eq!(*ret, deletive_doc_0);
let ret = obkvs_merge_additions_and_deletions( let ret = MergeFunction::merge(
&ObkvsMergeAdditionsAndDeletions,
&[], &[],
&[ &[
Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_1.as_slice()),
@ -1248,21 +1260,24 @@ mod test {
.unwrap(); .unwrap();
assert_eq!(*ret, del_add_doc_0); assert_eq!(*ret, del_add_doc_0);
let ret = obkvs_merge_additions_and_deletions( let ret = MergeFunction::merge(
&ObkvsMergeAdditionsAndDeletions,
&[], &[],
&[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())], &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
) )
.unwrap(); .unwrap();
assert_eq!(*ret, additive_doc_0_1); assert_eq!(*ret, additive_doc_0_1);
let ret = obkvs_keep_last_addition_merge_deletions( let ret = MergeFunction::merge(
&ObkvsKeepLastAdditionMergeDeletions,
&[], &[],
&[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())], &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
) )
.unwrap(); .unwrap();
assert_eq!(*ret, additive_doc_0); assert_eq!(*ret, additive_doc_0);
let ret = obkvs_keep_last_addition_merge_deletions( let ret = MergeFunction::merge(
&ObkvsKeepLastAdditionMergeDeletions,
&[], &[],
&[ &[
Cow::from(deletive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice()),

View File

@ -4,18 +4,17 @@ use std::fs::File;
use std::io::{self, BufReader}; use std::io::{self, BufReader};
use bytemuck::allocation::pod_collect_to_vec; use bytemuck::allocation::pod_collect_to_vec;
use grenad::{Merger, MergerBuilder}; use grenad::{MergeFunction, Merger, MergerBuilder};
use heed::types::Bytes; use heed::types::Bytes;
use heed::{BytesDecode, RwTxn}; use heed::{BytesDecode, RwTxn};
use obkv::{KvReader, KvWriter}; use obkv::{KvReader, KvWriter};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use super::helpers::{ use super::helpers::{
self, keep_first, merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, self, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_ignore_values, valid_lmdb_key, CursorClonableMmap, KeepFirst, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps,
CursorClonableMmap, MergeIgnoreValues,
}; };
use super::MergeFn;
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::index::db_name::DOCUMENTS; use crate::index::db_name::DOCUMENTS;
@ -24,7 +23,7 @@ use crate::proximity::MAX_DISTANCE;
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd}; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
use crate::update::facet::FacetsUpdate; use crate::update::facet::FacetsUpdate;
use crate::update::index_documents::helpers::{ use crate::update::index_documents::helpers::{
as_cloneable_grenad, keep_latest_obkv, try_split_array_at, as_cloneable_grenad, try_split_array_at, KeepLatestObkv,
}; };
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::{ use crate::{
@ -140,7 +139,7 @@ pub(crate) fn write_typed_chunk_into_index(
let vectors_fid = let vectors_fid =
fields_ids_map.id(crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME); fields_ids_map.id(crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME);
let mut builder = MergerBuilder::new(keep_latest_obkv as MergeFn); let mut builder = MergerBuilder::new(KeepLatestObkv);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::Documents(chunk) = typed_chunk else { let TypedChunk::Documents(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -234,7 +233,7 @@ pub(crate) fn write_typed_chunk_into_index(
tracing::trace_span!(target: "indexing::write_db", "field_id_word_count_docids"); tracing::trace_span!(target: "indexing::write_db", "field_id_word_count_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::FieldIdWordCountDocids(chunk) = typed_chunk else { let TypedChunk::FieldIdWordCountDocids(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -257,13 +256,10 @@ pub(crate) fn write_typed_chunk_into_index(
let span = tracing::trace_span!(target: "indexing::write_db", "word_docids"); let span = tracing::trace_span!(target: "indexing::write_db", "word_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut word_docids_builder = let mut word_docids_builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut exact_word_docids_builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
let mut exact_word_docids_builder = let mut word_fid_docids_builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut fst_merger_builder = MergerBuilder::new(MergeIgnoreValues);
let mut word_fid_docids_builder =
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn);
let mut fst_merger_builder = MergerBuilder::new(merge_ignore_values as MergeFn);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::WordDocids { let TypedChunk::WordDocids {
word_docids_reader, word_docids_reader,
@ -328,7 +324,7 @@ pub(crate) fn write_typed_chunk_into_index(
let span = tracing::trace_span!(target: "indexing::write_db", "word_position_docids"); let span = tracing::trace_span!(target: "indexing::write_db", "word_position_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::WordPositionDocids(chunk) = typed_chunk else { let TypedChunk::WordPositionDocids(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -352,7 +348,7 @@ pub(crate) fn write_typed_chunk_into_index(
tracing::trace_span!(target: "indexing::write_db","field_id_facet_number_docids"); tracing::trace_span!(target: "indexing::write_db","field_id_facet_number_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
let mut data_size = 0; let mut data_size = 0;
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids) = typed_chunk let TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids) = typed_chunk
@ -374,10 +370,9 @@ pub(crate) fn write_typed_chunk_into_index(
tracing::trace_span!(target: "indexing::write_db", "field_id_facet_string_docids"); tracing::trace_span!(target: "indexing::write_db", "field_id_facet_string_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut facet_id_string_builder = let mut facet_id_string_builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn);
let mut normalized_facet_id_string_builder = let mut normalized_facet_id_string_builder =
MergerBuilder::new(merge_deladd_btreeset_string as MergeFn); MergerBuilder::new(MergeDeladdBtreesetString);
let mut data_size = 0; let mut data_size = 0;
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::FieldIdFacetStringDocids(( let TypedChunk::FieldIdFacetStringDocids((
@ -411,7 +406,7 @@ pub(crate) fn write_typed_chunk_into_index(
tracing::trace_span!(target: "indexing::write_db", "field_id_facet_exists_docids"); tracing::trace_span!(target: "indexing::write_db", "field_id_facet_exists_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::FieldIdFacetExistsDocids(chunk) = typed_chunk else { let TypedChunk::FieldIdFacetExistsDocids(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -435,7 +430,7 @@ pub(crate) fn write_typed_chunk_into_index(
tracing::trace_span!(target: "indexing::write_db", "field_id_facet_is_null_docids"); tracing::trace_span!(target: "indexing::write_db", "field_id_facet_is_null_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::FieldIdFacetIsNullDocids(chunk) = typed_chunk else { let TypedChunk::FieldIdFacetIsNullDocids(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -458,7 +453,7 @@ pub(crate) fn write_typed_chunk_into_index(
let span = tracing::trace_span!(target: "indexing::write_db", "field_id_facet_is_empty_docids"); let span = tracing::trace_span!(target: "indexing::write_db", "field_id_facet_is_empty_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::FieldIdFacetIsEmptyDocids(chunk) = typed_chunk else { let TypedChunk::FieldIdFacetIsEmptyDocids(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -482,7 +477,7 @@ pub(crate) fn write_typed_chunk_into_index(
tracing::trace_span!(target: "indexing::write_db", "word_pair_proximity_docids"); tracing::trace_span!(target: "indexing::write_db", "word_pair_proximity_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); let mut builder = MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::WordPairProximityDocids(chunk) = typed_chunk else { let TypedChunk::WordPairProximityDocids(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -515,7 +510,7 @@ pub(crate) fn write_typed_chunk_into_index(
tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_numbers"); tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_numbers");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(keep_first as MergeFn); let mut builder = MergerBuilder::new(KeepFirst);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::FieldIdDocidFacetNumbers(chunk) = typed_chunk else { let TypedChunk::FieldIdDocidFacetNumbers(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -549,7 +544,7 @@ pub(crate) fn write_typed_chunk_into_index(
tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_strings"); tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_strings");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(keep_first as MergeFn); let mut builder = MergerBuilder::new(KeepFirst);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::FieldIdDocidFacetStrings(chunk) = typed_chunk else { let TypedChunk::FieldIdDocidFacetStrings(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -582,7 +577,7 @@ pub(crate) fn write_typed_chunk_into_index(
let span = tracing::trace_span!(target: "indexing::write_db", "geo_points"); let span = tracing::trace_span!(target: "indexing::write_db", "geo_points");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = MergerBuilder::new(keep_first as MergeFn); let mut builder = MergerBuilder::new(KeepFirst);
for typed_chunk in typed_chunks { for typed_chunk in typed_chunks {
let TypedChunk::GeoPoints(chunk) = typed_chunk else { let TypedChunk::GeoPoints(chunk) = typed_chunk else {
unreachable!(); unreachable!();
@ -619,9 +614,9 @@ pub(crate) fn write_typed_chunk_into_index(
let span = tracing::trace_span!(target: "indexing::write_db", "vector_points"); let span = tracing::trace_span!(target: "indexing::write_db", "vector_points");
let _entered = span.enter(); let _entered = span.enter();
let mut remove_vectors_builder = MergerBuilder::new(keep_first as MergeFn); let mut remove_vectors_builder = MergerBuilder::new(KeepFirst);
let mut manual_vectors_builder = MergerBuilder::new(keep_first as MergeFn); let mut manual_vectors_builder = MergerBuilder::new(KeepFirst);
let mut embeddings_builder = MergerBuilder::new(keep_first as MergeFn); let mut embeddings_builder = MergerBuilder::new(KeepFirst);
let mut add_to_user_provided = RoaringBitmap::new(); let mut add_to_user_provided = RoaringBitmap::new();
let mut remove_from_user_provided = RoaringBitmap::new(); let mut remove_from_user_provided = RoaringBitmap::new();
let mut params = None; let mut params = None;
@ -786,9 +781,13 @@ fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
GeoPoint::new(xyz_point, (docid, point)) GeoPoint::new(xyz_point, (docid, point))
} }
fn merge_word_docids_reader_into_fst( fn merge_word_docids_reader_into_fst<MF>(
merger: Merger<CursorClonableMmap, MergeFn>, merger: Merger<CursorClonableMmap, MF>,
) -> Result<fst::Set<Vec<u8>>> { ) -> Result<fst::Set<Vec<u8>>>
where
MF: MergeFunction,
crate::Error: From<MF::Error>,
{
let mut iter = merger.into_stream_merger_iter()?; let mut iter = merger.into_stream_merger_iter()?;
let mut builder = fst::SetBuilder::memory(); let mut builder = fst::SetBuilder::memory();
@ -802,8 +801,8 @@ fn merge_word_docids_reader_into_fst(
/// Write provided entries in database using serialize_value function. /// Write provided entries in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database. /// merge_values function is used if an entry already exist in the database.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")]
fn write_entries_into_database<R, K, V, FS, FM>( fn write_entries_into_database<R, K, V, FS, FM, MF>(
merger: Merger<R, MergeFn>, merger: Merger<R, MF>,
database: &heed::Database<K, V>, database: &heed::Database<K, V>,
wtxn: &mut RwTxn<'_>, wtxn: &mut RwTxn<'_>,
serialize_value: FS, serialize_value: FS,
@ -813,6 +812,8 @@ where
R: io::Read + io::Seek, R: io::Read + io::Seek,
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>, FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>, FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
MF: MergeFunction,
crate::Error: From<MF::Error>,
{ {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let database = database.remap_types::<Bytes, Bytes>(); let database = database.remap_types::<Bytes, Bytes>();
@ -839,13 +840,15 @@ where
/// Akin to the `write_entries_into_database` function but specialized /// Akin to the `write_entries_into_database` function but specialized
/// for the case when we only index additional searchable fields only. /// for the case when we only index additional searchable fields only.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")]
fn write_proximity_entries_into_database_additional_searchables<R>( fn write_proximity_entries_into_database_additional_searchables<R, MF>(
merger: Merger<R, MergeFn>, merger: Merger<R, MF>,
database: &heed::Database<U8StrStrCodec, CboRoaringBitmapCodec>, database: &heed::Database<U8StrStrCodec, CboRoaringBitmapCodec>,
wtxn: &mut RwTxn<'_>, wtxn: &mut RwTxn<'_>,
) -> Result<()> ) -> Result<()>
where where
R: io::Read + io::Seek, R: io::Read + io::Seek,
MF: MergeFunction,
crate::Error: From<MF::Error>,
{ {
let mut iter = merger.into_stream_merger_iter()?; let mut iter = merger.into_stream_merger_iter()?;
while let Some((key, value)) = iter.next()? { while let Some((key, value)) = iter.next()? {

View File

@ -2,10 +2,7 @@ pub use self::available_documents_ids::AvailableDocumentsIds;
pub use self::clear_documents::ClearDocuments; pub use self::clear_documents::ClearDocuments;
pub use self::facet::bulk::FacetsUpdateBulk; pub use self::facet::bulk::FacetsUpdateBulk;
pub use self::facet::incremental::FacetsUpdateIncrementalInner; pub use self::facet::incremental::FacetsUpdateIncrementalInner;
pub use self::index_documents::{ pub use self::index_documents::*;
merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, DocumentAdditionResult, DocumentId,
IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, MergeFn,
};
pub use self::indexer_config::IndexerConfig; pub use self::indexer_config::IndexerConfig;
pub use self::settings::{validate_embedding_settings, Setting, Settings}; pub use self::settings::{validate_embedding_settings, Setting, Settings};
pub use self::update_step::UpdateIndexingStep; pub use self::update_step::UpdateIndexingStep;

View File

@ -159,7 +159,7 @@ impl DocumentSender {
} }
pub enum MergerOperation { pub enum MergerOperation {
WordDocidsCursors(Vec<grenad2::ReaderCursor<File>>), WordDocidsCursors(Vec<grenad::ReaderCursor<File>>),
} }
pub struct MergerReceiver(Receiver<MergerOperation>); pub struct MergerReceiver(Receiver<MergerOperation>);

View File

@ -1,7 +1,7 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::io; use std::io;
use grenad2::MergeFunction; use grenad::MergeFunction;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;

View File

@ -198,7 +198,7 @@ mod indexer {
} }
let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from)));
Ok(docids_version_offsets.into_par_iter().map_with( docids_version_offsets.into_par_iter().map_with(
items, items,
|context_pool, (external_docid, (internal_docid, operations))| { |context_pool, (external_docid, (internal_docid, operations))| {
context_pool.with(|rtxn| match self.method { context_pool.with(|rtxn| match self.method {
@ -221,7 +221,9 @@ mod indexer {
), ),
}) })
}, },
)) );
Ok(vec![].into_par_iter())
} }
} }
@ -334,13 +336,13 @@ mod indexer {
thread::scope(|s| { thread::scope(|s| {
thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
document_changes.into_par_iter().for_each(|_dc| ()); document_changes.into_par_iter().for_each(|_dc| ());
}); })?;
// TODO manage the errors correctly // TODO manage the errors correctly
thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || { thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || {
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index).unwrap() merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index).unwrap()
}); })?;
// TODO Split this code into another function // TODO Split this code into another function
for operation in writer_receiver { for operation in writer_receiver {
@ -426,7 +428,7 @@ mod indexer {
let sender = sender.word_docids(); let sender = sender.word_docids();
let database = index.word_docids.remap_types::<Bytes, Bytes>(); let database = index.word_docids.remap_types::<Bytes, Bytes>();
let mut builder = grenad2::MergerBuilder::new(merge::DelAddRoaringBitmapMerger); let mut builder = grenad::MergerBuilder::new(merge::DelAddRoaringBitmapMerger);
builder.extend(cursors); builder.extend(cursors);
/// TODO manage the error correctly /// TODO manage the error correctly
let mut merger_iter = builder.build().into_stream_merger_iter().unwrap(); let mut merger_iter = builder.build().into_stream_merger_iter().unwrap();

View File

@ -6,9 +6,8 @@ use heed::Database;
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
use crate::update::index_documents::{ use crate::update::index_documents::{
create_sorter, merge_deladd_cbo_roaring_bitmaps, create_sorter, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, write_sorter_into_database, CursorClonableMmap, MergeDeladdCboRoaringBitmaps,
write_sorter_into_database, CursorClonableMmap, MergeFn,
}; };
use crate::{CboRoaringBitmapCodec, Result}; use crate::{CboRoaringBitmapCodec, Result};
@ -47,7 +46,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
)] )]
pub fn execute( pub fn execute(
self, self,
new_word_docids: grenad::Merger<CursorClonableMmap, MergeFn>, new_word_docids: grenad::Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>,
new_prefix_fst_words: &[String], new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]], common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>, del_prefix_fst_words: &HashSet<Vec<u8>>,
@ -56,7 +55,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
// and write into it at the same time, therefore we write into another file. // and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter( let mut prefix_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
self.chunk_compression_type, self.chunk_compression_type,
self.chunk_compression_level, self.chunk_compression_level,
self.max_nb_chunks, self.max_nb_chunks,
@ -139,7 +138,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
fn write_prefixes_in_sorter( fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>, prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>, sorter: &mut grenad::Sorter<MergeDeladdCboRoaringBitmaps>,
) -> Result<()> { ) -> Result<()> {
for (key, data_slices) in prefixes.drain() { for (key, data_slices) in prefixes.drain() {
for data in data_slices { for data in data_slices {

View File

@ -11,9 +11,8 @@ use crate::heed_codec::StrBEU16Codec;
use crate::index::main_key::WORDS_PREFIXES_FST_KEY; use crate::index::main_key::WORDS_PREFIXES_FST_KEY;
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
use crate::update::index_documents::{ use crate::update::index_documents::{
create_sorter, merge_deladd_cbo_roaring_bitmaps, create_sorter, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, write_sorter_into_database, CursorClonableMmap, MergeDeladdCboRoaringBitmaps,
write_sorter_into_database, CursorClonableMmap, MergeFn,
}; };
use crate::{CboRoaringBitmapCodec, Result}; use crate::{CboRoaringBitmapCodec, Result};
@ -52,7 +51,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
)] )]
pub fn execute( pub fn execute(
self, self,
new_word_integer_docids: grenad::Merger<CursorClonableMmap, MergeFn>, new_word_integer_docids: grenad::Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>,
new_prefix_fst_words: &[String], new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]], common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>, del_prefix_fst_words: &HashSet<Vec<u8>>,
@ -61,7 +60,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
let mut prefix_integer_docids_sorter = create_sorter( let mut prefix_integer_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, MergeDeladdCboRoaringBitmaps,
self.chunk_compression_type, self.chunk_compression_type,
self.chunk_compression_level, self.chunk_compression_level,
self.max_nb_chunks, self.max_nb_chunks,
@ -173,7 +172,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
fn write_prefixes_in_sorter( fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>, prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>, sorter: &mut grenad::Sorter<MergeDeladdCboRoaringBitmaps>,
) -> Result<()> { ) -> Result<()> {
// TODO: Merge before insertion. // TODO: Merge before insertion.
for (key, data_slices) in prefixes.drain() { for (key, data_slices) in prefixes.drain() {