fix PR comments

This commit is contained in:
ManyTheFish 2024-02-13 15:14:03 +01:00
parent e5e811e2c9
commit 48026aa75c
6 changed files with 117 additions and 124 deletions

View File

@ -173,98 +173,110 @@ impl<'i> FacetsUpdate<'i> {
incremental_update.execute(wtxn)?; incremental_update.execute(wtxn)?;
} }
if let Some(normalized_delta_data) = self.normalized_delta_data { match self.normalized_delta_data {
let mut iter = normalized_delta_data.into_stream_merger_iter()?; Some(data) => index_facet_search(wtxn, data, self.index),
while let Some((key_bytes, delta_bytes)) = iter.next()? { None => Ok(()),
let deladd_reader = KvReaderDelAdd::new(delta_bytes);
let database_set = self
.index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.get(wtxn, key_bytes)?
.unwrap_or_default();
let add_set = deladd_reader
.get(DelAdd::Addition)
.and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
.unwrap_or_default();
let del_set = match deladd_reader
.get(DelAdd::Deletion)
.and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
{
Some(del_set) => {
let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap();
let field_id = FieldId::from_be_bytes(field_id_bytes);
let mut set = BTreeSet::new();
for facet in del_set {
let key =
FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() };
// Check if the referenced value doesn't exist anymore before deleting it.
if self.index.facet_id_string_docids.get(wtxn, &key)?.remap_data::<DecodeIgnore>().is_none() {
set.insert(facet);
}
}
set
}
None => BTreeSet::new(),
};
let set: BTreeSet<_> =
database_set.difference(&del_set).chain(add_set.iter()).cloned().collect();
if set.is_empty() {
self.index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.delete(wtxn, key_bytes)?;
} else {
self.index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.put(wtxn, key_bytes, &set)?;
}
}
// We clear the FST of normalized-for-search to compute everything from scratch.
self.index.facet_id_string_fst.clear(wtxn)?;
// We compute one FST by string facet
let mut text_fsts = vec![];
let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
let database =
self.index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
for result in database.iter(wtxn)? {
let ((field_id, normalized_facet), _) = result?;
current_fst = match current_fst.take() {
Some((fid, fst_builder)) if fid != field_id => {
let fst = fst_builder.into_set();
text_fsts.push((fid, fst));
Some((field_id, fst::SetBuilder::memory()))
}
Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
None => Some((field_id, fst::SetBuilder::memory())),
};
if let Some((_, fst_builder)) = current_fst.as_mut() {
fst_builder.insert(normalized_facet)?;
}
}
if let Some((field_id, fst_builder)) = current_fst {
let fst = fst_builder.into_set();
text_fsts.push((field_id, fst));
}
// We write those FSTs in LMDB now
for (field_id, fst) in text_fsts {
self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
}
} }
Ok(())
} }
} }
fn index_facet_search(
wtxn: &mut heed::RwTxn,
normalized_delta_data: Merger<BufReader<File>, MergeFn>,
index: &Index,
) -> Result<()> {
let mut iter = normalized_delta_data.into_stream_merger_iter()?;
while let Some((key_bytes, delta_bytes)) = iter.next()? {
let deladd_reader = KvReaderDelAdd::new(delta_bytes);
let database_set = index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.get(wtxn, key_bytes)?
.unwrap_or_default();
let add_set = deladd_reader
.get(DelAdd::Addition)
.and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
.unwrap_or_default();
let del_set = match deladd_reader
.get(DelAdd::Deletion)
.and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
{
Some(del_set) => {
let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap();
let field_id = FieldId::from_be_bytes(field_id_bytes);
let mut set = BTreeSet::new();
for facet in del_set {
let key = FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() };
// Check if the referenced value doesn't exist anymore before deleting it.
if index
.facet_id_string_docids
.remap_data_type::<DecodeIgnore>()
.get(wtxn, &key)?
.is_none()
{
set.insert(facet);
}
}
set
}
None => BTreeSet::new(),
};
let set: BTreeSet<_> =
database_set.difference(&del_set).chain(add_set.iter()).cloned().collect();
if set.is_empty() {
index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.delete(wtxn, key_bytes)?;
} else {
index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.put(wtxn, key_bytes, &set)?;
}
}
// We clear the FST of normalized-for-search to compute everything from scratch.
index.facet_id_string_fst.clear(wtxn)?;
// We compute one FST by string facet
let mut text_fsts = vec![];
let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
let database = index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
for result in database.iter(wtxn)? {
let ((field_id, normalized_facet), _) = result?;
current_fst = match current_fst.take() {
Some((fid, fst_builder)) if fid != field_id => {
let fst = fst_builder.into_set();
text_fsts.push((fid, fst));
Some((field_id, fst::SetBuilder::memory()))
}
Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
None => Some((field_id, fst::SetBuilder::memory())),
};
if let Some((_, fst_builder)) = current_fst.as_mut() {
fst_builder.insert(normalized_facet)?;
}
}
if let Some((field_id, fst_builder)) = current_fst {
let fst = fst_builder.into_set();
text_fsts.push((field_id, fst));
}
// We write those FSTs in LMDB now
for (field_id, fst) in text_fsts {
index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
}
Ok(())
}
#[cfg(test)] #[cfg(test)]
pub(crate) mod test_helpers { pub(crate) mod test_helpers {
use std::cell::Cell; use std::cell::Cell;

View File

@ -223,27 +223,6 @@ pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>(
)?) )?)
} }
pub fn merge_btreeset_string<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 {
Ok(values[0].clone())
} else {
// TODO improve the perf by using a `#[borrow] Cow<str>`.
let strings: BTreeSet<String> = values
.iter()
.map(AsRef::as_ref)
.map(serde_json::from_slice::<BTreeSet<String>>)
.map(StdResult::unwrap)
.reduce(|mut current, new| {
for x in new {
current.insert(x);
}
current
})
.unwrap();
Ok(Cow::Owned(serde_json::to_vec(&strings).unwrap()))
}
}
/// Do a union of BtreeSet on both sides of a DelAdd obkv /// Do a union of BtreeSet on both sides of a DelAdd obkv
/// separately and outputs a new DelAdd with both unions. /// separately and outputs a new DelAdd with both unions.
pub fn merge_deladd_btreeset_string<'a>( pub fn merge_deladd_btreeset_string<'a>(

View File

@ -13,10 +13,10 @@ pub use grenad_helpers::{
GrenadParameters, GrenadParameters,
}; };
pub use merge_functions::{ pub use merge_functions::{
keep_first, keep_latest_obkv, merge_btreeset_string, merge_cbo_roaring_bitmaps, keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, merge_deladd_btreeset_string,
merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps, merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions,
obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions, MergeFn, obkvs_merge_additions_and_deletions, MergeFn,
}; };
use crate::MAX_WORD_LENGTH; use crate::MAX_WORD_LENGTH;

