mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 20:15:07 +08:00
Facets Bulk update
This commit is contained in:
parent
560e8f5613
commit
f67ff3a738
@ -2,9 +2,10 @@ use std::borrow::Cow;
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::BufReader;
|
use std::io::BufReader;
|
||||||
|
|
||||||
use grenad::CompressionType;
|
use grenad::{CompressionType, Reader};
|
||||||
use heed::types::ByteSlice;
|
use heed::types::ByteSlice;
|
||||||
use heed::{BytesEncode, Error, RoTxn, RwTxn};
|
use heed::{BytesEncode, Error, RoTxn, RwTxn};
|
||||||
|
use obkv::KvReader;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use super::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
|
use super::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
|
||||||
@ -13,6 +14,7 @@ use crate::heed_codec::facet::{
|
|||||||
FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
|
FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
|
||||||
};
|
};
|
||||||
use crate::heed_codec::ByteSliceRefCodec;
|
use crate::heed_codec::ByteSliceRefCodec;
|
||||||
|
use crate::update::del_add::DelAdd;
|
||||||
use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader};
|
use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader};
|
||||||
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
|
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
|
||||||
|
|
||||||
@ -31,7 +33,7 @@ pub struct FacetsUpdateBulk<'i> {
|
|||||||
facet_type: FacetType,
|
facet_type: FacetType,
|
||||||
field_ids: Vec<FieldId>,
|
field_ids: Vec<FieldId>,
|
||||||
// None if level 0 does not need to be updated
|
// None if level 0 does not need to be updated
|
||||||
new_data: Option<grenad::Reader<BufReader<File>>>,
|
delta_data: Option<grenad::Reader<BufReader<File>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'i> FacetsUpdateBulk<'i> {
|
impl<'i> FacetsUpdateBulk<'i> {
|
||||||
@ -39,7 +41,7 @@ impl<'i> FacetsUpdateBulk<'i> {
|
|||||||
index: &'i Index,
|
index: &'i Index,
|
||||||
field_ids: Vec<FieldId>,
|
field_ids: Vec<FieldId>,
|
||||||
facet_type: FacetType,
|
facet_type: FacetType,
|
||||||
new_data: grenad::Reader<BufReader<File>>,
|
delta_data: grenad::Reader<BufReader<File>>,
|
||||||
group_size: u8,
|
group_size: u8,
|
||||||
min_level_size: u8,
|
min_level_size: u8,
|
||||||
) -> FacetsUpdateBulk<'i> {
|
) -> FacetsUpdateBulk<'i> {
|
||||||
@ -49,7 +51,7 @@ impl<'i> FacetsUpdateBulk<'i> {
|
|||||||
group_size,
|
group_size,
|
||||||
min_level_size,
|
min_level_size,
|
||||||
facet_type,
|
facet_type,
|
||||||
new_data: Some(new_data),
|
delta_data: Some(delta_data),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,13 +66,13 @@ impl<'i> FacetsUpdateBulk<'i> {
|
|||||||
group_size: FACET_GROUP_SIZE,
|
group_size: FACET_GROUP_SIZE,
|
||||||
min_level_size: FACET_MIN_LEVEL_SIZE,
|
min_level_size: FACET_MIN_LEVEL_SIZE,
|
||||||
facet_type,
|
facet_type,
|
||||||
new_data: None,
|
delta_data: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[logging_timer::time("FacetsUpdateBulk::{}")]
|
#[logging_timer::time("FacetsUpdateBulk::{}")]
|
||||||
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
|
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
|
||||||
let Self { index, field_ids, group_size, min_level_size, facet_type, new_data } = self;
|
let Self { index, field_ids, group_size, min_level_size, facet_type, delta_data } = self;
|
||||||
|
|
||||||
let db = match facet_type {
|
let db = match facet_type {
|
||||||
FacetType::String => index
|
FacetType::String => index
|
||||||
@ -81,7 +83,7 @@ impl<'i> FacetsUpdateBulk<'i> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let inner = FacetsUpdateBulkInner { db, new_data, group_size, min_level_size };
|
let inner = FacetsUpdateBulkInner { db, delta_data, group_size, min_level_size };
|
||||||
|
|
||||||
inner.update(wtxn, &field_ids, |wtxn, field_id, all_docids| {
|
inner.update(wtxn, &field_ids, |wtxn, field_id, all_docids| {
|
||||||
index.put_faceted_documents_ids(wtxn, field_id, facet_type, &all_docids)?;
|
index.put_faceted_documents_ids(wtxn, field_id, facet_type, &all_docids)?;
|
||||||
@ -95,7 +97,7 @@ impl<'i> FacetsUpdateBulk<'i> {
|
|||||||
/// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type
|
/// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type
|
||||||
pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> {
|
pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> {
|
||||||
pub db: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
|
pub db: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
|
||||||
pub new_data: Option<grenad::Reader<R>>,
|
pub delta_data: Option<grenad::Reader<R>>,
|
||||||
pub group_size: u8,
|
pub group_size: u8,
|
||||||
pub min_level_size: u8,
|
pub min_level_size: u8,
|
||||||
}
|
}
|
||||||
@ -134,20 +136,26 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO the new_data is an Reader<Obkv<Key, Obkv<DelAdd, RoaringBitmap>>>
|
|
||||||
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
|
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
|
||||||
let new_data = match self.new_data.take() {
|
let delta_data = match self.delta_data.take() {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
None => return Ok(()),
|
None => return Ok(()),
|
||||||
};
|
};
|
||||||
if self.db.is_empty(wtxn)? {
|
if self.db.is_empty(wtxn)? {
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
let mut database = self.db.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
|
let mut database = self.db.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
|
||||||
let mut cursor = new_data.into_cursor()?;
|
let mut cursor = delta_data.into_cursor()?;
|
||||||
while let Some((key, value)) = cursor.move_on_next()? {
|
while let Some((key, value)) = cursor.move_on_next()? {
|
||||||
if !valid_lmdb_key(key) {
|
if !valid_lmdb_key(key) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
let value: KvReader<DelAdd> = KvReader::new(value);
|
||||||
|
|
||||||
|
// DB is empty, it is safe to ignore Del operations
|
||||||
|
let Some(value) = value.get(DelAdd::Addition) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
// the group size for level 0
|
// the group size for level 0
|
||||||
buffer.push(1);
|
buffer.push(1);
|
||||||
@ -159,11 +167,14 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
|
|||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
let database = self.db.remap_types::<ByteSlice, ByteSlice>();
|
let database = self.db.remap_types::<ByteSlice, ByteSlice>();
|
||||||
|
|
||||||
let mut cursor = new_data.into_cursor()?;
|
let mut cursor = delta_data.into_cursor()?;
|
||||||
while let Some((key, value)) = cursor.move_on_next()? {
|
while let Some((key, value)) = cursor.move_on_next()? {
|
||||||
if !valid_lmdb_key(key) {
|
if !valid_lmdb_key(key) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let value: KvReader<DelAdd> = KvReader::new(value);
|
||||||
|
|
||||||
// the value is a CboRoaringBitmap, but I still need to prepend the
|
// the value is a CboRoaringBitmap, but I still need to prepend the
|
||||||
// group size for level 0 (= 1) to it
|
// group size for level 0 (= 1) to it
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
@ -172,12 +183,15 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
|
|||||||
match database.get(wtxn, key)? {
|
match database.get(wtxn, key)? {
|
||||||
Some(prev_value) => {
|
Some(prev_value) => {
|
||||||
let old_bitmap = &prev_value[1..];
|
let old_bitmap = &prev_value[1..];
|
||||||
CboRoaringBitmapCodec::merge_into(
|
CboRoaringBitmapCodec::merge_deladd_into(value, old_bitmap, &mut buffer)?;
|
||||||
&[Cow::Borrowed(value), Cow::Borrowed(old_bitmap)],
|
|
||||||
&mut buffer,
|
|
||||||
)?;
|
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
// it is safe to ignore the del in that case.
|
||||||
|
let Some(value) = value.get(DelAdd::Addition) else {
|
||||||
|
// won't put the key in DB as the value would be empty
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
buffer.extend_from_slice(value);
|
buffer.extend_from_slice(value);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -109,7 +109,7 @@ pub struct FacetsUpdate<'i> {
|
|||||||
index: &'i Index,
|
index: &'i Index,
|
||||||
database: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
|
database: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
|
||||||
facet_type: FacetType,
|
facet_type: FacetType,
|
||||||
new_data: grenad::Reader<BufReader<File>>,
|
delta_data: grenad::Reader<BufReader<File>>,
|
||||||
group_size: u8,
|
group_size: u8,
|
||||||
max_group_size: u8,
|
max_group_size: u8,
|
||||||
min_level_size: u8,
|
min_level_size: u8,
|
||||||
@ -119,7 +119,7 @@ impl<'i> FacetsUpdate<'i> {
|
|||||||
pub fn new(
|
pub fn new(
|
||||||
index: &'i Index,
|
index: &'i Index,
|
||||||
facet_type: FacetType,
|
facet_type: FacetType,
|
||||||
new_data: grenad::Reader<BufReader<File>>,
|
delta_data: grenad::Reader<BufReader<File>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let database = match facet_type {
|
let database = match facet_type {
|
||||||
FacetType::String => index
|
FacetType::String => index
|
||||||
@ -136,26 +136,26 @@ impl<'i> FacetsUpdate<'i> {
|
|||||||
max_group_size: FACET_MAX_GROUP_SIZE,
|
max_group_size: FACET_MAX_GROUP_SIZE,
|
||||||
min_level_size: FACET_MIN_LEVEL_SIZE,
|
min_level_size: FACET_MIN_LEVEL_SIZE,
|
||||||
facet_type,
|
facet_type,
|
||||||
new_data,
|
delta_data,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
|
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
|
||||||
if self.new_data.is_empty() {
|
if self.delta_data.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
debug!("Computing and writing the facet values levels docids into LMDB on disk...");
|
debug!("Computing and writing the facet values levels docids into LMDB on disk...");
|
||||||
self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
|
self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
|
||||||
|
|
||||||
// See self::comparison_bench::benchmark_facet_indexing
|
// See self::comparison_bench::benchmark_facet_indexing
|
||||||
if self.new_data.len() >= (self.database.len(wtxn)? as u64 / 50) {
|
if self.delta_data.len() >= (self.database.len(wtxn)? as u64 / 50) {
|
||||||
let field_ids =
|
let field_ids =
|
||||||
self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::<Vec<_>>();
|
self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::<Vec<_>>();
|
||||||
let bulk_update = FacetsUpdateBulk::new(
|
let bulk_update = FacetsUpdateBulk::new(
|
||||||
self.index,
|
self.index,
|
||||||
field_ids,
|
field_ids,
|
||||||
self.facet_type,
|
self.facet_type,
|
||||||
self.new_data,
|
self.delta_data,
|
||||||
self.group_size,
|
self.group_size,
|
||||||
self.min_level_size,
|
self.min_level_size,
|
||||||
);
|
);
|
||||||
@ -164,7 +164,7 @@ impl<'i> FacetsUpdate<'i> {
|
|||||||
let incremental_update = FacetsUpdateIncremental::new(
|
let incremental_update = FacetsUpdateIncremental::new(
|
||||||
self.index,
|
self.index,
|
||||||
self.facet_type,
|
self.facet_type,
|
||||||
self.new_data,
|
self.delta_data,
|
||||||
self.group_size,
|
self.group_size,
|
||||||
self.min_level_size,
|
self.min_level_size,
|
||||||
self.max_group_size,
|
self.max_group_size,
|
||||||
@ -464,7 +464,7 @@ pub(crate) mod test_helpers {
|
|||||||
|
|
||||||
let update = FacetsUpdateBulkInner {
|
let update = FacetsUpdateBulkInner {
|
||||||
db: self.content,
|
db: self.content,
|
||||||
new_data: Some(reader),
|
delta_data: Some(reader),
|
||||||
group_size: self.group_size.get(),
|
group_size: self.group_size.get(),
|
||||||
min_level_size: self.min_level_size.get(),
|
min_level_size: self.min_level_size.get(),
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user