diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 088303fb3..4041fcc6a 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -8,7 +8,7 @@ use crate::update::new::KvReaderFieldId; use crate::{DocumentId, Index}; /// The capacity of the channel is currently in number of messages. -pub fn merger_writer_channels(cap: usize) -> (MergerSender, WriterReceiver) { +pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { let (sender, receiver) = crossbeam_channel::bounded(cap); (MergerSender(sender), WriterReceiver(receiver)) } diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index 2b4bdaeb7..c16299e9a 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use rayon::iter::{ParallelBridge, ParallelIterator}; use roaring::RoaringBitmap; -use super::Indexer; +use super::DocumentChanges; use crate::documents::PrimaryKey; use crate::update::new::{Deletion, DocumentChange, ItemsPool}; use crate::{FieldsIdsMap, Index, InternalError, Result}; @@ -22,7 +22,7 @@ impl DocumentDeletion { } } -impl<'p> Indexer<'p> for DocumentDeletion { +impl<'p> DocumentChanges<'p> for DocumentDeletion { type Parameter = (&'p Index, &'p FieldsIdsMap, &'p PrimaryKey<'p>); fn document_changes( diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index fdcb84c7b..5d9755211 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -11,7 +11,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::super::document_change::DocumentChange; use super::super::items_pool::ItemsPool; -use super::Indexer; +use super::DocumentChanges; use crate::documents::{ obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, }; @@ -70,7 +70,7 @@ impl DocumentOperation { } } -impl<'p> Indexer<'p> for DocumentOperation { +impl<'p> DocumentChanges<'p> for DocumentOperation { type Parameter = (&'p Index, &'p RoTxn<'static>, &'p mut FieldsIdsMap, &'p PrimaryKey<'p>); fn document_changes( diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 85d4dbcb1..69ccc0451 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -10,7 +10,7 @@ use rayon::ThreadPool; pub use update_by_function::UpdateByFunction; use super::channel::{ - extractors_merger_channels, merger_writer_channels, EntryOperation, ExtractorsMergerChannels, + extractors_merger_channels, merger_writer_channel, EntryOperation, ExtractorsMergerChannels, WriterOperation, }; use super::document_change::DocumentChange; @@ -22,7 +22,7 @@ mod document_operation; mod partial_dump; mod update_by_function; -pub trait Indexer<'p> { +pub trait DocumentChanges<'p> { type Parameter: 'p; fn document_changes( @@ -36,7 +36,6 @@ pub trait Indexer<'p> { /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. /// /// TODO return stats -/// TODO take the rayon ThreadPool pub fn index( wtxn: &mut RwTxn, index: &Index, @@ -44,25 +43,31 @@ pub fn index( document_changes: PI, ) -> Result<()> where - PI: IntoParallelIterator> + Send, + PI: IntoParallelIterator>> + Send, PI::Iter: Clone, { - let (merger_sender, writer_receiver) = merger_writer_channels(100); + let (merger_sender, writer_receiver) = merger_writer_channel(100); let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } = extractors_merger_channels(100); thread::scope(|s| { // TODO manage the errors correctly - thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { - pool.in_place_scope(|_s| { - document_changes.into_par_iter().for_each(|_dc| ()); - }) - })?; + let handle = + thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { + pool.in_place_scope(|_s| { + // word docids + // document_changes.into_par_iter().try_for_each(|_dc| Ok(()) as Result<_>) + // let grenads = extractor_function(document_changes)?; + // deladd_cbo_roaring_bitmap_sender.word_docids(grenads)?; + + Ok(()) as Result<_> + }) + })?; // TODO manage the errors correctly - thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || { + let handle2 = thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || { let rtxn = index.read_txn().unwrap(); - merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index).unwrap() + merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index) })?; // TODO Split this code into another function @@ -77,6 +82,10 @@ where } } + /// TODO handle the panicking threads + handle.join().unwrap()?; + handle2.join().unwrap()?; + Ok(()) }) } diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 7afb96d65..11c9fbd0e 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -1,6 +1,6 @@ use rayon::iter::{ParallelBridge, ParallelIterator}; -use super::Indexer; +use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; @@ -16,7 +16,7 @@ impl PartialDump { } } -impl<'p, I> Indexer<'p> for PartialDump +impl<'p, I> DocumentChanges<'p> for PartialDump where I: IntoIterator, I::IntoIter: Send + 'p, diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index e9bdf3640..035f95c02 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -1,12 +1,12 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use super::Indexer; +use super::DocumentChanges; use crate::update::new::DocumentChange; use crate::Result; pub struct UpdateByFunction; -impl<'p> Indexer<'p> for UpdateByFunction { +impl<'p> DocumentChanges<'p> for UpdateByFunction { type Parameter = (); fn document_changes(