From c829feb40ba1e756ee0c6883194bbda1eaae9b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 17 Oct 2023 18:09:41 +0200 Subject: [PATCH 1/6] Work on fid docid facet values rewrite --- milli/src/update/facet/bulk.rs | 2 + milli/src/update/facet/mod.rs | 1 + .../extract/extract_facet_number_docids.rs | 4 + .../extract/extract_facet_string_docids.rs | 4 + .../extract/extract_fid_docid_facet_values.rs | 276 +++++++++++++++--- 5 files changed, 249 insertions(+), 38 deletions(-) diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs index 30f15ebab..825217b00 100644 --- a/milli/src/update/facet/bulk.rs +++ b/milli/src/update/facet/bulk.rs @@ -132,6 +132,8 @@ impl FacetsUpdateBulkInner { self.db.delete_range(wtxn, &range).map(drop)?; Ok(()) } + + // TODO the new_data is an Reader>> fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> { let new_data = match self.new_data.take() { Some(x) => x, diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index 15776a709..6631f431b 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -114,6 +114,7 @@ pub struct FacetsUpdate<'i> { min_level_size: u8, } impl<'i> FacetsUpdate<'i> { + // TODO grenad::Reader> pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader) -> Self { let database = match facet_type { FacetType::String => index diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index dec02b120..092de6468 100644 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -17,6 +17,7 @@ use crate::Result; /// documents ids from the given chunk of docid facet number positions. #[logging_timer::time] pub fn extract_facet_number_docids( + // TODO Reader> docid_fid_facet_number: grenad::Reader, indexer: GrenadParameters, ) -> Result> { @@ -26,6 +27,7 @@ pub fn extract_facet_number_docids( let mut facet_number_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, + // TODO We must modify the merger to do unions of Del and Add separately merge_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, @@ -34,12 +36,14 @@ pub fn extract_facet_number_docids( ); let mut cursor = docid_fid_facet_number.into_cursor()?; + // TODO the value is a Obkv and must be taken into account while let Some((key_bytes, _)) = cursor.move_on_next()? { let (field_id, document_id, number) = FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap(); let key = FacetGroupKey { field_id, level: 0, left_bound: number }; let key_bytes = FacetGroupKeyCodec::::bytes_encode(&key).unwrap(); + // TODO We must put a Obkv facet_number_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?; } diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index 0035f54e1..f2fb5c2fc 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -15,6 +15,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; /// documents ids from the given chunk of docid facet string positions. #[logging_timer::time] pub fn extract_facet_string_docids( + // TODO Reader> docid_fid_facet_string: grenad::Reader, indexer: GrenadParameters, ) -> Result> { @@ -24,6 +25,7 @@ pub fn extract_facet_string_docids( let mut facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, + // TODO We must modify the merger to do unions of Del and Add separately merge_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, @@ -33,6 +35,7 @@ pub fn extract_facet_string_docids( let mut cursor = docid_fid_facet_string.into_cursor()?; while let Some((key, _original_value_bytes)) = cursor.move_on_next()? { + // TODO the value is a Obkv and must be taken into account let (field_id_bytes, bytes) = try_split_array_at(key).unwrap(); let field_id = FieldId::from_be_bytes(field_id_bytes); @@ -54,6 +57,7 @@ pub fn extract_facet_string_docids( let key = FacetGroupKey { field_id, level: 0, left_bound: normalised_value }; let key_bytes = FacetGroupKeyCodec::::bytes_encode(&key).unwrap(); // document id is encoded in native-endian because of the CBO roaring bitmap codec + // TODO Reader> facet_string_docids_sorter.insert(&key_bytes, document_id.to_ne_bytes())?; } diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 5496a071b..5f5217515 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -6,17 +6,21 @@ use std::mem::size_of; use heed::zerocopy::AsBytes; use heed::BytesEncode; +use itertools::EitherOrBoth; +use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use serde_json::{from_slice, Value}; use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; use crate::error::InternalError; use crate::facet::value_encoding::f64_into_bytes; +use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{create_writer, writer_into_reader}; use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH}; /// The extracted facet values stored in grenad files by type. pub struct ExtractedFacetValues { + // TOOD rename into `fid_docid_*` pub docid_fid_facet_numbers_chunk: grenad::Reader, pub docid_fid_facet_strings_chunk: grenad::Reader, pub fid_facet_is_null_docids_chunk: grenad::Reader, @@ -31,6 +35,7 @@ pub struct ExtractedFacetValues { /// We need the fid of the geofields to correctly parse them as numbers if they were sent as strings initially. #[logging_timer::time] pub fn extract_fid_docid_facet_values( + // TODO Reader>> obkv_documents: grenad::Reader, indexer: GrenadParameters, faceted_fields: &HashSet, @@ -58,13 +63,15 @@ pub fn extract_fid_docid_facet_values( max_memory.map(|m| m / 2), ); - let mut facet_exists_docids = BTreeMap::::new(); - let mut facet_is_null_docids = BTreeMap::::new(); - let mut facet_is_empty_docids = BTreeMap::::new(); + // The tuples represents the Del and Add side for a bitmap + let mut facet_exists_docids = BTreeMap::::new(); + let mut facet_is_null_docids = BTreeMap::::new(); + let mut facet_is_empty_docids = BTreeMap::::new(); let mut key_buffer = Vec::new(); let mut cursor = obkv_documents.into_cursor()?; while let Some((docid_bytes, value)) = cursor.move_on_next()? { + // TODO Obkv> let obkv = obkv::KvReader::new(value); for (field_id, field_bytes) in obkv.iter() { @@ -79,50 +86,233 @@ pub fn extract_fid_docid_facet_values( let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap(); let document = BEU32::from(document).get(); - facet_exists_docids.entry(field_id).or_default().insert(document); - // For the other extraction tasks, prefix the key with the field_id and the document_id key_buffer.extend_from_slice(docid_bytes); - let value = from_slice(field_bytes).map_err(InternalError::SerdeJson)?; + let del_add_obkv = obkv::KvReader::new(field_bytes); + let del_value = match del_add_obkv.get(DelAdd::Deletion) { + Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?, + None => None, + }; + let add_value = match del_add_obkv.get(DelAdd::Addition) { + Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?, + None => None, + }; - match extract_facet_values( - &value, - geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng), - ) { - FilterableValues::Null => { - facet_is_null_docids.entry(field_id).or_default().insert(document); - } - FilterableValues::Empty => { - facet_is_empty_docids.entry(field_id).or_default().insert(document); - } - FilterableValues::Values { numbers, strings } => { - // insert facet numbers in sorter - for number in numbers { - key_buffer.truncate(size_of::() + size_of::()); - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); + // We insert the document id on the Del and the Add side if the field exists. + let (mut del_exists, mut add_exists) = + facet_exists_docids.entry(field_id).or_default(); + if del_value.is_some() { + del_exists.insert(document); + } + if add_value.is_some() { + add_exists.insert(document); + } - fid_docid_facet_numbers_sorter - .insert(&key_buffer, ().as_bytes())?; + // TODO extract both Del and Add numbers an strings (dedup) + // TODO use the `itertools::merge_join_by` method to sort and diff both sides (Del and Add) + // TODO if there is a Left generate a Del + // TODO if there is a Right generate an Add + // TODO if there is a Both don't insert + // TODO compare numbers using OrderedFloat and strings using both normalized and original values. + + let geo_support = + geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng); + + let del_filterable_values = + del_value.map(|value| extract_facet_values(&value, geo_support)); + let add_filterable_values = + add_value.map(|value| extract_facet_values(&value, geo_support)); + + use FilterableValues::{Empty, Null, Values}; + + match (del_filterable_values, add_filterable_values) { + (None, None) => (), + (Some(del_filterable_values), None) => match del_filterable_values { + Null => { + let (mut del_is_null, _) = + facet_is_null_docids.entry(field_id).or_default(); + del_is_null.insert(document); + } + Empty => { + let (mut del_is_empty, _) = + facet_is_empty_docids.entry(field_id).or_default(); + del_is_empty.insert(document); + } + Values { numbers, strings } => { + // insert facet numbers in sorter + for number in numbers { + key_buffer.truncate(size_of::() + size_of::()); + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; + } + } + + // insert normalized and original facet string in sorter + for (normalized, original) in + strings.into_iter().filter(|(n, _)| !n.is_empty()) + { + let normalized_truncated_value: String = normalized + .char_indices() + .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) + .map(|(_, c)| c) + .collect(); + + key_buffer.truncate(size_of::() + size_of::()); + key_buffer.extend_from_slice(normalized_truncated_value.as_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those strings. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, original.as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; } } + }, + (None, Some(add_filterable_values)) => { + todo!() + } + (Some(del_filterable_values), Some(add_filterable_values)) => { + let (mut del_is_null, mut add_is_null) = + facet_is_null_docids.entry(field_id).or_default(); + let (mut del_is_empty, mut add_is_empty) = + facet_is_empty_docids.entry(field_id).or_default(); - // insert normalized and original facet string in sorter - for (normalized, original) in - strings.into_iter().filter(|(n, _)| !n.is_empty()) - { - let normalized_truncated_value: String = normalized - .char_indices() - .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) - .map(|(_, c)| c) - .collect(); + match (del_filterable_values, add_filterable_values) { + (Null, Null) | (Empty, Empty) => (), + (Null, Empty) => { + del_is_null.insert(document); + add_is_empty.insert(document); + } + (Empty, Null) => { + del_is_empty.insert(document); + add_is_null.insert(document); + } + (Null, Values { numbers, strings }) => { + del_is_null.insert(document); + todo!() + } + (Empty, Values { numbers, strings }) => { + del_is_empty.insert(document); + todo!() + } + (Values { numbers, strings }, Null) => { + todo!(); + add_is_null.insert(document); + } + (Values { numbers, strings }, Empty) => { + todo!(); + add_is_empty.insert(document); + } + ( + Values { numbers: mut del_numbers, strings: mut del_strings }, + Values { numbers: mut add_numbers, strings: mut add_strings }, + ) => { + // We sort and dedup the float numbers + del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); + add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); + del_numbers.dedup_by_key(|f| OrderedFloat(*f)); + add_numbers.dedup_by_key(|f| OrderedFloat(*f)); - key_buffer.truncate(size_of::() + size_of::()); - key_buffer.extend_from_slice(normalized_truncated_value.as_bytes()); - fid_docid_facet_strings_sorter - .insert(&key_buffer, original.as_bytes())?; + let merged_numbers_iter = itertools::merge_join_by( + del_numbers.into_iter().map(OrderedFloat), + add_numbers.into_iter().map(OrderedFloat), + |del, add| del.cmp(&add), + ); + + // insert facet numbers in sorter + for eob in merged_numbers_iter { + key_buffer + .truncate(size_of::() + size_of::()); + match eob { + EitherOrBoth::Both(_, _) => (), // no need to touch anything + EitherOrBoth::Left(OrderedFloat(number)) => { + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter + .insert(&key_buffer, bytes)?; + } + } + EitherOrBoth::Right(OrderedFloat(number)) => { + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter + .insert(&key_buffer, bytes)?; + } + } + } + } + + // We sort and dedup the normalized and original strings + del_strings.sort_unstable(); + add_strings.sort_unstable(); + del_strings.dedup(); + add_strings.dedup(); + + let merged_strings_iter = itertools::merge_join_by( + del_strings.into_iter().filter(|(n, _)| !n.is_empty()), + add_strings.into_iter().filter(|(n, _)| !n.is_empty()), + |del, add| del.cmp(&add), + ); + + // insert normalized and original facet string in sorter + for eob in merged_strings_iter { + match eob { + EitherOrBoth::Both(_, _) => (), // no need to touch anything + EitherOrBoth::Left((normalized, original)) => { + let truncated = truncate_string(normalized); + + key_buffer.truncate( + size_of::() + size_of::(), + ); + key_buffer.extend_from_slice(truncated.as_bytes()); + + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, original)?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter + .insert(&key_buffer, bytes)?; + } + EitherOrBoth::Right((normalized, original)) => { + let truncated = truncate_string(normalized); + + key_buffer.truncate( + size_of::() + size_of::(), + ); + key_buffer.extend_from_slice(truncated.as_bytes()); + + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, original)?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter + .insert(&key_buffer, bytes)?; + } + } + } + } } } } @@ -135,6 +325,7 @@ pub fn extract_fid_docid_facet_values( indexer.chunk_compression_level, tempfile::tempfile()?, ); + // TODO generate an Obkv for (fid, bitmap) in facet_exists_docids.into_iter() { let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; @@ -146,12 +337,14 @@ pub fn extract_fid_docid_facet_values( indexer.chunk_compression_level, tempfile::tempfile()?, ); + // TODO generate an Obkv for (fid, bitmap) in facet_is_null_docids.into_iter() { let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; } let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?; + // TODO generate an Obkv let mut facet_is_empty_docids_writer = create_writer( indexer.chunk_compression_type, indexer.chunk_compression_level, @@ -243,3 +436,10 @@ fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues { } } } + +fn truncate_string(mut s: String) -> String { + s.char_indices() + .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) + .map(|(_, c)| c) + .collect() +} From 7d546b9c2216264c1e05f447a0fe4eb999f37398 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 17 Oct 2023 18:15:14 +0200 Subject: [PATCH 2/6] Generate the DelAdd for is_null, is_empty, and exists --- .../extract/extract_fid_docid_facet_values.rs | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 5f5217515..4f9b273ef 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -325,10 +325,14 @@ pub fn extract_fid_docid_facet_values( indexer.chunk_compression_level, tempfile::tempfile()?, ); - // TODO generate an Obkv - for (fid, bitmap) in facet_exists_docids.into_iter() { - let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); - facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; + for (fid, (del_bitmap, add_bitmap)) in facet_exists_docids.into_iter() { + let mut obkv = KvWriterDelAdd::memory(); + let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); + let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); + obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; + obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; + let bytes = obkv.into_inner()?; + facet_exists_docids_writer.insert(fid.to_be_bytes(), &bytes)?; } let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?; @@ -337,22 +341,30 @@ pub fn extract_fid_docid_facet_values( indexer.chunk_compression_level, tempfile::tempfile()?, ); - // TODO generate an Obkv - for (fid, bitmap) in facet_is_null_docids.into_iter() { - let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); - facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; + for (fid, (del_bitmap, add_bitmap)) in facet_is_null_docids.into_iter() { + let mut obkv = KvWriterDelAdd::memory(); + let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); + let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); + obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; + obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; + let bytes = obkv.into_inner()?; + facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bytes)?; } let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?; - // TODO generate an Obkv let mut facet_is_empty_docids_writer = create_writer( indexer.chunk_compression_type, indexer.chunk_compression_level, tempfile::tempfile()?, ); - for (fid, bitmap) in facet_is_empty_docids.into_iter() { - let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); - facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; + for (fid, (del_bitmap, add_bitmap)) in facet_is_empty_docids.into_iter() { + let mut obkv = KvWriterDelAdd::memory(); + let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); + let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); + obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; + obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; + let bytes = obkv.into_inner()?; + facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &bytes)?; } let facet_is_empty_docids_reader = writer_into_reader(facet_is_empty_docids_writer)?; From 178a9802fadd9acc1e591fd212bdd4d057449baf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 18 Oct 2023 11:01:02 +0200 Subject: [PATCH 3/6] Implement all the facet extraction paths and simplify them --- .../extract/extract_fid_docid_facet_values.rs | 404 +++++++++--------- 1 file changed, 212 insertions(+), 192 deletions(-) diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 4f9b273ef..7d366ae1a 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -1,22 +1,31 @@ +use std::borrow::Cow; use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; use std::fs::File; use std::io; use std::mem::size_of; +use std::result::Result as StdResult; +use grenad::Sorter; use heed::zerocopy::AsBytes; use heed::BytesEncode; use itertools::EitherOrBoth; use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use serde_json::{from_slice, Value}; +use FilterableValues::{Empty, Null, Values}; use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; use crate::error::InternalError; use crate::facet::value_encoding::f64_into_bytes; use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{create_writer, writer_into_reader}; -use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH}; +use crate::{ + CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH, +}; + +/// The length of the elements that are always in the buffer when inserting new values. +const TRUNCATE_SIZE: usize = size_of::() + size_of::(); /// The extracted facet values stored in grenad files by type. pub struct ExtractedFacetValues { @@ -68,7 +77,10 @@ pub fn extract_fid_docid_facet_values( let mut facet_is_null_docids = BTreeMap::::new(); let mut facet_is_empty_docids = BTreeMap::::new(); - let mut key_buffer = Vec::new(); + // We create two buffer for mutable ref issues with closures. + let mut numbers_key_buffer = Vec::new(); + let mut strings_key_buffer = Vec::new(); + let mut cursor = obkv_documents.into_cursor()?; while let Some((docid_bytes, value)) = cursor.move_on_next()? { // TODO Obkv> @@ -76,18 +88,21 @@ pub fn extract_fid_docid_facet_values( for (field_id, field_bytes) in obkv.iter() { if faceted_fields.contains(&field_id) { - key_buffer.clear(); + numbers_key_buffer.clear(); + strings_key_buffer.clear(); // Set key to the field_id // Note: this encoding is consistent with FieldIdCodec - key_buffer.extend_from_slice(&field_id.to_be_bytes()); + numbers_key_buffer.extend_from_slice(&field_id.to_be_bytes()); + strings_key_buffer.extend_from_slice(&field_id.to_be_bytes()); // Here, we know already that the document must be added to the “field id exists” database let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap(); let document = BEU32::from(document).get(); // For the other extraction tasks, prefix the key with the field_id and the document_id - key_buffer.extend_from_slice(docid_bytes); + numbers_key_buffer.extend_from_slice(docid_bytes); + strings_key_buffer.extend_from_slice(docid_bytes); let del_add_obkv = obkv::KvReader::new(field_bytes); let del_value = match del_add_obkv.get(DelAdd::Deletion) { @@ -100,8 +115,13 @@ pub fn extract_fid_docid_facet_values( }; // We insert the document id on the Del and the Add side if the field exists. - let (mut del_exists, mut add_exists) = + let (ref mut del_exists, ref mut add_exists) = facet_exists_docids.entry(field_id).or_default(); + let (ref mut del_is_null, ref mut add_is_null) = + facet_is_null_docids.entry(field_id).or_default(); + let (ref mut del_is_empty, ref mut add_is_empty) = + facet_is_empty_docids.entry(field_id).or_default(); + if del_value.is_some() { del_exists.insert(document); } @@ -109,84 +129,58 @@ pub fn extract_fid_docid_facet_values( add_exists.insert(document); } - // TODO extract both Del and Add numbers an strings (dedup) - // TODO use the `itertools::merge_join_by` method to sort and diff both sides (Del and Add) - // TODO if there is a Left generate a Del - // TODO if there is a Right generate an Add - // TODO if there is a Both don't insert - // TODO compare numbers using OrderedFloat and strings using both normalized and original values. - let geo_support = geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng); - let del_filterable_values = del_value.map(|value| extract_facet_values(&value, geo_support)); let add_filterable_values = add_value.map(|value| extract_facet_values(&value, geo_support)); - use FilterableValues::{Empty, Null, Values}; + // Those closures are just here to simplify things a bit. + let mut insert_numbers_diff = |del_numbers, add_numbers| { + insert_numbers_diff( + &mut fid_docid_facet_numbers_sorter, + &mut numbers_key_buffer, + del_numbers, + add_numbers, + ) + }; + let mut insert_strings_diff = |del_strings, add_strings| { + insert_strings_diff( + &mut fid_docid_facet_strings_sorter, + &mut strings_key_buffer, + del_strings, + add_strings, + ) + }; match (del_filterable_values, add_filterable_values) { (None, None) => (), (Some(del_filterable_values), None) => match del_filterable_values { Null => { - let (mut del_is_null, _) = - facet_is_null_docids.entry(field_id).or_default(); del_is_null.insert(document); } Empty => { - let (mut del_is_empty, _) = - facet_is_empty_docids.entry(field_id).or_default(); del_is_empty.insert(document); } Values { numbers, strings } => { - // insert facet numbers in sorter - for number in numbers { - key_buffer.truncate(size_of::() + size_of::()); - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); - - // We insert only the Del part of the Obkv to inform - // that we only want to remove all those numbers. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Deletion, ().as_bytes())?; - let bytes = obkv.into_inner()?; - fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; - } - } - - // insert normalized and original facet string in sorter - for (normalized, original) in - strings.into_iter().filter(|(n, _)| !n.is_empty()) - { - let normalized_truncated_value: String = normalized - .char_indices() - .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) - .map(|(_, c)| c) - .collect(); - - key_buffer.truncate(size_of::() + size_of::()); - key_buffer.extend_from_slice(normalized_truncated_value.as_bytes()); - - // We insert only the Del part of the Obkv to inform - // that we only want to remove all those strings. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Deletion, original.as_bytes())?; - let bytes = obkv.into_inner()?; - fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; - } + insert_numbers_diff(numbers, vec![])?; + insert_strings_diff(strings, vec![])?; + } + }, + (None, Some(add_filterable_values)) => match add_filterable_values { + Null => { + add_is_null.insert(document); + } + Empty => { + add_is_empty.insert(document); + } + Values { numbers, strings } => { + insert_numbers_diff(vec![], numbers)?; + insert_strings_diff(vec![], strings)?; } }, - (None, Some(add_filterable_values)) => { - todo!() - } (Some(del_filterable_values), Some(add_filterable_values)) => { - let (mut del_is_null, mut add_is_null) = - facet_is_null_docids.entry(field_id).or_default(); - let (mut del_is_empty, mut add_is_empty) = - facet_is_empty_docids.entry(field_id).or_default(); - match (del_filterable_values, add_filterable_values) { (Null, Null) | (Empty, Empty) => (), (Null, Empty) => { @@ -198,120 +192,31 @@ pub fn extract_fid_docid_facet_values( add_is_null.insert(document); } (Null, Values { numbers, strings }) => { + insert_numbers_diff(vec![], numbers)?; + insert_strings_diff(vec![], strings)?; del_is_null.insert(document); - todo!() } (Empty, Values { numbers, strings }) => { + insert_numbers_diff(vec![], numbers)?; + insert_strings_diff(vec![], strings)?; del_is_empty.insert(document); - todo!() } (Values { numbers, strings }, Null) => { - todo!(); add_is_null.insert(document); + insert_numbers_diff(numbers, vec![])?; + insert_strings_diff(strings, vec![])?; } (Values { numbers, strings }, Empty) => { - todo!(); add_is_empty.insert(document); + insert_numbers_diff(numbers, vec![])?; + insert_strings_diff(strings, vec![])?; } ( - Values { numbers: mut del_numbers, strings: mut del_strings }, - Values { numbers: mut add_numbers, strings: mut add_strings }, + Values { numbers: del_numbers, strings: del_strings }, + Values { numbers: add_numbers, strings: add_strings }, ) => { - // We sort and dedup the float numbers - del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); - add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); - del_numbers.dedup_by_key(|f| OrderedFloat(*f)); - add_numbers.dedup_by_key(|f| OrderedFloat(*f)); - - let merged_numbers_iter = itertools::merge_join_by( - del_numbers.into_iter().map(OrderedFloat), - add_numbers.into_iter().map(OrderedFloat), - |del, add| del.cmp(&add), - ); - - // insert facet numbers in sorter - for eob in merged_numbers_iter { - key_buffer - .truncate(size_of::() + size_of::()); - match eob { - EitherOrBoth::Both(_, _) => (), // no need to touch anything - EitherOrBoth::Left(OrderedFloat(number)) => { - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); - - // We insert only the Del part of the Obkv to inform - // that we only want to remove all those numbers. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Deletion, ().as_bytes())?; - let bytes = obkv.into_inner()?; - fid_docid_facet_numbers_sorter - .insert(&key_buffer, bytes)?; - } - } - EitherOrBoth::Right(OrderedFloat(number)) => { - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); - - // We insert only the Del part of the Obkv to inform - // that we only want to remove all those numbers. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Addition, ().as_bytes())?; - let bytes = obkv.into_inner()?; - fid_docid_facet_numbers_sorter - .insert(&key_buffer, bytes)?; - } - } - } - } - - // We sort and dedup the normalized and original strings - del_strings.sort_unstable(); - add_strings.sort_unstable(); - del_strings.dedup(); - add_strings.dedup(); - - let merged_strings_iter = itertools::merge_join_by( - del_strings.into_iter().filter(|(n, _)| !n.is_empty()), - add_strings.into_iter().filter(|(n, _)| !n.is_empty()), - |del, add| del.cmp(&add), - ); - - // insert normalized and original facet string in sorter - for eob in merged_strings_iter { - match eob { - EitherOrBoth::Both(_, _) => (), // no need to touch anything - EitherOrBoth::Left((normalized, original)) => { - let truncated = truncate_string(normalized); - - key_buffer.truncate( - size_of::() + size_of::(), - ); - key_buffer.extend_from_slice(truncated.as_bytes()); - - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Deletion, original)?; - let bytes = obkv.into_inner()?; - fid_docid_facet_strings_sorter - .insert(&key_buffer, bytes)?; - } - EitherOrBoth::Right((normalized, original)) => { - let truncated = truncate_string(normalized); - - key_buffer.truncate( - size_of::() + size_of::(), - ); - key_buffer.extend_from_slice(truncated.as_bytes()); - - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Addition, original)?; - let bytes = obkv.into_inner()?; - fid_docid_facet_strings_sorter - .insert(&key_buffer, bytes)?; - } - } - } + insert_numbers_diff(del_numbers, add_numbers)?; + insert_strings_diff(del_strings, add_strings)?; } } } @@ -320,19 +225,15 @@ pub fn extract_fid_docid_facet_values( } } + let mut buffer = Vec::new(); let mut facet_exists_docids_writer = create_writer( indexer.chunk_compression_type, indexer.chunk_compression_level, tempfile::tempfile()?, ); for (fid, (del_bitmap, add_bitmap)) in facet_exists_docids.into_iter() { - let mut obkv = KvWriterDelAdd::memory(); - let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); - let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); - obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; - obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; - let bytes = obkv.into_inner()?; - facet_exists_docids_writer.insert(fid.to_be_bytes(), &bytes)?; + deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?; + facet_exists_docids_writer.insert(fid.to_be_bytes(), &buffer)?; } let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?; @@ -342,13 +243,8 @@ pub fn extract_fid_docid_facet_values( tempfile::tempfile()?, ); for (fid, (del_bitmap, add_bitmap)) in facet_is_null_docids.into_iter() { - let mut obkv = KvWriterDelAdd::memory(); - let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); - let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); - obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; - obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; - let bytes = obkv.into_inner()?; - facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bytes)?; + deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?; + facet_is_null_docids_writer.insert(fid.to_be_bytes(), &buffer)?; } let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?; @@ -358,13 +254,8 @@ pub fn extract_fid_docid_facet_values( tempfile::tempfile()?, ); for (fid, (del_bitmap, add_bitmap)) in facet_is_empty_docids.into_iter() { - let mut obkv = KvWriterDelAdd::memory(); - let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); - let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); - obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; - obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; - let bytes = obkv.into_inner()?; - facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &bytes)?; + deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?; + facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &buffer)?; } let facet_is_empty_docids_reader = writer_into_reader(facet_is_empty_docids_writer)?; @@ -377,6 +268,141 @@ pub fn extract_fid_docid_facet_values( }) } +/// Generates a vector of bytes containing a DelAdd obkv with two bitmaps. +fn deladd_obkv_cbo_roaring_bitmaps( + buffer: &mut Vec, + del_bitmap: &RoaringBitmap, + add_bitmap: &RoaringBitmap, +) -> io::Result<()> { + buffer.clear(); + let mut obkv = KvWriterDelAdd::new(buffer); + let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(del_bitmap).unwrap(); + let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(add_bitmap).unwrap(); + obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; + obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; + obkv.finish() +} + +/// Truncates a string to the biggest valid LMDB key size. +fn truncate_string(s: String) -> String { + s.char_indices() + .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) + .map(|(_, c)| c) + .collect() +} + +/// Computes the diff between both Del and Add numbers and +/// only inserts the parts that differ in the sorter. +fn insert_numbers_diff( + fid_docid_facet_numbers_sorter: &mut Sorter, + key_buffer: &mut Vec, + mut del_numbers: Vec, + mut add_numbers: Vec, +) -> Result<()> +where + MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, +{ + // We sort and dedup the float numbers + del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); + add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); + del_numbers.dedup_by_key(|f| OrderedFloat(*f)); + add_numbers.dedup_by_key(|f| OrderedFloat(*f)); + + let merged_numbers_iter = itertools::merge_join_by( + del_numbers.into_iter().map(OrderedFloat), + add_numbers.into_iter().map(OrderedFloat), + |del, add| del.cmp(add), + ); + + // insert facet numbers in sorter + for eob in merged_numbers_iter { + key_buffer.truncate(TRUNCATE_SIZE); + match eob { + EitherOrBoth::Both(_, _) => (), // no need to touch anything + EitherOrBoth::Left(OrderedFloat(number)) => { + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; + } + } + EitherOrBoth::Right(OrderedFloat(number)) => { + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; + } + } + } + } + + Ok(()) +} + +/// Computes the diff between both Del and Add strings and +/// only inserts the parts that differ in the sorter. +fn insert_strings_diff( + fid_docid_facet_strings_sorter: &mut Sorter, + key_buffer: &mut Vec, + mut del_strings: Vec<(String, String)>, + mut add_strings: Vec<(String, String)>, +) -> Result<()> +where + MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, +{ + // We sort and dedup the normalized and original strings + del_strings.sort_unstable(); + add_strings.sort_unstable(); + del_strings.dedup(); + add_strings.dedup(); + + let merged_strings_iter = itertools::merge_join_by( + del_strings.into_iter().filter(|(n, _)| !n.is_empty()), + add_strings.into_iter().filter(|(n, _)| !n.is_empty()), + |del, add| del.cmp(add), + ); + + // insert normalized and original facet string in sorter + for eob in merged_strings_iter { + key_buffer.truncate(TRUNCATE_SIZE); + match eob { + EitherOrBoth::Both(_, _) => (), // no need to touch anything + EitherOrBoth::Left((normalized, original)) => { + let truncated = truncate_string(normalized); + key_buffer.extend_from_slice(truncated.as_bytes()); + + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, original)?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; + } + EitherOrBoth::Right((normalized, original)) => { + let truncated = truncate_string(normalized); + key_buffer.extend_from_slice(truncated.as_bytes()); + + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, original)?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; + } + } + } + + Ok(()) +} + /// Represent what a document field contains. enum FilterableValues { /// Corresponds to the JSON `null` value. @@ -387,6 +413,7 @@ enum FilterableValues { Values { numbers: Vec, strings: Vec<(String, String)> }, } +/// Extracts the facet values of a JSON field. fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues { fn inner_extract_facet_values( value: &Value, @@ -448,10 +475,3 @@ fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues { } } } - -fn truncate_string(mut s: String) -> String { - s.char_indices() - .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) - .map(|(_, c)| c) - .collect() -} From c445e9daec348cad13981d168d575b1075b42389 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 18 Oct 2023 13:53:58 +0200 Subject: [PATCH 4/6] Rename docid_fid into fid_docid --- .../extract/extract_fid_docid_facet_values.rs | 12 +++---- .../src/update/index_documents/extract/mod.rs | 36 +++++++++---------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 7d366ae1a..846a023e7 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -29,9 +29,8 @@ const TRUNCATE_SIZE: usize = size_of::() + size_of::(); /// The extracted facet values stored in grenad files by type. pub struct ExtractedFacetValues { - // TOOD rename into `fid_docid_*` - pub docid_fid_facet_numbers_chunk: grenad::Reader, - pub docid_fid_facet_strings_chunk: grenad::Reader, + pub fid_docid_facet_numbers_chunk: grenad::Reader, + pub fid_docid_facet_strings_chunk: grenad::Reader, pub fid_facet_is_null_docids_chunk: grenad::Reader, pub fid_facet_is_empty_docids_chunk: grenad::Reader, pub fid_facet_exists_docids_chunk: grenad::Reader, @@ -44,7 +43,6 @@ pub struct ExtractedFacetValues { /// We need the fid of the geofields to correctly parse them as numbers if they were sent as strings initially. #[logging_timer::time] pub fn extract_fid_docid_facet_values( - // TODO Reader>> obkv_documents: grenad::Reader, indexer: GrenadParameters, faceted_fields: &HashSet, @@ -83,7 +81,6 @@ pub fn extract_fid_docid_facet_values( let mut cursor = obkv_documents.into_cursor()?; while let Some((docid_bytes, value)) = cursor.move_on_next()? { - // TODO Obkv> let obkv = obkv::KvReader::new(value); for (field_id, field_bytes) in obkv.iter() { @@ -96,7 +93,6 @@ pub fn extract_fid_docid_facet_values( numbers_key_buffer.extend_from_slice(&field_id.to_be_bytes()); strings_key_buffer.extend_from_slice(&field_id.to_be_bytes()); - // Here, we know already that the document must be added to the “field id exists” database let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap(); let document = BEU32::from(document).get(); @@ -260,8 +256,8 @@ pub fn extract_fid_docid_facet_values( let facet_is_empty_docids_reader = writer_into_reader(facet_is_empty_docids_writer)?; Ok(ExtractedFacetValues { - docid_fid_facet_numbers_chunk: sorter_into_reader(fid_docid_facet_numbers_sorter, indexer)?, - docid_fid_facet_strings_chunk: sorter_into_reader(fid_docid_facet_strings_sorter, indexer)?, + fid_docid_facet_numbers_chunk: sorter_into_reader(fid_docid_facet_numbers_sorter, indexer)?, + fid_docid_facet_strings_chunk: sorter_into_reader(fid_docid_facet_strings_sorter, indexer)?, fid_facet_is_null_docids_chunk: facet_is_null_docids_reader, fid_facet_is_empty_docids_chunk: facet_is_empty_docids_reader, fid_facet_exists_docids_chunk: facet_exists_docids_reader, diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index f28af8c2a..3eb46e4a7 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -91,9 +91,9 @@ pub(crate) fn data_from_obkv_documents( let ( docid_word_positions_chunks, ( - docid_fid_facet_numbers_chunks, + fid_docid_facet_numbers_chunks, ( - docid_fid_facet_strings_chunks, + fid_docid_facet_strings_chunks, ( facet_is_null_docids_chunks, (facet_is_empty_docids_chunks, facet_exists_docids_chunks), @@ -201,7 +201,7 @@ pub(crate) fn data_from_obkv_documents( ); spawn_extraction_task::<_, _, Vec>>( - docid_fid_facet_strings_chunks, + fid_docid_facet_strings_chunks, indexer, lmdb_writer_sx.clone(), extract_facet_string_docids, @@ -211,7 +211,7 @@ pub(crate) fn data_from_obkv_documents( ); spawn_extraction_task::<_, _, Vec>>( - docid_fid_facet_numbers_chunks, + fid_docid_facet_numbers_chunks, indexer, lmdb_writer_sx, extract_facet_number_docids, @@ -344,7 +344,7 @@ fn send_and_extract_flattened_documents_data( }); } - let (docid_word_positions_chunk, docid_fid_facet_values_chunks): (Result<_>, Result<_>) = + let (docid_word_positions_chunk, fid_docid_facet_values_chunks): (Result<_>, Result<_>) = rayon::join( || { let (documents_ids, docid_word_positions_chunk, script_language_pair) = @@ -372,8 +372,8 @@ fn send_and_extract_flattened_documents_data( }, || { let ExtractedFacetValues { - docid_fid_facet_numbers_chunk, - docid_fid_facet_strings_chunk, + fid_docid_facet_numbers_chunk, + fid_docid_facet_strings_chunk, fid_facet_is_null_docids_chunk, fid_facet_is_empty_docids_chunk, fid_facet_exists_docids_chunk, @@ -384,26 +384,26 @@ fn send_and_extract_flattened_documents_data( geo_fields_ids, )?; - // send docid_fid_facet_numbers_chunk to DB writer - let docid_fid_facet_numbers_chunk = - unsafe { as_cloneable_grenad(&docid_fid_facet_numbers_chunk)? }; + // send fid_docid_facet_numbers_chunk to DB writer + let fid_docid_facet_numbers_chunk = + unsafe { as_cloneable_grenad(&fid_docid_facet_numbers_chunk)? }; let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetNumbers( - docid_fid_facet_numbers_chunk.clone(), + fid_docid_facet_numbers_chunk.clone(), ))); - // send docid_fid_facet_strings_chunk to DB writer - let docid_fid_facet_strings_chunk = - unsafe { as_cloneable_grenad(&docid_fid_facet_strings_chunk)? }; + // send fid_docid_facet_strings_chunk to DB writer + let fid_docid_facet_strings_chunk = + unsafe { as_cloneable_grenad(&fid_docid_facet_strings_chunk)? }; let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetStrings( - docid_fid_facet_strings_chunk.clone(), + fid_docid_facet_strings_chunk.clone(), ))); Ok(( - docid_fid_facet_numbers_chunk, + fid_docid_facet_numbers_chunk, ( - docid_fid_facet_strings_chunk, + fid_docid_facet_strings_chunk, ( fid_facet_is_null_docids_chunk, (fid_facet_is_empty_docids_chunk, fid_facet_exists_docids_chunk), @@ -413,5 +413,5 @@ fn send_and_extract_flattened_documents_data( }, ); - Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?)) + Ok((docid_word_positions_chunk?, fid_docid_facet_values_chunks?)) } From 5c43ff72c135578d912436dab17140a2e4f2ed2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 18 Oct 2023 17:40:13 +0200 Subject: [PATCH 5/6] Update extract_facet_number_docids to support deladd obkvs --- .../cbo_roaring_bitmap_codec.rs | 10 ++++-- .../extract/extract_facet_number_docids.rs | 26 ++++++++++------ .../helpers/merge_functions.rs | 31 +++++++++++++++++++ .../src/update/index_documents/helpers/mod.rs | 5 +-- 4 files changed, 57 insertions(+), 15 deletions(-) diff --git a/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs b/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs index bf76287d8..79b52695e 100644 --- a/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs +++ b/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs @@ -60,12 +60,16 @@ impl CboRoaringBitmapCodec { /// if the merged values length is under the threshold, values are directly /// serialized in the buffer else a RoaringBitmap is created from the /// values and is serialized in the buffer. - pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec) -> io::Result<()> { + pub fn merge_into(slices: I, buffer: &mut Vec) -> io::Result<()> + where + I: IntoIterator, + A: AsRef<[u8]>, + { let mut roaring = RoaringBitmap::new(); let mut vec = Vec::new(); for bytes in slices { - if bytes.len() <= THRESHOLD * size_of::() { + if bytes.as_ref().len() <= THRESHOLD * size_of::() { let mut reader = bytes.as_ref(); while let Ok(integer) = reader.read_u32::() { vec.push(integer); @@ -85,7 +89,7 @@ impl CboRoaringBitmapCodec { } } else { // We can unwrap safely because the vector is sorted upper. - let roaring = RoaringBitmap::from_sorted_iter(vec.into_iter()).unwrap(); + let roaring = RoaringBitmap::from_sorted_iter(vec).unwrap(); roaring.serialize_into(buffer)?; } } else { diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index 092de6468..cafa7e75f 100644 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -4,11 +4,12 @@ use std::io; use heed::{BytesDecode, BytesEncode}; use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters, + create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters, }; use crate::heed_codec::facet::{ FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec, }; +use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd}; use crate::Result; /// Extracts the facet number and the documents ids where this facet number appear. @@ -17,8 +18,7 @@ use crate::Result; /// documents ids from the given chunk of docid facet number positions. #[logging_timer::time] pub fn extract_facet_number_docids( - // TODO Reader> - docid_fid_facet_number: grenad::Reader, + fid_docid_facet_number: grenad::Reader, indexer: GrenadParameters, ) -> Result> { puffin::profile_function!(); @@ -27,24 +27,30 @@ pub fn extract_facet_number_docids( let mut facet_number_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - // TODO We must modify the merger to do unions of Del and Add separately - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, max_memory, ); - let mut cursor = docid_fid_facet_number.into_cursor()?; - // TODO the value is a Obkv and must be taken into account - while let Some((key_bytes, _)) = cursor.move_on_next()? { + let mut buffer = Vec::new(); + let mut cursor = fid_docid_facet_number.into_cursor()?; + while let Some((key_bytes, deladd_obkv_bytes)) = cursor.move_on_next()? { let (field_id, document_id, number) = FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap(); let key = FacetGroupKey { field_id, level: 0, left_bound: number }; let key_bytes = FacetGroupKeyCodec::::bytes_encode(&key).unwrap(); - // TODO We must put a Obkv - facet_number_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?; + + buffer.clear(); + let mut obkv = KvWriterDelAdd::new(&mut buffer); + for (deladd_key, _) in KvReaderDelAdd::new(deladd_obkv_bytes).iter() { + obkv.insert(deladd_key, document_id.to_ne_bytes())?; + } + obkv.finish()?; + + facet_number_docids_sorter.insert(key_bytes, &buffer)?; } sorter_into_reader(facet_number_docids_sorter, indexer) diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index dee200b21..a418f8786 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -205,3 +205,34 @@ pub fn merge_cbo_roaring_bitmaps<'a>( Ok(Cow::from(vec)) } } + +pub fn merge_deladd_cbo_roaring_bitmaps<'a>( + _key: &[u8], + values: &[Cow<'a, [u8]>], +) -> Result> { + if values.len() == 1 { + Ok(values[0].clone()) + } else { + // Retrieve the bitmaps from both sides + let mut del_bitmaps_bytes = Vec::new(); + let mut add_bitmaps_bytes = Vec::new(); + for value in values { + let obkv = KvReaderDelAdd::new(value); + if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) { + del_bitmaps_bytes.push(bitmap_bytes); + } + if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) { + add_bitmaps_bytes.push(bitmap_bytes); + } + } + + let mut output_deladd_obkv = KvWriterDelAdd::memory(); + let mut buffer = Vec::new(); + CboRoaringBitmapCodec::merge_into(del_bitmaps_bytes, &mut buffer)?; + output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?; + buffer.clear(); + CboRoaringBitmapCodec::merge_into(add_bitmaps_bytes, &mut buffer)?; + output_deladd_obkv.insert(DelAdd::Addition, &buffer)?; + output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) + } +} diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 8f70a2de2..1f2f8e6ef 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -14,8 +14,9 @@ pub use grenad_helpers::{ }; pub use merge_functions::{ concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string, - merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions, - obkvs_merge_additions_and_deletions, serialize_roaring_bitmap, MergeFn, + merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_roaring_bitmaps, + obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions, + serialize_roaring_bitmap, MergeFn, }; use crate::MAX_WORD_LENGTH; From 061f490204c7de8528f767c320850f45177441b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 18 Oct 2023 18:06:41 +0200 Subject: [PATCH 6/6] Update extract_facet_string_docids to support deladd obkvs --- .../extract/extract_facet_string_docids.rs | 40 ++++++++----------- .../helpers/merge_functions.rs | 3 ++ 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index f2fb5c2fc..d7d55360c 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -1,13 +1,14 @@ use std::fs::File; -use std::io; +use std::{io, str}; use heed::BytesEncode; use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters}; use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec}; use crate::heed_codec::StrRefCodec; -use crate::update::index_documents::merge_cbo_roaring_bitmaps; -use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; +use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd}; +use crate::update::index_documents::helpers::merge_deladd_cbo_roaring_bitmaps; +use crate::{FieldId, Result}; /// Extracts the facet string and the documents ids where this facet string appear. /// @@ -15,7 +16,6 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; /// documents ids from the given chunk of docid facet string positions. #[logging_timer::time] pub fn extract_facet_string_docids( - // TODO Reader> docid_fid_facet_string: grenad::Reader, indexer: GrenadParameters, ) -> Result> { @@ -25,17 +25,16 @@ pub fn extract_facet_string_docids( let mut facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, - // TODO We must modify the merger to do unions of Del and Add separately - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, max_memory, ); + let mut buffer = Vec::new(); let mut cursor = docid_fid_facet_string.into_cursor()?; - while let Some((key, _original_value_bytes)) = cursor.move_on_next()? { - // TODO the value is a Obkv and must be taken into account + while let Some((key, deladd_original_value_bytes)) = cursor.move_on_next()? { let (field_id_bytes, bytes) = try_split_array_at(key).unwrap(); let field_id = FieldId::from_be_bytes(field_id_bytes); @@ -43,22 +42,17 @@ pub fn extract_facet_string_docids( try_split_array_at::<_, 4>(bytes).unwrap(); let document_id = u32::from_be_bytes(document_id_bytes); - let mut normalised_value = std::str::from_utf8(normalized_value_bytes)?; - - let normalised_truncated_value: String; - if normalised_value.len() > MAX_FACET_VALUE_LENGTH { - normalised_truncated_value = normalised_value - .char_indices() - .take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH) - .map(|(_, c)| c) - .collect(); - normalised_value = normalised_truncated_value.as_str(); - } - let key = FacetGroupKey { field_id, level: 0, left_bound: normalised_value }; + let normalized_value = str::from_utf8(normalized_value_bytes)?; + let key = FacetGroupKey { field_id, level: 0, left_bound: normalized_value }; let key_bytes = FacetGroupKeyCodec::::bytes_encode(&key).unwrap(); - // document id is encoded in native-endian because of the CBO roaring bitmap codec - // TODO Reader> - facet_string_docids_sorter.insert(&key_bytes, document_id.to_ne_bytes())?; + + buffer.clear(); + let mut obkv = KvWriterDelAdd::new(&mut buffer); + for (deladd_key, _) in KvReaderDelAdd::new(deladd_original_value_bytes).iter() { + obkv.insert(deladd_key, document_id.to_ne_bytes())?; + } + obkv.finish()?; + facet_string_docids_sorter.insert(&key_bytes, &buffer)?; } sorter_into_reader(facet_string_docids_sorter, indexer) diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index a418f8786..770629c8e 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -193,6 +193,7 @@ pub fn obkvs_keep_last_addition_merge_deletions<'a>( inner_merge_del_add_obkvs(obkvs, false) } +/// Do a union of all the CboRoaringBitmaps in the values. pub fn merge_cbo_roaring_bitmaps<'a>( _key: &[u8], values: &[Cow<'a, [u8]>], @@ -206,6 +207,8 @@ pub fn merge_cbo_roaring_bitmaps<'a>( } } +/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv +/// separately and outputs a new DelAdd with both unions. pub fn merge_deladd_cbo_roaring_bitmaps<'a>( _key: &[u8], values: &[Cow<'a, [u8]>],