From de2fedaa9dbd904e89d49ac3bbb2e02993ef54b6 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 26 Feb 2025 22:12:19 +0100 Subject: [PATCH] Use thread_pool broadcast --- crates/milli/src/update/new/channel.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index 4fff31a35..3d576a7e5 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -51,11 +51,12 @@ const MAX_FRAME_HEADER_SIZE: usize = 9; /// when new stuff is available in any BBQueue buffer but we send /// a message in this queue only if it is empty to avoid filling /// the channel *and* the BBQueue. -pub fn extractor_writer_bbqueue( - bbbuffers: &mut Vec, +pub fn extractor_writer_bbqueue<'a>( + thread_pool: &mut scoped_thread_pool::ThreadPool, + bbbuffers: &'a mut Vec, total_bbbuffer_capacity: usize, channel_capacity: usize, -) -> (ExtractorBbqueueSender, WriterBbqueueReceiver) { +) -> (ExtractorBbqueueSender<'a>, WriterBbqueueReceiver<'a>) { let current_num_threads = rayon::current_num_threads(); let bbbuffer_capacity = total_bbbuffer_capacity.checked_div(current_num_threads).unwrap(); bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity)); @@ -66,8 +67,8 @@ pub fn extractor_writer_bbqueue( let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap(); let producers = ThreadLocal::with_capacity(bbbuffers.len()); - let consumers = rayon::broadcast(|bi| { - let bbqueue = &bbbuffers[bi.index()]; + let consumers = thread_pool.broadcast(|thread_index| { + let bbqueue: &BBBuffer = &bbbuffers[thread_index]; let (producer, consumer) = bbqueue.try_split_framed().unwrap(); producers.get_or(|| FullySend(RefCell::new(producer))); consumer