Compute prefix databases

We are now computing the prefix FST and a prefix delta in the Merger thread,
after all the databases are written, the main thread will recompute the prefix databases based on the prefix delta without needing any grenad temporary file anymore
This commit is contained in:
ManyTheFish 2024-10-01 09:56:49 +02:00
parent 64589278ac
commit bb7a503e5d
7 changed files with 313 additions and 77 deletions

View File

@ -88,6 +88,7 @@ pub type Object = serde_json::Map<String, serde_json::Value>;
pub type Position = u32;
pub type RelativePosition = u16;
pub type SmallString32 = smallstr::SmallString<[u8; 32]>;
pub type Prefix = smallstr::SmallString<[u8; 16]>;
pub type SmallVec16<T> = smallvec::SmallVec<[T; 16]>;
pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>;
pub type SmallVec8<T> = smallvec::SmallVec<[T; 8]>;

View File

@ -8,7 +8,7 @@ use memmap2::Mmap;
use super::extract::FacetKind;
use super::StdResult;
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY};
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::update::new::KvReaderFieldId;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{DocumentId, Index};
@ -257,6 +257,17 @@ impl MainSender<'_> {
}
}
pub fn write_words_prefixes_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(
WORDS_PREFIXES_FST_KEY.as_bytes(),
value,
));
match self.0.send(WriterOperation { database: Database::Main, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
match self.0.send(WriterOperation { database: Database::Main, entry }) {

View File

@ -14,6 +14,10 @@ use super::channel::*;
use super::document_change::DocumentChange;
use super::extract::*;
use super::merger::merge_grenad_entries;
use super::word_fst_builder::PrefixDelta;
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
};
use super::{StdResult, TopLevelMap};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::update::new::channel::ExtractorSender;
@ -174,7 +178,7 @@ where
// TODO manage the errors correctly
let current_span = tracing::Span::current();
let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || {
let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || {
let span =
tracing::trace_span!(target: "indexing::documents", parent: &current_span, "merge");
let _entered = span.enter();
@ -202,7 +206,20 @@ where
/// TODO handle the panicking threads
handle.join().unwrap()?;
handle2.join().unwrap()?;
let merger_result = merger_thread.join().unwrap()?;
if let Some(prefix_delta) = merger_result.prefix_delta {
let span = tracing::trace_span!(target: "indexing", "prefix");
let _entered = span.enter();
let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids
compute_word_prefix_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix fid docids
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix position docids
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)?;
}
Ok(()) as Result<_>
})?;

View File

@ -6,15 +6,17 @@ use grenad::Merger;
use heed::types::Bytes;
use heed::{Database, RoTxn};
use roaring::RoaringBitmap;
use std::collections::HashSet;
use super::channel::*;
use super::extract::FacetKind;
use super::word_fst_builder::{PrefixData, PrefixDelta, PrefixSettings};
use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update};
use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::new::word_fst_builder::WordFstBuilder;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Result};
use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Prefix, Result};
/// TODO We must return some infos/stats
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
@ -24,10 +26,11 @@ pub fn merge_grenad_entries(
rtxn: &RoTxn,
index: &Index,
mut global_fields_ids_map: GlobalFieldsIdsMap<'_>,
) -> Result<()> {
) -> Result<MergerResult> {
let mut buffer: Vec<u8> = Vec::new();
let mut documents_ids = index.documents_ids(rtxn)?;
let mut geo_extractor = GeoExtractor::new(rtxn, index)?;
let mut merger_result = MergerResult::default();
for merger_operation in receiver {
match merger_operation {
@ -59,7 +62,15 @@ pub fn merge_grenad_entries(
}
MergerOperation::WordDocidsMerger(merger) => {
let words_fst = index.words_fst(rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst, 4)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
/// TODO make this configurable
let prefix_settings = PrefixSettings {
compute_prefixes: true,
max_prefix_length: 4,
prefix_count_threshold: 100,
};
word_fst_builder.with_prefix_settings(prefix_settings);
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
@ -80,8 +91,12 @@ pub fn merge_grenad_entries(
tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
let _entered = span.enter();
let (word_fst_mmap, prefix_fst_mmap) = word_fst_builder.build()?;
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, rtxn)?;
sender.main().write_words_fst(word_fst_mmap).unwrap();
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap();
merger_result.prefix_delta = Some(prefix_delta);
}
}
}
MergerOperation::WordFidDocidsMerger(merger) => {
@ -185,7 +200,13 @@ pub fn merge_grenad_entries(
// ...
Ok(())
Ok(merger_result)
}
#[derive(Default, Debug)]
pub struct MergerResult {
/// The delta of the prefixes
pub prefix_delta: Option<PrefixDelta>,
}
pub struct GeoExtractor {

View File

@ -13,6 +13,7 @@ mod items_pool;
mod merger;
mod top_level_map;
mod word_fst_builder;
mod words_prefix_docids;
/// TODO move them elsewhere
pub type StdResult<T, E> = std::result::Result<T, E>;

View File

@ -2,50 +2,37 @@ use std::{fs::File, io::BufWriter};
use fst::{Set, SetBuilder, Streamer};
use memmap2::Mmap;
use std::collections::HashSet;
use tempfile::tempfile;
use crate::{update::del_add::DelAdd, Result, SmallString32};
use crate::{update::del_add::DelAdd, Prefix, Result};
pub struct WordFstBuilder<'a> {
stream: Option<fst::set::Stream<'a>>,
word_fst_builder: SetBuilder<BufWriter<File>>,
/// TODO: Replace the full memory allocation
prefix_fst_builders: Vec<SetBuilder<Vec<u8>>>,
max_prefix_length: usize,
last_word: Option<Vec<u8>>,
current_prefix: Vec<SmallString32>,
current_prefix_count: Vec<u64>,
prefix_count_threshold: u64,
prefix_fst_builder: Option<PrefixFstBuilder>,
inserted_words: usize,
registered_words: usize,
base_set_length: usize,
}
impl<'a> WordFstBuilder<'a> {
pub fn new(
words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>,
max_prefix_length: usize,
) -> Result<Self> {
let mut prefix_fst_builders = Vec::new();
for _ in 0..max_prefix_length {
prefix_fst_builders.push(SetBuilder::memory());
}
pub fn new(words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>) -> Result<Self> {
Ok(Self {
stream: Some(words_fst.stream()),
word_fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
prefix_fst_builders,
max_prefix_length,
prefix_fst_builder: None,
last_word: None,
current_prefix: vec![SmallString32::new(); max_prefix_length],
current_prefix_count: vec![0; max_prefix_length],
prefix_count_threshold: 100,
inserted_words: 0,
registered_words: 0,
base_set_length: words_fst.len(),
})
}
pub fn with_prefix_settings(&mut self, prefix_settings: PrefixSettings) -> &Self {
self.prefix_fst_builder = PrefixFstBuilder::new(prefix_settings);
self
}
pub fn register_word(&mut self, deladd: DelAdd, right: &[u8]) -> Result<()> {
if deladd == DelAdd::Addition {
self.registered_words += 1;
@ -85,7 +72,7 @@ impl<'a> WordFstBuilder<'a> {
// If we reach this point, it means that the stream is empty
// and we need to insert the incoming word
self.insert_word(right)?;
self.insert_word(right, deladd, true)?;
self.stream = Some(stream);
}
@ -104,26 +91,18 @@ impl<'a> WordFstBuilder<'a> {
match left.cmp(right) {
std::cmp::Ordering::Less => {
// We need to insert the last word from the current fst
self.insert_word(left)?;
self.insert_word(left, DelAdd::Addition, false)?;
left_inserted = true;
}
std::cmp::Ordering::Equal => {
// Addition: We insert the word
// Deletion: We delete the word by not inserting it
if deladd == DelAdd::Addition {
self.insert_word(right)?;
}
self.insert_word(right, deladd, true)?;
left_inserted = true;
right_inserted = true;
}
std::cmp::Ordering::Greater => {
// Addition: We insert the word and keep the last word
// Deletion: We keep the current word until the left word to delete is greater or equal
if deladd == DelAdd::Addition {
self.insert_word(right)?;
}
self.insert_word(right, deladd, true)?;
right_inserted = true;
}
@ -132,14 +111,111 @@ impl<'a> WordFstBuilder<'a> {
Ok((left_inserted, right_inserted))
}
fn insert_word(&mut self, bytes: &[u8]) -> Result<()> {
fn insert_word(&mut self, bytes: &[u8], deladd: DelAdd, is_modified: bool) -> Result<()> {
// Addition: We insert the word
// Deletion: We delete the word by not inserting it
if deladd == DelAdd::Addition {
self.inserted_words += 1;
self.word_fst_builder.insert(bytes)?;
}
if let Some(prefix_fst_builder) = self.prefix_fst_builder.as_mut() {
prefix_fst_builder.insert_word(bytes, deladd, is_modified)?;
}
Ok(())
}
fn drain_stream(&mut self) -> Result<()> {
if let Some(mut stream) = self.stream.take() {
while let Some(current) = stream.next() {
self.insert_word(current, DelAdd::Addition, false)?;
}
}
Ok(())
}
pub fn build(
mut self,
index: &crate::Index,
rtxn: &heed::RoTxn,
) -> Result<(Mmap, Option<PrefixData>)> {
self.drain_stream()?;
/// TODO: ugly unwrap
let words_fst_file = self.word_fst_builder.into_inner()?.into_inner().unwrap();
let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? };
let prefix_data = self
.prefix_fst_builder
.map(|prefix_fst_builder| prefix_fst_builder.build(index, rtxn))
.transpose()?;
Ok((words_fst_mmap, prefix_data))
}
}
#[derive(Debug)]
pub struct PrefixSettings {
pub prefix_count_threshold: u64,
pub max_prefix_length: usize,
pub compute_prefixes: bool,
}
pub struct PrefixData {
pub prefixes_fst_mmap: Mmap,
pub prefix_delta: PrefixDelta,
}
#[derive(Debug)]
pub struct PrefixDelta {
pub modified: HashSet<Prefix>,
pub deleted: HashSet<Prefix>,
}
struct PrefixFstBuilder {
prefix_count_threshold: u64,
max_prefix_length: usize,
/// TODO: Replace the full memory allocation
prefix_fst_builders: Vec<SetBuilder<Vec<u8>>>,
current_prefix: Vec<Prefix>,
current_prefix_count: Vec<u64>,
modified_prefixes: HashSet<Prefix>,
current_prefix_is_modified: Vec<bool>,
}
impl PrefixFstBuilder {
pub fn new(prefix_settings: PrefixSettings) -> Option<Self> {
let PrefixSettings { prefix_count_threshold, max_prefix_length, compute_prefixes } =
prefix_settings;
if !compute_prefixes {
return None;
}
let mut prefix_fst_builders = Vec::new();
for _ in 0..max_prefix_length {
prefix_fst_builders.push(SetBuilder::memory());
}
Some(Self {
prefix_count_threshold,
max_prefix_length,
prefix_fst_builders,
current_prefix: vec![Prefix::new(); max_prefix_length],
current_prefix_count: vec![0; max_prefix_length],
modified_prefixes: HashSet::new(),
current_prefix_is_modified: vec![false; max_prefix_length],
})
}
fn insert_word(&mut self, bytes: &[u8], deladd: DelAdd, is_modified: bool) -> Result<()> {
for n in 0..self.max_prefix_length {
let current_prefix = &mut self.current_prefix[n];
let current_prefix_count = &mut self.current_prefix_count[n];
let builder = &mut self.prefix_fst_builders[n];
let current_prefix_is_modified = &mut self.current_prefix_is_modified[n];
// We try to get the first n bytes out of this string but we only want
// to split at valid characters bounds. If we try to split in the middle of
@ -153,43 +229,36 @@ impl<'a> WordFstBuilder<'a> {
// This is the first iteration of the loop,
// or the current word doesn't starts with the current prefix.
if *current_prefix_count == 0 || prefix != current_prefix.as_str() {
*current_prefix = SmallString32::from(prefix);
*current_prefix = Prefix::from(prefix);
*current_prefix_count = 0;
*current_prefix_is_modified = false;
}
*current_prefix_is_modified |= is_modified;
if deladd == DelAdd::Addition {
*current_prefix_count += 1;
}
// There is enough words corresponding to this prefix to add it to the cache.
/// TODO: (LEGACY) Replace this by `==` to avoid inserting several times the same prefix?
if *current_prefix_count >= self.prefix_count_threshold {
if *current_prefix_count == self.prefix_count_threshold {
builder.insert(prefix)?;
if *current_prefix_is_modified {
self.modified_prefixes.insert(current_prefix.clone());
}
}
}
Ok(())
}
fn drain_stream(&mut self) -> Result<()> {
if let Some(mut stream) = self.stream.take() {
while let Some(current) = stream.next() {
self.insert_word(current)?;
}
}
Ok(())
}
pub fn build(mut self) -> Result<(Mmap, Mmap)> {
self.drain_stream()?;
/// TODO: ugly unwrap
let words_fst_file = self.word_fst_builder.into_inner()?.into_inner().unwrap();
let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? };
fn build(self, index: &crate::Index, rtxn: &heed::RoTxn) -> Result<PrefixData> {
// We merge all of the previously computed prefixes into on final set.
let mut prefix_fsts = Vec::new();
for builder in self.prefix_fst_builders {
prefix_fsts.push(builder.into_set());
for builder in self.prefix_fst_builders.into_iter() {
let prefix_fst = builder.into_set();
prefix_fsts.push(prefix_fst);
}
let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter());
let mut builder = SetBuilder::new(BufWriter::new(tempfile()?))?;
@ -197,14 +266,22 @@ impl<'a> WordFstBuilder<'a> {
/// TODO: ugly unwrap
let prefix_fst_file = builder.into_inner()?.into_inner().unwrap();
let prefix_fst_mmap = unsafe { Mmap::map(&prefix_fst_file)? };
let new_prefix_fst = Set::new(&prefix_fst_mmap)?;
let old_prefix_fst = index.words_prefixes_fst(rtxn)?;
let mut deleted_prefixes = HashSet::new();
{
let mut deleted_prefixes_stream = old_prefix_fst.op().add(&new_prefix_fst).difference();
while let Some(prefix) = deleted_prefixes_stream.next() {
deleted_prefixes.insert(Prefix::from(std::str::from_utf8(prefix)?));
}
}
eprintln!("================================================");
eprintln!(
"inserted words: {}, registered words: {}, base set len: {}",
self.inserted_words, self.registered_words, self.base_set_length
);
eprintln!("================================================");
Ok((words_fst_mmap, prefix_fst_mmap))
Ok(PrefixData {
prefixes_fst_mmap: prefix_fst_mmap,
prefix_delta: PrefixDelta {
modified: self.modified_prefixes,
deleted: deleted_prefixes,
},
})
}
}

View File

@ -0,0 +1,108 @@
use std::collections::HashSet;
use heed::Database;
use heed::{types::Bytes, RwTxn};
use roaring::RoaringBitmap;
use crate::{CboRoaringBitmapCodec, Index, Prefix, Result};
struct WordPrefixDocids {
database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
}
impl WordPrefixDocids {
fn new(
database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
) -> WordPrefixDocids {
WordPrefixDocids { database, prefix_database }
}
fn execute(
self,
wtxn: &mut heed::RwTxn,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
) -> Result<()> {
self.delete_prefixes(wtxn, prefix_to_delete)?;
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn delete_prefixes(&self, wtxn: &mut heed::RwTxn, prefixes: &HashSet<Prefix>) -> Result<()> {
// We remove all the entries that are no more required in this word prefix docids database.
for prefix in prefixes {
let prefix = prefix.as_bytes();
if !self.prefix_database.delete(wtxn, prefix)? {
unreachable!("We tried to delete an unknown key")
}
}
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn recompute_modified_prefixes(
&self,
wtxn: &mut RwTxn,
prefixes: &HashSet<Prefix>,
) -> Result<()> {
// We fetch the docids associated to the newly added word prefix fst only.
let mut docids = RoaringBitmap::new();
for prefix in prefixes {
docids.clear();
let prefix = prefix.as_bytes();
for result in self.database.prefix_iter(wtxn, prefix)? {
let (_word, data) = result?;
docids |= &data;
}
self.prefix_database.put(wtxn, prefix, &docids)?;
}
Ok(())
}
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
pub fn compute_word_prefix_docids(
wtxn: &mut RwTxn,
index: &Index,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
) -> Result<()> {
WordPrefixDocids::new(
index.word_docids.remap_key_type(),
index.word_prefix_docids.remap_key_type(),
)
.execute(wtxn, prefix_to_compute, prefix_to_delete)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
pub fn compute_word_prefix_fid_docids(
wtxn: &mut RwTxn,
index: &Index,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
) -> Result<()> {
WordPrefixDocids::new(
index.word_fid_docids.remap_key_type(),
index.word_prefix_fid_docids.remap_key_type(),
)
.execute(wtxn, prefix_to_compute, prefix_to_delete)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
pub fn compute_word_prefix_position_docids(
wtxn: &mut RwTxn,
index: &Index,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
) -> Result<()> {
WordPrefixDocids::new(
index.word_position_docids.remap_key_type(),
index.word_prefix_position_docids.remap_key_type(),
)
.execute(wtxn, prefix_to_compute, prefix_to_delete)
}