diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs index a3f0c8f71..a2b1c9dcd 100644 --- a/milli/src/update/facet/bulk.rs +++ b/milli/src/update/facet/bulk.rs @@ -133,6 +133,8 @@ impl FacetsUpdateBulkInner { self.db.delete_range(wtxn, &range).map(drop)?; Ok(()) } + + // TODO the new_data is an Reader>> fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> { let new_data = match self.new_data.take() { Some(x) => x, diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index bbd25f91e..decb6a9ac 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -115,6 +115,7 @@ pub struct FacetsUpdate<'i> { min_level_size: u8, } impl<'i> FacetsUpdate<'i> { + // TODO grenad::Reader> pub fn new( index: &'i Index, facet_type: FacetType, diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index d557e0b6c..76dc6d3c6 100644 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -17,6 +17,7 @@ use crate::Result; /// documents ids from the given chunk of docid facet number positions. #[logging_timer::time] pub fn extract_facet_number_docids( + // TODO Reader> docid_fid_facet_number: grenad::Reader, indexer: GrenadParameters, ) -> Result>> { @@ -26,6 +27,7 @@ pub fn extract_facet_number_docids( let mut facet_number_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, + // TODO We must modify the merger to do unions of Del and Add separately merge_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, @@ -34,12 +36,14 @@ pub fn extract_facet_number_docids( ); let mut cursor = docid_fid_facet_number.into_cursor()?; + // TODO the value is a Obkv and must be taken into account while let Some((key_bytes, _)) = cursor.move_on_next()? { let (field_id, document_id, number) = FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap(); let key = FacetGroupKey { field_id, level: 0, left_bound: number }; let key_bytes = FacetGroupKeyCodec::::bytes_encode(&key).unwrap(); + // TODO We must put a Obkv facet_number_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?; } diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index b1b27449e..b861c04e4 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -15,6 +15,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; /// documents ids from the given chunk of docid facet string positions. #[logging_timer::time] pub fn extract_facet_string_docids( + // TODO Reader> docid_fid_facet_string: grenad::Reader, indexer: GrenadParameters, ) -> Result>> { @@ -24,6 +25,7 @@ pub fn extract_facet_string_docids( let mut facet_string_docids_sorter = create_sorter( grenad::SortAlgorithm::Stable, + // TODO We must modify the merger to do unions of Del and Add separately merge_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, @@ -33,6 +35,7 @@ pub fn extract_facet_string_docids( let mut cursor = docid_fid_facet_string.into_cursor()?; while let Some((key, _original_value_bytes)) = cursor.move_on_next()? { + // TODO the value is a Obkv and must be taken into account let (field_id_bytes, bytes) = try_split_array_at(key).unwrap(); let field_id = FieldId::from_be_bytes(field_id_bytes); @@ -54,6 +57,7 @@ pub fn extract_facet_string_docids( let key = FacetGroupKey { field_id, level: 0, left_bound: normalised_value }; let key_bytes = FacetGroupKeyCodec::::bytes_encode(&key).unwrap(); // document id is encoded in native-endian because of the CBO roaring bitmap codec + // TODO Reader> facet_string_docids_sorter.insert(&key_bytes, document_id.to_ne_bytes())?; } diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 42c355323..0340fb709 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -6,17 +6,21 @@ use std::mem::size_of; use heed::zerocopy::AsBytes; use heed::BytesEncode; +use itertools::EitherOrBoth; +use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use serde_json::{from_slice, Value}; use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; use crate::error::InternalError; use crate::facet::value_encoding::f64_into_bytes; +use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{create_writer, writer_into_reader}; use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH}; /// The extracted facet values stored in grenad files by type. pub struct ExtractedFacetValues { + // TOOD rename into `fid_docid_*` pub docid_fid_facet_numbers_chunk: grenad::Reader>, pub docid_fid_facet_strings_chunk: grenad::Reader>, pub fid_facet_is_null_docids_chunk: grenad::Reader>, @@ -31,6 +35,7 @@ pub struct ExtractedFacetValues { /// We need the fid of the geofields to correctly parse them as numbers if they were sent as strings initially. #[logging_timer::time] pub fn extract_fid_docid_facet_values( + // TODO Reader>> obkv_documents: grenad::Reader, indexer: GrenadParameters, faceted_fields: &HashSet, @@ -58,13 +63,15 @@ pub fn extract_fid_docid_facet_values( max_memory.map(|m| m / 2), ); - let mut facet_exists_docids = BTreeMap::::new(); - let mut facet_is_null_docids = BTreeMap::::new(); - let mut facet_is_empty_docids = BTreeMap::::new(); + // The tuples represents the Del and Add side for a bitmap + let mut facet_exists_docids = BTreeMap::::new(); + let mut facet_is_null_docids = BTreeMap::::new(); + let mut facet_is_empty_docids = BTreeMap::::new(); let mut key_buffer = Vec::new(); let mut cursor = obkv_documents.into_cursor()?; while let Some((docid_bytes, value)) = cursor.move_on_next()? { + // TODO Obkv> let obkv = obkv::KvReader::new(value); for (field_id, field_bytes) in obkv.iter() { @@ -79,50 +86,233 @@ pub fn extract_fid_docid_facet_values( let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap(); let document = BEU32::from(document).get(); - facet_exists_docids.entry(field_id).or_default().insert(document); - // For the other extraction tasks, prefix the key with the field_id and the document_id key_buffer.extend_from_slice(docid_bytes); - let value = from_slice(field_bytes).map_err(InternalError::SerdeJson)?; + let del_add_obkv = obkv::KvReader::new(field_bytes); + let del_value = match del_add_obkv.get(DelAdd::Deletion) { + Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?, + None => None, + }; + let add_value = match del_add_obkv.get(DelAdd::Addition) { + Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?, + None => None, + }; - match extract_facet_values( - &value, - geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng), - ) { - FilterableValues::Null => { - facet_is_null_docids.entry(field_id).or_default().insert(document); - } - FilterableValues::Empty => { - facet_is_empty_docids.entry(field_id).or_default().insert(document); - } - FilterableValues::Values { numbers, strings } => { - // insert facet numbers in sorter - for number in numbers { - key_buffer.truncate(size_of::() + size_of::()); - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); + // We insert the document id on the Del and the Add side if the field exists. + let (mut del_exists, mut add_exists) = + facet_exists_docids.entry(field_id).or_default(); + if del_value.is_some() { + del_exists.insert(document); + } + if add_value.is_some() { + add_exists.insert(document); + } - fid_docid_facet_numbers_sorter - .insert(&key_buffer, ().as_bytes())?; + // TODO extract both Del and Add numbers an strings (dedup) + // TODO use the `itertools::merge_join_by` method to sort and diff both sides (Del and Add) + // TODO if there is a Left generate a Del + // TODO if there is a Right generate an Add + // TODO if there is a Both don't insert + // TODO compare numbers using OrderedFloat and strings using both normalized and original values. + + let geo_support = + geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng); + + let del_filterable_values = + del_value.map(|value| extract_facet_values(&value, geo_support)); + let add_filterable_values = + add_value.map(|value| extract_facet_values(&value, geo_support)); + + use FilterableValues::{Empty, Null, Values}; + + match (del_filterable_values, add_filterable_values) { + (None, None) => (), + (Some(del_filterable_values), None) => match del_filterable_values { + Null => { + let (mut del_is_null, _) = + facet_is_null_docids.entry(field_id).or_default(); + del_is_null.insert(document); + } + Empty => { + let (mut del_is_empty, _) = + facet_is_empty_docids.entry(field_id).or_default(); + del_is_empty.insert(document); + } + Values { numbers, strings } => { + // insert facet numbers in sorter + for number in numbers { + key_buffer.truncate(size_of::() + size_of::()); + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; + } + } + + // insert normalized and original facet string in sorter + for (normalized, original) in + strings.into_iter().filter(|(n, _)| !n.is_empty()) + { + let normalized_truncated_value: String = normalized + .char_indices() + .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) + .map(|(_, c)| c) + .collect(); + + key_buffer.truncate(size_of::() + size_of::()); + key_buffer.extend_from_slice(normalized_truncated_value.as_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those strings. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, original.as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; } } + }, + (None, Some(add_filterable_values)) => { + todo!() + } + (Some(del_filterable_values), Some(add_filterable_values)) => { + let (mut del_is_null, mut add_is_null) = + facet_is_null_docids.entry(field_id).or_default(); + let (mut del_is_empty, mut add_is_empty) = + facet_is_empty_docids.entry(field_id).or_default(); - // insert normalized and original facet string in sorter - for (normalized, original) in - strings.into_iter().filter(|(n, _)| !n.is_empty()) - { - let normalized_truncated_value: String = normalized - .char_indices() - .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) - .map(|(_, c)| c) - .collect(); + match (del_filterable_values, add_filterable_values) { + (Null, Null) | (Empty, Empty) => (), + (Null, Empty) => { + del_is_null.insert(document); + add_is_empty.insert(document); + } + (Empty, Null) => { + del_is_empty.insert(document); + add_is_null.insert(document); + } + (Null, Values { numbers, strings }) => { + del_is_null.insert(document); + todo!() + } + (Empty, Values { numbers, strings }) => { + del_is_empty.insert(document); + todo!() + } + (Values { numbers, strings }, Null) => { + todo!(); + add_is_null.insert(document); + } + (Values { numbers, strings }, Empty) => { + todo!(); + add_is_empty.insert(document); + } + ( + Values { numbers: mut del_numbers, strings: mut del_strings }, + Values { numbers: mut add_numbers, strings: mut add_strings }, + ) => { + // We sort and dedup the float numbers + del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); + add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); + del_numbers.dedup_by_key(|f| OrderedFloat(*f)); + add_numbers.dedup_by_key(|f| OrderedFloat(*f)); - key_buffer.truncate(size_of::() + size_of::()); - key_buffer.extend_from_slice(normalized_truncated_value.as_bytes()); - fid_docid_facet_strings_sorter - .insert(&key_buffer, original.as_bytes())?; + let merged_numbers_iter = itertools::merge_join_by( + del_numbers.into_iter().map(OrderedFloat), + add_numbers.into_iter().map(OrderedFloat), + |del, add| del.cmp(&add), + ); + + // insert facet numbers in sorter + for eob in merged_numbers_iter { + key_buffer + .truncate(size_of::() + size_of::()); + match eob { + EitherOrBoth::Both(_, _) => (), // no need to touch anything + EitherOrBoth::Left(OrderedFloat(number)) => { + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter + .insert(&key_buffer, bytes)?; + } + } + EitherOrBoth::Right(OrderedFloat(number)) => { + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter + .insert(&key_buffer, bytes)?; + } + } + } + } + + // We sort and dedup the normalized and original strings + del_strings.sort_unstable(); + add_strings.sort_unstable(); + del_strings.dedup(); + add_strings.dedup(); + + let merged_strings_iter = itertools::merge_join_by( + del_strings.into_iter().filter(|(n, _)| !n.is_empty()), + add_strings.into_iter().filter(|(n, _)| !n.is_empty()), + |del, add| del.cmp(&add), + ); + + // insert normalized and original facet string in sorter + for eob in merged_strings_iter { + match eob { + EitherOrBoth::Both(_, _) => (), // no need to touch anything + EitherOrBoth::Left((normalized, original)) => { + let truncated = truncate_string(normalized); + + key_buffer.truncate( + size_of::() + size_of::(), + ); + key_buffer.extend_from_slice(truncated.as_bytes()); + + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, original)?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter + .insert(&key_buffer, bytes)?; + } + EitherOrBoth::Right((normalized, original)) => { + let truncated = truncate_string(normalized); + + key_buffer.truncate( + size_of::() + size_of::(), + ); + key_buffer.extend_from_slice(truncated.as_bytes()); + + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, original)?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter + .insert(&key_buffer, bytes)?; + } + } + } + } } } } @@ -135,6 +325,7 @@ pub fn extract_fid_docid_facet_values( indexer.chunk_compression_level, tempfile::tempfile()?, ); + // TODO generate an Obkv for (fid, bitmap) in facet_exists_docids.into_iter() { let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; @@ -146,12 +337,14 @@ pub fn extract_fid_docid_facet_values( indexer.chunk_compression_level, tempfile::tempfile()?, ); + // TODO generate an Obkv for (fid, bitmap) in facet_is_null_docids.into_iter() { let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; } let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?; + // TODO generate an Obkv let mut facet_is_empty_docids_writer = create_writer( indexer.chunk_compression_type, indexer.chunk_compression_level, @@ -243,3 +436,10 @@ fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues { } } } + +fn truncate_string(mut s: String) -> String { + s.char_indices() + .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) + .map(|(_, c)| c) + .collect() +}