Rename and use the try_arc_for_each_try_init method

Clément Renault 2024-09-29 17:42:26 +02:00
parent d83c9a4074
commit 00e045b249
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 54 additions and 51 deletions

View File

@@ -212,7 +212,6 @@ impl DocidsExtractor for FacetedDocidsExtractor {
         let context_pool = ItemsPool::new(|| {
             Ok((
-                index.read_txn().map_err(Error::from).map_err(Arc::new)?,
                 fields_ids_map.clone(),
                 Vec::new(),
                 CboCachedSorter::new(
@@ -234,12 +233,12 @@ impl DocidsExtractor for FacetedDocidsExtractor {
             let span =
                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
             let _entered = span.enter();
-            document_changes.into_par_iter().try_for_each_try_init(
-                || Ok(()),
-                |_, document_change| {
-                    context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| {
+            document_changes.into_par_iter().try_arc_for_each_try_init(
+                || index.read_txn().map_err(Error::from),
+                |rtxn, document_change| {
+                    context_pool.with(|(fields_ids_map, buffer, cached_sorter)| {
                         Self::extract_document_change(
-                            &*rtxn,
+                            rtxn,
                             index,
                             buffer,
                             fields_ids_map,
@@ -261,7 +260,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
         let readers: Vec<_> = context_pool
             .into_items()
             .par_bridge()
-            .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| {
+            .map(|(_tokenizer, _fields_ids_map, cached_sorter)| {
                 let sorter = cached_sorter.into_sorter()?;
                 sorter.into_reader_cursors()
             })
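
Every extractor touched by this commit follows the same shape: the read transaction is no longer stored in the ItemsPool tuple, it is opened by the iterator's init closure and handed to the per-item closure as `rtxn`. Below is a minimal, self-contained sketch of that per-thread-init pattern, written against rayon's plain `try_for_each_init` with a hypothetical `ReadTxn` and `extract` standing in for the real transaction and extraction code (the renamed `try_arc_for_each_try_init` additionally accepts a fallible init and unwraps `Arc`-wrapped errors):

use rayon::iter::{IntoParallelIterator, ParallelIterator};

// Hypothetical stand-in for a per-thread resource such as an LMDB read txn.
struct ReadTxn {
    lookups: u64,
}

// Hypothetical per-document work that borrows the per-thread resource.
fn extract(rtxn: &mut ReadTxn, docid: u32) -> Result<(), String> {
    rtxn.lookups += 1;
    if docid == u32::MAX {
        return Err(format!("unknown document {docid}"));
    }
    Ok(())
}

fn main() -> Result<(), String> {
    (0u32..1_000).into_par_iter().try_for_each_init(
        // The init closure runs roughly once per Rayon worker thread,
        // not once per item, so opening a read transaction here stays cheap.
        || ReadTxn { lookups: 0 },
        |rtxn, docid| extract(rtxn, docid),
    )
}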

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
 
 use grenad::{Merger, MergerBuilder};
 use heed::RoTxn;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::iter::IntoParallelIterator;
 
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
 use crate::update::new::extract::cache::CboCachedSorter;
@@ -341,7 +341,6 @@ impl WordDocidsExtractors {
         let context_pool = ItemsPool::new(|| {
             Ok((
-                index.read_txn().map_err(Error::from).map_err(Arc::new)?,
                 &document_tokenizer,
                 fields_ids_map.clone(),
                 WordDocidsCachedSorters::new(
@@ -357,22 +356,20 @@ impl WordDocidsExtractors {
             let span =
                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
             let _entered = span.enter();
-            document_changes.into_par_iter().try_for_each_try_init(
-                || Ok(()),
-                |_, document_change| {
-                    context_pool.with(
-                        |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
-                            Self::extract_document_change(
-                                &*rtxn,
-                                index,
-                                document_tokenizer,
-                                fields_ids_map,
-                                cached_sorter,
-                                document_change?,
-                            )
-                            .map_err(Arc::new)
-                        },
-                    )
+            document_changes.into_par_iter().try_arc_for_each_try_init(
+                || index.read_txn().map_err(Error::from),
+                |rtxn, document_change| {
+                    context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| {
+                        Self::extract_document_change(
+                            rtxn,
+                            index,
+                            document_tokenizer,
+                            fields_ids_map,
+                            cached_sorter,
+                            document_change?,
+                        )
+                        .map_err(Arc::new)
+                    })
                 },
             )?;
         }
@@ -382,7 +379,7 @@ impl WordDocidsExtractors {
                 tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
             let _entered = span.enter();
             let mut builder = WordDocidsMergerBuilders::new();
-            for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
+            for (_tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
                 builder.add_sorters(cache)?;
             }

View File

@@ -60,7 +60,6 @@ pub trait SearchableExtractor {
         let context_pool = ItemsPool::new(|| {
             Ok((
-                index.read_txn().map_err(Error::from).map_err(Arc::new)?,
                 &document_tokenizer,
                 fields_ids_map.clone(),
                 CboCachedSorter::new(
@@ -82,22 +81,20 @@ pub trait SearchableExtractor {
             let span =
                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
             let _entered = span.enter();
-            document_changes.into_par_iter().try_for_each_try_init(
-                || Ok(()),
-                |_, document_change| {
-                    context_pool.with(
-                        |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
-                            Self::extract_document_change(
-                                &*rtxn,
-                                index,
-                                document_tokenizer,
-                                fields_ids_map,
-                                cached_sorter,
-                                document_change?,
-                            )
-                            .map_err(Arc::new)
-                        },
-                    )
+            document_changes.into_par_iter().try_arc_for_each_try_init(
+                || index.read_txn().map_err(Error::from),
+                |rtxn, document_change| {
+                    context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| {
+                        Self::extract_document_change(
+                            rtxn,
+                            index,
+                            document_tokenizer,
+                            fields_ids_map,
+                            cached_sorter,
+                            document_change?,
+                        )
+                        .map_err(Arc::new)
+                    })
                 },
             )?;
         }
@@ -110,7 +107,7 @@ pub trait SearchableExtractor {
         let readers: Vec<_> = context_pool
             .into_items()
             .par_bridge()
-            .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| {
+            .map(|(_tokenizer, _fields_ids_map, cached_sorter)| {
                 let sorter = cached_sorter.into_sorter()?;
                 sorter.into_reader_cursors()
             })

View File

@@ -5,11 +5,10 @@ use std::sync::Arc;
 use heed::types::Bytes;
 use heed::RoTxn;
 use memmap2::Mmap;
-use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
+use rayon::iter::{IndexedParallelIterator, IntoParallelIterator};
 use IndexDocumentsMethod as Idm;
 
 use super::super::document_change::DocumentChange;
-use super::super::items_pool::ItemsPool;
 use super::super::{CowStr, TopLevelMap};
 use super::DocumentChanges;
 use crate::documents::{DocumentIdExtractionError, PrimaryKey};

View File

@@ -81,7 +81,8 @@ where
         // document but we need to create a function that collects and compresses documents.
         let document_sender = extractor_sender.document_sender();
-        document_changes.clone().into_par_iter().try_for_each_try_init(|| Ok(()) as Result<_>, |_, result| {
+        document_changes.clone().into_par_iter().try_arc_for_each::<_, Error>(
+            |result| {
             match result? {
                 DocumentChange::Deletion(deletion) => {
                     let docid = deletion.docid();
@@ -99,7 +100,7 @@ where
                     // extracted_dictionary_sender.send(self, dictionary: &[u8]);
                 }
             }
-            Ok(()) as std::result::Result<_, Arc<_>>
+            Ok(())
         })?;
 
         document_sender.finish().unwrap();

View File

@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use rayon::iter::{IndexedParallelIterator, ParallelBridge, ParallelIterator};
+use rayon::iter::IndexedParallelIterator;
 
 use super::DocumentChanges;
 use crate::documents::{DocumentIdExtractionError, PrimaryKey};

View File

@@ -1,4 +1,3 @@
-use std::convert::identity;
 use std::sync::Arc;
 
 use crossbeam_channel::{Receiver, Sender, TryRecvError};
@@ -38,7 +37,7 @@ pub trait ParallelIteratorExt: ParallelIterator {
     /// A method to run a closure of all the items and return an owned error.
     ///
    /// The init function is ran only as necessary which is basically once by thread.
-    fn try_for_each_try_init<F, INIT, T, E>(self, init: INIT, op: F) -> Result<(), E>
+    fn try_arc_for_each_try_init<F, INIT, T, E>(self, init: INIT, op: F) -> Result<(), E>
     where
         E: Send + Sync,
         F: Fn(&mut T, Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone,
@@ -60,6 +59,17 @@ pub trait ParallelIteratorExt: ParallelIterator {
             Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")),
         }
     }
+
+    fn try_arc_for_each<F, E>(self, op: F) -> Result<(), E>
+    where
+        E: Send + Sync,
+        F: Fn(Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone,
+    {
+        match self.try_for_each(op) {
+            Ok(()) => Ok(()),
+            Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")),
+        }
+    }
 }
 
 impl<T: ParallelIterator> ParallelIteratorExt for T {}
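
For reference, here is a standalone, simplified sketch of what the new `try_arc_for_each` helper boils down to (illustrative bounds and error type, not the crate's code): the per-item closure reports failures as `Arc<E>`, which keeps the error cheap to move between Rayon workers, and the wrapper hands an owned `E` back by unwrapping the `Arc` once the parallel loop has stopped. The renamed `try_arc_for_each_try_init` applies the same unwrapping on top of a fallible per-thread init value, which is how the extractors above receive their `rtxn`.

use std::sync::Arc;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

// Simplified free-function sketch of the `try_arc_for_each` idea.
fn try_arc_for_each<I, E, F>(iter: I, op: F) -> Result<(), E>
where
    I: IntoParallelIterator,
    E: Send + Sync,
    F: Fn(I::Item) -> Result<(), Arc<E>> + Sync + Send,
{
    match iter.into_par_iter().try_for_each(op) {
        Ok(()) => Ok(()),
        // By the time the parallel loop returns, the failing closure's `Arc`
        // is expected to hold the only remaining reference to the error.
        Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")),
    }
}

fn main() -> Result<(), String> {
    try_arc_for_each(0u32..10_000, |n| {
        if n == 7_777 {
            Err(Arc::new(format!("unexpected value: {n}")))
        } else {
            Ok(())
        }
    })
}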