diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs
index d2daf756a..40f561b97 100644
--- a/milli/src/update/new/extract/faceted/extract_facets.rs
+++ b/milli/src/update/new/extract/faceted/extract_facets.rs
@@ -212,7 +212,6 @@ impl DocidsExtractor for FacetedDocidsExtractor {
         let context_pool = ItemsPool::new(|| {
             Ok((
-                index.read_txn().map_err(Error::from).map_err(Arc::new)?,
                 fields_ids_map.clone(),
                 Vec::new(),
                 CboCachedSorter::new(
@@ -234,12 +233,12 @@ impl DocidsExtractor for FacetedDocidsExtractor {
             let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
             let _entered = span.enter();
 
-            document_changes.into_par_iter().try_for_each_try_init(
-                || Ok(()),
-                |_, document_change| {
-                    context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| {
+            document_changes.into_par_iter().try_arc_for_each_try_init(
+                || index.read_txn().map_err(Error::from),
+                |rtxn, document_change| {
+                    context_pool.with(|(fields_ids_map, buffer, cached_sorter)| {
                         Self::extract_document_change(
-                            &*rtxn,
+                            rtxn,
                             index,
                             buffer,
                             fields_ids_map,
@@ -261,7 +260,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
             let readers: Vec<_> = context_pool
                 .into_items()
                 .par_bridge()
-                .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| {
+                .map(|(_tokenizer, _fields_ids_map, cached_sorter)| {
                     let sorter = cached_sorter.into_sorter()?;
                     sorter.into_reader_cursors()
                 })
diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs
index a9552b499..f59f5a03d 100644
--- a/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
 use grenad::{Merger, MergerBuilder};
 use heed::RoTxn;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::iter::IntoParallelIterator;
 
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
 use crate::update::new::extract::cache::CboCachedSorter;
@@ -341,7 +341,6 @@ impl WordDocidsExtractors {
         let context_pool = ItemsPool::new(|| {
             Ok((
-                index.read_txn().map_err(Error::from).map_err(Arc::new)?,
                 &document_tokenizer,
                 fields_ids_map.clone(),
                 WordDocidsCachedSorters::new(
@@ -357,22 +356,20 @@ impl WordDocidsExtractors {
             let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
             let _entered = span.enter();
 
-            document_changes.into_par_iter().try_for_each_try_init(
-                || Ok(()),
-                |_, document_change| {
-                    context_pool.with(
-                        |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
-                            Self::extract_document_change(
-                                &*rtxn,
-                                index,
-                                document_tokenizer,
-                                fields_ids_map,
-                                cached_sorter,
-                                document_change?,
-                            )
-                            .map_err(Arc::new)
-                        },
-                    )
+            document_changes.into_par_iter().try_arc_for_each_try_init(
+                || index.read_txn().map_err(Error::from),
+                |rtxn, document_change| {
+                    context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| {
+                        Self::extract_document_change(
+                            rtxn,
+                            index,
+                            document_tokenizer,
+                            fields_ids_map,
+                            cached_sorter,
+                            document_change?,
+                        )
+                        .map_err(Arc::new)
+                    })
                 },
             )?;
         }
@@ -382,7 +379,7 @@ impl WordDocidsExtractors {
             tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
             let _entered = span.enter();
             let mut builder = WordDocidsMergerBuilders::new();
-            for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
+            for (_tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
                 builder.add_sorters(cache)?;
             }
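Aside: the pooled tuples above shrink because the `RoTxn` leaves the `ItemsPool`. A minimal re-sketch of the pool's assumed shape (illustrative, not the exact milli code) shows why: pooled items are recycled through a channel, so an item created by one worker thread is routinely picked up by another. An LMDB read transaction is bound to the thread that opened it and must never migrate like that, so it moves into the per-thread `init` closure of `try_arc_for_each_try_init` instead, while the pool keeps only the scratch buffers and cached sorters.

```rust
use crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError};

// Assumed shape of the ItemsPool used above; names mirror the diff but the
// implementation details are a sketch.
pub struct ItemsPool<F, T, E>
where
    F: Fn() -> Result<T, E>,
{
    init: F,
    sender: Sender<T>,
    receiver: Receiver<T>,
}

impl<F, T, E> ItemsPool<F, T, E>
where
    F: Fn() -> Result<T, E>,
{
    pub fn new(init: F) -> Self {
        let (sender, receiver) = unbounded();
        ItemsPool { init, sender, receiver }
    }

    /// Borrow an idle item (creating one lazily if none is available),
    /// run `f` on it, then hand it back for *any* thread to reuse.
    pub fn with<G, R>(&self, f: G) -> Result<R, E>
    where
        G: FnOnce(&mut T) -> Result<R, E>,
    {
        let mut item = match self.receiver.try_recv() {
            Ok(item) => item,
            Err(TryRecvError::Empty | TryRecvError::Disconnected) => (self.init)()?,
        };
        let result = f(&mut item);
        let _ = self.sender.send(item);
        result
    }

    /// Consume the pool and drain every item, e.g. to merge the sorters.
    pub fn into_items(self) -> impl Iterator<Item = T> {
        let ItemsPool { sender, receiver, .. } = self;
        drop(sender); // disconnect so the iterator stops once drained
        receiver.into_iter()
    }
}
```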
diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs
index b3f27ec78..b6cda3a87 100644
--- a/milli/src/update/new/extract/searchable/mod.rs
+++ b/milli/src/update/new/extract/searchable/mod.rs
@@ -60,7 +60,6 @@ pub trait SearchableExtractor {
         let context_pool = ItemsPool::new(|| {
             Ok((
-                index.read_txn().map_err(Error::from).map_err(Arc::new)?,
                 &document_tokenizer,
                 fields_ids_map.clone(),
                 CboCachedSorter::new(
@@ -82,22 +81,20 @@ pub trait SearchableExtractor {
             let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
             let _entered = span.enter();
 
-            document_changes.into_par_iter().try_for_each_try_init(
-                || Ok(()),
-                |_, document_change| {
-                    context_pool.with(
-                        |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
-                            Self::extract_document_change(
-                                &*rtxn,
-                                index,
-                                document_tokenizer,
-                                fields_ids_map,
-                                cached_sorter,
-                                document_change?,
-                            )
-                            .map_err(Arc::new)
-                        },
-                    )
+            document_changes.into_par_iter().try_arc_for_each_try_init(
+                || index.read_txn().map_err(Error::from),
+                |rtxn, document_change| {
+                    context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| {
+                        Self::extract_document_change(
+                            rtxn,
+                            index,
+                            document_tokenizer,
+                            fields_ids_map,
+                            cached_sorter,
+                            document_change?,
+                        )
+                        .map_err(Arc::new)
+                    })
                 },
             )?;
         }
@@ -110,7 +107,7 @@ pub trait SearchableExtractor {
             let readers: Vec<_> = context_pool
                 .into_items()
                 .par_bridge()
-                .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| {
+                .map(|(_tokenizer, _fields_ids_map, cached_sorter)| {
                     let sorter = cached_sorter.into_sorter()?;
                     sorter.into_reader_cursors()
                 })
diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs
index b2dc67ce1..38d4a408f 100644
--- a/milli/src/update/new/indexer/document_operation.rs
+++ b/milli/src/update/new/indexer/document_operation.rs
@@ -5,11 +5,10 @@ use std::sync::Arc;
 use heed::types::Bytes;
 use heed::RoTxn;
 use memmap2::Mmap;
-use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
+use rayon::iter::{IndexedParallelIterator, IntoParallelIterator};
 use IndexDocumentsMethod as Idm;
 
 use super::super::document_change::DocumentChange;
-use super::super::items_pool::ItemsPool;
 use super::super::{CowStr, TopLevelMap};
 use super::DocumentChanges;
 use crate::documents::{DocumentIdExtractionError, PrimaryKey};
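The remaining hunks switch the callers onto the two `try_arc_*` combinators defined at the end of this diff. A simplified free-function re-sketch of the assumed behavior of `try_arc_for_each_try_init` (the real version is a method on `ParallelIteratorExt`) makes the contract concrete: the fallible `init` runs at most once per rayon worker, and errors cross threads as `Arc<E>` so `E` itself never needs to be `Clone`.

```rust
use std::sync::Arc;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

// Sketch only: a standalone equivalent of the trait method, built on
// rayon's try_for_each_init.
fn try_arc_for_each_try_init<I, T, E>(
    iter: I,
    init: impl Fn() -> Result<T, E> + Sync + Send,
    op: impl Fn(&mut T, I::Item) -> Result<(), Arc<E>> + Sync + Send,
) -> Result<(), E>
where
    I: IntoParallelIterator,
    E: Send + Sync,
{
    let result = iter.into_par_iter().try_for_each_init(
        // Store the init result itself as the per-thread state,
        // wrapping its error in an Arc so it has the same shape as op's.
        || init().map_err(Arc::new),
        // A failed init is simply re-reported for every item that lands
        // on that thread, by cloning the shared Arc.
        |state, item| match state {
            Ok(t) => op(t, item),
            Err(err) => Err(err.clone()),
        },
    );
    // rayon hands back one of the Arc'd errors; the other clones are gone
    // by the time it returns, so ownership of E can be recovered.
    result.map_err(|err| Arc::into_inner(err).expect("error must be uniquely owned"))
}
```

Under that contract, the turbofish in the next hunk (`try_arc_for_each::<_, Error>`) presumably just pins `E`, which type inference may not deduce from the closure on its own.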
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index d6064e4fb..934d0a364 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -81,7 +81,8 @@ where
         // document but we need to create a function that collects and compresses documents.
         let document_sender = extractor_sender.document_sender();
 
-        document_changes.clone().into_par_iter().try_for_each_try_init(|| Ok(()) as Result<_>, |_, result| {
+        document_changes.clone().into_par_iter().try_arc_for_each::<_, Error>(
+            |result| {
             match result? {
                 DocumentChange::Deletion(deletion) => {
                     let docid = deletion.docid();
@@ -99,7 +100,7 @@ where
                     // extracted_dictionary_sender.send(self, dictionary: &[u8]);
                 }
             }
-            Ok(()) as std::result::Result<_, Arc<_>>
+            Ok(())
         })?;
 
         document_sender.finish().unwrap();
diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs
index aa01f6547..db63256a6 100644
--- a/milli/src/update/new/indexer/partial_dump.rs
+++ b/milli/src/update/new/indexer/partial_dump.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use rayon::iter::{IndexedParallelIterator, ParallelBridge, ParallelIterator};
+use rayon::iter::IndexedParallelIterator;
 
 use super::DocumentChanges;
 use crate::documents::{DocumentIdExtractionError, PrimaryKey};
diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs
index 649f09105..8fa22b75b 100644
--- a/milli/src/update/new/items_pool.rs
+++ b/milli/src/update/new/items_pool.rs
@@ -1,4 +1,3 @@
-use std::convert::identity;
 use std::sync::Arc;
 
 use crossbeam_channel::{Receiver, Sender, TryRecvError};
@@ -38,7 +37,7 @@ pub trait ParallelIteratorExt: ParallelIterator {
     /// A method to run a closure on all the items and return an owned error.
     ///
     /// The init function is run only as necessary, which is basically once per thread.
-    fn try_for_each_try_init<F, INIT, T, E>(self, init: INIT, op: F) -> Result<(), E>
+    fn try_arc_for_each_try_init<F, INIT, T, E>(self, init: INIT, op: F) -> Result<(), E>
     where
         E: Send + Sync,
         F: Fn(&mut T, Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone,
@@ -60,6 +59,17 @@ pub trait ParallelIteratorExt: ParallelIterator {
             Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")),
         }
     }
+
+    fn try_arc_for_each<F, E>(self, op: F) -> Result<(), E>
+    where
+        E: Send + Sync,
+        F: Fn(Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone,
+    {
+        match self.try_for_each(op) {
+            Ok(()) => Ok(()),
+            Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")),
+        }
+    }
 }
 
 impl<T: ParallelIterator> ParallelIteratorExt for T {}
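The `Arc` round-trip in both combinators is sound because rayon's `try_for_each` short-circuits and keeps only the winning error; every other clone has been dropped by the time it returns, so `Arc::into_inner` can reclaim ownership. A toy, self-contained demonstration of that convention (illustrative names and error type only):

```rust
use std::sync::Arc;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    // Workers report failures as Arc<String>; rayon returns one of them.
    let result = (0..1000).into_par_iter().try_for_each(|n| {
        if n == 512 {
            Err(Arc::new(format!("failed on {n}")))
        } else {
            Ok(())
        }
    });
    // All losing clones are dropped before try_for_each returns, so the
    // surviving Arc is uniquely owned and yields the error by value.
    let owned: Result<(), String> =
        result.map_err(|err| Arc::into_inner(err).expect("uniquely owned"));
    assert_eq!(owned.unwrap_err(), "failed on 512");
}
```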