From a2182e68a6585267c6af9aff5905d3624fc78a97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 5 Oct 2020 20:54:06 +0200 Subject: [PATCH] Rewrite the parallel merge indexing part --- src/bin/indexer.rs | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index f1acfbf15..c550d3338 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -794,26 +794,23 @@ fn main() -> anyhow::Result<()> { // the readers merges potentially done on another thread. enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids }; let (sender, receiver) = sync_channel(3); - let main_sender = sender.clone(); - let word_docids_sender = sender.clone(); - let words_pairs_proximities_docids_sender = sender; debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); rayon::spawn(move || { - let result = merge_readers(main_readers, main_merge); - main_sender.send((DatabaseType::Main, result)).unwrap(); - }); - rayon::spawn(move || { - let result = merge_readers(word_docids_readers, word_docids_merge); - word_docids_sender.send((DatabaseType::WordDocids, result)).unwrap(); - }); - rayon::spawn(move || { - let result = merge_readers( - words_pairs_proximities_docids_readers, - words_pairs_proximities_docids_merge, - ); - let message = (DatabaseType::WordsPairsProximitiesDocids, result); - words_pairs_proximities_docids_sender.send(message).unwrap(); + vec![ + (DatabaseType::Main, main_readers, main_merge as MergeFn), + (DatabaseType::WordDocids, word_docids_readers, word_docids_merge), + ( + DatabaseType::WordsPairsProximitiesDocids, + words_pairs_proximities_docids_readers, + words_pairs_proximities_docids_merge, + ), + ] + .into_par_iter() + .for_each(|(dbtype, readers, merge)| { + let result = merge_readers(readers, merge); + sender.send((dbtype, result)).unwrap(); + }); }); let mut wtxn = env.write_txn()?;