Rename facet codecs and refine FacetsUpdate API

This commit is contained in:
Loïc Lecrenier 2022-09-05 13:49:52 +02:00 committed by Loïc Lecrenier
parent 485a72306d
commit 330c9eb1b2
8 changed files with 133 additions and 138 deletions

View File

@ -4,7 +4,9 @@ use heed::Result;
use roaring::RoaringBitmap;
use super::{get_first_facet_value, get_highest_level};
use crate::heed_codec::facet::{ByteSliceRef, FacetGroupKey, FacetGroupValueCodec, FacetGroupKeyCodec};
use crate::heed_codec::facet::{
ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec,
};
pub fn iterate_over_facet_distribution<'t, CB>(
rtxn: &'t heed::RoTxn<'t>,
@ -78,7 +80,8 @@ where
if level == 0 {
return self.iterate_level_0(candidates, starting_bound, group_size);
}
let starting_key = FacetGroupKey { field_id: self.field_id, level, left_bound: starting_bound };
let starting_key =
FacetGroupKey { field_id: self.field_id, level, left_bound: starting_bound };
let iter = self.db.range(&self.rtxn, &(&starting_key..)).unwrap().take(group_size);
for el in iter {
@ -109,16 +112,14 @@ where
#[cfg(test)]
mod tests {
use std::ops::ControlFlow;
use super::iterate_over_facet_distribution;
use crate::heed_codec::facet::OrderedF64Codec;
use crate::milli_snap;
use crate::search::facet::test::FacetIndex;
use heed::BytesDecode;
use rand::{Rng, SeedableRng};
use roaring::RoaringBitmap;
use super::iterate_over_facet_distribution;
use crate::heed_codec::facet::ordered_f64_codec::OrderedF64Codec;
use crate::milli_snap;
use crate::search::facet::test::FacetIndex;
use std::ops::ControlFlow;
fn get_simple_index() -> FacetIndex<OrderedF64Codec> {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);

View File

@ -4,7 +4,9 @@ use heed::BytesEncode;
use roaring::RoaringBitmap;
use super::{get_first_facet_value, get_highest_level, get_last_facet_value};
use crate::heed_codec::facet::{FacetGroupValueCodec, FacetGroupKey, FacetGroupKeyCodec, ByteSliceRef};
use crate::heed_codec::facet::{
ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec,
};
use crate::Result;
pub fn find_docids_of_facet_within_bounds<'t, BoundCodec>(
@ -117,7 +119,8 @@ impl<'t, 'b, 'bitmap> FacetRangeSearch<'t, 'b, 'bitmap> {
return self.run_level_0(starting_left_bound, group_size);
}
let left_key = FacetGroupKey { field_id: self.field_id, level, left_bound: starting_left_bound };
let left_key =
FacetGroupKey { field_id: self.field_id, level, left_bound: starting_left_bound };
let mut iter = self.db.range(&self.rtxn, &(left_key..))?.take(group_size);
let (mut previous_key, mut previous_value) = iter.next().unwrap()?;
@ -258,8 +261,8 @@ mod tests {
use roaring::RoaringBitmap;
use super::find_docids_of_facet_within_bounds;
use crate::heed_codec::facet::ordered_f64_codec::OrderedF64Codec;
use crate::heed_codec::facet::FacetGroupKeyCodec;
use crate::heed_codec::facet::OrderedF64Codec;
use crate::milli_snap;
use crate::search::facet::test::FacetIndex;
use crate::snapshot_tests::display_bitmap;

View File

@ -3,7 +3,7 @@ use roaring::RoaringBitmap;
use super::{get_first_facet_value, get_highest_level};
use crate::heed_codec::facet::{
FacetGroupValue, FacetGroupValueCodec, FacetGroupKey, FacetGroupKeyCodec, ByteSliceRef,
ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
pub fn ascending_facet_sort<'t>(
@ -86,7 +86,7 @@ mod tests {
use rand::{Rng, SeedableRng};
use roaring::RoaringBitmap;
use crate::heed_codec::facet::ordered_f64_codec::OrderedF64Codec;
use crate::heed_codec::facet::OrderedF64Codec;
use crate::milli_snap;
use crate::search::facet::facet_sort_ascending::ascending_facet_sort;
use crate::search::facet::test::FacetIndex;

View File

@ -5,7 +5,7 @@ use roaring::RoaringBitmap;
use super::{get_first_facet_value, get_highest_level, get_last_facet_value};
use crate::heed_codec::facet::{
FacetGroupValue, FacetGroupValueCodec, FacetGroupKey, FacetGroupKeyCodec, ByteSliceRef,
ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
pub fn descending_facet_sort<'t>(
@ -37,7 +37,9 @@ struct DescendingFacetSort<'t> {
field_id: u16,
stack: Vec<(
RoaringBitmap,
std::iter::Take<heed::RoRevRange<'t, FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>>,
std::iter::Take<
heed::RoRevRange<'t, FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>,
>,
Bound<&'t [u8]>,
)>,
}
@ -72,7 +74,8 @@ impl<'t> Iterator for DescendingFacetSort<'t> {
if level == 0 {
return Some(Ok(bitmap));
}
let starting_key_below = FacetGroupKey { field_id, level: level - 1, left_bound };
let starting_key_below =
FacetGroupKey { field_id, level: level - 1, left_bound };
let end_key_kelow = match *right_bound {
Bound::Included(right) => Bound::Included(FacetGroupKey {
@ -89,8 +92,10 @@ impl<'t> Iterator for DescendingFacetSort<'t> {
};
let prev_right_bound = *right_bound;
*right_bound = Bound::Excluded(left_bound);
let iter =
match self.db.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>().rev_range(
let iter = match self
.db
.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>()
.rev_range(
&self.rtxn,
&(Bound::Included(starting_key_below), end_key_kelow),
) {
@ -114,8 +119,8 @@ mod tests {
use rand::{Rng, SeedableRng};
use roaring::RoaringBitmap;
use crate::heed_codec::facet::ordered_f64_codec::OrderedF64Codec;
use crate::heed_codec::facet::{FacetGroupKeyCodec, ByteSliceRef};
use crate::heed_codec::facet::OrderedF64Codec;
use crate::heed_codec::facet::{ByteSliceRef, FacetGroupKeyCodec};
use crate::milli_snap;
use crate::search::facet::facet_sort_descending::descending_facet_sort;
use crate::search::facet::test::FacetIndex;

View File

@ -1,30 +1,24 @@
use crate::facet::FacetType;
use crate::heed_codec::facet::{
ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
use crate::update::index_documents::{create_writer, writer_into_reader};
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
use grenad::CompressionType;
use heed::types::ByteSlice;
use heed::{BytesEncode, Error, RoTxn, RwTxn};
use log::debug;
use roaring::RoaringBitmap;
use std::borrow::Cow;
use std::cmp;
use std::fs::File;
use std::num::NonZeroUsize;
use grenad::CompressionType;
use heed::types::{ByteSlice, DecodeIgnore};
use heed::{BytesDecode, BytesEncode, Error, RoTxn, RwTxn};
use log::debug;
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use crate::error::InternalError;
use crate::facet::FacetType;
use crate::heed_codec::facet::{
FacetGroupValue, FacetGroupValueCodec, FacetGroupKey, FacetGroupKeyCodec, ByteSliceRef,
};
use crate::update::index_documents::{
create_writer, valid_lmdb_key, write_into_lmdb_database, writer_into_reader,
};
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
pub struct FacetsUpdateBulk<'i> {
index: &'i Index,
database: heed::Database<FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>,
level_group_size: usize,
min_level_size: usize,
level_group_size: u8,
min_level_size: u8,
facet_type: FacetType,
// None if level 0 does not need to be updated
new_data: Option<grenad::Reader<File>>,
@ -39,9 +33,9 @@ impl<'i> FacetsUpdateBulk<'i> {
FacetsUpdateBulk {
index,
database: match facet_type {
FacetType::String => {
index.facet_id_string_docids.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>()
}
FacetType::String => index
.facet_id_string_docids
.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>(),
FacetType::Number => {
index.facet_id_f64_docids.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>()
}
@ -60,9 +54,9 @@ impl<'i> FacetsUpdateBulk<'i> {
FacetsUpdateBulk {
index,
database: match facet_type {
FacetType::String => {
index.facet_id_string_docids.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>()
}
FacetType::String => index
.facet_id_string_docids
.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>(),
FacetType::Number => {
index.facet_id_f64_docids.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>()
}
@ -77,14 +71,14 @@ impl<'i> FacetsUpdateBulk<'i> {
/// The number of elements from the level below that are represented by a single element in the level above.
///
/// This setting is always greater than or equal to 2.
pub fn level_group_size(&mut self, value: NonZeroUsize) -> &mut Self {
self.level_group_size = cmp::max(value.get(), 2);
pub fn level_group_size(mut self, value: u8) -> Self {
self.level_group_size = cmp::max(value, 2);
self
}
/// The minimum number of elements that a level is allowed to have.
pub fn min_level_size(&mut self, value: NonZeroUsize) -> &mut Self {
self.min_level_size = value.get();
pub fn min_level_size(mut self, value: u8) -> Self {
self.min_level_size = cmp::max(value, 1);
self
}
@ -109,8 +103,6 @@ impl<'i> FacetsUpdateBulk<'i> {
}
self.update_level0(wtxn)?;
// let mut nested_wtxn = self.index.env.nested_write_txn(wtxn)?;
for &field_id in faceted_fields.iter() {
let (level_readers, all_docids) = self.compute_levels_for_field_id(field_id, &wtxn)?;
@ -119,10 +111,6 @@ impl<'i> FacetsUpdateBulk<'i> {
for level_reader in level_readers {
let mut cursor = level_reader.into_cursor()?;
while let Some((k, v)) = cursor.move_on_next()? {
let key = FacetGroupKeyCodec::<DecodeIgnore>::bytes_decode(k).unwrap();
let value = FacetGroupValueCodec::bytes_decode(v).unwrap();
println!("inserting {key:?} {value:?}");
self.database.remap_types::<ByteSlice, ByteSlice>().put(wtxn, k, v)?;
}
}
@ -141,7 +129,6 @@ impl<'i> FacetsUpdateBulk<'i> {
let mut database = self.database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
let mut cursor = new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
buffer.clear();
// the group size for level 0
buffer.push(1);
@ -149,16 +136,15 @@ impl<'i> FacetsUpdateBulk<'i> {
buffer.extend_from_slice(value);
unsafe { database.append(key, &buffer)? };
}
}
} else {
let mut buffer = Vec::new();
let database = self.database.remap_types::<ByteSlice, ByteSlice>();
let mut cursor = new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
// the value is a CboRoaringBitmap, but I still need to prepend the
// group size for level 0 (= 1) to it
buffer.clear();
// the group size for level 0
buffer.push(1);
// then we extend the buffer with the docids bitmap
match database.get(wtxn, key)? {
@ -176,7 +162,6 @@ impl<'i> FacetsUpdateBulk<'i> {
database.put(wtxn, key, &buffer)?;
}
}
}
Ok(())
}
@ -186,7 +171,7 @@ impl<'i> FacetsUpdateBulk<'i> {
field_id: FieldId,
txn: &RoTxn,
) -> Result<(Vec<grenad::Reader<File>>, RoaringBitmap)> {
// TODO: first check whether there is anything in level 0
// TODO: first check whether there is anything in level 0?
let algo = ComputeHigherLevels {
rtxn: txn,
db: &self.database,
@ -212,8 +197,8 @@ struct ComputeHigherLevels<'t> {
rtxn: &'t heed::RoTxn<'t>,
db: &'t heed::Database<FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>,
field_id: u16,
level_group_size: usize,
min_level_size: usize,
level_group_size: u8,
min_level_size: u8,
}
impl<'t> ComputeHigherLevels<'t> {
fn read_level_0(
@ -248,7 +233,7 @@ impl<'t> ComputeHigherLevels<'t> {
}
bitmaps.push(docids);
if bitmaps.len() == self.level_group_size {
if bitmaps.len() == self.level_group_size as usize {
handle_group(&bitmaps, left_bound)?;
first_iteration_for_new_group = true;
bitmaps.clear();
@ -265,9 +250,8 @@ impl<'t> ComputeHigherLevels<'t> {
/// Compute the content of the database levels from its level 0 for the given field id.
///
/// ## Returns:
/// 1. a vector of grenad::Reader. The reader at index `i` corresponds to the elements of level `i + 1`
/// A vector of grenad::Reader. The reader at index `i` corresponds to the elements of level `i + 1`
/// that must be inserted into the database.
/// 2. a roaring bitmap of all the document ids present in the database
fn compute_higher_levels(
&self,
level: u8,
@ -302,7 +286,7 @@ impl<'t> ComputeHigherLevels<'t> {
left_bounds.push(left_bound);
bitmaps.push(combined_bitmap);
if bitmaps.len() != self.level_group_size {
if bitmaps.len() != self.level_group_size as usize {
return Ok(());
}
let left_bound = left_bounds.first().unwrap();
@ -312,8 +296,8 @@ impl<'t> ComputeHigherLevels<'t> {
bitmaps.drain(..).zip(left_bounds.drain(..)).zip(group_sizes.drain(..))
{
let key = FacetGroupKey { field_id: self.field_id, level, left_bound };
let key =
FacetGroupKeyCodec::<ByteSliceRef>::bytes_encode(&key).ok_or(Error::Encoding)?;
let key = FacetGroupKeyCodec::<ByteSliceRef>::bytes_encode(&key)
.ok_or(Error::Encoding)?;
let value = FacetGroupValue { size: group_size, bitmap };
let value =
FacetGroupValueCodec::bytes_encode(&value).ok_or(Error::Encoding)?;
@ -330,8 +314,8 @@ impl<'t> ComputeHigherLevels<'t> {
bitmaps.drain(..).zip(left_bounds.drain(..)).zip(group_sizes.drain(..))
{
let key = FacetGroupKey { field_id: self.field_id, level, left_bound };
let key =
FacetGroupKeyCodec::<ByteSliceRef>::bytes_encode(&key).ok_or(Error::Encoding)?;
let key = FacetGroupKeyCodec::<ByteSliceRef>::bytes_encode(&key)
.ok_or(Error::Encoding)?;
let value = FacetGroupValue { size: group_size, bitmap };
let value = FacetGroupValueCodec::bytes_encode(&value).ok_or(Error::Encoding)?;
cur_writer.insert(key, value)?;
@ -340,6 +324,10 @@ impl<'t> ComputeHigherLevels<'t> {
}
if cur_writer_len > self.min_level_size {
sub_writers.push(writer_into_reader(cur_writer)?);
} else {
if !bitmaps.is_empty() {
handle_group(&bitmaps, left_bounds.first().unwrap())?;
}
}
return Ok(sub_writers);
}

View File

@ -3,7 +3,7 @@ use heed::{BytesDecode, Error, RoTxn, RwTxn};
use roaring::RoaringBitmap;
use crate::heed_codec::facet::{
FacetGroupValue, FacetGroupValueCodec, FacetGroupKey, FacetGroupKeyCodec, ByteSliceRef,
ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
use crate::search::facet::get_highest_level;
use crate::Result;
@ -20,14 +20,26 @@ enum DeletionResult {
pub struct FacetsUpdateIncremental {
db: heed::Database<FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>,
group_size: usize,
min_level_size: usize,
max_group_size: usize,
group_size: u8,
min_level_size: u8,
max_group_size: u8,
}
impl FacetsUpdateIncremental {
pub fn new(db: heed::Database<FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>) -> Self {
Self { db, group_size: 4, min_level_size: 5, max_group_size: 8 }
}
pub fn group_size(mut self, size: u8) -> Self {
self.group_size = size;
self
}
pub fn min_level_size(mut self, size: u8) -> Self {
self.min_level_size = size;
self
}
pub fn max_group_size(mut self, size: u8) -> Self {
self.max_group_size = size;
self
}
}
impl FacetsUpdateIncremental {
fn find_insertion_key_value(
@ -178,12 +190,7 @@ impl FacetsUpdateIncremental {
let mut updated_value = self.db.get(&txn, &insertion_key.as_ref())?.unwrap();
updated_value.size += 1;
if updated_value.size as usize == max_group_size {
// need to split it
// recompute left element and right element
// replace current group by left element
// add one more group to the right
if updated_value.size == max_group_size {
let size_left = max_group_size / 2;
let size_right = max_group_size - size_left;
@ -201,7 +208,7 @@ impl FacetsUpdateIncremental {
)?
.unwrap();
let mut iter = self.db.range(&txn, &(start_key..))?.take(max_group_size);
let mut iter = self.db.range(&txn, &(start_key..))?.take(max_group_size as usize);
let group_left = {
let mut values_left = RoaringBitmap::new();
@ -234,8 +241,11 @@ impl FacetsUpdateIncremental {
values_right |= &value.bitmap;
}
let key =
FacetGroupKey { field_id, level, left_bound: right_start_key.unwrap().to_vec() };
let key = FacetGroupKey {
field_id,
level,
left_bound: right_start_key.unwrap().to_vec(),
};
let value = FacetGroupValue { size: size_right as u8, bitmap: values_right };
(key, value)
};
@ -288,7 +298,7 @@ impl FacetsUpdateIncremental {
.prefix_iter::<_, ByteSlice, ByteSlice>(&txn, &highest_level_prefix)?
.count();
if size_highest_level < self.group_size * self.min_level_size {
if size_highest_level < self.group_size as usize * self.min_level_size as usize {
return Ok(());
}
@ -438,7 +448,7 @@ impl FacetsUpdateIncremental {
.as_polymorph()
.prefix_iter::<_, ByteSlice, ByteSlice>(&txn, &highest_level_prefix)?
.count()
>= self.group_size
>= self.min_level_size as usize
{
return Ok(());
}
@ -450,7 +460,9 @@ impl FacetsUpdateIncremental {
while let Some(el) = iter.next() {
let (k, _) = el?;
to_delete.push(
FacetGroupKeyCodec::<ByteSliceRef>::bytes_decode(k).ok_or(Error::Encoding)?.into_owned(),
FacetGroupKeyCodec::<ByteSliceRef>::bytes_decode(k)
.ok_or(Error::Encoding)?
.into_owned(),
);
}
drop(iter);
@ -469,9 +481,9 @@ mod tests {
use rand::{Rng, SeedableRng};
use roaring::RoaringBitmap;
use crate::heed_codec::facet::ordered_f64_codec::OrderedF64Codec;
use crate::heed_codec::facet::str_ref::StrRefCodec;
use crate::heed_codec::facet::{FacetGroupValueCodec, FacetGroupKeyCodec, ByteSliceRef};
use crate::heed_codec::facet::OrderedF64Codec;
use crate::heed_codec::facet::StrRefCodec;
use crate::heed_codec::facet::{ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::milli_snap;
use crate::search::facet::get_highest_level;
use crate::search::facet::test::FacetIndex;

View File

@ -4,7 +4,6 @@ use crate::{
heed_codec::facet::{ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec},
CboRoaringBitmapCodec, FieldId, Index, Result,
};
use grenad::CompressionType;
use heed::BytesDecode;
use roaring::RoaringBitmap;
use std::{collections::HashMap, fs::File};
@ -42,26 +41,17 @@ impl<'i> FacetsUpdate<'i> {
}
}
// /// The number of elements from the level below that are represented by a single element in the level above
// ///
// /// This setting is always greater than or equal to 2.
// pub fn level_group_size(&mut self, value: u8) -> &mut Self {
// self.level_group_size = std::cmp::max(value, 2);
// self
// }
// /// The minimum number of elements that a level is allowed to have.
// pub fn min_level_size(&mut self, value: u8) -> &mut Self {
// self.min_level_size = std::cmp::max(value, 1);
// self
// }
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
// TODO: find a better condition for choosing between the bulk and the incremental update
if self.database.is_empty(wtxn)? {
let bulk_update = FacetsUpdateBulk::new(self.index, self.facet_type, self.new_data);
let bulk_update = FacetsUpdateBulk::new(self.index, self.facet_type, self.new_data)
.level_group_size(self.level_group_size)
.min_level_size(self.min_level_size);
bulk_update.execute(wtxn)?;
} else {
let indexer = FacetsUpdateIncremental::new(self.database);
let indexer = FacetsUpdateIncremental::new(self.database)
.max_group_size(self.max_level_group_size)
.min_level_size(self.min_level_size);
let mut new_faceted_docids = HashMap::<FieldId, RoaringBitmap>::default();

View File

@ -16,8 +16,4 @@ source: milli/src/update/facet/incremental.rs
0 0 k12 1 "[12, ]"
0 0 k13 1 "[13, ]"
0 0 k14 1 "[14, ]"
0 1 k0 4 "[0, 1, 2, 3, ]"
0 1 k4 4 "[4, 5, 6, 7, ]"
0 1 k8 4 "[8, 9, 10, 11, ]"
0 1 k12 3 "[12, 13, 14, ]"