diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index d1d64814e..0a6d37943 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -422,6 +422,12 @@ impl<'b> ExtractorBbqueueSender<'b> { // We could commit only the used memory. grant.commit(total_length); + // We only send a wake up message when the channel is empty + // so that we don't fill the channel with too many WakeUps. + if self.sender.is_empty() { + self.sender.send(ReceiverAction::WakeUp).unwrap(); + } + Ok(()) } @@ -460,6 +466,12 @@ impl<'b> ExtractorBbqueueSender<'b> { // We could commit only the used memory. grant.commit(total_length); + // We only send a wake up message when the channel is empty + // so that we don't fill the channel with too many WakeUps. + if self.sender.is_empty() { + self.sender.send(ReceiverAction::WakeUp).unwrap(); + } + Ok(()) } @@ -511,6 +523,12 @@ impl<'b> ExtractorBbqueueSender<'b> { // We could commit only the used memory. grant.commit(total_length); + // We only send a wake up message when the channel is empty + // so that we don't fill the channel with too many WakeUps. + if self.sender.is_empty() { + self.sender.send(ReceiverAction::WakeUp).unwrap(); + } + Ok(()) } @@ -561,6 +579,12 @@ impl<'b> ExtractorBbqueueSender<'b> { // We could commit only the used memory. grant.commit(total_length); + // We only send a wake up message when the channel is empty + // so that we don't fill the channel with too many WakeUps. + if self.sender.is_empty() { + self.sender.send(ReceiverAction::WakeUp).unwrap(); + } + Ok(()) } } diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 982868d93..835ee240b 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -420,61 +420,27 @@ where } } - while let Some(frame_with_header) = writer_receiver.read() { - match frame_with_header.header() { - EntryHeader::DbOperation(operation) => { - let database_name = operation.database.database_name(); - let database = operation.database.database(index); - let frame = frame_with_header.frame(); - match operation.key_value(frame) { - (key, Some(value)) => { - if let Err(error) = database.put(wtxn, key, value) { - return Err(Error::InternalError(InternalError::StorePut { - database_name, - key: key.into(), - value_length: value.len(), - error, - })); - } - } - (key, None) => match database.delete(wtxn, key) { - Ok(false) => { - unreachable!("We tried to delete an unknown key: {key:?}") - } - Ok(_) => (), - Err(error) => { - return Err(Error::InternalError( - InternalError::StoreDeletion { - database_name, - key: key.into(), - error, - }, - )); - } - }, - } - } - EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => { - for (_index, (_name, _embedder, writer, dimensions)) in &mut arroy_writers { - let dimensions = *dimensions; - writer.del_items(wtxn, dimensions, docid)?; - } - } - EntryHeader::ArroySetVector(asv) => { - let ArroySetVector { docid, embedder_id, .. } = asv; - let frame = frame_with_header.frame(); - let embedding = asv.read_embedding_into_vec(frame, &mut aligned_embedding); - let (_, _, writer, dimensions) = - arroy_writers.get(&embedder_id).expect("requested a missing embedder"); - writer.del_items(wtxn, *dimensions, docid)?; - writer.add_item(wtxn, docid, embedding)?; - } - } - } + // Every time the is a message in the channel we search + // for new entries in the BBQueue buffers. + write_from_bbqueue( + &mut writer_receiver, + index, + wtxn, + &arroy_writers, + &mut aligned_embedding, + )?; } - } - todo!("read the BBQueue once the channel is closed"); + // Once the extractor/writer channel is closed + // we must process the remaining BBQueue messages. + write_from_bbqueue( + &mut writer_receiver, + index, + wtxn, + &arroy_writers, + &mut aligned_embedding, + )?; + } 'vectors: { let span = @@ -548,6 +514,70 @@ where Ok(()) } +/// A function dedicated to manage all the available BBQueue frames. +/// +/// It reads all the available frames, do the corresponding database operations +/// and stops when no frame are available. +fn write_from_bbqueue( + writer_receiver: &mut WriterBbqueueReceiver<'_>, + index: &Index, + wtxn: &mut RwTxn<'_>, + arroy_writers: &HashMap, + aligned_embedding: &mut Vec, +) -> crate::Result<()> { + while let Some(frame_with_header) = writer_receiver.read() { + match frame_with_header.header() { + EntryHeader::DbOperation(operation) => { + let database_name = operation.database.database_name(); + let database = operation.database.database(index); + let frame = frame_with_header.frame(); + match operation.key_value(frame) { + (key, Some(value)) => { + if let Err(error) = database.put(wtxn, key, value) { + return Err(Error::InternalError(InternalError::StorePut { + database_name, + key: key.into(), + value_length: value.len(), + error, + })); + } + } + (key, None) => match database.delete(wtxn, key) { + Ok(false) => { + unreachable!("We tried to delete an unknown key: {key:?}") + } + Ok(_) => (), + Err(error) => { + return Err(Error::InternalError(InternalError::StoreDeletion { + database_name, + key: key.into(), + error, + })); + } + }, + } + } + EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => { + for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers { + let dimensions = *dimensions; + writer.del_items(wtxn, dimensions, docid)?; + } + } + EntryHeader::ArroySetVector(asv) => { + let ArroySetVector { docid, embedder_id, .. } = asv; + let frame = frame_with_header.frame(); + let embedding = asv.read_embedding_into_vec(frame, aligned_embedding); + let (_, _, writer, dimensions) = + arroy_writers.get(&embedder_id).expect("requested a missing embedder"); + writer.del_items(wtxn, *dimensions, docid)?; + writer.add_item(wtxn, docid, embedding)?; + } + } + } + + Ok(()) +} + #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] fn compute_prefix_database( index: &Index,