diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs index 3bd4cf5f5..8771cb6fe 100644 --- a/milli/src/update/facet/bulk.rs +++ b/milli/src/update/facet/bulk.rs @@ -1,7 +1,7 @@ use std::fs::File; use std::io::BufReader; -use grenad::CompressionType; +use grenad::{CompressionType, Merger}; use heed::types::Bytes; use heed::{BytesDecode, BytesEncode, Error, PutFlags, RoTxn, RwTxn}; use roaring::RoaringBitmap; @@ -14,6 +14,7 @@ use crate::heed_codec::facet::{ use crate::heed_codec::BytesRefCodec; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader}; +use crate::update::MergeFn; use crate::{CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldId, Index, Result}; /// Algorithm to insert elememts into the `facet_id_(string/f64)_docids` databases @@ -28,7 +29,7 @@ pub struct FacetsUpdateBulk<'i> { facet_type: FacetType, field_ids: Vec, // None if level 0 does not need to be updated - delta_data: Option>>, + delta_data: Option, MergeFn>>, } impl<'i> FacetsUpdateBulk<'i> { @@ -36,7 +37,7 @@ impl<'i> FacetsUpdateBulk<'i> { index: &'i Index, field_ids: Vec, facet_type: FacetType, - delta_data: grenad::Reader>, + delta_data: Merger, MergeFn>, group_size: u8, min_level_size: u8, ) -> FacetsUpdateBulk<'i> { @@ -89,7 +90,7 @@ impl<'i> FacetsUpdateBulk<'i> { /// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type pub(crate) struct FacetsUpdateBulkInner { pub db: heed::Database, FacetGroupValueCodec>, - pub delta_data: Option>, + pub delta_data: Option>, pub group_size: u8, pub min_level_size: u8, } @@ -129,8 +130,8 @@ impl FacetsUpdateBulkInner { if self.db.is_empty(wtxn)? { let mut buffer = Vec::new(); let mut database = self.db.iter_mut(wtxn)?.remap_types::(); - let mut cursor = delta_data.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { + let mut iter = delta_data.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? { if !valid_lmdb_key(key) { continue; } @@ -154,8 +155,8 @@ impl FacetsUpdateBulkInner { let mut buffer = Vec::new(); let database = self.db.remap_types::(); - let mut cursor = delta_data.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { + let mut iter = delta_data.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? { if !valid_lmdb_key(key) { continue; } diff --git a/milli/src/update/facet/incremental.rs b/milli/src/update/facet/incremental.rs index 78db218e3..722ccb1cb 100644 --- a/milli/src/update/facet/incremental.rs +++ b/milli/src/update/facet/incremental.rs @@ -1,6 +1,7 @@ use std::fs::File; use std::io::BufReader; +use grenad::Merger; use heed::types::{Bytes, DecodeIgnore}; use heed::{BytesDecode, Error, RoTxn, RwTxn}; use obkv::KvReader; @@ -14,6 +15,7 @@ use crate::heed_codec::BytesRefCodec; use crate::search::facet::get_highest_level; use crate::update::del_add::DelAdd; use crate::update::index_documents::valid_lmdb_key; +use crate::update::MergeFn; use crate::{CboRoaringBitmapCodec, Index, Result}; enum InsertionResult { @@ -31,14 +33,14 @@ enum DeletionResult { /// `facet_id_(string/f64)_docids` databases. 
pub struct FacetsUpdateIncremental { inner: FacetsUpdateIncrementalInner, - delta_data: grenad::Reader>, + delta_data: Merger, MergeFn>, } impl FacetsUpdateIncremental { pub fn new( index: &Index, facet_type: FacetType, - delta_data: grenad::Reader>, + delta_data: Merger, MergeFn>, group_size: u8, min_level_size: u8, max_group_size: u8, @@ -61,16 +63,18 @@ impl FacetsUpdateIncremental { } } + #[logging_timer::time("FacetsUpdateIncremental::{}")] pub fn execute(self, wtxn: &mut RwTxn) -> crate::Result<()> { - let mut cursor = self.delta_data.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { + let mut iter = self.delta_data.into_stream_merger_iter()?; + + while let Some((key, value)) = iter.next()? { if !valid_lmdb_key(key) { continue; } + let key = FacetGroupKeyCodec::::bytes_decode(key) .map_err(heed::Error::Encoding)?; let value = KvReader::new(value); - let docids_to_delete = value .get(DelAdd::Deletion) .map(CboRoaringBitmapCodec::bytes_decode) diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index 400507c97..ed451c7ce 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -79,12 +79,9 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5; use std::collections::BTreeSet; use std::fs::File; use std::io::BufReader; -use std::iter::FromIterator; -use charabia::normalizer::{Normalize, NormalizerOption}; -use grenad::{CompressionType, SortAlgorithm}; -use heed::types::{Bytes, DecodeIgnore, SerdeJson}; -use heed::BytesEncode; +use grenad::Merger; +use heed::types::{Bytes, DecodeIgnore}; use time::OffsetDateTime; use tracing::debug; @@ -93,9 +90,9 @@ use super::FacetsUpdateBulk; use crate::facet::FacetType; use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec}; use crate::heed_codec::BytesRefCodec; -use crate::update::index_documents::create_sorter; -use crate::update::merge_btreeset_string; -use crate::{BEU16StrCodec, Index, Result, MAX_FACET_VALUE_LENGTH}; +use crate::update::del_add::{DelAdd, KvReaderDelAdd}; +use crate::update::MergeFn; +use crate::{try_split_array_at, FieldId, Index, Result}; pub mod bulk; pub mod incremental; @@ -108,16 +105,20 @@ pub struct FacetsUpdate<'i> { index: &'i Index, database: heed::Database, FacetGroupValueCodec>, facet_type: FacetType, - delta_data: grenad::Reader>, + delta_data: Merger, MergeFn>, + normalized_delta_data: Option, MergeFn>>, group_size: u8, max_group_size: u8, min_level_size: u8, + data_size: u64, } impl<'i> FacetsUpdate<'i> { pub fn new( index: &'i Index, facet_type: FacetType, - delta_data: grenad::Reader>, + delta_data: Merger, MergeFn>, + normalized_delta_data: Option, MergeFn>>, + data_size: u64, ) -> Self { let database = match facet_type { FacetType::String => { @@ -135,18 +136,20 @@ impl<'i> FacetsUpdate<'i> { min_level_size: FACET_MIN_LEVEL_SIZE, facet_type, delta_data, + normalized_delta_data, + data_size, } } pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> { - if self.delta_data.is_empty() { + if self.data_size == 0 { return Ok(()); } debug!("Computing and writing the facet values levels docids into LMDB on disk..."); self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; // See self::comparison_bench::benchmark_facet_indexing - if self.delta_data.len() >= (self.database.len(wtxn)? / 50) { + if self.data_size >= (self.database.len(wtxn)? 
/ 50) { let field_ids = self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::>(); let bulk_update = FacetsUpdateBulk::new( @@ -170,92 +173,94 @@ impl<'i> FacetsUpdate<'i> { incremental_update.execute(wtxn)?; } - // We clear the list of normalized-for-search facets - // and the previous FSTs to compute everything from scratch - self.index.facet_id_normalized_string_strings.clear(wtxn)?; - self.index.facet_id_string_fst.clear(wtxn)?; + if let Some(normalized_delta_data) = self.normalized_delta_data { + let mut iter = normalized_delta_data.into_stream_merger_iter()?; + while let Some((key_bytes, delta_bytes)) = iter.next()? { + let deladd_reader = KvReaderDelAdd::new(delta_bytes); - // As we can't use the same write transaction to read and write in two different databases - // we must create a temporary sorter that we will write into LMDB afterward. - // As multiple unnormalized facet values can become the same normalized facet value - // we must merge them together. - let mut sorter = create_sorter( - SortAlgorithm::Unstable, - merge_btreeset_string, - CompressionType::None, - None, - None, - None, - ); + let database_set = self + .index + .facet_id_normalized_string_strings + .remap_key_type::() + .get(wtxn, &key_bytes)? + .unwrap_or_default(); - // We iterate on the list of original, semi-normalized, facet values - // and normalize them for search, inserting them in LMDB in any given order. - let options = NormalizerOption { lossy: true, ..Default::default() }; - let database = self.index.facet_id_string_docids.remap_data_type::(); - for result in database.iter(wtxn)? { - let (facet_group_key, ()) = result?; - if let FacetGroupKey { field_id, level: 0, left_bound } = facet_group_key { - let mut normalized_facet = left_bound.normalize(&options); - let normalized_truncated_facet: String; - if normalized_facet.len() > MAX_FACET_VALUE_LENGTH { - normalized_truncated_facet = normalized_facet - .char_indices() - .take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH) - .map(|(_, c)| c) - .collect(); - normalized_facet = normalized_truncated_facet.into(); + let add_set = deladd_reader + .get(DelAdd::Addition) + .and_then(|bytes| serde_json::from_slice::>(bytes).ok()) + .unwrap_or_default(); + + let del_set = match deladd_reader + .get(DelAdd::Deletion) + .and_then(|bytes| serde_json::from_slice::>(bytes).ok()) + { + Some(del_set) => { + let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap(); + let field_id = FieldId::from_be_bytes(field_id_bytes); + let mut set = BTreeSet::new(); + for facet in del_set { + let key = + FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() }; + // Check if the referenced value doesn't exist anymore before deleting it. 
+ if self.index.facet_id_string_docids.get(wtxn, &key)?.is_none() { + set.insert(facet); + } + } + set + } + None => BTreeSet::new(), + }; + + let set: BTreeSet<_> = + database_set.difference(&del_set).chain(add_set.iter()).cloned().collect(); + + if set.is_empty() { + self.index + .facet_id_normalized_string_strings + .remap_key_type::() + .delete(wtxn, key_bytes)?; + } else { + self.index + .facet_id_normalized_string_strings + .remap_key_type::() + .put(wtxn, key_bytes, &set)?; } - let set = BTreeSet::from_iter(std::iter::once(left_bound)); - let key = (field_id, normalized_facet.as_ref()); - let key = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; - let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?; - sorter.insert(key, val)?; + } + + // We clear the FST of normalized-for-search to compute everything from scratch. + self.index.facet_id_string_fst.clear(wtxn)?; + // We compute one FST by string facet + let mut text_fsts = vec![]; + let mut current_fst: Option<(u16, fst::SetBuilder>)> = None; + let database = + self.index.facet_id_normalized_string_strings.remap_data_type::(); + for result in database.iter(wtxn)? { + let ((field_id, normalized_facet), _) = result?; + current_fst = match current_fst.take() { + Some((fid, fst_builder)) if fid != field_id => { + let fst = fst_builder.into_set(); + text_fsts.push((fid, fst)); + Some((field_id, fst::SetBuilder::memory())) + } + Some((field_id, fst_builder)) => Some((field_id, fst_builder)), + None => Some((field_id, fst::SetBuilder::memory())), + }; + + if let Some((_, fst_builder)) = current_fst.as_mut() { + fst_builder.insert(normalized_facet)?; + } + } + + if let Some((field_id, fst_builder)) = current_fst { + let fst = fst_builder.into_set(); + text_fsts.push((field_id, fst)); + } + + // We write those FSTs in LMDB now + for (field_id, fst) in text_fsts { + self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?; } } - - // In this loop we don't need to take care of merging bitmaps - // as the grenad sorter already merged them for us. - let mut merger_iter = sorter.into_stream_merger_iter()?; - while let Some((key_bytes, btreeset_bytes)) = merger_iter.next()? { - self.index.facet_id_normalized_string_strings.remap_types::().put( - wtxn, - key_bytes, - btreeset_bytes, - )?; - } - - // We compute one FST by string facet - let mut text_fsts = vec![]; - let mut current_fst: Option<(u16, fst::SetBuilder>)> = None; - let database = - self.index.facet_id_normalized_string_strings.remap_data_type::(); - for result in database.iter(wtxn)? 
{ - let ((field_id, normalized_facet), _) = result?; - current_fst = match current_fst.take() { - Some((fid, fst_builder)) if fid != field_id => { - let fst = fst_builder.into_set(); - text_fsts.push((fid, fst)); - Some((field_id, fst::SetBuilder::memory())) - } - Some((field_id, fst_builder)) => Some((field_id, fst_builder)), - None => Some((field_id, fst::SetBuilder::memory())), - }; - - if let Some((_, fst_builder)) = current_fst.as_mut() { - fst_builder.insert(normalized_facet)?; - } - } - - if let Some((field_id, fst_builder)) = current_fst { - let fst = fst_builder.into_set(); - text_fsts.push((field_id, fst)); - } - - // We write those FSTs in LMDB now - for (field_id, fst) in text_fsts { - self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?; - } - Ok(()) } } @@ -268,6 +273,7 @@ pub(crate) mod test_helpers { use std::marker::PhantomData; use std::rc::Rc; + use grenad::MergerBuilder; use heed::types::Bytes; use heed::{BytesDecode, BytesEncode, Env, RoTxn, RwTxn}; use roaring::RoaringBitmap; @@ -280,7 +286,8 @@ pub(crate) mod test_helpers { use crate::search::facet::get_highest_level; use crate::snapshot_tests::display_bitmap; use crate::update::del_add::{DelAdd, KvWriterDelAdd}; - use crate::update::FacetsUpdateIncrementalInner; + use crate::update::index_documents::merge_deladd_cbo_roaring_bitmaps; + use crate::update::{FacetsUpdateIncrementalInner, MergeFn}; use crate::CboRoaringBitmapCodec; /// Utility function to generate a string whose position in a lexicographically @@ -463,10 +470,13 @@ pub(crate) mod test_helpers { } writer.finish().unwrap(); let reader = grenad::Reader::new(std::io::Cursor::new(new_data)).unwrap(); + let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + builder.push(reader.into_cursor().unwrap()); + let merger = builder.build(); let update = FacetsUpdateBulkInner { db: self.content, - delta_data: Some(reader), + delta_data: Some(merger), group_size: self.group_size.get(), min_level_size: self.min_level_size.get(), }; diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index d568154b2..dc4886f00 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -26,7 +26,7 @@ pub fn extract_docid_word_positions( obkv_documents: grenad::Reader, indexer: GrenadParameters, searchable_fields: &Option>, - stop_words: Option<&fst::Set<&[u8]>>, + stop_words: Option<&fst::Set>>, allowed_separators: Option<&[&str]>, dictionary: Option<&[&str]>, max_positions_per_attributes: Option, @@ -181,11 +181,11 @@ fn searchable_fields_changed( /// Factorize tokenizer building. 
fn tokenizer_builder<'a>( - stop_words: Option<&'a fst::Set<&[u8]>>, + stop_words: Option<&'a fst::Set>>, allowed_separators: Option<&'a [&str]>, dictionary: Option<&'a [&str]>, script_language: Option<&'a HashMap>>, -) -> TokenizerBuilder<'a, &'a [u8]> { +) -> TokenizerBuilder<'a, Vec> { let mut tokenizer_builder = TokenizerBuilder::new(); if let Some(stop_words) = stop_words { tokenizer_builder.stop_words(stop_words); @@ -211,7 +211,7 @@ fn lang_safe_tokens_from_document<'a>( obkv: &KvReader, searchable_fields: &Option>, tokenizer: &Tokenizer, - stop_words: Option<&fst::Set<&[u8]>>, + stop_words: Option<&fst::Set>>, allowed_separators: Option<&[&str]>, dictionary: Option<&[&str]>, max_positions_per_attributes: u32, diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index d14be7464..8fdd11ee7 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -1,15 +1,21 @@ +use std::collections::BTreeSet; use std::fs::File; use std::io::BufReader; +use std::iter::FromIterator; use std::{io, str}; +use charabia::normalizer::{Normalize, NormalizerOption}; +use heed::types::SerdeJson; use heed::BytesEncode; use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters}; use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec}; -use crate::heed_codec::StrRefCodec; -use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd}; -use crate::update::index_documents::helpers::merge_deladd_cbo_roaring_bitmaps; -use crate::{FieldId, Result}; +use crate::heed_codec::{BEU16StrCodec, StrRefCodec}; +use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; +use crate::update::index_documents::helpers::{ + merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, +}; +use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; /// Extracts the facet string and the documents ids where this facet string appear. /// @@ -19,10 +25,11 @@ use crate::{FieldId, Result}; pub fn extract_facet_string_docids( docid_fid_facet_string: grenad::Reader, indexer: GrenadParameters, -) -> Result>> { +) -> Result<(grenad::Reader>, grenad::Reader>)> { puffin::profile_function!(); let max_memory = indexer.max_memory_by_thread(); + let options = NormalizerOption { lossy: true, ..Default::default() }; let mut facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, @@ -30,12 +37,30 @@ pub fn extract_facet_string_docids( indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, - max_memory, + max_memory.map(|m| m / 2), + ); + + let mut normalized_facet_string_docids_sorter = create_sorter( + grenad::SortAlgorithm::Stable, + merge_deladd_btreeset_string, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory.map(|m| m / 2), ); let mut buffer = Vec::new(); let mut cursor = docid_fid_facet_string.into_cursor()?; while let Some((key, deladd_original_value_bytes)) = cursor.move_on_next()? { + let deladd_reader = KvReaderDelAdd::new(deladd_original_value_bytes); + + // nothing to do if we delete and re-add the value. 
+ if deladd_reader.get(DelAdd::Deletion).is_some() + && deladd_reader.get(DelAdd::Addition).is_some() + { + continue; + } + let (field_id_bytes, bytes) = try_split_array_at(key).unwrap(); let field_id = FieldId::from_be_bytes(field_id_bytes); @@ -44,17 +69,46 @@ pub fn extract_facet_string_docids( let document_id = u32::from_be_bytes(document_id_bytes); let normalized_value = str::from_utf8(normalized_value_bytes)?; + + // Facet search normalization + { + let mut hyper_normalized_value = normalized_value.normalize(&options); + let normalized_truncated_facet: String; + if hyper_normalized_value.len() > MAX_FACET_VALUE_LENGTH { + normalized_truncated_facet = hyper_normalized_value + .char_indices() + .take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH) + .map(|(_, c)| c) + .collect(); + hyper_normalized_value = normalized_truncated_facet.into(); + } + let set = BTreeSet::from_iter(std::iter::once(normalized_value)); + + buffer.clear(); + let mut obkv = KvWriterDelAdd::new(&mut buffer); + for (deladd_key, _) in deladd_reader.iter() { + let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?; + obkv.insert(deladd_key, val)?; + } + obkv.finish()?; + + let key = (field_id, hyper_normalized_value.as_ref()); + let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; + normalized_facet_string_docids_sorter.insert(key_bytes, &buffer)?; + } + let key = FacetGroupKey { field_id, level: 0, left_bound: normalized_value }; let key_bytes = FacetGroupKeyCodec::::bytes_encode(&key).unwrap(); buffer.clear(); let mut obkv = KvWriterDelAdd::new(&mut buffer); - for (deladd_key, _) in KvReaderDelAdd::new(deladd_original_value_bytes).iter() { + for (deladd_key, _) in deladd_reader.iter() { obkv.insert(deladd_key, document_id.to_ne_bytes())?; } obkv.finish()?; facet_string_docids_sorter.insert(&key_bytes, &buffer)?; } - sorter_into_reader(facet_string_docids_sorter, indexer) + let normalized = sorter_into_reader(normalized_facet_string_docids_sorter, indexer)?; + sorter_into_reader(facet_string_docids_sorter, indexer).map(|s| (s, normalized)) } diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 44f54ff26..b8ff00125 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -15,7 +15,6 @@ use std::io::BufReader; use crossbeam_channel::Sender; use rayon::prelude::*; -use tracing::debug; use self::extract_docid_word_positions::extract_docid_word_positions; use self::extract_facet_number_docids::extract_facet_number_docids; @@ -29,10 +28,7 @@ use self::extract_vector_points::{ use self::extract_word_docids::extract_word_docids; use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids; use self::extract_word_position_docids::extract_word_position_docids; -use super::helpers::{ - as_cloneable_grenad, merge_deladd_cbo_roaring_bitmaps, CursorClonableMmap, GrenadParameters, - MergeFn, MergeableReader, -}; +use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters}; use super::{helpers, TypedChunk}; use crate::proximity::ProximityPrecision; use crate::vector::EmbeddingConfigs; @@ -52,7 +48,7 @@ pub(crate) fn data_from_obkv_documents( primary_key_id: FieldId, geo_fields_ids: Option<(FieldId, FieldId)>, field_id_map: FieldsIdsMap, - stop_words: Option>, + stop_words: Option>>, allowed_separators: Option<&[&str]>, dictionary: Option<&[&str]>, max_positions_per_attributes: Option, @@ -62,201 +58,154 @@ pub(crate) fn 
data_from_obkv_documents( ) -> Result<()> { puffin::profile_function!(); - original_obkv_chunks - .par_bridge() - .map(|original_documents_chunk| { - send_original_documents_data( - original_documents_chunk, - indexer, - lmdb_writer_sx.clone(), - field_id_map.clone(), - embedders.clone(), - ) - }) - .collect::>()?; - - #[allow(clippy::type_complexity)] - let result: Result<(Vec<_>, (Vec<_>, (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>)))))> = - flattened_obkv_chunks - .par_bridge() - .map(|flattened_obkv_chunks| { - send_and_extract_flattened_documents_data( - flattened_obkv_chunks, - indexer, - lmdb_writer_sx.clone(), - &searchable_fields, - &faceted_fields, - primary_key_id, - geo_fields_ids, - &stop_words, - &allowed_separators, - &dictionary, - max_positions_per_attributes, - ) - }) - .collect(); - - let ( - docid_word_positions_chunks, - ( - fid_docid_facet_numbers_chunks, - ( - fid_docid_facet_strings_chunks, - ( - facet_is_null_docids_chunks, - (facet_is_empty_docids_chunks, facet_exists_docids_chunks), - ), - ), - ), - ) = result?; - - // merge facet_exists_docids and send them as a typed chunk - { - let lmdb_writer_sx = lmdb_writer_sx.clone(); - rayon::spawn(move || { - debug!(database = "facet-id-exists-docids", "merge"); - match facet_exists_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) { - Ok(reader) => { - let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(reader))); - } - Err(e) => { - let _ = lmdb_writer_sx.send(Err(e)); - } - } - }); - } - - // merge facet_is_null_docids and send them as a typed chunk - { - let lmdb_writer_sx = lmdb_writer_sx.clone(); - rayon::spawn(move || { - debug!(database = "facet-id-is-null-docids", "merge"); - match facet_is_null_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) { - Ok(reader) => { - let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetIsNullDocids(reader))); - } - Err(e) => { - let _ = lmdb_writer_sx.send(Err(e)); - } - } - }); - } - - // merge facet_is_empty_docids and send them as a typed chunk - { - let lmdb_writer_sx = lmdb_writer_sx.clone(); - rayon::spawn(move || { - debug!(database = "facet-id-is-empty-docids", "merge"); - match facet_is_empty_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) { - Ok(reader) => { - let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetIsEmptyDocids(reader))); - } - Err(e) => { - let _ = lmdb_writer_sx.send(Err(e)); - } - } - }); - } - - if proximity_precision == ProximityPrecision::ByWord { - spawn_extraction_task::<_, _, Vec>>>( - docid_word_positions_chunks.clone(), - indexer, - lmdb_writer_sx.clone(), - extract_word_pair_proximity_docids, - merge_deladd_cbo_roaring_bitmaps, - TypedChunk::WordPairProximityDocids, - "word-pair-proximity-docids", - ); - } - - spawn_extraction_task::<_, _, Vec>>>( - docid_word_positions_chunks.clone(), - indexer, - lmdb_writer_sx.clone(), - extract_fid_word_count_docids, - merge_deladd_cbo_roaring_bitmaps, - TypedChunk::FieldIdWordCountDocids, - "field-id-wordcount-docids", - ); - - spawn_extraction_task::< - _, - _, - Vec<( - grenad::Reader>, - grenad::Reader>, - grenad::Reader>, - )>, - >( - docid_word_positions_chunks.clone(), - indexer, - lmdb_writer_sx.clone(), - move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes), - merge_deladd_cbo_roaring_bitmaps, - |(word_docids_reader, exact_word_docids_reader, word_fid_docids_reader)| { - TypedChunk::WordDocids { - word_docids_reader, - exact_word_docids_reader, - word_fid_docids_reader, - } + let (original_pipeline_result, 
flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join( + || { + original_obkv_chunks + .par_bridge() + .map(|original_documents_chunk| { + send_original_documents_data( + original_documents_chunk, + indexer, + lmdb_writer_sx.clone(), + field_id_map.clone(), + embedders.clone(), + ) + }) + .collect::>() + }, + || { + flattened_obkv_chunks + .par_bridge() + .map(|flattened_obkv_chunks| { + send_and_extract_flattened_documents_data( + flattened_obkv_chunks, + indexer, + lmdb_writer_sx.clone(), + &searchable_fields, + &faceted_fields, + primary_key_id, + geo_fields_ids, + &stop_words, + &allowed_separators, + &dictionary, + max_positions_per_attributes, + ) + }) + .map(|result| { + if let Ok(( + ref docid_word_positions_chunk, + (ref fid_docid_facet_numbers_chunk, ref fid_docid_facet_strings_chunk), + )) = result + { + run_extraction_task::<_, _, grenad::Reader>>( + docid_word_positions_chunk.clone(), + indexer, + lmdb_writer_sx.clone(), + extract_fid_word_count_docids, + TypedChunk::FieldIdWordCountDocids, + "field-id-wordcount-docids", + ); + + let exact_attributes = exact_attributes.clone(); + run_extraction_task::< + _, + _, + ( + grenad::Reader>, + grenad::Reader>, + grenad::Reader>, + ), + >( + docid_word_positions_chunk.clone(), + indexer, + lmdb_writer_sx.clone(), + move |doc_word_pos, indexer| { + extract_word_docids(doc_word_pos, indexer, &exact_attributes) + }, + |( + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + )| { + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } + }, + "word-docids", + ); + + run_extraction_task::<_, _, grenad::Reader>>( + docid_word_positions_chunk.clone(), + indexer, + lmdb_writer_sx.clone(), + extract_word_position_docids, + TypedChunk::WordPositionDocids, + "word-position-docids", + ); + + run_extraction_task::< + _, + _, + (grenad::Reader>, grenad::Reader>), + >( + fid_docid_facet_strings_chunk.clone(), + indexer, + lmdb_writer_sx.clone(), + extract_facet_string_docids, + TypedChunk::FieldIdFacetStringDocids, + "field-id-facet-string-docids", + ); + + run_extraction_task::<_, _, grenad::Reader>>( + fid_docid_facet_numbers_chunk.clone(), + indexer, + lmdb_writer_sx.clone(), + extract_facet_number_docids, + TypedChunk::FieldIdFacetNumberDocids, + "field-id-facet-number-docids", + ); + + if proximity_precision == ProximityPrecision::ByWord { + run_extraction_task::<_, _, grenad::Reader>>( + docid_word_positions_chunk.clone(), + indexer, + lmdb_writer_sx.clone(), + extract_word_pair_proximity_docids, + TypedChunk::WordPairProximityDocids, + "word-pair-proximity-docids", + ); + } + } + + Ok(()) + }) + .collect::>() }, - "word-docids", ); - spawn_extraction_task::<_, _, Vec>>>( - docid_word_positions_chunks.clone(), - indexer, - lmdb_writer_sx.clone(), - extract_word_position_docids, - merge_deladd_cbo_roaring_bitmaps, - TypedChunk::WordPositionDocids, - "word-position-docids", - ); - - spawn_extraction_task::<_, _, Vec>>>( - fid_docid_facet_strings_chunks, - indexer, - lmdb_writer_sx.clone(), - extract_facet_string_docids, - merge_deladd_cbo_roaring_bitmaps, - TypedChunk::FieldIdFacetStringDocids, - "field-id-facet-string-docids", - ); - - spawn_extraction_task::<_, _, Vec>>>( - fid_docid_facet_numbers_chunks, - indexer, - lmdb_writer_sx, - extract_facet_number_docids, - merge_deladd_cbo_roaring_bitmaps, - TypedChunk::FieldIdFacetNumberDocids, - "field-id-facet-number-docids", - ); - - Ok(()) + original_pipeline_result.and(flattened_pipeline_result) } /// Spawn a 
new task to extract data for a specific DB using extract_fn. /// Generated grenad chunks are merged using the merge_fn. /// The result of merged chunks is serialized as TypedChunk using the serialize_fn /// and sent into lmdb_writer_sx. -fn spawn_extraction_task( - chunks: Vec>, +fn run_extraction_task( + chunk: grenad::Reader, indexer: GrenadParameters, lmdb_writer_sx: Sender>, extract_fn: FE, - merge_fn: MergeFn, serialize_fn: FS, name: &'static str, ) where - FE: Fn(grenad::Reader, GrenadParameters) -> Result + FE: Fn(grenad::Reader, GrenadParameters) -> Result + Sync + Send + 'static, - FS: Fn(M::Output) -> TypedChunk + Sync + Send + 'static, - M: MergeableReader + FromParallelIterator + Send + 'static, - M::Output: Send, + FS: Fn(M) -> TypedChunk + Sync + Send + 'static, + M: Send, { let current_span = tracing::Span::current(); @@ -264,25 +213,16 @@ fn spawn_extraction_task( let child_span = tracing::trace_span!(target: "", parent: ¤t_span, "extract_multiple_chunks"); let _entered = child_span.enter(); - puffin::profile_scope!("extract_multiple_chunksdexing::details, ", name); - let chunks: Result = - chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect(); - let current_span = tracing::Span::current(); - - rayon::spawn(move || match chunks { - Ok(chunks) => { - let child_span = tracing::trace_span!(target: "", parent: ¤t_span, "merge_multiple_chunks"); - let _entered = child_span.enter(); - debug!(database = name, "merge"); - puffin::profile_scope!("merge_multiple_chunks", name); - let reader = chunks.merge(merge_fn, &indexer); - let _ = lmdb_writer_sx.send(reader.map(serialize_fn)); + puffin::profile_scope!("extract_multiple_chunks", name); + match extract_fn(chunk, indexer) { + Ok(chunk) => { + let _ = lmdb_writer_sx.send(Ok(serialize_fn(chunk))); } Err(e) => { let _ = lmdb_writer_sx.send(Err(e)); } - }) - }); + } + }) } /// Extract chunked data and send it into lmdb_writer_sx sender: @@ -340,7 +280,7 @@ fn send_original_documents_data( }); // TODO: create a custom internal error - lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk))).unwrap(); + drop(lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk)))); Ok(()) } @@ -360,22 +300,13 @@ fn send_and_extract_flattened_documents_data( faceted_fields: &HashSet, primary_key_id: FieldId, geo_fields_ids: Option<(FieldId, FieldId)>, - stop_words: &Option>, + stop_words: &Option>>, allowed_separators: &Option<&[&str]>, dictionary: &Option<&[&str]>, max_positions_per_attributes: Option, ) -> Result<( grenad::Reader, - ( - grenad::Reader, - ( - grenad::Reader, - ( - grenad::Reader>, - (grenad::Reader>, grenad::Reader>), - ), - ), - ), + (grenad::Reader, grenad::Reader), )> { let flattened_documents_chunk = flattened_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; @@ -446,16 +377,17 @@ fn send_and_extract_flattened_documents_data( fid_docid_facet_strings_chunk.clone(), ))); - Ok(( - fid_docid_facet_numbers_chunk, - ( - fid_docid_facet_strings_chunk, - ( - fid_facet_is_null_docids_chunk, - (fid_facet_is_empty_docids_chunk, fid_facet_exists_docids_chunk), - ), - ), - )) + let _ = lmdb_writer_sx + .send(Ok(TypedChunk::FieldIdFacetIsNullDocids(fid_facet_is_null_docids_chunk))); + + let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetIsEmptyDocids( + fid_facet_is_empty_docids_chunk, + ))); + + let _ = lmdb_writer_sx + .send(Ok(TypedChunk::FieldIdFacetExistsDocids(fid_facet_exists_docids_chunk))); + + Ok((fid_docid_facet_numbers_chunk, fid_docid_facet_strings_chunk)) }, ); 
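Reviewer note (not part of the patch): the pipelines above now push their sorted grenad chunks into a `grenad::Merger` and stream the merged entries, instead of materializing one merged `grenad::Reader` per database, which is why the `MergeableReader` helpers are deleted in the next file. The sketch below is only meant to make that flow easy to follow: the `MergerBuilder::new`, `push`, `build` and `into_stream_merger_iter` calls are the ones this diff uses, while the wrapper function, its `File`-backed reader type and the choice of `merge_deladd_cbo_roaring_bitmaps` as the merge function are illustrative.

```rust
use grenad::MergerBuilder;

use crate::update::index_documents::merge_deladd_cbo_roaring_bitmaps;
use crate::update::MergeFn;
use crate::Result;

/// Illustrative only: collect already-sorted chunks into one `Merger`
/// and stream the merged entries key by key.
fn stream_merged_chunks(readers: Vec<grenad::Reader<std::fs::File>>) -> Result<()> {
    // Push every chunk as a cursor; nothing is merged eagerly.
    let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn);
    for reader in readers {
        builder.push(reader.into_cursor()?);
    }
    let merger = builder.build();

    // Entries come out key-ordered and already merged, one at a time;
    // the callers in this PR write them straight into LMDB at this point.
    let mut iter = merger.into_stream_merger_iter()?;
    while let Some((key, value)) = iter.next()? {
        let _ = (key, value);
    }
    Ok(())
}
```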
diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index 3e63fcf77..b0e3654a9 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -90,90 +90,6 @@ pub unsafe fn as_cloneable_grenad( Ok(reader) } -pub trait MergeableReader -where - Self: Sized, -{ - type Output; - - fn merge(self, merge_fn: MergeFn, indexer: &GrenadParameters) -> Result; -} - -impl MergeableReader for Vec>> { - type Output = grenad::Reader>; - - fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result { - let mut merger = MergerBuilder::new(merge_fn); - self.into_iter().try_for_each(|r| merger.push(r))?; - merger.finish(params) - } -} - -impl MergeableReader for Vec<(grenad::Reader>, grenad::Reader>)> { - type Output = (grenad::Reader>, grenad::Reader>); - - fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result { - let mut m1 = MergerBuilder::new(merge_fn); - let mut m2 = MergerBuilder::new(merge_fn); - for (r1, r2) in self.into_iter() { - m1.push(r1)?; - m2.push(r2)?; - } - Ok((m1.finish(params)?, m2.finish(params)?)) - } -} - -impl MergeableReader - for Vec<( - grenad::Reader>, - grenad::Reader>, - grenad::Reader>, - )> -{ - type Output = ( - grenad::Reader>, - grenad::Reader>, - grenad::Reader>, - ); - - fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result { - let mut m1 = MergerBuilder::new(merge_fn); - let mut m2 = MergerBuilder::new(merge_fn); - let mut m3 = MergerBuilder::new(merge_fn); - for (r1, r2, r3) in self.into_iter() { - m1.push(r1)?; - m2.push(r2)?; - m3.push(r3)?; - } - Ok((m1.finish(params)?, m2.finish(params)?, m3.finish(params)?)) - } -} - -struct MergerBuilder(grenad::MergerBuilder); - -impl MergerBuilder { - fn new(merge_fn: MergeFn) -> Self { - Self(grenad::MergerBuilder::new(merge_fn)) - } - - fn push(&mut self, reader: grenad::Reader) -> Result<()> { - self.0.push(reader.into_cursor()?); - Ok(()) - } - - fn finish(self, params: &GrenadParameters) -> Result>> { - let merger = self.0.build(); - let mut writer = create_writer( - params.chunk_compression_type, - params.chunk_compression_level, - tempfile::tempfile()?, - ); - merger.write_into_stream_writer(&mut writer)?; - - writer_into_reader(writer) - } -} - #[derive(Debug, Clone, Copy)] pub struct GrenadParameters { pub chunk_compression_type: CompressionType, diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index d355ead68..7f5cc5dcd 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -35,27 +35,6 @@ pub fn merge_roaring_bitmaps<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Resul } } -pub fn merge_btreeset_string<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { - if values.len() == 1 { - Ok(values[0].clone()) - } else { - // TODO improve the perf by using a `#[borrow] Cow`. - let strings: BTreeSet = values - .iter() - .map(AsRef::as_ref) - .map(serde_json::from_slice::>) - .map(StdResult::unwrap) - .reduce(|mut current, new| { - for x in new { - current.insert(x); - } - current - }) - .unwrap(); - Ok(Cow::Owned(serde_json::to_vec(&strings).unwrap())) - } -} - pub fn keep_first<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { Ok(values[0].clone()) } @@ -243,3 +222,61 @@ pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>( buffer, )?) 
} + +pub fn merge_btreeset_string<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { + if values.len() == 1 { + Ok(values[0].clone()) + } else { + // TODO improve the perf by using a `#[borrow] Cow`. + let strings: BTreeSet = values + .iter() + .map(AsRef::as_ref) + .map(serde_json::from_slice::>) + .map(StdResult::unwrap) + .reduce(|mut current, new| { + for x in new { + current.insert(x); + } + current + }) + .unwrap(); + Ok(Cow::Owned(serde_json::to_vec(&strings).unwrap())) + } +} + +/// Do a union of BtreeSet on both sides of a DelAdd obkv +/// separately and outputs a new DelAdd with both unions. +pub fn merge_deladd_btreeset_string<'a>( + _key: &[u8], + values: &[Cow<'a, [u8]>], +) -> Result> { + if values.len() == 1 { + Ok(values[0].clone()) + } else { + // Retrieve the bitmaps from both sides + let mut del_set = BTreeSet::new(); + let mut add_set = BTreeSet::new(); + for value in values { + let obkv = KvReaderDelAdd::new(value); + if let Some(bytes) = obkv.get(DelAdd::Deletion) { + let set = serde_json::from_slice::>(bytes).unwrap(); + for value in set { + del_set.insert(value); + } + } + if let Some(bytes) = obkv.get(DelAdd::Addition) { + let set = serde_json::from_slice::>(bytes).unwrap(); + for value in set { + add_set.insert(value); + } + } + } + + let mut output_deladd_obkv = KvWriterDelAdd::memory(); + let del = serde_json::to_vec(&del_set).unwrap(); + output_deladd_obkv.insert(DelAdd::Deletion, &del)?; + let add = serde_json::to_vec(&add_set).unwrap(); + output_deladd_obkv.insert(DelAdd::Addition, &add)?; + output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) + } +} diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 1e29c0240..b60f7be7d 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -10,13 +10,13 @@ use fst::{IntoStreamer, Streamer}; pub use grenad_helpers::{ as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks, merge_ignore_values, sorter_into_reader, write_sorter_into_database, writer_into_reader, - GrenadParameters, MergeableReader, + GrenadParameters, }; pub use merge_functions::{ keep_first, keep_latest_obkv, merge_btreeset_string, merge_cbo_roaring_bitmaps, - merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, - merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions, - obkvs_merge_additions_and_deletions, MergeFn, + merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps, + obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions, MergeFn, }; use crate::MAX_WORD_LENGTH; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 36aa94964..912ff2c2d 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -5,20 +5,21 @@ mod transform; mod typed_chunk; use std::collections::{HashMap, HashSet}; -use std::io::{Cursor, Read, Seek}; +use std::io::{Read, Seek}; use std::iter::FromIterator; use std::num::NonZeroU32; use std::result::Result as StdResult; use crossbeam_channel::{Receiver, Sender}; +use grenad::{Merger, MergerBuilder}; use heed::types::Str; use heed::Database; use rand::SeedableRng; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use slice_group_by::GroupBy; -use tracing::debug_span; -use 
typed_chunk::{write_typed_chunk_into_index, TypedChunk}; +use tracing::debug; +use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk}; use self::enrich::enrich_documents_batch; pub use self::enrich::{extract_finite_float_from_value, DocumentId}; @@ -26,8 +27,7 @@ pub use self::helpers::{ as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, - merge_roaring_bitmaps, valid_lmdb_key, write_sorter_into_database, writer_into_reader, - ClonableMmap, MergeFn, + merge_roaring_bitmaps, valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn, }; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; @@ -95,8 +95,8 @@ pub struct IndexDocumentsConfig { impl<'t, 'i, 'a, FP, FA> IndexDocuments<'t, 'i, 'a, FP, FA> where - FP: Fn(UpdateIndexingStep) + Sync, - FA: Fn() -> bool + Sync, + FP: Fn(UpdateIndexingStep) + Sync + Send, + FA: Fn() -> bool + Sync + Send, { pub fn new( wtxn: &'t mut heed::RwTxn<'i>, @@ -326,9 +326,6 @@ where } }; - let original_documents = grenad::Reader::new(original_documents)?; - let flattened_documents = grenad::Reader::new(flattened_documents)?; - // create LMDB writer channel let (lmdb_writer_sx, lmdb_writer_rx): ( Sender>, @@ -367,11 +364,7 @@ where let stop_words = self.index.stop_words(self.wtxn)?; let separators = self.index.allowed_separators(self.wtxn)?; - let separators: Option> = - separators.as_ref().map(|x| x.iter().map(String::as_str).collect()); let dictionary = self.index.dictionary(self.wtxn)?; - let dictionary: Option> = - dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect()); let exact_attributes = self.index.exact_attributes_ids(self.wtxn)?; let proximity_precision = self.index.proximity_precision(self.wtxn)?.unwrap_or_default(); @@ -381,141 +374,202 @@ where max_memory: self.indexer_config.max_memory, max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen. }; - let documents_chunk_size = - self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 4); // 4MiB + let documents_chunk_size = match self.indexer_config.documents_chunk_size { + Some(chunk_size) => chunk_size, + None => { + let default_chunk_size = 1024 * 1024 * 4; // 4MiB + let min_chunk_size = 1024 * 512; // 512KiB + + // compute the chunk size from the number of available threads and the input data size.
+ let total_size = flattened_documents.metadata().map(|m| m.len()); + let current_num_threads = pool.current_num_threads(); + // if we have more than 2 thread, create a number of chunk equal to 3/4 threads count + let chunk_count = if current_num_threads > 2 { + (current_num_threads * 3 / 4).max(2) + } else { + current_num_threads + }; + total_size + .map_or(default_chunk_size, |size| (size as usize) / chunk_count) + .max(min_chunk_size) + } + }; + + let original_documents = grenad::Reader::new(original_documents)?; + let flattened_documents = grenad::Reader::new(flattened_documents)?; + let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes; let cloned_embedder = self.embedders.clone(); + let mut final_documents_ids = RoaringBitmap::new(); + let mut databases_seen = 0; + let mut word_position_docids = None; + let mut word_fid_docids = None; + let mut word_docids = None; + let mut exact_word_docids = None; + let mut chunk_accumulator = ChunkAccumulator::default(); + let mut dimension = HashMap::new(); + let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap()); + let current_span = tracing::Span::current(); // Run extraction pipeline in parallel. pool.install(|| { - let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "extract_and_send_grenad_chunks"); + rayon::spawn(move || { + let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "extract_and_send_grenad_chunks"); let _enter = child_span.enter(); puffin::profile_scope!("extract_and_send_grenad_chunks"); - // split obkv file into several chunks - let original_chunk_iter = - grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size); + // split obkv file into several chunks + let original_chunk_iter = + grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size); - // split obkv file into several chunks - let flattened_chunk_iter = - grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size); + // split obkv file into several chunks + let flattened_chunk_iter = + grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size); - let result = original_chunk_iter.and_then(|original_chunk| { - let flattened_chunk = flattened_chunk_iter?; - // extract all databases from the chunked obkv douments - extract::data_from_obkv_documents( - original_chunk, - flattened_chunk, - pool_params, - lmdb_writer_sx.clone(), - searchable_fields, - faceted_fields, - primary_key_id, - geo_fields_ids, - field_id_map, - stop_words, - separators.as_deref(), - dictionary.as_deref(), - max_positions_per_attributes, - exact_attributes, - proximity_precision, - cloned_embedder, - ) + let separators: Option> = + separators.as_ref().map(|x| x.iter().map(String::as_str).collect()); + let dictionary: Option> = + dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect()); + let result = original_chunk_iter.and_then(|original_chunk| { + let flattened_chunk = flattened_chunk_iter?; + // extract all databases from the chunked obkv douments + extract::data_from_obkv_documents( + original_chunk, + flattened_chunk, + pool_params, + lmdb_writer_sx.clone(), + searchable_fields, + faceted_fields, + primary_key_id, + geo_fields_ids, + field_id_map, + stop_words, + separators.as_deref(), + dictionary.as_deref(), + max_positions_per_attributes, + exact_attributes, + proximity_precision, + cloned_embedder, + ) + }); + + if let Err(e) = result { + let _ = lmdb_writer_sx.send(Err(e)); + } + + // needs to 
be dropped to avoid channel waiting lock. + drop(lmdb_writer_sx); }); - if let Err(e) = result { - let _ = lmdb_writer_sx.send(Err(e)); - } + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); - // needs to be dropped to avoid channel waiting lock. - drop(lmdb_writer_sx); - }); + loop { + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } - let index_is_empty = self.index.number_of_documents(self.wtxn)? == 0; - let mut final_documents_ids = RoaringBitmap::new(); + match lmdb_writer_rx.clone().recv_timeout(std::time::Duration::from_millis(500)) { + Err(status) => { + if let Some(typed_chunks) = chunk_accumulator.pop_longest() { + let (docids, is_merged_database) = + write_typed_chunk_into_index(typed_chunks, self.index, self.wtxn)?; + if !docids.is_empty() { + final_documents_ids |= docids; + let documents_seen_count = final_documents_ids.len(); + (self.progress)(UpdateIndexingStep::IndexDocuments { + documents_seen: documents_seen_count as usize, + total_documents: documents_count, + }); + debug!(documents = documents_seen_count, total = documents_count, "Seen"); + } + if is_merged_database { + databases_seen += 1; + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + } + // If no more chunk remains in the chunk accumulator and the channel is disconected, break. + } else if status == crossbeam_channel::RecvTimeoutError::Disconnected { + break; + } + } + Ok(result) => { + let typed_chunk = match result? { + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } => { + let cloneable_chunk = + unsafe { as_cloneable_grenad(&word_docids_reader)? }; + let word_docids = word_docids.get_or_insert_with(|| { + MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn) + }); + word_docids.push(cloneable_chunk.into_cursor()?); + let cloneable_chunk = + unsafe { as_cloneable_grenad(&exact_word_docids_reader)? }; + let exact_word_docids = + exact_word_docids.get_or_insert_with(|| { + MergerBuilder::new( + merge_deladd_cbo_roaring_bitmaps as MergeFn, + ) + }); + exact_word_docids.push(cloneable_chunk.into_cursor()?); + let cloneable_chunk = + unsafe { as_cloneable_grenad(&word_fid_docids_reader)? }; + let word_fid_docids = word_fid_docids.get_or_insert_with(|| { + MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn) + }); + word_fid_docids.push(cloneable_chunk.into_cursor()?); + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } + } + TypedChunk::WordPositionDocids(chunk) => { + let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? 
}; + let word_position_docids = + word_position_docids.get_or_insert_with(|| { + MergerBuilder::new( + merge_deladd_cbo_roaring_bitmaps as MergeFn, + ) + }); + word_position_docids.push(cloneable_chunk.into_cursor()?); + TypedChunk::WordPositionDocids(chunk) + } + TypedChunk::VectorPoints { + expected_dimension, + remove_vectors, + embeddings, + manual_vectors, + embedder_name, + } => { + dimension.insert(embedder_name.clone(), expected_dimension); + TypedChunk::VectorPoints { + remove_vectors, + embeddings, + expected_dimension, + manual_vectors, + embedder_name, + } + } + otherwise => otherwise, + }; - let mut databases_seen = 0; - (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen, - total_databases: TOTAL_POSTING_DATABASE_COUNT, - }); - - let mut word_position_docids = None; - let mut word_fid_docids = None; - let mut word_docids = None; - let mut exact_word_docids = None; - - let mut dimension = HashMap::new(); - - for result in lmdb_writer_rx { - if (self.should_abort)() { - return Err(Error::InternalError(InternalError::AbortedIndexation)); - } - - let typed_chunk = match result? { - TypedChunk::WordDocids { - word_docids_reader, - exact_word_docids_reader, - word_fid_docids_reader, - } => { - let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? }; - word_docids = Some(cloneable_chunk); - let cloneable_chunk = - unsafe { as_cloneable_grenad(&exact_word_docids_reader)? }; - exact_word_docids = Some(cloneable_chunk); - let cloneable_chunk = unsafe { as_cloneable_grenad(&word_fid_docids_reader)? }; - word_fid_docids = Some(cloneable_chunk); - TypedChunk::WordDocids { - word_docids_reader, - exact_word_docids_reader, - word_fid_docids_reader, + chunk_accumulator.insert(typed_chunk); } } - TypedChunk::WordPositionDocids(chunk) => { - let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? 
}; - word_position_docids = Some(cloneable_chunk); - TypedChunk::WordPositionDocids(chunk) - } - TypedChunk::VectorPoints { - expected_dimension, - remove_vectors, - embeddings, - manual_vectors, - embedder_name, - } => { - dimension.insert(embedder_name.clone(), expected_dimension); - TypedChunk::VectorPoints { - remove_vectors, - embeddings, - expected_dimension, - manual_vectors, - embedder_name, - } - } - otherwise => otherwise, - }; + } - let (docids, is_merged_database) = - write_typed_chunk_into_index(typed_chunk, self.index, self.wtxn, index_is_empty)?; - if !docids.is_empty() { - final_documents_ids |= docids; - let documents_seen_count = final_documents_ids.len(); - (self.progress)(UpdateIndexingStep::IndexDocuments { - documents_seen: documents_seen_count as usize, - total_documents: documents_count, - }); - debug_span!("Seen", documents = documents_seen_count, total = documents_count); - } - if is_merged_database { - databases_seen += 1; - (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen, - total_databases: TOTAL_POSTING_DATABASE_COUNT, - }); - } - } + Ok(()) + })?; // We write the field distribution into the main database self.index.put_field_distribution(self.wtxn, &field_distribution)?; @@ -548,10 +602,10 @@ where } self.execute_prefix_databases( - word_docids, - exact_word_docids, - word_position_docids, - word_fid_docids, + word_docids.map(MergerBuilder::build), + exact_word_docids.map(MergerBuilder::build), + word_position_docids.map(MergerBuilder::build), + word_fid_docids.map(MergerBuilder::build), )?; Ok(number_of_documents) @@ -565,10 +619,10 @@ where )] pub fn execute_prefix_databases( self, - word_docids: Option>, - exact_word_docids: Option>, - word_position_docids: Option>, - word_fid_docids: Option>, + word_docids: Option>, + exact_word_docids: Option>, + word_position_docids: Option>, + word_fid_docids: Option>, ) -> Result<()> where FP: Fn(UpdateIndexingStep) + Sync, @@ -751,7 +805,7 @@ where )] fn execute_word_prefix_docids( txn: &mut heed::RwTxn, - reader: grenad::Reader>, + merger: Merger, word_docids_db: Database, word_prefix_docids_db: Database, indexer_config: &IndexerConfig, @@ -761,13 +815,12 @@ fn execute_word_prefix_docids( ) -> Result<()> { puffin::profile_function!(); - let cursor = reader.into_cursor()?; let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db); builder.chunk_compression_type = indexer_config.chunk_compression_type; builder.chunk_compression_level = indexer_config.chunk_compression_level; builder.max_nb_chunks = indexer_config.max_nb_chunks; builder.max_memory = indexer_config.max_memory; - builder.execute(cursor, new_prefix_fst_words, common_prefix_fst_words, del_prefix_fst_words)?; + builder.execute(merger, new_prefix_fst_words, common_prefix_fst_words, del_prefix_fst_words)?; Ok(()) } diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index af828fee6..ef9b6707d 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -5,27 +5,60 @@ use std::io::{self, BufReader}; use bytemuck::allocation::pod_collect_to_vec; use charabia::{Language, Script}; -use grenad::MergerBuilder; +use grenad::{Merger, MergerBuilder}; use heed::types::Bytes; -use heed::{PutFlags, RwTxn}; +use heed::RwTxn; use obkv::{KvReader, KvWriter}; use roaring::RoaringBitmap; use super::helpers::{ - self, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_ignore_values, - 
valid_lmdb_key, CursorClonableMmap, + self, keep_first, merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_ignore_values, valid_lmdb_key, + CursorClonableMmap, }; -use super::{ClonableMmap, MergeFn}; +use super::MergeFn; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; use crate::index::db_name::DOCUMENTS; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd}; use crate::update::facet::FacetsUpdate; -use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at}; +use crate::update::index_documents::helpers::{ + as_cloneable_grenad, keep_latest_obkv, try_split_array_at, +}; use crate::{ lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, InternalError, Result, SerializationError, }; +/// This struct accumulates and group the TypedChunks +/// and is able to give the biggest accumulated group to index them all together +/// with a merger. +#[derive(Default)] +pub(crate) struct ChunkAccumulator { + inner: Vec>, +} + +impl ChunkAccumulator { + pub fn pop_longest(&mut self) -> Option> { + match self.inner.iter().max_by_key(|v| v.len()) { + Some(left) => { + let position = self.inner.iter().position(|right| left == right); + position.map(|p| self.inner.remove(p)).filter(|v| !v.is_empty()) + } + None => None, + } + } + + pub fn insert(&mut self, chunk: TypedChunk) { + match self.inner.iter().position(|right| Some(&chunk) == right.first()) { + Some(position) => { + let v = self.inner.get_mut(position).unwrap(); + v.push(chunk); + } + None => self.inner.push(vec![chunk]), + } + } +} + pub(crate) enum TypedChunk { FieldIdDocidFacetStrings(grenad::Reader), FieldIdDocidFacetNumbers(grenad::Reader), @@ -38,7 +71,7 @@ pub(crate) enum TypedChunk { }, WordPositionDocids(grenad::Reader>), WordPairProximityDocids(grenad::Reader>), - FieldIdFacetStringDocids(grenad::Reader>), + FieldIdFacetStringDocids((grenad::Reader>, grenad::Reader>)), FieldIdFacetNumberDocids(grenad::Reader>), FieldIdFacetExistsDocids(grenad::Reader>), FieldIdFacetIsNullDocids(grenad::Reader>), @@ -54,6 +87,34 @@ pub(crate) enum TypedChunk { ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>), } +impl PartialEq for TypedChunk { + fn eq(&self, other: &Self) -> bool { + use TypedChunk::*; + match (self, other) { + (FieldIdDocidFacetStrings(_), FieldIdDocidFacetStrings(_)) + | (FieldIdDocidFacetNumbers(_), FieldIdDocidFacetNumbers(_)) + | (Documents(_), Documents(_)) + | (FieldIdWordCountDocids(_), FieldIdWordCountDocids(_)) + | (WordDocids { .. }, WordDocids { .. }) + | (WordPositionDocids(_), WordPositionDocids(_)) + | (WordPairProximityDocids(_), WordPairProximityDocids(_)) + | (FieldIdFacetStringDocids(_), FieldIdFacetStringDocids(_)) + | (FieldIdFacetNumberDocids(_), FieldIdFacetNumberDocids(_)) + | (FieldIdFacetExistsDocids(_), FieldIdFacetExistsDocids(_)) + | (FieldIdFacetIsNullDocids(_), FieldIdFacetIsNullDocids(_)) + | (FieldIdFacetIsEmptyDocids(_), FieldIdFacetIsEmptyDocids(_)) + | (GeoPoints(_), GeoPoints(_)) + | (ScriptLanguageDocids(_), ScriptLanguageDocids(_)) => true, + ( + VectorPoints { embedder_name: left, expected_dimension: left_dim, .. }, + VectorPoints { embedder_name: right, expected_dimension: right_dim, .. 
}, + ) => left == right && left_dim == right_dim, + _ => false, + } + } +} +impl Eq for TypedChunk {} + impl TypedChunk { pub fn to_debug_string(&self) -> String { match self { @@ -85,7 +146,7 @@ impl TypedChunk { TypedChunk::WordPairProximityDocids(grenad) => { format!("WordPairProximityDocids {{ number_of_entries: {} }}", grenad.len()) } - TypedChunk::FieldIdFacetStringDocids(grenad) => { + TypedChunk::FieldIdFacetStringDocids((grenad, _)) => { format!("FieldIdFacetStringDocids {{ number_of_entries: {} }}", grenad.len()) } TypedChunk::FieldIdFacetNumberDocids(grenad) => { @@ -117,23 +178,32 @@ impl TypedChunk { /// Return new documents seen. #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] pub(crate) fn write_typed_chunk_into_index( - typed_chunk: TypedChunk, + typed_chunks: Vec, index: &Index, wtxn: &mut RwTxn, - index_is_empty: bool, ) -> Result<(RoaringBitmap, bool)> { - puffin::profile_function!(typed_chunk.to_debug_string()); + puffin::profile_function!(typed_chunks[0].to_debug_string()); let mut is_merged_database = false; - match typed_chunk { - TypedChunk::Documents(obkv_documents_iter) => { + match typed_chunks[0] { + TypedChunk::Documents(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "documents"); let _entered = span.enter(); + + let mut builder = MergerBuilder::new(keep_latest_obkv as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::Documents(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); let mut operations: Vec = Default::default(); let mut docids = index.documents_ids(wtxn)?; - let mut cursor = obkv_documents_iter.into_cursor()?; - while let Some((key, reader)) = cursor.move_on_next()? { + let mut iter = merger.into_stream_merger_iter()?; + while let Some((key, reader)) = iter.next()? { let mut writer: KvWriter<_, FieldId> = KvWriter::memory(); let reader: KvReader = KvReader::new(reader); @@ -174,59 +244,91 @@ pub(crate) fn write_typed_chunk_into_index( external_documents_docids.apply(wtxn, operations)?; index.put_documents_ids(wtxn, &docids)?; } - TypedChunk::FieldIdWordCountDocids(fid_word_count_docids_iter) => { + TypedChunk::FieldIdWordCountDocids(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "field_id_word_count_docids"); let _entered = span.enter(); - append_entries_into_database( - fid_word_count_docids_iter, + + let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::FieldIdWordCountDocids(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + + write_entries_into_database( + merger, &index.field_id_word_count_docids, wtxn, - index_is_empty, deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } - TypedChunk::WordDocids { - word_docids_reader, - exact_word_docids_reader, - word_fid_docids_reader, - } => { + TypedChunk::WordDocids { .. 
} => { let span = tracing::trace_span!(target: "indexing::write_db", "word_docids"); let _entered = span.enter(); - let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_reader) }?; - append_entries_into_database( - word_docids_iter.clone(), + + let mut word_docids_builder = + MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut exact_word_docids_builder = + MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut word_fid_docids_builder = + MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut fst_merger_builder = MergerBuilder::new(merge_ignore_values as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } = typed_chunk + else { + unreachable!(); + }; + let clonable_word_docids = unsafe { as_cloneable_grenad(&word_docids_reader) }?; + let clonable_exact_word_docids = + unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?; + + word_docids_builder.push(word_docids_reader.into_cursor()?); + exact_word_docids_builder.push(exact_word_docids_reader.into_cursor()?); + word_fid_docids_builder.push(word_fid_docids_reader.into_cursor()?); + fst_merger_builder.push(clonable_word_docids.into_cursor()?); + fst_merger_builder.push(clonable_exact_word_docids.into_cursor()?); + } + + let word_docids_merger = word_docids_builder.build(); + write_entries_into_database( + word_docids_merger, &index.word_docids, wtxn, - index_is_empty, deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; - let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?; - append_entries_into_database( - exact_word_docids_iter.clone(), + let exact_word_docids_merger = exact_word_docids_builder.build(); + write_entries_into_database( + exact_word_docids_merger, &index.exact_word_docids, wtxn, - index_is_empty, deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; - let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?; - append_entries_into_database( - word_fid_docids_iter, + let word_fid_docids_merger = word_fid_docids_builder.build(); + write_entries_into_database( + word_fid_docids_merger, &index.word_fid_docids, wtxn, - index_is_empty, deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; // create fst from word docids - let fst = merge_word_docids_reader_into_fst(word_docids_iter, exact_word_docids_iter)?; + let fst_merger = fst_merger_builder.build(); + let fst = merge_word_docids_reader_into_fst(fst_merger)?; let db_fst = index.words_fst(wtxn)?; // merge new fst with database fst @@ -237,98 +339,202 @@ pub(crate) fn write_typed_chunk_into_index( index.put_words_fst(wtxn, &fst)?; is_merged_database = true; } - TypedChunk::WordPositionDocids(word_position_docids_iter) => { + TypedChunk::WordPositionDocids(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "word_position_docids"); let _entered = span.enter(); - append_entries_into_database( - word_position_docids_iter, + + let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::WordPositionDocids(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + + write_entries_into_database( + merger, &index.word_position_docids, wtxn, - index_is_empty, 
deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } - TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids_iter) => { + TypedChunk::FieldIdFacetNumberDocids(_) => { let span = tracing::trace_span!(target: "indexing::write_db","field_id_facet_number_docids"); let _entered = span.enter(); - let indexer = FacetsUpdate::new(index, FacetType::Number, facet_id_number_docids_iter); + + let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut data_size = 0; + for typed_chunk in typed_chunks { + let TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids) = typed_chunk + else { + unreachable!(); + }; + + data_size += facet_id_number_docids.len(); + builder.push(facet_id_number_docids.into_cursor()?); + } + let merger = builder.build(); + + let indexer = FacetsUpdate::new(index, FacetType::Number, merger, None, data_size); indexer.execute(wtxn)?; is_merged_database = true; } - TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids_iter) => { + TypedChunk::FieldIdFacetStringDocids(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "field_id_facet_string_docids"); let _entered = span.enter(); - let indexer = FacetsUpdate::new(index, FacetType::String, facet_id_string_docids_iter); + + let mut facet_id_string_builder = + MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + let mut normalized_facet_id_string_builder = + MergerBuilder::new(merge_deladd_btreeset_string as MergeFn); + let mut data_size = 0; + for typed_chunk in typed_chunks { + let TypedChunk::FieldIdFacetStringDocids(( + facet_id_string_docids, + normalized_facet_id_string_docids, + )) = typed_chunk + else { + unreachable!(); + }; + + data_size += facet_id_string_docids.len(); + facet_id_string_builder.push(facet_id_string_docids.into_cursor()?); + normalized_facet_id_string_builder + .push(normalized_facet_id_string_docids.into_cursor()?); + } + let facet_id_string_merger = facet_id_string_builder.build(); + let normalized_facet_id_string_merger = normalized_facet_id_string_builder.build(); + + let indexer = FacetsUpdate::new( + index, + FacetType::String, + facet_id_string_merger, + Some(normalized_facet_id_string_merger), + data_size, + ); indexer.execute(wtxn)?; is_merged_database = true; } - TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => { + TypedChunk::FieldIdFacetExistsDocids(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "field_id_facet_exists_docids"); let _entered = span.enter(); - append_entries_into_database( - facet_id_exists_docids, + + let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::FieldIdFacetExistsDocids(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + + write_entries_into_database( + merger, &index.facet_id_exists_docids, wtxn, - index_is_empty, deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } - TypedChunk::FieldIdFacetIsNullDocids(facet_id_is_null_docids) => { + TypedChunk::FieldIdFacetIsNullDocids(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "field_id_facet_is_null_docids"); let _entered = span.enter(); - append_entries_into_database( - facet_id_is_null_docids, + + let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + for typed_chunk in 
typed_chunks { + let TypedChunk::FieldIdFacetIsNullDocids(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + + write_entries_into_database( + merger, &index.facet_id_is_null_docids, wtxn, - index_is_empty, deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } - TypedChunk::FieldIdFacetIsEmptyDocids(facet_id_is_empty_docids) => { + TypedChunk::FieldIdFacetIsEmptyDocids(_) => { let span = tracing::trace_span!(target: "profile::indexing::write_db", "field_id_facet_is_empty_docids"); let _entered = span.enter(); - append_entries_into_database( - facet_id_is_empty_docids, + + let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::FieldIdFacetIsEmptyDocids(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + + write_entries_into_database( + merger, &index.facet_id_is_empty_docids, wtxn, - index_is_empty, deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } - TypedChunk::WordPairProximityDocids(word_pair_proximity_docids_iter) => { + TypedChunk::WordPairProximityDocids(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "word_pair_proximity_docids"); let _entered = span.enter(); - append_entries_into_database( - word_pair_proximity_docids_iter, + + let mut builder = MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::WordPairProximityDocids(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + + write_entries_into_database( + merger, &index.word_pair_proximity_docids, wtxn, - index_is_empty, deladd_serialize_add_side, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?; is_merged_database = true; } - TypedChunk::FieldIdDocidFacetNumbers(fid_docid_facet_number) => { + TypedChunk::FieldIdDocidFacetNumbers(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_numbers"); let _entered = span.enter(); + + let mut builder = MergerBuilder::new(keep_first as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::FieldIdDocidFacetNumbers(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + let index_fid_docid_facet_numbers = index.field_id_docid_facet_f64s.remap_types::(); - let mut cursor = fid_docid_facet_number.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { + let mut iter = merger.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? 
{ let reader = KvReaderDelAdd::new(value); if valid_lmdb_key(key) { match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { @@ -344,14 +550,25 @@ pub(crate) fn write_typed_chunk_into_index( } } } - TypedChunk::FieldIdDocidFacetStrings(fid_docid_facet_string) => { + TypedChunk::FieldIdDocidFacetStrings(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_strings"); let _entered = span.enter(); + + let mut builder = MergerBuilder::new(keep_first as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::FieldIdDocidFacetStrings(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + let index_fid_docid_facet_strings = index.field_id_docid_facet_strings.remap_types::(); - let mut cursor = fid_docid_facet_string.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { + let mut iter = merger.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? { let reader = KvReaderDelAdd::new(value); if valid_lmdb_key(key) { match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { @@ -367,14 +584,25 @@ pub(crate) fn write_typed_chunk_into_index( } } } - TypedChunk::GeoPoints(geo_points) => { + TypedChunk::GeoPoints(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "geo_points"); let _entered = span.enter(); + + let mut builder = MergerBuilder::new(keep_first as MergeFn); + for typed_chunk in typed_chunks { + let TypedChunk::GeoPoints(chunk) = typed_chunk else { + unreachable!(); + }; + + builder.push(chunk.into_cursor()?); + } + let merger = builder.build(); + let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default(); let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?; - let mut cursor = geo_points.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { + let mut iter = merger.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? { // convert the key back to a u32 (4 bytes) let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); @@ -393,15 +621,38 @@ pub(crate) fn write_typed_chunk_into_index( index.put_geo_rtree(wtxn, &rtree)?; index.put_geo_faceted_documents_ids(wtxn, &geo_faceted_docids)?; } - TypedChunk::VectorPoints { - remove_vectors, - manual_vectors, - embeddings, - expected_dimension, - embedder_name, - } => { + TypedChunk::VectorPoints { .. } => { let span = tracing::trace_span!(target: "indexing::write_db", "vector_points"); let _entered = span.enter(); + + let mut remove_vectors_builder = MergerBuilder::new(keep_first as MergeFn); + let mut manual_vectors_builder = MergerBuilder::new(keep_first as MergeFn); + let mut embeddings_builder = MergerBuilder::new(keep_first as MergeFn); + let mut params = None; + for typed_chunk in typed_chunks { + let TypedChunk::VectorPoints { + remove_vectors, + manual_vectors, + embeddings, + expected_dimension, + embedder_name, + } = typed_chunk + else { + unreachable!(); + }; + + params = Some((expected_dimension, embedder_name)); + + remove_vectors_builder.push(remove_vectors.into_cursor()?); + manual_vectors_builder.push(manual_vectors.into_cursor()?); + if let Some(embeddings) = embeddings { + embeddings_builder.push(embeddings.into_cursor()?); + } + } + + // typed chunks has always at least 1 chunk. 
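Note on the pattern used throughout the new `write_typed_chunk_into_index`: each arm gathers every chunk of one variant, pushes each reader's cursor into a grenad `MergerBuilder` parameterised with the merge function appropriate for that database (`merge_deladd_cbo_roaring_bitmaps`, `keep_first`, ...), builds the `Merger`, and streams the result once into LMDB. For `VectorPoints`, the grouping performed by `ChunkAccumulator` (via the `PartialEq` impl above) guarantees that every chunk in a popped group shares the same `embedder_name` and `expected_dimension`, which is why overwriting `params` on each iteration and unwrapping it afterwards is sound. The following is a minimal, std-only sketch of that accumulate-then-merge idea under illustrative names; it is not grenad's API, and it materialises the data in a `BTreeMap` where grenad streams lazily over sorted files:

```rust
use std::collections::BTreeMap;

// Stand-ins for the real types: a "chunk" is a sorted run of (key, value) pairs and
// `MergeValues` plays the role of milli's `MergeFn`, deciding what to keep when the
// same key shows up in several chunks. None of these names are grenad's API.
type Chunk = Vec<(Vec<u8>, Vec<u8>)>;
type MergeValues = fn(&[u8], &[Vec<u8>]) -> Vec<u8>;

// Accumulate every chunk of one kind, then resolve each key exactly once.
fn merge_chunks(chunks: Vec<Chunk>, merge: MergeValues) -> Vec<(Vec<u8>, Vec<u8>)> {
    let mut grouped: BTreeMap<Vec<u8>, Vec<Vec<u8>>> = BTreeMap::new();
    for chunk in chunks {
        for (key, value) in chunk {
            grouped.entry(key).or_default().push(value);
        }
    }
    grouped
        .into_iter()
        .map(|(key, values)| {
            let merged = merge(&key, &values);
            (key, merged)
        })
        .collect()
}

// The simplest possible merge policy, akin to the `keep_first` merge function above.
fn keep_first(_key: &[u8], values: &[Vec<u8>]) -> Vec<u8> {
    values.first().cloned().unwrap_or_default()
}

fn main() {
    let chunk_a = vec![(b"doc1".to_vec(), b"a".to_vec())];
    let chunk_b = vec![(b"doc1".to_vec(), b"b".to_vec()), (b"doc2".to_vec(), b"c".to_vec())];
    // "doc1" appears in both chunks, so the merge policy decides which value survives.
    let merged = merge_chunks(vec![chunk_a, chunk_b], keep_first);
    assert_eq!(merged, vec![(b"doc1".to_vec(), b"a".to_vec()), (b"doc2".to_vec(), b"c".to_vec())]);
}
```

Grouping all chunks of a variant before touching LMDB means each database is visited once per batch rather than once per chunk, with the merge function resolving duplicate keys on the fly.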
+ let Some((expected_dimension, embedder_name)) = params else { unreachable!() }; + let embedder_index = index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or( InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None }, )?; @@ -419,8 +670,9 @@ pub(crate) fn write_typed_chunk_into_index( let writers = writers?; // remove vectors for docids we want them removed - let mut cursor = remove_vectors.into_cursor()?; - while let Some((key, _)) = cursor.move_on_next()? { + let merger = remove_vectors_builder.build(); + let mut iter = merger.into_stream_merger_iter()?; + while let Some((key, _)) = iter.next()? { let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); for writer in &writers { @@ -432,40 +684,39 @@ pub(crate) fn write_typed_chunk_into_index( } // add generated embeddings - if let Some(embeddings) = embeddings { - let mut cursor = embeddings.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { - let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); - let data = pod_collect_to_vec(value); - // it is a code error to have embeddings and not expected_dimension - let embeddings = - crate::vector::Embeddings::from_inner(data, expected_dimension) - // code error if we somehow got the wrong dimension - .unwrap(); + let merger = embeddings_builder.build(); + let mut iter = merger.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? { + let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); + let data = pod_collect_to_vec(value); + // it is a code error to have embeddings and not expected_dimension + let embeddings = crate::vector::Embeddings::from_inner(data, expected_dimension) + // code error if we somehow got the wrong dimension + .unwrap(); - if embeddings.embedding_count() > usize::from(u8::MAX) { - let external_docid = if let Ok(Some(Ok(index))) = index - .external_id_of(wtxn, std::iter::once(docid)) - .map(|it| it.into_iter().next()) - { - index - } else { - format!("internal docid={docid}") - }; - return Err(crate::Error::UserError(crate::UserError::TooManyVectors( - external_docid, - embeddings.embedding_count(), - ))); - } - for (embedding, writer) in embeddings.iter().zip(&writers) { - writer.add_item(wtxn, docid, embedding)?; - } + if embeddings.embedding_count() > usize::from(u8::MAX) { + let external_docid = if let Ok(Some(Ok(index))) = index + .external_id_of(wtxn, std::iter::once(docid)) + .map(|it| it.into_iter().next()) + { + index + } else { + format!("internal docid={docid}") + }; + return Err(crate::Error::UserError(crate::UserError::TooManyVectors( + external_docid, + embeddings.embedding_count(), + ))); + } + for (embedding, writer) in embeddings.iter().zip(&writers) { + writer.add_item(wtxn, docid, embedding)?; } } // perform the manual diff - let mut cursor = manual_vectors.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { + let merger = manual_vectors_builder.build(); + let mut iter = merger.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? 
{ // convert the key back to a u32 (4 bytes) let (left, _index) = try_split_array_at(key).unwrap(); let docid = DocumentId::from_be_bytes(left); @@ -519,26 +770,30 @@ pub(crate) fn write_typed_chunk_into_index( tracing::debug!("Finished vector chunk for {}", embedder_name); } - TypedChunk::ScriptLanguageDocids(sl_map) => { + TypedChunk::ScriptLanguageDocids(_) => { let span = tracing::trace_span!(target: "indexing::write_db", "script_language_docids"); let _entered = span.enter(); - for (key, (deletion, addition)) in sl_map { - let mut db_key_exists = false; - let final_value = match index.script_language_docids.get(wtxn, &key)? { - Some(db_values) => { - db_key_exists = true; - (db_values - deletion) | addition - } - None => addition, - }; - if final_value.is_empty() { - // If the database entry exists, delete it. - if db_key_exists { - index.script_language_docids.delete(wtxn, &key)?; + for typed_chunk in typed_chunks { + let TypedChunk::ScriptLanguageDocids(sl_map) = typed_chunk else { unreachable!() }; + for (key, (deletion, addition)) in sl_map { + let mut db_key_exists = false; + let final_value = match index.script_language_docids.get(wtxn, &key)? { + Some(db_values) => { + db_key_exists = true; + (db_values - deletion) | addition + } + None => addition, + }; + + if final_value.is_empty() { + // If the database entry exists, delete it. + if db_key_exists { + index.script_language_docids.delete(wtxn, &key)?; + } + } else { + index.script_language_docids.put(wtxn, &key, &final_value)?; } - } else { - index.script_language_docids.put(wtxn, &key, &final_value)?; } } } @@ -557,13 +812,9 @@ fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint { } fn merge_word_docids_reader_into_fst( - word_docids_iter: grenad::Reader>, - exact_word_docids_iter: grenad::Reader>, + merger: Merger, ) -> Result>> { - let mut merger_builder = MergerBuilder::new(merge_ignore_values as MergeFn); - merger_builder.push(word_docids_iter.into_cursor()?); - merger_builder.push(exact_word_docids_iter.into_cursor()?); - let mut iter = merger_builder.build().into_stream_merger_iter()?; + let mut iter = merger.into_stream_merger_iter()?; let mut builder = fst::SetBuilder::memory(); while let Some((k, _)) = iter.next()? { @@ -577,10 +828,9 @@ fn merge_word_docids_reader_into_fst( /// merge_values function is used if an entry already exist in the database. #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] fn write_entries_into_database( - data: grenad::Reader, + merger: Merger, database: &heed::Database, wtxn: &mut RwTxn, - index_is_empty: bool, serialize_value: FS, merge_values: FM, ) -> Result<()> @@ -589,22 +839,17 @@ where FS: for<'a> Fn(&'a [u8], &'a mut Vec) -> Result<&'a [u8]>, FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec) -> Result>, { - puffin::profile_function!(format!("number of entries: {}", data.len())); - + puffin::profile_function!(); let mut buffer = Vec::new(); let database = database.remap_types::(); - let mut cursor = data.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { + let mut iter = merger.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? { if valid_lmdb_key(key) { buffer.clear(); - let value = if index_is_empty { - Some(serialize_value(value, &mut buffer)?) - } else { - match database.get(wtxn, key)? { - Some(prev_value) => merge_values(value, prev_value, &mut buffer)?, - None => Some(serialize_value(value, &mut buffer)?), - } + let value = match database.get(wtxn, key)? 
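+            // There is no longer an append-only fast path for an empty index (see the
+            // removal of `append_entries_into_database` below): every merged entry is
+            // reconciled with the current database content, merging with the previous
+            // value when the key already exists and serializing the addition otherwise.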
{ + Some(prev_value) => merge_values(value, prev_value, &mut buffer)?, + None => Some(serialize_value(value, &mut buffer)?), }; match value { Some(value) => database.put(wtxn, key, value)?, @@ -614,62 +859,5 @@ where } } } - - Ok(()) -} - -/// Write provided entries in database using serialize_value function. -/// merge_values function is used if an entry already exist in the database. -/// All provided entries must be ordered. -/// If the index is not empty, write_entries_into_database is called instead. -#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] -fn append_entries_into_database( - data: grenad::Reader, - database: &heed::Database, - wtxn: &mut RwTxn, - index_is_empty: bool, - serialize_value: FS, - merge_values: FM, -) -> Result<()> -where - R: io::Read + io::Seek, - FS: for<'a> Fn(&'a [u8], &'a mut Vec) -> Result<&'a [u8]>, - FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec) -> Result>, - K: for<'a> heed::BytesDecode<'a>, -{ - puffin::profile_function!(format!("number of entries: {}", data.len())); - - if !index_is_empty { - return write_entries_into_database( - data, - database, - wtxn, - false, - serialize_value, - merge_values, - ); - } - - let mut buffer = Vec::new(); - let mut database = database.iter_mut(wtxn)?.remap_types::(); - - let mut cursor = data.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { - if valid_lmdb_key(key) { - debug_assert!( - K::bytes_decode(key).is_ok(), - "Couldn't decode key with the database decoder, key length: {} - key bytes: {:x?}", - key.len(), - &key - ); - buffer.clear(); - let value = serialize_value(value, &mut buffer)?; - unsafe { - // safety: We do not keep a reference to anything that lives inside the database - database.put_current_with_options::(PutFlags::APPEND, key, value)? - }; - } - } - Ok(()) } diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 99c6c815e..1db066058 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -47,7 +47,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { )] pub fn execute( self, - mut new_word_docids_iter: grenad::ReaderCursor, + new_word_docids: grenad::Merger, new_prefix_fst_words: &[String], common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, @@ -68,7 +68,8 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { if !common_prefix_fst_words.is_empty() { let mut current_prefixes: Option<&&[String]> = None; let mut prefixes_cache = HashMap::new(); - while let Some((word, data)) = new_word_docids_iter.move_on_next()? { + let mut new_word_docids_iter = new_word_docids.into_stream_merger_iter()?; + while let Some((word, data)) = new_word_docids_iter.next()? 
{
             current_prefixes = match current_prefixes.take() {
                 Some(prefixes) if word.starts_with(prefixes[0].as_bytes()) => Some(prefixes),
                 _otherwise => {
diff --git a/milli/src/update/words_prefix_integer_docids.rs b/milli/src/update/words_prefix_integer_docids.rs
index a05eb8721..272d465fd 100644
--- a/milli/src/update/words_prefix_integer_docids.rs
+++ b/milli/src/update/words_prefix_integer_docids.rs
@@ -52,7 +52,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
     )]
     pub fn execute(
         self,
-        new_word_integer_docids: grenad::Reader<CursorClonableMmap>,
+        new_word_integer_docids: grenad::Merger<CursorClonableMmap, MergeFn>,
         new_prefix_fst_words: &[String],
         common_prefix_fst_words: &[&[String]],
         del_prefix_fst_words: &HashSet<Vec<u8>>,
     ) -> Result<()> {
@@ -69,14 +69,14 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
             self.max_memory,
         );

-        let mut new_word_integer_docids_iter = new_word_integer_docids.into_cursor()?;
-
         if !common_prefix_fst_words.is_empty() {
             // We fetch all the new common prefixes between the previous and new prefix fst.
             let mut buffer = Vec::new();
             let mut current_prefixes: Option<&&[String]> = None;
             let mut prefixes_cache = HashMap::new();
-            while let Some((key, data)) = new_word_integer_docids_iter.move_on_next()? {
+            let mut new_word_integer_docids_iter =
+                new_word_integer_docids.into_stream_merger_iter()?;
+            while let Some((key, data)) = new_word_integer_docids_iter.next()? {
                 let (word, pos) = StrBEU16Codec::bytes_decode(key).map_err(heed::Error::Decoding)?;
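Note on the prefix-database builders: `WordPrefixDocids` and `WordPrefixIntegerDocids` now receive a `grenad::Merger` built from the word-docids chunks kept aside during indexing, so they iterate the union of all chunks in key order directly through `into_stream_merger_iter` instead of requiring a single pre-merged reader. The sketch below is a minimal, std-only illustration of that k-way streaming idea; the names and types are illustrative, not grenad's API, and it omits the duplicate-key resolution that grenad performs with the builder's `MergeFn`:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Yields the entries of several pre-sorted chunks in global key order,
/// which is the principle behind streaming over a merger of many readers.
fn kway_merge(chunks: Vec<Vec<(String, u32)>>) -> Vec<(String, u32)> {
    // The heap always holds the next not-yet-consumed entry of every chunk.
    let mut heap = BinaryHeap::new();
    let mut cursors: Vec<_> = chunks.into_iter().map(|c| c.into_iter()).collect();
    for (chunk_id, cursor) in cursors.iter_mut().enumerate() {
        if let Some((key, value)) = cursor.next() {
            heap.push(Reverse((key, chunk_id, value)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((key, chunk_id, value))) = heap.pop() {
        out.push((key, value));
        // Refill the heap from the chunk whose entry was just consumed.
        if let Some((key, value)) = cursors[chunk_id].next() {
            heap.push(Reverse((key, chunk_id, value)));
        }
    }
    out
}

fn main() {
    let a = vec![("apple".to_string(), 1), ("zebra".to_string(), 3)];
    let b = vec![("mango".to_string(), 2)];
    let merged = kway_merge(vec![a, b]);
    assert_eq!(
        merged.iter().map(|(k, _)| k.as_str()).collect::<Vec<_>>(),
        ["apple", "mango", "zebra"]
    );
}
```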