Compare commits

..

12 Commits

Author SHA1 Message Date
Louis Dureuil
88d2c9b652
Merge b2abef6522b87487ccb6c37155391a03472d29fc into 4224edea28bc66e8de30d6ad69cdab5aaa922335 2025-01-29 16:58:54 +00:00
Louis Dureuil
b2abef6522
Fix tests 2025-01-29 17:58:45 +01:00
Louis Dureuil
e8c4ea5de2
Support network in dumps 2025-01-29 17:31:37 +01:00
meili-bors[bot]
4224edea28
Merge #5177
Some checks failed
Test suite / Tests on ubuntu-20.04 (push) Failing after 0s
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Run tests in debug (push) Failing after 2s
Test suite / Tests on windows-2022 (push) Failing after 23s
Test suite / Run Rustfmt (push) Successful in 2m17s
Test suite / Run Clippy (push) Successful in 5m55s
Test suite / Tests on macos-13 (push) Has been cancelled
5177: Debug log  the channel congestion r=Kerollmops a=Kerollmops

This PR displays the congestion of the BBQueue channel and the allocated memory for the channel and the extraction. This information can be beneficial for debugging and noticing slow disks. We show three pieces of information in debug:
- The direct attempts: the number of tries to send something in the BBQueue channel,
- The blocked attempts: the number of unsuccessful attempts that must be retried,
- The congestion: The percentage of blocking attempts. The higher, the slower the receiver and, therefore, the disk.

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
2025-01-29 15:35:31 +00:00
Kerollmops
cb1b7513af
Log the memory metrics only once 2025-01-29 15:21:52 +01:00
meili-bors[bot]
2f89b8209f
Merge #5291
Some checks failed
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Tests on ubuntu-20.04 (push) Failing after 12s
Test suite / Run tests in debug (push) Failing after 12s
Test suite / Run Clippy (push) Failing after 21s
Test suite / Run Rustfmt (push) Successful in 1m43s
Test suite / Tests on windows-2022 (push) Failing after 5m39s
Test suite / Tests on macos-13 (push) Has been cancelled
5291: Fix Dotnet tests in sdks-tests.yml r=irevoire a=curquiza



Co-authored-by: Clémentine <clementine@meilisearch.com>
2025-01-29 14:18:48 +00:00
Clément Renault
a9d0f4a002
Improve english comments
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2025-01-29 15:16:40 +01:00
Kerollmops
db032079d8
Show indexation allocated memory 2025-01-29 14:21:02 +01:00
Clément Renault
a00796c46a
Improve the naming in the log message 2025-01-29 14:21:02 +01:00
Kerollmops
6112bd8caa
Display the channel congestion 2025-01-29 14:21:02 +01:00
Kerollmops
cec88cfc29
Measure the bbqueue congestion 2025-01-29 14:21:02 +01:00
Clémentine
f0d7ab81ad
Fix Dotnet tests in sdks-tests.yml 2025-01-27 15:37:32 +01:00
14 changed files with 233 additions and 44 deletions

View File

@ -52,7 +52,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@v4
with:
dotnet-version: "6.0.x"
dotnet-version: "8.0.x"
- name: Install dependencies
run: dotnet restore
- name: Build

View File

@ -229,7 +229,7 @@ pub(crate) mod test {
use big_s::S;
use maplit::{btreemap, btreeset};
use meilisearch_types::facet_values_sort::FacetValuesSort;
use meilisearch_types::features::RuntimeTogglableFeatures;
use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::keys::{Action, Key};
use meilisearch_types::milli;
@ -455,6 +455,10 @@ pub(crate) mod test {
dump.create_experimental_features(features).unwrap();
// ========== network
let network = create_test_network();
dump.create_network(network).unwrap();
// create the dump
let mut file = tempfile::tempfile().unwrap();
dump.persist_to(&mut file).unwrap();
@ -467,6 +471,13 @@ pub(crate) mod test {
RuntimeTogglableFeatures::default()
}
fn create_test_network() -> Network {
Network {
local: Some("myself".to_string()),
remotes: maplit::btreemap! {"other".to_string() => Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()) }},
}
}
#[test]
fn test_creating_and_read_dump() {
let mut file = create_test_dump();
@ -515,5 +526,9 @@ pub(crate) mod test {
// ==== checking the features
let expected = create_test_features();
assert_eq!(dump.features().unwrap().unwrap(), expected);
// ==== checking the network
let expected = create_test_network();
assert_eq!(&expected, dump.network().unwrap().unwrap());
}
}

