diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 17de26831..d2daf756a 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -13,6 +13,7 @@ use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::extract::DocidsExtractor; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ @@ -211,7 +212,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn()?, + index.read_txn().map_err(Error::from).map_err(Arc::new)?, fields_ids_map.clone(), Vec::new(), CboCachedSorter::new( @@ -233,19 +234,23 @@ impl DocidsExtractor for FacetedDocidsExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - buffer, - fields_ids_map, - &attributes_to_extract, - cached_sorter, - document_change?, - ) - }) - })?; + document_changes.into_par_iter().try_for_each_try_init( + || Ok(()), + |_, document_change| { + context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + buffer, + fields_ids_map, + &attributes_to_extract, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) + }) + }, + )?; } { let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 828219b41..a9552b499 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -10,6 +10,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ @@ -340,7 +341,7 @@ impl WordDocidsExtractors { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn()?, + index.read_txn().map_err(Error::from).map_err(Arc::new)?, &document_tokenizer, fields_ids_map.clone(), WordDocidsCachedSorters::new( @@ -356,18 +357,24 @@ impl WordDocidsExtractors { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, + document_changes.into_par_iter().try_for_each_try_init( + || Ok(()), + |_, document_change| { + context_pool.with( + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) + }, ) - }) - })?; + }, + )?; } { diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 2557862a2..b3f27ec78 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -14,6 +14,7 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; @@ -59,7 +60,7 @@ pub trait SearchableExtractor { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn()?, + index.read_txn().map_err(Error::from).map_err(Arc::new)?, &document_tokenizer, fields_ids_map.clone(), CboCachedSorter::new( @@ -81,18 +82,24 @@ pub trait SearchableExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, + document_changes.into_par_iter().try_for_each_try_init( + || Ok(()), + |_, document_change| { + context_pool.with( + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) + }, ) - }) - })?; + }, + )?; } { let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index caae956af..d6064e4fb 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -22,6 +22,7 @@ use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; use crate::update::settings::InnerIndexSettings; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::GrenadParameters; use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; @@ -80,7 +81,7 @@ where // document but we need to create a function that collects and compresses documents. let document_sender = extractor_sender.document_sender(); - document_changes.clone().into_par_iter().try_for_each(|result| { + document_changes.clone().into_par_iter().try_for_each_try_init(|| Ok(()) as Result<_>, |_, result| { match result? { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); @@ -98,7 +99,7 @@ where // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } - Ok(()) as Result<_> + Ok(()) as std::result::Result<_, Arc<_>> })?; document_sender.finish().unwrap(); diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs index 01a2cf933..649f09105 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/items_pool.rs @@ -1,3 +1,4 @@ +use std::convert::identity; use std::sync::Arc; use crossbeam_channel::{Receiver, Sender, TryRecvError}; @@ -27,12 +28,38 @@ pub trait ParallelIteratorExt: ParallelIterator { Ok(t) => Ok(t), Err(err) => Err(Arc::new(err)), }, - move |maybe_t, item| match maybe_t { + move |result, item| match result { Ok(t) => map_op(t, item).map_err(Arc::new), - Err(maybe_err) => Err(maybe_err.clone()), + Err(err) => Err(err.clone()), }, ) } + + /// A method to run a closure of all the items and return an owned error. + /// + /// The init function is ran only as necessary which is basically once by thread. + fn try_for_each_try_init(self, init: INIT, op: F) -> Result<(), E> + where + E: Send + Sync, + F: Fn(&mut T, Self::Item) -> Result<(), Arc> + Sync + Send + Clone, + INIT: Fn() -> Result + Sync + Send + Clone, + { + let result = self.try_for_each_init( + move || match init() { + Ok(t) => Ok(t), + Err(err) => Err(Arc::new(err)), + }, + move |result, item| match result { + Ok(t) => op(t, item), + Err(err) => Err(err.clone()), + }, + ); + + match result { + Ok(()) => Ok(()), + Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")), + } + } } impl ParallelIteratorExt for T {}