diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs index e90ce97db..c57bc86f1 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/items_pool.rs @@ -1,4 +1,36 @@ use crossbeam_channel::{Receiver, Sender, TryRecvError}; +use rayon::iter::{MapInit, ParallelIterator}; + +pub trait ParallelIteratorExt: ParallelIterator { + fn try_map_try_init( + self, + init: INIT, + map_op: F, + ) -> MapInit< + Self, + impl Fn() -> Result> + Sync + Send + Clone, + impl Fn(&mut Result>, Self::Item) -> Result> + Sync + Send + Clone, + > + where + E: Send, + F: Fn(&mut T, Self::Item) -> Result + Sync + Send + Clone, + INIT: Fn() -> Result + Sync + Send + Clone, + R: Send, + { + self.map_init( + move || match init() { + Ok(t) => Ok(t), + Err(err) => Err(Some(err)), + }, + move |maybe_t, item| match maybe_t { + Ok(t) => map_op(t, item).map_err(Some), + Err(maybe_err) => Err(maybe_err.take()), + }, + ) + } +} + +impl ParallelIteratorExt for T {} /// A pool of items that can be pull and generated on demand. pub struct ItemsPool