Facets Bulk update

This commit is contained in:
Louis Dureuil 2023-10-19 11:56:42 +02:00
parent cd40f6b28c
commit 9b4627cc8b
No known key found for this signature in database
2 changed files with 38 additions and 24 deletions

View File

@ -1,9 +1,10 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::fs::File; use std::fs::File;
use grenad::CompressionType; use grenad::{CompressionType, Reader};
use heed::types::ByteSlice; use heed::types::ByteSlice;
use heed::{BytesEncode, Error, RoTxn, RwTxn}; use heed::{BytesEncode, Error, RoTxn, RwTxn};
use obkv::KvReader;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use super::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; use super::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
@ -12,6 +13,7 @@ use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
}; };
use crate::heed_codec::ByteSliceRefCodec; use crate::heed_codec::ByteSliceRefCodec;
use crate::update::del_add::DelAdd;
use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader}; use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader};
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result}; use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
@ -30,7 +32,7 @@ pub struct FacetsUpdateBulk<'i> {
facet_type: FacetType, facet_type: FacetType,
field_ids: Vec<FieldId>, field_ids: Vec<FieldId>,
// None if level 0 does not need to be updated // None if level 0 does not need to be updated
new_data: Option<grenad::Reader<File>>, delta_data: Option<grenad::Reader<File>>,
} }
impl<'i> FacetsUpdateBulk<'i> { impl<'i> FacetsUpdateBulk<'i> {
@ -38,7 +40,7 @@ impl<'i> FacetsUpdateBulk<'i> {
index: &'i Index, index: &'i Index,
field_ids: Vec<FieldId>, field_ids: Vec<FieldId>,
facet_type: FacetType, facet_type: FacetType,
new_data: grenad::Reader<File>, delta_data: grenad::Reader<File>,
group_size: u8, group_size: u8,
min_level_size: u8, min_level_size: u8,
) -> FacetsUpdateBulk<'i> { ) -> FacetsUpdateBulk<'i> {
@ -48,7 +50,7 @@ impl<'i> FacetsUpdateBulk<'i> {
group_size, group_size,
min_level_size, min_level_size,
facet_type, facet_type,
new_data: Some(new_data), delta_data: Some(delta_data),
} }
} }
@ -63,13 +65,13 @@ impl<'i> FacetsUpdateBulk<'i> {
group_size: FACET_GROUP_SIZE, group_size: FACET_GROUP_SIZE,
min_level_size: FACET_MIN_LEVEL_SIZE, min_level_size: FACET_MIN_LEVEL_SIZE,
facet_type, facet_type,
new_data: None, delta_data: None,
} }
} }
#[logging_timer::time("FacetsUpdateBulk::{}")] #[logging_timer::time("FacetsUpdateBulk::{}")]
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> { pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
let Self { index, field_ids, group_size, min_level_size, facet_type, new_data } = self; let Self { index, field_ids, group_size, min_level_size, facet_type, delta_data } = self;
let db = match facet_type { let db = match facet_type {
FacetType::String => index FacetType::String => index
@ -80,7 +82,7 @@ impl<'i> FacetsUpdateBulk<'i> {
} }
}; };
let inner = FacetsUpdateBulkInner { db, new_data, group_size, min_level_size }; let inner = FacetsUpdateBulkInner { db, delta_data, group_size, min_level_size };
inner.update(wtxn, &field_ids, |wtxn, field_id, all_docids| { inner.update(wtxn, &field_ids, |wtxn, field_id, all_docids| {
index.put_faceted_documents_ids(wtxn, field_id, facet_type, &all_docids)?; index.put_faceted_documents_ids(wtxn, field_id, facet_type, &all_docids)?;
@ -94,7 +96,7 @@ impl<'i> FacetsUpdateBulk<'i> {
/// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type /// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type
pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> { pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> {
pub db: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>, pub db: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
pub new_data: Option<grenad::Reader<R>>, pub delta_data: Option<grenad::Reader<R>>,
pub group_size: u8, pub group_size: u8,
pub min_level_size: u8, pub min_level_size: u8,
} }
@ -133,20 +135,26 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
Ok(()) Ok(())
} }
// TODO the new_data is an Reader<Obkv<Key, Obkv<DelAdd, RoaringBitmap>>>
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> { fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
let new_data = match self.new_data.take() { let delta_data = match self.delta_data.take() {
Some(x) => x, Some(x) => x,
None => return Ok(()), None => return Ok(()),
}; };
if self.db.is_empty(wtxn)? { if self.db.is_empty(wtxn)? {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let mut database = self.db.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>(); let mut database = self.db.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
let mut cursor = new_data.into_cursor()?; let mut cursor = delta_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? { while let Some((key, value)) = cursor.move_on_next()? {
if !valid_lmdb_key(key) { if !valid_lmdb_key(key) {
continue; continue;
} }
let value: KvReader<DelAdd> = KvReader::new(value);
// DB is empty, it is safe to ignore Del operations
let Some(value) = value.get(DelAdd::Addition) else {
continue;
};
buffer.clear(); buffer.clear();
// the group size for level 0 // the group size for level 0
buffer.push(1); buffer.push(1);
@ -158,11 +166,14 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let database = self.db.remap_types::<ByteSlice, ByteSlice>(); let database = self.db.remap_types::<ByteSlice, ByteSlice>();
let mut cursor = new_data.into_cursor()?; let mut cursor = delta_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? { while let Some((key, value)) = cursor.move_on_next()? {
if !valid_lmdb_key(key) { if !valid_lmdb_key(key) {
continue; continue;
} }
let value: KvReader<DelAdd> = KvReader::new(value);
// the value is a CboRoaringBitmap, but I still need to prepend the // the value is a CboRoaringBitmap, but I still need to prepend the
// group size for level 0 (= 1) to it // group size for level 0 (= 1) to it
buffer.clear(); buffer.clear();
@ -171,12 +182,15 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
match database.get(wtxn, key)? { match database.get(wtxn, key)? {
Some(prev_value) => { Some(prev_value) => {
let old_bitmap = &prev_value[1..]; let old_bitmap = &prev_value[1..];
CboRoaringBitmapCodec::merge_into( CboRoaringBitmapCodec::merge_deladd_into(value, old_bitmap, &mut buffer)?;
&[Cow::Borrowed(value), Cow::Borrowed(old_bitmap)],
&mut buffer,
)?;
} }
None => { None => {
// it is safe to ignore the del in that case.
let Some(value) = value.get(DelAdd::Addition) else {
// won't put the key in DB as the value would be empty
continue;
};
buffer.extend_from_slice(value); buffer.extend_from_slice(value);
} }
}; };

View File

@ -108,14 +108,14 @@ pub struct FacetsUpdate<'i> {
index: &'i Index, index: &'i Index,
database: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>, database: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
facet_type: FacetType, facet_type: FacetType,
new_data: grenad::Reader<File>, delta_data: grenad::Reader<File>,
group_size: u8, group_size: u8,
max_group_size: u8, max_group_size: u8,
min_level_size: u8, min_level_size: u8,
} }
impl<'i> FacetsUpdate<'i> { impl<'i> FacetsUpdate<'i> {
// TODO grenad::Reader<Key, Obkv<DelAdd, RoaringBitmap>> // TODO grenad::Reader<Key, Obkv<DelAdd, RoaringBitmap>>
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self { pub fn new(index: &'i Index, facet_type: FacetType, delta_data: grenad::Reader<File>) -> Self {
let database = match facet_type { let database = match facet_type {
FacetType::String => index FacetType::String => index
.facet_id_string_docids .facet_id_string_docids
@ -131,26 +131,26 @@ impl<'i> FacetsUpdate<'i> {
max_group_size: FACET_MAX_GROUP_SIZE, max_group_size: FACET_MAX_GROUP_SIZE,
min_level_size: FACET_MIN_LEVEL_SIZE, min_level_size: FACET_MIN_LEVEL_SIZE,
facet_type, facet_type,
new_data, delta_data,
} }
} }
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> { pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
if self.new_data.is_empty() { if self.delta_data.is_empty() {
return Ok(()); return Ok(());
} }
debug!("Computing and writing the facet values levels docids into LMDB on disk..."); debug!("Computing and writing the facet values levels docids into LMDB on disk...");
self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
// See self::comparison_bench::benchmark_facet_indexing // See self::comparison_bench::benchmark_facet_indexing
if self.new_data.len() >= (self.database.len(wtxn)? as u64 / 50) { if self.delta_data.len() >= (self.database.len(wtxn)? as u64 / 50) {
let field_ids = let field_ids =
self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::<Vec<_>>(); self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::<Vec<_>>();
let bulk_update = FacetsUpdateBulk::new( let bulk_update = FacetsUpdateBulk::new(
self.index, self.index,
field_ids, field_ids,
self.facet_type, self.facet_type,
self.new_data, self.delta_data,
self.group_size, self.group_size,
self.min_level_size, self.min_level_size,
); );
@ -159,7 +159,7 @@ impl<'i> FacetsUpdate<'i> {
let incremental_update = FacetsUpdateIncremental::new( let incremental_update = FacetsUpdateIncremental::new(
self.index, self.index,
self.facet_type, self.facet_type,
self.new_data, self.delta_data,
self.group_size, self.group_size,
self.min_level_size, self.min_level_size,
self.max_group_size, self.max_group_size,
@ -459,7 +459,7 @@ pub(crate) mod test_helpers {
let update = FacetsUpdateBulkInner { let update = FacetsUpdateBulkInner {
db: self.content, db: self.content,
new_data: Some(reader), delta_data: Some(reader),
group_size: self.group_size.get(), group_size: self.group_size.get(),
min_level_size: self.min_level_size.get(), min_level_size: self.min_level_size.get(),
}; };