diff --git a/milli/src/index.rs b/milli/src/index.rs
index 8f9c9beb7..3d6d954f0 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -964,6 +964,11 @@ impl Index {
             .get::<_, Str, SerdeBincode<Vec<&str>>>(txn, main_key::EXACT_ATTRIBUTES)?
             .unwrap_or_default())
     }
 
+    pub fn exact_attributes_ids(&self, txn: &RoTxn) -> Result<HashSet<FieldId>> {
+        let attrs = self.exact_attributes(txn)?;
+        let fid_map = self.fields_ids_map(txn)?;
+        Ok(attrs.iter().filter_map(|attr| fid_map.id(attr)).collect())
+    }
     pub(crate) fn put_exact_attributes(&self, txn: &mut RwTxn, attrs: &[&str]) -> Result<()> {
         self.main.put::<_, Str, SerdeBincode<&[&str]>>(txn, main_key::EXACT_ATTRIBUTES, &attrs)?;
diff --git a/milli/src/lib.rs b/milli/src/lib.rs
index ba2bd9b0f..b68c76048 100644
--- a/milli/src/lib.rs
+++ b/milli/src/lib.rs
@@ -74,6 +74,10 @@ pub fn absolute_from_relative_position(field_id: FieldId, relative: RelativePosi
     (field_id as u32) << 16 | (relative as u32)
 }
 
+pub fn field_id_from_position(position: u32) -> FieldId {
+    (position >> 16 & 0xffff) as u16
+}
+
 /// Transform a raw obkv store into a JSON Object.
 pub fn obkv_to_json(
     displayed_fields: &[FieldId],
diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs
index 03bfada21..5f231e5aa 100644
--- a/milli/src/update/index_documents/extract/extract_word_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_docids.rs
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::fs::File;
 use std::io;
 use std::iter::FromIterator;
@@ -10,8 +11,8 @@ use super::helpers::{
 };
 use crate::error::SerializationError;
 use crate::index::db_name::DOCID_WORD_POSITIONS;
-use crate::update::index_documents::MergeFn;
-use crate::Result;
+use crate::update::index_documents::helpers::read_u32_ne_bytes;
+use crate::{field_id_from_position, FieldId, Result};
 
 /// Extracts the word and the documents ids where this word appear.
 ///
@@ -24,6 +25,7 @@ use crate::Result;
 pub fn extract_word_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
+    exact_attributes: &HashSet<FieldId>,
 ) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
     let max_memory = indexer.max_memory_by_thread();
 
@@ -35,21 +37,50 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
         max_memory,
     );
 
+    let mut exact_word_docids_sorter = create_sorter(
+        merge_roaring_bitmaps,
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        indexer.max_nb_chunks,
+        max_memory,
+    );
+
     let mut value_buffer = Vec::new();
     let mut cursor = docid_word_positions.into_cursor()?;
-    while let Some((key, _value)) = cursor.move_on_next()? {
+    while let Some((key, positions)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);
 
         let bitmap = RoaringBitmap::from_iter(Some(document_id));
         serialize_roaring_bitmap(&bitmap, &mut value_buffer)?;
-        word_docids_sorter.insert(word_bytes, &value_buffer)?;
+
+        // If there are no exact attributes, we do not need to iterate over positions.
+        if exact_attributes.is_empty() {
+            word_docids_sorter.insert(word_bytes, &value_buffer)?;
+        } else {
+            let mut added_to_exact = false;
+            let mut added_to_word_docids = false;
+            for position in read_u32_ne_bytes(positions) {
+                // As soon as we know that this word has been added to both sorters, we don't
+                // need to iterate over the remaining positions.
+                if added_to_exact && added_to_word_docids {
+                    break;
+                }
+                let fid = field_id_from_position(position);
+                if exact_attributes.contains(&fid) && !added_to_exact {
+                    exact_word_docids_sorter.insert(word_bytes, &value_buffer)?;
+                    added_to_exact = true;
+                } else if !added_to_word_docids {
+                    word_docids_sorter.insert(word_bytes, &value_buffer)?;
+                    added_to_word_docids = true;
+                }
+            }
+        }
     }
 
-    let empty_sorter = grenad::Sorter::new(merge_roaring_bitmaps as MergeFn);
     Ok((
         sorter_into_reader(word_docids_sorter, indexer)?,
-        sorter_into_reader(empty_sorter, indexer)?,
+        sorter_into_reader(exact_word_docids_sorter, indexer)?,
     ))
 }
diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs
index 4e7f211ce..8f6797a3b 100644
--- a/milli/src/update/index_documents/extract/mod.rs
+++ b/milli/src/update/index_documents/extract/mod.rs
@@ -43,6 +43,7 @@ pub(crate) fn data_from_obkv_documents(
     geo_field_id: Option<FieldId>,
     stop_words: Option<fst::Set<&[u8]>>,
     max_positions_per_attributes: Option<u32>,
+    exact_attributes: HashSet<FieldId>,
 ) -> Result<()> {
     let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = obkv_chunks
         .par_bridge()
@@ -90,7 +91,7 @@ pub(crate) fn data_from_obkv_documents(
         docid_word_positions_chunks.clone(),
         indexer.clone(),
         lmdb_writer_sx.clone(),
-        extract_word_docids,
+        move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes),
         merge_roaring_bitmaps,
         |(word_docids_reader, exact_word_docids_reader)| TypedChunk::WordDocids {
             word_docids_reader,
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index 633b72cc9..c490e93da 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -226,6 +226,7 @@ where
         };
 
         let stop_words = self.index.stop_words(self.wtxn)?;
+        let exact_attributes = self.index.exact_attributes_ids(self.wtxn)?;
 
         // Run extraction pipeline in parallel.
         pool.install(|| {
@@ -255,6 +256,7 @@ where
                     geo_field_id,
                     stop_words,
                     self.indexer_config.max_positions_per_attributes,
+                    exact_attributes,
                 )
             });
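
Note on the new `field_id_from_position` helper: it is the inverse of the `absolute_from_relative_position` packing shown in the lib.rs hunk, which stores the field id in the high 16 bits of a position. Below is a minimal standalone round-trip check; it assumes, as elsewhere in milli, that `FieldId` and the relative position are `u16` aliases, and simply restates the two one-liners from the diff:

```rust
type FieldId = u16;

// Packs a field id and a relative (in-field) position into one u32,
// mirroring absolute_from_relative_position in milli/src/lib.rs.
fn absolute_from_relative_position(field_id: FieldId, relative: u16) -> u32 {
    (field_id as u32) << 16 | (relative as u32)
}

// Recovers the field id from the high 16 bits, as the new helper does.
fn field_id_from_position(position: u32) -> FieldId {
    (position >> 16 & 0xffff) as u16
}

fn main() {
    let position = absolute_from_relative_position(42, 7);
    assert_eq!(field_id_from_position(position), 42);
    assert_eq!((position & 0xffff) as u16, 7); // the relative part is untouched
}
```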
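
And a self-contained sketch of the routing decision the new extraction loop makes per (word, document) pair; the `route_word` helper is hypothetical, written only to mirror the control flow of the `else` branch in extract_word_docids. Each position is decoded back to its field id, positions in exact attributes feed the exact sorter, other positions feed the regular one, and the scan stops once both sorters have received the word:

```rust
use std::collections::HashSet;

type FieldId = u16;

fn field_id_from_position(position: u32) -> FieldId {
    (position >> 16 & 0xffff) as u16
}

/// Returns which sorters would receive the word, as (exact, regular),
/// following the same short-circuiting loop as the patch.
fn route_word(positions: &[u32], exact_attributes: &HashSet<FieldId>) -> (bool, bool) {
    let mut added_to_exact = false;
    let mut added_to_word_docids = false;
    for &position in positions {
        if added_to_exact && added_to_word_docids {
            break; // both decisions are made; no need to look further
        }
        let fid = field_id_from_position(position);
        if exact_attributes.contains(&fid) && !added_to_exact {
            added_to_exact = true;
        } else if !added_to_word_docids {
            added_to_word_docids = true;
        }
    }
    (added_to_exact, added_to_word_docids)
}

fn main() {
    let exact: HashSet<FieldId> = [3].into_iter().collect();
    // Word occurs once, in exact field 3 -> exact sorter only.
    assert_eq!(route_word(&[3 << 16], &exact), (true, false));
    // Word occurs in exact field 3 and in regular field 0 -> both sorters.
    assert_eq!(route_word(&[3 << 16, 0], &exact), (true, true));
}
```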