diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 572ea8528..66f981bdd 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -13,6 +13,7 @@ use super::super::items_pool::ItemsPool; use super::super::{CowStr, TopLevelMap}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; +use crate::update::new::items_pool::ParallelIteratorExt as _; use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; @@ -73,7 +74,12 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { self, fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator< + Item = std::result::Result>, + > + Clone + + 'p, + > { let (index, rtxn, primary_key) = param; let documents_ids = index.documents_ids(rtxn)?; @@ -199,24 +205,22 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { // And finally sort them docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); - Ok(docids_version_offsets.into_par_iter().map_with( - Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))), - move |context_pool, (external_docid, (internal_docid, operations))| { - context_pool.with(|rtxn| { - let document_merge_function = match self.index_documents_method { - Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, - Idm::UpdateDocuments => MergeDocumentForUpdates::merge, - }; + Ok(docids_version_offsets.into_par_iter().try_map_try_init( + || index.read_txn().map_err(crate::Error::from), + move |rtxn, (external_docid, (internal_docid, operations))| { + let document_merge_function = match self.index_documents_method { + Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, + Idm::UpdateDocuments => MergeDocumentForUpdates::merge, + }; - document_merge_function( - rtxn, - index, - &fields_ids_map, - internal_docid, - external_docid.to_string(), // TODO do not clone - &operations, - ) - }) + document_merge_function( + rtxn, + index, + &fields_ids_map, + internal_docid, + external_docid.to_string(), // TODO do not clone + &operations, + ) }, )) } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index e30333b3a..cc8af1312 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -23,7 +23,7 @@ use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; use crate::update::settings::InnerIndexSettings; use crate::update::GrenadParameters; -use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; mod document_deletion; mod document_operation; @@ -37,7 +37,12 @@ pub trait DocumentChanges<'p> { self, fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p>; + ) -> Result< + impl IndexedParallelIterator< + Item = std::result::Result>, + > + Clone + + 'p, + >; } /// This is the main function of this crate. @@ -53,7 +58,9 @@ pub fn index( document_changes: PI, ) -> Result<()> where - PI: IndexedParallelIterator> + Send + Clone, + PI: IndexedParallelIterator>> + + Send + + Clone, { let (merger_sender, writer_receiver) = merger_writer_channel(10_000); // This channel acts as a rendezvous point to ensure that we are one task ahead