View File

@ -196,6 +196,10 @@ impl CompatV5ToV6 {
pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> {
Ok(None)
}
pub fn network(&self) -> Result<Option<&v6::Network>> {
Ok(None)
}
}
pub enum CompatIndexV5ToV6 {

View File

@ -23,6 +23,7 @@ mod v6;
pub type Document = serde_json::Map<String, serde_json::Value>;
pub type UpdateFile = dyn Iterator<Item = Result<Document>>;
#[allow(clippy::large_enum_variant)]
pub enum DumpReader {
Current(V6Reader),
Compat(CompatV5ToV6),
@ -114,6 +115,13 @@ impl DumpReader {
DumpReader::Compat(compat) => compat.features(),
}
}
pub fn network(&self) -> Result<Option<&v6::Network>> {
match self {
DumpReader::Current(current) => Ok(current.network()),
DumpReader::Compat(compat) => compat.network(),
}
}
}
impl From<V6Reader> for DumpReader {
@ -328,6 +336,7 @@ pub(crate) mod test {
}
assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
assert_eq!(dump.network().unwrap(), None);
}
#[test]
@ -373,6 +382,27 @@ pub(crate) mod test {
assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
}
#[test]
fn import_dump_v6_network() {
let dump = File::open("tests/assets/v6-with-network.dump").unwrap();
let dump = DumpReader::open(dump).unwrap();
// top level infos
insta::assert_snapshot!(dump.date().unwrap(), @"2025-01-29 15:45:32.738676 +00:00:00");
insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");
// network
let network = dump.network().unwrap().unwrap();
insta::assert_snapshot!(network.local.as_ref().unwrap(), @"ms-0");
insta::assert_snapshot!(network.remotes.get("ms-0").as_ref().unwrap().url, @"http://localhost:7700");
insta::assert_snapshot!(network.remotes.get("ms-0").as_ref().unwrap().search_api_key.is_none(), @"true");
insta::assert_snapshot!(network.remotes.get("ms-1").as_ref().unwrap().url, @"http://localhost:7701");
insta::assert_snapshot!(network.remotes.get("ms-1").as_ref().unwrap().search_api_key.is_none(), @"true");
insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().url, @"http://ms-5679.example.meilisearch.io");
insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().search_api_key.as_ref().unwrap(), @"foo");
}
#[test]
fn import_dump_v5() {
let dump = File::open("tests/assets/v5.dump").unwrap();

View File

@ -20,6 +20,7 @@ pub type Unchecked = meilisearch_types::settings::Unchecked;
pub type Task = crate::TaskDump;
pub type Key = meilisearch_types::keys::Key;
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
pub type Network = meilisearch_types::features::Network;
// ===== Other types to clarify the code of the compat module
// everything related to the tasks
@ -50,6 +51,7 @@ pub struct V6Reader {
tasks: BufReader<File>,
keys: BufReader<File>,
features: Option<RuntimeTogglableFeatures>,
network: Option<Network>,
}
impl V6Reader {
@ -78,12 +80,30 @@ impl V6Reader {
None
};
let network_file = match fs::read(dump.path().join("network.json")) {
Ok(network_file) => Some(network_file),
Err(error) => match error.kind() {
// Allows the file to be missing, this will only result in all experimental features disabled.
ErrorKind::NotFound => {
debug!("`network.json` not found in dump");
None
}
_ => return Err(error.into()),
},
};
let network = if let Some(network_file) = network_file {
Some(serde_json::from_reader(&*network_file)?)
} else {
None
};
Ok(V6Reader {
metadata: serde_json::from_reader(&*meta_file)?,
instance_uid,
tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
features,
network,
dump,
})
}
@ -154,6 +174,10 @@ impl V6Reader {
pub fn features(&self) -> Option<RuntimeTogglableFeatures> {
self.features
}
pub fn network(&self) -> Option<&Network> {
self.network.as_ref()
}
}
pub struct UpdateFile {

View File

@ -4,7 +4,7 @@ use std::path::PathBuf;
use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::features::RuntimeTogglableFeatures;
use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
use meilisearch_types::keys::Key;
use meilisearch_types::settings::{Checked, Settings};
use serde_json::{Map, Value};
@ -61,6 +61,10 @@ impl DumpWriter {
)?)
}
pub fn create_network(&self, network: Network) -> Result<()> {
Ok(std::fs::write(self.dir.path().join("network.json"), serde_json::to_string(&network)?)?)
}
pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
let mut tar_encoder = tar::Builder::new(gz_encoder);
@ -295,7 +299,8 @@ pub(crate) mod test {
---- experimental-features.json
---- instance_uid.uuid
---- keys.jsonl
---- metadata.json
---- metadata.json
---- network.json
"###);
// ==== checking the top level infos

View File

@ -326,7 +326,7 @@ fn test_auto_deletion_of_tasks() {
fn test_task_queue_is_full() {
let (index_scheduler, mut handle) = IndexScheduler::test_with_custom_config(vec![], |config| {
// that's the minimum map size possible
config.task_db_size = 1048576;
config.task_db_size = 1048576 * 3;
None
});

View File

@ -219,6 +219,8 @@ impl IndexScheduler {
progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
let features = self.features().runtime_features();
dump.create_experimental_features(features)?;
let network = self.network();
dump.create_network(network)?;
let dump_uid = started_at.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"

View File

@ -431,10 +431,13 @@ fn import_dump(
keys.push(key);
}
// 3. Import the runtime features.
// 3. Import the runtime features and network
let features = dump_reader.features()?.unwrap_or_default();
index_scheduler.put_runtime_features(features)?;
let network = dump_reader.network()?.cloned().unwrap_or_default();
index_scheduler.put_network(network)?;
let indexer_config = index_scheduler.indexer_config();
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might

View File

@ -1908,7 +1908,8 @@ async fn import_dump_v6_containing_experimental_features() {
"metrics": false,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -2069,7 +2070,8 @@ async fn generate_and_import_dump_containing_vectors() {
"metrics": false,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);

View File

@ -21,7 +21,8 @@ async fn experimental_features() {
"metrics": false,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -33,7 +34,8 @@ async fn experimental_features() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -45,7 +47,8 @@ async fn experimental_features() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -58,7 +61,8 @@ async fn experimental_features() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -71,7 +75,8 @@ async fn experimental_features() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
}
@ -91,7 +96,8 @@ async fn experimental_feature_metrics() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -146,7 +152,7 @@ async fn errors() {
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`",
"message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`, `proxySearch`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"

View File

@ -5,6 +5,8 @@ use std::marker::PhantomData;
use std::mem;
use std::num::NonZeroU16;
use std::ops::Range;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use bbqueue::framed::{FrameGrantR, FrameProducer};
@ -71,12 +73,23 @@ pub fn extractor_writer_bbqueue(
consumer
});
let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
let (sender, receiver) = flume::bounded(channel_capacity);
let sender = ExtractorBbqueueSender { sender, producers, max_grant };
let sender = ExtractorBbqueueSender {
sender,
producers,
max_grant,
sent_messages_attempts: sent_messages_attempts.clone(),
blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(),
};
let receiver = WriterBbqueueReceiver {
receiver,
look_at_consumer: (0..consumers.len()).cycle(),
consumers,
sent_messages_attempts,
blocking_sent_messages_attempts,
};
(sender, receiver)
}
@ -92,6 +105,12 @@ pub struct ExtractorBbqueueSender<'a> {
/// It will never be able to store more than that as the
/// buffer cannot split data into two parts.
max_grant: usize,
/// The total number of attempts to send messages
/// over the bbqueue channel.
sent_messages_attempts: Arc<AtomicUsize>,
/// The number of times an attempt to send a
/// messages failed and we had to pause for a bit.
blocking_sent_messages_attempts: Arc<AtomicUsize>,
}
pub struct WriterBbqueueReceiver<'a> {
@ -104,6 +123,12 @@ pub struct WriterBbqueueReceiver<'a> {
look_at_consumer: Cycle<Range<usize>>,
/// The BBQueue frames to read when waking-up.
consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
/// The total number of attempts to send messages
/// over the bbqueue channel.
sent_messages_attempts: Arc<AtomicUsize>,
/// The number of times an attempt to send a
/// message failed and we had to pause for a bit.
blocking_sent_messages_attempts: Arc<AtomicUsize>,
}
/// The action to perform on the receiver/writer side.
@ -169,6 +194,16 @@ impl<'a> WriterBbqueueReceiver<'a> {
}
None
}
/// Returns the total count of attempts to send messages through the BBQueue channel.
pub fn sent_messages_attempts(&self) -> usize {
self.sent_messages_attempts.load(atomic::Ordering::Relaxed)
}
/// Returns the count of attempts to send messages that had to be paused due to BBQueue being full.
pub fn blocking_sent_messages_attempts(&self) -> usize {
self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed)
}
}
pub struct FrameWithHeader<'a> {
@ -458,10 +493,17 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
payload_header.serialize_into(grant);
Ok(())
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
payload_header.serialize_into(grant);
Ok(())
},
)?;
Ok(())
}
@ -500,20 +542,28 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
if dimensions != 0 {
let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
if dimensions != 0 {
let output_iter =
remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
}
}
}
Ok(())
})?;
Ok(())
},
)?;
Ok(())
}
@ -571,13 +621,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)
},
)?;
Ok(())
}
@ -619,12 +676,19 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
key_writer(remaining)
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
key_writer(remaining)
},
)?;
Ok(())
}
@ -637,12 +701,18 @@ fn reserve_and_write_grant<F>(
producer: &mut FrameProducer,
total_length: usize,
sender: &flume::Sender<ReceiverAction>,
sent_messages_attempts: &AtomicUsize,
blocking_sent_messages_attempts: &AtomicUsize,
f: F,
) -> crate::Result<()>
where
F: FnOnce(&mut [u8]) -> crate::Result<()>,
{
loop {
// An attempt means trying multiple times
// whether is succeeded or not.
sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
for _ in 0..10_000 {
match producer.grant(total_length) {
Ok(mut grant) => {
@ -666,6 +736,10 @@ where
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
// We made an attempt to send a message in the
// bbqueue channel but it didn't succeed.
blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
// We prefer to yield and allow the writing thread
// to do its job, especially beneficial when there
// is only one CPU core available.

View File

@ -21,6 +21,7 @@ use crate::progress::Progress;
use crate::update::GrenadParameters;
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
use std::sync::Once;
pub(crate) mod de;
pub mod document_changes;
@ -33,6 +34,8 @@ mod post_processing;
mod update_by_function;
mod write;
static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
@ -93,6 +96,15 @@ where
},
);
LOG_MEMORY_METRICS_ONCE.call_once(|| {
tracing::debug!(
"Indexation allocated memory metrics - \
Total BBQueue size: {total_bbbuffer_capacity}, \
Total extractor memory: {:?}",
grenad_parameters.max_memory,
);
});
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();

View File

@ -72,7 +72,19 @@ pub(super) fn write_to_db(
&mut aligned_embedding,
)?;
}
write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
let direct_attempts = writer_receiver.sent_messages_attempts();
let blocking_attempts = writer_receiver.blocking_sent_messages_attempts();
let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
tracing::debug!(
"Channel congestion metrics - \
Attempts: {direct_attempts}, \
Blocked attempts: {blocking_attempts} \
({congestion_pct:.1}% congestion)"
);
Ok(())
}