From bcab61ab1d83738710a69766bf4c3723b1596906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 2 Dec 2024 10:42:47 +0100 Subject: [PATCH] Do spurious wake ups on the receiver side --- crates/milli/src/update/new/channel.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index e8bb6930c..631fcf74e 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -3,11 +3,12 @@ use std::io::{self, BufWriter}; use std::marker::PhantomData; use std::mem; use std::num::NonZeroU16; +use std::time::Duration; use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer}; use bbqueue::BBBuffer; use bytemuck::{checked, CheckedBitPattern, NoUninit}; -use flume::SendError; +use flume::{RecvTimeoutError, SendError}; use heed::types::Bytes; use heed::BytesDecode; use memmap2::{Mmap, MmapMut}; @@ -136,10 +137,24 @@ impl LargeVectors { } impl<'a> WriterBbqueueReceiver<'a> { + /// Tries to receive an action to do until the timeout occurs + /// and if it does, consider it as a spurious wake up. pub fn recv_action(&mut self) -> Option { - self.receiver.recv().ok() + match self.receiver.recv_timeout(Duration::from_millis(100)) { + Ok(action) => Some(action), + Err(RecvTimeoutError::Timeout) => Some(ReceiverAction::WakeUp), + Err(RecvTimeoutError::Disconnected) => None, + } } + /// Reads all the BBQueue buffers and selects the first available frame. + /// + /// Note: Selecting the first available frame gives preference to + /// frames that will be cleaned up first. It may result in the + /// last frames being more likely to fill up. One potential optimization + /// could involve keeping track of the last processed BBQueue index + /// to cycle through the frames instead of always starting from the + /// beginning. pub fn recv_frame(&mut self) -> Option> { for consumer in &mut self.consumers { if let Some(frame) = consumer.read() {