From b7a5ba100edc4bb7e065b41908b1f949b4f2ff3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 1 Oct 2024 11:10:18 +0200 Subject: [PATCH] Move the ParallelIteratorExt into the parallel_iterator_ext module --- .../new/extract/faceted/extract_facets.rs | 4 +- .../extract/searchable/extract_word_docids.rs | 4 +- .../src/update/new/extract/searchable/mod.rs | 4 +- .../update/new/indexer/document_deletion.rs | 2 +- .../update/new/indexer/document_operation.rs | 2 +- milli/src/update/new/indexer/mod.rs | 2 +- milli/src/update/new/indexer/partial_dump.rs | 2 +- milli/src/update/new/mod.rs | 3 +- ...items_pool.rs => parallel_iterator_ext.rs} | 54 ------------------- 9 files changed, 11 insertions(+), 66 deletions(-) rename milli/src/update/new/{items_pool.rs => parallel_iterator_ext.rs} (59%) diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index ef983c4e6..e4e6f7010 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -14,8 +14,8 @@ use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::DocidsExtractor; -use crate::update::new::items_pool::ParallelIteratorExt; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH, diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index a19ac3891..f4346ba52 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -12,8 +12,8 @@ use super::SearchableExtractor; use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; -use crate::update::new::items_pool::ParallelIteratorExt; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index f09f573e0..b3fa646b9 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -15,8 +15,8 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; use crate::update::new::append_only_vec::AppendOnlyVec; -use crate::update::new::items_pool::ParallelIteratorExt; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index eab4331b6..400b51af6 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -4,7 +4,7 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; use roaring::RoaringBitmap; use super::DocumentChanges; -use crate::update::new::items_pool::ParallelIteratorExt as _; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::{Deletion, DocumentChange}; use crate::{Error, FieldsIdsMap, Index, Result}; diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 38d4a408f..f9e1bb8f3 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -12,7 +12,7 @@ use super::super::document_change::DocumentChange; use super::super::{CowStr, TopLevelMap}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; -use crate::update::new::items_pool::ParallelIteratorExt as _; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 934d0a364..28165c3a8 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -22,7 +22,7 @@ use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; use crate::update::settings::InnerIndexSettings; -use crate::update::new::items_pool::ParallelIteratorExt; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::GrenadParameters; use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index db63256a6..325e13cc4 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -5,7 +5,7 @@ use rayon::iter::IndexedParallelIterator; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; -use crate::update::new::items_pool::ParallelIteratorExt; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 6a48e0407..264241caa 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -1,5 +1,4 @@ pub use document_change::{Deletion, DocumentChange, Insertion, Update}; -pub use items_pool::ItemsPool; pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; @@ -10,8 +9,8 @@ mod channel; mod document_change; mod extract; pub mod indexer; -mod items_pool; mod merger; +mod parallel_iterator_ext; mod top_level_map; mod word_fst_builder; mod words_prefix_docids; diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/parallel_iterator_ext.rs similarity index 59% rename from milli/src/update/new/items_pool.rs rename to milli/src/update/new/parallel_iterator_ext.rs index 8fa22b75b..043457cfd 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/parallel_iterator_ext.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use crossbeam_channel::{Receiver, Sender, TryRecvError}; use rayon::iter::{MapInit, ParallelIterator}; pub trait ParallelIteratorExt: ParallelIterator { @@ -73,56 +72,3 @@ pub trait ParallelIteratorExt: ParallelIterator { } impl ParallelIteratorExt for T {} - -/// A pool of items that can be pull and generated on demand. -pub struct ItemsPool -where - F: Fn() -> Result, -{ - init: F, - sender: Sender, - receiver: Receiver, -} - -impl ItemsPool -where - F: Fn() -> Result, -{ - /// Create a new unbounded items pool with the specified function - /// to generate items when needed. - /// - /// The `init` function will be invoked whenever a call to `with` requires new items. - pub fn new(init: F) -> Self { - let (sender, receiver) = crossbeam_channel::unbounded(); - ItemsPool { init, sender, receiver } - } - - /// Consumes the pool to retrieve all remaining items. - /// - /// This method is useful for cleaning up and managing the items once they are no longer needed. - pub fn into_items(self) -> crossbeam_channel::IntoIter { - self.receiver.into_iter() - } - - /// Allows running a function on an item from the pool, - /// potentially generating a new item if the pool is empty. - pub fn with(&self, f: G) -> Result - where - G: FnOnce(&mut T) -> Result, - { - let mut item = match self.receiver.try_recv() { - Ok(item) => item, - Err(TryRecvError::Empty) => (self.init)()?, - Err(TryRecvError::Disconnected) => unreachable!(), - }; - - // Run the user's closure with the retrieved item - let result = f(&mut item); - - if let Err(e) = self.sender.send(item) { - unreachable!("error when sending into channel {e}"); - } - - result - } -}