Build mergers in parallel

ManyTheFish 2024-09-11 11:49:26 +02:00
parent 39b5990f64
commit 2b317c681b
3 changed files with 95 additions and 53 deletions
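
In summary: the sequential sorter-dumping code paths become parallel ones. In WordDocidsMergerBuilders, the five word-docids sorters are turned into reader cursors concurrently inside a rayon::scope, with one Result slot per task that is checked afterwards. In the shared SearchableExtractor trait, the per-thread caches are drained through rayon's par_bridge(). Both the extraction and merger-building phases are also wrapped in tracing trace spans, and the word-pair-proximity extractor loses a stray trailing semicolon.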

First changed file (word docids extraction):

@@ -426,21 +426,38 @@ impl WordDocidsMergerBuilders {
             current_docid: _,
         } = other;
-        let sorter = word_fid_docids.into_sorter()?;
-        let readers = sorter.into_reader_cursors()?;
-        self.word_fid_docids.extend(readers);
-        let sorter = word_docids.into_sorter()?;
-        let readers = sorter.into_reader_cursors()?;
-        self.word_docids.extend(readers);
-        let sorter = exact_word_docids.into_sorter()?;
-        let readers = sorter.into_reader_cursors()?;
-        self.exact_word_docids.extend(readers);
-        let sorter = word_position_docids.into_sorter()?;
-        let readers = sorter.into_reader_cursors()?;
-        self.word_position_docids.extend(readers);
-        let sorter = fid_word_count_docids.into_sorter()?;
-        let readers = sorter.into_reader_cursors()?;
-        self.fid_word_count_docids.extend(readers);
+        let mut word_fid_docids_readers = Ok(vec![]);
+        let mut word_docids_readers = Ok(vec![]);
+        let mut exact_word_docids_readers = Ok(vec![]);
+        let mut word_position_docids_readers = Ok(vec![]);
+        let mut fid_word_count_docids_readers = Ok(vec![]);
+        rayon::scope(|s| {
+            s.spawn(|_| {
+                word_fid_docids_readers =
+                    word_fid_docids.into_sorter().and_then(|s| s.into_reader_cursors());
+            });
+            s.spawn(|_| {
+                word_docids_readers =
+                    word_docids.into_sorter().and_then(|s| s.into_reader_cursors());
+            });
+            s.spawn(|_| {
+                exact_word_docids_readers =
+                    exact_word_docids.into_sorter().and_then(|s| s.into_reader_cursors());
+            });
+            s.spawn(|_| {
+                word_position_docids_readers =
+                    word_position_docids.into_sorter().and_then(|s| s.into_reader_cursors());
+            });
+            s.spawn(|_| {
+                fid_word_count_docids_readers =
+                    fid_word_count_docids.into_sorter().and_then(|s| s.into_reader_cursors());
+            });
+        });
+        self.word_fid_docids.extend(word_fid_docids_readers?);
+        self.word_docids.extend(word_docids_readers?);
+        self.exact_word_docids.extend(exact_word_docids_readers?);
+        self.word_position_docids.extend(word_position_docids_readers?);
+        self.fid_word_count_docids.extend(fid_word_count_docids_readers?);
         Ok(())
     }
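
The shape of this change is a standard rayon idiom: pre-seed one mutable Result slot per task with Ok(vec![]), let each scoped spawn overwrite exactly one slot, and only propagate errors with ? once rayon::scope has joined every task. A minimal, self-contained sketch of that idiom, with hypothetical step_a/step_b functions standing in for the into_sorter()/into_reader_cursors() chains:

    use std::io;

    fn step_a() -> io::Result<Vec<u32>> {
        Ok(vec![1, 2, 3])
    }

    fn step_b() -> io::Result<Vec<u32>> {
        Ok(vec![4, 5, 6])
    }

    fn run() -> io::Result<()> {
        // Each slot starts as Ok so it is initialized even before any spawn runs;
        // each spawn writes to exactly one slot, so no locking is required.
        let mut a = Ok(vec![]);
        let mut b = Ok(vec![]);
        rayon::scope(|s| {
            s.spawn(|_| a = step_a());
            s.spawn(|_| b = step_b());
        });
        // scope() has joined both tasks here, so the slots are safe to read
        // and the first failure is reported through `?`.
        let (a, b) = (a?, b?);
        assert_eq!(a.len() + b.len(), 6);
        Ok(())
    }
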
@@ -509,25 +526,35 @@ impl WordDocidsExtractors {
             ))
         });
-        document_changes.into_par_iter().try_for_each(|document_change| {
-            context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
-                Self::extract_document_change(
-                    &*rtxn,
-                    index,
-                    document_tokenizer,
-                    fields_ids_map,
-                    cached_sorter,
-                    document_change?,
-                )
-            })
-        })?;
-
-        let mut builder = WordDocidsMergerBuilders::new();
-        for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
-            builder.add_sorters(cache)?;
-        }
-        Ok(builder.build())
+        {
+            let span =
+                tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
+            let _entered = span.enter();
+            document_changes.into_par_iter().try_for_each(|document_change| {
+                context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
+                    Self::extract_document_change(
+                        &*rtxn,
+                        index,
+                        document_tokenizer,
+                        fields_ids_map,
+                        cached_sorter,
+                        document_change?,
+                    )
+                })
+            })?;
+        }
+        {
+            let span =
+                tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
+            let _entered = span.enter();
+            let mut builder = WordDocidsMergerBuilders::new();
+            for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
+                builder.add_sorters(cache)?;
+            }
+            Ok(builder.build())
+        }
     }
 
     fn extract_document_change(
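
Note that the two phases now run under dedicated tracing::trace_span! spans (docids_extraction and merger_building), so each phase can be timed separately in trace output; the _entered guard returned by span.enter() keeps the span open until the enclosing block ends.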

Second changed file (word pair proximity extraction):

@@ -107,7 +107,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
                     cached_sorter.insert_add_u32(key, docid)?;
                 }
             }
-        };
+        }
     }
 
     Ok(())

Third changed file (the searchable extractor module):

@@ -13,7 +13,7 @@ pub use extract_word_docids::{
 pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
 use grenad::Merger;
 use heed::RoTxn;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
 use tokenize_document::{tokenizer_builder, DocumentTokenizer};
 
 use super::cache::CboCachedSorter;
@@ -78,27 +78,42 @@ pub trait SearchableExtractor {
             ))
         });
-        document_changes.into_par_iter().try_for_each(|document_change| {
-            context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
-                Self::extract_document_change(
-                    &*rtxn,
-                    index,
-                    document_tokenizer,
-                    fields_ids_map,
-                    cached_sorter,
-                    document_change?,
-                )
-            })
-        })?;
-
-        let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
-        for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
-            let sorter = cache.into_sorter()?;
-            let readers = sorter.into_reader_cursors()?;
-            builder.extend(readers);
-        }
-
-        Ok(builder.build())
+        {
+            let span =
+                tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
+            let _entered = span.enter();
+            document_changes.into_par_iter().try_for_each(|document_change| {
+                context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
+                    Self::extract_document_change(
+                        &*rtxn,
+                        index,
+                        document_tokenizer,
+                        fields_ids_map,
+                        cached_sorter,
+                        document_change?,
+                    )
+                })
+            })?;
+        }
+        {
+            let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
+            let span =
+                tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
+            let _entered = span.enter();
+            let readers: Vec<_> = context_pool
+                .into_items()
+                .par_bridge()
+                .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| {
+                    let sorter = cached_sorter.into_sorter()?;
+                    sorter.into_reader_cursors()
+                })
+                .collect();
+            for reader in readers {
+                builder.extend(reader?);
+            }
+            Ok(builder.build())
+        }
     }
 
     fn extract_document_change(
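
The merger-building side uses the other rayon entry point: par_bridge() lifts a plain sequential iterator (here, the items drained from the context pool) onto the thread pool, the fallible flushes are collected as Results, and any error is surfaced sequentially afterwards. A minimal sketch of that collect-then-propagate shape, with a hypothetical flush function standing in for the into_sorter()/into_reader_cursors() call:

    use std::io;

    use rayon::iter::{ParallelBridge, ParallelIterator};

    fn flush(item: u32) -> io::Result<Vec<u32>> {
        Ok(vec![item, item + 1])
    }

    fn drain(items: Vec<u32>) -> io::Result<Vec<u32>> {
        // par_bridge() turns the sequential iterator into a parallel one, so
        // each flush runs on a rayon worker; output order is unspecified,
        // which is fine for independent reader cursors.
        let results: Vec<io::Result<Vec<u32>>> =
            items.into_iter().par_bridge().map(flush).collect();
        // Mirror the `for reader in readers { builder.extend(reader?) }` loop:
        // collect everything first, then report the first error sequentially.
        let mut merged = Vec::new();
        for r in results {
            merged.extend(r?);
        }
        Ok(merged)
    }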