From a1f60c61e85b9c9ede3bde91e858457bd99580c1 Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Wed, 26 Feb 2025 22:16:31 +0100
Subject: [PATCH] Reasonable changes

---
 .../update/new/indexer/document_deletion.rs   | 27 ++++---
 .../update/new/indexer/document_operation.rs  | 21 ++---
 crates/milli/src/update/new/indexer/mod.rs    | 33 ++++----
 .../src/update/new/indexer/partial_dump.rs    | 38 ++++-----
 .../update/new/indexer/update_by_function.rs  | 43 +++++-----
 crates/milli/src/update/new/merger.rs         | 80 +++++++++++--------
 6 files changed, 127 insertions(+), 115 deletions(-)

diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs
index 03f763f18..39e35ff34 100644
--- a/crates/milli/src/update/new/indexer/document_deletion.rs
+++ b/crates/milli/src/update/new/indexer/document_deletion.rs
@@ -1,8 +1,7 @@
 use bumpalo::collections::CollectIn;
 use bumpalo::Bump;
-use rayon::iter::IndexedParallelIterator;
-use rayon::slice::ParallelSlice as _;
 use roaring::RoaringBitmap;
+use scoped_thread_pool::PartitionChunks;
 
 use super::document_changes::{DocumentChangeContext, DocumentChanges};
 use crate::documents::PrimaryKey;
@@ -28,31 +27,28 @@ impl DocumentDeletion {
         self,
         indexer_alloc: &'indexer Bump,
         primary_key: PrimaryKey<'indexer>,
+        thread_pool: &scoped_thread_pool::ThreadPool,
+        chunk_size: usize,
     ) -> DocumentDeletionChanges<'indexer> {
         let to_delete: bumpalo::collections::Vec<_> =
             self.to_delete.into_iter().collect_in(indexer_alloc);
         let to_delete = to_delete.into_bump_slice();
 
+        let to_delete = PartitionChunks::new(to_delete, chunk_size, thread_pool.thread_count());
+
         DocumentDeletionChanges { to_delete, primary_key }
     }
 }
 
 pub struct DocumentDeletionChanges<'indexer> {
-    to_delete: &'indexer [DocumentId],
+    to_delete: scoped_thread_pool::PartitionChunks<'indexer, DocumentId>,
     primary_key: PrimaryKey<'indexer>,
 }
 
 impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     type Item = DocumentId;
 
-    fn iter(
-        &self,
-        chunk_size: usize,
-    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
-        self.to_delete.par_chunks(chunk_size)
-    }
-
     fn item_to_document_change<
         'doc, // lifetime of a single `process` call
         T: MostlySend,
     >(
@@ -78,7 +74,11 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     }
 
     fn len(&self) -> usize {
-        self.to_delete.len()
+        self.to_delete.slice().len()
+    }
+
+    fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]> {
+        self.to_delete.partition(thread_index, task_index)
     }
 }
 
@@ -86,6 +86,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
 mod test {
     use std::cell::RefCell;
     use std::marker::PhantomData;
+    use std::num::NonZeroUsize;
     use std::sync::RwLock;
 
     use bumpalo::Bump;
@@ -135,6 +136,9 @@ mod test {
             }
         }
 
+        let mut thread_pool =
+            scoped_thread_pool::ThreadPool::new(NonZeroUsize::new(1).unwrap(), "test".into());
+
         let mut deletions = DocumentDeletion::new();
         deletions.delete_documents_by_docids(Vec::<u32>::new().into_iter().collect());
         let indexer = Bump::new();
@@ -173,6 +177,7 @@ mod test {
         let datastore = ThreadLocal::new();
 
         extract(
+            &mut thread_pool,
            &changes,
            &deletion_tracker,
            context,
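This first file shows the shape of the whole patch: `DocumentChanges::iter(chunk_size)`, which returned a rayon `IndexedParallelIterator`, is replaced by an up-front partitioning (`PartitionChunks::new(slice, chunk_size, thread_count)`) plus a pull-style accessor `items(thread_index, task_index)`. The `scoped_thread_pool` crate does not ship with this patch, so the following is only a sketch of the `PartitionChunks` contract as the call sites use it; in particular, the round-robin distribution of chunks over threads is an assumption, not a documented guarantee:

    // Sketch of the assumed PartitionChunks API: a precomputed, lock-free
    // chunk lookup over a borrowed slice.
    pub struct PartitionChunks<'a, T> {
        slice: &'a [T],
        chunk_size: usize,
        thread_count: usize,
    }

    impl<'a, T> PartitionChunks<'a, T> {
        pub fn new(slice: &'a [T], chunk_size: usize, thread_count: usize) -> Self {
            Self { slice, chunk_size, thread_count }
        }

        /// The whole underlying slice, used e.g. to compute `len()`.
        pub fn slice(&self) -> &'a [T] {
            self.slice
        }

        /// The chunk that `thread_index` should process as its `task_index`-th
        /// task, assuming chunks are dealt out round-robin over the workers;
        /// `None` once that thread's share is exhausted.
        pub fn partition(&self, thread_index: usize, task_index: usize) -> Option<&'a [T]> {
            let chunk_index = task_index.checked_mul(self.thread_count)?.checked_add(thread_index)?;
            let start = chunk_index.checked_mul(self.chunk_size)?;
            if start >= self.slice.len() {
                return None;
            }
            let end = usize::min(start + self.chunk_size, self.slice.len());
            Some(&self.slice[start..end])
        }
    }

Because every lookup is pure arithmetic over one shared slice, workers can fetch their next task without any synchronization, which is what lets the `items` implementations below stay `&self`.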
diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs
index 8216742ec..148cb9eff 100644
--- a/crates/milli/src/update/new/indexer/document_operation.rs
+++ b/crates/milli/src/update/new/indexer/document_operation.rs
@@ -6,8 +6,8 @@ use bumparaw_collections::RawMap;
 use hashbrown::hash_map::Entry;
 use heed::RoTxn;
 use memmap2::Mmap;
-use rayon::slice::ParallelSlice;
 use rustc_hash::FxBuildHasher;
+use scoped_thread_pool::PartitionChunks;
 use serde_json::value::RawValue;
 use serde_json::Deserializer;
 
@@ -57,6 +57,8 @@ impl<'pl> DocumentOperation<'pl> {
         new_fields_ids_map: &mut FieldsIdsMap,
         must_stop_processing: &MSP,
         progress: Progress,
+        thread_pool: &scoped_thread_pool::ThreadPool,
+        chunk_size: usize,
     ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
     where
         MSP: Fn() -> bool,
@@ -130,6 +132,8 @@ impl<'pl> DocumentOperation<'pl> {
         docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations));
 
         let docids_version_offsets = docids_version_offsets.into_bump_slice();
+        let docids_version_offsets =
+            PartitionChunks::new(docids_version_offsets, chunk_size, thread_pool.thread_count());
 
         Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
     }
 }
@@ -353,13 +357,6 @@ fn merge_version_offsets<'s, 'pl>(
 impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
     type Item = (&'pl str, PayloadOperations<'pl>);
 
-    fn iter(
-        &self,
-        chunk_size: usize,
-    ) -> impl rayon::prelude::IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
-        self.docids_version_offsets.par_chunks(chunk_size)
-    }
-
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
@@ -379,12 +376,16 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
     }
 
     fn len(&self) -> usize {
-        self.docids_version_offsets.len()
+        self.docids_version_offsets.slice().len()
+    }
+
+    fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]> {
+        self.docids_version_offsets.partition(thread_index, task_index)
     }
 }
 
 pub struct DocumentOperationChanges<'pl> {
-    docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
+    docids_version_offsets: PartitionChunks<'pl, (&'pl str, PayloadOperations<'pl>)>,
 }
 
 pub enum Payload<'pl> {
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 890191323..fd196c4f7 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -104,9 +104,8 @@ where
         );
     });
 
-    let (extractor_sender, writer_receiver) = pool
-        .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
-        .unwrap();
+    let (extractor_sender, writer_receiver) =
+        extractor_writer_bbqueue(thread_pool, &mut bbbuffers, total_bbbuffer_capacity, 1000);
 
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
     let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
@@ -139,21 +138,19 @@ where
     let document_ids = &mut document_ids;
     let extractor_handle =
         Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
-            pool.install(move || {
-                extract::extract_all(
-                    document_changes,
-                    indexing_context,
-                    indexer_span,
-                    extractor_sender,
-                    embedders,
-                    &mut extractor_allocs,
-                    finished_extraction,
-                    field_distribution,
-                    index_embeddings,
-                    document_ids,
-                )
-            })
-            .unwrap()
+            extract::extract_all(
+                thread_pool,
+                document_changes,
+                indexing_context,
+                indexer_span,
+                extractor_sender,
+                embedders,
+                &mut extractor_allocs,
+                finished_extraction,
+                field_distribution,
+                index_embeddings,
+                document_ids,
+            )
         })?;
 
     let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
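With `iter()` gone, mod.rs no longer needs rayon's `pool.install` to scope work to a pool; the `scoped_thread_pool::ThreadPool` is passed explicitly to `extractor_writer_bbqueue` and `extract_all` instead. Since the crate is unpublished, here is a minimal runnable stand-in for the `broadcast` contract that the merger below relies on (one closure run per worker, each told its thread index, every per-thread error collected rather than only the first); a real pool would keep its workers alive across calls instead of spawning per call:

    use std::num::NonZeroUsize;

    pub struct ThreadPool {
        thread_count: NonZeroUsize,
        name: String,
    }

    impl ThreadPool {
        pub fn new(thread_count: NonZeroUsize, name: String) -> Self {
            Self { thread_count, name }
        }

        pub fn thread_count(&self) -> usize {
            self.thread_count.get()
        }

        /// Runs `task` once per worker and gathers every failure.
        pub fn broadcast<E: Send>(
            &mut self,
            task: impl Fn(usize) -> Result<(), E> + Sync,
        ) -> Result<(), Vec<E>> {
            let errors: Vec<E> = std::thread::scope(|s| {
                let handles: Vec<_> = (0..self.thread_count())
                    .map(|thread_index| {
                        let task = &task;
                        std::thread::Builder::new()
                            .name(format!("{}-{thread_index}", self.name))
                            .spawn_scoped(s, move || task(thread_index))
                            .expect("failed to spawn worker thread")
                    })
                    .collect();
                handles.into_iter().filter_map(|handle| handle.join().unwrap().err()).collect()
            });
            if errors.is_empty() {
                Ok(())
            } else {
                Err(errors)
            }
        }
    }

Returning `Vec<E>` rather than the first error is why the call sites below go through a dedicated `crate::Error::from_scoped_thread_pool_errors` conversion instead of a plain `?`.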
diff --git a/crates/milli/src/update/new/indexer/partial_dump.rs b/crates/milli/src/update/new/indexer/partial_dump.rs
index 6e4abd898..3069310bf 100644
--- a/crates/milli/src/update/new/indexer/partial_dump.rs
+++ b/crates/milli/src/update/new/indexer/partial_dump.rs
@@ -3,6 +3,7 @@ use std::ops::DerefMut;
 use bumparaw_collections::RawMap;
 use rayon::iter::IndexedParallelIterator;
 use rustc_hash::FxBuildHasher;
+use scoped_thread_pool::ThreadPool;
 use serde_json::value::RawValue;
 
 use super::document_changes::{DocumentChangeContext, DocumentChanges};
@@ -14,45 +15,34 @@ use crate::update::new::thread_local::MostlySend;
 use crate::update::new::{DocumentChange, Insertion};
 use crate::{Error, InternalError, Result, UserError};
 
-pub struct PartialDump<I> {
-    iter: I,
-}
+pub struct PartialDump;
 
-impl<I> PartialDump<I> {
-    pub fn new_from_jsonlines(iter: I) -> Self {
-        PartialDump { iter }
+impl PartialDump {
+    pub fn new_from_jsonlines() -> Self {
+        PartialDump
     }
 
     pub fn into_changes<'index>(
         self,
         concurrent_available_ids: &'index ConcurrentAvailableIds,
         primary_key: &'index PrimaryKey,
-    ) -> PartialDumpChanges<'index, I> {
+        thread_pool: &ThreadPool,
+        chunk_size: usize,
+    ) -> PartialDumpChanges<'index> {
         // Note for future self:
         //   - We recommend sending chunks of documents in this `PartialDumpIndexer`; we therefore need to create a custom take_while_size method (that doesn't drop items).
-        PartialDumpChanges { iter: self.iter, concurrent_available_ids, primary_key }
+        PartialDumpChanges { concurrent_available_ids, primary_key }
     }
 }
 
-pub struct PartialDumpChanges<'doc, I> {
-    iter: I,
+pub struct PartialDumpChanges<'doc> {
     concurrent_available_ids: &'doc ConcurrentAvailableIds,
     primary_key: &'doc PrimaryKey<'doc>,
 }
 
-impl<'index, Iter> DocumentChanges<'index> for PartialDumpChanges<'index, Iter>
-where
-    Iter: IndexedParallelIterator<Item = Box<RawValue>> + Clone + Sync + 'index,
-{
+impl<'index> DocumentChanges<'index> for PartialDumpChanges<'index> {
     type Item = Box<RawValue>;
 
-    fn iter(
-        &self,
-        chunk_size: usize,
-    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
-        self.iter.clone().chunks(chunk_size)
-    }
-
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
@@ -85,6 +75,10 @@ where
     }
 
     fn len(&self) -> usize {
-        self.iter.len()
+        unimplemented!()
+    }
+
+    fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]> {
+        unimplemented!()
     }
 }
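A consequence worth flagging: this leaves the partial-dump path unusable for now, since `PartialDumpChanges` keeps no documents and both `len()` and `items()` are `unimplemented!()` (note also that the `rayon::iter::IndexedParallelIterator` import stays behind even though nothing in the new file uses it). A possible way to bring the path back under the new contract, mirroring what the next file does for update-by-function, is to materialize the streamed documents into the indexer's bump arena and partition the resulting slice. This is a hypothetical helper written against the stand-ins sketched above, not an API from this patch, and the cost is that streaming is lost because the whole dump must be collected before extraction starts:

    use bumpalo::collections::CollectIn;
    use bumpalo::Bump;
    use serde_json::value::RawValue;

    // Hypothetical port of PartialDump: collect the JSON-lines documents once
    // into the indexer arena, then hand out fixed-size chunks to the workers.
    fn collect_for_partition<'indexer>(
        iter: impl Iterator<Item = Box<RawValue>>,
        indexer_alloc: &'indexer Bump,
        thread_pool: &ThreadPool,
        chunk_size: usize,
    ) -> PartitionChunks<'indexer, Box<RawValue>> {
        let documents: bumpalo::collections::Vec<'indexer, Box<RawValue>> =
            iter.collect_in(indexer_alloc);
        PartitionChunks::new(documents.into_bump_slice(), chunk_size, thread_pool.thread_count())
    }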
diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs
index 3001648e6..dba04a9e9 100644
--- a/crates/milli/src/update/new/indexer/update_by_function.rs
+++ b/crates/milli/src/update/new/indexer/update_by_function.rs
@@ -1,9 +1,10 @@
+use bumpalo::collections::CollectIn;
+use bumpalo::Bump;
 use bumparaw_collections::RawMap;
-use rayon::iter::IndexedParallelIterator;
-use rayon::slice::ParallelSlice as _;
 use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
 use roaring::RoaringBitmap;
 use rustc_hash::FxBuildHasher;
+use scoped_thread_pool::{PartitionChunks, ThreadPool};
 
 use super::document_changes::DocumentChangeContext;
 use super::DocumentChanges;
@@ -22,14 +23,12 @@ pub struct UpdateByFunction {
     code: String,
 }
 
-pub struct UpdateByFunctionChanges<'doc> {
-    primary_key: &'doc PrimaryKey<'doc>,
+pub struct UpdateByFunctionChanges<'index> {
+    primary_key: &'index PrimaryKey<'index>,
     engine: Engine,
     ast: AST,
     context: Option<Dynamic>,
-    // It is sad that the RoaringBitmap doesn't
-    // implement IndexedParallelIterator
-    documents: Vec<u32>,
+    documents: PartitionChunks<'index, u32>,
 }
 
 impl UpdateByFunction {
@@ -40,6 +39,9 @@ impl UpdateByFunction {
     pub fn into_changes<'index>(
         self,
         primary_key: &'index PrimaryKey,
+        allocator: &'index Bump,
+        thread_pool: &ThreadPool,
+        chunk_size: usize,
     ) -> Result<UpdateByFunctionChanges<'index>> {
         let Self { documents, context, code } = self;
 
@@ -64,26 +66,19 @@ impl UpdateByFunction {
             None => None,
         };
 
-        Ok(UpdateByFunctionChanges {
-            primary_key,
-            engine,
-            ast,
-            context,
-            documents: documents.into_iter().collect(),
-        })
+        let documents: bumpalo::collections::Vec<'_, _> =
+            documents.into_iter().collect_in(allocator);
+        let documents = documents.into_bump_slice();
+
+        let documents = PartitionChunks::new(documents, chunk_size, thread_pool.thread_count());
+
+        Ok(UpdateByFunctionChanges { primary_key, engine, ast, context, documents })
     }
 }
 
 impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
     type Item = u32;
 
-    fn iter(
-        &self,
-        chunk_size: usize,
-    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
-        self.documents.as_slice().par_chunks(chunk_size)
-    }
-
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &self,
         context: &'doc DocumentChangeContext<T>,
@@ -185,7 +180,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
     }
 
     fn len(&self) -> usize {
-        self.documents.len()
+        self.documents.slice().len()
+    }
+
+    fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]> {
+        self.documents.partition(thread_index, task_index)
    }
 }
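The update-by-function port shows why collect-then-partition works well here: `RoaringBitmap` only offers sequential iteration (hence the deleted comment regretting the missing `IndexedParallelIterator` impl), but once the docids are copied into a bump-allocated slice, `PartitionChunks` gets the random access it needs, and the new `allocator` parameter keeps that slice alive for the whole `'index` lifetime. The kernel of the change as a standalone function, using only published APIs (`bumpalo` with its `collections` feature and `roaring`):

    use bumpalo::collections::CollectIn;
    use bumpalo::Bump;
    use roaring::RoaringBitmap;

    // One O(n) copy out of the bitmap buys lock-free random access for every
    // worker afterwards; the arena keeps the slice alive without a dedicated
    // heap allocation.
    fn docids_slice<'a>(documents: &RoaringBitmap, alloc: &'a Bump) -> &'a [u32] {
        let docids: bumpalo::collections::Vec<'a, u32> = documents.iter().collect_in(alloc);
        docids.into_bump_slice()
    }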
diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs
index 090add6bd..10e9b77c7 100644
--- a/crates/milli/src/update/new/merger.rs
+++ b/crates/milli/src/update/new/merger.rs
@@ -1,10 +1,10 @@
 use std::cell::RefCell;
+use std::sync::Mutex;
 
 use hashbrown::HashMap;
 use heed::types::Bytes;
 use heed::{Database, RoTxn};
 use memmap2::Mmap;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 
 use super::channel::*;
@@ -64,6 +64,7 @@ where
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
 pub fn merge_and_send_docids<'extractor, MSP, D>(
+    thread_pool: &mut scoped_thread_pool::ThreadPool,
     mut caches: Vec<BalancedCaches<'extractor>>,
     database: Database<Bytes, Bytes>,
     index: &Index,
@@ -74,7 +75,10 @@ where
     MSP: Fn() -> bool + Sync,
     D: DatabaseType + Sync,
 {
-    transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
+    let frozen_caches = Mutex::new(transpose_and_freeze_caches(&mut caches)?);
+
+    match thread_pool.broadcast(|thread_index| {
+        let frozen = std::mem::take(frozen_caches.lock().unwrap().get_mut(thread_index).unwrap());
         let rtxn = index.read_txn()?;
         if must_stop_processing() {
             return Err(InternalError::AbortedIndexation.into());
         }
@@ -92,12 +96,17 @@ where
             }
             Operation::Ignore => Ok(()),
         }
-        })
-    })
+        })?;
+        Ok(())
+    }) {
+        Ok(()) => Ok(()),
+        Err(errors) => Err(crate::Error::from_scoped_thread_pool_errors(thread_pool, errors)),
+    }
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
 pub fn merge_and_send_facet_docids<'extractor>(
+    thread_pool: &mut scoped_thread_pool::ThreadPool,
     mut caches: Vec<BalancedCaches<'extractor>>,
     database: FacetDatabases,
     index: &Index,
@@ -108,35 +117,42 @@ pub fn merge_and_send_facet_docids<'extractor>(
     let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize;
     let max_string_count = max_string_count.clamp(1000, 100_000);
     let max_number_count = max_number_count.clamp(1000, 100_000);
-    transpose_and_freeze_caches(&mut caches)?
-        .into_par_iter()
-        .map(|frozen| {
-            let mut facet_field_ids_delta =
-                FacetFieldIdsDelta::new(max_string_count, max_number_count);
-            let rtxn = index.read_txn()?;
-            merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
-                let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
-                match merge_cbo_bitmaps(current, del, add)? {
-                    Operation::Write(bitmap) => {
-                        facet_field_ids_delta.register_from_key(key);
-                        docids_sender.write(key, &bitmap)?;
-                        Ok(())
-                    }
-                    Operation::Delete => {
-                        facet_field_ids_delta.register_from_key(key);
-                        docids_sender.delete(key)?;
-                        Ok(())
-                    }
-                    Operation::Ignore => Ok(()),
-                }
-            })?;
-
-            Ok(facet_field_ids_delta)
-        })
-        .reduce(
-            || Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)),
-            |lhs, rhs| Ok(lhs?.merge(rhs?)),
-        )
+    let transposed_frozen_caches = Mutex::new(transpose_and_freeze_caches(&mut caches)?);
+    let output = Mutex::new(FacetFieldIdsDelta::new(max_string_count, max_number_count));
+
+    match thread_pool.broadcast(|thread_index| {
+        // TODO: we can probably spare the mutex here since it is guaranteed that each thread will access its own cell of the vec
+        let frozen =
+            std::mem::take(transposed_frozen_caches.lock().unwrap().get_mut(thread_index).unwrap());
+        let mut facet_field_ids_delta = FacetFieldIdsDelta::new(max_string_count, max_number_count);
+        let rtxn = index.read_txn()?;
+        merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
+            let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
+            match merge_cbo_bitmaps(current, del, add)? {
+                Operation::Write(bitmap) => {
+                    facet_field_ids_delta.register_from_key(key);
+                    docids_sender.write(key, &bitmap)?;
+                    Ok(())
+                }
+                Operation::Delete => {
+                    facet_field_ids_delta.register_from_key(key);
+                    docids_sender.delete(key)?;
+                    Ok(())
+                }
+                Operation::Ignore => Ok(()),
+            }
+        })?;
+
+        {
+            let mut common = output.lock().unwrap();
+            *common = std::mem::replace(
+                &mut *common,
+                FacetFieldIdsDelta::new(max_string_count, max_number_count),
+            )
+            .merge(facet_field_ids_delta);
+        }
+
+        Ok(())
+    }) {
+        Ok(()) => Ok(output.into_inner().unwrap()),
+        Err(errors) => Err(crate::Error::from_scoped_thread_pool_errors(thread_pool, errors)),
+    }
 }
 
 pub struct FacetDatabases<'a> {
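Both merge functions now share one broadcast shape: `transpose_and_freeze_caches` produces one frozen cache per worker, the `Vec` goes behind a `Mutex`, and each worker `std::mem::take`s the cell matching its thread index. As the TODO says, the mutex mostly satisfies the borrow checker; thread i is the sole user of cell i, so contention is limited to the instant of the `take`. The `std::mem::replace` dance when folding into `output` exists because `FacetFieldIdsDelta::merge` consumes both operands (the old rayon reduce did `lhs?.merge(rhs?)`), so the accumulator has to be moved out from behind the `MutexGuard` while a fresh delta is swapped in. The pattern in miniature, with stand-in types rather than milli's real ones:

    use std::sync::Mutex;

    // Each worker takes (not clones) its own slot; afterwards it owns the
    // data and needs no further synchronization until the final fold.
    fn take_slot<T: Default>(slots: &Mutex<Vec<T>>, thread_index: usize) -> T {
        std::mem::take(slots.lock().unwrap().get_mut(thread_index).unwrap())
    }

    fn main() {
        let slots = Mutex::new(vec![vec![1u32, 2], vec![3], vec![]]);
        let merged = Mutex::new(Vec::new());
        std::thread::scope(|s| {
            for thread_index in 0..3 {
                let (slots, merged) = (&slots, &merged);
                s.spawn(move || {
                    let local = take_slot(slots, thread_index);
                    // Fold the per-thread result into the shared accumulator,
                    // as merge_and_send_facet_docids does with its delta.
                    merged.lock().unwrap().extend(local);
                });
            }
        });
        assert_eq!(merged.into_inner().unwrap().len(), 3);
    }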