diff --git a/milli/src/update/index_documents/merge_function.rs b/milli/src/update/index_documents/merge_function.rs index 6f24fcad9..54f994fc0 100644 --- a/milli/src/update/index_documents/merge_function.rs +++ b/milli/src/update/index_documents/merge_function.rs @@ -52,6 +52,10 @@ pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) - cbo_roaring_bitmap_merge(values) } +pub fn word_level_position_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { + cbo_roaring_bitmap_merge(values) +} + pub fn facet_field_value_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { cbo_roaring_bitmap_merge(values) } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 52949c13c..8fc35b654 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -18,11 +18,12 @@ use rayon::prelude::*; use serde::{Serialize, Deserialize}; use crate::index::Index; -use crate::update::{Facets, WordsPrefixes, UpdateIndexingStep}; +use crate::update::{Facets, WordsLevelPositions, WordsPrefixes, UpdateIndexingStep}; use self::store::{Store, Readers}; pub use self::merge_function::{ main_merge, word_docids_merge, words_pairs_proximities_docids_merge, - docid_word_positions_merge, documents_merge, facet_field_value_docids_merge, + docid_word_positions_merge, documents_merge, + word_level_position_docids_merge, facet_field_value_docids_merge, field_id_docid_facet_values_merge, }; pub use self::transform::{Transform, TransformOutput}; @@ -402,6 +403,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { enum DatabaseType { Main, WordDocids, + WordLevel0PositionDocids, FacetLevel0ValuesDocids, } @@ -467,6 +469,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { let mut word_docids_readers = Vec::with_capacity(readers.len()); let mut docid_word_positions_readers = Vec::with_capacity(readers.len()); let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len()); + let mut word_level_position_docids_readers = Vec::with_capacity(readers.len()); let mut facet_field_value_docids_readers = Vec::with_capacity(readers.len()); let mut field_id_docid_facet_values_readers = Vec::with_capacity(readers.len()); let mut documents_readers = Vec::with_capacity(readers.len()); @@ -476,6 +479,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { word_docids, docid_word_positions, words_pairs_proximities_docids, + word_level_position_docids, facet_field_value_docids, field_id_docid_facet_values, documents @@ -484,6 +488,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { word_docids_readers.push(word_docids); docid_word_positions_readers.push(docid_word_positions); words_pairs_proximities_docids_readers.push(words_pairs_proximities_docids); + word_level_position_docids_readers.push(word_level_position_docids); facet_field_value_docids_readers.push(facet_field_value_docids); field_id_docid_facet_values_readers.push(field_id_docid_facet_values); documents_readers.push(documents); @@ -514,6 +519,11 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { facet_field_value_docids_readers, facet_field_value_docids_merge, ), + ( + DatabaseType::WordLevel0PositionDocids, + word_level_position_docids_readers, + word_level_position_docids_merge, + ), ] .into_par_iter() .for_each(|(dbtype, readers, merge)| { @@ -569,7 +579,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.index.put_documents_ids(self.wtxn, &documents_ids)?; let mut database_count = 0; - let total_databases = 7; + let total_databases = 8; progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen: 0, @@ -661,7 +671,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { )?; }, DatabaseType::FacetLevel0ValuesDocids => { - debug!("Writing the facet values docids into LMDB on disk..."); + debug!("Writing the facet level 0 values docids into LMDB on disk..."); let db = *self.index.facet_field_id_value_docids.as_polymorph(); write_into_lmdb_database( self.wtxn, @@ -671,6 +681,17 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { write_method, )?; }, + DatabaseType::WordLevel0PositionDocids => { + debug!("Writing the word level 0 positions docids into LMDB on disk..."); + let db = *self.index.word_level_position_docids.as_polymorph(); + write_into_lmdb_database( + self.wtxn, + db, + content, + word_level_position_docids_merge, + write_method, + )?; + } } database_count += 1; @@ -693,6 +714,19 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { } builder.execute()?; + // Run the words positions update operation. + let mut builder = WordsLevelPositions::new(self.wtxn, self.index, self.update_id); + builder.chunk_compression_type = self.chunk_compression_type; + builder.chunk_compression_level = self.chunk_compression_level; + builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; + if let Some(value) = self.facet_level_group_size { + builder.level_group_size(value); + } + if let Some(value) = self.facet_min_level_size { + builder.min_level_size(value); + } + builder.execute()?; + // Run the words prefixes update operation. let mut builder = WordsPrefixes::new(self.wtxn, self.index, self.update_id); builder.chunk_compression_type = self.chunk_compression_type; diff --git a/milli/src/update/index_documents/store.rs b/milli/src/update/index_documents/store.rs index 0bd83b692..358552768 100644 --- a/milli/src/update/index_documents/store.rs +++ b/milli/src/update/index_documents/store.rs @@ -29,7 +29,8 @@ use crate::{json_to_string, SmallVec8, SmallVec32, Position, DocumentId, FieldId use super::{MergeFn, create_writer, create_sorter, writer_into_reader}; use super::merge_function::{ main_merge, word_docids_merge, words_pairs_proximities_docids_merge, - facet_field_value_docids_merge, field_id_docid_facet_values_merge, + word_level_position_docids_merge, facet_field_value_docids_merge, + field_id_docid_facet_values_merge, }; const LMDB_MAX_KEY_LENGTH: usize = 511; @@ -43,6 +44,7 @@ pub struct Readers { pub word_docids: Reader, pub docid_word_positions: Reader, pub words_pairs_proximities_docids: Reader, + pub word_level_position_docids: Reader, pub facet_field_value_docids: Reader, pub field_id_docid_facet_values: Reader, pub documents: Reader, @@ -69,6 +71,7 @@ pub struct Store<'s, A> { main_sorter: Sorter, word_docids_sorter: Sorter, words_pairs_proximities_docids_sorter: Sorter, + word_level_position_docids_sorter: Sorter, facet_field_value_docids_sorter: Sorter, field_id_docid_facet_values_sorter: Sorter, // MTBL writers @@ -94,7 +97,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { ) -> anyhow::Result { // We divide the max memory by the number of sorter the Store have. - let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 4)); + let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 5)); let linked_hash_map_size = linked_hash_map_size.unwrap_or(500); let main_sorter = create_sorter( @@ -121,6 +124,14 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { max_nb_chunks, max_memory, ); + let word_level_position_docids_sorter = create_sorter( + word_level_position_docids_merge, + chunk_compression_type, + chunk_compression_level, + chunk_fusing_shrink_size, + max_nb_chunks, + max_memory, + ); let facet_field_value_docids_sorter = create_sorter( facet_field_value_docids_merge, chunk_compression_type, @@ -172,6 +183,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { main_sorter, word_docids_sorter, words_pairs_proximities_docids_sorter, + word_level_position_docids_sorter, facet_field_value_docids_sorter, field_id_docid_facet_values_sorter, // MTBL writers @@ -290,6 +302,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { self.documents_writer.insert(document_id.to_be_bytes(), record)?; Self::write_docid_word_positions(&mut self.docid_word_positions_writer, document_id, words_positions)?; + Self::write_word_position_docids(&mut self.word_level_position_docids_sorter, document_id, words_positions)?; words_positions.clear(); @@ -360,6 +373,42 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { Ok(()) } + fn write_word_position_docids( + writer: &mut Sorter, + document_id: DocumentId, + words_positions: &HashMap>, + ) -> anyhow::Result<()> + { + let mut key_buffer = Vec::new(); + let mut data_buffer = Vec::new(); + + for (word, positions) in words_positions { + key_buffer.clear(); + key_buffer.extend_from_slice(word.as_bytes()); + key_buffer.push(0); // level 0 + + for position in positions { + key_buffer.truncate(word.len()); + let position_bytes = position.to_be_bytes(); + key_buffer.extend_from_slice(position_bytes.as_bytes()); + key_buffer.extend_from_slice(position_bytes.as_bytes()); + + data_buffer.clear(); + let positions = RoaringBitmap::from_iter(Some(document_id)); + // We serialize the positions into a buffer. + CboRoaringBitmapCodec::serialize_into(&positions, &mut data_buffer) + .with_context(|| "could not serialize positions")?; + + // that we write under the generated key into MTBL + if lmdb_key_valid_size(&key_buffer) { + writer.insert(&key_buffer, &data_buffer)?; + } + } + } + + Ok(()) + } + fn write_facet_field_value_docids( sorter: &mut Sorter, iter: I, @@ -561,6 +610,9 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { let mut words_pairs_proximities_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; self.words_pairs_proximities_docids_sorter.write_into(&mut words_pairs_proximities_docids_wtr)?; + let mut word_level_position_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; + self.word_level_position_docids_sorter.write_into(&mut word_level_position_docids_wtr)?; + let mut facet_field_value_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; self.facet_field_value_docids_sorter.write_into(&mut facet_field_value_docids_wtr)?; @@ -570,6 +622,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { let main = writer_into_reader(main_wtr, shrink_size)?; let word_docids = writer_into_reader(word_docids_wtr, shrink_size)?; let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr, shrink_size)?; + let word_level_position_docids = writer_into_reader(word_level_position_docids_wtr, shrink_size)?; let facet_field_value_docids = writer_into_reader(facet_field_value_docids_wtr, shrink_size)?; let field_id_docid_facet_values = writer_into_reader(field_id_docid_facet_values_wtr, shrink_size)?; let docid_word_positions = writer_into_reader(self.docid_word_positions_writer, shrink_size)?; @@ -580,6 +633,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { word_docids, docid_word_positions, words_pairs_proximities_docids, + word_level_position_docids, facet_field_value_docids, field_id_docid_facet_values, documents, diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs index c2df94468..1fc4890fb 100644 --- a/milli/src/update/mod.rs +++ b/milli/src/update/mod.rs @@ -6,6 +6,7 @@ pub use self::index_documents::{DocumentAdditionResult, IndexDocuments, IndexDoc pub use self::settings::{Setting, Settings}; pub use self::update_builder::UpdateBuilder; pub use self::update_step::UpdateIndexingStep; +pub use self::words_level_positions::WordsLevelPositions; pub use self::words_prefixes::WordsPrefixes; mod available_documents_ids; @@ -16,5 +17,6 @@ mod index_documents; mod settings; mod update_builder; mod update_step; +mod words_level_positions; mod words_prefixes; diff --git a/milli/src/update/words_level_positions.rs b/milli/src/update/words_level_positions.rs new file mode 100644 index 000000000..983f82657 --- /dev/null +++ b/milli/src/update/words_level_positions.rs @@ -0,0 +1,184 @@ +use std::cmp; +use std::fs::File; +use std::num::NonZeroUsize; + +use grenad::{CompressionType, Reader, Writer, FileFuse}; +use heed::types::{ByteSlice, DecodeIgnore}; +use heed::{BytesEncode, Error}; +use log::debug; +use roaring::RoaringBitmap; + +use crate::facet::FacetType; +use crate::heed_codec::{StrLevelPositionCodec, CboRoaringBitmapCodec}; +use crate::Index; +use crate::update::index_documents::WriteMethod; +use crate::update::index_documents::{create_writer, writer_into_reader, write_into_lmdb_database}; + +pub struct WordsLevelPositions<'t, 'u, 'i> { + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + pub(crate) chunk_compression_type: CompressionType, + pub(crate) chunk_compression_level: Option, + pub(crate) chunk_fusing_shrink_size: Option, + level_group_size: NonZeroUsize, + min_level_size: NonZeroUsize, + _update_id: u64, +} + +impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> { + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64, + ) -> WordsLevelPositions<'t, 'u, 'i> + { + WordsLevelPositions { + wtxn, + index, + chunk_compression_type: CompressionType::None, + chunk_compression_level: None, + chunk_fusing_shrink_size: None, + level_group_size: NonZeroUsize::new(4).unwrap(), + min_level_size: NonZeroUsize::new(5).unwrap(), + _update_id: update_id, + } + } + + pub fn level_group_size(&mut self, value: NonZeroUsize) -> &mut Self { + self.level_group_size = NonZeroUsize::new(cmp::max(value.get(), 2)).unwrap(); + self + } + + pub fn min_level_size(&mut self, value: NonZeroUsize) -> &mut Self { + self.min_level_size = value; + self + } + + pub fn execute(self) -> anyhow::Result<()> { + debug!("Computing and writing the word levels positions docids into LMDB on disk..."); + + clear_non_zero_levels_positions(self.wtxn, self.index.word_level_position_docids)?; + + let entries = compute_positions_levels( + self.wtxn, + self.index.word_level_position_docids, + self.chunk_compression_type, + self.chunk_compression_level, + self.chunk_fusing_shrink_size, + self.level_group_size, + self.min_level_size, + )?; + + write_into_lmdb_database( + self.wtxn, + *self.index.facet_field_id_value_docids.as_polymorph(), + entries, + |_, _| anyhow::bail!("invalid facet level merging"), + WriteMethod::GetMergePut, + )?; + + Ok(()) + } +} + +fn clear_non_zero_levels_positions( + wtxn: &mut heed::RwTxn, + db: heed::Database, +) -> heed::Result<()> +{ + let mut iter = db.iter_mut(wtxn)?.lazily_decode_data(); + while let Some(result) = iter.next() { + let ((_, level, _, _), _) = result?; + if level != 0 { + iter.del_current()?; + } + } + Ok(()) +} + +/// Generates all the words positions levels (including the level zero). +fn compute_positions_levels( + rtxn: &heed::RoTxn, + db: heed::Database, + compression_type: CompressionType, + compression_level: Option, + shrink_size: Option, + level_group_size: NonZeroUsize, + min_level_size: NonZeroUsize, +) -> anyhow::Result> +{ + // let first_level_size = db.prefix_iter(rtxn, &[field_id])? + // .remap_types::() + // .fold(Ok(0usize), |count, result| result.and(count).map(|c| c + 1))?; + + // // It is forbidden to keep a cursor and write in a database at the same time with LMDB + // // therefore we write the facet levels entries into a grenad file before transfering them. + // let mut writer = tempfile::tempfile().and_then(|file| { + // create_writer(compression_type, compression_level, file) + // })?; + + // let level_0_range = { + // let left = (field_id, 0, T::min_value(), T::min_value()); + // let right = (field_id, 0, T::max_value(), T::max_value()); + // left..=right + // }; + + // // Groups sizes are always a power of the original level_group_size and therefore a group + // // always maps groups of the previous level and never splits previous levels groups in half. + // let group_size_iter = (1u8..) + // .map(|l| (l, level_group_size.get().pow(l as u32))) + // .take_while(|(_, s)| first_level_size / *s >= min_level_size.get()); + + // for (level, group_size) in group_size_iter { + // let mut left = T::zero(); + // let mut right = T::zero(); + // let mut group_docids = RoaringBitmap::new(); + + // let db = db.remap_key_type::(); + // for (i, result) in db.range(rtxn, &level_0_range)?.enumerate() { + // let ((_field_id, _level, value, _right), docids) = result?; + + // if i == 0 { + // left = value; + // } else if i % group_size == 0 { + // // we found the first bound of the next group, we must store the left + // // and right bounds associated with the docids. + // write_entry::(&mut writer, field_id, level, left, right, &group_docids)?; + + // // We save the left bound for the new group and also reset the docids. + // group_docids = RoaringBitmap::new(); + // left = value; + // } + + // // The right bound is always the bound we run through. + // group_docids.union_with(&docids); + // right = value; + // } + + // if !group_docids.is_empty() { + // write_entry::(&mut writer, field_id, level, left, right, &group_docids)?; + // } + // } + + // writer_into_reader(writer, shrink_size) + + todo!() +} + +fn write_entry( + writer: &mut Writer, + field_id: u8, + level: u8, + left: T, + right: T, + ids: &RoaringBitmap, +) -> anyhow::Result<()> +where + KC: for<'x> heed::BytesEncode<'x, EItem = (u8, u8, T, T)>, +{ + let key = (field_id, level, left, right); + let key = KC::bytes_encode(&key).ok_or(Error::Encoding)?; + let data = CboRoaringBitmapCodec::bytes_encode(&ids).ok_or(Error::Encoding)?; + writer.insert(&key, &data)?; + Ok(()) +}