Merge #5356

5356: Display the internal indexing steps with timings on the `/batches` route r=irevoire a=Kerollmops

This PR computes the duration of each internal indexing step, stores the timings in a map, and displays them on the `/batches` route and in the logs:

```
"callTrace": {
    "processing tasks > retrieving config": "185.38µs",
    "processing tasks > computing document changes > preparing update file > payload": "23.11ms",
    "processing tasks > computing document changes > preparing update file": "23.26ms",
    "processing tasks > computing document changes": "24.06ms",
    "processing tasks > indexing > extracting documents > document": "15.13ms",
    "processing tasks > indexing > extracting documents": "15.13ms",
    "processing tasks > indexing > extracting facets > document": "5.70ms",
    "processing tasks > indexing > extracting facets": "5.72ms",
    "processing tasks > indexing > extracting words > document": "597.24ms",
    "processing tasks > indexing > extracting words": "597.25ms",
    "processing tasks > indexing > extracting word proximity > document": "1.14s",
    "processing tasks > indexing > extracting word proximity": "1.15s",
    "processing tasks > indexing > tail writing to database": "430.91ms",
    "processing tasks > indexing > waiting for extractors": "52.54µs",
    "processing tasks > indexing > writing embeddings to database": "47.79µs",
    "processing tasks > indexing > post-processing facets": "476.04µs",
    "processing tasks > indexing > post-processing words": "97.82ms",
    "processing tasks > indexing > finalizing": "67.41ms",
    "processing tasks > indexing": "2.40s",
    "processing tasks": "2.43s",
    "writing tasks to disk > task": "37.71µs",
    "writing tasks to disk": "67.13µs"
},
"writeChannelCongestion": {
    "attempts": 2608482,
    "blocking_attempts": 0,
    "blocking_ratio": 0.0
}
```
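
The trace names are hierarchical: each step records the `Instant` at which it started, and when a step is replaced (or the trace is collected) its elapsed time is recorded under its full `parent > child` path, deepest steps first. Below is a minimal, self-contained sketch of that accumulation logic; the hypothetical `StepTimer` stands in for the real `Progress`/`push_steps_durations` machinery from the diff, using plain strings and an explicit depth instead of the `Step` trait and `TypeId` lookup.

```rust
use std::time::{Duration, Instant};

/// A stripped-down model of the accumulation logic: the currently open steps
/// form a stack, each entry remembering when it started. Closing a step
/// records its elapsed time under the full "parent > child" path.
struct StepTimer {
    steps: Vec<(String, Instant)>,      // currently open steps, outermost first
    durations: Vec<(String, Duration)>, // finished steps with their full names
}

impl StepTimer {
    fn new() -> Self {
        Self { steps: Vec::new(), durations: Vec::new() }
    }

    /// Enter a step at `depth`, closing any step at that depth or deeper.
    fn update(&mut self, depth: usize, name: &str) {
        let now = Instant::now();
        self.close_from(depth, now);
        self.steps.truncate(depth);
        self.steps.push((name.to_owned(), now));
    }

    /// Record durations for the steps at `idx` and deeper, deepest first,
    /// so that a parent is pushed after its own sub-steps.
    fn close_from(&mut self, idx: usize, now: Instant) {
        for i in (idx..self.steps.len()).rev() {
            let full_name = self.steps[..=i]
                .iter()
                .map(|(name, _)| name.as_str())
                .collect::<Vec<_>>()
                .join(" > ");
            let started_at = self.steps[i].1;
            self.durations.push((full_name, now - started_at));
        }
    }

    /// Close everything and render the durations like the `callTrace` above.
    fn finish(mut self) -> Vec<(String, String)> {
        self.close_from(0, Instant::now());
        self.durations.into_iter().map(|(name, d)| (name, format!("{d:.2?}"))).collect()
    }
}

fn main() {
    let mut timer = StepTimer::new();
    timer.update(0, "processing tasks");
    timer.update(1, "indexing");
    timer.update(2, "extracting documents");
    timer.update(2, "extracting facets"); // closes "… > extracting documents"
    for (name, duration) in timer.finish() {
        println!("{name}: {duration}");
    }
}
```

Running this prints each closed step under its full path, with parents listed after their sub-steps, matching the ordering of the `callTrace` sample above.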

## To Do
- [x] Update the batches PRD + delivery + tracking issue.
- [x] Store that in the batches to be visible from the `/batches` route.
- [x] Display the writer's congestion (see the sketch after this list).
- [x] Display the info back in the logs too.
- [ ] (optional) Compute the size of each database by [using LMDB](https://docs.rs/heed/latest/heed/struct.DatabaseStat.html).
- [x] Push them in reverse order so that "processing tasks" appears after its sub-steps.
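
The congestion numbers are a straight ratio: every send into the bbqueue write channel counts as an attempt, and the sends that had to block and retry count as blocking attempts, so `blocking_ratio = blocking_attempts / attempts`. The `ChannelCongestion` type introduced by this PR captures exactly that; here it is with a small usage example based on the sample output above.

```rust
/// Stats exposing the congestion of the write channel, as introduced in the
/// diff below: `attempts` counts every send into the bbqueue buffer and
/// `blocking_attempts` counts the sends that had to block and retry.
#[derive(Debug, Copy, Clone)]
pub struct ChannelCongestion {
    pub attempts: usize,
    pub blocking_attempts: usize,
}

impl ChannelCongestion {
    /// Fraction of sends that blocked, between 0.0 and 1.0.
    pub fn congestion_ratio(&self) -> f32 {
        self.blocking_attempts as f32 / self.attempts as f32
    }
}

fn main() {
    // The numbers from the sample output above: no blocking at all.
    let congestion = ChannelCongestion { attempts: 2_608_482, blocking_attempts: 0 };
    assert_eq!(congestion.congestion_ratio(), 0.0);
}
```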


Co-authored-by: Kerollmops <clement@meilisearch.com>
Commit ea7bae9a71, committed by meili-bors[bot] on 2025-02-20 17:38:50 +00:00
25 changed files with 498 additions and 205 deletions

Cargo.lock generated
View File

@ -5160,9 +5160,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.135"
version = "1.0.138"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9"
checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949"
dependencies = [
"indexmap",
"itoa",

View File

@ -321,6 +321,8 @@ pub(crate) mod test {
status: maplit::btreemap! { Status::Succeeded => 1 },
types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 1 },
index_uids: maplit::btreemap! { "doggo".to_string() => 1 },
call_trace: Default::default(),
write_channel_congestion: None,
},
enqueued_at: Some(BatchEnqueuedAt {
earliest: datetime!(2022-11-11 0:00 UTC),

View File

@ -29,7 +29,7 @@ page_size = "0.6.0"
rayon = "1.10.0"
roaring = { version = "0.10.10", features = ["serde"] }
serde = { version = "1.0.217", features = ["derive"] }
serde_json = { version = "1.0.135", features = ["preserve_order"] }
serde_json = { version = "1.0.138", features = ["preserve_order"] }
synchronoise = "1.0.1"
tempfile = "3.15.0"
thiserror = "2.0.9"

View File

@ -1,7 +1,7 @@
use std::collections::BTreeSet;
use std::fmt::Write;
use meilisearch_types::batches::{Batch, BatchEnqueuedAt};
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@ -342,6 +342,11 @@ pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database<BEU32, RoaringBitmapCodec
pub fn snapshot_batch(batch: &Batch) -> String {
let mut snap = String::new();
let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch;
let stats = BatchStats {
call_trace: Default::default(),
write_channel_congestion: None,
..stats.clone()
};
if let Some(finished_at) = finished_at {
assert!(finished_at > started_at);
}
@ -352,7 +357,7 @@ pub fn snapshot_batch(batch: &Batch) -> String {
snap.push('{');
snap.push_str(&format!("uid: {uid}, "));
snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap()));
snap.push_str(&format!("stats: {}, ", serde_json::to_string(stats).unwrap()));
snap.push_str(&format!("stats: {}, ", serde_json::to_string(&stats).unwrap()));
snap.push('}');
snap
}

View File

@ -215,14 +215,16 @@ impl IndexScheduler {
let mut stop_scheduler_forever = false;
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new();
let mut congestion = None;
match res {
Ok(tasks) => {
Ok((tasks, cong)) => {
#[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
progress.update_progress(task_progress_obj);
congestion = cong;
let mut success = 0;
let mut failure = 0;
let mut canceled_by = None;
@ -339,6 +341,28 @@ impl IndexScheduler {
// We must re-add the canceled task so they're part of the same batch.
ids |= canceled;
processing_batch.stats.call_trace =
progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect();
processing_batch.stats.write_channel_congestion = congestion.map(|congestion| {
let mut congestion_info = serde_json::Map::new();
congestion_info.insert("attempts".into(), congestion.attempts.into());
congestion_info.insert("blocking_attempts".into(), congestion.blocking_attempts.into());
congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into());
congestion_info
});
if let Some(congestion) = congestion {
tracing::debug!(
"Channel congestion metrics - Attempts: {}, Blocked attempts: {} ({:.1}% congestion)",
congestion.attempts,
congestion.blocking_attempts,
congestion.congestion_ratio(),
);
}
tracing::debug!("call trace: {:?}", progress.accumulated_durations());
self.queue.write_batch(&mut wtxn, processing_batch, &ids)?;
#[cfg(test)]

View File

@ -5,7 +5,7 @@ use std::sync::atomic::Ordering;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self};
use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task};
use milli::update::Settings as MilliSettings;
use roaring::RoaringBitmap;
@ -35,7 +35,7 @@ impl IndexScheduler {
batch: Batch,
current_batch: &mut ProcessingBatch,
progress: Progress,
) -> Result<Vec<Task>> {
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
#[cfg(test)]
{
self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?;
@ -76,7 +76,7 @@ impl IndexScheduler {
canceled_tasks.push(task);
Ok(canceled_tasks)
Ok((canceled_tasks, None))
}
Batch::TaskDeletions(mut tasks) => {
// 1. Retrieve the tasks that matched the query at enqueue-time.
@ -115,10 +115,14 @@ impl IndexScheduler {
_ => unreachable!(),
}
}
Ok(tasks)
Ok((tasks, None))
}
Batch::SnapshotCreation(tasks) => {
self.process_snapshot(progress, tasks).map(|tasks| (tasks, None))
}
Batch::Dump(task) => {
self.process_dump_creation(progress, task).map(|tasks| (tasks, None))
}
Batch::SnapshotCreation(tasks) => self.process_snapshot(progress, tasks),
Batch::Dump(task) => self.process_dump_creation(progress, task),
Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid().to_string();
let index = if must_create_index {
@ -135,7 +139,8 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op, progress)?;
let (tasks, congestion) =
self.apply_index_operation(&mut index_wtxn, &index, op, progress)?;
{
let span = tracing::trace_span!(target: "indexing::scheduler", "commit");
@ -166,7 +171,7 @@ impl IndexScheduler {
),
}
Ok(tasks)
Ok((tasks, congestion))
}
Batch::IndexCreation { index_uid, primary_key, task } => {
progress.update_progress(CreateIndexProgress::CreatingTheIndex);
@ -234,7 +239,7 @@ impl IndexScheduler {
),
}
Ok(vec![task])
Ok((vec![task], None))
}
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
progress.update_progress(DeleteIndexProgress::DeletingTheIndex);
@ -268,7 +273,7 @@ impl IndexScheduler {
};
}
Ok(tasks)
Ok((tasks, None))
}
Batch::IndexSwap { mut task } => {
progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap);
@ -316,7 +321,7 @@ impl IndexScheduler {
}
wtxn.commit()?;
task.status = Status::Succeeded;
Ok(vec![task])
Ok((vec![task], None))
}
Batch::UpgradeDatabase { mut tasks } => {
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
@ -346,7 +351,7 @@ impl IndexScheduler {
task.error = None;
}
Ok(tasks)
Ok((tasks, None))
}
}
}

View File

@ -5,7 +5,7 @@ use meilisearch_types::milli::documents::PrimaryKey;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
use meilisearch_types::milli::update::DocumentAdditionResult;
use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder};
use meilisearch_types::milli::{self, ChannelCongestion, Filter, ThreadPoolNoAbortBuilder};
use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use meilisearch_types::Index;
@ -33,9 +33,8 @@ impl IndexScheduler {
index: &'i Index,
operation: IndexOperation,
progress: Progress,
) -> Result<Vec<Task>> {
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
let indexer_alloc = Bump::new();
let started_processing_at = std::time::Instant::now();
let must_stop_processing = self.scheduler.must_stop_processing.clone();
@ -60,7 +59,7 @@ impl IndexScheduler {
};
}
Ok(tasks)
Ok((tasks, None))
}
IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => {
progress.update_progress(DocumentOperationProgress::RetrievingConfig);
@ -173,21 +172,24 @@ impl IndexScheduler {
}
progress.update_progress(DocumentOperationProgress::Indexing);
let mut congestion = None;
if tasks.iter().any(|res| res.error.is_none()) {
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
congestion = Some(
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
);
let addition = DocumentAdditionResult {
indexed_documents: candidates_count,
@ -199,7 +201,7 @@ impl IndexScheduler {
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
}
Ok(tasks)
Ok((tasks, congestion))
}
IndexOperation::DocumentEdition { index_uid, mut task } => {
progress.update_progress(DocumentEditionProgress::RetrievingConfig);
@ -247,7 +249,7 @@ impl IndexScheduler {
edited_documents: Some(0),
});
return Ok(vec![task]);
return Ok((vec![task], None));
}
let rtxn = index.read_txn()?;
@ -262,6 +264,7 @@ impl IndexScheduler {
let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
let mut congestion = None;
if task.error.is_none() {
let local_pool;
let indexer_config = self.index_mapper.indexer_config();
@ -292,20 +295,22 @@ impl IndexScheduler {
let embedders = self.embedders(index_uid.clone(), embedders)?;
progress.update_progress(DocumentEditionProgress::Indexing);
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
congestion = Some(
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
);
let addition = DocumentAdditionResult {
indexed_documents: candidates_count,
@ -341,7 +346,7 @@ impl IndexScheduler {
}
}
Ok(vec![task])
Ok((vec![task], congestion))
}
IndexOperation::DocumentDeletion { mut tasks, index_uid } => {
progress.update_progress(DocumentDeletionProgress::RetrievingConfig);
@ -408,7 +413,7 @@ impl IndexScheduler {
}
if to_delete.is_empty() {
return Ok(tasks);
return Ok((tasks, None));
}
let rtxn = index.read_txn()?;
@ -422,6 +427,7 @@ impl IndexScheduler {
PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
.map_err(|err| Error::from_milli(err.into(), Some(index_uid.clone())))?;
let mut congestion = None;
if !tasks.iter().all(|res| res.error.is_some()) {
let local_pool;
let indexer_config = self.index_mapper.indexer_config();
@ -447,20 +453,22 @@ impl IndexScheduler {
let embedders = self.embedders(index_uid.clone(), embedders)?;
progress.update_progress(DocumentDeletionProgress::Indexing);
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // document deletion never changes primary key
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
congestion = Some(
indexer::index(
index_wtxn,
index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // document deletion never changes primary key
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
);
let addition = DocumentAdditionResult {
indexed_documents: candidates_count,
@ -472,7 +480,7 @@ impl IndexScheduler {
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
}
Ok(tasks)
Ok((tasks, congestion))
}
IndexOperation::Settings { index_uid, settings, mut tasks } => {
progress.update_progress(SettingsProgress::RetrievingAndMergingTheSettings);
@ -497,7 +505,7 @@ impl IndexScheduler {
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
Ok(tasks)
Ok((tasks, None))
}
IndexOperation::DocumentClearAndSetting {
index_uid,
@ -505,7 +513,7 @@ impl IndexScheduler {
settings,
settings_tasks,
} => {
let mut import_tasks = self.apply_index_operation(
let (mut import_tasks, _congestion) = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::DocumentClear {
@ -515,7 +523,7 @@ impl IndexScheduler {
progress.clone(),
)?;
let settings_tasks = self.apply_index_operation(
let (settings_tasks, _congestion) = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::Settings { index_uid, settings, tasks: settings_tasks },
@ -524,7 +532,7 @@ impl IndexScheduler {
let mut tasks = settings_tasks;
tasks.append(&mut import_tasks);
Ok(tasks)
Ok((tasks, None))
}
}
}

View File

@ -14,6 +14,7 @@ license.workspace = true
actix-web = { version = "4.9.0", default-features = false }
anyhow = "1.0.95"
bumpalo = "3.16.0"
bumparaw-collections = "0.1.4"
convert_case = "0.6.0"
csv = "1.3.1"
deserr = { version = "0.6.3", features = ["actix-web"] }
@ -24,12 +25,11 @@ flate2 = "1.0.35"
fst = "0.4.7"
memmap2 = "0.9.5"
milli = { path = "../milli" }
bumparaw-collections = "0.1.4"
roaring = { version = "0.10.10", features = ["serde"] }
rustc-hash = "2.1.0"
serde = { version = "1.0.217", features = ["derive"] }
serde-cs = "0.2.4"
serde_json = "1.0.135"
serde_json = { version = "1.0.135", features = ["preserve_order"] }
tar = "0.4.43"
tempfile = "3.15.0"
thiserror = "2.0.9"

View File

@ -60,4 +60,8 @@ pub struct BatchStats {
pub status: BTreeMap<Status, u32>,
pub types: BTreeMap<Kind, u32>,
pub index_uids: BTreeMap<String, u32>,
#[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
pub call_trace: serde_json::Map<String, serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub write_channel_congestion: Option<serde_json::Map<String, serde_json::Value>>,
}

View File

@ -275,8 +275,15 @@ async fn test_summarized_document_addition_or_update() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(0).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 0,
"progress": null,
@ -294,21 +301,30 @@ async fn test_summarized_document_addition_or_update() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]",
"writeChannelCongestion": "[writeChannelCongestion]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
let (task, _status_code) =
index.add_documents(json!({ "id": 42, "content": "doggos & fluff" }), Some("id")).await;
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(1).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 1,
"progress": null,
@ -326,13 +342,15 @@ async fn test_summarized_document_addition_or_update() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]",
"writeChannelCongestion": "[writeChannelCongestion]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}
#[actix_web::test]
@ -343,8 +361,15 @@ async fn test_summarized_delete_documents_by_batch() {
index.wait_task(task.uid()).await.failed();
let (batch, _) = index.get_batch(0).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 0,
"progress": null,
@ -362,21 +387,29 @@ async fn test_summarized_delete_documents_by_batch() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
index.create(None).await;
let (task, _status_code) = index.delete_batch(vec![42]).await;
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(2).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 2,
"progress": null,
@ -394,13 +427,14 @@ async fn test_summarized_delete_documents_by_batch() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}
#[actix_web::test]
@ -413,8 +447,15 @@ async fn test_summarized_delete_documents_by_filter() {
index.wait_task(task.uid()).await.failed();
let (batch, _) = index.get_batch(0).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 0,
"progress": null,
@ -433,13 +474,14 @@ async fn test_summarized_delete_documents_by_filter() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
index.create(None).await;
let (task, _status_code) =
@ -447,8 +489,15 @@ async fn test_summarized_delete_documents_by_filter() {
index.wait_task(task.uid()).await.failed();
let (batch, _) = index.get_batch(2).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 2,
"progress": null,
@ -467,13 +516,14 @@ async fn test_summarized_delete_documents_by_filter() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
index.update_settings(json!({ "filterableAttributes": ["doggo"] })).await;
let (task, _status_code) =
@ -481,7 +531,14 @@ async fn test_summarized_delete_documents_by_filter() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(4).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r#"
{
"uid": 4,
@ -501,7 +558,8 @@ async fn test_summarized_delete_documents_by_filter() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
@ -517,7 +575,16 @@ async fn test_summarized_delete_document_by_id() {
let (task, _status_code) = index.delete_document(1).await;
index.wait_task(task.uid()).await.failed();
let (batch, _) = index.get_batch(0).await;
snapshot!(batch,
assert_json_snapshot!(batch,
{
".uid" => "[uid]",
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r#"
{
"uid": "[uid]",
@ -536,7 +603,8 @@ async fn test_summarized_delete_document_by_id() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
@ -549,7 +617,14 @@ async fn test_summarized_delete_document_by_id() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(2).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r#"
{
"uid": 2,
@ -568,7 +643,8 @@ async fn test_summarized_delete_document_by_id() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
@ -597,8 +673,15 @@ async fn test_summarized_settings_update() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(0).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 0,
"progress": null,
@ -625,13 +708,14 @@ async fn test_summarized_settings_update() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}
#[actix_web::test]
@ -642,8 +726,15 @@ async fn test_summarized_index_creation() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(0).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 0,
"progress": null,
@ -658,20 +749,28 @@ async fn test_summarized_index_creation() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
let (task, _status_code) = index.create(Some("doggos")).await;
index.wait_task(task.uid()).await.failed();
let (batch, _) = index.get_batch(1).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 1,
"progress": null,
@ -688,13 +787,14 @@ async fn test_summarized_index_creation() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}
#[actix_web::test]
@ -815,8 +915,15 @@ async fn test_summarized_index_update() {
index.wait_task(task.uid()).await.failed();
let (batch, _) = index.get_batch(0).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 0,
"progress": null,
@ -831,20 +938,28 @@ async fn test_summarized_index_update() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
let (task, _status_code) = index.update(Some("bones")).await;
index.wait_task(task.uid()).await.failed();
let (batch, _) = index.get_batch(1).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 1,
"progress": null,
@ -861,13 +976,14 @@ async fn test_summarized_index_update() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
// And run the same two tests once the index do exists.
index.create(None).await;
@ -876,7 +992,14 @@ async fn test_summarized_index_update() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(3).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r#"
{
"uid": 3,
@ -892,7 +1015,8 @@ async fn test_summarized_index_update() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
@ -904,8 +1028,15 @@ async fn test_summarized_index_update() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(4).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 4,
"progress": null,
@ -922,13 +1053,14 @@ async fn test_summarized_index_update() {
},
"indexUids": {
"test": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}
#[actix_web::test]
@ -942,8 +1074,15 @@ async fn test_summarized_index_swap() {
server.wait_task(task.uid()).await.failed();
let (batch, _) = server.get_batch(0).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 0,
"progress": null,
@ -965,13 +1104,14 @@ async fn test_summarized_index_swap() {
"types": {
"indexSwap": 1
},
"indexUids": {}
"indexUids": {},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
server.index("doggos").create(None).await;
let (task, _status_code) = server.index("cattos").create(None).await;
@ -983,8 +1123,15 @@ async fn test_summarized_index_swap() {
server.wait_task(task.uid()).await.succeeded();
let (batch, _) = server.get_batch(1).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 1,
"progress": null,
@ -999,13 +1146,14 @@ async fn test_summarized_index_swap() {
},
"indexUids": {
"doggos": 1
}
},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}
#[actix_web::test]
@ -1019,8 +1167,15 @@ async fn test_summarized_batch_cancelation() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(1).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 1,
"progress": null,
@ -1037,13 +1192,14 @@ async fn test_summarized_batch_cancelation() {
"types": {
"taskCancelation": 1
},
"indexUids": {}
"indexUids": {},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}
#[actix_web::test]
@ -1057,8 +1213,15 @@ async fn test_summarized_batch_deletion() {
index.wait_task(task.uid()).await.succeeded();
let (batch, _) = index.get_batch(1).await;
assert_json_snapshot!(batch,
{ ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 1,
"progress": null,
@ -1075,13 +1238,14 @@ async fn test_summarized_batch_deletion() {
"types": {
"taskDeletion": 1
},
"indexUids": {}
"indexUids": {},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}
#[actix_web::test]
@ -1091,8 +1255,16 @@ async fn test_summarized_dump_creation() {
server.wait_task(task.uid()).await;
let (batch, _) = server.get_batch(0).await;
assert_json_snapshot!(batch,
{ ".details.dumpUid" => "[dumpUid]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" },
@r#"
{
".details.dumpUid" => "[dumpUid]",
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.callTrace" => "[callTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
},
@r###"
{
"uid": 0,
"progress": null,
@ -1107,11 +1279,12 @@ async fn test_summarized_dump_creation() {
"types": {
"dumpCreation": 1
},
"indexUids": {}
"indexUids": {},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
"###);
}

View File

@ -2202,7 +2202,13 @@ async fn import_dump_v6_containing_batches_and_enqueued_tasks() {
let (tasks, _) = server.tasks().await;
snapshot!(json_string!(tasks, { ".results[1].startedAt" => "[date]", ".results[1].finishedAt" => "[date]", ".results[1].duration" => "[date]" }), name: "tasks");
let (batches, _) = server.batches().await;
snapshot!(json_string!(batches, { ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].duration" => "[date]" }), name: "batches");
snapshot!(json_string!(batches, {
".results[0].startedAt" => "[date]",
".results[0].finishedAt" => "[date]",
".results[0].duration" => "[date]",
".results[0].stats.callTrace" => "[callTrace]",
".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]",
}), name: "batches");
let (indexes, code) = server.list_indexes(None, None).await;
assert_eq!(code, 200, "{indexes}");

View File

@ -1,6 +1,5 @@
---
source: crates/meilisearch/tests/dumps/mod.rs
snapshot_kind: text
---
{
"results": [
@ -21,7 +20,9 @@ snapshot_kind: text
},
"indexUids": {
"kefir": 1
}
},
"callTrace": "[callTrace]",
"writeChannelCongestion": "[writeChannelCongestion]"
},
"duration": "[date]",
"startedAt": "[date]",

View File

@ -18,7 +18,8 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"types": {
"upgradeDatabase": 1
},
"indexUids": {}
"indexUids": {},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",

View File

@ -18,7 +18,8 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"types": {
"upgradeDatabase": 1
},
"indexUids": {}
"indexUids": {},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",

View File

@ -18,7 +18,8 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"types": {
"upgradeDatabase": 1
},
"indexUids": {}
"indexUids": {},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",

View File

@ -1,6 +1,5 @@
---
source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
snapshot_kind: text
---
{
"results": [
@ -19,7 +18,8 @@ snapshot_kind: text
"types": {
"upgradeDatabase": 1
},
"indexUids": {}
"indexUids": {},
"callTrace": "[callTrace]"
},
"duration": "[duration]",
"startedAt": "[date]",

View File

@ -2,6 +2,7 @@
// It must test pretty much all the features of meilisearch because the other tests will only tests
// the new features they introduced.
use insta::assert_json_snapshot;
use manifest_dir_macros::exist_relative_path;
use meili_snap::{json_string, snapshot};
use meilisearch::Opt;
@ -126,10 +127,14 @@ async fn check_the_index_scheduler(server: &Server) {
"#);
// And their metadata are still right
let (stats, _) = server.stats().await;
snapshot!(stats, @r###"
assert_json_snapshot!(stats, {
".databaseSize" => "[bytes]",
".usedDatabaseSize" => "[bytes]"
},
@r###"
{
"databaseSize": 438272,
"usedDatabaseSize": 196608,
"databaseSize": "[bytes]",
"usedDatabaseSize": "[bytes]",
"lastUpdate": "2025-01-23T11:36:22.634859166Z",
"indexes": {
"kefir": {
@ -159,7 +164,7 @@ async fn check_the_index_scheduler(server: &Server) {
let (tasks, _) = server.tasks_filter("limit=1000").await;
snapshot!(json_string!(tasks, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "the_whole_task_queue_once_everything_has_been_processed");
let (batches, _) = server.batches_filter("limit=1000").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "the_whole_batch_queue_once_everything_has_been_processed");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "the_whole_batch_queue_once_everything_has_been_processed");
// Tests all the tasks query parameters
let (tasks, _) = server.tasks_filter("uids=10").await;
@ -186,32 +191,36 @@ async fn check_the_index_scheduler(server: &Server) {
// Tests all the batches query parameters
let (batches, _) = server.batches_filter("uids=10").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_uids_equal_10");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_uids_equal_10");
let (batches, _) = server.batches_filter("batchUids=10").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_batchUids_equal_10");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_batchUids_equal_10");
let (batches, _) = server.batches_filter("statuses=canceled").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_statuses_equal_canceled");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_statuses_equal_canceled");
// types has already been tested above to retrieve the upgrade database
let (batches, _) = server.batches_filter("canceledBy=19").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_canceledBy_equal_19");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_canceledBy_equal_19");
let (batches, _) = server.batches_filter("beforeEnqueuedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterEnqueuedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("beforeStartedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterStartedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("beforeFinishedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterFinishedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.callTrace" => "[callTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41");
let (stats, _) = server.stats().await;
snapshot!(stats, @r###"
assert_json_snapshot!(stats, {
".databaseSize" => "[bytes]",
".usedDatabaseSize" => "[bytes]"
},
@r###"
{
"databaseSize": 438272,
"usedDatabaseSize": 196608,
"databaseSize": "[bytes]",
"usedDatabaseSize": "[bytes]",
"lastUpdate": "2025-01-23T11:36:22.634859166Z",
"indexes": {
"kefir": {

View File

@ -73,6 +73,7 @@ pub use self::search::{
FacetDistribution, Filter, FormatOptions, MatchBounds, MatcherBuilder, MatchingWords, OrderBy,
Search, SearchResult, SemanticSearch, TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET,
};
pub use self::update::ChannelCongestion;
pub type Result<T> = std::result::Result<T, error::Error>;

View File

@ -3,7 +3,10 @@ use std::borrow::Cow;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use indexmap::IndexMap;
use itertools::Itertools;
use serde::Serialize;
use utoipa::ToSchema;
@ -15,28 +18,42 @@ pub trait Step: 'static + Send + Sync {
#[derive(Clone, Default)]
pub struct Progress {
steps: Arc<RwLock<Vec<(TypeId, Box<dyn Step>)>>>,
steps: Arc<RwLock<InnerProgress>>,
}
#[derive(Default)]
struct InnerProgress {
/// The hierarchy of steps.
steps: Vec<(TypeId, Box<dyn Step>, Instant)>,
/// The durations associated to each steps.
durations: Vec<(String, Duration)>,
}
impl Progress {
pub fn update_progress<P: Step>(&self, sub_progress: P) {
let mut steps = self.steps.write().unwrap();
let mut inner = self.steps.write().unwrap();
let InnerProgress { steps, durations } = &mut *inner;
let now = Instant::now();
let step_type = TypeId::of::<P>();
if let Some(idx) = steps.iter().position(|(id, _)| *id == step_type) {
if let Some(idx) = steps.iter().position(|(id, _, _)| *id == step_type) {
push_steps_durations(steps, durations, now, idx);
steps.truncate(idx);
}
steps.push((step_type, Box::new(sub_progress)));
steps.push((step_type, Box::new(sub_progress), now));
}
// TODO: This code should be in meilisearch_types but cannot because milli can't depend on meilisearch_types
pub fn as_progress_view(&self) -> ProgressView {
let steps = self.steps.read().unwrap();
let inner = self.steps.read().unwrap();
let InnerProgress { steps, .. } = &*inner;
let mut percentage = 0.0;
let mut prev_factors = 1.0;
let mut step_view = Vec::with_capacity(steps.len());
for (_, step) in steps.iter() {
for (_, step, _) in steps.iter() {
prev_factors *= step.total() as f32;
percentage += step.current() as f32 / prev_factors;
@ -49,6 +66,29 @@ impl Progress {
ProgressView { steps: step_view, percentage: percentage * 100.0 }
}
pub fn accumulated_durations(&self) -> IndexMap<String, String> {
let mut inner = self.steps.write().unwrap();
let InnerProgress { steps, durations, .. } = &mut *inner;
let now = Instant::now();
push_steps_durations(steps, durations, now, 0);
durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect()
}
}
/// Generate the names associated with the durations and push them.
fn push_steps_durations(
steps: &[(TypeId, Box<dyn Step>, Instant)],
durations: &mut Vec<(String, Duration)>,
now: Instant,
idx: usize,
) {
for (i, (_, _, started_at)) in steps.iter().skip(idx).enumerate().rev() {
let full_name = steps.iter().take(idx + i + 1).map(|(_, s, _)| s.name()).join(" > ");
durations.push((full_name, now.duration_since(*started_at)));
}
}
/// This trait lets you use the AtomicSubStep defined right below.
@ -164,7 +204,7 @@ pub struct ProgressStepView {
/// Used when the name can change but it's still the same step.
/// To avoid conflicts on the `TypeId`, create a unique type every time you use this step:
/// ```text
/// enum UpgradeVersion {}
/// enum UpgradeVersion {}
///
/// progress.update_progress(VariableNameStep::<UpgradeVersion>::new(
/// "v1 to v2",

View File

@ -5,6 +5,7 @@ pub use self::facet::bulk::FacetsUpdateBulk;
pub use self::facet::incremental::FacetsUpdateIncrementalInner;
pub use self::index_documents::*;
pub use self::indexer_config::IndexerConfig;
pub use self::new::ChannelCongestion;
pub use self::settings::{validate_embedding_settings, Setting, Settings};
pub use self::update_step::UpdateIndexingStep;
pub use self::word_prefix_docids::WordPrefixDocids;

View File

@ -291,7 +291,7 @@ where
&indexing_context.must_stop_processing,
)?;
}
indexing_context.progress.update_progress(IndexingStep::WritingToDatabase);
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
Result::Ok((facet_field_ids_delta, index_embeddings))

View File

@ -10,6 +10,7 @@ use hashbrown::HashMap;
use heed::RwTxn;
pub use partial_dump::PartialDump;
pub use update_by_function::UpdateByFunction;
pub use write::ChannelCongestion;
use write::{build_vectors, update_index, write_to_db};
use super::channel::*;
@ -53,7 +54,7 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP>(
embedders: EmbeddingConfigs,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
) -> Result<()>
) -> Result<ChannelCongestion>
where
DC: DocumentChanges<'pl>,
MSP: Fn() -> bool + Sync,
@ -130,7 +131,7 @@ where
let mut field_distribution = index.field_distribution(wtxn)?;
let mut document_ids = index.documents_ids(wtxn)?;
thread::scope(|s| -> Result<()> {
let congestion = thread::scope(|s| -> Result<ChannelCongestion> {
let indexer_span = tracing::Span::current();
let embedders = &embedders;
let finished_extraction = &finished_extraction;
@ -182,7 +183,8 @@ where
let mut arroy_writers = arroy_writers?;
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
let congestion =
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
@ -210,7 +212,7 @@ where
indexing_context.progress.update_progress(IndexingStep::Finalizing);
Ok(()) as Result<_>
Ok(congestion) as Result<_>
})?;
// required to into_inner the new_fields_ids_map
@ -227,5 +229,5 @@ where
document_ids,
)?;
Ok(())
Ok(congestion)
}

View File

@ -14,13 +14,13 @@ use crate::update::settings::InnerIndexSettings;
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings};
use crate::{Error, Index, InternalError, Result};
pub(super) fn write_to_db(
pub fn write_to_db(
mut writer_receiver: WriterBbqueueReceiver<'_>,
finished_extraction: &AtomicBool,
index: &Index,
wtxn: &mut RwTxn<'_>,
arroy_writers: &HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
) -> Result<()> {
) -> Result<ChannelCongestion> {
// Used by by the ArroySetVector to copy the embedding into an
// aligned memory area, required by arroy to accept a new vector.
let mut aligned_embedding = Vec::new();
@ -75,21 +75,29 @@ pub(super) fn write_to_db(
write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
let direct_attempts = writer_receiver.sent_messages_attempts();
let blocking_attempts = writer_receiver.blocking_sent_messages_attempts();
let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
tracing::debug!(
"Channel congestion metrics - \
Attempts: {direct_attempts}, \
Blocked attempts: {blocking_attempts} \
({congestion_pct:.1}% congestion)"
);
Ok(ChannelCongestion {
attempts: writer_receiver.sent_messages_attempts(),
blocking_attempts: writer_receiver.blocking_sent_messages_attempts(),
})
}
Ok(())
/// Stats exposing the congestion of a channel.
#[derive(Debug, Copy, Clone)]
pub struct ChannelCongestion {
/// Number of attempts to send a message into the bbqueue buffer.
pub attempts: usize,
/// Number of blocking attempts which require a retry.
pub blocking_attempts: usize,
}
impl ChannelCongestion {
pub fn congestion_ratio(&self) -> f32 {
self.blocking_attempts as f32 / self.attempts as f32
}
}
#[tracing::instrument(level = "debug", skip_all, target = "indexing::vectors")]
pub(super) fn build_vectors<MSP>(
pub fn build_vectors<MSP>(
index: &Index,
wtxn: &mut RwTxn<'_>,
index_embeddings: Vec<IndexEmbeddingConfig>,
@ -113,7 +121,7 @@ where
Ok(())
}
pub(super) fn update_index(
pub fn update_index(
index: &Index,
wtxn: &mut RwTxn<'_>,
new_fields_ids_map: FieldIdMapWithMetadata,

View File

@ -1,4 +1,5 @@
pub use document_change::{Deletion, DocumentChange, Insertion, Update};
pub use indexer::ChannelCongestion;
pub use merger::{
merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,
};

View File

@ -14,7 +14,7 @@ pub enum IndexingStep {
ExtractingWordProximity,
ExtractingEmbeddings,
WritingGeoPoints,
WritingToDatabase,
WaitingForDatabaseWrites,
WaitingForExtractors,
WritingEmbeddingsToDatabase,
PostProcessingFacets,
@ -32,7 +32,7 @@ impl Step for IndexingStep {
IndexingStep::ExtractingWordProximity => "extracting word proximity",
IndexingStep::ExtractingEmbeddings => "extracting embeddings",
IndexingStep::WritingGeoPoints => "writing geo points",
IndexingStep::WritingToDatabase => "writing to database",
IndexingStep::WaitingForDatabaseWrites => "waiting for database writes",
IndexingStep::WaitingForExtractors => "waiting for extractors",
IndexingStep::WritingEmbeddingsToDatabase => "writing embeddings to database",
IndexingStep::PostProcessingFacets => "post-processing facets",