Use thread_pool broadcast

This commit is contained in:
Louis Dureuil 2025-02-26 22:12:19 +01:00
parent 89717ba0f1
commit de2fedaa9d

View File

@ -51,11 +51,12 @@ const MAX_FRAME_HEADER_SIZE: usize = 9;
/// when new stuff is available in any BBQueue buffer but we send /// when new stuff is available in any BBQueue buffer but we send
/// a message in this queue only if it is empty to avoid filling /// a message in this queue only if it is empty to avoid filling
/// the channel *and* the BBQueue. /// the channel *and* the BBQueue.
pub fn extractor_writer_bbqueue( pub fn extractor_writer_bbqueue<'a>(
bbbuffers: &mut Vec<BBBuffer>, thread_pool: &mut scoped_thread_pool::ThreadPool<crate::Error>,
bbbuffers: &'a mut Vec<BBBuffer>,
total_bbbuffer_capacity: usize, total_bbbuffer_capacity: usize,
channel_capacity: usize, channel_capacity: usize,
) -> (ExtractorBbqueueSender, WriterBbqueueReceiver) { ) -> (ExtractorBbqueueSender<'a>, WriterBbqueueReceiver<'a>) {
let current_num_threads = rayon::current_num_threads(); let current_num_threads = rayon::current_num_threads();
let bbbuffer_capacity = total_bbbuffer_capacity.checked_div(current_num_threads).unwrap(); let bbbuffer_capacity = total_bbbuffer_capacity.checked_div(current_num_threads).unwrap();
bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity)); bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));
@ -66,8 +67,8 @@ pub fn extractor_writer_bbqueue(
let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap(); let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap();
let producers = ThreadLocal::with_capacity(bbbuffers.len()); let producers = ThreadLocal::with_capacity(bbbuffers.len());
let consumers = rayon::broadcast(|bi| { let consumers = thread_pool.broadcast(|thread_index| {
let bbqueue = &bbbuffers[bi.index()]; let bbqueue: &BBBuffer = &bbbuffers[thread_index];
let (producer, consumer) = bbqueue.try_split_framed().unwrap(); let (producer, consumer) = bbqueue.try_split_framed().unwrap();
producers.get_or(|| FullySend(RefCell::new(producer))); producers.get_or(|| FullySend(RefCell::new(producer)));
consumer consumer