From 5b776556feae34dc2a675f99996a169f57f7e289 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 26 Sep 2024 17:46:58 +0200 Subject: [PATCH 01/10] Add ParallelIteratorExt --- milli/src/update/new/items_pool.rs | 32 ++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs index e90ce97db..c57bc86f1 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/items_pool.rs @@ -1,4 +1,36 @@ use crossbeam_channel::{Receiver, Sender, TryRecvError}; +use rayon::iter::{MapInit, ParallelIterator}; + +pub trait ParallelIteratorExt: ParallelIterator { + fn try_map_try_init( + self, + init: INIT, + map_op: F, + ) -> MapInit< + Self, + impl Fn() -> Result> + Sync + Send + Clone, + impl Fn(&mut Result>, Self::Item) -> Result> + Sync + Send + Clone, + > + where + E: Send, + F: Fn(&mut T, Self::Item) -> Result + Sync + Send + Clone, + INIT: Fn() -> Result + Sync + Send + Clone, + R: Send, + { + self.map_init( + move || match init() { + Ok(t) => Ok(t), + Err(err) => Err(Some(err)), + }, + move |maybe_t, item| match maybe_t { + Ok(t) => map_op(t, item).map_err(Some), + Err(maybe_err) => Err(maybe_err.take()), + }, + ) + } +} + +impl ParallelIteratorExt for T {} /// A pool of items that can be pull and generated on demand. pub struct ItemsPool From 8cb5e7437d180603438f958cc3fe8ded026ffd9b Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 26 Sep 2024 17:48:32 +0200 Subject: [PATCH 02/10] try using try_map_try_init --- .../update/new/indexer/document_operation.rs | 40 ++++++++++--------- milli/src/update/new/indexer/mod.rs | 13 ++++-- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 572ea8528..66f981bdd 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -13,6 +13,7 @@ use super::super::items_pool::ItemsPool; use super::super::{CowStr, TopLevelMap}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; +use crate::update::new::items_pool::ParallelIteratorExt as _; use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; @@ -73,7 +74,12 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { self, fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator< + Item = std::result::Result>, + > + Clone + + 'p, + > { let (index, rtxn, primary_key) = param; let documents_ids = index.documents_ids(rtxn)?; @@ -199,24 +205,22 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { // And finally sort them docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); - Ok(docids_version_offsets.into_par_iter().map_with( - Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))), - move |context_pool, (external_docid, (internal_docid, operations))| { - context_pool.with(|rtxn| { - let document_merge_function = match self.index_documents_method { - Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, - Idm::UpdateDocuments => MergeDocumentForUpdates::merge, - }; + Ok(docids_version_offsets.into_par_iter().try_map_try_init( + || 
index.read_txn().map_err(crate::Error::from), + move |rtxn, (external_docid, (internal_docid, operations))| { + let document_merge_function = match self.index_documents_method { + Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, + Idm::UpdateDocuments => MergeDocumentForUpdates::merge, + }; - document_merge_function( - rtxn, - index, - &fields_ids_map, - internal_docid, - external_docid.to_string(), // TODO do not clone - &operations, - ) - }) + document_merge_function( + rtxn, + index, + &fields_ids_map, + internal_docid, + external_docid.to_string(), // TODO do not clone + &operations, + ) }, )) } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index e30333b3a..cc8af1312 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -23,7 +23,7 @@ use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; use crate::update::settings::InnerIndexSettings; use crate::update::GrenadParameters; -use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; mod document_deletion; mod document_operation; @@ -37,7 +37,12 @@ pub trait DocumentChanges<'p> { self, fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p>; + ) -> Result< + impl IndexedParallelIterator< + Item = std::result::Result>, + > + Clone + + 'p, + >; } /// This is the main function of this crate. @@ -53,7 +58,9 @@ pub fn index( document_changes: PI, ) -> Result<()> where - PI: IndexedParallelIterator> + Send + Clone, + PI: IndexedParallelIterator>> + + Send + + Clone, { let (merger_sender, writer_receiver) = merger_writer_channel(10_000); // This channel acts as a rendezvous point to ensure that we are one task ahead From 3843240940465f3a299d5949fcf53022c42b220e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 26 Sep 2024 18:43:23 +0200 Subject: [PATCH 03/10] Prefer using Ars instead of Options --- milli/src/update/new/items_pool.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs index c57bc86f1..92a6d5e64 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/items_pool.rs @@ -1,18 +1,21 @@ +use std::sync::Arc; + use crossbeam_channel::{Receiver, Sender, TryRecvError}; use rayon::iter::{MapInit, ParallelIterator}; pub trait ParallelIteratorExt: ParallelIterator { + /// A method on a parallel iterator to map fn try_map_try_init( self, init: INIT, map_op: F, ) -> MapInit< Self, - impl Fn() -> Result> + Sync + Send + Clone, - impl Fn(&mut Result>, Self::Item) -> Result> + Sync + Send + Clone, + impl Fn() -> Result> + Sync + Send + Clone, + impl Fn(&mut Result>, Self::Item) -> Result> + Sync + Send + Clone, > where - E: Send, + E: Send + Sync, F: Fn(&mut T, Self::Item) -> Result + Sync + Send + Clone, INIT: Fn() -> Result + Sync + Send + Clone, R: Send, @@ -20,11 +23,11 @@ pub trait ParallelIteratorExt: ParallelIterator { self.map_init( move || match init() { Ok(t) => Ok(t), - Err(err) => Err(Some(err)), + Err(err) => Err(Arc::new(err)), }, move |maybe_t, item| match maybe_t { - Ok(t) => map_op(t, item).map_err(Some), - Err(maybe_err) => Err(maybe_err.take()), + Ok(t) => map_op(t, item).map_err(Arc::new), + Err(maybe_err) => Err(maybe_err.clone()), }, ) } From 31de5c747e05d7c7109114a8440e361cc9d7a7e4 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 26 Sep 2024 18:59:28 +0200 Subject: [PATCH 04/10] WIP using try_map_try_init --- .../new/extract/faceted/extract_facets.rs | 9 ++++++-- milli/src/update/new/extract/mod.rs | 7 ++++-- .../src/update/new/extract/searchable/mod.rs | 11 ++++++--- .../update/new/indexer/document_deletion.rs | 23 +++++++++++-------- .../update/new/indexer/document_operation.rs | 7 +++--- milli/src/update/new/indexer/mod.rs | 11 ++++----- milli/src/update/new/indexer/partial_dump.rs | 6 ++++- .../update/new/indexer/update_by_function.rs | 10 ++++++-- 8 files changed, 55 insertions(+), 29 deletions(-) diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 41bce2215..17de26831 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::fs::File; +use std::sync::Arc; use grenad::{MergeFunction, Merger}; use heed::RoTxn; @@ -14,7 +15,9 @@ use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::extract::DocidsExtractor; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH}; +use crate::{ + DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH, +}; pub struct FacetedDocidsExtractor; impl FacetedDocidsExtractor { @@ -195,7 +198,9 @@ impl DocidsExtractor for FacetedDocidsExtractor { index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index 6e60a4063..c12634563 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -4,6 +4,7 @@ mod lru; mod searchable; use std::fs::File; +use std::sync::Arc; pub use faceted::*; use grenad::Merger; @@ -12,14 +13,16 @@ pub use searchable::*; use super::DocumentChange; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{GlobalFieldsIdsMap, Index, Result}; +use crate::{Error, GlobalFieldsIdsMap, Index, Result}; pub trait DocidsExtractor { fn run_extraction( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result>; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index c79bd4766..2557862a2 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -3,6 +3,7 @@ mod extract_word_pair_proximity_docids; mod tokenize_document; use std::fs::File; +use std::sync::Arc; pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; @@ -15,14 +16,16 @@ use super::cache::CboCachedSorter; use super::DocidsExtractor; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{GlobalFieldsIdsMap, 
Index, Result, MAX_POSITION_PER_ATTRIBUTE}; +use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; pub trait SearchableExtractor { fn run_extraction( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -132,7 +135,9 @@ impl DocidsExtractor for T { index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { Self::run_extraction(index, fields_ids_map, indexer, document_changes) } diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index bad72d3b2..eab4331b6 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -1,11 +1,12 @@ use std::sync::Arc; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; use roaring::RoaringBitmap; use super::DocumentChanges; -use crate::update::new::{Deletion, DocumentChange, ItemsPool}; -use crate::{FieldsIdsMap, Index, Result}; +use crate::update::new::items_pool::ParallelIteratorExt as _; +use crate::update::new::{Deletion, DocumentChange}; +use crate::{Error, FieldsIdsMap, Index, Result}; pub struct DocumentDeletion { pub to_delete: RoaringBitmap, @@ -28,15 +29,19 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion { self, _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { let index = param; - let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); let to_delete: Vec<_> = self.to_delete.into_iter().collect(); - Ok(to_delete.into_par_iter().map_with(items, |items, docid| { - items.with(|rtxn| { + Ok(to_delete.into_par_iter().try_map_try_init( + || index.read_txn().map_err(crate::Error::from), + |rtxn, docid| { let current = index.document(rtxn, docid)?; Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))) - }) - })) + }, + )) } } diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 66f981bdd..b2dc67ce1 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -75,9 +75,8 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result< - impl IndexedParallelIterator< - Item = std::result::Result>, - > + Clone + impl IndexedParallelIterator>> + + Clone + 'p, > { let (index, rtxn, primary_key) = param; @@ -206,7 +205,7 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); Ok(docids_version_offsets.into_par_iter().try_map_try_init( - || index.read_txn().map_err(crate::Error::from), + || index.read_txn().map_err(Error::from), move |rtxn, (external_docid, (internal_docid, operations))| { let document_merge_function = match self.index_documents_method { Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, diff --git 
a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index cc8af1312..caae956af 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -1,4 +1,4 @@ -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use std::thread::{self, Builder}; use big_s::S; @@ -38,9 +38,8 @@ pub trait DocumentChanges<'p> { fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result< - impl IndexedParallelIterator< - Item = std::result::Result>, - > + Clone + impl IndexedParallelIterator>> + + Clone + 'p, >; } @@ -58,7 +57,7 @@ pub fn index( document_changes: PI, ) -> Result<()> where - PI: IndexedParallelIterator>> + PI: IndexedParallelIterator>> + Send + Clone, { @@ -249,7 +248,7 @@ fn extract_and_send_docids( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator>>, sender: &ExtractorSender, ) -> Result<()> { let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?; diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 43a89c46c..02c9d68fc 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -30,7 +30,11 @@ where self, _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { let (fields_ids_map, concurrent_available_ids, primary_key) = param; Ok(self.iter.map(|object| { diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index d4c0f837b..d6d532433 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use super::DocumentChanges; use crate::update::new::DocumentChange; -use crate::{FieldsIdsMap, Result}; +use crate::{Error, FieldsIdsMap, Result}; pub struct UpdateByFunction; @@ -13,7 +15,11 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction { self, _fields_ids_map: &mut FieldsIdsMap, _param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { Ok((0..100).into_par_iter().map(|_| todo!())) } } From f3356ddaa4f551c672bc937c63b5a39b0693c786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 29 Sep 2024 16:46:58 +0200 Subject: [PATCH 05/10] Fix the errors when using the try_map_try_init method --- .../extract/searchable/extract_word_docids.rs | 7 +- milli/src/update/new/indexer/partial_dump.rs | 76 ++++++++++--------- milli/src/update/new/items_pool.rs | 4 +- 3 files changed, 50 insertions(+), 37 deletions(-) diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index caab170a4..828219b41 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::fs::File; use std::num::NonZero; +use std::sync::Arc; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; @@ -12,7 +13,7 @@ use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, 
GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ - bucketed_position, DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, + bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE, }; @@ -303,7 +304,9 @@ impl WordDocidsExtractors { index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result { let max_memory = indexer.max_memory_by_thread(); diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 02c9d68fc..aa01f6547 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -1,8 +1,11 @@ -use rayon::iter::IndexedParallelIterator; +use std::sync::Arc; + +use rayon::iter::{IndexedParallelIterator, ParallelBridge, ParallelIterator}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; @@ -37,41 +40,46 @@ where > { let (fields_ids_map, concurrent_available_ids, primary_key) = param; - Ok(self.iter.map(|object| { - let docid = match concurrent_available_ids.next() { - Some(id) => id, - None => return Err(Error::UserError(UserError::DocumentLimitReached)), - }; + Ok(self.iter.try_map_try_init( + || Ok(()), + |_, object| { + let docid = match concurrent_available_ids.next() { + Some(id) => id, + None => return Err(Error::UserError(UserError::DocumentLimitReached)), + }; - let mut writer = KvWriterFieldId::memory(); - object.iter().for_each(|(key, value)| { - let key = fields_ids_map.id(key).unwrap(); - /// TODO better error management - let value = serde_json::to_vec(&value).unwrap(); - /// TODO it is not ordered - writer.insert(key, value).unwrap(); - }); + let mut writer = KvWriterFieldId::memory(); + object.iter().for_each(|(key, value)| { + let key = fields_ids_map.id(key).unwrap(); + /// TODO better error management + let value = serde_json::to_vec(&value).unwrap(); + /// TODO it is not ordered + writer.insert(key, value).unwrap(); + }); - let document = writer.into_boxed(); - let external_docid = match primary_key.document_id(&document, fields_ids_map)? { - Ok(document_id) => Ok(document_id), - Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error), - Err(DocumentIdExtractionError::MissingDocumentId) => { - Err(UserError::MissingDocumentId { - primary_key: primary_key.name().to_string(), - document: all_obkv_to_json(&document, fields_ids_map)?, - }) - } - Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { - Err(UserError::TooManyDocumentIds { - primary_key: primary_key.name().to_string(), - document: all_obkv_to_json(&document, fields_ids_map)?, - }) - } - }?; + let document = writer.into_boxed(); + let external_docid = match primary_key.document_id(&document, fields_ids_map)? 
{ + Ok(document_id) => Ok(document_id), + Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => { + Err(user_error) + } + Err(DocumentIdExtractionError::MissingDocumentId) => { + Err(UserError::MissingDocumentId { + primary_key: primary_key.name().to_string(), + document: all_obkv_to_json(&document, fields_ids_map)?, + }) + } + Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { + Err(UserError::TooManyDocumentIds { + primary_key: primary_key.name().to_string(), + document: all_obkv_to_json(&document, fields_ids_map)?, + }) + } + }?; - let insertion = Insertion::create(docid, document); - Ok(DocumentChange::Insertion(insertion)) - })) + let insertion = Insertion::create(docid, document); + Ok(DocumentChange::Insertion(insertion)) + }, + )) } } diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs index 92a6d5e64..01a2cf933 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/items_pool.rs @@ -4,7 +4,9 @@ use crossbeam_channel::{Receiver, Sender, TryRecvError}; use rayon::iter::{MapInit, ParallelIterator}; pub trait ParallelIteratorExt: ParallelIterator { - /// A method on a parallel iterator to map + /// Maps items based on the init function. + /// + /// The init function is ran only as necessary which is basically once by thread. fn try_map_try_init( self, init: INIT, From d83c9a4074a9d03373f97a3f9ebbfdd3c5ef1a6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 29 Sep 2024 17:21:11 +0200 Subject: [PATCH 06/10] Introduce the try_for_each_try_init method to be used with Arced Errors --- .../new/extract/faceted/extract_facets.rs | 33 +++++++++++-------- .../extract/searchable/extract_word_docids.rs | 31 ++++++++++------- .../src/update/new/extract/searchable/mod.rs | 31 ++++++++++------- milli/src/update/new/indexer/mod.rs | 5 +-- milli/src/update/new/items_pool.rs | 31 +++++++++++++++-- 5 files changed, 89 insertions(+), 42 deletions(-) diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 17de26831..d2daf756a 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -13,6 +13,7 @@ use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::extract::DocidsExtractor; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ @@ -211,7 +212,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn()?, + index.read_txn().map_err(Error::from).map_err(Arc::new)?, fields_ids_map.clone(), Vec::new(), CboCachedSorter::new( @@ -233,19 +234,23 @@ impl DocidsExtractor for FacetedDocidsExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - buffer, - fields_ids_map, - &attributes_to_extract, - cached_sorter, - document_change?, - ) - }) - })?; + document_changes.into_par_iter().try_for_each_try_init( + || Ok(()), + |_, document_change| { + context_pool.with(|(rtxn, fields_ids_map, buffer, 
cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + buffer, + fields_ids_map, + &attributes_to_extract, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) + }) + }, + )?; } { let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 828219b41..a9552b499 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -10,6 +10,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ @@ -340,7 +341,7 @@ impl WordDocidsExtractors { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn()?, + index.read_txn().map_err(Error::from).map_err(Arc::new)?, &document_tokenizer, fields_ids_map.clone(), WordDocidsCachedSorters::new( @@ -356,18 +357,24 @@ impl WordDocidsExtractors { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, + document_changes.into_par_iter().try_for_each_try_init( + || Ok(()), + |_, document_change| { + context_pool.with( + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) + }, ) - }) - })?; + }, + )?; } { diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 2557862a2..b3f27ec78 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -14,6 +14,7 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; @@ -59,7 +60,7 @@ pub trait SearchableExtractor { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn()?, + index.read_txn().map_err(Error::from).map_err(Arc::new)?, &document_tokenizer, fields_ids_map.clone(), CboCachedSorter::new( @@ -81,18 +82,24 @@ pub trait SearchableExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, + 
document_changes.into_par_iter().try_for_each_try_init( + || Ok(()), + |_, document_change| { + context_pool.with( + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) + }, ) - }) - })?; + }, + )?; } { let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index caae956af..d6064e4fb 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -22,6 +22,7 @@ use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; use crate::update::settings::InnerIndexSettings; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::GrenadParameters; use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; @@ -80,7 +81,7 @@ where // document but we need to create a function that collects and compresses documents. let document_sender = extractor_sender.document_sender(); - document_changes.clone().into_par_iter().try_for_each(|result| { + document_changes.clone().into_par_iter().try_for_each_try_init(|| Ok(()) as Result<_>, |_, result| { match result? { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); @@ -98,7 +99,7 @@ where // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } - Ok(()) as Result<_> + Ok(()) as std::result::Result<_, Arc<_>> })?; document_sender.finish().unwrap(); diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs index 01a2cf933..649f09105 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/items_pool.rs @@ -1,3 +1,4 @@ +use std::convert::identity; use std::sync::Arc; use crossbeam_channel::{Receiver, Sender, TryRecvError}; @@ -27,12 +28,38 @@ pub trait ParallelIteratorExt: ParallelIterator { Ok(t) => Ok(t), Err(err) => Err(Arc::new(err)), }, - move |maybe_t, item| match maybe_t { + move |result, item| match result { Ok(t) => map_op(t, item).map_err(Arc::new), - Err(maybe_err) => Err(maybe_err.clone()), + Err(err) => Err(err.clone()), }, ) } + + /// A method to run a closure of all the items and return an owned error. + /// + /// The init function is ran only as necessary which is basically once by thread. 
+ fn try_for_each_try_init(self, init: INIT, op: F) -> Result<(), E> + where + E: Send + Sync, + F: Fn(&mut T, Self::Item) -> Result<(), Arc> + Sync + Send + Clone, + INIT: Fn() -> Result + Sync + Send + Clone, + { + let result = self.try_for_each_init( + move || match init() { + Ok(t) => Ok(t), + Err(err) => Err(Arc::new(err)), + }, + move |result, item| match result { + Ok(t) => op(t, item), + Err(err) => Err(err.clone()), + }, + ); + + match result { + Ok(()) => Ok(()), + Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")), + } + } } impl ParallelIteratorExt for T {} From 00e045b24977947ed526b35afa6f5f5f775baaf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 29 Sep 2024 17:42:26 +0200 Subject: [PATCH 07/10] Rename and use the try_arc_for_each_try_init method --- .../new/extract/faceted/extract_facets.rs | 13 ++++--- .../extract/searchable/extract_word_docids.rs | 35 +++++++++---------- .../src/update/new/extract/searchable/mod.rs | 33 ++++++++--------- .../update/new/indexer/document_operation.rs | 3 +- milli/src/update/new/indexer/mod.rs | 5 +-- milli/src/update/new/indexer/partial_dump.rs | 2 +- milli/src/update/new/items_pool.rs | 14 ++++++-- 7 files changed, 54 insertions(+), 51 deletions(-) diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index d2daf756a..40f561b97 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -212,7 +212,6 @@ impl DocidsExtractor for FacetedDocidsExtractor { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn().map_err(Error::from).map_err(Arc::new)?, fields_ids_map.clone(), Vec::new(), CboCachedSorter::new( @@ -234,12 +233,12 @@ impl DocidsExtractor for FacetedDocidsExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each_try_init( - || Ok(()), - |_, document_change| { - context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| { + document_changes.into_par_iter().try_arc_for_each_try_init( + || index.read_txn().map_err(Error::from), + |rtxn, document_change| { + context_pool.with(|(fields_ids_map, buffer, cached_sorter)| { Self::extract_document_change( - &*rtxn, + rtxn, index, buffer, fields_ids_map, @@ -261,7 +260,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { let readers: Vec<_> = context_pool .into_items() .par_bridge() - .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { + .map(|(_tokenizer, _fields_ids_map, cached_sorter)| { let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index a9552b499..f59f5a03d 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::IntoParallelIterator; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::update::new::extract::cache::CboCachedSorter; @@ -341,7 +341,6 @@ impl WordDocidsExtractors { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn().map_err(Error::from).map_err(Arc::new)?, 
&document_tokenizer, fields_ids_map.clone(), WordDocidsCachedSorters::new( @@ -357,22 +356,20 @@ impl WordDocidsExtractors { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each_try_init( - || Ok(()), - |_, document_change| { - context_pool.with( - |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }, - ) + document_changes.into_par_iter().try_arc_for_each_try_init( + || index.read_txn().map_err(Error::from), + |rtxn, document_change| { + context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| { + Self::extract_document_change( + rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) + }) }, )?; } @@ -382,7 +379,7 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); - for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() { + for (_tokenizer, _fields_ids_map, cache) in context_pool.into_items() { builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index b3f27ec78..b6cda3a87 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -60,7 +60,6 @@ pub trait SearchableExtractor { let context_pool = ItemsPool::new(|| { Ok(( - index.read_txn().map_err(Error::from).map_err(Arc::new)?, &document_tokenizer, fields_ids_map.clone(), CboCachedSorter::new( @@ -82,22 +81,20 @@ pub trait SearchableExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each_try_init( - || Ok(()), - |_, document_change| { - context_pool.with( - |(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }, - ) + document_changes.into_par_iter().try_arc_for_each_try_init( + || index.read_txn().map_err(Error::from), + |rtxn, document_change| { + context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| { + Self::extract_document_change( + rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) + }) }, )?; } @@ -110,7 +107,7 @@ pub trait SearchableExtractor { let readers: Vec<_> = context_pool .into_items() .par_bridge() - .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { + .map(|(_tokenizer, _fields_ids_map, cached_sorter)| { let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index b2dc67ce1..38d4a408f 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -5,11 +5,10 @@ use std::sync::Arc; use heed::types::Bytes; use heed::RoTxn; use memmap2::Mmap; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; 
use IndexDocumentsMethod as Idm; use super::super::document_change::DocumentChange; -use super::super::items_pool::ItemsPool; use super::super::{CowStr, TopLevelMap}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index d6064e4fb..934d0a364 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -81,7 +81,8 @@ where // document but we need to create a function that collects and compresses documents. let document_sender = extractor_sender.document_sender(); - document_changes.clone().into_par_iter().try_for_each_try_init(|| Ok(()) as Result<_>, |_, result| { + document_changes.clone().into_par_iter().try_arc_for_each::<_, Error>( + |result| { match result? { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); @@ -99,7 +100,7 @@ where // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } - Ok(()) as std::result::Result<_, Arc<_>> + Ok(()) })?; document_sender.finish().unwrap(); diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index aa01f6547..db63256a6 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use rayon::iter::{IndexedParallelIterator, ParallelBridge, ParallelIterator}; +use rayon::iter::IndexedParallelIterator; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs index 649f09105..8fa22b75b 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/items_pool.rs @@ -1,4 +1,3 @@ -use std::convert::identity; use std::sync::Arc; use crossbeam_channel::{Receiver, Sender, TryRecvError}; @@ -38,7 +37,7 @@ pub trait ParallelIteratorExt: ParallelIterator { /// A method to run a closure of all the items and return an owned error. /// /// The init function is ran only as necessary which is basically once by thread. 
- fn try_for_each_try_init(self, init: INIT, op: F) -> Result<(), E> + fn try_arc_for_each_try_init(self, init: INIT, op: F) -> Result<(), E> where E: Send + Sync, F: Fn(&mut T, Self::Item) -> Result<(), Arc> + Sync + Send + Clone, @@ -60,6 +59,17 @@ pub trait ParallelIteratorExt: ParallelIterator { Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")), } } + + fn try_arc_for_each(self, op: F) -> Result<(), E> + where + E: Send + Sync, + F: Fn(Self::Item) -> Result<(), Arc> + Sync + Send + Clone, + { + match self.try_for_each(op) { + Ok(()) => Ok(()), + Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")), + } + } } impl ParallelIteratorExt for T {} From 0a8cb471dff621c963d9d86d18b6d027c893ad1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 30 Sep 2024 23:50:43 +0200 Subject: [PATCH 08/10] Introduce the AppendOnlyVec struct for the parallel computing --- milli/src/update/new/append_only_vec.rs | 327 ++++++++++++++++++++++++ milli/src/update/new/mod.rs | 1 + 2 files changed, 328 insertions(+) create mode 100644 milli/src/update/new/append_only_vec.rs diff --git a/milli/src/update/new/append_only_vec.rs b/milli/src/update/new/append_only_vec.rs new file mode 100644 index 000000000..fe05dd782 --- /dev/null +++ b/milli/src/update/new/append_only_vec.rs @@ -0,0 +1,327 @@ +// Code taken from +// and modified in order to get a ref mut instead of the index of newly inserted items. + +//! AppendOnlyVec +//! +//! This is a pretty simple type, which is a vector that you can push into and +//! receive a reference to the item you just inserted. The data structure never +//! moves an element once allocated, so you can push to the vec even while holding +//! mutable references to elements that have already been pushed. +//! +//! ### Scaling +//! +//! 1. Accessing an element is O(1), but slightly more expensive than for a +//! standard `Vec`. +//! +//! 2. Pushing a new element amortizes to O(1), but may require allocation of a +//! new chunk. +//! +//! ### Example +//! +//! ``` +//! use append_only_vec::AppendOnlyVec; +//! +//! static V: AppendOnlyVec = AppendOnlyVec::::new(); +//! let mut threads = Vec::new(); +//! for thread_num in 0..10 { +//! threads.push(std::thread::spawn(move || { +//! for n in 0..100 { +//! let s = format!("thread {} says {}", thread_num, n); +//! let which = V.push(s.clone()); +//! assert_eq!(&which, &s); +//! } +//! })); +//! } +//! +//! for t in threads { +//! t.join(); +//! } +//! +//! assert_eq!(V.len(), 1000); +//! 
``` + +use std::cell::UnsafeCell; +use std::fmt::Debug; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; + +pub struct AppendOnlyVec { + count: AtomicUsize, + _reserved: AtomicUsize, + data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3], +} + +unsafe impl Send for AppendOnlyVec {} +unsafe impl Sync for AppendOnlyVec {} + +const BITS: usize = std::mem::size_of::() * 8; + +#[cfg(target_arch = "x86_64")] +const BITS_USED: usize = 48; +#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))] +const BITS_USED: usize = 64; +#[cfg(target_pointer_width = "32")] +const BITS_USED: usize = 32; + +// This takes an index into a vec, and determines which data array will hold it +// (the first return value), and what the index will be into that data array +// (second return value) +// +// The ith data array holds 1< (u32, usize) { + let i = i + 8; + let bin = BITS as u32 - 1 - i.leading_zeros(); + let bin = bin - 3; + let offset = i - bin_size(bin); + (bin, offset) +} + +const fn bin_size(array: u32) -> usize { + (1 << 3) << array +} + +#[test] +fn test_indices() { + for i in 0..32 { + println!("{:3}: {} {}", i, indices(i).0, indices(i).1); + } + let mut array = 0; + let mut offset = 0; + let mut index = 0; + while index < 1000 { + index += 1; + offset += 1; + if offset >= bin_size(array) { + offset = 0; + array += 1; + } + assert_eq!(indices(index), (array, offset)); + } +} + +impl Default for AppendOnlyVec { + fn default() -> Self { + Self::new() + } +} + +impl AppendOnlyVec { + const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(ptr::null_mut()); + + /// Allocate a new empty array. + pub const fn new() -> Self { + AppendOnlyVec { + count: AtomicUsize::new(0), + _reserved: AtomicUsize::new(0), + data: [Self::EMPTY; BITS_USED - 1 - 3], + } + } + + /// Find the length of the array. + #[inline] + pub fn len(&self) -> usize { + self.count.load(Ordering::Acquire) + } + + fn layout(array: u32) -> std::alloc::Layout { + std::alloc::Layout::array::(bin_size(array)).unwrap() + } + + /// Append an element to the array and get a mutable ref to it. + /// + /// This is notable in that it doesn't require a `&mut self`, because it + /// does appropriate atomic synchronization. + pub fn push(&self, val: T) -> &mut T { + let idx = self._reserved.fetch_add(1, Ordering::Relaxed); + let (array, offset) = indices(idx); + let ptr = if self.len() < 1 + idx - offset { + // We are working on a new array, which may not have been allocated... + if offset == 0 { + // It is our job to allocate the array! The size of the array + // is determined in the self.layout method, which needs to be + // consistent with the indices function. + let layout = Self::layout(array); + let ptr = unsafe { std::alloc::alloc(layout) } as *mut T; + unsafe { + *self.data[array as usize].get() = ptr; + } + ptr + } else { + // We need to wait for the array to be allocated. + while self.len() < 1 + idx - offset { + std::hint::spin_loop(); + } + // The Ordering::Acquire semantics of self.len() ensures that + // this pointer read will get the non-null pointer allocated + // above. + unsafe { *self.data[array as usize].get() } + } + } else { + // The Ordering::Acquire semantics of self.len() ensures that + // this pointer read will get the non-null pointer allocated + // above. 
+ unsafe { *self.data[array as usize].get() } + }; + + // The contents of this offset are guaranteed to be unused (so far) + // because we got the idx from our fetch_add above, and ptr is + // guaranteed to be valid because of the loop we used above, which used + // self.len() which has Ordering::Acquire semantics. + unsafe { (ptr.add(offset)).write(val) }; + + // Now we need to increase the size of the vec, so it can get read. We + // use Release upon success, to ensure that the value which we wrote is + // visible to any thread that has confirmed that the count is big enough + // to read that element. In case of failure, we can be relaxed, since + // we don't do anything with the result other than try again. + while self + .count + .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed) + .is_err() + { + // This means that someone else *started* pushing before we started, + // but hasn't yet finished. We have to wait for them to finish + // pushing before we can update the count. Note that using a + // spinloop here isn't really ideal, but except when allocating a + // new array, the window between reserving space and using it is + // pretty small, so contention will hopefully be rare, and having a + // context switch during that interval will hopefully be vanishingly + // unlikely. + std::hint::spin_loop(); + } + + unsafe { &mut *ptr } + } + + /// Convert into a standard `Vec`. + pub fn into_vec(self) -> Vec { + let mut vec = Vec::with_capacity(self.len()); + + for idx in 0..self.len() { + let (array, offset) = indices(idx); + // We use a Relaxed load of the pointer, because the loop above (which + // ends before `self.len()`) should ensure that the data we want is + // already visible, since it Acquired `self.count` which synchronizes + // with the write in `self.push`. + let ptr = unsafe { *self.data[array as usize].get() }; + + // Copy the element value. The copy remaining in the array must not + // be used again (i.e. make sure we do not drop it) + let value = unsafe { ptr.add(offset).read() }; + + vec.push(value); + } + + // Prevent dropping the copied-out values by marking the count as 0 before + // our own drop is run + self.count.store(0, Ordering::Relaxed); + + vec + } +} + +impl Debug for AppendOnlyVec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AppendOnlyVec").field("len", &self.len()).finish() + } +} + +impl Drop for AppendOnlyVec { + fn drop(&mut self) { + // First we'll drop all the `T` in a slightly sloppy way. FIXME this + // could be optimized to avoid reloading the `ptr`. + for idx in 0..self.len() { + let (array, offset) = indices(idx); + // We use a Relaxed load of the pointer, because the loop above (which + // ends before `self.len()`) should ensure that the data we want is + // already visible, since it Acquired `self.count` which synchronizes + // with the write in `self.push`. + let ptr = unsafe { *self.data[array as usize].get() }; + unsafe { + ptr::drop_in_place(ptr.add(offset)); + } + } + // Now we will free all the arrays. + for array in 0..self.data.len() as u32 { + // This load is relaxed because no other thread can have a reference + // to Self because we have a &mut self. 
+ let ptr = unsafe { *self.data[array as usize].get() }; + if !ptr.is_null() { + let layout = Self::layout(array); + unsafe { std::alloc::dealloc(ptr as *mut u8, layout) }; + } else { + break; + } + } + } +} + +impl IntoIterator for AppendOnlyVec { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.into_vec().into_iter() + } +} + +#[test] +fn test_parallel_pushing() { + use std::sync::Arc; + let v = Arc::new(AppendOnlyVec::::new()); + let mut threads = Vec::new(); + const N: u64 = 100; + for thread_num in 0..N { + let v = v.clone(); + threads.push(std::thread::spawn(move || { + let which1 = v.push(thread_num); + let which2 = v.push(thread_num); + assert_eq!(*which1, thread_num); + assert_eq!(*which2, thread_num); + })); + } + for t in threads { + t.join().unwrap(); + } + let v = Arc::into_inner(v).unwrap().into_vec(); + for thread_num in 0..N { + assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count()); + } +} + +#[test] +fn test_into_vec() { + struct SafeToDrop(bool); + + impl Drop for SafeToDrop { + fn drop(&mut self) { + assert!(self.0); + } + } + + let v = AppendOnlyVec::new(); + + for _ in 0..50 { + v.push(SafeToDrop(false)); + } + + let mut v = v.into_vec(); + assert_eq!(v.len(), 50); + + for i in v.iter_mut() { + i.0 = true; + } +} + +#[test] +fn test_push_then_index_mut() { + let v = AppendOnlyVec::::new(); + for i in 0..1024 { + *v.push(i) += 1; + } + + let v = v.into_vec(); + for i in 0..1024 { + assert_eq!(v[i], 2 * i); + } +} diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 98b60378f..6a48e0407 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -5,6 +5,7 @@ pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; use crate::FieldId; +mod append_only_vec; mod channel; mod document_change; mod extract; From dead7a56a32b2331ac9886f793dfea5b653ef7ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 1 Oct 2024 11:07:56 +0200 Subject: [PATCH 09/10] Keep the caches in the AppendOnlyVec --- milli/src/update/new/append_only_vec.rs | 12 ++-- .../new/extract/faceted/extract_facets.rs | 69 +++++++++---------- .../extract/searchable/extract_word_docids.rs | 50 +++++++------- .../src/update/new/extract/searchable/mod.rs | 67 +++++++++--------- 4 files changed, 95 insertions(+), 103 deletions(-) diff --git a/milli/src/update/new/append_only_vec.rs b/milli/src/update/new/append_only_vec.rs index fe05dd782..d4a30c1b1 100644 --- a/milli/src/update/new/append_only_vec.rs +++ b/milli/src/update/new/append_only_vec.rs @@ -99,12 +99,6 @@ fn test_indices() { } } -impl Default for AppendOnlyVec { - fn default() -> Self { - Self::new() - } -} - impl AppendOnlyVec { const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(ptr::null_mut()); @@ -220,6 +214,12 @@ impl AppendOnlyVec { } } +impl Default for AppendOnlyVec { + fn default() -> Self { + Self::new() + } +} + impl Debug for AppendOnlyVec { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("AppendOnlyVec").field("len", &self.len()).finish() diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 40f561b97..ef983c4e6 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -12,6 +12,7 @@ use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; use super::FacetKind; use 
crate::facet::value_encoding::f64_into_bytes; +use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::DocidsExtractor; use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; @@ -209,45 +210,40 @@ impl DocidsExtractor for FacetedDocidsExtractor { let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - - let context_pool = ItemsPool::new(|| { - Ok(( - fields_ids_map.clone(), - Vec::new(), - CboCachedSorter::new( - // TODO use a better value - 100.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( - || index.read_txn().map_err(Error::from), - |rtxn, document_change| { - context_pool.with(|(fields_ids_map, buffer, cached_sorter)| { - Self::extract_document_change( - rtxn, - index, - buffer, - fields_ids_map, - &attributes_to_extract, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }) + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + // TODO use a better value + 100.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok((rtxn, fields_ids_map.clone(), Vec::new(), cache)) + }, + |(rtxn, fields_ids_map, buffer, cached_sorter), document_change| { + Self::extract_document_change( + rtxn, + index, + buffer, + fields_ids_map, + &attributes_to_extract, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) }, )?; } @@ -257,14 +253,15 @@ impl DocidsExtractor for FacetedDocidsExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = context_pool - .into_items() + let readers: Vec<_> = caches + .into_iter() .par_bridge() - .map(|(_tokenizer, _fields_ids_map, cached_sorter)| { + .map(|cached_sorter| { let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) .collect(); + for reader in readers { builder.extend(reader?); } diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index f59f5a03d..a19ac3891 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -8,6 +8,8 @@ use heed::RoTxn; use rayon::iter::IntoParallelIterator; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; +use super::SearchableExtractor; +use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::items_pool::ParallelIteratorExt; @@ -339,37 +341,33 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let context_pool = ItemsPool::new(|| { - Ok(( - &document_tokenizer, - fields_ids_map.clone(), - 
WordDocidsCachedSorters::new( - indexer, - max_memory, - // TODO use a better value - 200_000.try_into().unwrap(), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( - || index.read_txn().map_err(Error::from), - |rtxn, document_change| { - context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }) + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(WordDocidsCachedSorters::new( + indexer, + max_memory, + // TODO use a better value + 200_000.try_into().unwrap(), + )); + Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + }, + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + Self::extract_document_change( + rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) }, )?; } @@ -379,7 +377,7 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); - for (_tokenizer, _fields_ids_map, cache) in context_pool.into_items() { + for cache in caches.into_iter() { builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index b6cda3a87..f09f573e0 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -14,6 +14,7 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; +use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -57,44 +58,39 @@ pub trait SearchableExtractor { localized_attributes_rules: &localized_attributes_rules, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - - let context_pool = ItemsPool::new(|| { - Ok(( - &document_tokenizer, - fields_ids_map.clone(), - CboCachedSorter::new( - // TODO use a better value - 1_000_000.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( - || index.read_txn().map_err(Error::from), - |rtxn, document_change| { - context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }) + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + // TODO use a better value + 1_000_000.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + 
indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + }, + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + Self::extract_document_change( + rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) }, )?; } @@ -104,14 +100,15 @@ pub trait SearchableExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = context_pool - .into_items() + let readers: Vec<_> = caches + .into_iter() .par_bridge() - .map(|(_tokenizer, _fields_ids_map, cached_sorter)| { + .map(|cached_sorter| { let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) .collect(); + for reader in readers { builder.extend(reader?); } From b7a5ba100edc4bb7e065b41908b1f949b4f2ff3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 1 Oct 2024 11:10:18 +0200 Subject: [PATCH 10/10] Move the ParallelIteratorExt into the parallel_iterator_ext module --- .../new/extract/faceted/extract_facets.rs | 4 +- .../extract/searchable/extract_word_docids.rs | 4 +- .../src/update/new/extract/searchable/mod.rs | 4 +- .../update/new/indexer/document_deletion.rs | 2 +- .../update/new/indexer/document_operation.rs | 2 +- milli/src/update/new/indexer/mod.rs | 2 +- milli/src/update/new/indexer/partial_dump.rs | 2 +- milli/src/update/new/mod.rs | 3 +- ...items_pool.rs => parallel_iterator_ext.rs} | 54 ------------------- 9 files changed, 11 insertions(+), 66 deletions(-) rename milli/src/update/new/{items_pool.rs => parallel_iterator_ext.rs} (59%) diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index ef983c4e6..e4e6f7010 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -14,8 +14,8 @@ use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::DocidsExtractor; -use crate::update::new::items_pool::ParallelIteratorExt; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH, diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index a19ac3891..f4346ba52 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -12,8 +12,8 @@ use super::SearchableExtractor; use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; -use crate::update::new::items_pool::ParallelIteratorExt; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, diff --git 
a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index f09f573e0..b3fa646b9 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -15,8 +15,8 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; use crate::update::new::append_only_vec::AppendOnlyVec; -use crate::update::new::items_pool::ParallelIteratorExt; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index eab4331b6..400b51af6 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -4,7 +4,7 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; use roaring::RoaringBitmap; use super::DocumentChanges; -use crate::update::new::items_pool::ParallelIteratorExt as _; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::{Deletion, DocumentChange}; use crate::{Error, FieldsIdsMap, Index, Result}; diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 38d4a408f..f9e1bb8f3 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -12,7 +12,7 @@ use super::super::document_change::DocumentChange; use super::super::{CowStr, TopLevelMap}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; -use crate::update::new::items_pool::ParallelIteratorExt as _; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 934d0a364..28165c3a8 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -22,7 +22,7 @@ use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; use crate::update::settings::InnerIndexSettings; -use crate::update::new::items_pool::ParallelIteratorExt; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::GrenadParameters; use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index db63256a6..325e13cc4 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -5,7 +5,7 @@ use rayon::iter::IndexedParallelIterator; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; -use crate::update::new::items_pool::ParallelIteratorExt; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use 
crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 6a48e0407..264241caa 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -1,5 +1,4 @@ pub use document_change::{Deletion, DocumentChange, Insertion, Update}; -pub use items_pool::ItemsPool; pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; @@ -10,8 +9,8 @@ mod channel; mod document_change; mod extract; pub mod indexer; -mod items_pool; mod merger; +mod parallel_iterator_ext; mod top_level_map; mod word_fst_builder; mod words_prefix_docids; diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/parallel_iterator_ext.rs similarity index 59% rename from milli/src/update/new/items_pool.rs rename to milli/src/update/new/parallel_iterator_ext.rs index 8fa22b75b..043457cfd 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/parallel_iterator_ext.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use crossbeam_channel::{Receiver, Sender, TryRecvError}; use rayon::iter::{MapInit, ParallelIterator}; pub trait ParallelIteratorExt: ParallelIterator { @@ -73,56 +72,3 @@ pub trait ParallelIteratorExt: ParallelIterator { } impl ParallelIteratorExt for T {} - -/// A pool of items that can be pull and generated on demand. -pub struct ItemsPool -where - F: Fn() -> Result, -{ - init: F, - sender: Sender, - receiver: Receiver, -} - -impl ItemsPool -where - F: Fn() -> Result, -{ - /// Create a new unbounded items pool with the specified function - /// to generate items when needed. - /// - /// The `init` function will be invoked whenever a call to `with` requires new items. - pub fn new(init: F) -> Self { - let (sender, receiver) = crossbeam_channel::unbounded(); - ItemsPool { init, sender, receiver } - } - - /// Consumes the pool to retrieve all remaining items. - /// - /// This method is useful for cleaning up and managing the items once they are no longer needed. - pub fn into_items(self) -> crossbeam_channel::IntoIter { - self.receiver.into_iter() - } - - /// Allows running a function on an item from the pool, - /// potentially generating a new item if the pool is empty. - pub fn with(&self, f: G) -> Result - where - G: FnOnce(&mut T) -> Result, - { - let mut item = match self.receiver.try_recv() { - Ok(item) => item, - Err(TryRecvError::Empty) => (self.init)()?, - Err(TryRecvError::Disconnected) => unreachable!(), - }; - - // Run the user's closure with the retrieved item - let result = f(&mut item); - - if let Err(e) = self.sender.send(item) { - unreachable!("error when sending into channel {e}"); - } - - result - } -}
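
For readers unfamiliar with the pattern these patches converge on, here is a minimal, standalone Rust sketch of its shape: rayon's `*_init` combinators cannot fail during initialization, so the fallible per-worker setup is carried inside the worker state as a `Result<_, Arc<E>>`, and each worker registers its freshly created cache in a shared, append-only collection that is drained during the merge step. This is an illustration only, not code from the repository: a `Mutex<Vec<_>>` stands in for the crate's `AppendOnlyVec`, a `HashMap` stands in for the cached sorters, and `fallible_init` is a hypothetical placeholder for opening a read transaction.

// Standalone sketch; depends only on rayon.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use rayon::iter::{IntoParallelIterator, ParallelIterator};

type Cache = Arc<Mutex<HashMap<u32, u64>>>;

fn fallible_init(caches: &Mutex<Vec<Cache>>) -> Result<Cache, std::io::Error> {
    // Imagine opening a read transaction here; any failure is reported via the state.
    let cache: Cache = Arc::new(Mutex::new(HashMap::new()));
    caches.lock().unwrap().push(Arc::clone(&cache));
    Ok(cache)
}

fn main() -> Result<(), Arc<std::io::Error>> {
    // Stand-in for AppendOnlyVec: collects every per-worker cache for the merge step.
    let caches: Mutex<Vec<Cache>> = Mutex::new(Vec::new());

    (0u32..100_000).into_par_iter().try_for_each_init(
        // Init runs once per rayon work unit; a failure is stored as the worker state.
        || fallible_init(&caches).map_err(Arc::new),
        // The operation propagates either the stored init error or its own error.
        |state, item| -> Result<(), Arc<std::io::Error>> {
            let cache = state.as_mut().map_err(|err| err.clone())?;
            *cache.lock().unwrap().entry(item % 64).or_insert(0) += 1;
            Ok(())
        },
    )?;

    // Merge step, analogous to the `merger_building` spans above.
    let mut merged: HashMap<u32, u64> = HashMap::new();
    for cache in caches.into_inner().unwrap() {
        for (&bucket, &count) in cache.lock().unwrap().iter() {
            *merged.entry(bucket).or_insert(0) += count;
        }
    }
    assert_eq!(merged.values().sum::<u64>(), 100_000);
    Ok(())
}

Compared with the removed channel-based `ItemsPool`, which recycled items through a crossbeam sender/receiver on every call, this keeps each worker's state local to the rayon task and only shares the caches needed for merging, which is what allows the pool to be deleted in the last patch.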