From d675e73af114bd6e70d75429356af966d2af768a Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Mon, 14 Oct 2024 11:12:10 +0200 Subject: [PATCH] Finish prefix databases --- milli/src/index.rs | 15 +++ milli/src/update/new/indexer/mod.rs | 3 + milli/src/update/new/merger.rs | 9 +- milli/src/update/new/word_fst_builder.rs | 20 ++-- milli/src/update/new/words_prefix_docids.rs | 120 +++++++++++++++++--- 5 files changed, 131 insertions(+), 36 deletions(-) diff --git a/milli/src/index.rs b/milli/src/index.rs index 4a7f2c42b..19064e8d7 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -1669,6 +1669,14 @@ impl Index { } Ok(res) } + + pub fn prefix_settings(&self, _rtxn: &RoTxn<'_>) -> Result { + Ok(PrefixSettings { + compute_prefixes: true, + max_prefix_length: 4, + prefix_count_threshold: 100, + }) + } } #[derive(Debug, Deserialize, Serialize)] @@ -1678,6 +1686,13 @@ pub struct IndexEmbeddingConfig { pub user_provided: RoaringBitmap, } +#[derive(Debug, Deserialize, Serialize)] +pub struct PrefixSettings { + pub prefix_count_threshold: u64, + pub max_prefix_length: usize, + pub compute_prefixes: bool, +} + #[derive(Serialize, Deserialize)] #[serde(transparent)] struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] time::OffsetDateTime); diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index c634e22b6..3de5c176e 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -29,6 +29,7 @@ use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::facet::FacetType; use crate::proximity::ProximityPrecision; use crate::update::new::channel::ExtractorSender; +use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; use crate::update::settings::InnerIndexSettings; use crate::update::{FacetsUpdateBulk, GrenadParameters}; use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; @@ -301,6 +302,8 @@ fn compute_prefix_database( let PrefixDelta { modified, deleted } = prefix_delta; // Compute word prefix docids compute_word_prefix_docids(wtxn, index, &modified, &deleted)?; + // Compute exact word prefix docids + compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted)?; // Compute word prefix fid docids compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?; // Compute word prefix position docids diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 80556ced9..998a5d4a2 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -10,7 +10,7 @@ use roaring::RoaringBitmap; use super::channel::*; use super::extract::FacetKind; -use super::word_fst_builder::{PrefixData, PrefixDelta, PrefixSettings}; +use super::word_fst_builder::{PrefixData, PrefixDelta}; use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId}; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; @@ -63,12 +63,7 @@ pub fn merge_grenad_entries( MergerOperation::WordDocidsMerger(merger) => { let words_fst = index.words_fst(rtxn)?; let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; - /// TODO make this configurable - let prefix_settings = PrefixSettings { - compute_prefixes: true, - max_prefix_length: 4, - prefix_count_threshold: 100, - }; + let prefix_settings = index.prefix_settings(rtxn)?; word_fst_builder.with_prefix_settings(prefix_settings); { diff --git a/milli/src/update/new/word_fst_builder.rs b/milli/src/update/new/word_fst_builder.rs index 97cd47e73..867d3e86d 100644 --- a/milli/src/update/new/word_fst_builder.rs +++ b/milli/src/update/new/word_fst_builder.rs @@ -5,7 +5,7 @@ use memmap2::Mmap; use std::collections::HashSet; use tempfile::tempfile; -use crate::{update::del_add::DelAdd, Prefix, Result}; +use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result}; pub struct WordFstBuilder<'a> { stream: Option>, @@ -143,8 +143,10 @@ impl<'a> WordFstBuilder<'a> { ) -> Result<(Mmap, Option)> { self.drain_stream()?; - /// TODO: ugly unwrap - let words_fst_file = self.word_fst_builder.into_inner()?.into_inner().unwrap(); + let words_fst_file = + self.word_fst_builder.into_inner()?.into_inner().map_err(|_| { + InternalError::IndexingMergingKeys { process: "building-words-fst" } + })?; let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? }; let prefix_data = self @@ -156,13 +158,6 @@ impl<'a> WordFstBuilder<'a> { } } -#[derive(Debug)] -pub struct PrefixSettings { - pub prefix_count_threshold: u64, - pub max_prefix_length: usize, - pub compute_prefixes: bool, -} - pub struct PrefixData { pub prefixes_fst_mmap: Mmap, pub prefix_delta: PrefixDelta, @@ -269,8 +264,9 @@ impl PrefixFstBuilder { let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter()); let mut builder = SetBuilder::new(BufWriter::new(tempfile()?))?; builder.extend_stream(op.r#union())?; - /// TODO: ugly unwrap - let prefix_fst_file = builder.into_inner()?.into_inner().unwrap(); + let prefix_fst_file = builder.into_inner()?.into_inner().map_err(|_| { + InternalError::IndexingMergingKeys { process: "building-words-prefixes-fst" } + })?; let prefix_fst_mmap = unsafe { Mmap::map(&prefix_fst_file)? }; let new_prefix_fst = Set::new(&prefix_fst_mmap)?; let old_prefix_fst = index.words_prefixes_fst(rtxn)?; diff --git a/milli/src/update/new/words_prefix_docids.rs b/milli/src/update/new/words_prefix_docids.rs index 32a22ba73..8795fd9a4 100644 --- a/milli/src/update/new/words_prefix_docids.rs +++ b/milli/src/update/new/words_prefix_docids.rs @@ -1,9 +1,11 @@ use std::collections::HashSet; -use heed::Database; +use hashbrown::HashMap; use heed::{types::Bytes, RwTxn}; +use heed::{BytesDecode, Database}; use roaring::RoaringBitmap; +use crate::heed_codec::StrBEU16Codec; use crate::{CboRoaringBitmapCodec, Index, Prefix, Result}; struct WordPrefixDocids { @@ -25,23 +27,10 @@ impl WordPrefixDocids { prefix_to_compute: &HashSet, prefix_to_delete: &HashSet, ) -> Result<()> { - self.delete_prefixes(wtxn, prefix_to_delete)?; + delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?; self.recompute_modified_prefixes(wtxn, prefix_to_compute) } - #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] - fn delete_prefixes(&self, wtxn: &mut heed::RwTxn, prefixes: &HashSet) -> Result<()> { - // We remove all the entries that are no more required in this word prefix docids database. - for prefix in prefixes { - let prefix = prefix.as_bytes(); - if !self.prefix_database.delete(wtxn, prefix)? { - unreachable!("We tried to delete an unknown key") - } - } - - Ok(()) - } - #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] fn recompute_modified_prefixes( &self, @@ -65,6 +54,89 @@ impl WordPrefixDocids { } } +struct WordPrefixIntegerDocids { + database: Database, + prefix_database: Database, +} + +impl WordPrefixIntegerDocids { + fn new( + database: Database, + prefix_database: Database, + ) -> WordPrefixIntegerDocids { + WordPrefixIntegerDocids { database, prefix_database } + } + + fn execute( + self, + wtxn: &mut heed::RwTxn, + prefix_to_compute: &HashSet, + prefix_to_delete: &HashSet, + ) -> Result<()> { + delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?; + self.recompute_modified_prefixes(wtxn, prefix_to_compute) + } + + #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] + fn recompute_modified_prefixes( + &self, + wtxn: &mut RwTxn, + prefixes: &HashSet, + ) -> Result<()> { + // We fetch the docids associated to the newly added word prefix fst only. + // We use a HashMap to store the docids associated to each position, may be RAM consuming. + let mut integer_docids = HashMap::new(); + let mut key_buffer = Vec::new(); + for prefix in prefixes { + let prefix = prefix.as_bytes(); + for result in self.database.prefix_iter(wtxn, prefix)? { + let (key, data) = result?; + let (_word, pos) = + StrBEU16Codec::bytes_decode(key).map_err(heed::Error::Decoding)?; + + match integer_docids.get_mut(&pos) { + Some(docids) => { + *docids |= &data; + } + None => { + integer_docids.insert(pos, data); + } + } + } + + for (pos, docids) in integer_docids.iter_mut() { + if !docids.is_empty() { + key_buffer.clear(); + key_buffer.extend_from_slice(prefix); + key_buffer.push(0); + key_buffer.extend_from_slice(&pos.to_be_bytes()); + self.prefix_database.put(wtxn, &key_buffer, &docids)?; + } + docids.clear(); + } + } + + Ok(()) + } +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] +fn delete_prefixes( + wtxn: &mut RwTxn, + prefix_database: &Database, + prefixes: &HashSet, +) -> Result<()> { + // We remove all the entries that are no more required in this word prefix docids database. + for prefix in prefixes { + let prefix = prefix.as_bytes(); + if !prefix_database.delete(wtxn, prefix)? { + unreachable!("We tried to delete an unknown key") + } + } + + Ok(()) +} + #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] pub fn compute_word_prefix_docids( wtxn: &mut RwTxn, @@ -80,13 +152,27 @@ pub fn compute_word_prefix_docids( } #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] -pub fn compute_word_prefix_fid_docids( +pub fn compute_exact_word_prefix_docids( wtxn: &mut RwTxn, index: &Index, prefix_to_compute: &HashSet, prefix_to_delete: &HashSet, ) -> Result<()> { WordPrefixDocids::new( + index.exact_word_docids.remap_key_type(), + index.exact_word_prefix_docids.remap_key_type(), + ) + .execute(wtxn, prefix_to_compute, prefix_to_delete) +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] +pub fn compute_word_prefix_fid_docids( + wtxn: &mut RwTxn, + index: &Index, + prefix_to_compute: &HashSet, + prefix_to_delete: &HashSet, +) -> Result<()> { + WordPrefixIntegerDocids::new( index.word_fid_docids.remap_key_type(), index.word_prefix_fid_docids.remap_key_type(), ) @@ -100,7 +186,7 @@ pub fn compute_word_prefix_position_docids( prefix_to_compute: &HashSet, prefix_to_delete: &HashSet, ) -> Result<()> { - WordPrefixDocids::new( + WordPrefixIntegerDocids::new( index.word_position_docids.remap_key_type(), index.word_prefix_position_docids.remap_key_type(), )