Use the disk less when computing prefixes

This commit is contained in:
Clément Renault 2024-11-21 10:45:37 +01:00
parent 1f9692cd04
commit ab2c83f868
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 43 additions and 14 deletions

View File

@ -441,8 +441,9 @@ where
(indexing_context.send_progress)(Progress::from_step(Step::PostProcessingWords)); (indexing_context.send_progress)(Progress::from_step(Step::PostProcessingWords));
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
compute_prefix_database(index, wtxn, prefix_delta)?; compute_prefix_database(index, wtxn, prefix_delta, grenad_parameters)?;
} }
(indexing_context.send_progress)(Progress::from_step(Step::Finalizing)); (indexing_context.send_progress)(Progress::from_step(Step::Finalizing));
Ok(()) as Result<_> Ok(()) as Result<_>
@ -474,16 +475,17 @@ fn compute_prefix_database(
index: &Index, index: &Index,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
prefix_delta: PrefixDelta, prefix_delta: PrefixDelta,
grenad_parameters: GrenadParameters,
) -> Result<()> { ) -> Result<()> {
let PrefixDelta { modified, deleted } = prefix_delta; let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids // Compute word prefix docids
compute_word_prefix_docids(wtxn, index, &modified, &deleted)?; compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute exact word prefix docids // Compute exact word prefix docids
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted)?; compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute word prefix fid docids // Compute word prefix fid docids
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?; compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute word prefix position docids // Compute word prefix position docids
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted) compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
} }
#[tracing::instrument(level = "trace", skip_all, target = "indexing")] #[tracing::instrument(level = "trace", skip_all, target = "indexing")]

View File

