Add FacetsUpdate type that wraps incremental and bulk indexing methods

This commit is contained in:
Loïc Lecrenier 2022-09-05 12:52:05 +02:00 committed by Loïc Lecrenier
parent 3d145d7f48
commit 9b55e582cd
11 changed files with 216 additions and 129 deletions

View File

@ -1,10 +1,11 @@
use std::borrow::Cow;
use std::cmp;
use std::fs::File;
use std::num::NonZeroUsize;
use grenad::CompressionType;
use heed::types::ByteSlice;
use heed::{BytesEncode, Error, RoTxn};
use heed::types::{ByteSlice, DecodeIgnore};
use heed::{BytesDecode, BytesEncode, Error, RoTxn, RwTxn};
use log::debug;
use roaring::RoaringBitmap;
use time::OffsetDateTime;
@ -14,21 +15,27 @@ use crate::facet::FacetType;
use crate::heed_codec::facet::new::{
FacetGroupValue, FacetGroupValueCodec, FacetKey, FacetKeyCodec, MyByteSlice,
};
use crate::update::index_documents::{create_writer, write_into_lmdb_database, writer_into_reader};
use crate::{FieldId, Index, Result};
use crate::update::index_documents::{
create_writer, valid_lmdb_key, write_into_lmdb_database, writer_into_reader,
};
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
pub struct FacetsUpdateBulk<'i> {
index: &'i Index,
database: heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
level_group_size: usize,
min_level_size: usize,
facet_type: FacetType,
// None if level 0 does not need to be updated
new_data: Option<grenad::Reader<File>>,
}
impl<'i> FacetsUpdateBulk<'i> {
pub fn new(index: &'i Index, facet_type: FacetType) -> FacetsUpdateBulk<'i> {
pub fn new(
index: &'i Index,
facet_type: FacetType,
new_data: grenad::Reader<File>,
) -> FacetsUpdateBulk<'i> {
FacetsUpdateBulk {
index,
database: match facet_type {
@ -39,11 +46,31 @@ impl<'i> FacetsUpdateBulk<'i> {
index.facet_id_f64_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>()
}
},
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
level_group_size: 4,
min_level_size: 5,
facet_type,
new_data: Some(new_data),
}
}
pub fn new_not_updating_level_0(
index: &'i Index,
facet_type: FacetType,
) -> FacetsUpdateBulk<'i> {
FacetsUpdateBulk {
index,
database: match facet_type {
FacetType::String => {
index.facet_id_string_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>()
}
FacetType::Number => {
index.facet_id_f64_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>()
}
},
level_group_size: 4,
min_level_size: 5,
facet_type,
new_data: None,
}
}
@ -70,39 +97,84 @@ impl<'i> FacetsUpdateBulk<'i> {
}
#[logging_timer::time("FacetsUpdateBulk::{}")]
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
pub fn execute(mut self, wtxn: &mut heed::RwTxn) -> Result<()> {
self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
debug!("Computing and writing the facet values levels docids into LMDB on disk...");
// We get the faceted fields to be able to create the facet levels.
let faceted_fields = self.index.faceted_fields_ids(wtxn)?.clone();
debug!("Computing and writing the facet values levels docids into LMDB on disk...");
for &field_id in faceted_fields.iter() {
self.clear_levels(wtxn, field_id)?;
}
self.update_level0(wtxn)?;
let mut nested_wtxn = self.index.env.nested_write_txn(wtxn)?;
// let mut nested_wtxn = self.index.env.nested_write_txn(wtxn)?;
for &field_id in faceted_fields.iter() {
let (level_readers, all_docids) =
self.compute_levels_for_field_id(field_id, &nested_wtxn)?;
let (level_readers, all_docids) = self.compute_levels_for_field_id(field_id, &wtxn)?;
let put_docids_fn = match self.facet_type {
FacetType::Number => Index::put_number_faceted_documents_ids,
FacetType::String => Index::put_string_faceted_documents_ids,
};
put_docids_fn(&self.index, &mut nested_wtxn, field_id, &all_docids)?;
self.index.put_faceted_documents_ids(wtxn, field_id, self.facet_type, &all_docids)?;
for level_reader in level_readers {
// TODO: append instead of write with merge
write_into_lmdb_database(
&mut nested_wtxn,
*self.database.as_polymorph(),
level_reader,
|_, _| {
Err(InternalError::IndexingMergingKeys { process: "facet string levels" })?
},
)?;
let mut cursor = level_reader.into_cursor()?;
while let Some((k, v)) = cursor.move_on_next()? {
let key = FacetKeyCodec::<DecodeIgnore>::bytes_decode(k).unwrap();
let value = FacetGroupValueCodec::bytes_decode(v).unwrap();
println!("inserting {key:?} {value:?}");
self.database.remap_types::<ByteSlice, ByteSlice>().put(wtxn, k, v)?;
}
}
}
Ok(())
}
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
let new_data = match self.new_data.take() {
Some(x) => x,
None => return Ok(()),
};
if self.database.is_empty(wtxn)? {
let mut buffer = Vec::new();
let mut database = self.database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
let mut cursor = new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
buffer.clear();
// the group size for level 0
buffer.push(1);
// then we extend the buffer with the docids bitmap
buffer.extend_from_slice(value);
unsafe { database.append(key, &buffer)? };
}
}
} else {
let mut buffer = Vec::new();
let database = self.database.remap_types::<ByteSlice, ByteSlice>();
let mut cursor = new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
buffer.clear();
// the group size for level 0
buffer.push(1);
// then we extend the buffer with the docids bitmap
match database.get(wtxn, key)? {
Some(prev_value) => {
let old_bitmap = &prev_value[1..];
CboRoaringBitmapCodec::merge_into(
&[Cow::Borrowed(value), Cow::Borrowed(old_bitmap)],
&mut buffer,
)?;
}
None => {
buffer.extend_from_slice(value);
}
};
database.put(wtxn, key, &buffer)?;
}
}
}
@ -114,16 +186,14 @@ impl<'i> FacetsUpdateBulk<'i> {
field_id: FieldId,
txn: &RoTxn,
) -> Result<(Vec<grenad::Reader<File>>, RoaringBitmap)> {
let algo = FacetsUpdateBulkAlgorithm {
// TODO: first check whether there is anything in level 0
let algo = ComputeHigherLevels {
rtxn: txn,
db: &self.database,
field_id,
level_group_size: self.level_group_size,
min_level_size: self.min_level_size,
chunk_compression_type: self.chunk_compression_type,
chunk_compression_level: self.chunk_compression_level,
};
// TODO: first check whether there is anything in level 0
let mut all_docids = RoaringBitmap::new();
let subwriters = algo.compute_higher_levels(32, &mut |bitmaps, _| {
@ -138,16 +208,14 @@ impl<'i> FacetsUpdateBulk<'i> {
}
}
pub struct FacetsUpdateBulkAlgorithm<'t> {
struct ComputeHigherLevels<'t> {
rtxn: &'t heed::RoTxn<'t>,
db: &'t heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>,
field_id: u16,
level_group_size: usize,
min_level_size: usize,
}
impl<'t> FacetsUpdateBulkAlgorithm<'t> {
impl<'t> ComputeHigherLevels<'t> {
fn read_level_0(
&self,
handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
@ -215,11 +283,7 @@ impl<'t> FacetsUpdateBulkAlgorithm<'t> {
// once we have computed `level_group_size` elements, we give the left bound
// of those elements, and their bitmaps, to the level above
let mut cur_writer = create_writer(
self.chunk_compression_type,
self.chunk_compression_level,
tempfile::tempfile()?,
);
let mut cur_writer = create_writer(CompressionType::None, None, tempfile::tempfile()?);
let mut cur_writer_len = 0;
let mut group_sizes = vec![];
@ -259,7 +323,7 @@ impl<'t> FacetsUpdateBulkAlgorithm<'t> {
Ok(())
})?;
// don't forget to insert the leftover elements into the writer as well
if !bitmaps.is_empty() && cur_writer_len >= self.level_group_size * self.min_level_size {
if !bitmaps.is_empty() && cur_writer_len >= self.min_level_size {
let left_bound = left_bounds.first().unwrap();
handle_group(&bitmaps, left_bound)?;
for ((bitmap, left_bound), group_size) in
@ -274,7 +338,7 @@ impl<'t> FacetsUpdateBulkAlgorithm<'t> {
cur_writer_len += 1;
}
}
if cur_writer_len > self.level_group_size * self.min_level_size {
if cur_writer_len > self.min_level_size {
sub_writers.push(writer_into_reader(cur_writer)?);
}
return Ok(sub_writers);
@ -315,9 +379,9 @@ mod tests {
documents.push(serde_json::json!({ "facet2": i }).as_object().unwrap().clone());
}
let documents = documents_batch_reader_from_objects(documents);
dbg!();
index.add_documents(documents).unwrap();
dbg!();
db_snap!(index, facet_id_f64_docids, name);
};

View File

@ -1,2 +1,90 @@
use std::{collections::HashMap, fs::File};
use grenad::CompressionType;
use heed::BytesDecode;
use roaring::RoaringBitmap;
use crate::{
facet::FacetType,
heed_codec::facet::new::{FacetGroupValueCodec, FacetKeyCodec, MyByteSlice},
CboRoaringBitmapCodec, FieldId, Index, Result,
};
use super::{FacetsUpdateBulk, FacetsUpdateIncremental};
pub mod bulk;
pub mod incremental;
pub struct FacetsUpdate<'i> {
index: &'i Index,
database: heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
level_group_size: u8,
max_level_group_size: u8,
min_level_size: u8,
facet_type: FacetType,
new_data: grenad::Reader<File>,
}
impl<'i> FacetsUpdate<'i> {
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
let database = match facet_type {
FacetType::String => {
index.facet_id_string_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>()
}
FacetType::Number => {
index.facet_id_f64_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>()
}
};
Self {
index,
database,
level_group_size: 4,
max_level_group_size: 8,
min_level_size: 5,
facet_type,
new_data,
}
}
// /// The number of elements from the level below that are represented by a single element in the level above
// ///
// /// This setting is always greater than or equal to 2.
// pub fn level_group_size(&mut self, value: u8) -> &mut Self {
// self.level_group_size = std::cmp::max(value, 2);
// self
// }
// /// The minimum number of elements that a level is allowed to have.
// pub fn min_level_size(&mut self, value: u8) -> &mut Self {
// self.min_level_size = std::cmp::max(value, 1);
// self
// }
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
if self.database.is_empty(wtxn)? {
let bulk_update = FacetsUpdateBulk::new(self.index, self.facet_type, self.new_data);
bulk_update.execute(wtxn)?;
} else {
let indexer = FacetsUpdateIncremental::new(self.database);
let mut new_faceted_docids = HashMap::<FieldId, RoaringBitmap>::default();
let mut cursor = self.new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let key =
FacetKeyCodec::<MyByteSlice>::bytes_decode(key).ok_or(heed::Error::Encoding)?;
let docids =
CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?;
indexer.insert(wtxn, key.field_id, key.left_bound, &docids)?;
*new_faceted_docids.entry(key.field_id).or_default() |= docids;
}
for (field_id, new_docids) in new_faceted_docids {
let mut docids =
self.index.faceted_documents_ids(wtxn, field_id, self.facet_type)?;
docids |= new_docids;
self.index.put_faceted_documents_ids(wtxn, field_id, self.facet_type, &docids)?;
}
}
Ok(())
}
}

View File

@ -1,4 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
947949d1a5c9c4e895c89fba46cbba68
07718df52f8463335fb8fefcd3ae01f4

View File

@ -1,4 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
947949d1a5c9c4e895c89fba46cbba68
07718df52f8463335fb8fefcd3ae01f4

View File

@ -1,4 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
947949d1a5c9c4e895c89fba46cbba68
07718df52f8463335fb8fefcd3ae01f4

View File

@ -1,4 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
947949d1a5c9c4e895c89fba46cbba68
07718df52f8463335fb8fefcd3ae01f4

View File

@ -1,4 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
947949d1a5c9c4e895c89fba46cbba68
07718df52f8463335fb8fefcd3ae01f4

View File

@ -1,4 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
947949d1a5c9c4e895c89fba46cbba68
07718df52f8463335fb8fefcd3ae01f4

View File

@ -1,4 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
5ce8009d3eb023e4b9c0a6e7fa4e6262
3e6a91b3c54c614a4787224ac4278ed3

View File

@ -1,4 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
5ce8009d3eb023e4b9c0a6e7fa4e6262
3e6a91b3c54c614a4787224ac4278ed3

View File

@ -1,5 +1,4 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io;
@ -14,12 +13,12 @@ use super::helpers::{
valid_lmdb_key, CursorClonableMmap,
};
use super::{ClonableMmap, MergeFn};
use crate::heed_codec::facet::new::{FacetKeyCodec, MyByteSlice};
use crate::facet::FacetType;
use crate::update::facet::FacetsUpdate;
use crate::update::index_documents::helpers::as_cloneable_grenad;
use crate::update::FacetsUpdateIncremental;
use crate::{
lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint,
Index, Result,
lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index,
Result,
};
pub(crate) enum TypedChunk {
@ -138,78 +137,14 @@ pub(crate) fn write_typed_chunk_into_index(
)?;
is_merged_database = true;
}
TypedChunk::FieldIdFacetNumberDocids(facet_id_f64_docids_iter) => {
// merge cbo roaring bitmaps is not the correct merger because the data in the DB
// is FacetGroupValue and not RoaringBitmap
// so I need to create my own merging function
// facet_id_string_docids is encoded as:
// key: FacetKeyCodec<StrRefCodec>
// value: CboRoaringBitmapCodec
// basically
// TODO: a condition saying "if I have more than 1/50th of the DB to add,
// then I do it in bulk, otherwise I do it incrementally". But instead of 1/50,
// it is a ratio I determine empirically
// for now I only do it incrementally, to see if things work
let indexer = FacetsUpdateIncremental::new(
index.facet_id_f64_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
);
let mut new_faceted_docids = HashMap::<FieldId, RoaringBitmap>::default();
let mut cursor = facet_id_f64_docids_iter.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let key =
FacetKeyCodec::<MyByteSlice>::bytes_decode(key).ok_or(heed::Error::Encoding)?;
let docids =
CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?;
indexer.insert(wtxn, key.field_id, key.left_bound, &docids)?;
*new_faceted_docids.entry(key.field_id).or_default() |= docids;
}
for (field_id, new_docids) in new_faceted_docids {
let mut docids = index.number_faceted_documents_ids(wtxn, field_id)?;
docids |= new_docids;
index.put_number_faceted_documents_ids(wtxn, field_id, &docids)?;
}
TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids_iter) => {
let indexer = FacetsUpdate::new(index, FacetType::Number, facet_id_number_docids_iter);
indexer.execute(wtxn)?;
is_merged_database = true;
}
TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids) => {
// merge cbo roaring bitmaps is not the correct merger because the data in the DB
// is FacetGroupValue and not RoaringBitmap
// so I need to create my own merging function
// facet_id_string_docids is encoded as:
// key: FacetKeyCodec<StrRefCodec>
// value: CboRoaringBitmapCodec
// basically
// TODO: a condition saying "if I have more than 1/50th of the DB to add,
// then I do it in bulk, otherwise I do it incrementally". But instead of 1/50,
// it is a ratio I determine empirically
// for now I only do it incrementally, to see if things work
let indexer = FacetsUpdateIncremental::new(
index.facet_id_string_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
);
let mut new_faceted_docids = HashMap::<FieldId, RoaringBitmap>::default();
let mut cursor = facet_id_string_docids.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let key =
FacetKeyCodec::<MyByteSlice>::bytes_decode(key).ok_or(heed::Error::Encoding)?;
let docids =
CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?;
indexer.insert(wtxn, key.field_id, key.left_bound, &docids)?;
*new_faceted_docids.entry(key.field_id).or_default() |= docids;
}
for (field_id, new_docids) in new_faceted_docids {
let mut docids = index.string_faceted_documents_ids(wtxn, field_id)?;
docids |= new_docids;
index.put_string_faceted_documents_ids(wtxn, field_id, &docids)?;
}
TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids_iter) => {
let indexer = FacetsUpdate::new(index, FacetType::String, facet_id_string_docids_iter);
indexer.execute(wtxn)?;
is_merged_database = true;
}
TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {