Update Facets indexing to be compatible with new database structure

This commit is contained in:
Loïc Lecrenier 2022-08-30 14:03:18 +02:00 committed by Loïc Lecrenier
parent c3f49f766d
commit 7913d6365c
2 changed files with 178 additions and 477 deletions

View File

@ -1,168 +1,43 @@
/*!
This module initialises the databases that are used to quickly get the list
of documents with a faceted field value falling within a certain range. For
example, they can be used to implement filters such as `x >= 3`.
These databases are `facet_id_string_docids` and `facet_id_f64_docids`.
## Example with numbers
In the case of numbers, we start with a sorted list whose keys are
`(field_id, number_value)` and whose value is a roaring bitmap of the document ids
which contain the value `number_value` for the faceted field `field_id`.
From this list, we want to compute two things:
1. the bitmap of all documents that contain **any** number for each faceted field
2. a structure that allows us to use a (sort of) binary search to find all documents
containing numbers inside a certain range for a faceted field
To achieve goal (2), we recursively split the list into chunks. Every time we split it, we
create a new "level" that is several times smaller than the level below it. The base level,
level 0, is the starting list. Level 1 is composed of chunks of up to N elements. Each element
contains a range and a bitmap of docids. Level 2 is composed of chunks up to N^2 elements, etc.
For example, let's say we have 26 documents which we identify through the letters a-z.
We will focus on a single faceted field. When there are multiple faceted fields, the structure
described below is simply repeated for each field.
What we want to obtain is the following structure for each faceted field:
```text
all [a, b, c, d, e, f, g, u, y, z]
1.2 2 3.4 100 102 104
Level 2
[a, b, d, f, z] [c, d, e, f, g] [u, y]
1.2 1.3 1.6 2 3.4 12 12.3 100 102 104
Level 1
[a, b, d, z] [a, b, f] [c, d, g] [e, f] [u, y]
1.2 1.3 1.6 2 3.4 12 12.3 100 102 104
Level 0
[a, b] [d, z] [b, f] [a, f] [c, d] [g] [e] [e, f] [y] [u]
```
You can read more about this structure (for strings) in `[crate::search::facet::facet_strings]`.
To create the levels, we use a recursive algorithm which makes sure that we only need to iterate
over the elements of level 0 once. It is implemented by [`recursive_compute_levels`].
## Encoding
### Numbers
For numbers we use the same encoding for level 0 and the other levels.
The key is given by `FacetLevelValueF64Codec`. It consists of:
1. The field id : u16
2. The height of the level : u8
3. The start bound : f64
4. The end bound : f64
Note that at level 0, we have start bound == end bound.
The value is a serialised `RoaringBitmap`.
### Strings
For strings, we use a different encoding for level 0 and the other levels.
At level 0, the key is given by `FacetStringLevelZeroCodec`. It consists of:
1. The field id : u16
2. The height of the level : u8 <-- always == 0
3. The normalised string value : &str
And the value is given by `FacetStringLevelZeroValueCodec`. It consists of:
1. The original string
2. A serialised `RoaringBitmap`
At level 1, the key is given by `FacetLevelValueU32Codec`. It consists of:
1. The field id : u16
2. The height of the level : u8 <-- always >= 1
3. The start bound : u32
4. The end bound : u32
where the bounds are indices inside level 0.
The value is given by `FacetStringZeroBoundsValueCodec<CboRoaringBitmapCodec>`.
If the level is 1, then it consists of:
1. The normalised string of the start bound
2. The normalised string of the end bound
3. A serialised `RoaringBitmap`
If the level is higher, then it consists only of the serialised roaring bitmap.
The distinction between the value encoding of level 1 and the levels above it
is to allow us to retrieve the value in level 0 quickly by reading the key of
level 1 (we obtain the string value of the bound and execute a prefix search
in the database).
Therefore, for strings, the structure for a single faceted field looks more like this:
```text
all [a, b, c, d, e, f, g, u, y, z]
0 3 4 7 8 9
Level 2
[a, b, d, f, z] [c, d, e, f, g] [u, y]
0 1 2 3 4 5 6 7 8 9
Level 1 "ab" "ac" "ba" "bac" "gaf" "gal" "form" "wow" "woz" "zz"
[a, b, d, z] [a, b, f] [c, d, g] [e, f] [u, y]
"ab" "ac" "ba" "bac" "gaf" "gal" "form" "wow" "woz" "zz"
Level 0 "AB" " Ac" "ba " "Bac" " GAF" "gal" "Form" " wow" "woz" "ZZ"
[a, b] [d, z] [b, f] [a, f] [c, d] [g] [e] [e, f] [y] [u]
The first line in a cell is its key (without the field id and level height) and the last two
lines are its values.
```
*/
use std::cmp; use std::cmp;
use std::fs::File; use std::fs::File;
use std::num::{NonZeroU8, NonZeroUsize}; use std::num::NonZeroUsize;
use std::ops::RangeFrom;
use grenad::{CompressionType, Reader, Writer}; use grenad::CompressionType;
use heed::types::{ByteSlice, DecodeIgnore}; use heed::types::ByteSlice;
use heed::{BytesDecode, BytesEncode, Error}; use heed::{BytesEncode, Error, RoTxn};
use log::debug; use log::debug;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::error::InternalError; use crate::error::InternalError;
use crate::heed_codec::facet::new::ordered_f64_codec::OrderedF64Codec;
use crate::heed_codec::facet::new::str_ref::StrRefCodec;
use crate::heed_codec::facet::new::{ use crate::heed_codec::facet::new::{
FacetGroupValue, FacetGroupValueCodec, FacetKey, FacetKeyCodec, FacetGroupValue, FacetGroupValueCodec, FacetKey, FacetKeyCodec, MyByteSlice,
}; };
// use crate::heed_codec::CboRoaringBitmapCodec; // use crate::heed_codec::CboRoaringBitmapCodec;
use crate::update::index_documents::{create_writer, write_into_lmdb_database, writer_into_reader}; use crate::update::index_documents::{create_writer, write_into_lmdb_database, writer_into_reader};
use crate::{FieldId, Index, Result}; use crate::{FieldId, Index, Result};
pub struct Facets<'t, 'u, 'i> { pub struct Facets<'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index, index: &'i Index,
database: heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
pub(crate) chunk_compression_type: CompressionType, pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>, pub(crate) chunk_compression_level: Option<u32>,
level_group_size: NonZeroUsize, level_group_size: usize,
min_level_size: NonZeroUsize, min_level_size: usize,
} }
impl<'t, 'u, 'i> Facets<'t, 'u, 'i> { impl<'i> Facets<'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Facets<'t, 'u, 'i> { pub fn new(
index: &'i Index,
database: heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
) -> Facets<'i> {
Facets { Facets {
wtxn,
index, index,
database,
chunk_compression_type: CompressionType::None, chunk_compression_type: CompressionType::None,
chunk_compression_level: None, chunk_compression_level: None,
level_group_size: NonZeroUsize::new(4).unwrap(), level_group_size: 4,
min_level_size: NonZeroUsize::new(5).unwrap(), min_level_size: 5,
} }
} }
@ -170,413 +45,233 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
/// ///
/// This setting is always greater than or equal to 2. /// This setting is always greater than or equal to 2.
pub fn level_group_size(&mut self, value: NonZeroUsize) -> &mut Self { pub fn level_group_size(&mut self, value: NonZeroUsize) -> &mut Self {
self.level_group_size = NonZeroUsize::new(cmp::max(value.get(), 2)).unwrap(); self.level_group_size = cmp::max(value.get(), 2);
self self
} }
/// The minimum number of elements that a level is allowed to have. /// The minimum number of elements that a level is allowed to have.
pub fn min_level_size(&mut self, value: NonZeroUsize) -> &mut Self { pub fn min_level_size(&mut self, value: NonZeroUsize) -> &mut Self {
self.min_level_size = value; self.min_level_size = value.get();
self self
} }
fn clear_levels(&self, wtxn: &mut heed::RwTxn, field_id: FieldId) -> Result<()> {
let left = FacetKey::<&[u8]> { field_id, level: 1, left_bound: &[] };
let right = FacetKey::<&[u8]> { field_id, level: u8::MAX, left_bound: &[] };
let range = left..=right;
self.database.delete_range(wtxn, &range).map(drop)?;
Ok(())
}
#[logging_timer::time("Facets::{}")] #[logging_timer::time("Facets::{}")]
pub fn execute(self) -> Result<()> { pub fn execute(mut self, wtxn: &mut heed::RwTxn) -> Result<()> {
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?; self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
// We get the faceted fields to be able to create the facet levels. // We get the faceted fields to be able to create the facet levels.
let faceted_fields = self.index.faceted_fields_ids(self.wtxn)?; let faceted_fields = self.index.faceted_fields_ids(wtxn)?.clone();
debug!("Computing and writing the facet values levels docids into LMDB on disk..."); debug!("Computing and writing the facet values levels docids into LMDB on disk...");
let mut nested_wtxn = self.index.env.nested_write_txn(self.wtxn).unwrap(); for &field_id in faceted_fields.iter() {
self.clear_levels(wtxn, field_id)?;
}
for field_id in faceted_fields { let mut nested_wtxn = self.index.env.nested_write_txn(wtxn)?;
// Clear the facet string levels.
// clear_field_string_levels(
// &mut nested_wtxn,
// self.index.facet_id_string_docids.remap_types::<ByteSlice, DecodeIgnore>(),
// field_id,
// )?;
let (facet_string_levels, string_documents_ids) = compute_facet_strings_levels( for &field_id in faceted_fields.iter() {
&mut nested_wtxn, let (level_readers, all_docids) =
self.index.facet_id_string_docids, self.compute_levels_for_field_id(field_id, &nested_wtxn)?;
self.chunk_compression_type,
self.chunk_compression_level,
self.level_group_size,
self.min_level_size,
field_id,
)?;
self.index.put_string_faceted_documents_ids( // TODO: this will need to be an argument to Facets as well
&mut nested_wtxn, self.index.put_string_faceted_documents_ids(&mut nested_wtxn, field_id, &all_docids)?;
field_id,
&string_documents_ids, for level_reader in level_readers {
)?; // TODO: append instead of write with merge
for facet_strings_level in facet_string_levels {
write_into_lmdb_database( write_into_lmdb_database(
&mut nested_wtxn, &mut nested_wtxn,
*self.index.facet_id_string_docids.as_polymorph(), *self.database.as_polymorph(),
facet_strings_level, level_reader,
|_, _| { |_, _| {
Err(InternalError::IndexingMergingKeys { process: "facet string levels" })? Err(InternalError::IndexingMergingKeys { process: "facet string levels" })?
}, },
)?; )?;
} }
// // Clear the facet number levels.
// clear_field_number_levels(&mut nested_wtxn, self.index.facet_id_f64_docids, field_id)?;
let (facet_number_levels, number_documents_ids) = compute_facet_number_levels(
&mut nested_wtxn,
self.index.facet_id_f64_docids,
self.chunk_compression_type,
self.chunk_compression_level,
self.level_group_size,
self.min_level_size,
field_id,
)?;
self.index.put_number_faceted_documents_ids(
&mut nested_wtxn,
field_id,
&number_documents_ids,
)?;
for facet_number_level in facet_number_levels {
write_into_lmdb_database(
&mut nested_wtxn,
*self.index.facet_id_f64_docids.as_polymorph(),
facet_number_level,
|_, _| {
Err(InternalError::IndexingMergingKeys { process: "facet number levels" })?
},
)?;
}
} }
Ok(()) Ok(())
} }
}
/// Compute the content of the database levels from its level 0 for the given field id. fn compute_levels_for_field_id(
/// &self,
/// ## Returns:
/// 1. a vector of grenad::Reader. The reader at index `i` corresponds to the elements of level `i + 1`
/// that must be inserted into the database.
/// 2. a roaring bitmap of all the document ids present in the database
fn compute_facet_number_levels<'t>(
rtxn: &'t mut heed::RwTxn,
db: heed::Database<FacetKeyCodec<OrderedF64Codec>, FacetGroupValueCodec>,
compression_type: CompressionType,
compression_level: Option<u32>,
level_group_size: NonZeroUsize,
min_level_size: NonZeroUsize,
field_id: FieldId, field_id: FieldId,
) -> Result<(Vec<Reader<File>>, RoaringBitmap)> { txn: &RoTxn,
let first_level_size = db ) -> Result<(Vec<grenad::Reader<File>>, RoaringBitmap)> {
.remap_key_type::<ByteSlice>() let algo = CreateFacetsAlgo {
.prefix_iter(rtxn, &field_id.to_be_bytes())? rtxn: txn,
.remap_types::<DecodeIgnore, DecodeIgnore>() db: &self.database,
.fold(Ok(0usize), |count, result| result.and(count).map(|c| c + 1))?;
let level_0_start = FacetKey { field_id, level: 0, left_bound: f64::MIN };
// Groups sizes are always a power of the original level_group_size and therefore a group
// always maps groups of the previous level and never splits previous levels groups in half.
let group_size_iter = (1u8..)
.map(|l| (l, level_group_size.get().pow(l as u32)))
.take_while(|(_, s)| first_level_size / *s >= min_level_size.get())
.collect::<Vec<_>>();
let mut number_document_ids = RoaringBitmap::new();
if let Some((top_level, _)) = group_size_iter.last() {
let subwriters = recursive_compute_levels::<OrderedF64Codec>(
rtxn,
db,
compression_type,
compression_level,
field_id, field_id,
*top_level, level_group_size: self.level_group_size,
level_0_start, min_level_size: self.min_level_size,
&(level_0_start..), chunk_compression_type: self.chunk_compression_type,
first_level_size, chunk_compression_level: self.chunk_compression_level,
level_group_size, };
&mut |bitmaps, _| { // TODO: first check whether there is anything in level 0
let mut all_docids = RoaringBitmap::new();
let subwriters = algo.compute_higher_levels(32, &mut |bitmaps, _| {
for bitmap in bitmaps { for bitmap in bitmaps {
number_document_ids |= bitmap; all_docids |= bitmap;
} }
Ok(()) Ok(())
}, })?;
)?; drop(algo);
Ok((subwriters, number_document_ids)) Ok((subwriters, all_docids))
} else {
let mut documents_ids = RoaringBitmap::new();
for result in db.range(rtxn, &(level_0_start..))?.take(first_level_size) {
let (_key, group_value) = result?;
documents_ids |= group_value.bitmap;
}
Ok((vec![], documents_ids))
} }
} }
/// Compute the content of the database levels from its level 0 for the given field id. pub struct CreateFacetsAlgo<'t> {
/// rtxn: &'t heed::RoTxn<'t>,
/// ## Returns: db: &'t heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
/// 1. a vector of grenad::Reader. The reader at index `i` corresponds to the elements of level `i + 1` chunk_compression_type: CompressionType,
/// that must be inserted into the database. chunk_compression_level: Option<u32>,
/// 2. a roaring bitmap of all the document ids present in the database field_id: u16,
fn compute_facet_strings_levels<'t>( level_group_size: usize,
rtxn: &'t mut heed::RwTxn, min_level_size: usize,
db: heed::Database<FacetKeyCodec<StrRefCodec>, FacetGroupValueCodec>,
compression_type: CompressionType,
compression_level: Option<u32>,
level_group_size: NonZeroUsize,
min_level_size: NonZeroUsize,
field_id: FieldId,
) -> Result<(Vec<Reader<File>>, RoaringBitmap)> {
let first_level_size = db
.remap_key_type::<ByteSlice>()
.prefix_iter(rtxn, &field_id.to_be_bytes())?
.remap_types::<DecodeIgnore, DecodeIgnore>()
.fold(Ok(0usize), |count, result| result.and(count).map(|c| c + 1))?;
let level_0_start = FacetKey { field_id, level: 0, left_bound: "" };
// Groups sizes are always a power of the original level_group_size and therefore a group
// always maps groups of the previous level and never splits previous levels groups in half.
let group_size_iter = (1u8..)
.map(|l| (l, level_group_size.get().pow(l as u32)))
.take_while(|(_, s)| first_level_size / *s >= min_level_size.get())
.collect::<Vec<_>>();
let mut strings_document_ids = RoaringBitmap::new();
if let Some((top_level, _)) = group_size_iter.last() {
let subwriters = recursive_compute_levels::<StrRefCodec>(
rtxn,
db,
compression_type,
compression_level,
field_id,
*top_level,
level_0_start,
&(level_0_start..),
first_level_size,
level_group_size,
&mut |bitmaps, _| {
for bitmap in bitmaps {
strings_document_ids |= bitmap;
}
Ok(())
},
)?;
Ok((subwriters, strings_document_ids))
} else {
let mut documents_ids = RoaringBitmap::new();
for result in db.range(rtxn, &(level_0_start..))?.take(first_level_size) {
let (_key, group_value) = result?;
documents_ids |= group_value.bitmap;
}
Ok((vec![], documents_ids))
}
} }
impl<'t> CreateFacetsAlgo<'t> {
/** fn read_level_0(
Compute a level from the levels below it, with the elements of level 0 already existing in the given `db`. &self,
handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
This function is generic to work with both numbers and strings. The generic type parameters are: ) -> Result<()> {
* `KeyCodec`/`ValueCodec`: the codecs used to read the elements of the database.
* `Bound`: part of the range in the levels structure. For example, for numbers, the `Bound` is `f64`
because each chunk in a level contains a range such as (1.2 ..= 4.5).
## Arguments
* `rtxn` : LMDB read transaction
* `db`: a database which already contains a `level 0`
* `compression_type`/`compression_level`: parameters used to create the `grenad::Writer` that
will contain the new levels
* `level` : the height of the level to create, or `0` to read elements from level 0.
* `level_0_start` : a key in the database that points to the beginning of its level 0
* `level_0_range` : equivalent to `level_0_start..`
* `level_0_size` : the number of elements in level 0
* `level_group_size` : the number of elements from the level below that are represented by a
single element of the new level
* `computed_group_bitmap` : a callback that is called whenever at most `level_group_size` elements
from the level below were read/created. Its arguments are:
0. the list of bitmaps from each read/created element of the level below
1. the start bound corresponding to the first element
2. the end bound corresponding to the last element
* `bound_from_db_key` : finds the `Bound` from a key in the database
* `bitmap_from_db_value` : finds the `RoaringBitmap` from a value in the database
* `write_entry` : writes an element of a level into the writer. The arguments are:
0. the writer
1. the height of the level
2. the start bound
3. the end bound
4. the docids of all elements between the start and end bound
## Return
A vector of grenad::Reader. The reader at index `i` corresponds to the elements of level `i + 1`
that must be inserted into the database.
*/
fn recursive_compute_levels<'t, BoundCodec>(
rtxn: &'t mut heed::RwTxn,
db: heed::Database<FacetKeyCodec<BoundCodec>, FacetGroupValueCodec>,
compression_type: CompressionType,
compression_level: Option<u32>,
field_id: FieldId,
level: u8,
level_0_start: FacetKey<<BoundCodec as BytesEncode<'t>>::EItem>,
level_0_range: &'t RangeFrom<FacetKey<<BoundCodec as BytesEncode<'t>>::EItem>>,
level_0_size: usize,
level_group_size: NonZeroUsize,
computed_group_bitmap: &mut dyn FnMut(
&[RoaringBitmap],
<BoundCodec as BytesEncode<'t>>::EItem,
) -> Result<()>,
) -> Result<Vec<Reader<File>>>
where
for<'a> BoundCodec:
BytesEncode<'a> + BytesDecode<'a, DItem = <BoundCodec as BytesEncode<'a>>::EItem>,
for<'a> <BoundCodec as BytesEncode<'a>>::EItem: Copy + Sized,
{
if level == 0 {
// base case for the recursion
// we read the elements one by one and // we read the elements one by one and
// 1. keep track of the start and end bounds // 1. keep track of the left bound
// 2. fill the `bitmaps` vector to give it to level 1 once `level_group_size` elements were read // 2. fill the `bitmaps` vector to give it to level 1 once `level_group_size` elements were read
let mut bitmaps = vec![]; let mut bitmaps = vec![];
let mut start_bound = level_0_start.left_bound; let mut level_0_prefix = vec![];
// let mut end_bound = level_0_start.bound; level_0_prefix.extend_from_slice(&self.field_id.to_be_bytes());
level_0_prefix.push(0);
let level_0_iter = self
.db
.as_polymorph()
.prefix_iter::<_, ByteSlice, ByteSlice>(self.rtxn, level_0_prefix.as_slice())?
.remap_types::<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>();
let mut left_bound: &[u8] = &[];
let mut first_iteration_for_new_group = true; let mut first_iteration_for_new_group = true;
for (i, db_result_item) in db.range(rtxn, level_0_range)?.take(level_0_size).enumerate() { for el in level_0_iter {
let (key, value) = db_result_item?; let (key, value) = el?;
let bound = key.left_bound; let bound = key.left_bound;
let docids = value.bitmap; let docids = value.bitmap;
if first_iteration_for_new_group { if first_iteration_for_new_group {
start_bound = bound; left_bound = bound;
first_iteration_for_new_group = false; first_iteration_for_new_group = false;
} }
// end_bound = bound;
bitmaps.push(docids); bitmaps.push(docids);
if bitmaps.len() == level_group_size.get() { if bitmaps.len() == self.level_group_size {
computed_group_bitmap(&bitmaps, start_bound)?; handle_group(&bitmaps, left_bound);
first_iteration_for_new_group = true; first_iteration_for_new_group = true;
bitmaps.clear(); bitmaps.clear();
} }
} }
// don't forget to give the leftover bitmaps as well // don't forget to give the leftover bitmaps as well
if !bitmaps.is_empty() { if !bitmaps.is_empty() {
computed_group_bitmap(&bitmaps, start_bound)?; handle_group(&bitmaps, left_bound);
bitmaps.clear(); bitmaps.clear();
} }
// level 0 is already stored in the DB Ok(())
}
/// Compute the content of the database levels from its level 0 for the given field id.
///
/// ## Returns:
/// 1. a vector of grenad::Reader. The reader at index `i` corresponds to the elements of level `i + 1`
/// that must be inserted into the database.
/// 2. a roaring bitmap of all the document ids present in the database
fn compute_higher_levels(
&self,
level: u8,
handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
) -> Result<Vec<grenad::Reader<File>>> {
if level == 0 {
self.read_level_0(handle_group);
// Level 0 is already in the database
return Ok(vec![]); return Ok(vec![]);
} else { }
// level >= 1 // level >= 1
// we compute each element of this level based on the elements of the level below it // we compute each element of this level based on the elements of the level below it
// once we have computed `level_group_size` elements, we give the start and end bounds // once we have computed `level_group_size` elements, we give the left bound
// of those elements, and their bitmaps, to the level above // of those elements, and their bitmaps, to the level above
let mut cur_writer = let mut cur_writer = create_writer(
create_writer(compression_type, compression_level, tempfile::tempfile()?); self.chunk_compression_type,
self.chunk_compression_level,
tempfile::tempfile()?,
);
let mut cur_writer_len = 0;
let mut range_for_bitmaps = vec![]; let mut group_sizes = vec![];
let mut left_bounds = vec![];
let mut bitmaps = vec![]; let mut bitmaps = vec![];
// compute the levels below // compute the levels below
// in the callback, we fill `cur_writer` with the correct elements for this level // in the callback, we fill `cur_writer` with the correct elements for this level
let mut sub_writers = recursive_compute_levels( let mut sub_writers =
rtxn, self.compute_higher_levels(level - 1, &mut |sub_bitmaps, left_bound| {
db,
compression_type,
compression_level,
field_id,
level - 1,
level_0_start,
level_0_range,
level_0_size,
level_group_size,
&mut |sub_bitmaps: &[RoaringBitmap],
start_range: <BoundCodec as BytesEncode<'t>>::EItem| {
let mut combined_bitmap = RoaringBitmap::default(); let mut combined_bitmap = RoaringBitmap::default();
for bitmap in sub_bitmaps { for bitmap in sub_bitmaps {
combined_bitmap |= bitmap; combined_bitmap |= bitmap;
} }
range_for_bitmaps.push(start_range); group_sizes.push(sub_bitmaps.len() as u8);
left_bounds.push(left_bound);
bitmaps.push(combined_bitmap); bitmaps.push(combined_bitmap);
if bitmaps.len() == level_group_size.get() { if bitmaps.len() != self.level_group_size {
let start_bound = range_for_bitmaps.first().unwrap(); return Ok(());
computed_group_bitmap(&bitmaps, *start_bound)?;
for (bitmap, start_bound) in bitmaps.drain(..).zip(range_for_bitmaps.drain(..))
{
write_entry::<BoundCodec>(
&mut cur_writer,
field_id,
NonZeroU8::new(level).unwrap(),
start_bound,
bitmap,
)?;
} }
let left_bound = left_bounds.first().unwrap();
handle_group(&bitmaps, left_bound)?;
for ((bitmap, left_bound), group_size) in
bitmaps.drain(..).zip(left_bounds.drain(..)).zip(group_sizes.drain(..))
{
let key = FacetKey { field_id: self.field_id, level, left_bound };
let key =
FacetKeyCodec::<MyByteSlice>::bytes_encode(&key).ok_or(Error::Encoding)?;
let value = FacetGroupValue { size: group_size, bitmap };
let value =
FacetGroupValueCodec::bytes_encode(&value).ok_or(Error::Encoding)?;
cur_writer.insert(key, value)?;
cur_writer_len += 1;
} }
Ok(()) Ok(())
}, })?;
)?;
// don't forget to insert the leftover elements into the writer as well // don't forget to insert the leftover elements into the writer as well
if !bitmaps.is_empty() { if !bitmaps.is_empty() && cur_writer_len >= self.level_group_size * self.min_level_size {
let start_range = range_for_bitmaps.first().unwrap(); let left_bound = left_bounds.first().unwrap();
let end_range = range_for_bitmaps.last().unwrap(); handle_group(&bitmaps, left_bound)?;
computed_group_bitmap(&bitmaps, *start_range)?; for ((bitmap, left_bound), group_size) in
for (bitmap, bound) in bitmaps.drain(..).zip(range_for_bitmaps.drain(..)) { bitmaps.drain(..).zip(left_bounds.drain(..)).zip(group_sizes.drain(..))
write_entry( {
&mut cur_writer, let key = FacetKey { field_id: self.field_id, level, left_bound };
field_id, let key =
NonZeroU8::new(level).unwrap(), FacetKeyCodec::<MyByteSlice>::bytes_encode(&key).ok_or(Error::Encoding)?;
bound, let value = FacetGroupValue { size: group_size, bitmap };
bitmap, let value = FacetGroupValueCodec::bytes_encode(&value).ok_or(Error::Encoding)?;
)?; cur_writer.insert(key, value)?;
cur_writer_len += 1;
} }
} }
if cur_writer_len > self.level_group_size * self.min_level_size {
sub_writers.push(writer_into_reader(cur_writer)?); sub_writers.push(writer_into_reader(cur_writer)?);
}
return Ok(sub_writers); return Ok(sub_writers);
} }
} }
fn write_entry<BoundCodec>(
writer: &mut Writer<File>,
field_id: FieldId,
level: NonZeroU8,
bound: <BoundCodec as BytesEncode<'_>>::EItem,
docids: RoaringBitmap,
) -> Result<()>
where
for<'a> BoundCodec: BytesEncode<'a>,
for<'a> <BoundCodec as BytesEncode<'a>>::EItem: Copy + Sized,
{
todo!()
// let key = FacetKey { field_id, level: level.get(), left_bound: bound };
// let key_bytes = FacetKeyCodec::<BoundCodec>::bytes_encode(&key).ok_or(Error::Encoding)?;
// let value_bytes =
// FacetGroupValueCodec::bytes_encode(&FacetGroupValue { size: 4, bitmap: docids })
// .ok_or(Error::Encoding)?;
// writer.insert(&key_bytes, &value_bytes)?;
// Ok(())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::num::NonZeroUsize; use std::num::NonZeroUsize;

View File

@ -34,6 +34,7 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput}; pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::UserError; use crate::error::UserError;
use crate::heed_codec::facet::new::{FacetKeyCodec, MyByteSlice};
pub use crate::update::index_documents::helpers::CursorClonableMmap; pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{ use crate::update::{
self, Facets, IndexerConfig, PrefixWordPairsProximityDocids, UpdateIndexingStep, self, Facets, IndexerConfig, PrefixWordPairsProximityDocids, UpdateIndexingStep,
@ -431,7 +432,11 @@ where
let mut databases_seen = MERGED_DATABASE_COUNT; let mut databases_seen = MERGED_DATABASE_COUNT;
// Run the facets update operation. // Run the facets update operation.
let mut builder = Facets::new(self.wtxn, self.index); for facet_db in [
(&self.index.facet_id_string_docids).remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
(&self.index.facet_id_f64_docids).remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
] {
let mut builder = Facets::new(self.index, facet_db);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type; builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level; builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
if let Some(value) = self.config.facet_level_group_size { if let Some(value) = self.config.facet_level_group_size {
@ -440,7 +445,8 @@ where
if let Some(value) = self.config.facet_min_level_size { if let Some(value) = self.config.facet_min_level_size {
builder.min_level_size(value); builder.min_level_size(value);
} }
builder.execute()?; builder.execute(self.wtxn)?;
}
databases_seen += 1; databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {