mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-02-23 19:15:31 +08:00
Compare commits
9 Commits
42257eec53
...
4224edea28
Author | SHA1 | Date | |
---|---|---|---|
|
4224edea28 | ||
|
cb1b7513af | ||
|
2f89b8209f | ||
|
a9d0f4a002 | ||
|
db032079d8 | ||
|
a00796c46a | ||
|
6112bd8caa | ||
|
cec88cfc29 | ||
|
f0d7ab81ad |
2
.github/workflows/sdks-tests.yml
vendored
2
.github/workflows/sdks-tests.yml
vendored
@ -52,7 +52,7 @@ jobs:
|
|||||||
- name: Setup .NET Core
|
- name: Setup .NET Core
|
||||||
uses: actions/setup-dotnet@v4
|
uses: actions/setup-dotnet@v4
|
||||||
with:
|
with:
|
||||||
dotnet-version: "6.0.x"
|
dotnet-version: "8.0.x"
|
||||||
- name: Install dependencies
|
- name: Install dependencies
|
||||||
run: dotnet restore
|
run: dotnet restore
|
||||||
- name: Build
|
- name: Build
|
||||||
|
@ -5,6 +5,8 @@ use std::marker::PhantomData;
|
|||||||
use std::mem;
|
use std::mem;
|
||||||
use std::num::NonZeroU16;
|
use std::num::NonZeroU16;
|
||||||
use std::ops::Range;
|
use std::ops::Range;
|
||||||
|
use std::sync::atomic::{self, AtomicUsize};
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use bbqueue::framed::{FrameGrantR, FrameProducer};
|
use bbqueue::framed::{FrameGrantR, FrameProducer};
|
||||||
@ -71,12 +73,23 @@ pub fn extractor_writer_bbqueue(
|
|||||||
consumer
|
consumer
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
|
||||||
|
let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
let (sender, receiver) = flume::bounded(channel_capacity);
|
let (sender, receiver) = flume::bounded(channel_capacity);
|
||||||
let sender = ExtractorBbqueueSender { sender, producers, max_grant };
|
let sender = ExtractorBbqueueSender {
|
||||||
|
sender,
|
||||||
|
producers,
|
||||||
|
max_grant,
|
||||||
|
sent_messages_attempts: sent_messages_attempts.clone(),
|
||||||
|
blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(),
|
||||||
|
};
|
||||||
let receiver = WriterBbqueueReceiver {
|
let receiver = WriterBbqueueReceiver {
|
||||||
receiver,
|
receiver,
|
||||||
look_at_consumer: (0..consumers.len()).cycle(),
|
look_at_consumer: (0..consumers.len()).cycle(),
|
||||||
consumers,
|
consumers,
|
||||||
|
sent_messages_attempts,
|
||||||
|
blocking_sent_messages_attempts,
|
||||||
};
|
};
|
||||||
(sender, receiver)
|
(sender, receiver)
|
||||||
}
|
}
|
||||||
@ -92,6 +105,12 @@ pub struct ExtractorBbqueueSender<'a> {
|
|||||||
/// It will never be able to store more than that as the
|
/// It will never be able to store more than that as the
|
||||||
/// buffer cannot split data into two parts.
|
/// buffer cannot split data into two parts.
|
||||||
max_grant: usize,
|
max_grant: usize,
|
||||||
|
/// The total number of attempts to send messages
|
||||||
|
/// over the bbqueue channel.
|
||||||
|
sent_messages_attempts: Arc<AtomicUsize>,
|
||||||
|
/// The number of times an attempt to send a
|
||||||
|
/// messages failed and we had to pause for a bit.
|
||||||
|
blocking_sent_messages_attempts: Arc<AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WriterBbqueueReceiver<'a> {
|
pub struct WriterBbqueueReceiver<'a> {
|
||||||
@ -104,6 +123,12 @@ pub struct WriterBbqueueReceiver<'a> {
|
|||||||
look_at_consumer: Cycle<Range<usize>>,
|
look_at_consumer: Cycle<Range<usize>>,
|
||||||
/// The BBQueue frames to read when waking-up.
|
/// The BBQueue frames to read when waking-up.
|
||||||
consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
|
consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
|
||||||
|
/// The total number of attempts to send messages
|
||||||
|
/// over the bbqueue channel.
|
||||||
|
sent_messages_attempts: Arc<AtomicUsize>,
|
||||||
|
/// The number of times an attempt to send a
|
||||||
|
/// message failed and we had to pause for a bit.
|
||||||
|
blocking_sent_messages_attempts: Arc<AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The action to perform on the receiver/writer side.
|
/// The action to perform on the receiver/writer side.
|
||||||
@ -169,6 +194,16 @@ impl<'a> WriterBbqueueReceiver<'a> {
|
|||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the total count of attempts to send messages through the BBQueue channel.
|
||||||
|
pub fn sent_messages_attempts(&self) -> usize {
|
||||||
|
self.sent_messages_attempts.load(atomic::Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the count of attempts to send messages that had to be paused due to BBQueue being full.
|
||||||
|
pub fn blocking_sent_messages_attempts(&self) -> usize {
|
||||||
|
self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FrameWithHeader<'a> {
|
pub struct FrameWithHeader<'a> {
|
||||||
@ -458,10 +493,17 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Spin loop to have a frame the size we requested.
|
// Spin loop to have a frame the size we requested.
|
||||||
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
|
reserve_and_write_grant(
|
||||||
payload_header.serialize_into(grant);
|
&mut producer,
|
||||||
Ok(())
|
total_length,
|
||||||
})?;
|
&self.sender,
|
||||||
|
&self.sent_messages_attempts,
|
||||||
|
&self.blocking_sent_messages_attempts,
|
||||||
|
|grant| {
|
||||||
|
payload_header.serialize_into(grant);
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -500,20 +542,28 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Spin loop to have a frame the size we requested.
|
// Spin loop to have a frame the size we requested.
|
||||||
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
|
reserve_and_write_grant(
|
||||||
let header_size = payload_header.header_size();
|
&mut producer,
|
||||||
let (header_bytes, remaining) = grant.split_at_mut(header_size);
|
total_length,
|
||||||
payload_header.serialize_into(header_bytes);
|
&self.sender,
|
||||||
|
&self.sent_messages_attempts,
|
||||||
|
&self.blocking_sent_messages_attempts,
|
||||||
|
|grant| {
|
||||||
|
let header_size = payload_header.header_size();
|
||||||
|
let (header_bytes, remaining) = grant.split_at_mut(header_size);
|
||||||
|
payload_header.serialize_into(header_bytes);
|
||||||
|
|
||||||
if dimensions != 0 {
|
if dimensions != 0 {
|
||||||
let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
|
let output_iter =
|
||||||
for (embedding, output) in embeddings.iter().zip(output_iter) {
|
remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
|
||||||
output.copy_from_slice(bytemuck::cast_slice(embedding));
|
for (embedding, output) in embeddings.iter().zip(output_iter) {
|
||||||
|
output.copy_from_slice(bytemuck::cast_slice(embedding));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})?;
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -571,13 +621,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Spin loop to have a frame the size we requested.
|
// Spin loop to have a frame the size we requested.
|
||||||
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
|
reserve_and_write_grant(
|
||||||
let header_size = payload_header.header_size();
|
&mut producer,
|
||||||
let (header_bytes, remaining) = grant.split_at_mut(header_size);
|
total_length,
|
||||||
payload_header.serialize_into(header_bytes);
|
&self.sender,
|
||||||
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
|
&self.sent_messages_attempts,
|
||||||
key_value_writer(key_buffer, value_buffer)
|
&self.blocking_sent_messages_attempts,
|
||||||
})?;
|
|grant| {
|
||||||
|
let header_size = payload_header.header_size();
|
||||||
|
let (header_bytes, remaining) = grant.split_at_mut(header_size);
|
||||||
|
payload_header.serialize_into(header_bytes);
|
||||||
|
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
|
||||||
|
key_value_writer(key_buffer, value_buffer)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -619,12 +676,19 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Spin loop to have a frame the size we requested.
|
// Spin loop to have a frame the size we requested.
|
||||||
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
|
reserve_and_write_grant(
|
||||||
let header_size = payload_header.header_size();
|
&mut producer,
|
||||||
let (header_bytes, remaining) = grant.split_at_mut(header_size);
|
total_length,
|
||||||
payload_header.serialize_into(header_bytes);
|
&self.sender,
|
||||||
key_writer(remaining)
|
&self.sent_messages_attempts,
|
||||||
})?;
|
&self.blocking_sent_messages_attempts,
|
||||||
|
|grant| {
|
||||||
|
let header_size = payload_header.header_size();
|
||||||
|
let (header_bytes, remaining) = grant.split_at_mut(header_size);
|
||||||
|
payload_header.serialize_into(header_bytes);
|
||||||
|
key_writer(remaining)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -637,12 +701,18 @@ fn reserve_and_write_grant<F>(
|
|||||||
producer: &mut FrameProducer,
|
producer: &mut FrameProducer,
|
||||||
total_length: usize,
|
total_length: usize,
|
||||||
sender: &flume::Sender<ReceiverAction>,
|
sender: &flume::Sender<ReceiverAction>,
|
||||||
|
sent_messages_attempts: &AtomicUsize,
|
||||||
|
blocking_sent_messages_attempts: &AtomicUsize,
|
||||||
f: F,
|
f: F,
|
||||||
) -> crate::Result<()>
|
) -> crate::Result<()>
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut [u8]) -> crate::Result<()>,
|
F: FnOnce(&mut [u8]) -> crate::Result<()>,
|
||||||
{
|
{
|
||||||
loop {
|
loop {
|
||||||
|
// An attempt means trying multiple times
|
||||||
|
// whether is succeeded or not.
|
||||||
|
sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
for _ in 0..10_000 {
|
for _ in 0..10_000 {
|
||||||
match producer.grant(total_length) {
|
match producer.grant(total_length) {
|
||||||
Ok(mut grant) => {
|
Ok(mut grant) => {
|
||||||
@ -666,6 +736,10 @@ where
|
|||||||
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We made an attempt to send a message in the
|
||||||
|
// bbqueue channel but it didn't succeed.
|
||||||
|
blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
// We prefer to yield and allow the writing thread
|
// We prefer to yield and allow the writing thread
|
||||||
// to do its job, especially beneficial when there
|
// to do its job, especially beneficial when there
|
||||||
// is only one CPU core available.
|
// is only one CPU core available.
|
||||||
|
@ -21,6 +21,7 @@ use crate::progress::Progress;
|
|||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
|
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
|
||||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
|
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
|
||||||
|
use std::sync::Once;
|
||||||
|
|
||||||
pub(crate) mod de;
|
pub(crate) mod de;
|
||||||
pub mod document_changes;
|
pub mod document_changes;
|
||||||
@ -33,6 +34,8 @@ mod post_processing;
|
|||||||
mod update_by_function;
|
mod update_by_function;
|
||||||
mod write;
|
mod write;
|
||||||
|
|
||||||
|
static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
|
||||||
|
|
||||||
/// This is the main function of this crate.
|
/// This is the main function of this crate.
|
||||||
///
|
///
|
||||||
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
|
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
|
||||||
@ -93,6 +96,15 @@ where
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
LOG_MEMORY_METRICS_ONCE.call_once(|| {
|
||||||
|
tracing::debug!(
|
||||||
|
"Indexation allocated memory metrics - \
|
||||||
|
Total BBQueue size: {total_bbbuffer_capacity}, \
|
||||||
|
Total extractor memory: {:?}",
|
||||||
|
grenad_parameters.max_memory,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
let (extractor_sender, writer_receiver) = pool
|
let (extractor_sender, writer_receiver) = pool
|
||||||
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -72,7 +72,19 @@ pub(super) fn write_to_db(
|
|||||||
&mut aligned_embedding,
|
&mut aligned_embedding,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
|
write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
|
||||||
|
|
||||||
|
let direct_attempts = writer_receiver.sent_messages_attempts();
|
||||||
|
let blocking_attempts = writer_receiver.blocking_sent_messages_attempts();
|
||||||
|
let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
|
||||||
|
tracing::debug!(
|
||||||
|
"Channel congestion metrics - \
|
||||||
|
Attempts: {direct_attempts}, \
|
||||||
|
Blocked attempts: {blocking_attempts} \
|
||||||
|
({congestion_pct:.1}% congestion)"
|
||||||
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user