diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index b0a61bd7f..a2f16983e 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -1,8 +1,10 @@ use std::cell::RefCell; use std::io::{self, BufWriter}; +use std::iter::Cycle; use std::marker::PhantomData; use std::mem; use std::num::NonZeroU16; +use std::ops::Range; use std::time::Duration; use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer}; @@ -64,7 +66,11 @@ pub fn extractor_writer_bbqueue( let (sender, receiver) = flume::bounded(channel_capacity); let sender = ExtractorBbqueueSender { sender, producers, capacity }; - let receiver = WriterBbqueueReceiver { receiver, consumers }; + let receiver = WriterBbqueueReceiver { + receiver, + look_at_consumer: (0..consumers.len()).cycle(), + consumers, + }; (sender, receiver) } @@ -89,6 +95,9 @@ pub struct WriterBbqueueReceiver<'a> { /// any BBQueue buffer or directly sent throught this channel /// (still written to disk). receiver: flume::Receiver, + /// Indicates the consumer to observe. This cycling range + /// ensures fair distribution of work among consumers. + look_at_consumer: Cycle>, /// The BBQueue frames to read when waking-up. consumers: Vec>, } @@ -148,16 +157,9 @@ impl<'a> WriterBbqueueReceiver<'a> { } /// Reads all the BBQueue buffers and selects the first available frame. - /// - /// Note: Selecting the first available frame gives preference to - /// frames that will be cleaned up first. It may result in the - /// last frames being more likely to fill up. One potential optimization - /// could involve keeping track of the last processed BBQueue index - /// to cycle through the frames instead of always starting from the - /// beginning. pub fn recv_frame(&mut self) -> Option> { - for consumer in &mut self.consumers { - if let Some(frame) = consumer.read() { + for index in self.look_at_consumer.by_ref().take(self.consumers.len()) { + if let Some(frame) = self.consumers[index].read() { return Some(FrameWithHeader::from(frame)); } } @@ -511,9 +513,6 @@ impl<'b> ExtractorBbqueueSender<'b> { } } - // We could commit only the used memory. - grant.commit(total_length); - // We only send a wake up message when the channel is empty // so that we don't fill the channel with too many WakeUps. if self.sender.is_empty() {