Add tracing to index-scheduler

This commit is contained in:
Louis Dureuil 2024-01-23 09:41:59 +01:00
parent 117e43a9ec
commit 34a4a0520f
No known key found for this signature in database
4 changed files with 36 additions and 16 deletions

1
Cargo.lock generated
View File

@ -2917,6 +2917,7 @@ dependencies = [
"tempfile", "tempfile",
"thiserror", "thiserror",
"time", "time",
"tracing",
"ureq", "ureq",
"uuid", "uuid",
] ]

View File

@ -30,7 +30,13 @@ serde_json = { version = "1.0.111", features = ["preserve_order"] }
synchronoise = "1.0.1" synchronoise = "1.0.1"
tempfile = "3.9.0" tempfile = "3.9.0"
thiserror = "1.0.56" thiserror = "1.0.56"
time = { version = "0.3.31", features = ["serde-well-known", "formatting", "parsing", "macros"] } time = { version = "0.3.31", features = [
"serde-well-known",
"formatting",
"parsing",
"macros",
] }
tracing = "0.1.40"
ureq = "2.9.1" ureq = "2.9.1"
uuid = { version = "1.6.1", features = ["serde", "v4"] } uuid = { version = "1.6.1", features = ["serde", "v4"] }

View File

@ -24,7 +24,6 @@ use std::fs::{self, File};
use std::io::BufWriter; use std::io::BufWriter;
use dump::IndexMetadata; use dump::IndexMetadata;
use log::{debug, error, info, trace};
use meilisearch_types::error::Code; use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@ -514,6 +513,7 @@ impl IndexScheduler {
/// 3. We get the *next* snapshot to process. /// 3. We get the *next* snapshot to process.
/// 4. We get the *next* dump to process. /// 4. We get the *next* dump to process.
/// 5. We get the *next* tasks to process for a specific index. /// 5. We get the *next* tasks to process for a specific index.
#[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")]
pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> { pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> {
#[cfg(test)] #[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?; self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?;
@ -619,6 +619,7 @@ impl IndexScheduler {
/// The list of tasks that were processed. The metadata of each task in the returned /// The list of tasks that were processed. The metadata of each task in the returned
/// list is updated accordingly, with the exception of its date fields /// list is updated accordingly, with the exception of its date fields
/// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at). /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at).
#[tracing::instrument(level = "trace", skip(self, batch), target = "indexing::scheduler", fields(batch=batch.to_string()))]
pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> { pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
#[cfg(test)] #[cfg(test)]
{ {
@ -668,9 +669,10 @@ impl IndexScheduler {
Ok(()) => { Ok(()) => {
for content_uuid in canceled_tasks_content_uuids { for content_uuid in canceled_tasks_content_uuids {
if let Err(error) = self.delete_update_file(content_uuid) { if let Err(error) = self.delete_update_file(content_uuid) {
error!( tracing::error!(
"We failed deleting the content file indentified as {}: {}", file_content_uuid = %content_uuid,
content_uuid, error %error,
"Failed deleting content file"
) )
} }
} }
@ -969,7 +971,10 @@ impl IndexScheduler {
match res { match res {
Ok(_) => (), Ok(_) => (),
Err(e) => error!("Could not write the stats of the index {}", e), Err(e) => tracing::error!(
error = &e as &dyn std::error::Error,
"Could not write the stats of the index"
),
} }
Ok(tasks) Ok(tasks)
@ -997,7 +1002,7 @@ impl IndexScheduler {
builder.set_primary_key(primary_key); builder.set_primary_key(primary_key);
let must_stop_processing = self.must_stop_processing.clone(); let must_stop_processing = self.must_stop_processing.clone();
builder.execute( builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; )?;
index_wtxn.commit()?; index_wtxn.commit()?;
@ -1024,7 +1029,10 @@ impl IndexScheduler {
match res { match res {
Ok(_) => (), Ok(_) => (),
Err(e) => error!("Could not write the stats of the index {}", e), Err(e) => tracing::error!(
error = &e as &dyn std::error::Error,
"Could not write the stats of the index"
),
} }
Ok(vec![task]) Ok(vec![task])
@ -1143,6 +1151,11 @@ impl IndexScheduler {
/// ///
/// ## Return /// ## Return
/// The list of processed tasks. /// The list of processed tasks.
#[tracing::instrument(
level = "trace",
skip(self, index_wtxn, index),
target = "indexing::scheduler"
)]
fn apply_index_operation<'i>( fn apply_index_operation<'i>(
&self, &self,
index_wtxn: &mut RwTxn<'i>, index_wtxn: &mut RwTxn<'i>,
@ -1203,7 +1216,7 @@ impl IndexScheduler {
milli::update::Settings::new(index_wtxn, index, indexer_config); milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.set_primary_key(primary_key); builder.set_primary_key(primary_key);
builder.execute( builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.clone().get(), || must_stop_processing.clone().get(),
)?; )?;
primary_key_has_been_set = true; primary_key_has_been_set = true;
@ -1222,7 +1235,7 @@ impl IndexScheduler {
index, index,
indexer_config, indexer_config,
config, config,
|indexing_step| trace!("update: {:?}", indexing_step), |indexing_step| tracing::trace!(?indexing_step, "Update"),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; )?;
@ -1294,7 +1307,7 @@ impl IndexScheduler {
if !tasks.iter().all(|res| res.error.is_some()) { if !tasks.iter().all(|res| res.error.is_some()) {
let addition = builder.execute()?; let addition = builder.execute()?;
info!("document addition done: {:?}", addition); tracing::info!(indexing_result = ?addition, "document indexing done");
} else if primary_key_has_been_set { } else if primary_key_has_been_set {
// Everything failed but we've set a primary key. // Everything failed but we've set a primary key.
// We need to remove it. // We need to remove it.
@ -1302,7 +1315,7 @@ impl IndexScheduler {
milli::update::Settings::new(index_wtxn, index, indexer_config); milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.reset_primary_key(); builder.reset_primary_key();
builder.execute( builder.execute(
|indexing_step| trace!("update: {:?}", indexing_step), |indexing_step| tracing::trace!(update = ?indexing_step),
|| must_stop_processing.clone().get(), || must_stop_processing.clone().get(),
)?; )?;
} }
@ -1372,7 +1385,7 @@ impl IndexScheduler {
let must_stop_processing = self.must_stop_processing.clone(); let must_stop_processing = self.must_stop_processing.clone();
builder.execute( builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; )?;
@ -1584,7 +1597,7 @@ fn delete_document_by_filter<'a>(
index, index,
indexer_config, indexer_config,
config, config,
|indexing_step| debug!("update: {:?}", indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; )?;

View File

@ -1193,7 +1193,7 @@ impl IndexScheduler {
log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid); log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
} }
} }
log::info!("A batch of tasks was successfully completed."); tracing::info!("A batch of tasks was successfully completed.");
} }
// If we have an abortion error we must stop the tick here and re-schedule tasks. // If we have an abortion error we must stop the tick here and re-schedule tasks.
Err(Error::Milli(milli::Error::InternalError( Err(Error::Milli(milli::Error::InternalError(