Add the rayon ThreadPool to the index function parameters

This commit is contained in:
Clément Renault 2024-08-30 14:34:24 +02:00
parent 54f2eb4507
commit 271ce91b3b
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -22,6 +22,7 @@ mod indexer {
use heed::{RoTxn, RwTxn}; use heed::{RoTxn, RwTxn};
use memmap2::Mmap; use memmap2::Mmap;
use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use rayon::ThreadPool;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::Value; use serde_json::Value;
@ -31,7 +32,6 @@ mod indexer {
}; };
use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::document_change::{Deletion, DocumentChange, Insertion, Update};
use super::items_pool::ItemsPool; use super::items_pool::ItemsPool;
use super::merge;
use crate::documents::{ use crate::documents::{
obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey,
}; };
@ -52,7 +52,7 @@ mod indexer {
pub struct DocumentOperationIndexer { pub struct DocumentOperationIndexer {
operations: Vec<Payload>, operations: Vec<Payload>,
method: IndexDocumentsMethod, index_documents_method: IndexDocumentsMethod,
} }
enum Payload { enum Payload {
@ -81,7 +81,7 @@ mod indexer {
impl DocumentOperationIndexer { impl DocumentOperationIndexer {
pub fn new(method: IndexDocumentsMethod) -> Self { pub fn new(method: IndexDocumentsMethod) -> Self {
Self { operations: Default::default(), method } Self { operations: Default::default(), index_documents_method: method }
} }
/// TODO please give me a type /// TODO please give me a type
@ -104,7 +104,7 @@ mod indexer {
self, self,
index: &'a Index, index: &'a Index,
rtxn: &'a RoTxn, rtxn: &'a RoTxn,
mut fields_ids_map: FieldsIdsMap, fields_ids_map: &'a mut FieldsIdsMap,
primary_key: &'a PrimaryKey<'a>, primary_key: &'a PrimaryKey<'a>,
) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'a> { ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'a> {
let documents_ids = index.documents_ids(rtxn)?; let documents_ids = index.documents_ids(rtxn)?;
@ -198,33 +198,27 @@ mod indexer {
} }
} }
let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); Ok(docids_version_offsets.into_par_iter().map_with(
docids_version_offsets.into_par_iter().map_with( Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))),
items, move |context_pool, (external_docid, (internal_docid, operations))| {
|context_pool, (external_docid, (internal_docid, operations))| { context_pool.with(|rtxn| {
context_pool.with(|rtxn| match self.method { use IndexDocumentsMethod as Idm;
IndexDocumentsMethod::ReplaceDocuments => merge_document_for_replacements( let document_merge_function = match self.index_documents_method {
Idm::ReplaceDocuments => merge_document_for_replacements,
Idm::UpdateDocuments => merge_document_for_updates,
};
document_merge_function(
rtxn, rtxn,
index, index,
&fields_ids_map, fields_ids_map,
internal_docid, internal_docid,
external_docid, external_docid,
&operations, &operations,
), )
// TODO Remap the documents to match the db fields_ids_map
IndexDocumentsMethod::UpdateDocuments => merge_document_for_updates(
rtxn,
index,
&fields_ids_map,
internal_docid,
external_docid,
&operations,
),
}) })
}, },
); ))
Ok(vec![].into_par_iter())
} }
} }
@ -253,7 +247,7 @@ mod indexer {
// process: "external_id_of", // process: "external_id_of",
// }) // })
// })?; // })?;
pub fn document_changes<'a, F>( pub fn document_changes<'a>(
self, self,
index: &'a Index, index: &'a Index,
fields: &'a FieldsIdsMap, fields: &'a FieldsIdsMap,
@ -263,7 +257,7 @@ mod indexer {
Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| { Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| {
items.with(|rtxn| { items.with(|rtxn| {
let current = index.document(rtxn, docid)?; let current = index.document(rtxn, docid)?;
let external_docid = match primary_key.document_id(&current, fields)? { let external_docid = match primary_key.document_id(current, fields)? {
Ok(document_id) => Ok(document_id) as Result<_>, Ok(document_id) => Ok(document_id) as Result<_>,
Err(_) => Err(InternalError::DocumentsError( Err(_) => Err(InternalError::DocumentsError(
crate::documents::Error::InvalidDocumentFormat, crate::documents::Error::InvalidDocumentFormat,
@ -325,7 +319,12 @@ mod indexer {
/// TODO return stats /// TODO return stats
/// TODO take the rayon ThreadPool /// TODO take the rayon ThreadPool
pub fn index<PI>(wtxn: &mut RwTxn, index: &Index, document_changes: PI) -> Result<()> pub fn index<PI>(
wtxn: &mut RwTxn,
index: &Index,
pool: &ThreadPool,
document_changes: PI,
) -> Result<()>
where where
PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send, PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send,
PI::Iter: Clone, PI::Iter: Clone,
@ -336,7 +335,9 @@ mod indexer {
thread::scope(|s| { thread::scope(|s| {
thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
document_changes.into_par_iter().for_each(|_dc| ()); pool.in_place_scope(|_s| {
document_changes.into_par_iter().for_each(|_dc| ());
})
})?; })?;
// TODO manage the errors correctly // TODO manage the errors correctly