View File

@ -25,9 +25,9 @@ use self::enrich::enrich_documents_batch;
pub use self::enrich::{extract_finite_float_from_value, DocumentId}; pub use self::enrich::{extract_finite_float_from_value, DocumentId};
pub use self::helpers::{ pub use self::helpers::{
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps,
merge_roaring_bitmaps, valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn, valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn,
}; };
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput}; pub use self::transform::{Transform, TransformOutput};

View File

@ -41,7 +41,7 @@ impl ChunkAccumulator {
pub fn pop_longest(&mut self) -> Option<Vec<TypedChunk>> { pub fn pop_longest(&mut self) -> Option<Vec<TypedChunk>> {
match self.inner.iter().max_by_key(|v| v.len()) { match self.inner.iter().max_by_key(|v| v.len()) {
Some(left) => { Some(left) => {
let position = self.inner.iter().position(|right| left == right); let position = self.inner.iter().position(|right| left.len() == right.len());
position.map(|p| self.inner.remove(p)).filter(|v| !v.is_empty()) position.map(|p| self.inner.remove(p)).filter(|v| !v.is_empty())
} }
None => None, None => None,
@ -49,7 +49,11 @@ impl ChunkAccumulator {
} }
pub fn insert(&mut self, chunk: TypedChunk) { pub fn insert(&mut self, chunk: TypedChunk) {
match self.inner.iter().position(|right| Some(&chunk) == right.first()) { match self
.inner
.iter()
.position(|right| right.first().map_or(false, |right| chunk.is_batchable_with(right)))
{
Some(position) => { Some(position) => {
let v = self.inner.get_mut(position).unwrap(); let v = self.inner.get_mut(position).unwrap();
v.push(chunk); v.push(chunk);
@ -87,8 +91,8 @@ pub(crate) enum TypedChunk {
ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>), ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
} }
impl PartialEq for TypedChunk { impl TypedChunk {
fn eq(&self, other: &Self) -> bool { fn is_batchable_with(&self, other: &Self) -> bool {
use TypedChunk::*; use TypedChunk::*;
match (self, other) { match (self, other) {
(FieldIdDocidFacetStrings(_), FieldIdDocidFacetStrings(_)) (FieldIdDocidFacetStrings(_), FieldIdDocidFacetStrings(_))
@ -113,7 +117,6 @@ impl PartialEq for TypedChunk {
} }
} }
} }
impl Eq for TypedChunk {}
impl TypedChunk { impl TypedChunk {
pub fn to_debug_string(&self) -> String { pub fn to_debug_string(&self) -> String {

View File

@ -3,9 +3,8 @@ pub use self::clear_documents::ClearDocuments;
pub use self::facet::bulk::FacetsUpdateBulk; pub use self::facet::bulk::FacetsUpdateBulk;
pub use self::facet::incremental::FacetsUpdateIncrementalInner; pub use self::facet::incremental::FacetsUpdateIncrementalInner;
pub use self::index_documents::{ pub use self::index_documents::{
merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, DocumentAdditionResult, DocumentId,
DocumentAdditionResult, DocumentId, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, MergeFn,
MergeFn,
}; };
pub use self::indexer_config::IndexerConfig; pub use self::indexer_config::IndexerConfig;
pub use self::settings::{validate_embedding_settings, Setting, Settings}; pub use self::settings::{validate_embedding_settings, Setting, Settings};