Move the ParallelIteratorExt into the parallel_iterator_ext module

Clément Renault 2024-10-01 11:10:18 +02:00
parent dead7a56a3
commit b7a5ba100e
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
9 changed files with 11 additions and 66 deletions
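For readers skimming the diff, here is a minimal, self-contained sketch of the extension-trait pattern this commit relocates. The module name, the bound on rayon's ParallelIterator, and the blanket impl are taken from the hunks below; the map_with_init method and the main function are illustrative assumptions, not milli's actual API.

use rayon::iter::{IntoParallelIterator, MapInit, ParallelIterator};

// Shape of the relocated trait, as visible in the diff: an extension trait over
// rayon's ParallelIterator plus a blanket impl so every parallel iterator gets it.
pub trait ParallelIteratorExt: ParallelIterator {
    /// Hypothetical helper: forward to rayon's `map_init`, which lazily creates an
    /// `init()` value per worker and threads it through the mapping closure.
    fn map_with_init<INIT, T, F, R>(self, init: INIT, map_op: F) -> MapInit<Self, INIT, F>
    where
        INIT: Fn() -> T + Sync + Send,
        F: Fn(&mut T, Self::Item) -> R + Sync + Send,
        R: Send,
    {
        self.map_init(init, map_op)
    }
}

// Blanket implementation, as shown in the last hunk of this commit.
impl<T: ParallelIterator> ParallelIteratorExt for T {}

fn main() {
    // Call sites now import the trait from the new path, e.g.
    // use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
    let sum: u32 = (0u32..10).into_par_iter().map_with_init(|| 1u32, |one, n| n + *one).sum();
    assert_eq!(sum, 55); // (0 + 1 + ... + 9) + 10 * 1
}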

View File

@@ -14,8 +14,8 @@ use super::FacetKind;
 use crate::facet::value_encoding::f64_into_bytes;
 use crate::update::new::append_only_vec::AppendOnlyVec;
 use crate::update::new::extract::DocidsExtractor;
-use crate::update::new::items_pool::ParallelIteratorExt;
-use crate::update::new::{DocumentChange, ItemsPool};
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
+use crate::update::new::DocumentChange;
 use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
 use crate::{
     DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH,

View File

@@ -12,8 +12,8 @@ use super::SearchableExtractor;
 use crate::update::new::append_only_vec::AppendOnlyVec;
 use crate::update::new::extract::cache::CboCachedSorter;
 use crate::update::new::extract::perm_json_p::contained_in;
-use crate::update::new::items_pool::ParallelIteratorExt;
-use crate::update::new::{DocumentChange, ItemsPool};
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
+use crate::update::new::DocumentChange;
 use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
 use crate::{
     bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result,

View File

@@ -15,8 +15,8 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer};
 use super::cache::CboCachedSorter;
 use super::DocidsExtractor;
 use crate::update::new::append_only_vec::AppendOnlyVec;
-use crate::update::new::items_pool::ParallelIteratorExt;
-use crate::update::new::{DocumentChange, ItemsPool};
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
+use crate::update::new::DocumentChange;
 use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
 use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};

View File

@@ -4,7 +4,7 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelIterator};
 use roaring::RoaringBitmap;
 use super::DocumentChanges;
-use crate::update::new::items_pool::ParallelIteratorExt as _;
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
 use crate::update::new::{Deletion, DocumentChange};
 use crate::{Error, FieldsIdsMap, Index, Result};

View File

@@ -12,7 +12,7 @@ use super::super::document_change::DocumentChange;
 use super::super::{CowStr, TopLevelMap};
 use super::DocumentChanges;
 use crate::documents::{DocumentIdExtractionError, PrimaryKey};
-use crate::update::new::items_pool::ParallelIteratorExt as _;
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
 use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update};
 use crate::update::{AvailableIds, IndexDocumentsMethod};
 use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError};

View File

@@ -22,7 +22,7 @@ use super::{StdResult, TopLevelMap};
 use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
 use crate::update::new::channel::ExtractorSender;
 use crate::update::settings::InnerIndexSettings;
-use crate::update::new::items_pool::ParallelIteratorExt;
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
 use crate::update::GrenadParameters;
 use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};

View File

@@ -5,7 +5,7 @@ use rayon::iter::IndexedParallelIterator;
 use super::DocumentChanges;
 use crate::documents::{DocumentIdExtractionError, PrimaryKey};
 use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
-use crate::update::new::items_pool::ParallelIteratorExt;
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
 use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId};
 use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};

View File

@@ -1,5 +1,4 @@
 pub use document_change::{Deletion, DocumentChange, Insertion, Update};
-pub use items_pool::ItemsPool;
 pub use top_level_map::{CowStr, TopLevelMap};

 use super::del_add::DelAdd;
@@ -10,8 +9,8 @@ mod channel;
 mod document_change;
 mod extract;
 pub mod indexer;
-mod items_pool;
 mod merger;
+mod parallel_iterator_ext;
 mod top_level_map;
 mod word_fst_builder;
 mod words_prefix_docids;

View File

@@ -1,6 +1,5 @@
 use std::sync::Arc;

-use crossbeam_channel::{Receiver, Sender, TryRecvError};
 use rayon::iter::{MapInit, ParallelIterator};

 pub trait ParallelIteratorExt: ParallelIterator {
@@ -73,56 +72,3 @@ pub trait ParallelIteratorExt: ParallelIterator {
 }

 impl<T: ParallelIterator> ParallelIteratorExt for T {}
-
-/// A pool of items that can be pull and generated on demand.
-pub struct ItemsPool<F, T, E>
-where
-    F: Fn() -> Result<T, E>,
-{
-    init: F,
-    sender: Sender<T>,
-    receiver: Receiver<T>,
-}
-
-impl<F, T, E> ItemsPool<F, T, E>
-where
-    F: Fn() -> Result<T, E>,
-{
-    /// Create a new unbounded items pool with the specified function
-    /// to generate items when needed.
-    ///
-    /// The `init` function will be invoked whenever a call to `with` requires new items.
-    pub fn new(init: F) -> Self {
-        let (sender, receiver) = crossbeam_channel::unbounded();
-        ItemsPool { init, sender, receiver }
-    }
-
-    /// Consumes the pool to retrieve all remaining items.
-    ///
-    /// This method is useful for cleaning up and managing the items once they are no longer needed.
-    pub fn into_items(self) -> crossbeam_channel::IntoIter<T> {
-        self.receiver.into_iter()
-    }
-
-    /// Allows running a function on an item from the pool,
-    /// potentially generating a new item if the pool is empty.
-    pub fn with<G, R>(&self, f: G) -> Result<R, E>
-    where
-        G: FnOnce(&mut T) -> Result<R, E>,
-    {
-        let mut item = match self.receiver.try_recv() {
-            Ok(item) => item,
-            Err(TryRecvError::Empty) => (self.init)()?,
-            Err(TryRecvError::Disconnected) => unreachable!(),
-        };
-
-        // Run the user's closure with the retrieved item
-        let result = f(&mut item);
-
-        if let Err(e) = self.sender.send(item) {
-            unreachable!("error when sending into channel {e}");
-        }
-
-        result
-    }
-}
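For context on the deletion above: ItemsPool lent each caller a reusable item over an unbounded crossbeam channel, creating one lazily when the channel was empty. Rayon's built-in for_each_init and map_init provide similar per-worker state without a channel, which may be why the pool became redundant once ParallelIteratorExt (note the MapInit import kept above) took over; that motivation is an inference, not something the commit states. A minimal sketch of the rayon idiom:

use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    // Instead of pulling a reusable item out of a shared ItemsPool inside the
    // closure, for_each_init lazily creates a scratch value per worker chunk and
    // hands the same mutable reference to every item processed in that chunk.
    (0u32..1_000).into_par_iter().for_each_init(
        || Vec::with_capacity(16), // plays the role of ItemsPool's `init`
        |scratch: &mut Vec<u32>, n| {
            scratch.clear();
            scratch.push(n);
            // ... do per-item work with `scratch` here ...
        },
    );
}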