@ -7,24 +7,31 @@ use heed::types::Bytes;
use heed::{BytesDecode, Database, Error, RoTxn, RwTxn}; use heed::{BytesDecode, Database, Error, RoTxn, RwTxn};
use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
use roaring::MultiOps; use roaring::MultiOps;
use tempfile::tempfile; use tempfile::spooled_tempfile;
use thread_local::ThreadLocal; use thread_local::ThreadLocal;
use super::ref_cell_ext::RefCellExt as _; use super::ref_cell_ext::RefCellExt as _;
use crate::heed_codec::StrBEU16Codec; use crate::heed_codec::StrBEU16Codec;
use crate::update::GrenadParameters;
use crate::{CboRoaringBitmapCodec, Index, Prefix, Result}; use crate::{CboRoaringBitmapCodec, Index, Prefix, Result};
struct WordPrefixDocids { struct WordPrefixDocids {
database: Database<Bytes, CboRoaringBitmapCodec>, database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>, prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
max_memory_by_thread: Option<usize>,
} }
impl WordPrefixDocids { impl WordPrefixDocids {
fn new( fn new(
database: Database<Bytes, CboRoaringBitmapCodec>, database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>, prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
grenad_parameters: GrenadParameters,
) -> WordPrefixDocids { ) -> WordPrefixDocids {
WordPrefixDocids { database, prefix_database } WordPrefixDocids {
database,
prefix_database,
max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
}
} }
fn execute( fn execute(
@ -51,9 +58,12 @@ impl WordPrefixDocids {
// of them and *serialize* them into files. There is one file per CPU. // of them and *serialize* them into files. There is one file per CPU.
let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads()); let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| { prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
let refcell = local_entries.get_or_try(|| { let refcell = local_entries.get_or(|| {
tempfile().map(BufWriter::new).map(|f| RefCell::new((Vec::new(), f, Vec::new()))) let file = BufWriter::new(spooled_tempfile(
})?; self.max_memory_by_thread.unwrap_or(usize::MAX),
));
RefCell::new((Vec::new(), file, Vec::new()))
});
let mut refmut = refcell.borrow_mut_or_yield(); let mut refmut = refcell.borrow_mut_or_yield();
let (ref mut index, ref mut file, ref mut buffer) = *refmut; let (ref mut index, ref mut file, ref mut buffer) = *refmut;
@ -144,14 +154,20 @@ unsafe impl<'a, 'rtxn> Sync for FrozenPrefixBitmaps<'a, 'rtxn> {}
struct WordPrefixIntegerDocids { struct WordPrefixIntegerDocids {
database: Database<Bytes, CboRoaringBitmapCodec>, database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>, prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
max_memory_by_thread: Option<usize>,
} }
impl WordPrefixIntegerDocids { impl WordPrefixIntegerDocids {
fn new( fn new(
database: Database<Bytes, CboRoaringBitmapCodec>, database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>, prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
grenad_parameters: GrenadParameters,
) -> WordPrefixIntegerDocids { ) -> WordPrefixIntegerDocids {
WordPrefixIntegerDocids { database, prefix_database } WordPrefixIntegerDocids {
database,
prefix_database,
max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
}
} }
fn execute( fn execute(
@ -178,9 +194,12 @@ impl WordPrefixIntegerDocids {
// of them and *serialize* them into files. There is one file per CPU. // of them and *serialize* them into files. There is one file per CPU.
let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads()); let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| { prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
let refcell = local_entries.get_or_try(|| { let refcell = local_entries.get_or(|| {
tempfile().map(BufWriter::new).map(|f| RefCell::new((Vec::new(), f, Vec::new()))) let file = BufWriter::new(spooled_tempfile(
})?; self.max_memory_by_thread.unwrap_or(usize::MAX),
));
RefCell::new((Vec::new(), file, Vec::new()))
});
let mut refmut = refcell.borrow_mut_or_yield(); let mut refmut = refcell.borrow_mut_or_yield();
let (ref mut index, ref mut file, ref mut buffer) = *refmut; let (ref mut index, ref mut file, ref mut buffer) = *refmut;
@ -292,10 +311,12 @@ pub fn compute_word_prefix_docids(
index: &Index, index: &Index,
prefix_to_compute: &HashSet<Prefix>, prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>, prefix_to_delete: &HashSet<Prefix>,
grenad_parameters: GrenadParameters,
) -> Result<()> { ) -> Result<()> {
WordPrefixDocids::new( WordPrefixDocids::new(
index.word_docids.remap_key_type(), index.word_docids.remap_key_type(),
index.word_prefix_docids.remap_key_type(), index.word_prefix_docids.remap_key_type(),
grenad_parameters,
) )
.execute(wtxn, prefix_to_compute, prefix_to_delete) .execute(wtxn, prefix_to_compute, prefix_to_delete)
} }
@ -306,10 +327,12 @@ pub fn compute_exact_word_prefix_docids(
index: &Index, index: &Index,
prefix_to_compute: &HashSet<Prefix>, prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>, prefix_to_delete: &HashSet<Prefix>,
grenad_parameters: GrenadParameters,
) -> Result<()> { ) -> Result<()> {
WordPrefixDocids::new( WordPrefixDocids::new(
index.exact_word_docids.remap_key_type(), index.exact_word_docids.remap_key_type(),
index.exact_word_prefix_docids.remap_key_type(), index.exact_word_prefix_docids.remap_key_type(),
grenad_parameters,
) )
.execute(wtxn, prefix_to_compute, prefix_to_delete) .execute(wtxn, prefix_to_compute, prefix_to_delete)
} }
@ -320,10 +343,12 @@ pub fn compute_word_prefix_fid_docids(
index: &Index, index: &Index,
prefix_to_compute: &HashSet<Prefix>, prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>, prefix_to_delete: &HashSet<Prefix>,
grenad_parameters: GrenadParameters,
) -> Result<()> { ) -> Result<()> {
WordPrefixIntegerDocids::new( WordPrefixIntegerDocids::new(
index.word_fid_docids.remap_key_type(), index.word_fid_docids.remap_key_type(),
index.word_prefix_fid_docids.remap_key_type(), index.word_prefix_fid_docids.remap_key_type(),
grenad_parameters,
) )
.execute(wtxn, prefix_to_compute, prefix_to_delete) .execute(wtxn, prefix_to_compute, prefix_to_delete)
} }
@ -334,10 +359,12 @@ pub fn compute_word_prefix_position_docids(
index: &Index, index: &Index,
prefix_to_compute: &HashSet<Prefix>, prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>, prefix_to_delete: &HashSet<Prefix>,
grenad_parameters: GrenadParameters,
) -> Result<()> { ) -> Result<()> {
WordPrefixIntegerDocids::new( WordPrefixIntegerDocids::new(
index.word_position_docids.remap_key_type(), index.word_position_docids.remap_key_type(),
index.word_prefix_position_docids.remap_key_type(), index.word_prefix_position_docids.remap_key_type(),
grenad_parameters,
) )
.execute(wtxn, prefix_to_compute, prefix_to_delete) .execute(wtxn, prefix_to_compute, prefix_to_delete)
} }