try using try_map_try_init

This commit is contained in:
Louis Dureuil 2024-09-26 17:48:32 +02:00 committed by Clément Renault
parent 5b776556fe
commit 8cb5e7437d
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 32 additions and 21 deletions

View File

@ -13,6 +13,7 @@ use super::super::items_pool::ItemsPool;
use super::super::{CowStr, TopLevelMap};
use super::DocumentChanges;
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
use crate::update::new::items_pool::ParallelIteratorExt as _;
use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError};
@ -73,7 +74,12 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
self,
fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
) -> Result<
impl IndexedParallelIterator<
Item = std::result::Result<DocumentChange, Option<crate::Error>>,
> + Clone
+ 'p,
> {
let (index, rtxn, primary_key) = param;
let documents_ids = index.documents_ids(rtxn)?;
@ -199,24 +205,22 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
// And finally sort them
docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops));
Ok(docids_version_offsets.into_par_iter().map_with(
Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))),
move |context_pool, (external_docid, (internal_docid, operations))| {
context_pool.with(|rtxn| {
let document_merge_function = match self.index_documents_method {
Idm::ReplaceDocuments => MergeDocumentForReplacement::merge,
Idm::UpdateDocuments => MergeDocumentForUpdates::merge,
};
Ok(docids_version_offsets.into_par_iter().try_map_try_init(
|| index.read_txn().map_err(crate::Error::from),
move |rtxn, (external_docid, (internal_docid, operations))| {
let document_merge_function = match self.index_documents_method {
Idm::ReplaceDocuments => MergeDocumentForReplacement::merge,
Idm::UpdateDocuments => MergeDocumentForUpdates::merge,
};
document_merge_function(
rtxn,
index,
&fields_ids_map,
internal_docid,
external_docid.to_string(), // TODO do not clone
&operations,
)
})
document_merge_function(
rtxn,
index,
&fields_ids_map,
internal_docid,
external_docid.to_string(), // TODO do not clone
&operations,
)
},
))
}

View File

@ -23,7 +23,7 @@ use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::update::new::channel::ExtractorSender;
use crate::update::settings::InnerIndexSettings;
use crate::update::GrenadParameters;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
mod document_deletion;
mod document_operation;
@ -37,7 +37,12 @@ pub trait DocumentChanges<'p> {
self,
fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p>;
) -> Result<
impl IndexedParallelIterator<
Item = std::result::Result<DocumentChange, Option<crate::Error>>,
> + Clone
+ 'p,
>;
}
/// This is the main function of this crate.
@ -53,7 +58,9 @@ pub fn index<PI>(
document_changes: PI,
) -> Result<()>
where
PI: IndexedParallelIterator<Item = Result<DocumentChange>> + Send + Clone,
PI: IndexedParallelIterator<Item = std::result::Result<DocumentChange, Option<Error>>>
+ Send
+ Clone,
{
let (merger_sender, writer_receiver) = merger_writer_channel(10_000);
// This channel acts as a rendezvous point to ensure that we are one task ahead