From 9b4627cc8b3f96c0fc2bdedd9a6a2d453dca1eb8 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 19 Oct 2023 11:56:42 +0200 Subject: [PATCH] Facets Bulk update --- milli/src/update/facet/bulk.rs | 46 ++++++++++++++++++++++------------ milli/src/update/facet/mod.rs | 16 ++++++------ 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs index 825217b00..c3dd84810 100644 --- a/milli/src/update/facet/bulk.rs +++ b/milli/src/update/facet/bulk.rs @@ -1,9 +1,10 @@ use std::borrow::Cow; use std::fs::File; -use grenad::CompressionType; +use grenad::{CompressionType, Reader}; use heed::types::ByteSlice; use heed::{BytesEncode, Error, RoTxn, RwTxn}; +use obkv::KvReader; use roaring::RoaringBitmap; use super::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; @@ -12,6 +13,7 @@ use crate::heed_codec::facet::{ FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec, }; use crate::heed_codec::ByteSliceRefCodec; +use crate::update::del_add::DelAdd; use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader}; use crate::{CboRoaringBitmapCodec, FieldId, Index, Result}; @@ -30,7 +32,7 @@ pub struct FacetsUpdateBulk<'i> { facet_type: FacetType, field_ids: Vec, // None if level 0 does not need to be updated - new_data: Option>, + delta_data: Option>, } impl<'i> FacetsUpdateBulk<'i> { @@ -38,7 +40,7 @@ impl<'i> FacetsUpdateBulk<'i> { index: &'i Index, field_ids: Vec, facet_type: FacetType, - new_data: grenad::Reader, + delta_data: grenad::Reader, group_size: u8, min_level_size: u8, ) -> FacetsUpdateBulk<'i> { @@ -48,7 +50,7 @@ impl<'i> FacetsUpdateBulk<'i> { group_size, min_level_size, facet_type, - new_data: Some(new_data), + delta_data: Some(delta_data), } } @@ -63,13 +65,13 @@ impl<'i> FacetsUpdateBulk<'i> { group_size: FACET_GROUP_SIZE, min_level_size: FACET_MIN_LEVEL_SIZE, facet_type, - new_data: None, + delta_data: None, } } #[logging_timer::time("FacetsUpdateBulk::{}")] pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> { - let Self { index, field_ids, group_size, min_level_size, facet_type, new_data } = self; + let Self { index, field_ids, group_size, min_level_size, facet_type, delta_data } = self; let db = match facet_type { FacetType::String => index @@ -80,7 +82,7 @@ impl<'i> FacetsUpdateBulk<'i> { } }; - let inner = FacetsUpdateBulkInner { db, new_data, group_size, min_level_size }; + let inner = FacetsUpdateBulkInner { db, delta_data, group_size, min_level_size }; inner.update(wtxn, &field_ids, |wtxn, field_id, all_docids| { index.put_faceted_documents_ids(wtxn, field_id, facet_type, &all_docids)?; @@ -94,7 +96,7 @@ impl<'i> FacetsUpdateBulk<'i> { /// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type pub(crate) struct FacetsUpdateBulkInner { pub db: heed::Database, FacetGroupValueCodec>, - pub new_data: Option>, + pub delta_data: Option>, pub group_size: u8, pub min_level_size: u8, } @@ -133,20 +135,26 @@ impl FacetsUpdateBulkInner { Ok(()) } - // TODO the new_data is an Reader>> fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> { - let new_data = match self.new_data.take() { + let delta_data = match self.delta_data.take() { Some(x) => x, None => return Ok(()), }; if self.db.is_empty(wtxn)? { let mut buffer = Vec::new(); let mut database = self.db.iter_mut(wtxn)?.remap_types::(); - let mut cursor = new_data.into_cursor()?; + let mut cursor = delta_data.into_cursor()?; while let Some((key, value)) = cursor.move_on_next()? { if !valid_lmdb_key(key) { continue; } + let value: KvReader = KvReader::new(value); + + // DB is empty, it is safe to ignore Del operations + let Some(value) = value.get(DelAdd::Addition) else { + continue; + }; + buffer.clear(); // the group size for level 0 buffer.push(1); @@ -158,11 +166,14 @@ impl FacetsUpdateBulkInner { let mut buffer = Vec::new(); let database = self.db.remap_types::(); - let mut cursor = new_data.into_cursor()?; + let mut cursor = delta_data.into_cursor()?; while let Some((key, value)) = cursor.move_on_next()? { if !valid_lmdb_key(key) { continue; } + + let value: KvReader = KvReader::new(value); + // the value is a CboRoaringBitmap, but I still need to prepend the // group size for level 0 (= 1) to it buffer.clear(); @@ -171,12 +182,15 @@ impl FacetsUpdateBulkInner { match database.get(wtxn, key)? { Some(prev_value) => { let old_bitmap = &prev_value[1..]; - CboRoaringBitmapCodec::merge_into( - &[Cow::Borrowed(value), Cow::Borrowed(old_bitmap)], - &mut buffer, - )?; + CboRoaringBitmapCodec::merge_deladd_into(value, old_bitmap, &mut buffer)?; } None => { + // it is safe to ignore the del in that case. + let Some(value) = value.get(DelAdd::Addition) else { + // won't put the key in DB as the value would be empty + continue; + }; + buffer.extend_from_slice(value); } }; diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index 6631f431b..b3e66fdb3 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -108,14 +108,14 @@ pub struct FacetsUpdate<'i> { index: &'i Index, database: heed::Database, FacetGroupValueCodec>, facet_type: FacetType, - new_data: grenad::Reader, + delta_data: grenad::Reader, group_size: u8, max_group_size: u8, min_level_size: u8, } impl<'i> FacetsUpdate<'i> { // TODO grenad::Reader> - pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader) -> Self { + pub fn new(index: &'i Index, facet_type: FacetType, delta_data: grenad::Reader) -> Self { let database = match facet_type { FacetType::String => index .facet_id_string_docids @@ -131,26 +131,26 @@ impl<'i> FacetsUpdate<'i> { max_group_size: FACET_MAX_GROUP_SIZE, min_level_size: FACET_MIN_LEVEL_SIZE, facet_type, - new_data, + delta_data, } } pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> { - if self.new_data.is_empty() { + if self.delta_data.is_empty() { return Ok(()); } debug!("Computing and writing the facet values levels docids into LMDB on disk..."); self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; // See self::comparison_bench::benchmark_facet_indexing - if self.new_data.len() >= (self.database.len(wtxn)? as u64 / 50) { + if self.delta_data.len() >= (self.database.len(wtxn)? as u64 / 50) { let field_ids = self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::>(); let bulk_update = FacetsUpdateBulk::new( self.index, field_ids, self.facet_type, - self.new_data, + self.delta_data, self.group_size, self.min_level_size, ); @@ -159,7 +159,7 @@ impl<'i> FacetsUpdate<'i> { let incremental_update = FacetsUpdateIncremental::new( self.index, self.facet_type, - self.new_data, + self.delta_data, self.group_size, self.min_level_size, self.max_group_size, @@ -459,7 +459,7 @@ pub(crate) mod test_helpers { let update = FacetsUpdateBulkInner { db: self.content, - new_data: Some(reader), + delta_data: Some(reader), group_size: self.group_size.get(), min_level_size: self.min_level_size.get(), };