Compare commits


1 commit

14 changed files with 44 additions and 233 deletions

View File

@@ -52,7 +52,7 @@ jobs:
       - name: Setup .NET Core
         uses: actions/setup-dotnet@v4
         with:
-          dotnet-version: "8.0.x"
+          dotnet-version: "6.0.x"
       - name: Install dependencies
         run: dotnet restore
       - name: Build

View File

@@ -229,7 +229,7 @@ pub(crate) mod test {
     use big_s::S;
     use maplit::{btreemap, btreeset};
     use meilisearch_types::facet_values_sort::FacetValuesSort;
-    use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
+    use meilisearch_types::features::RuntimeTogglableFeatures;
     use meilisearch_types::index_uid_pattern::IndexUidPattern;
     use meilisearch_types::keys::{Action, Key};
     use meilisearch_types::milli;
@@ -455,10 +455,6 @@ pub(crate) mod test {
         dump.create_experimental_features(features).unwrap();

-        // ========== network
-        let network = create_test_network();
-        dump.create_network(network).unwrap();
-
         // create the dump
         let mut file = tempfile::tempfile().unwrap();
         dump.persist_to(&mut file).unwrap();
@@ -471,13 +467,6 @@ pub(crate) mod test {
         RuntimeTogglableFeatures::default()
     }

-    fn create_test_network() -> Network {
-        Network {
-            local: Some("myself".to_string()),
-            remotes: maplit::btreemap! {"other".to_string() => Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()) }},
-        }
-    }
-
     #[test]
     fn test_creating_and_read_dump() {
         let mut file = create_test_dump();
@@ -526,9 +515,5 @@ pub(crate) mod test {
         // ==== checking the features
         let expected = create_test_features();
         assert_eq!(dump.features().unwrap().unwrap(), expected);
-
-        // ==== checking the network
-        let expected = create_test_network();
-        assert_eq!(&expected, dump.network().unwrap().unwrap());
     }
 }
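
Note on the `Network` and `Remote` types dropped throughout this compare: their definitions never appear in the diff, but the removed `create_test_network` helper above constructs them directly, so their shape can be sketched roughly as follows (inferred from this diff only; the real definitions in `meilisearch_types::features` likely carry serde and other derives that are not visible here):

    use std::collections::BTreeMap;

    // Sketch inferred from the removed test code above, not the actual
    // definitions in meilisearch_types::features.
    pub struct Network {
        /// How this instance refers to itself, e.g. "myself" in the test.
        pub local: Option<String>,
        /// Known remote instances, keyed by name.
        pub remotes: BTreeMap<String, Remote>,
    }

    pub struct Remote {
        /// Base URL of the remote instance, e.g. "http://test" in the test.
        pub url: String,
        /// Optional API key sent along with search requests to this remote.
        pub search_api_key: Option<String>,
    }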

View File

@@ -196,10 +196,6 @@ impl CompatV5ToV6 {
     pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> {
         Ok(None)
     }
-
-    pub fn network(&self) -> Result<Option<&v6::Network>> {
-        Ok(None)
-    }
 }

 pub enum CompatIndexV5ToV6 {

View File

@@ -23,7 +23,6 @@ mod v6;
 pub type Document = serde_json::Map<String, serde_json::Value>;
 pub type UpdateFile = dyn Iterator<Item = Result<Document>>;

-#[allow(clippy::large_enum_variant)]
 pub enum DumpReader {
     Current(V6Reader),
     Compat(CompatV5ToV6),
@@ -115,13 +114,6 @@ impl DumpReader {
             DumpReader::Compat(compat) => compat.features(),
         }
     }
-
-    pub fn network(&self) -> Result<Option<&v6::Network>> {
-        match self {
-            DumpReader::Current(current) => Ok(current.network()),
-            DumpReader::Compat(compat) => compat.network(),
-        }
-    }
 }

 impl From<V6Reader> for DumpReader {
@@ -336,7 +328,6 @@ pub(crate) mod test {
         }

         assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
-        assert_eq!(dump.network().unwrap(), None);
     }

     #[test]
@@ -382,27 +373,6 @@ pub(crate) mod test {
         assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
     }

-    #[test]
-    fn import_dump_v6_network() {
-        let dump = File::open("tests/assets/v6-with-network.dump").unwrap();
-        let dump = DumpReader::open(dump).unwrap();
-
-        // top level infos
-        insta::assert_snapshot!(dump.date().unwrap(), @"2025-01-29 15:45:32.738676 +00:00:00");
-        insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");
-
-        // network
-        let network = dump.network().unwrap().unwrap();
-        insta::assert_snapshot!(network.local.as_ref().unwrap(), @"ms-0");
-        insta::assert_snapshot!(network.remotes.get("ms-0").as_ref().unwrap().url, @"http://localhost:7700");
-        insta::assert_snapshot!(network.remotes.get("ms-0").as_ref().unwrap().search_api_key.is_none(), @"true");
-        insta::assert_snapshot!(network.remotes.get("ms-1").as_ref().unwrap().url, @"http://localhost:7701");
-        insta::assert_snapshot!(network.remotes.get("ms-1").as_ref().unwrap().search_api_key.is_none(), @"true");
-        insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().url, @"http://ms-5679.example.meilisearch.io");
-        insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().search_api_key.as_ref().unwrap(), @"foo");
-    }
-
     #[test]
     fn import_dump_v5() {
         let dump = File::open("tests/assets/v5.dump").unwrap();

View File

@@ -20,7 +20,6 @@ pub type Unchecked = meilisearch_types::settings::Unchecked;
 pub type Task = crate::TaskDump;
 pub type Key = meilisearch_types::keys::Key;
 pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
-pub type Network = meilisearch_types::features::Network;

 // ===== Other types to clarify the code of the compat module
 // everything related to the tasks
@@ -51,7 +50,6 @@ pub struct V6Reader {
     tasks: BufReader<File>,
     keys: BufReader<File>,
     features: Option<RuntimeTogglableFeatures>,
-    network: Option<Network>,
 }

 impl V6Reader {
@@ -80,30 +78,12 @@ impl V6Reader {
             None
         };

-        let network_file = match fs::read(dump.path().join("network.json")) {
-            Ok(network_file) => Some(network_file),
-            Err(error) => match error.kind() {
-                // Allows the file to be missing, this will only result in all experimental features disabled.
-                ErrorKind::NotFound => {
-                    debug!("`network.json` not found in dump");
-                    None
-                }
-                _ => return Err(error.into()),
-            },
-        };
-        let network = if let Some(network_file) = network_file {
-            Some(serde_json::from_reader(&*network_file)?)
-        } else {
-            None
-        };
-
         Ok(V6Reader {
             metadata: serde_json::from_reader(&*meta_file)?,
             instance_uid,
             tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
             keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
             features,
-            network,
             dump,
         })
     }
@@ -174,10 +154,6 @@ impl V6Reader {
     pub fn features(&self) -> Option<RuntimeTogglableFeatures> {
         self.features
     }
-
-    pub fn network(&self) -> Option<&Network> {
-        self.network.as_ref()
-    }
 }

 pub struct UpdateFile {
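
The `network.json` handling removed from `V6Reader::open` above follows a tolerant-read pattern: a missing file becomes `None` (the dump simply predates the feature), while any other I/O or parse error still propagates. Extracted as a standalone sketch (the `read_optional_json` helper and its `anyhow` error type are illustrative assumptions, not code from this repository):

    use std::fs;
    use std::io::ErrorKind;
    use std::path::Path;

    use serde::de::DeserializeOwned;

    /// Read and deserialize an optional JSON file: `Ok(None)` when the file
    /// is absent, an error for any other I/O or deserialization failure.
    fn read_optional_json<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
        match fs::read(path) {
            Ok(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(None),
            Err(error) => Err(error.into()),
        }
    }

Incidentally, the removed inline comment about "all experimental features disabled" appears copy-pasted from the features block above it; the file it actually guards is `network.json`.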

View File

@@ -4,7 +4,7 @@ use std::path::PathBuf;

 use flate2::write::GzEncoder;
 use flate2::Compression;
-use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
+use meilisearch_types::features::RuntimeTogglableFeatures;
 use meilisearch_types::keys::Key;
 use meilisearch_types::settings::{Checked, Settings};
 use serde_json::{Map, Value};
@@ -61,10 +61,6 @@ impl DumpWriter {
         )?)
     }

-    pub fn create_network(&self, network: Network) -> Result<()> {
-        Ok(std::fs::write(self.dir.path().join("network.json"), serde_json::to_string(&network)?)?)
-    }
-
     pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
         let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
         let mut tar_encoder = tar::Builder::new(gz_encoder);
@@ -299,8 +295,7 @@ pub(crate) mod test {
         ---- experimental-features.json
         ---- instance_uid.uuid
         ---- keys.jsonl
         ---- metadata.json
-        ---- network.json
         "###);

         // ==== checking the top level infos

View File

@@ -326,7 +326,7 @@ fn test_auto_deletion_of_tasks() {
 fn test_task_queue_is_full() {
     let (index_scheduler, mut handle) = IndexScheduler::test_with_custom_config(vec![], |config| {
         // that's the minimum map size possible
-        config.task_db_size = 1048576 * 3;
+        config.task_db_size = 1048576;
         None
     });

View File

@@ -219,8 +219,6 @@ impl IndexScheduler {
         progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
         let features = self.features().runtime_features();
         dump.create_experimental_features(features)?;
-        let network = self.network();
-        dump.create_network(network)?;

         let dump_uid = started_at.format(format_description!(
             "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"

View File

@@ -431,13 +431,10 @@ fn import_dump(
         keys.push(key);
     }

-    // 3. Import the runtime features and network
+    // 3. Import the runtime features.
     let features = dump_reader.features()?.unwrap_or_default();
     index_scheduler.put_runtime_features(features)?;
-    let network = dump_reader.network()?.cloned().unwrap_or_default();
-    index_scheduler.put_network(network)?;

     let indexer_config = index_scheduler.indexer_config();

     // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might

View File

@@ -1908,8 +1908,7 @@ async fn import_dump_v6_containing_experimental_features() {
       "metrics": false,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false,
-      "proxySearch": false
+      "containsFilter": false
     }
     "###);
@@ -2070,8 +2069,7 @@ async fn generate_and_import_dump_containing_vectors() {
       "metrics": false,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false,
-      "proxySearch": false
+      "containsFilter": false
     }
     "###);

View File

@@ -21,8 +21,7 @@ async fn experimental_features() {
       "metrics": false,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false,
-      "proxySearch": false
+      "containsFilter": false
     }
     "###);
@@ -34,8 +33,7 @@ async fn experimental_features() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false,
-      "proxySearch": false
+      "containsFilter": false
     }
     "###);
@@ -47,8 +45,7 @@ async fn experimental_features() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false,
-      "proxySearch": false
+      "containsFilter": false
     }
     "###);
@@ -61,8 +58,7 @@ async fn experimental_features() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false,
-      "proxySearch": false
+      "containsFilter": false
     }
     "###);
@@ -75,8 +71,7 @@ async fn experimental_features() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false,
-      "proxySearch": false
+      "containsFilter": false
     }
     "###);
 }
@@ -96,8 +91,7 @@ async fn experimental_feature_metrics() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false,
-      "proxySearch": false
+      "containsFilter": false
     }
     "###);
@@ -152,7 +146,7 @@ async fn errors() {
     meili_snap::snapshot!(code, @"400 Bad Request");
     meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
     {
-      "message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`, `proxySearch`",
+      "message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`",
       "code": "bad_request",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#bad_request"

View File

@@ -5,8 +5,6 @@ use std::marker::PhantomData;
 use std::mem;
 use std::num::NonZeroU16;
 use std::ops::Range;
-use std::sync::atomic::{self, AtomicUsize};
-use std::sync::Arc;
 use std::time::Duration;

 use bbqueue::framed::{FrameGrantR, FrameProducer};
@@ -73,23 +71,12 @@ pub fn extractor_writer_bbqueue(
         consumer
     });

-    let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
-    let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
-
     let (sender, receiver) = flume::bounded(channel_capacity);
-    let sender = ExtractorBbqueueSender {
-        sender,
-        producers,
-        max_grant,
-        sent_messages_attempts: sent_messages_attempts.clone(),
-        blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(),
-    };
+    let sender = ExtractorBbqueueSender { sender, producers, max_grant };
     let receiver = WriterBbqueueReceiver {
         receiver,
         look_at_consumer: (0..consumers.len()).cycle(),
         consumers,
-        sent_messages_attempts,
-        blocking_sent_messages_attempts,
     };
     (sender, receiver)
 }
@@ -105,12 +92,6 @@ pub struct ExtractorBbqueueSender<'a> {
     /// It will never be able to store more than that as the
     /// buffer cannot split data into two parts.
     max_grant: usize,
-
-    /// The total number of attempts to send messages
-    /// over the bbqueue channel.
-    sent_messages_attempts: Arc<AtomicUsize>,
-    /// The number of times an attempt to send a
-    /// messages failed and we had to pause for a bit.
-    blocking_sent_messages_attempts: Arc<AtomicUsize>,
 }

 pub struct WriterBbqueueReceiver<'a> {
@@ -123,12 +104,6 @@ pub struct WriterBbqueueReceiver<'a> {
     look_at_consumer: Cycle<Range<usize>>,
     /// The BBQueue frames to read when waking-up.
     consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
-
-    /// The total number of attempts to send messages
-    /// over the bbqueue channel.
-    sent_messages_attempts: Arc<AtomicUsize>,
-    /// The number of times an attempt to send a
-    /// message failed and we had to pause for a bit.
-    blocking_sent_messages_attempts: Arc<AtomicUsize>,
 }

 /// The action to perform on the receiver/writer side.
@@ -194,16 +169,6 @@ impl<'a> WriterBbqueueReceiver<'a> {
         }
         None
     }
-
-    /// Returns the total count of attempts to send messages through the BBQueue channel.
-    pub fn sent_messages_attempts(&self) -> usize {
-        self.sent_messages_attempts.load(atomic::Ordering::Relaxed)
-    }
-
-    /// Returns the count of attempts to send messages that had to be paused due to BBQueue being full.
-    pub fn blocking_sent_messages_attempts(&self) -> usize {
-        self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed)
-    }
 }

 pub struct FrameWithHeader<'a> {
@@ -493,17 +458,10 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }

         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(
-            &mut producer,
-            total_length,
-            &self.sender,
-            &self.sent_messages_attempts,
-            &self.blocking_sent_messages_attempts,
-            |grant| {
-                payload_header.serialize_into(grant);
-                Ok(())
-            },
-        )?;
+        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
+            payload_header.serialize_into(grant);
+            Ok(())
+        })?;

         Ok(())
     }
@@ -542,28 +500,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }

         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(
-            &mut producer,
-            total_length,
-            &self.sender,
-            &self.sent_messages_attempts,
-            &self.blocking_sent_messages_attempts,
-            |grant| {
-                let header_size = payload_header.header_size();
-                let (header_bytes, remaining) = grant.split_at_mut(header_size);
-                payload_header.serialize_into(header_bytes);
-
-                if dimensions != 0 {
-                    let output_iter =
-                        remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
-                    for (embedding, output) in embeddings.iter().zip(output_iter) {
-                        output.copy_from_slice(bytemuck::cast_slice(embedding));
-                    }
-                }
-
-                Ok(())
-            },
-        )?;
+        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
+            let header_size = payload_header.header_size();
+            let (header_bytes, remaining) = grant.split_at_mut(header_size);
+            payload_header.serialize_into(header_bytes);
+
+            if dimensions != 0 {
+                let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
+                for (embedding, output) in embeddings.iter().zip(output_iter) {
+                    output.copy_from_slice(bytemuck::cast_slice(embedding));
+                }
+            }
+
+            Ok(())
+        })?;

         Ok(())
     }
@@ -621,20 +571,13 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }

         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(
-            &mut producer,
-            total_length,
-            &self.sender,
-            &self.sent_messages_attempts,
-            &self.blocking_sent_messages_attempts,
-            |grant| {
-                let header_size = payload_header.header_size();
-                let (header_bytes, remaining) = grant.split_at_mut(header_size);
-                payload_header.serialize_into(header_bytes);
-                let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
-                key_value_writer(key_buffer, value_buffer)
-            },
-        )?;
+        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
+            let header_size = payload_header.header_size();
+            let (header_bytes, remaining) = grant.split_at_mut(header_size);
+            payload_header.serialize_into(header_bytes);
+            let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
+            key_value_writer(key_buffer, value_buffer)
+        })?;

         Ok(())
     }
@@ -676,19 +619,12 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }

         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(
-            &mut producer,
-            total_length,
-            &self.sender,
-            &self.sent_messages_attempts,
-            &self.blocking_sent_messages_attempts,
-            |grant| {
-                let header_size = payload_header.header_size();
-                let (header_bytes, remaining) = grant.split_at_mut(header_size);
-                payload_header.serialize_into(header_bytes);
-                key_writer(remaining)
-            },
-        )?;
+        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
+            let header_size = payload_header.header_size();
+            let (header_bytes, remaining) = grant.split_at_mut(header_size);
+            payload_header.serialize_into(header_bytes);
+            key_writer(remaining)
+        })?;

         Ok(())
     }
@@ -701,18 +637,12 @@ fn reserve_and_write_grant<F>(
     producer: &mut FrameProducer,
     total_length: usize,
     sender: &flume::Sender<ReceiverAction>,
-    sent_messages_attempts: &AtomicUsize,
-    blocking_sent_messages_attempts: &AtomicUsize,
     f: F,
 ) -> crate::Result<()>
 where
     F: FnOnce(&mut [u8]) -> crate::Result<()>,
 {
     loop {
-        // An attempt means trying multiple times
-        // whether is succeeded or not.
-        sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
-
         for _ in 0..10_000 {
             match producer.grant(total_length) {
                 Ok(mut grant) => {
@@ -736,10 +666,6 @@ where
             return Err(Error::InternalError(InternalError::AbortedIndexation));
         }

-        // We made an attempt to send a message in the
-        // bbqueue channel but it didn't succeed.
-        blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
-
         // We prefer to yield and allow the writing thread
         // to do its job, especially beneficial when there
         // is only one CPU core available.

View File

@@ -21,7 +21,6 @@ use crate::progress::Progress;
 use crate::update::GrenadParameters;
 use crate::vector::{ArroyWrapper, EmbeddingConfigs};
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
-use std::sync::Once;

 pub(crate) mod de;
 pub mod document_changes;
@@ -34,8 +33,6 @@ mod post_processing;
 mod update_by_function;
 mod write;

-static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
-
 /// This is the main function of this crate.
 ///
 /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
@@ -96,15 +93,6 @@ where
         },
     );

-    LOG_MEMORY_METRICS_ONCE.call_once(|| {
-        tracing::debug!(
-            "Indexation allocated memory metrics - \
-            Total BBQueue size: {total_bbbuffer_capacity}, \
-            Total extractor memory: {:?}",
-            grenad_parameters.max_memory,
-        );
-    });
-
     let (extractor_sender, writer_receiver) = pool
         .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
         .unwrap();
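
The block removed here used `std::sync::Once` as a log-once guard: the indexing entry point can run many times per process, but the memory-metrics line should be emitted only on the first run. A minimal standalone version of the pattern (with `println!` standing in for `tracing::debug!`):

    use std::sync::Once;

    static LOG_ONCE: Once = Once::new();

    fn index_documents(bbqueue_capacity: usize) {
        // Runs the closure at most once per process, no matter how many
        // times index_documents itself is called.
        LOG_ONCE.call_once(|| {
            println!("total bbqueue capacity: {bbqueue_capacity} bytes");
        });
        // ... indexing work ...
    }

    fn main() {
        index_documents(1024);
        index_documents(1024); // logs nothing the second time
    }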

View File

@@ -72,19 +72,7 @@ pub(super) fn write_to_db(
             &mut aligned_embedding,
         )?;
     }

     write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
-
-    let direct_attempts = writer_receiver.sent_messages_attempts();
-    let blocking_attempts = writer_receiver.blocking_sent_messages_attempts();
-    let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
-    tracing::debug!(
-        "Channel congestion metrics - \
-        Attempts: {direct_attempts}, \
-        Blocked attempts: {blocking_attempts} \
-        ({congestion_pct:.1}% congestion)"
-    );
-
     Ok(())
 }
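
Taken together, the channel.rs and write.rs hunks above remove one piece of instrumentation: two `Arc<AtomicUsize>` counters shared between the BBQueue sender and receiver, bumped on every send attempt and on every blocked attempt, then turned into a congestion percentage after writing. Condensed into a runnable sketch (the simulated send loop is illustrative; the real increments lived in `reserve_and_write_grant`):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    fn main() {
        // Shared (via Arc) between the extractor-side sender and the
        // writer-side receiver of the channel.
        let sent_attempts = Arc::new(AtomicUsize::new(0));
        let blocked_attempts = Arc::new(AtomicUsize::new(0));

        // Sender side: one tick per message, plus one "blocked" tick each
        // time the queue was full and the sender had to pause and retry.
        for queue_was_full in [false, false, true, false] {
            sent_attempts.fetch_add(1, Ordering::Relaxed);
            if queue_was_full {
                blocked_attempts.fetch_add(1, Ordering::Relaxed);
            }
        }

        // Receiver side, after writing finished: the removed tracing line
        // reported exactly this ratio.
        let direct = sent_attempts.load(Ordering::Relaxed);
        let blocked = blocked_attempts.load(Ordering::Relaxed);
        let congestion_pct = (blocked as f64 / direct as f64) * 100.0;
        println!("Attempts: {direct}, blocked: {blocked} ({congestion_pct:.1}% congestion)");
    }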