diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 6dc600545..02b61dde4 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -22,6 +22,7 @@ mod indexer { use heed::{RoTxn, RwTxn}; use memmap2::Mmap; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; + use rayon::ThreadPool; use roaring::RoaringBitmap; use serde_json::Value; @@ -31,7 +32,6 @@ mod indexer { }; use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::items_pool::ItemsPool; - use super::merge; use crate::documents::{ obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, }; @@ -52,7 +52,7 @@ mod indexer { pub struct DocumentOperationIndexer { operations: Vec, - method: IndexDocumentsMethod, + index_documents_method: IndexDocumentsMethod, } enum Payload { @@ -81,7 +81,7 @@ mod indexer { impl DocumentOperationIndexer { pub fn new(method: IndexDocumentsMethod) -> Self { - Self { operations: Default::default(), method } + Self { operations: Default::default(), index_documents_method: method } } /// TODO please give me a type @@ -104,7 +104,7 @@ mod indexer { self, index: &'a Index, rtxn: &'a RoTxn, - mut fields_ids_map: FieldsIdsMap, + fields_ids_map: &'a mut FieldsIdsMap, primary_key: &'a PrimaryKey<'a>, ) -> Result>> + 'a> { let documents_ids = index.documents_ids(rtxn)?; @@ -198,33 +198,27 @@ mod indexer { } } - let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); - docids_version_offsets.into_par_iter().map_with( - items, - |context_pool, (external_docid, (internal_docid, operations))| { - context_pool.with(|rtxn| match self.method { - IndexDocumentsMethod::ReplaceDocuments => merge_document_for_replacements( + Ok(docids_version_offsets.into_par_iter().map_with( + Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))), + move |context_pool, (external_docid, (internal_docid, operations))| { + context_pool.with(|rtxn| { + use IndexDocumentsMethod as Idm; + let document_merge_function = match self.index_documents_method { + Idm::ReplaceDocuments => merge_document_for_replacements, + Idm::UpdateDocuments => merge_document_for_updates, + }; + + document_merge_function( rtxn, index, - &fields_ids_map, + fields_ids_map, internal_docid, external_docid, &operations, - ), - // TODO Remap the documents to match the db fields_ids_map - IndexDocumentsMethod::UpdateDocuments => merge_document_for_updates( - rtxn, - index, - &fields_ids_map, - internal_docid, - external_docid, - &operations, - ), + ) }) }, - ); - - Ok(vec![].into_par_iter()) + )) } } @@ -253,7 +247,7 @@ mod indexer { // process: "external_id_of", // }) // })?; - pub fn document_changes<'a, F>( + pub fn document_changes<'a>( self, index: &'a Index, fields: &'a FieldsIdsMap, @@ -263,7 +257,7 @@ mod indexer { Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| { items.with(|rtxn| { let current = index.document(rtxn, docid)?; - let external_docid = match primary_key.document_id(¤t, fields)? { + let external_docid = match primary_key.document_id(current, fields)? { Ok(document_id) => Ok(document_id) as Result<_>, Err(_) => Err(InternalError::DocumentsError( crate::documents::Error::InvalidDocumentFormat, @@ -325,7 +319,12 @@ mod indexer { /// TODO return stats /// TODO take the rayon ThreadPool - pub fn index(wtxn: &mut RwTxn, index: &Index, document_changes: PI) -> Result<()> + pub fn index( + wtxn: &mut RwTxn, + index: &Index, + pool: &ThreadPool, + document_changes: PI, + ) -> Result<()> where PI: IntoParallelIterator> + Send, PI::Iter: Clone, @@ -336,7 +335,9 @@ mod indexer { thread::scope(|s| { thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { - document_changes.into_par_iter().for_each(|_dc| ()); + pool.in_place_scope(|_s| { + document_changes.into_par_iter().for_each(|_dc| ()); + }) })?; // TODO manage the errors correctly