From 31de5c747e05d7c7109114a8440e361cc9d7a7e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 26 Sep 2024 18:59:28 +0200 Subject: [PATCH] WIP using try_map_try_init --- .../new/extract/faceted/extract_facets.rs | 9 ++++++-- milli/src/update/new/extract/mod.rs | 7 ++++-- .../src/update/new/extract/searchable/mod.rs | 11 ++++++--- .../update/new/indexer/document_deletion.rs | 23 +++++++++++-------- .../update/new/indexer/document_operation.rs | 7 +++--- milli/src/update/new/indexer/mod.rs | 11 ++++----- milli/src/update/new/indexer/partial_dump.rs | 6 ++++- .../update/new/indexer/update_by_function.rs | 10 ++++++-- 8 files changed, 55 insertions(+), 29 deletions(-) diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 41bce2215..17de26831 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::fs::File; +use std::sync::Arc; use grenad::{MergeFunction, Merger}; use heed::RoTxn; @@ -14,7 +15,9 @@ use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::extract::DocidsExtractor; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH}; +use crate::{ + DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH, +}; pub struct FacetedDocidsExtractor; impl FacetedDocidsExtractor { @@ -195,7 +198,9 @@ impl DocidsExtractor for FacetedDocidsExtractor { index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index 6e60a4063..c12634563 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -4,6 +4,7 @@ mod lru; mod searchable; use std::fs::File; +use std::sync::Arc; pub use faceted::*; use grenad::Merger; @@ -12,14 +13,16 @@ pub use searchable::*; use super::DocumentChange; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{GlobalFieldsIdsMap, Index, Result}; +use crate::{Error, GlobalFieldsIdsMap, Index, Result}; pub trait DocidsExtractor { fn run_extraction( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result>; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index c79bd4766..2557862a2 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -3,6 +3,7 @@ mod extract_word_pair_proximity_docids; mod tokenize_document; use std::fs::File; +use std::sync::Arc; pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; @@ -15,14 +16,16 @@ use super::cache::CboCachedSorter; use super::DocidsExtractor; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; +use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; pub trait SearchableExtractor { fn run_extraction( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -132,7 +135,9 @@ impl DocidsExtractor for T { index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { Self::run_extraction(index, fields_ids_map, indexer, document_changes) } diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index bad72d3b2..eab4331b6 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -1,11 +1,12 @@ use std::sync::Arc; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; use roaring::RoaringBitmap; use super::DocumentChanges; -use crate::update::new::{Deletion, DocumentChange, ItemsPool}; -use crate::{FieldsIdsMap, Index, Result}; +use crate::update::new::items_pool::ParallelIteratorExt as _; +use crate::update::new::{Deletion, DocumentChange}; +use crate::{Error, FieldsIdsMap, Index, Result}; pub struct DocumentDeletion { pub to_delete: RoaringBitmap, @@ -28,15 +29,19 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion { self, _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { let index = param; - let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); let to_delete: Vec<_> = self.to_delete.into_iter().collect(); - Ok(to_delete.into_par_iter().map_with(items, |items, docid| { - items.with(|rtxn| { + Ok(to_delete.into_par_iter().try_map_try_init( + || index.read_txn().map_err(crate::Error::from), + |rtxn, docid| { let current = index.document(rtxn, docid)?; Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))) - }) - })) + }, + )) } } diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 66f981bdd..b2dc67ce1 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -75,9 +75,8 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result< - impl IndexedParallelIterator< - Item = std::result::Result>, - > + Clone + impl IndexedParallelIterator>> + + Clone + 'p, > { let (index, rtxn, primary_key) = param; @@ -206,7 +205,7 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); Ok(docids_version_offsets.into_par_iter().try_map_try_init( - || index.read_txn().map_err(crate::Error::from), + || index.read_txn().map_err(Error::from), move |rtxn, (external_docid, (internal_docid, operations))| { let document_merge_function = match self.index_documents_method { Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index cc8af1312..caae956af 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -1,4 +1,4 @@ -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use std::thread::{self, Builder}; use big_s::S; @@ -38,9 +38,8 @@ pub trait DocumentChanges<'p> { fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result< - impl IndexedParallelIterator< - Item = std::result::Result>, - > + Clone + impl IndexedParallelIterator>> + + Clone + 'p, >; } @@ -58,7 +57,7 @@ pub fn index( document_changes: PI, ) -> Result<()> where - PI: IndexedParallelIterator>> + PI: IndexedParallelIterator>> + Send + Clone, { @@ -249,7 +248,7 @@ fn extract_and_send_docids( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator>>, sender: &ExtractorSender, ) -> Result<()> { let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?; diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 43a89c46c..02c9d68fc 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -30,7 +30,11 @@ where self, _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { let (fields_ids_map, concurrent_available_ids, primary_key) = param; Ok(self.iter.map(|object| { diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index d4c0f837b..d6d532433 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use super::DocumentChanges; use crate::update::new::DocumentChange; -use crate::{FieldsIdsMap, Result}; +use crate::{Error, FieldsIdsMap, Result}; pub struct UpdateByFunction; @@ -13,7 +15,11 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction { self, _fields_ids_map: &mut FieldsIdsMap, _param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { Ok((0..100).into_par_iter().map(|_| todo!())) } }