I push for Many

Clément Renault 2024-09-02 15:10:21 +02:00
parent 72e7b7846e
commit 521775f788
6 changed files with 30 additions and 21 deletions

View File

@@ -8,7 +8,7 @@ use crate::update::new::KvReaderFieldId;
 use crate::{DocumentId, Index};
 
 /// The capacity of the channel is currently in number of messages.
-pub fn merger_writer_channels(cap: usize) -> (MergerSender, WriterReceiver) {
+pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) {
     let (sender, receiver) = crossbeam_channel::bounded(cap);
     (MergerSender(sender), WriterReceiver(receiver))
 }
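The hunk above renames merger_writer_channels to merger_writer_channel but keeps its shape: both ends of a bounded crossbeam channel are wrapped in dedicated newtypes. A minimal standalone sketch of that pattern follows; the real MergerSender and WriterReceiver carry milli's writer messages and are not generic, so the type parameter below is an assumption for illustration only.

use crossbeam_channel::{Receiver, Sender};

// Newtype wrappers so each side of the channel can expose its own, narrower API.
pub struct MergerSender<T>(Sender<T>);
pub struct WriterReceiver<T>(Receiver<T>);

pub fn merger_writer_channel<T>(cap: usize) -> (MergerSender<T>, WriterReceiver<T>) {
    // A bounded channel gives backpressure: sends block once `cap` messages are queued.
    let (sender, receiver) = crossbeam_channel::bounded(cap);
    (MergerSender(sender), WriterReceiver(receiver))
}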

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
 use rayon::iter::{ParallelBridge, ParallelIterator};
 use roaring::RoaringBitmap;
 
-use super::Indexer;
+use super::DocumentChanges;
 use crate::documents::PrimaryKey;
 use crate::update::new::{Deletion, DocumentChange, ItemsPool};
 use crate::{FieldsIdsMap, Index, InternalError, Result};
@@ -22,7 +22,7 @@ impl DocumentDeletion {
     }
 }
 
-impl<'p> Indexer<'p> for DocumentDeletion {
+impl<'p> DocumentChanges<'p> for DocumentDeletion {
     type Parameter = (&'p Index, &'p FieldsIdsMap, &'p PrimaryKey<'p>);
 
     fn document_changes(
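Across these hunks the Indexer trait becomes DocumentChanges; only its name, the 'p lifetime, the Parameter associated type, and the document_changes method name are visible in this diff. The sketch below only mirrors that shape; the return type and the FakeIndex/FakeDeletion types are placeholders, not milli APIs.

// Shape of the renamed trait as far as this diff shows it. The real return
// type of `document_changes` is cut off in the hunks, so `Vec<String>` here
// is an assumption made purely for illustration.
pub trait DocumentChanges<'p> {
    type Parameter: 'p;

    fn document_changes(self, param: Self::Parameter) -> Vec<String>;
}

pub struct FakeIndex {
    deleted_ids: Vec<String>,
}

pub struct FakeDeletion;

impl<'p> DocumentChanges<'p> for FakeDeletion {
    // Implementors bundle their borrowed inputs into a Parameter tuple or
    // reference, like DocumentDeletion's (&Index, &FieldsIdsMap, &PrimaryKey).
    type Parameter = &'p FakeIndex;

    fn document_changes(self, index: Self::Parameter) -> Vec<String> {
        index.deleted_ids.clone()
    }
}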

View File

@@ -11,7 +11,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use super::super::document_change::DocumentChange;
 use super::super::items_pool::ItemsPool;
-use super::Indexer;
+use super::DocumentChanges;
 use crate::documents::{
     obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey,
 };
@@ -70,7 +70,7 @@ impl DocumentOperation {
     }
 }
 
-impl<'p> Indexer<'p> for DocumentOperation {
+impl<'p> DocumentChanges<'p> for DocumentOperation {
     type Parameter = (&'p Index, &'p RoTxn<'static>, &'p mut FieldsIdsMap, &'p PrimaryKey<'p>);
 
     fn document_changes(

View File

@@ -10,7 +10,7 @@ use rayon::ThreadPool;
 pub use update_by_function::UpdateByFunction;
 
 use super::channel::{
-    extractors_merger_channels, merger_writer_channels, EntryOperation, ExtractorsMergerChannels,
+    extractors_merger_channels, merger_writer_channel, EntryOperation, ExtractorsMergerChannels,
     WriterOperation,
 };
 use super::document_change::DocumentChange;
@@ -22,7 +22,7 @@ mod document_operation;
 mod partial_dump;
 mod update_by_function;
 
-pub trait Indexer<'p> {
+pub trait DocumentChanges<'p> {
     type Parameter: 'p;
 
     fn document_changes(
@@ -36,7 +36,6 @@ pub trait Indexer<'p> {
 /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
 ///
 /// TODO return stats
-/// TODO take the rayon ThreadPool
 pub fn index<PI>(
     wtxn: &mut RwTxn,
     index: &Index,
@@ -44,25 +43,31 @@ pub fn index<PI>(
     document_changes: PI,
 ) -> Result<()>
 where
-    PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send,
+    PI: IntoParallelIterator<Item = Result<Option<DocumentChange>>> + Send,
     PI::Iter: Clone,
 {
-    let (merger_sender, writer_receiver) = merger_writer_channels(100);
+    let (merger_sender, writer_receiver) = merger_writer_channel(100);
     let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } =
         extractors_merger_channels(100);
 
     thread::scope(|s| {
         // TODO manage the errors correctly
-        thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
-            pool.in_place_scope(|_s| {
-                document_changes.into_par_iter().for_each(|_dc| ());
-            })
-        })?;
+        let handle =
+            thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
+                pool.in_place_scope(|_s| {
+                    // word docids
+                    // document_changes.into_par_iter().try_for_each(|_dc| Ok(()) as Result<_>)
+                    // let grenads = extractor_function(document_changes)?;
+                    // deladd_cbo_roaring_bitmap_sender.word_docids(grenads)?;
+
+                    Ok(()) as Result<_>
+                })
+            })?;
 
         // TODO manage the errors correctly
-        thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || {
+        let handle2 = thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || {
             let rtxn = index.read_txn().unwrap();
-            merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index).unwrap()
+            merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index)
         })?;
 
         // TODO Split this code into another function
@@ -77,6 +82,10 @@ where
             }
         }
 
+        /// TODO handle the panicking threads
+        handle.join().unwrap()?;
+        handle2.join().unwrap()?;
+
         Ok(())
     })
 }
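The largest change above keeps the ScopedJoinHandle of each spawned worker and joins both before leaving thread::scope, so a worker's Result is propagated instead of being unwrapped inside the merger thread. A rough standalone sketch of that control flow, using std::io::Result in place of milli's Result and empty closures in place of the extractor and merger bodies:

use std::thread;

fn run() -> std::io::Result<()> {
    thread::scope(|s| {
        // Named scoped threads, as in the indexer: keep the handles around.
        let handle = thread::Builder::new()
            .name("indexer-extractors".into())
            .spawn_scoped(s, || -> std::io::Result<()> {
                // extract things and send them to the merger...
                Ok(())
            })?;

        let handle2 = thread::Builder::new()
            .name("indexer-merger".into())
            .spawn_scoped(s, || -> std::io::Result<()> {
                // merge what the extractors produced and forward it to the writer...
                Ok(())
            })?;

        // join() only fails if the thread panicked (hence the TODO about
        // panicking threads); the inner `?` surfaces the thread's own error.
        handle.join().unwrap()?;
        handle2.join().unwrap()?;

        Ok(())
    })
}

fn main() {
    run().unwrap();
}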

View File

@@ -1,6 +1,6 @@
 use rayon::iter::{ParallelBridge, ParallelIterator};
 
-use super::Indexer;
+use super::DocumentChanges;
 use crate::documents::{DocumentIdExtractionError, PrimaryKey};
 use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
 use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId};
@@ -16,7 +16,7 @@ impl<I> PartialDump<I> {
     }
 }
 
-impl<'p, I> Indexer<'p> for PartialDump<I>
+impl<'p, I> DocumentChanges<'p> for PartialDump<I>
 where
     I: IntoIterator<Item = Object>,
     I::IntoIter: Send + 'p,

View File

@@ -1,12 +1,12 @@
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 
-use super::Indexer;
+use super::DocumentChanges;
 use crate::update::new::DocumentChange;
 use crate::Result;
 
 pub struct UpdateByFunction;
 
-impl<'p> Indexer<'p> for UpdateByFunction {
+impl<'p> DocumentChanges<'p> for UpdateByFunction {
     type Parameter = ();
 
     fn document_changes(