mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-25 19:45:05 +08:00
Reimplement facet search and facetr level and put them in dedidcated functions
This commit is contained in:
parent
db55638714
commit
8b260de5a0
@ -94,8 +94,6 @@ pub enum Database {
|
|||||||
FacetIdExistsDocids,
|
FacetIdExistsDocids,
|
||||||
FacetIdF64NumberDocids,
|
FacetIdF64NumberDocids,
|
||||||
FacetIdStringDocids,
|
FacetIdStringDocids,
|
||||||
FacetIdNormalizedStringStrings,
|
|
||||||
FacetIdStringFst,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
@ -115,10 +113,6 @@ impl Database {
|
|||||||
Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
|
Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
|
||||||
Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
|
Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
|
||||||
Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
|
Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
|
||||||
Database::FacetIdNormalizedStringStrings => {
|
|
||||||
index.facet_id_normalized_string_strings.remap_types()
|
|
||||||
}
|
|
||||||
Database::FacetIdStringFst => index.facet_id_string_fst.remap_types(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -194,10 +188,6 @@ impl ExtractorSender {
|
|||||||
DocumentsSender(self)
|
DocumentsSender(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn facet_searchable(&self) -> FacetSearchableSender<'_> {
|
|
||||||
FacetSearchableSender { sender: self }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> {
|
pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> {
|
||||||
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap(
|
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap(
|
||||||
DOCUMENTS_IDS_KEY.as_bytes(),
|
DOCUMENTS_IDS_KEY.as_bytes(),
|
||||||
@ -322,34 +312,6 @@ impl DocidsSender for FacetDocidsSender<'_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FacetSearchableSender<'a> {
|
|
||||||
sender: &'a ExtractorSender,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FacetSearchableSender<'_> {
|
|
||||||
pub fn write_facet(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
|
||||||
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
|
|
||||||
match self
|
|
||||||
.sender
|
|
||||||
.send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry })
|
|
||||||
{
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(SendError(_)) => Err(SendError(())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn delete_facet(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
|
|
||||||
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
|
|
||||||
match self
|
|
||||||
.sender
|
|
||||||
.send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry })
|
|
||||||
{
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(SendError(_)) => Err(SendError(())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DocumentsSender<'a>(&'a ExtractorSender);
|
pub struct DocumentsSender<'a>(&'a ExtractorSender);
|
||||||
|
|
||||||
impl DocumentsSender<'_> {
|
impl DocumentsSender<'_> {
|
||||||
|
@ -6,7 +6,6 @@ use grenad::Sorter;
|
|||||||
use heed::types::{Bytes, SerdeJson};
|
use heed::types::{Bytes, SerdeJson};
|
||||||
use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn};
|
use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn};
|
||||||
|
|
||||||
use super::channel::FacetSearchableSender;
|
|
||||||
use super::extract::FacetKind;
|
use super::extract::FacetKind;
|
||||||
use super::fst_merger_builder::FstMergerBuilder;
|
use super::fst_merger_builder::FstMergerBuilder;
|
||||||
use super::KvReaderDelAdd;
|
use super::KvReaderDelAdd;
|
||||||
@ -42,7 +41,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
Some(0),
|
Some(0),
|
||||||
false,
|
true,
|
||||||
);
|
);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
@ -115,13 +114,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_fst")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_fst")]
|
||||||
pub fn merge_and_send(
|
pub fn merge_and_write(self, index: &Index, wtxn: &mut RwTxn, rtxn: &RoTxn) -> Result<()> {
|
||||||
self,
|
|
||||||
index: &Index,
|
|
||||||
wtxn: &mut RwTxn,
|
|
||||||
rtxn: &RoTxn,
|
|
||||||
sender: FacetSearchableSender,
|
|
||||||
) -> Result<()> {
|
|
||||||
let reader = self.normalized_facet_string_docids_sorter.into_reader_cursors()?;
|
let reader = self.normalized_facet_string_docids_sorter.into_reader_cursors()?;
|
||||||
let mut builder = grenad::MergerBuilder::new(MergeDeladdBtreesetString);
|
let mut builder = grenad::MergerBuilder::new(MergeDeladdBtreesetString);
|
||||||
builder.extend(reader);
|
builder.extend(reader);
|
||||||
@ -138,24 +131,24 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
|
|
||||||
if current_field_id != Some(field_id) {
|
if current_field_id != Some(field_id) {
|
||||||
if let Some(fst_merger_builder) = fst_merger_builder {
|
if let Some(fst_merger_builder) = fst_merger_builder {
|
||||||
// send the previous fst to the channel
|
|
||||||
let mmap = fst_merger_builder.build(&mut callback)?;
|
let mmap = fst_merger_builder.build(&mut callback)?;
|
||||||
// sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
|
index
|
||||||
todo!("What to do");
|
.facet_id_string_fst
|
||||||
|
.remap_data_type::<Bytes>()
|
||||||
|
.put(wtxn, &field_id, &mmap)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("getting fst for field_id: {}", field_id);
|
|
||||||
fst = index.facet_id_string_fst.get(rtxn, &field_id)?;
|
fst = index.facet_id_string_fst.get(rtxn, &field_id)?;
|
||||||
fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?);
|
fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?);
|
||||||
current_field_id = Some(field_id);
|
current_field_id = Some(field_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
let current = database.get(rtxn, key)?;
|
let previous = database.get(rtxn, key)?;
|
||||||
let deladd: &KvReaderDelAdd = deladd.into();
|
let deladd: &KvReaderDelAdd = deladd.into();
|
||||||
let del = deladd.get(DelAdd::Deletion);
|
let del = deladd.get(DelAdd::Deletion);
|
||||||
let add = deladd.get(DelAdd::Addition);
|
let add = deladd.get(DelAdd::Addition);
|
||||||
|
|
||||||
match merge_btreesets(current, del, add)? {
|
match merge_btreesets(previous, del, add)? {
|
||||||
Operation::Write(value) => {
|
Operation::Write(value) => {
|
||||||
match fst_merger_builder.as_mut() {
|
match fst_merger_builder.as_mut() {
|
||||||
Some(fst_merger_builder) => {
|
Some(fst_merger_builder) => {
|
||||||
@ -170,7 +163,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
let key = (field_id, normalized_facet_string);
|
let key = (field_id, normalized_facet_string);
|
||||||
let key_bytes =
|
let key_bytes =
|
||||||
BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
|
BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
|
||||||
sender.write_facet(&key_bytes, &value).unwrap();
|
database.put(wtxn, &key_bytes, &value)?;
|
||||||
}
|
}
|
||||||
Operation::Delete => {
|
Operation::Delete => {
|
||||||
match fst_merger_builder.as_mut() {
|
match fst_merger_builder.as_mut() {
|
||||||
@ -186,7 +179,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
let key = (field_id, normalized_facet_string);
|
let key = (field_id, normalized_facet_string);
|
||||||
let key_bytes =
|
let key_bytes =
|
||||||
BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
|
BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
|
||||||
sender.delete_facet(&key_bytes).unwrap();
|
database.delete(wtxn, &key_bytes)?;
|
||||||
}
|
}
|
||||||
Operation::Ignore => (),
|
Operation::Ignore => (),
|
||||||
}
|
}
|
||||||
@ -194,8 +187,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
|
|
||||||
if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
|
if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
|
||||||
let mmap = fst_merger_builder.build(&mut callback)?;
|
let mmap = fst_merger_builder.build(&mut callback)?;
|
||||||
// sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
|
index.facet_id_string_fst.remap_data_type::<Bytes>().put(wtxn, &field_id, &mmap)?;
|
||||||
todo!("What to do");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -18,6 +18,7 @@ pub use update_by_function::UpdateByFunction;
|
|||||||
|
|
||||||
use super::channel::*;
|
use super::channel::*;
|
||||||
use super::extract::*;
|
use super::extract::*;
|
||||||
|
use super::facet_search_builder::FacetSearchBuilder;
|
||||||
use super::merger::{FacetDatabases, FacetFieldIdsDelta};
|
use super::merger::{FacetDatabases, FacetFieldIdsDelta};
|
||||||
use super::word_fst_builder::PrefixDelta;
|
use super::word_fst_builder::PrefixDelta;
|
||||||
use super::words_prefix_docids::{
|
use super::words_prefix_docids::{
|
||||||
@ -114,7 +115,6 @@ where
|
|||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
facet_field_ids_delta = merge_and_send_facet_docids(
|
facet_field_ids_delta = merge_and_send_facet_docids(
|
||||||
global_fields_ids_map,
|
|
||||||
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?,
|
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?,
|
||||||
FacetDatabases::new(index),
|
FacetDatabases::new(index),
|
||||||
index,
|
index,
|
||||||
@ -240,9 +240,6 @@ where
|
|||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
|
|
||||||
let indexer_span = tracing::Span::current();
|
|
||||||
|
|
||||||
for operation in writer_receiver {
|
for operation in writer_receiver {
|
||||||
let database = operation.database(index);
|
let database = operation.database(index);
|
||||||
match operation.entry() {
|
match operation.entry() {
|
||||||
@ -258,7 +255,57 @@ where
|
|||||||
/// TODO handle the panicking threads
|
/// TODO handle the panicking threads
|
||||||
let facet_field_ids_delta = extractor_handle.join().unwrap()?;
|
let facet_field_ids_delta = extractor_handle.join().unwrap()?;
|
||||||
|
|
||||||
let prefix_delta = {
|
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
|
||||||
|
compute_prefix_database(index, wtxn, prefix_delta)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
|
||||||
|
|
||||||
|
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
|
||||||
|
|
||||||
|
Result::Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// required to into_inner the new_fields_ids_map
|
||||||
|
drop(fields_ids_map_store);
|
||||||
|
|
||||||
|
let fields_ids_map = new_fields_ids_map.into_inner().unwrap();
|
||||||
|
index.put_fields_ids_map(wtxn, &fields_ids_map)?;
|
||||||
|
|
||||||
|
if let Some(new_primary_key) = new_primary_key {
|
||||||
|
index.put_primary_key(wtxn, new_primary_key.name())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// used to update the localized and weighted maps while sharing the update code with the settings pipeline.
|
||||||
|
let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?;
|
||||||
|
inner_index_settings.recompute_facets(wtxn, index)?;
|
||||||
|
inner_index_settings.recompute_searchables(wtxn, index)?;
|
||||||
|
|
||||||
|
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
||||||
|
fn compute_prefix_database(
|
||||||
|
index: &Index,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
prefix_delta: PrefixDelta,
|
||||||
|
) -> Result<()> {
|
||||||
|
eprintln!("prefix_delta: {:?}", &prefix_delta);
|
||||||
|
let PrefixDelta { modified, deleted } = prefix_delta;
|
||||||
|
// Compute word prefix docids
|
||||||
|
compute_word_prefix_docids(wtxn, index, &modified, &deleted)?;
|
||||||
|
// Compute exact word prefix docids
|
||||||
|
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted)?;
|
||||||
|
// Compute word prefix fid docids
|
||||||
|
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?;
|
||||||
|
// Compute word prefix position docids
|
||||||
|
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing")]
|
||||||
|
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> {
|
||||||
let rtxn = index.read_txn()?;
|
let rtxn = index.read_txn()?;
|
||||||
let words_fst = index.words_fst(&rtxn)?;
|
let words_fst = index.words_fst(&rtxn)?;
|
||||||
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
|
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
|
||||||
@ -303,59 +350,62 @@ where
|
|||||||
WORDS_PREFIXES_FST_KEY,
|
WORDS_PREFIXES_FST_KEY,
|
||||||
&prefixes_fst_mmap,
|
&prefixes_fst_mmap,
|
||||||
)?;
|
)?;
|
||||||
Some(prefix_delta)
|
Ok(Some(prefix_delta))
|
||||||
} else {
|
} else {
|
||||||
None
|
Ok(None)
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
// if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta {
|
|
||||||
// compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
|
|
||||||
// }
|
|
||||||
|
|
||||||
if let Some(prefix_delta) = prefix_delta {
|
|
||||||
compute_prefix_database(index, wtxn, prefix_delta)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Result::Ok(())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// required to into_inner the new_fields_ids_map
|
|
||||||
drop(fields_ids_map_store);
|
|
||||||
|
|
||||||
let fields_ids_map = new_fields_ids_map.into_inner().unwrap();
|
|
||||||
index.put_fields_ids_map(wtxn, &fields_ids_map)?;
|
|
||||||
|
|
||||||
if let Some(new_primary_key) = new_primary_key {
|
|
||||||
index.put_primary_key(wtxn, new_primary_key.name())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// used to update the localized and weighted maps while sharing the update code with the settings pipeline.
|
|
||||||
let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?;
|
|
||||||
inner_index_settings.recompute_facets(wtxn, index)?;
|
|
||||||
inner_index_settings.recompute_searchables(wtxn, index)?;
|
|
||||||
|
|
||||||
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")]
|
||||||
fn compute_prefix_database(
|
fn compute_facet_search_database(
|
||||||
index: &Index,
|
index: &Index,
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
prefix_delta: PrefixDelta,
|
global_fields_ids_map: GlobalFieldsIdsMap,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
eprintln!("prefix_delta: {:?}", &prefix_delta);
|
let rtxn = index.read_txn()?;
|
||||||
let PrefixDelta { modified, deleted } = prefix_delta;
|
let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?;
|
||||||
// Compute word prefix docids
|
let mut facet_search_builder = FacetSearchBuilder::new(
|
||||||
compute_word_prefix_docids(wtxn, index, &modified, &deleted)?;
|
global_fields_ids_map,
|
||||||
// Compute exact word prefix docids
|
localized_attributes_rules.unwrap_or_default(),
|
||||||
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted)?;
|
);
|
||||||
// Compute word prefix fid docids
|
|
||||||
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?;
|
let previous_facet_id_string_docids = index
|
||||||
// Compute word prefix position docids
|
.facet_id_string_docids
|
||||||
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)
|
.iter(&rtxn)?
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
|
||||||
|
let current_facet_id_string_docids = index
|
||||||
|
.facet_id_string_docids
|
||||||
|
.iter(wtxn)?
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
|
||||||
|
for eob in merge_join_by(
|
||||||
|
previous_facet_id_string_docids,
|
||||||
|
current_facet_id_string_docids,
|
||||||
|
|lhs, rhs| match (lhs, rhs) {
|
||||||
|
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
|
||||||
|
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
|
||||||
|
},
|
||||||
|
) {
|
||||||
|
match eob {
|
||||||
|
EitherOrBoth::Both(lhs, rhs) => {
|
||||||
|
let (_, _) = lhs?;
|
||||||
|
let (_, _) = rhs?;
|
||||||
|
}
|
||||||
|
EitherOrBoth::Left(result) => {
|
||||||
|
let (key, _) = result?;
|
||||||
|
facet_search_builder
|
||||||
|
.register_from_key(DelAdd::Deletion, key.left_bound.as_ref())?;
|
||||||
|
}
|
||||||
|
EitherOrBoth::Right(result) => {
|
||||||
|
let (key, _) = result?;
|
||||||
|
facet_search_builder
|
||||||
|
.register_from_key(DelAdd::Addition, key.left_bound.as_ref())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
facet_search_builder.merge_and_write(index, wtxn, &rtxn)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
|
||||||
|
@ -11,9 +11,7 @@ use super::channel::*;
|
|||||||
use super::extract::{
|
use super::extract::{
|
||||||
merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
|
merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
|
||||||
};
|
};
|
||||||
use super::facet_search_builder::FacetSearchBuilder;
|
|
||||||
use super::DocumentChange;
|
use super::DocumentChange;
|
||||||
use crate::update::del_add::DelAdd;
|
|
||||||
use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result};
|
use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result};
|
||||||
|
|
||||||
pub struct GeoExtractor {
|
pub struct GeoExtractor {
|
||||||
@ -93,37 +91,29 @@ pub fn merge_and_send_docids<'extractor>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
|
||||||
pub fn merge_and_send_facet_docids<'indexer, 'extractor>(
|
pub fn merge_and_send_facet_docids<'extractor>(
|
||||||
global_fields_ids_map: GlobalFieldsIdsMap<'indexer>,
|
|
||||||
mut caches: Vec<BalancedCaches<'extractor>>,
|
mut caches: Vec<BalancedCaches<'extractor>>,
|
||||||
database: FacetDatabases,
|
database: FacetDatabases,
|
||||||
index: &Index,
|
index: &Index,
|
||||||
docids_sender: impl DocidsSender + Sync,
|
docids_sender: impl DocidsSender + Sync,
|
||||||
) -> Result<(FacetFieldIdsDelta, FacetSearchBuilder<'indexer>)> {
|
) -> Result<FacetFieldIdsDelta> {
|
||||||
transpose_and_freeze_caches(&mut caches)?
|
transpose_and_freeze_caches(&mut caches)?
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|frozen| {
|
.map(|frozen| {
|
||||||
let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
|
let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
|
||||||
let rtxn = index.read_txn()?;
|
let rtxn = index.read_txn()?;
|
||||||
let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?;
|
|
||||||
let mut facet_search_builder = FacetSearchBuilder::new(
|
|
||||||
global_fields_ids_map.clone(),
|
|
||||||
localized_attributes_rules.unwrap_or_default(),
|
|
||||||
);
|
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
|
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
|
||||||
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
|
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
|
||||||
match merge_cbo_bitmaps(current, del, add)? {
|
match merge_cbo_bitmaps(current, del, add)? {
|
||||||
Operation::Write(bitmap) => {
|
Operation::Write(bitmap) => {
|
||||||
facet_field_ids_delta.register_from_key(key);
|
facet_field_ids_delta.register_from_key(key);
|
||||||
facet_search_builder.register_from_key(DelAdd::Addition, key)?;
|
|
||||||
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
|
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
|
||||||
docids_sender.write(key, value).unwrap();
|
docids_sender.write(key, value).unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Operation::Delete => {
|
Operation::Delete => {
|
||||||
facet_field_ids_delta.register_from_key(key);
|
facet_field_ids_delta.register_from_key(key);
|
||||||
facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
|
|
||||||
docids_sender.delete(key).unwrap();
|
docids_sender.delete(key).unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -131,18 +121,9 @@ pub fn merge_and_send_facet_docids<'indexer, 'extractor>(
|
|||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
Ok((facet_field_ids_delta, facet_search_builder))
|
Ok(facet_field_ids_delta)
|
||||||
})
|
})
|
||||||
.reduce(
|
.reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?)))
|
||||||
|| Ok((FacetFieldIdsDelta::default(), todo!())),
|
|
||||||
|lhs, rhs| {
|
|
||||||
let (lhs_ffid, lhs_fsb) = lhs?;
|
|
||||||
let (rhs_ffid, rhs_fsb) = rhs?;
|
|
||||||
let ffid_merged = lhs_ffid.merge(rhs_ffid);
|
|
||||||
let fsb_merged = todo!();
|
|
||||||
Ok((ffid_merged, fsb_merged))
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FacetDatabases<'a> {
|
pub struct FacetDatabases<'a> {
|
||||||
|
Loading…
Reference in New Issue
Block a user