diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 98538ea9e..6a47dc606 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -16,7 +16,14 @@ use crate::{DocumentId, Index}; /// The capacity of the channel is currently in number of messages. pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { let (sender, receiver) = crossbeam_channel::bounded(cap); - (MergerSender(sender), WriterReceiver(receiver)) + ( + MergerSender { + sender, + send_count: Default::default(), + contentious_count: Default::default(), + }, + WriterReceiver(receiver), + ) } /// The capacity of the channel is currently in number of messages. @@ -169,23 +176,40 @@ impl IntoIterator for WriterReceiver { } } -pub struct MergerSender(Sender); +pub struct MergerSender { + sender: Sender, + /// The number of message we send in total in the channel. + send_count: std::cell::Cell, + /// The number of times we sent something in a channel that was full. + contentious_count: std::cell::Cell, +} + +impl Drop for MergerSender { + fn drop(&mut self) { + tracing::info!( + "Merger channel stats: {} sends, {} were contentious (ratio {})", + self.send_count.get(), + self.contentious_count.get(), + self.contentious_count.get() as f64 / self.send_count.get() as f64 + ) + } +} impl MergerSender { pub fn main(&self) -> MainSender<'_> { - MainSender(&self.0) + MainSender(self) } pub fn docids(&self) -> WordDocidsSender<'_, D> { - WordDocidsSender { sender: &self.0, _marker: PhantomData } + WordDocidsSender { sender: self, _marker: PhantomData } } pub fn facet_docids(&self) -> FacetDocidsSender<'_> { - FacetDocidsSender { sender: &self.0 } + FacetDocidsSender { sender: self } } pub fn documents(&self) -> DocumentsSender<'_> { - DocumentsSender(&self.0) + DocumentsSender(self) } pub fn send_documents_ids(&self, bitmap: &[u8]) -> StdResult<(), SendError<()>> { @@ -193,14 +217,25 @@ impl MergerSender { DOCUMENTS_IDS_KEY.as_bytes(), bitmap, )); - match self.0.send(WriterOperation { database: Database::Main, entry }) { + match self.send(WriterOperation { database: Database::Main, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + fn send(&self, op: WriterOperation) -> StdResult<(), SendError<()>> { + if self.sender.is_full() { + self.contentious_count.set(self.contentious_count.get() + 1); + } + self.send_count.set(self.send_count.get() + 1); + match self.sender.send(op) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } } } -pub struct MainSender<'a>(&'a Sender); +pub struct MainSender<'a>(&'a MergerSender); impl MainSender<'_> { pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> { @@ -311,7 +346,7 @@ pub trait DocidsSender { } pub struct WordDocidsSender<'a, D> { - sender: &'a Sender, + sender: &'a MergerSender, _marker: PhantomData, } @@ -334,7 +369,7 @@ impl DocidsSender for WordDocidsSender<'_, D> { } pub struct FacetDocidsSender<'a> { - sender: &'a Sender, + sender: &'a MergerSender, } impl DocidsSender for FacetDocidsSender<'_> { @@ -370,7 +405,7 @@ impl FacetDocidsSender<'_> { } } -pub struct DocumentsSender<'a>(&'a Sender); +pub struct DocumentsSender<'a>(&'a MergerSender); impl DocumentsSender<'_> { /// TODO do that efficiently @@ -426,7 +461,7 @@ pub struct ExtractorSender(Sender); impl ExtractorSender { pub fn document_sender(&self) -> DocumentSender<'_> { - DocumentSender(&self.0) + DocumentSender(Some(&self.0)) } pub fn send_searchable( @@ -440,7 +475,7 @@ impl ExtractorSender { } } -pub struct DocumentSender<'a>(&'a Sender); +pub struct DocumentSender<'a>(Option<&'a Sender>); impl DocumentSender<'_> { pub fn insert( @@ -448,21 +483,24 @@ impl DocumentSender<'_> { docid: DocumentId, document: Box, ) -> StdResult<(), SendError<()>> { - match self.0.send(MergerOperation::InsertDocument { docid, document }) { + let sender = self.0.unwrap(); + match sender.send(MergerOperation::InsertDocument { docid, document }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } } pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { - match self.0.send(MergerOperation::DeleteDocument { docid }) { + let sender = self.0.unwrap(); + match sender.send(MergerOperation::DeleteDocument { docid }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } } - pub fn finish(self) -> StdResult<(), SendError<()>> { - match self.0.send(MergerOperation::FinishedDocument) { + pub fn finish(mut self) -> StdResult<(), SendError<()>> { + let sender = self.0.take().unwrap(); + match sender.send(MergerOperation::FinishedDocument) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } @@ -471,6 +509,8 @@ impl DocumentSender<'_> { impl Drop for DocumentSender<'_> { fn drop(&mut self) { - self.0.send(MergerOperation::FinishedDocument); + if let Some(sender) = self.0.take() { + sender.send(MergerOperation::FinishedDocument); + } } } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index b40ddbc4d..3a9db79b6 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -17,7 +17,7 @@ use super::extract::*; use super::merger::merge_grenad_entries; use super::StdResult; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; -use crate::update::new::channel::{DatabaseType, ExtractorSender}; +use crate::update::new::channel::ExtractorSender; use crate::update::GrenadParameters; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};