Measure merger writer channel contention

This commit is contained in:
Clément Renault 2024-09-23 11:07:59 +02:00
parent f4ab1f168e
commit 013acb3d93
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 59 additions and 19 deletions

View File

@ -16,7 +16,14 @@ use crate::{DocumentId, Index};
/// The capacity of the channel is currently in number of messages. /// The capacity of the channel is currently in number of messages.
pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) {
let (sender, receiver) = crossbeam_channel::bounded(cap); let (sender, receiver) = crossbeam_channel::bounded(cap);
(MergerSender(sender), WriterReceiver(receiver)) (
MergerSender {
sender,
send_count: Default::default(),
contentious_count: Default::default(),
},
WriterReceiver(receiver),
)
} }
/// The capacity of the channel is currently in number of messages. /// The capacity of the channel is currently in number of messages.
@ -169,23 +176,40 @@ impl IntoIterator for WriterReceiver {
} }
} }
pub struct MergerSender(Sender<WriterOperation>); pub struct MergerSender {
sender: Sender<WriterOperation>,
/// The number of message we send in total in the channel.
send_count: std::cell::Cell<usize>,
/// The number of times we sent something in a channel that was full.
contentious_count: std::cell::Cell<usize>,
}
impl Drop for MergerSender {
fn drop(&mut self) {
tracing::info!(
"Merger channel stats: {} sends, {} were contentious (ratio {})",
self.send_count.get(),
self.contentious_count.get(),
self.contentious_count.get() as f64 / self.send_count.get() as f64
)
}
}
impl MergerSender { impl MergerSender {
pub fn main(&self) -> MainSender<'_> { pub fn main(&self) -> MainSender<'_> {
MainSender(&self.0) MainSender(self)
} }
pub fn docids<D: DatabaseType>(&self) -> WordDocidsSender<'_, D> { pub fn docids<D: DatabaseType>(&self) -> WordDocidsSender<'_, D> {
WordDocidsSender { sender: &self.0, _marker: PhantomData } WordDocidsSender { sender: self, _marker: PhantomData }
} }
pub fn facet_docids(&self) -> FacetDocidsSender<'_> { pub fn facet_docids(&self) -> FacetDocidsSender<'_> {
FacetDocidsSender { sender: &self.0 } FacetDocidsSender { sender: self }
} }
pub fn documents(&self) -> DocumentsSender<'_> { pub fn documents(&self) -> DocumentsSender<'_> {
DocumentsSender(&self.0) DocumentsSender(self)
} }
pub fn send_documents_ids(&self, bitmap: &[u8]) -> StdResult<(), SendError<()>> { pub fn send_documents_ids(&self, bitmap: &[u8]) -> StdResult<(), SendError<()>> {
@ -193,14 +217,25 @@ impl MergerSender {
DOCUMENTS_IDS_KEY.as_bytes(), DOCUMENTS_IDS_KEY.as_bytes(),
bitmap, bitmap,
)); ));
match self.0.send(WriterOperation { database: Database::Main, entry }) { match self.send(WriterOperation { database: Database::Main, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
fn send(&self, op: WriterOperation) -> StdResult<(), SendError<()>> {
if self.sender.is_full() {
self.contentious_count.set(self.contentious_count.get() + 1);
}
self.send_count.set(self.send_count.get() + 1);
match self.sender.send(op) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())), Err(SendError(_)) => Err(SendError(())),
} }
} }
} }
pub struct MainSender<'a>(&'a Sender<WriterOperation>); pub struct MainSender<'a>(&'a MergerSender);
impl MainSender<'_> { impl MainSender<'_> {
pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> { pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> {
@ -311,7 +346,7 @@ pub trait DocidsSender {
} }
pub struct WordDocidsSender<'a, D> { pub struct WordDocidsSender<'a, D> {
sender: &'a Sender<WriterOperation>, sender: &'a MergerSender,
_marker: PhantomData<D>, _marker: PhantomData<D>,
} }
@ -334,7 +369,7 @@ impl<D: DatabaseType> DocidsSender for WordDocidsSender<'_, D> {
} }
pub struct FacetDocidsSender<'a> { pub struct FacetDocidsSender<'a> {
sender: &'a Sender<WriterOperation>, sender: &'a MergerSender,
} }
impl DocidsSender for FacetDocidsSender<'_> { impl DocidsSender for FacetDocidsSender<'_> {
@ -370,7 +405,7 @@ impl FacetDocidsSender<'_> {
} }
} }
pub struct DocumentsSender<'a>(&'a Sender<WriterOperation>); pub struct DocumentsSender<'a>(&'a MergerSender);
impl DocumentsSender<'_> { impl DocumentsSender<'_> {
/// TODO do that efficiently /// TODO do that efficiently
@ -426,7 +461,7 @@ pub struct ExtractorSender(Sender<MergerOperation>);
impl ExtractorSender { impl ExtractorSender {
pub fn document_sender(&self) -> DocumentSender<'_> { pub fn document_sender(&self) -> DocumentSender<'_> {
DocumentSender(&self.0) DocumentSender(Some(&self.0))
} }
pub fn send_searchable<D: MergerOperationType>( pub fn send_searchable<D: MergerOperationType>(
@ -440,7 +475,7 @@ impl ExtractorSender {
} }
} }
pub struct DocumentSender<'a>(&'a Sender<MergerOperation>); pub struct DocumentSender<'a>(Option<&'a Sender<MergerOperation>>);
impl DocumentSender<'_> { impl DocumentSender<'_> {
pub fn insert( pub fn insert(
@ -448,21 +483,24 @@ impl DocumentSender<'_> {
docid: DocumentId, docid: DocumentId,
document: Box<KvReaderFieldId>, document: Box<KvReaderFieldId>,
) -> StdResult<(), SendError<()>> { ) -> StdResult<(), SendError<()>> {
match self.0.send(MergerOperation::InsertDocument { docid, document }) { let sender = self.0.unwrap();
match sender.send(MergerOperation::InsertDocument { docid, document }) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())), Err(SendError(_)) => Err(SendError(())),
} }
} }
pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
match self.0.send(MergerOperation::DeleteDocument { docid }) { let sender = self.0.unwrap();
match sender.send(MergerOperation::DeleteDocument { docid }) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())), Err(SendError(_)) => Err(SendError(())),
} }
} }
pub fn finish(self) -> StdResult<(), SendError<()>> { pub fn finish(mut self) -> StdResult<(), SendError<()>> {
match self.0.send(MergerOperation::FinishedDocument) { let sender = self.0.take().unwrap();
match sender.send(MergerOperation::FinishedDocument) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())), Err(SendError(_)) => Err(SendError(())),
} }
@ -471,6 +509,8 @@ impl DocumentSender<'_> {
impl Drop for DocumentSender<'_> { impl Drop for DocumentSender<'_> {
fn drop(&mut self) { fn drop(&mut self) {
self.0.send(MergerOperation::FinishedDocument); if let Some(sender) = self.0.take() {
sender.send(MergerOperation::FinishedDocument);
}
} }
} }

View File

@ -17,7 +17,7 @@ use super::extract::*;
use super::merger::merge_grenad_entries; use super::merger::merge_grenad_entries;
use super::StdResult; use super::StdResult;
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::update::new::channel::{DatabaseType, ExtractorSender}; use crate::update::new::channel::ExtractorSender;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};