Rewrite the parallel merge indexing part

This commit is contained in:
Clément Renault 2020-10-05 20:54:06 +02:00
parent e9e03259c1
commit a2182e68a6
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@@ -794,26 +794,23 @@ fn main() -> anyhow::Result<()> {
// the readers merges potentially done on another thread. // the readers merges potentially done on another thread.
enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids }; enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids };
let (sender, receiver) = sync_channel(3); let (sender, receiver) = sync_channel(3);
let main_sender = sender.clone();
let word_docids_sender = sender.clone();
let words_pairs_proximities_docids_sender = sender;
debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); debug!("Merging the main, word docids and words pairs proximity docids in parallel...");
rayon::spawn(move || { rayon::spawn(move || {
let result = merge_readers(main_readers, main_merge); vec![
main_sender.send((DatabaseType::Main, result)).unwrap(); (DatabaseType::Main, main_readers, main_merge as MergeFn),
}); (DatabaseType::WordDocids, word_docids_readers, word_docids_merge),
rayon::spawn(move || { (
let result = merge_readers(word_docids_readers, word_docids_merge); DatabaseType::WordsPairsProximitiesDocids,
word_docids_sender.send((DatabaseType::WordDocids, result)).unwrap();
});
rayon::spawn(move || {
let result = merge_readers(
words_pairs_proximities_docids_readers, words_pairs_proximities_docids_readers,
words_pairs_proximities_docids_merge, words_pairs_proximities_docids_merge,
); ),
let message = (DatabaseType::WordsPairsProximitiesDocids, result); ]
words_pairs_proximities_docids_sender.send(message).unwrap(); .into_par_iter()
.for_each(|(dbtype, readers, merge)| {
let result = merge_readers(readers, merge);
sender.send((dbtype, result)).unwrap();
});
}); });
let mut wtxn = env.write_txn()?; let mut wtxn = env.write_txn()?;