Compare commits

..

9 Commits

Author SHA1 Message Date
meili-bors[bot]
4224edea28
Merge #5177
5177: Debug log the channel congestion r=Kerollmops a=Kerollmops

This PR logs the congestion of the BBQueue channel as well as the memory allocated for the channel and for the extraction. This information is helpful for debugging and for noticing slow disks. Three pieces of information are shown at the debug level (a minimal sketch of the computation follows the list):
- The direct attempts: the number of attempts to send a message over the BBQueue channel,
- The blocked attempts: the number of attempts that did not succeed immediately and had to be retried,
- The congestion: the percentage of blocked attempts; the higher it is, the slower the receiver and, therefore, the disk.
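
A minimal sketch of how the congestion percentage is derived from the two counters. The function name and the example values are illustrative only; the actual computation lives in the write_to_db change at the bottom of this diff:

// Illustrative sketch: derive the congestion percentage from the two
// counters exposed on the writer receiver.
fn congestion_percentage(direct_attempts: usize, blocked_attempts: usize) -> f64 {
    if direct_attempts == 0 {
        // Nothing was sent, so there was no congestion either.
        return 0.0;
    }
    (blocked_attempts as f64 / direct_attempts as f64) * 100.0
}

fn main() {
    // e.g. 250_000 blocked attempts out of 1_000_000 total => "25.0%"
    println!("{:.1}%", congestion_percentage(1_000_000, 250_000));
}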

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
2025-01-29 15:35:31 +00:00
Kerollmops
cb1b7513af
Log the memory metrics only once 2025-01-29 15:21:52 +01:00
meili-bors[bot]
2f89b8209f
Merge #5291
Some checks failed
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Tests on ubuntu-20.04 (push) Failing after 12s
Test suite / Run tests in debug (push) Failing after 12s
Test suite / Run Clippy (push) Failing after 21s
Test suite / Run Rustfmt (push) Successful in 1m43s
Test suite / Tests on windows-2022 (push) Failing after 5m39s
Test suite / Tests on macos-13 (push) Has been cancelled
5291: Fix Dotnet tests in sdks-tests.yml r=irevoire a=curquiza



Co-authored-by: Clémentine <clementine@meilisearch.com>
2025-01-29 14:18:48 +00:00
Clément Renault
a9d0f4a002
Improve english comments
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2025-01-29 15:16:40 +01:00
Kerollmops
db032079d8
Show indexation allocated memory 2025-01-29 14:21:02 +01:00
Clément Renault
a00796c46a
Improve the naming in the log message 2025-01-29 14:21:02 +01:00
Kerollmops
6112bd8caa
Display the channel congestion 2025-01-29 14:21:02 +01:00
Kerollmops
cec88cfc29
Measure the bbqueue congestion 2025-01-29 14:21:02 +01:00
Clémentine
f0d7ab81ad
Fix Dotnet tests in sdks-tests.yml 2025-01-27 15:37:32 +01:00
4 changed files with 128 additions and 30 deletions

View File

@@ -52,7 +52,7 @@ jobs:
       - name: Setup .NET Core
         uses: actions/setup-dotnet@v4
         with:
-          dotnet-version: "6.0.x"
+          dotnet-version: "8.0.x"
       - name: Install dependencies
         run: dotnet restore
       - name: Build

View File

@@ -5,6 +5,8 @@ use std::marker::PhantomData;
 use std::mem;
 use std::num::NonZeroU16;
 use std::ops::Range;
+use std::sync::atomic::{self, AtomicUsize};
+use std::sync::Arc;
 use std::time::Duration;
 
 use bbqueue::framed::{FrameGrantR, FrameProducer};
@@ -71,12 +73,23 @@ pub fn extractor_writer_bbqueue(
         consumer
     });
 
+    let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
+    let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
+
     let (sender, receiver) = flume::bounded(channel_capacity);
-    let sender = ExtractorBbqueueSender { sender, producers, max_grant };
+    let sender = ExtractorBbqueueSender {
+        sender,
+        producers,
+        max_grant,
+        sent_messages_attempts: sent_messages_attempts.clone(),
+        blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(),
+    };
     let receiver = WriterBbqueueReceiver {
         receiver,
         look_at_consumer: (0..consumers.len()).cycle(),
         consumers,
+        sent_messages_attempts,
+        blocking_sent_messages_attempts,
     };
     (sender, receiver)
 }
@@ -92,6 +105,12 @@ pub struct ExtractorBbqueueSender<'a> {
     /// It will never be able to store more than that as the
     /// buffer cannot split data into two parts.
     max_grant: usize,
+    /// The total number of attempts to send messages
+    /// over the bbqueue channel.
+    sent_messages_attempts: Arc<AtomicUsize>,
+    /// The number of times an attempt to send a
+    /// message failed and we had to pause for a bit.
+    blocking_sent_messages_attempts: Arc<AtomicUsize>,
 }
 
 pub struct WriterBbqueueReceiver<'a> {
@@ -104,6 +123,12 @@ pub struct WriterBbqueueReceiver<'a> {
     look_at_consumer: Cycle<Range<usize>>,
     /// The BBQueue frames to read when waking-up.
     consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
+    /// The total number of attempts to send messages
+    /// over the bbqueue channel.
+    sent_messages_attempts: Arc<AtomicUsize>,
+    /// The number of times an attempt to send a
+    /// message failed and we had to pause for a bit.
+    blocking_sent_messages_attempts: Arc<AtomicUsize>,
 }
 
 /// The action to perform on the receiver/writer side.
@@ -169,6 +194,16 @@ impl<'a> WriterBbqueueReceiver<'a> {
         }
         None
     }
+
+    /// Returns the total count of attempts to send messages through the BBQueue channel.
+    pub fn sent_messages_attempts(&self) -> usize {
+        self.sent_messages_attempts.load(atomic::Ordering::Relaxed)
+    }
+
+    /// Returns the count of attempts to send messages that had to be paused due to BBQueue being full.
+    pub fn blocking_sent_messages_attempts(&self) -> usize {
+        self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed)
+    }
 }
 
 pub struct FrameWithHeader<'a> {
@@ -458,10 +493,17 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }
 
         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
-            payload_header.serialize_into(grant);
-            Ok(())
-        })?;
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.sender,
+            &self.sent_messages_attempts,
+            &self.blocking_sent_messages_attempts,
+            |grant| {
+                payload_header.serialize_into(grant);
+                Ok(())
+            },
+        )?;
 
         Ok(())
     }
@@ -500,20 +542,28 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }
 
         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
-            let header_size = payload_header.header_size();
-            let (header_bytes, remaining) = grant.split_at_mut(header_size);
-            payload_header.serialize_into(header_bytes);
-
-            if dimensions != 0 {
-                let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
-                for (embedding, output) in embeddings.iter().zip(output_iter) {
-                    output.copy_from_slice(bytemuck::cast_slice(embedding));
-                }
-            }
-
-            Ok(())
-        })?;
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.sender,
+            &self.sent_messages_attempts,
+            &self.blocking_sent_messages_attempts,
+            |grant| {
+                let header_size = payload_header.header_size();
+                let (header_bytes, remaining) = grant.split_at_mut(header_size);
+                payload_header.serialize_into(header_bytes);
+
+                if dimensions != 0 {
+                    let output_iter =
+                        remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
+                    for (embedding, output) in embeddings.iter().zip(output_iter) {
+                        output.copy_from_slice(bytemuck::cast_slice(embedding));
+                    }
+                }
+
+                Ok(())
+            },
+        )?;
 
         Ok(())
     }
@@ -571,13 +621,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }
 
        // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
-            let header_size = payload_header.header_size();
-            let (header_bytes, remaining) = grant.split_at_mut(header_size);
-            payload_header.serialize_into(header_bytes);
-            let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
-            key_value_writer(key_buffer, value_buffer)
-        })?;
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.sender,
+            &self.sent_messages_attempts,
+            &self.blocking_sent_messages_attempts,
+            |grant| {
+                let header_size = payload_header.header_size();
+                let (header_bytes, remaining) = grant.split_at_mut(header_size);
+                payload_header.serialize_into(header_bytes);
+                let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
+                key_value_writer(key_buffer, value_buffer)
+            },
+        )?;
 
         Ok(())
     }
@@ -619,12 +676,19 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }
 
        // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
-            let header_size = payload_header.header_size();
-            let (header_bytes, remaining) = grant.split_at_mut(header_size);
-            payload_header.serialize_into(header_bytes);
-            key_writer(remaining)
-        })?;
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.sender,
+            &self.sent_messages_attempts,
+            &self.blocking_sent_messages_attempts,
+            |grant| {
+                let header_size = payload_header.header_size();
+                let (header_bytes, remaining) = grant.split_at_mut(header_size);
+                payload_header.serialize_into(header_bytes);
+                key_writer(remaining)
+            },
+        )?;
 
         Ok(())
     }
@@ -637,12 +701,18 @@ fn reserve_and_write_grant<F>(
     producer: &mut FrameProducer,
     total_length: usize,
     sender: &flume::Sender<ReceiverAction>,
+    sent_messages_attempts: &AtomicUsize,
+    blocking_sent_messages_attempts: &AtomicUsize,
     f: F,
 ) -> crate::Result<()>
 where
     F: FnOnce(&mut [u8]) -> crate::Result<()>,
 {
     loop {
+        // One attempt is counted per iteration of this loop,
+        // whether it succeeded or not.
+        sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
+
         for _ in 0..10_000 {
             match producer.grant(total_length) {
                 Ok(mut grant) => {
@@ -666,6 +736,10 @@ where
                 return Err(Error::InternalError(InternalError::AbortedIndexation));
             }
 
+            // We made an attempt to send a message in the
+            // bbqueue channel but it didn't succeed.
+            blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
+
             // We prefer to yield and allow the writing thread
             // to do its job, especially beneficial when there
             // is only one CPU core available.
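
For reference, the counting added above boils down to a pair of Arc<AtomicUsize> values shared between the sender and the receiver sides. A simplified, standalone sketch of that pattern (the thread and the numbers are made up for illustration; this is not the actual channel code):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // One counter for every send attempt, one for the attempts that blocked.
    let attempts = Arc::new(AtomicUsize::new(0));
    let blocked = Arc::new(AtomicUsize::new(0));

    let (a, b) = (Arc::clone(&attempts), Arc::clone(&blocked));
    let producer = thread::spawn(move || {
        for i in 0..1_000 {
            // Relaxed ordering is enough here: the counters are only read
            // for reporting and never synchronize other memory accesses.
            a.fetch_add(1, Ordering::Relaxed);
            if i % 4 == 0 {
                // Pretend every fourth attempt found the queue full.
                b.fetch_add(1, Ordering::Relaxed);
            }
        }
    });
    producer.join().unwrap();

    println!(
        "attempts: {}, blocked: {}",
        attempts.load(Ordering::Relaxed),
        blocked.load(Ordering::Relaxed)
    );
}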

View File

@@ -21,6 +21,7 @@ use crate::progress::Progress;
 use crate::update::GrenadParameters;
 use crate::vector::{ArroyWrapper, EmbeddingConfigs};
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
+use std::sync::Once;
 
 pub(crate) mod de;
 pub mod document_changes;
@@ -33,6 +34,8 @@ mod post_processing;
 mod update_by_function;
 mod write;
 
+static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
+
 /// This is the main function of this crate.
 ///
 /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
@@ -93,6 +96,15 @@ where
         },
     );
 
+    LOG_MEMORY_METRICS_ONCE.call_once(|| {
+        tracing::debug!(
+            "Indexation allocated memory metrics - \
+             Total BBQueue size: {total_bbbuffer_capacity}, \
+             Total extractor memory: {:?}",
+            grenad_parameters.max_memory,
+        );
+    });
+
     let (extractor_sender, writer_receiver) = pool
         .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
         .unwrap();
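
The guard added above follows the standard std::sync::Once pattern. A minimal standalone sketch, using println! instead of tracing so it runs on its own (the function and values are illustrative, not the Meilisearch code):

use std::sync::Once;

static LOG_MEMORY_METRICS_ONCE: Once = Once::new();

fn index_batch(batch: u32) {
    // call_once runs the closure at most once across all calls and threads,
    // so repeated indexing runs log the memory metrics a single time.
    LOG_MEMORY_METRICS_ONCE.call_once(|| {
        println!("memory metrics logged during batch {batch}");
    });
}

fn main() {
    for batch in 0..3 {
        index_batch(batch); // only the first call prints
    }
}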

View File

@@ -72,7 +72,19 @@ pub(super) fn write_to_db(
             &mut aligned_embedding,
         )?;
     }
 
     write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
 
+    let direct_attempts = writer_receiver.sent_messages_attempts();
+    let blocking_attempts = writer_receiver.blocking_sent_messages_attempts();
+    let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
+
+    tracing::debug!(
+        "Channel congestion metrics - \
+         Attempts: {direct_attempts}, \
+         Blocked attempts: {blocking_attempts} \
+         ({congestion_pct:.1}% congestion)"
+    );
+
     Ok(())
 }