Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-02-24 03:25:43 +08:00
Compare commits: fd1fc6a1f7 ... 88d2c9b652

12 commits:

- 88d2c9b652
- b2abef6522
- e8c4ea5de2
- 4224edea28
- cb1b7513af
- 2f89b8209f
- a9d0f4a002
- db032079d8
- a00796c46a
- 6112bd8caa
- cec88cfc29
- f0d7ab81ad
.github/workflows/sdks-tests.yml (vendored, 2 changed lines)
```diff
@@ -52,7 +52,7 @@ jobs:
       - name: Setup .NET Core
         uses: actions/setup-dotnet@v4
         with:
-          dotnet-version: "6.0.x"
+          dotnet-version: "8.0.x"
       - name: Install dependencies
         run: dotnet restore
       - name: Build
```
```diff
@@ -229,7 +229,7 @@ pub(crate) mod test {
     use big_s::S;
     use maplit::{btreemap, btreeset};
     use meilisearch_types::facet_values_sort::FacetValuesSort;
-    use meilisearch_types::features::RuntimeTogglableFeatures;
+    use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
     use meilisearch_types::index_uid_pattern::IndexUidPattern;
     use meilisearch_types::keys::{Action, Key};
     use meilisearch_types::milli;
```
```diff
@@ -455,6 +455,10 @@ pub(crate) mod test {
         dump.create_experimental_features(features).unwrap();

+        // ========== network
+        let network = create_test_network();
+        dump.create_network(network).unwrap();
+
         // create the dump
         let mut file = tempfile::tempfile().unwrap();
         dump.persist_to(&mut file).unwrap();
```
```diff
@@ -467,6 +471,13 @@ pub(crate) mod test {
         RuntimeTogglableFeatures::default()
     }

+    fn create_test_network() -> Network {
+        Network {
+            local: Some("myself".to_string()),
+            remotes: maplit::btreemap! {"other".to_string() => Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()) }},
+        }
+    }
+
     #[test]
     fn test_creating_and_read_dump() {
         let mut file = create_test_dump();
```
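For context, the fixture above pairs a local instance name with a map of named remotes. A minimal sketch of the JSON this serializes to, using simplified stand-in types rather than the actual `meilisearch_types::features` definitions (field names and casing in real dumps may differ):

```rust
use std::collections::BTreeMap;
use serde::Serialize;

// Simplified stand-ins for `meilisearch_types::features::{Network, Remote}`;
// the real types may rename fields during serialization.
#[derive(Serialize)]
struct Remote {
    url: String,
    search_api_key: Option<String>,
}

#[derive(Serialize)]
struct Network {
    local: Option<String>,
    remotes: BTreeMap<String, Remote>,
}

fn main() -> serde_json::Result<()> {
    let network = Network {
        local: Some("myself".to_string()),
        remotes: BTreeMap::from([(
            "other".to_string(),
            Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()) },
        )]),
    };
    // {"local":"myself","remotes":{"other":{"url":"http://test","search_api_key":"apiKey"}}}
    println!("{}", serde_json::to_string(&network)?);
    Ok(())
}
```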
```diff
@@ -515,5 +526,9 @@ pub(crate) mod test {
         // ==== checking the features
         let expected = create_test_features();
         assert_eq!(dump.features().unwrap().unwrap(), expected);
+
+        // ==== checking the network
+        let expected = create_test_network();
+        assert_eq!(&expected, dump.network().unwrap().unwrap());
     }
 }
```
```diff
@@ -196,6 +196,10 @@ impl CompatV5ToV6 {
     pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> {
         Ok(None)
     }
+
+    pub fn network(&self) -> Result<Option<&v6::Network>> {
+        Ok(None)
+    }
 }

 pub enum CompatIndexV5ToV6 {
```
```diff
@@ -23,6 +23,7 @@ mod v6;
 pub type Document = serde_json::Map<String, serde_json::Value>;
 pub type UpdateFile = dyn Iterator<Item = Result<Document>>;

+#[allow(clippy::large_enum_variant)]
 pub enum DumpReader {
     Current(V6Reader),
     Compat(CompatV5ToV6),
```
```diff
@@ -114,6 +115,13 @@ impl DumpReader {
             DumpReader::Compat(compat) => compat.features(),
         }
     }
+
+    pub fn network(&self) -> Result<Option<&v6::Network>> {
+        match self {
+            DumpReader::Current(current) => Ok(current.network()),
+            DumpReader::Compat(compat) => compat.network(),
+        }
+    }
 }

 impl From<V6Reader> for DumpReader {
```
```diff
@@ -328,6 +336,7 @@ pub(crate) mod test {
         }

         assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
+        assert_eq!(dump.network().unwrap(), None);
     }

     #[test]
```
```diff
@@ -373,6 +382,27 @@ pub(crate) mod test {
         assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
     }

+    #[test]
+    fn import_dump_v6_network() {
+        let dump = File::open("tests/assets/v6-with-network.dump").unwrap();
+        let dump = DumpReader::open(dump).unwrap();
+
+        // top level infos
+        insta::assert_snapshot!(dump.date().unwrap(), @"2025-01-29 15:45:32.738676 +00:00:00");
+        insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");
+
+        // network
+        let network = dump.network().unwrap().unwrap();
+        insta::assert_snapshot!(network.local.as_ref().unwrap(), @"ms-0");
+        insta::assert_snapshot!(network.remotes.get("ms-0").as_ref().unwrap().url, @"http://localhost:7700");
+        insta::assert_snapshot!(network.remotes.get("ms-0").as_ref().unwrap().search_api_key.is_none(), @"true");
+        insta::assert_snapshot!(network.remotes.get("ms-1").as_ref().unwrap().url, @"http://localhost:7701");
+        insta::assert_snapshot!(network.remotes.get("ms-1").as_ref().unwrap().search_api_key.is_none(), @"true");
+        insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().url, @"http://ms-5679.example.meilisearch.io");
+        insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().search_api_key.as_ref().unwrap(), @"foo");
+    }
+
     #[test]
     fn import_dump_v5() {
         let dump = File::open("tests/assets/v5.dump").unwrap();
```
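A note on the `@"..."` arguments above: these are insta inline snapshots, where the expected value lives directly in the source and `cargo insta review` rewrites it when output changes. A minimal self-contained example of the same pattern:

```rust
// Minimal example of insta's inline snapshots, as used in the tests above.
// Requires the `insta` dev-dependency; run with `cargo test`.
#[test]
fn inline_snapshot_example() {
    let local = Some("ms-0".to_string());
    insta::assert_snapshot!(local.as_ref().unwrap(), @"ms-0");
}
```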
```diff
@@ -20,6 +20,7 @@ pub type Unchecked = meilisearch_types::settings::Unchecked;
 pub type Task = crate::TaskDump;
 pub type Key = meilisearch_types::keys::Key;
 pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
+pub type Network = meilisearch_types::features::Network;

 // ===== Other types to clarify the code of the compat module
 // everything related to the tasks
```
```diff
@@ -50,6 +51,7 @@ pub struct V6Reader {
     tasks: BufReader<File>,
     keys: BufReader<File>,
     features: Option<RuntimeTogglableFeatures>,
+    network: Option<Network>,
 }

 impl V6Reader {
```
```diff
@@ -78,12 +80,30 @@ impl V6Reader {
             None
         };

+        let network_file = match fs::read(dump.path().join("network.json")) {
+            Ok(network_file) => Some(network_file),
+            Err(error) => match error.kind() {
+                // Allows the file to be missing; this only results in no network config being imported.
+                ErrorKind::NotFound => {
+                    debug!("`network.json` not found in dump");
+                    None
+                }
+                _ => return Err(error.into()),
+            },
+        };
+        let network = if let Some(network_file) = network_file {
+            Some(serde_json::from_reader(&*network_file)?)
+        } else {
+            None
+        };
+
         Ok(V6Reader {
             metadata: serde_json::from_reader(&*meta_file)?,
             instance_uid,
             tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
             keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
             features,
+            network,
             dump,
         })
     }
```
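The `network.json` handling above follows the same "optional file" pattern already used for `experimental-features.json`: a missing file is not an error, only other I/O failures propagate. A standalone sketch of that pattern (the `anyhow::Result` here is an assumption for the example; the dump crate uses its own error type):

```rust
use std::fs;
use std::io::ErrorKind;
use std::path::Path;

/// Reads and deserializes a JSON file, treating a missing file as `None`
/// rather than an error, and propagating every other I/O failure.
fn read_optional_json<T: serde::de::DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
    match fs::read(path) {
        Ok(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
        Err(error) if error.kind() == ErrorKind::NotFound => Ok(None),
        Err(error) => Err(error.into()),
    }
}
```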
```diff
@@ -154,6 +174,10 @@ impl V6Reader {
     pub fn features(&self) -> Option<RuntimeTogglableFeatures> {
         self.features
     }
+
+    pub fn network(&self) -> Option<&Network> {
+        self.network.as_ref()
+    }
 }

 pub struct UpdateFile {
```
```diff
@@ -4,7 +4,7 @@ use std::path::PathBuf;

 use flate2::write::GzEncoder;
 use flate2::Compression;
-use meilisearch_types::features::RuntimeTogglableFeatures;
+use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
 use meilisearch_types::keys::Key;
 use meilisearch_types::settings::{Checked, Settings};
 use serde_json::{Map, Value};
```
```diff
@@ -61,6 +61,10 @@ impl DumpWriter {
         )?)
     }

+    pub fn create_network(&self, network: Network) -> Result<()> {
+        Ok(std::fs::write(self.dir.path().join("network.json"), serde_json::to_string(&network)?)?)
+    }
+
     pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
         let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
         let mut tar_encoder = tar::Builder::new(gz_encoder);
```
```diff
@@ -295,7 +299,8 @@ pub(crate) mod test {
         ├---- experimental-features.json
         ├---- instance_uid.uuid
         ├---- keys.jsonl
-        └---- metadata.json
+        ├---- metadata.json
+        └---- network.json
         "###);

         // ==== checking the top level infos
```
```diff
@@ -326,7 +326,7 @@ fn test_auto_deletion_of_tasks() {
 fn test_task_queue_is_full() {
     let (index_scheduler, mut handle) = IndexScheduler::test_with_custom_config(vec![], |config| {
         // that's the minimum map size possible
-        config.task_db_size = 1048576;
+        config.task_db_size = 1048576 * 3;
         None
     });
```
```diff
@@ -219,6 +219,8 @@ impl IndexScheduler {
         progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
         let features = self.features().runtime_features();
         dump.create_experimental_features(features)?;
+        let network = self.network();
+        dump.create_network(network)?;

         let dump_uid = started_at.format(format_description!(
             "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
```
```diff
@@ -431,10 +431,13 @@ fn import_dump(
         keys.push(key);
     }

-    // 3. Import the runtime features.
+    // 3. Import the runtime features and network
     let features = dump_reader.features()?.unwrap_or_default();
    index_scheduler.put_runtime_features(features)?;

+    let network = dump_reader.network()?.cloned().unwrap_or_default();
+    index_scheduler.put_network(network)?;
+
     let indexer_config = index_scheduler.indexer_config();

     // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
```
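The `.cloned().unwrap_or_default()` chain above does two things: `dump_reader.network()?` yields an `Option<&Network>`, `.cloned()` turns the borrow into an owned value, and dumps written before this feature existed fall back to `Network::default()`. The same shape in isolation:

```rust
/// Turns an optional borrowed value into an owned one, falling back to the default.
fn owned_or_default<T: Clone + Default>(value: Option<&T>) -> T {
    value.cloned().unwrap_or_default()
}

fn main() {
    let name = "myself".to_string();
    assert_eq!(owned_or_default(Some(&name)), "myself");
    assert_eq!(owned_or_default::<String>(None), "");
}
```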
```diff
@@ -1908,7 +1908,8 @@ async fn import_dump_v6_containing_experimental_features() {
       "metrics": false,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "proxySearch": false
     }
     "###);
```
```diff
@@ -2069,7 +2070,8 @@ async fn generate_and_import_dump_containing_vectors() {
       "metrics": false,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "proxySearch": false
     }
     "###);
```
```diff
@@ -21,7 +21,8 @@ async fn experimental_features() {
       "metrics": false,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "proxySearch": false
     }
     "###);
@@ -33,7 +34,8 @@ async fn experimental_features() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "proxySearch": false
     }
     "###);
@@ -45,7 +47,8 @@ async fn experimental_features() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "proxySearch": false
     }
     "###);
@@ -58,7 +61,8 @@ async fn experimental_features() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "proxySearch": false
     }
     "###);
@@ -71,7 +75,8 @@ async fn experimental_features() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "proxySearch": false
     }
     "###);
 }
@@ -91,7 +96,8 @@ async fn experimental_feature_metrics() {
       "metrics": true,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "proxySearch": false
     }
     "###);
@@ -146,7 +152,7 @@ async fn errors() {
     meili_snap::snapshot!(code, @"400 Bad Request");
     meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
     {
-      "message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`",
+      "message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`, `proxySearch`",
       "code": "bad_request",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#bad_request"
```
```diff
@@ -5,6 +5,8 @@ use std::marker::PhantomData;
 use std::mem;
 use std::num::NonZeroU16;
 use std::ops::Range;
+use std::sync::atomic::{self, AtomicUsize};
+use std::sync::Arc;
 use std::time::Duration;

 use bbqueue::framed::{FrameGrantR, FrameProducer};
```
```diff
@@ -71,12 +73,23 @@ pub fn extractor_writer_bbqueue(
         consumer
     });

+    let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
+    let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
+
     let (sender, receiver) = flume::bounded(channel_capacity);
-    let sender = ExtractorBbqueueSender { sender, producers, max_grant };
+    let sender = ExtractorBbqueueSender {
+        sender,
+        producers,
+        max_grant,
+        sent_messages_attempts: sent_messages_attempts.clone(),
+        blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(),
+    };
     let receiver = WriterBbqueueReceiver {
         receiver,
         look_at_consumer: (0..consumers.len()).cycle(),
         consumers,
+        sent_messages_attempts,
+        blocking_sent_messages_attempts,
     };
     (sender, receiver)
 }
```
```diff
@@ -92,6 +105,12 @@ pub struct ExtractorBbqueueSender<'a> {
     /// It will never be able to store more than that as the
     /// buffer cannot split data into two parts.
     max_grant: usize,
+    /// The total number of attempts to send messages
+    /// over the bbqueue channel.
+    sent_messages_attempts: Arc<AtomicUsize>,
+    /// The number of times an attempt to send a
+    /// message failed and we had to pause for a bit.
+    blocking_sent_messages_attempts: Arc<AtomicUsize>,
 }

 pub struct WriterBbqueueReceiver<'a> {
```
```diff
@@ -104,6 +123,12 @@ pub struct WriterBbqueueReceiver<'a> {
     look_at_consumer: Cycle<Range<usize>>,
     /// The BBQueue frames to read when waking-up.
     consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
+    /// The total number of attempts to send messages
+    /// over the bbqueue channel.
+    sent_messages_attempts: Arc<AtomicUsize>,
+    /// The number of times an attempt to send a
+    /// message failed and we had to pause for a bit.
+    blocking_sent_messages_attempts: Arc<AtomicUsize>,
 }

 /// The action to perform on the receiver/writer side.
```
```diff
@@ -169,6 +194,16 @@ impl<'a> WriterBbqueueReceiver<'a> {
         }
         None
     }
+
+    /// Returns the total count of attempts to send messages through the BBQueue channel.
+    pub fn sent_messages_attempts(&self) -> usize {
+        self.sent_messages_attempts.load(atomic::Ordering::Relaxed)
+    }
+
+    /// Returns the count of attempts to send messages that had to be paused due to BBQueue being full.
+    pub fn blocking_sent_messages_attempts(&self) -> usize {
+        self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed)
+    }
 }

 pub struct FrameWithHeader<'a> {
```
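The two accessors above close the loop on the counter plumbing: the sender half increments the shared `Arc<AtomicUsize>` pair, and the receiver half reads them once indexing is done. A reduced sketch of that sharing scheme (the type names here are illustrative, not the real channel types):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustrative stand-ins for the sender/receiver halves of the channel.
struct Sender {
    attempts: Arc<AtomicUsize>,
    blocked: Arc<AtomicUsize>,
}

struct Receiver {
    attempts: Arc<AtomicUsize>,
    blocked: Arc<AtomicUsize>,
}

fn pair() -> (Sender, Receiver) {
    // Both halves share the same two counters through `Arc`.
    let attempts = Arc::new(AtomicUsize::new(0));
    let blocked = Arc::new(AtomicUsize::new(0));
    (
        Sender { attempts: attempts.clone(), blocked: blocked.clone() },
        Receiver { attempts, blocked },
    )
}

impl Sender {
    fn send(&self, queue_full: bool) {
        // Relaxed ordering is enough: these counters are statistics, not synchronization.
        self.attempts.fetch_add(1, Ordering::Relaxed);
        if queue_full {
            self.blocked.fetch_add(1, Ordering::Relaxed);
        }
    }
}

impl Receiver {
    fn congestion(&self) -> f64 {
        // Guard against zero attempts in this sketch; the real code divides directly.
        let attempts = self.attempts.load(Ordering::Relaxed).max(1);
        self.blocked.load(Ordering::Relaxed) as f64 / attempts as f64
    }
}

fn main() {
    let (tx, rx) = pair();
    tx.send(false);
    tx.send(true);
    println!("{:.1}% congestion", rx.congestion() * 100.0);
}
```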
```diff
@@ -458,10 +493,17 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }

         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
-            payload_header.serialize_into(grant);
-            Ok(())
-        })?;
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.sender,
+            &self.sent_messages_attempts,
+            &self.blocking_sent_messages_attempts,
+            |grant| {
+                payload_header.serialize_into(grant);
+                Ok(())
+            },
+        )?;

         Ok(())
     }
```
```diff
@@ -500,20 +542,28 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }

         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
-            let header_size = payload_header.header_size();
-            let (header_bytes, remaining) = grant.split_at_mut(header_size);
-            payload_header.serialize_into(header_bytes);
-
-            if dimensions != 0 {
-                let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
-                for (embedding, output) in embeddings.iter().zip(output_iter) {
-                    output.copy_from_slice(bytemuck::cast_slice(embedding));
-                }
-            }
-
-            Ok(())
-        })?;
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.sender,
+            &self.sent_messages_attempts,
+            &self.blocking_sent_messages_attempts,
+            |grant| {
+                let header_size = payload_header.header_size();
+                let (header_bytes, remaining) = grant.split_at_mut(header_size);
+                payload_header.serialize_into(header_bytes);
+
+                if dimensions != 0 {
+                    let output_iter =
+                        remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
+                    for (embedding, output) in embeddings.iter().zip(output_iter) {
+                        output.copy_from_slice(bytemuck::cast_slice(embedding));
+                    }
+                }
+
+                Ok(())
+            },
+        )?;

         Ok(())
     }
```
```diff
@@ -571,13 +621,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }

         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
-            let header_size = payload_header.header_size();
-            let (header_bytes, remaining) = grant.split_at_mut(header_size);
-            payload_header.serialize_into(header_bytes);
-            let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
-            key_value_writer(key_buffer, value_buffer)
-        })?;
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.sender,
+            &self.sent_messages_attempts,
+            &self.blocking_sent_messages_attempts,
+            |grant| {
+                let header_size = payload_header.header_size();
+                let (header_bytes, remaining) = grant.split_at_mut(header_size);
+                payload_header.serialize_into(header_bytes);
+                let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
+                key_value_writer(key_buffer, value_buffer)
+            },
+        )?;

         Ok(())
     }
```
```diff
@@ -619,12 +676,19 @@ impl<'b> ExtractorBbqueueSender<'b> {
         }

         // Spin loop to have a frame the size we requested.
-        reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
-            let header_size = payload_header.header_size();
-            let (header_bytes, remaining) = grant.split_at_mut(header_size);
-            payload_header.serialize_into(header_bytes);
-            key_writer(remaining)
-        })?;
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.sender,
+            &self.sent_messages_attempts,
+            &self.blocking_sent_messages_attempts,
+            |grant| {
+                let header_size = payload_header.header_size();
+                let (header_bytes, remaining) = grant.split_at_mut(header_size);
+                payload_header.serialize_into(header_bytes);
+                key_writer(remaining)
+            },
+        )?;

         Ok(())
     }
```
```diff
@@ -637,12 +701,18 @@ fn reserve_and_write_grant<F>(
     producer: &mut FrameProducer,
     total_length: usize,
     sender: &flume::Sender<ReceiverAction>,
+    sent_messages_attempts: &AtomicUsize,
+    blocking_sent_messages_attempts: &AtomicUsize,
     f: F,
 ) -> crate::Result<()>
 where
     F: FnOnce(&mut [u8]) -> crate::Result<()>,
 {
     loop {
+        // An attempt means trying multiple times,
+        // whether it succeeded or not.
+        sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
+
         for _ in 0..10_000 {
             match producer.grant(total_length) {
                 Ok(mut grant) => {
```
```diff
@@ -666,6 +736,10 @@ where
             return Err(Error::InternalError(InternalError::AbortedIndexation));
         }

+        // We made an attempt to send a message in the
+        // bbqueue channel but it didn't succeed.
+        blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
+
         // We prefer to yield and allow the writing thread
         // to do its job, especially beneficial when there
         // is only one CPU core available.
```
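Taken together, the two `fetch_add` sites define the accounting: each pass over the outer loop is one attempt, and a pass whose entire 10,000-iteration spin fails to obtain a grant is also counted as blocked before the thread yields and retries. A simplified sketch of that control flow (no flume channel or error handling; the `yield_now` is an assumption about how the pause is implemented):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Simplified control flow of `reserve_and_write_grant`: one outer-loop pass is
// one "attempt"; a pass whose 10_000-iteration spin never obtains a grant also
// counts as a blocked attempt before the thread yields and retries.
fn reserve(mut try_grant: impl FnMut() -> bool, attempts: &AtomicUsize, blocked: &AtomicUsize) {
    loop {
        attempts.fetch_add(1, Ordering::Relaxed);
        for _ in 0..10_000 {
            if try_grant() {
                return;
            }
        }
        blocked.fetch_add(1, Ordering::Relaxed);
        // Give the writer thread a chance to drain frames before retrying.
        std::thread::yield_now();
    }
}

fn main() {
    let attempts = AtomicUsize::new(0);
    let blocked = AtomicUsize::new(0);
    let mut calls = 0;
    // Succeed only on the 15_000th grant request, forcing one blocked attempt.
    reserve(
        || {
            calls += 1;
            calls == 15_000
        },
        &attempts,
        &blocked,
    );
    assert_eq!(attempts.load(Ordering::Relaxed), 2);
    assert_eq!(blocked.load(Ordering::Relaxed), 1);
}
```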
```diff
@@ -21,6 +21,7 @@ use crate::progress::Progress;
 use crate::update::GrenadParameters;
 use crate::vector::{ArroyWrapper, EmbeddingConfigs};
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
+use std::sync::Once;

 pub(crate) mod de;
 pub mod document_changes;
```
```diff
@@ -33,6 +34,8 @@ mod post_processing;
 mod update_by_function;
 mod write;

+static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
+
 /// This is the main function of this crate.
 ///
 /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
```
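`std::sync::Once` guarantees the closure runs at most once per process, which is what keeps the memory-metrics log line below from repeating across successive indexing runs. A minimal demonstration:

```rust
use std::sync::Once;

static LOG_ONCE: Once = Once::new();

// The closure passed to `call_once` runs at most once per process,
// no matter how many indexing runs call this function.
fn index_documents(run: usize) {
    LOG_ONCE.call_once(|| println!("memory metrics logged on run {run}"));
}

fn main() {
    index_documents(1); // prints
    index_documents(2); // silent: the Once has already fired
}
```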
```diff
@@ -93,6 +96,15 @@ where
         },
     );

+    LOG_MEMORY_METRICS_ONCE.call_once(|| {
+        tracing::debug!(
+            "Indexation allocated memory metrics - \
+            Total BBQueue size: {total_bbbuffer_capacity}, \
+            Total extractor memory: {:?}",
+            grenad_parameters.max_memory,
+        );
+    });
+
     let (extractor_sender, writer_receiver) = pool
         .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
         .unwrap();
```
```diff
@@ -72,7 +72,19 @@ pub(super) fn write_to_db(
             &mut aligned_embedding,
         )?;
     }

     write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
+
+    let direct_attempts = writer_receiver.sent_messages_attempts();
+    let blocking_attempts = writer_receiver.blocking_sent_messages_attempts();
+    let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
+    tracing::debug!(
+        "Channel congestion metrics - \
+        Attempts: {direct_attempts}, \
+        Blocked attempts: {blocking_attempts} \
+        ({congestion_pct:.1}% congestion)"
+    );
+
     Ok(())
 }
```
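A worked example of the figure this logs (numbers invented for illustration): with 10,000 total send attempts of which 1,500 were blocked, the line reports 15.0% congestion. Note the ratio is blocked attempts over total attempts, so 100% would mean every attempt had to pause at least once.

```rust
fn main() {
    let direct_attempts: usize = 10_000; // hypothetical totals, not real measurements
    let blocking_attempts: usize = 1_500;
    let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
    assert_eq!(format!("{congestion_pct:.1}"), "15.0");
    println!("Attempts: {direct_attempts}, Blocked attempts: {blocking_attempts} ({congestion_pct:.1}% congestion)");
}
```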