mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 10:07:40 +08:00
Implement facet search exctraction
This commit is contained in:
parent
86a0097311
commit
60cc09abec
@ -144,6 +144,8 @@ pub enum Database {
|
|||||||
FacetIdExistsDocids,
|
FacetIdExistsDocids,
|
||||||
FacetIdF64NumberDocids,
|
FacetIdF64NumberDocids,
|
||||||
FacetIdStringDocids,
|
FacetIdStringDocids,
|
||||||
|
FacetIdNormalizedStringStrings,
|
||||||
|
FacetIdStringFst,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
@ -163,6 +165,10 @@ impl Database {
|
|||||||
Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
|
Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
|
||||||
Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
|
Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
|
||||||
Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
|
Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
|
||||||
|
Database::FacetIdNormalizedStringStrings => {
|
||||||
|
index.facet_id_normalized_string_strings.remap_types()
|
||||||
|
}
|
||||||
|
Database::FacetIdStringFst => index.facet_id_string_fst.remap_types(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -240,6 +246,10 @@ impl MergerSender {
|
|||||||
DocumentsSender(self)
|
DocumentsSender(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn facet_searchable(&self) -> FacetSearchableSender<'_> {
|
||||||
|
FacetSearchableSender { sender: self }
|
||||||
|
}
|
||||||
|
|
||||||
pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> {
|
pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> {
|
||||||
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap(
|
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap(
|
||||||
DOCUMENTS_IDS_KEY.as_bytes(),
|
DOCUMENTS_IDS_KEY.as_bytes(),
|
||||||
@ -445,6 +455,50 @@ impl DocidsSender for FacetDocidsSender<'_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct FacetSearchableSender<'a> {
|
||||||
|
sender: &'a MergerSender,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FacetSearchableSender<'_> {
|
||||||
|
pub fn write_facet(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
|
||||||
|
match self
|
||||||
|
.sender
|
||||||
|
.send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry })
|
||||||
|
{
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete_facet(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
|
||||||
|
match self
|
||||||
|
.sender
|
||||||
|
.send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry })
|
||||||
|
{
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_fst(&self, key: &[u8], value: Mmap) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(key, value));
|
||||||
|
match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete_fst(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
|
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
|
||||||
|
match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(SendError(_)) => Err(SendError(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct DocumentsSender<'a>(&'a MergerSender);
|
pub struct DocumentsSender<'a>(&'a MergerSender);
|
||||||
|
|
||||||
impl DocumentsSender<'_> {
|
impl DocumentsSender<'_> {
|
||||||
|
275
milli/src/update/new/facet_search_builder.rs
Normal file
275
milli/src/update/new/facet_search_builder.rs
Normal file
@ -0,0 +1,275 @@
|
|||||||
|
use std::collections::{BTreeSet, HashMap};
|
||||||
|
|
||||||
|
use charabia::{normalizer::NormalizerOption, Language, Normalize, StrDetection, Token};
|
||||||
|
use grenad::Sorter;
|
||||||
|
use heed::{
|
||||||
|
types::{Bytes, SerdeJson},
|
||||||
|
BytesDecode, BytesEncode, RoTxn,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
heed_codec::{
|
||||||
|
facet::{FacetGroupKey, FacetGroupKeyCodec},
|
||||||
|
StrRefCodec,
|
||||||
|
},
|
||||||
|
update::{
|
||||||
|
create_sorter,
|
||||||
|
del_add::{DelAdd, KvWriterDelAdd},
|
||||||
|
MergeDeladdBtreesetString,
|
||||||
|
},
|
||||||
|
BEU16StrCodec, FieldId, GlobalFieldsIdsMap, Index, LocalizedAttributesRule, Result,
|
||||||
|
MAX_FACET_VALUE_LENGTH,
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
channel::FacetSearchableSender, extract::FacetKind, fst_merger_builder::FstMergerBuilder,
|
||||||
|
KvReaderDelAdd,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct FacetSearchBuilder<'indexer> {
|
||||||
|
registered_facets: HashMap<FieldId, usize>,
|
||||||
|
normalized_facet_string_docids_sorter: Sorter<MergeDeladdBtreesetString>,
|
||||||
|
global_fields_ids_map: GlobalFieldsIdsMap<'indexer>,
|
||||||
|
localized_attributes_rules: Vec<LocalizedAttributesRule>,
|
||||||
|
// Buffered data below
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
localized_field_ids: HashMap<FieldId, Option<Vec<Language>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'indexer> FacetSearchBuilder<'indexer> {
|
||||||
|
pub fn new(
|
||||||
|
global_fields_ids_map: GlobalFieldsIdsMap<'indexer>,
|
||||||
|
localized_attributes_rules: Vec<LocalizedAttributesRule>,
|
||||||
|
) -> Self {
|
||||||
|
let registered_facets = HashMap::new();
|
||||||
|
let normalized_facet_string_docids_sorter = create_sorter(
|
||||||
|
grenad::SortAlgorithm::Stable,
|
||||||
|
MergeDeladdBtreesetString,
|
||||||
|
grenad::CompressionType::None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Some(0),
|
||||||
|
);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
registered_facets,
|
||||||
|
normalized_facet_string_docids_sorter,
|
||||||
|
buffer: Vec::new(),
|
||||||
|
global_fields_ids_map,
|
||||||
|
localized_attributes_rules,
|
||||||
|
localized_field_ids: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extract_key_data<'k>(&self, key: &'k [u8]) -> Result<Option<FacetGroupKey<&'k str>>> {
|
||||||
|
match FacetKind::from(key[0]) {
|
||||||
|
// Only strings are searchable
|
||||||
|
FacetKind::String => Ok(Some(
|
||||||
|
FacetGroupKeyCodec::<StrRefCodec>::bytes_decode(&key[1..])
|
||||||
|
.map_err(heed::Error::Encoding)?,
|
||||||
|
)),
|
||||||
|
_ => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_from_key(&mut self, deladd: DelAdd, facet_key: &[u8]) -> Result<()> {
|
||||||
|
let Some(FacetGroupKey { field_id, level: _level, left_bound }) =
|
||||||
|
self.extract_key_data(facet_key)?
|
||||||
|
else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
if deladd == DelAdd::Addition {
|
||||||
|
self.registered_facets.entry(field_id).and_modify(|count| *count += 1).or_insert(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let locales = self.locales(field_id);
|
||||||
|
let hyper_normalized_value = normalize_facet_string(left_bound, locales.as_deref());
|
||||||
|
|
||||||
|
let set = BTreeSet::from_iter(std::iter::once(left_bound));
|
||||||
|
|
||||||
|
// as the facet string is the same, we can put the deletion and addition in the same obkv.
|
||||||
|
self.buffer.clear();
|
||||||
|
let mut obkv = KvWriterDelAdd::new(&mut self.buffer);
|
||||||
|
let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?;
|
||||||
|
obkv.insert(deladd, val)?;
|
||||||
|
obkv.finish()?;
|
||||||
|
|
||||||
|
let key: (u16, &str) = (field_id, hyper_normalized_value.as_ref());
|
||||||
|
let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
|
||||||
|
self.normalized_facet_string_docids_sorter.insert(key_bytes, &self.buffer)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> {
|
||||||
|
if self.localized_field_ids.get(&field_id).is_none() {
|
||||||
|
let Some(field_name) = self.global_fields_ids_map.name(field_id) else {
|
||||||
|
unreachable!("Field id {} not found in the global fields ids map", field_id);
|
||||||
|
};
|
||||||
|
|
||||||
|
let locales = self
|
||||||
|
.localized_attributes_rules
|
||||||
|
.iter()
|
||||||
|
.find(|rule| rule.match_str(field_name))
|
||||||
|
.map(|rule| rule.locales.clone());
|
||||||
|
|
||||||
|
self.localized_field_ids.insert(field_id, locales);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.localized_field_ids.get(&field_id).unwrap().as_deref()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_fst")]
|
||||||
|
pub fn merge_and_send(
|
||||||
|
self,
|
||||||
|
index: &Index,
|
||||||
|
rtxn: &RoTxn<'_>,
|
||||||
|
sender: FacetSearchableSender,
|
||||||
|
) -> Result<()> {
|
||||||
|
let reader = self.normalized_facet_string_docids_sorter.into_reader_cursors()?;
|
||||||
|
let mut builder = grenad::MergerBuilder::new(MergeDeladdBtreesetString);
|
||||||
|
builder.extend(reader);
|
||||||
|
|
||||||
|
let database = index.facet_id_normalized_string_strings.remap_types::<Bytes, Bytes>();
|
||||||
|
|
||||||
|
let mut merger_iter = builder.build().into_stream_merger_iter()?;
|
||||||
|
let mut current_field_id = None;
|
||||||
|
let mut fst;
|
||||||
|
let mut fst_merger_builder: Option<FstMergerBuilder> = None;
|
||||||
|
while let Some((key, deladd)) = merger_iter.next()? {
|
||||||
|
let (field_id, normalized_facet_string) =
|
||||||
|
BEU16StrCodec::bytes_decode(&key).map_err(heed::Error::Encoding)?;
|
||||||
|
|
||||||
|
if current_field_id != Some(field_id) {
|
||||||
|
if let Some(fst_merger_builder) = fst_merger_builder {
|
||||||
|
// send the previous fst to the channel
|
||||||
|
let mmap = fst_merger_builder.build(&mut callback)?;
|
||||||
|
sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("getting fst for field_id: {}", field_id);
|
||||||
|
fst = index.facet_id_string_fst.get(rtxn, &field_id)?;
|
||||||
|
fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?);
|
||||||
|
current_field_id = Some(field_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
let current = database.get(rtxn, key)?;
|
||||||
|
let deladd: &KvReaderDelAdd = deladd.into();
|
||||||
|
let del = deladd.get(DelAdd::Deletion);
|
||||||
|
let add = deladd.get(DelAdd::Addition);
|
||||||
|
|
||||||
|
match merge_btreesets(current, del, add)? {
|
||||||
|
Operation::Write(value) => {
|
||||||
|
match fst_merger_builder.as_mut() {
|
||||||
|
Some(fst_merger_builder) => {
|
||||||
|
fst_merger_builder.register(
|
||||||
|
DelAdd::Addition,
|
||||||
|
normalized_facet_string.as_bytes(),
|
||||||
|
&mut callback,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
None => unreachable!(),
|
||||||
|
}
|
||||||
|
let key = (field_id, normalized_facet_string);
|
||||||
|
let key_bytes =
|
||||||
|
BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
|
||||||
|
sender.write_facet(&key_bytes, &value).unwrap();
|
||||||
|
}
|
||||||
|
Operation::Delete => {
|
||||||
|
match fst_merger_builder.as_mut() {
|
||||||
|
Some(fst_merger_builder) => {
|
||||||
|
fst_merger_builder.register(
|
||||||
|
DelAdd::Deletion,
|
||||||
|
normalized_facet_string.as_bytes(),
|
||||||
|
&mut callback,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
None => unreachable!(),
|
||||||
|
}
|
||||||
|
let key = (field_id, normalized_facet_string);
|
||||||
|
let key_bytes =
|
||||||
|
BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
|
||||||
|
sender.delete_facet(&key_bytes).unwrap();
|
||||||
|
}
|
||||||
|
Operation::Ignore => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
|
||||||
|
let mmap = fst_merger_builder.build(&mut callback)?;
|
||||||
|
sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn callback(_bytes: &[u8], _deladd: DelAdd, _is_modified: bool) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn merge_btreesets<'a>(
|
||||||
|
current: Option<&[u8]>,
|
||||||
|
del: Option<&[u8]>,
|
||||||
|
add: Option<&[u8]>,
|
||||||
|
) -> Result<Operation> {
|
||||||
|
let mut result: BTreeSet<String> = match current {
|
||||||
|
Some(current) => SerdeJson::bytes_decode(current).map_err(heed::Error::Encoding)?,
|
||||||
|
None => BTreeSet::new(),
|
||||||
|
};
|
||||||
|
if let Some(del) = del {
|
||||||
|
let del: BTreeSet<String> = SerdeJson::bytes_decode(del).map_err(heed::Error::Encoding)?;
|
||||||
|
result = result.difference(&del).cloned().collect();
|
||||||
|
}
|
||||||
|
if let Some(add) = add {
|
||||||
|
let add: BTreeSet<String> = SerdeJson::bytes_decode(add).map_err(heed::Error::Encoding)?;
|
||||||
|
result.extend(add);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TODO remove allocation
|
||||||
|
let result = SerdeJson::bytes_encode(&result).map_err(heed::Error::Encoding)?.into_owned();
|
||||||
|
if Some(result.as_ref()) == current {
|
||||||
|
Ok(Operation::Ignore)
|
||||||
|
} else if result.is_empty() {
|
||||||
|
Ok(Operation::Delete)
|
||||||
|
} else {
|
||||||
|
Ok(Operation::Write(result))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Normalizes the facet string and truncates it to the max length.
|
||||||
|
fn normalize_facet_string(facet_string: &str, locales: Option<&[Language]>) -> String {
|
||||||
|
let options: NormalizerOption = NormalizerOption { lossy: true, ..Default::default() };
|
||||||
|
let mut detection = StrDetection::new(facet_string, locales);
|
||||||
|
|
||||||
|
let script = detection.script();
|
||||||
|
// Detect the language of the facet string only if several locales are explicitly provided.
|
||||||
|
let language = match locales {
|
||||||
|
Some(&[language]) => Some(language),
|
||||||
|
Some(multiple_locales) if multiple_locales.len() > 1 => detection.language(),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let token = Token {
|
||||||
|
lemma: std::borrow::Cow::Borrowed(facet_string),
|
||||||
|
script,
|
||||||
|
language,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
// truncate the facet string to the max length
|
||||||
|
token
|
||||||
|
.normalize(&options)
|
||||||
|
.lemma
|
||||||
|
.char_indices()
|
||||||
|
.take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
|
||||||
|
.map(|(_, c)| c)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Operation {
|
||||||
|
Write(Vec<u8>),
|
||||||
|
Delete,
|
||||||
|
Ignore,
|
||||||
|
}
|
155
milli/src/update/new/fst_merger_builder.rs
Normal file
155
milli/src/update/new/fst_merger_builder.rs
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
use std::{fs::File, io::BufWriter};
|
||||||
|
|
||||||
|
use fst::{Set, SetBuilder, Streamer};
|
||||||
|
use memmap2::Mmap;
|
||||||
|
use tempfile::tempfile;
|
||||||
|
|
||||||
|
use crate::{update::del_add::DelAdd, InternalError, Result};
|
||||||
|
|
||||||
|
pub struct FstMergerBuilder<'a> {
|
||||||
|
stream: Option<fst::set::Stream<'a>>,
|
||||||
|
fst_builder: SetBuilder<BufWriter<File>>,
|
||||||
|
last: Option<Vec<u8>>,
|
||||||
|
inserted_words: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> FstMergerBuilder<'a> {
|
||||||
|
pub fn new<D: AsRef<[u8]>>(fst: Option<&'a Set<D>>) -> Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
stream: fst.map(|fst| fst.stream()),
|
||||||
|
fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
|
||||||
|
last: None,
|
||||||
|
inserted_words: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(
|
||||||
|
&mut self,
|
||||||
|
deladd: DelAdd,
|
||||||
|
right: &[u8],
|
||||||
|
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
||||||
|
) -> Result<()> {
|
||||||
|
if let Some(left) = self.last.take() {
|
||||||
|
let (left_inserted, right_inserted) =
|
||||||
|
self.compare_and_insert(deladd, left.as_slice(), right, insertion_callback)?;
|
||||||
|
|
||||||
|
// left was not inserted, so we keep it for the next iteration
|
||||||
|
if !left_inserted {
|
||||||
|
self.last = Some(left);
|
||||||
|
}
|
||||||
|
|
||||||
|
// right was inserted, so we can stop
|
||||||
|
if right_inserted {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(mut stream) = self.stream.take() {
|
||||||
|
while let Some(left) = stream.next() {
|
||||||
|
let (left_inserted, right_inserted) =
|
||||||
|
self.compare_and_insert(deladd, left, right, insertion_callback)?;
|
||||||
|
|
||||||
|
// left was not inserted, so we keep it for the next iteration
|
||||||
|
if !left_inserted {
|
||||||
|
self.last = Some(left.to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
// right was inserted, so we can stop
|
||||||
|
if right_inserted {
|
||||||
|
self.stream = Some(stream);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we reach this point, it means that the stream is empty
|
||||||
|
// and we need to insert the incoming word
|
||||||
|
self.insert(right, deladd, true, insertion_callback)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn compare_and_insert(
|
||||||
|
&mut self,
|
||||||
|
deladd: DelAdd,
|
||||||
|
left: &[u8],
|
||||||
|
right: &[u8],
|
||||||
|
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
||||||
|
) -> Result<(bool, bool)> {
|
||||||
|
let mut left_inserted = false;
|
||||||
|
let mut right_inserted = false;
|
||||||
|
match left.cmp(right) {
|
||||||
|
std::cmp::Ordering::Less => {
|
||||||
|
// We need to insert the last word from the current fst
|
||||||
|
self.insert(left, DelAdd::Addition, false, insertion_callback)?;
|
||||||
|
|
||||||
|
left_inserted = true;
|
||||||
|
}
|
||||||
|
std::cmp::Ordering::Equal => {
|
||||||
|
self.insert(right, deladd, true, insertion_callback)?;
|
||||||
|
|
||||||
|
left_inserted = true;
|
||||||
|
right_inserted = true;
|
||||||
|
}
|
||||||
|
std::cmp::Ordering::Greater => {
|
||||||
|
self.insert(right, deladd, true, insertion_callback)?;
|
||||||
|
|
||||||
|
right_inserted = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((left_inserted, right_inserted))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(
|
||||||
|
&mut self,
|
||||||
|
bytes: &[u8],
|
||||||
|
deladd: DelAdd,
|
||||||
|
is_modified: bool,
|
||||||
|
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
||||||
|
) -> Result<()> {
|
||||||
|
// Addition: We insert the word
|
||||||
|
// Deletion: We delete the word by not inserting it
|
||||||
|
if deladd == DelAdd::Addition {
|
||||||
|
self.inserted_words += 1;
|
||||||
|
self.fst_builder.insert(bytes)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
insertion_callback(bytes, deladd, is_modified)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn drain_stream(
|
||||||
|
&mut self,
|
||||||
|
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
||||||
|
) -> Result<()> {
|
||||||
|
if let Some(last) = self.last.take() {
|
||||||
|
self.insert(last.as_slice(), DelAdd::Addition, false, insertion_callback)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(mut stream) = self.stream.take() {
|
||||||
|
while let Some(current) = stream.next() {
|
||||||
|
self.insert(current, DelAdd::Addition, false, insertion_callback)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build(
|
||||||
|
mut self,
|
||||||
|
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
||||||
|
) -> Result<Mmap> {
|
||||||
|
self.drain_stream(insertion_callback)?;
|
||||||
|
|
||||||
|
let fst_file = self
|
||||||
|
.fst_builder
|
||||||
|
.into_inner()?
|
||||||
|
.into_inner()
|
||||||
|
.map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?;
|
||||||
|
let fst_mmap = unsafe { Mmap::map(&fst_file)? };
|
||||||
|
|
||||||
|
Ok(fst_mmap)
|
||||||
|
}
|
||||||
|
}
|
@ -10,13 +10,17 @@ use roaring::RoaringBitmap;
|
|||||||
|
|
||||||
use super::channel::*;
|
use super::channel::*;
|
||||||
use super::extract::FacetKind;
|
use super::extract::FacetKind;
|
||||||
|
use super::facet_search_builder::FacetSearchBuilder;
|
||||||
use super::word_fst_builder::{PrefixData, PrefixDelta};
|
use super::word_fst_builder::{PrefixData, PrefixDelta};
|
||||||
use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId};
|
use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId};
|
||||||
use crate::update::del_add::DelAdd;
|
use crate::update::del_add::DelAdd;
|
||||||
use crate::update::new::channel::MergerOperation;
|
use crate::update::new::channel::MergerOperation;
|
||||||
use crate::update::new::word_fst_builder::WordFstBuilder;
|
use crate::update::new::word_fst_builder::WordFstBuilder;
|
||||||
use crate::update::MergeDeladdCboRoaringBitmaps;
|
use crate::update::MergeDeladdCboRoaringBitmaps;
|
||||||
use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result};
|
use crate::{
|
||||||
|
localized_attributes_rules, CboRoaringBitmapCodec, Error, FieldId, GeoPoint,
|
||||||
|
GlobalFieldsIdsMap, Index, Result,
|
||||||
|
};
|
||||||
|
|
||||||
/// TODO We must return some infos/stats
|
/// TODO We must return some infos/stats
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
|
||||||
@ -170,6 +174,12 @@ pub fn merge_grenad_entries(
|
|||||||
tracing::trace_span!(target: "indexing::documents::merge", "facet_docids");
|
tracing::trace_span!(target: "indexing::documents::merge", "facet_docids");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
let mut facet_field_ids_delta = FacetFieldIdsDelta::new();
|
let mut facet_field_ids_delta = FacetFieldIdsDelta::new();
|
||||||
|
let localized_attributes_rules =
|
||||||
|
index.localized_attributes_rules(rtxn)?.unwrap_or_default();
|
||||||
|
let mut facet_search_builder = FacetSearchBuilder::new(
|
||||||
|
global_fields_ids_map.clone(),
|
||||||
|
localized_attributes_rules,
|
||||||
|
);
|
||||||
merge_and_send_facet_docids(
|
merge_and_send_facet_docids(
|
||||||
merger,
|
merger,
|
||||||
FacetDatabases::new(index),
|
FacetDatabases::new(index),
|
||||||
@ -177,9 +187,12 @@ pub fn merge_grenad_entries(
|
|||||||
&mut buffer,
|
&mut buffer,
|
||||||
sender.facet_docids(),
|
sender.facet_docids(),
|
||||||
&mut facet_field_ids_delta,
|
&mut facet_field_ids_delta,
|
||||||
|
&mut facet_search_builder,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
merger_result.facet_field_ids_delta = Some(facet_field_ids_delta);
|
merger_result.facet_field_ids_delta = Some(facet_field_ids_delta);
|
||||||
|
// merge and send the facet fst and the searchable facet values
|
||||||
|
facet_search_builder.merge_and_send(index, rtxn, sender.facet_searchable())?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -294,6 +307,7 @@ fn merge_and_send_facet_docids(
|
|||||||
buffer: &mut Vec<u8>,
|
buffer: &mut Vec<u8>,
|
||||||
docids_sender: impl DocidsSender,
|
docids_sender: impl DocidsSender,
|
||||||
facet_field_ids_delta: &mut FacetFieldIdsDelta,
|
facet_field_ids_delta: &mut FacetFieldIdsDelta,
|
||||||
|
facet_search_builder: &mut FacetSearchBuilder,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
|
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
|
||||||
while let Some((key, deladd)) = merger_iter.next().unwrap() {
|
while let Some((key, deladd)) = merger_iter.next().unwrap() {
|
||||||
@ -305,11 +319,13 @@ fn merge_and_send_facet_docids(
|
|||||||
match merge_cbo_bitmaps(current, del, add)? {
|
match merge_cbo_bitmaps(current, del, add)? {
|
||||||
Operation::Write(bitmap) => {
|
Operation::Write(bitmap) => {
|
||||||
facet_field_ids_delta.register_from_key(key);
|
facet_field_ids_delta.register_from_key(key);
|
||||||
|
facet_search_builder.register_from_key(DelAdd::Addition, key)?;
|
||||||
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
|
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
|
||||||
docids_sender.write(key, value).unwrap();
|
docids_sender.write(key, value).unwrap();
|
||||||
}
|
}
|
||||||
Operation::Delete => {
|
Operation::Delete => {
|
||||||
facet_field_ids_delta.register_from_key(key);
|
facet_field_ids_delta.register_from_key(key);
|
||||||
|
facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
|
||||||
docids_sender.delete(key).unwrap();
|
docids_sender.delete(key).unwrap();
|
||||||
}
|
}
|
||||||
Operation::Ignore => (),
|
Operation::Ignore => (),
|
||||||
|
@ -8,6 +8,8 @@ mod channel;
|
|||||||
pub mod document;
|
pub mod document;
|
||||||
mod document_change;
|
mod document_change;
|
||||||
mod extract;
|
mod extract;
|
||||||
|
mod facet_search_builder;
|
||||||
|
mod fst_merger_builder;
|
||||||
pub mod indexer;
|
pub mod indexer;
|
||||||
mod merger;
|
mod merger;
|
||||||
mod parallel_iterator_ext;
|
mod parallel_iterator_ext;
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use std::{fs::File, io::BufWriter};
|
use std::io::BufWriter;
|
||||||
|
|
||||||
use fst::{Set, SetBuilder, Streamer};
|
use fst::{Set, SetBuilder, Streamer};
|
||||||
use memmap2::Mmap;
|
use memmap2::Mmap;
|
||||||
@ -7,23 +7,19 @@ use tempfile::tempfile;
|
|||||||
|
|
||||||
use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result};
|
use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result};
|
||||||
|
|
||||||
|
use super::fst_merger_builder::FstMergerBuilder;
|
||||||
|
|
||||||
pub struct WordFstBuilder<'a> {
|
pub struct WordFstBuilder<'a> {
|
||||||
stream: Option<fst::set::Stream<'a>>,
|
word_fst_builder: FstMergerBuilder<'a>,
|
||||||
word_fst_builder: SetBuilder<BufWriter<File>>,
|
|
||||||
last_word: Option<Vec<u8>>,
|
|
||||||
prefix_fst_builder: Option<PrefixFstBuilder>,
|
prefix_fst_builder: Option<PrefixFstBuilder>,
|
||||||
inserted_words: usize,
|
|
||||||
registered_words: usize,
|
registered_words: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> WordFstBuilder<'a> {
|
impl<'a> WordFstBuilder<'a> {
|
||||||
pub fn new(words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>) -> Result<Self> {
|
pub fn new(words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>) -> Result<Self> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
stream: Some(words_fst.stream()),
|
word_fst_builder: FstMergerBuilder::new(Some(words_fst))?,
|
||||||
word_fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
|
|
||||||
prefix_fst_builder: None,
|
prefix_fst_builder: None,
|
||||||
last_word: None,
|
|
||||||
inserted_words: 0,
|
|
||||||
registered_words: 0,
|
registered_words: 0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -38,100 +34,13 @@ impl<'a> WordFstBuilder<'a> {
|
|||||||
self.registered_words += 1;
|
self.registered_words += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(left) = self.last_word.take() {
|
self.word_fst_builder.register(deladd, right, &mut |bytes, deladd, is_modified| {
|
||||||
let (left_inserted, right_inserted) =
|
if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
|
||||||
self.compare_and_insert(deladd, left.as_slice(), right)?;
|
prefix_fst_builder.insert_word(bytes, deladd, is_modified)
|
||||||
|
} else {
|
||||||
// left was not inserted, so we keep it for the next iteration
|
Ok(())
|
||||||
if !left_inserted {
|
|
||||||
self.last_word = Some(left);
|
|
||||||
}
|
}
|
||||||
|
})?;
|
||||||
// right was inserted, so we can stop
|
|
||||||
if right_inserted {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(mut stream) = self.stream.take() {
|
|
||||||
while let Some(left) = stream.next() {
|
|
||||||
let (left_inserted, right_inserted) =
|
|
||||||
self.compare_and_insert(deladd, left, right)?;
|
|
||||||
|
|
||||||
// left was not inserted, so we keep it for the next iteration
|
|
||||||
if !left_inserted {
|
|
||||||
self.last_word = Some(left.to_vec());
|
|
||||||
}
|
|
||||||
|
|
||||||
// right was inserted, so we can stop
|
|
||||||
if right_inserted {
|
|
||||||
self.stream = Some(stream);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we reach this point, it means that the stream is empty
|
|
||||||
// and we need to insert the incoming word
|
|
||||||
self.insert_word(right, deladd, true)?;
|
|
||||||
|
|
||||||
self.stream = Some(stream);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn compare_and_insert(
|
|
||||||
&mut self,
|
|
||||||
deladd: DelAdd,
|
|
||||||
left: &[u8],
|
|
||||||
right: &[u8],
|
|
||||||
) -> Result<(bool, bool)> {
|
|
||||||
let mut left_inserted = false;
|
|
||||||
let mut right_inserted = false;
|
|
||||||
match left.cmp(right) {
|
|
||||||
std::cmp::Ordering::Less => {
|
|
||||||
// We need to insert the last word from the current fst
|
|
||||||
self.insert_word(left, DelAdd::Addition, false)?;
|
|
||||||
|
|
||||||
left_inserted = true;
|
|
||||||
}
|
|
||||||
std::cmp::Ordering::Equal => {
|
|
||||||
self.insert_word(right, deladd, true)?;
|
|
||||||
|
|
||||||
left_inserted = true;
|
|
||||||
right_inserted = true;
|
|
||||||
}
|
|
||||||
std::cmp::Ordering::Greater => {
|
|
||||||
self.insert_word(right, deladd, true)?;
|
|
||||||
|
|
||||||
right_inserted = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((left_inserted, right_inserted))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert_word(&mut self, bytes: &[u8], deladd: DelAdd, is_modified: bool) -> Result<()> {
|
|
||||||
// Addition: We insert the word
|
|
||||||
// Deletion: We delete the word by not inserting it
|
|
||||||
if deladd == DelAdd::Addition {
|
|
||||||
self.inserted_words += 1;
|
|
||||||
self.word_fst_builder.insert(bytes)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(prefix_fst_builder) = self.prefix_fst_builder.as_mut() {
|
|
||||||
prefix_fst_builder.insert_word(bytes, deladd, is_modified)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn drain_stream(&mut self) -> Result<()> {
|
|
||||||
if let Some(mut stream) = self.stream.take() {
|
|
||||||
while let Some(current) = stream.next() {
|
|
||||||
self.insert_word(current, DelAdd::Addition, false)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -141,13 +50,13 @@ impl<'a> WordFstBuilder<'a> {
|
|||||||
index: &crate::Index,
|
index: &crate::Index,
|
||||||
rtxn: &heed::RoTxn,
|
rtxn: &heed::RoTxn,
|
||||||
) -> Result<(Mmap, Option<PrefixData>)> {
|
) -> Result<(Mmap, Option<PrefixData>)> {
|
||||||
self.drain_stream()?;
|
let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| {
|
||||||
|
if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
|
||||||
let words_fst_file =
|
prefix_fst_builder.insert_word(bytes, deladd, is_modified)
|
||||||
self.word_fst_builder.into_inner()?.into_inner().map_err(|_| {
|
} else {
|
||||||
InternalError::IndexingMergingKeys { process: "building-words-fst" }
|
Ok(())
|
||||||
})?;
|
}
|
||||||
let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? };
|
})?;
|
||||||
|
|
||||||
let prefix_data = self
|
let prefix_data = self
|
||||||
.prefix_fst_builder
|
.prefix_fst_builder
|
||||||
|
Loading…
Reference in New Issue
Block a user