4475: Allow running benchmarks without sending results to the dashboard r=irevoire a=dureuill

Adds a `--no-dashboard` option to avoid sending results to the dashboard.

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-03-13 09:59:52 +00:00 committed by GitHub
commit 07b1d0edaf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 189 additions and 163 deletions

View File

@ -11,157 +11,179 @@ use super::client::Client;
use super::env_info; use super::env_info;
use super::workload::Workload; use super::workload::Workload;
pub async fn cancel_on_ctrl_c( #[derive(Debug, Clone)]
invocation_uuid: Uuid, pub enum DashboardClient {
dashboard_client: Client, Client(Client),
abort_handle: AbortHandle, Dry,
) { }
tracing::info!("press Ctrl-C to cancel the invocation");
match ctrl_c().await { impl DashboardClient {
Ok(()) => { pub fn new(dashboard_url: &str, api_key: Option<&str>) -> anyhow::Result<Self> {
tracing::info!(%invocation_uuid, "received Ctrl-C, cancelling invocation"); let dashboard_client = Client::new(
mark_as_failed(dashboard_client, invocation_uuid, None).await; Some(format!("{}/api/v1", dashboard_url)),
abort_handle.abort(); api_key,
Some(std::time::Duration::from_secs(60)),
)?;
Ok(Self::Client(dashboard_client))
}
pub fn new_dry() -> Self {
Self::Dry
}
pub async fn send_machine_info(&self, env: &env_info::Environment) -> anyhow::Result<()> {
let Self::Client(dashboard_client) = self else { return Ok(()) };
let response = dashboard_client
.put("machine")
.json(&json!({"hostname": env.hostname}))
.send()
.await
.context("sending machine information")?;
if !response.status().is_success() {
bail!(
"could not send machine information: {} {}",
response.status(),
response.text().await.unwrap_or_else(|_| "unknown".into())
);
} }
Err(error) => tracing::warn!( Ok(())
error = &error as &dyn std::error::Error,
"failed to listen to Ctrl-C signal, invocation won't be canceled on Ctrl-C"
),
} }
}
pub async fn mark_as_failed( pub async fn create_invocation(
dashboard_client: Client, &self,
invocation_uuid: Uuid, build_info: build_info::BuildInfo,
failure_reason: Option<String>, commit_message: &str,
) { env: env_info::Environment,
let response = dashboard_client max_workloads: usize,
.post("cancel-invocation") reason: Option<&str>,
.json(&json!({ ) -> anyhow::Result<Uuid> {
"invocation_uuid": invocation_uuid, let Self::Client(dashboard_client) = self else { return Ok(Uuid::now_v7()) };
"failure_reason": failure_reason,
})) let response = dashboard_client
.send() .put("invocation")
.await; .json(&json!({
let response = match response { "commit": {
Ok(response) => response, "sha1": build_info.commit_sha1,
Err(response_error) => { "message": commit_message,
tracing::error!(error = &response_error as &dyn std::error::Error, %invocation_uuid, "could not mark invocation as failed"); "commit_date": build_info.commit_timestamp,
return; "branch": build_info.branch,
"tag": build_info.describe.and_then(|describe| describe.as_tag()),
},
"machine_hostname": env.hostname,
"max_workloads": max_workloads,
"reason": reason
}))
.send()
.await
.context("sending invocation")?;
if !response.status().is_success() {
bail!(
"could not send new invocation: {}",
response.text().await.unwrap_or_else(|_| "unknown".into())
);
} }
}; let invocation_uuid: Uuid =
response.json().await.context("could not deserialize invocation response as JSON")?;
if !response.status().is_success() { Ok(invocation_uuid)
tracing::error!(
%invocation_uuid,
"could not mark invocation as failed: {}",
response.text().await.unwrap()
);
return;
}
tracing::warn!(%invocation_uuid, "marked invocation as failed or canceled");
}
pub async fn send_machine_info(
dashboard_client: &Client,
env: &env_info::Environment,
) -> anyhow::Result<()> {
let response = dashboard_client
.put("machine")
.json(&json!({"hostname": env.hostname}))
.send()
.await
.context("sending machine information")?;
if !response.status().is_success() {
bail!(
"could not send machine information: {} {}",
response.status(),
response.text().await.unwrap_or_else(|_| "unknown".into())
);
}
Ok(())
}
pub async fn create_invocation(
dashboard_client: &Client,
build_info: build_info::BuildInfo,
commit_message: &str,
env: env_info::Environment,
max_workloads: usize,
reason: Option<&str>,
) -> anyhow::Result<Uuid> {
let response = dashboard_client
.put("invocation")
.json(&json!({
"commit": {
"sha1": build_info.commit_sha1,
"message": commit_message,
"commit_date": build_info.commit_timestamp,
"branch": build_info.branch,
"tag": build_info.describe.and_then(|describe| describe.as_tag()),
},
"machine_hostname": env.hostname,
"max_workloads": max_workloads,
"reason": reason
}))
.send()
.await
.context("sending invocation")?;
if !response.status().is_success() {
bail!(
"could not send new invocation: {}",
response.text().await.unwrap_or_else(|_| "unknown".into())
);
}
let invocation_uuid: Uuid =
response.json().await.context("could not deserialize invocation response as JSON")?;
Ok(invocation_uuid)
}
pub async fn create_workload(
dashboard_client: &Client,
invocation_uuid: Uuid,
workload: &Workload,
) -> anyhow::Result<Uuid> {
let response = dashboard_client
.put("workload")
.json(&json!({
"invocation_uuid": invocation_uuid,
"name": &workload.name,
"max_runs": workload.run_count,
}))
.send()
.await
.context("could not create new workload")?;
if !response.status().is_success() {
bail!("creating new workload failed: {}", response.text().await.unwrap())
} }
let workload_uuid: Uuid = pub async fn create_workload(
response.json().await.context("could not deserialize JSON as UUID")?; &self,
Ok(workload_uuid) invocation_uuid: Uuid,
} workload: &Workload,
) -> anyhow::Result<Uuid> {
let Self::Client(dashboard_client) = self else { return Ok(Uuid::now_v7()) };
pub async fn create_run( let response = dashboard_client
dashboard_client: Client, .put("workload")
workload_uuid: Uuid, .json(&json!({
report: &BTreeMap<String, CallStats>, "invocation_uuid": invocation_uuid,
) -> anyhow::Result<()> { "name": &workload.name,
let response = dashboard_client "max_runs": workload.run_count,
.put("run") }))
.json(&json!({ .send()
"workload_uuid": workload_uuid, .await
"data": report .context("could not create new workload")?;
}))
.send() if !response.status().is_success() {
.await bail!("creating new workload failed: {}", response.text().await.unwrap())
.context("sending new run")?; }
if !response.status().is_success() {
bail!( let workload_uuid: Uuid =
"sending new run failed: {}", response.json().await.context("could not deserialize JSON as UUID")?;
response.text().await.unwrap_or_else(|_| "unknown".into()) Ok(workload_uuid)
) }
/// Sends the report of a run to the dashboard.
///
/// Issues a `PUT run` request whose JSON body carries `workload_uuid` and the `report`
/// (under the `data` key). In dry mode ([`DashboardClient::Dry`]) nothing is sent and
/// `Ok(())` is returned immediately.
///
/// # Errors
///
/// Returns an error if the request cannot be sent, or if the dashboard answers with a
/// non-success status. In the latter case the error message contains the HTTP status
/// followed by the response body (or `unknown` if the body cannot be read).
pub async fn create_run(
    &self,
    workload_uuid: Uuid,
    report: &BTreeMap<String, CallStats>,
) -> anyhow::Result<()> {
    let Self::Client(dashboard_client) = self else { return Ok(()) };

    let response = dashboard_client
        .put("run")
        .json(&json!({
            "workload_uuid": workload_uuid,
            "data": report
        }))
        .send()
        .await
        .context("sending new run")?;

    // Report the status as well as the body, like `send_machine_info` does: the body alone
    // may be empty or fall back to "unknown", which gives no hint about what went wrong.
    let status = response.status();
    if !status.is_success() {
        bail!(
            "sending new run failed: {} {}",
            status,
            response.text().await.unwrap_or_else(|_| "unknown".into())
        )
    }
    Ok(())
}
pub async fn cancel_on_ctrl_c(self, invocation_uuid: Uuid, abort_handle: AbortHandle) {
tracing::info!("press Ctrl-C to cancel the invocation");
match ctrl_c().await {
Ok(()) => {
tracing::info!(%invocation_uuid, "received Ctrl-C, cancelling invocation");
self.mark_as_failed(invocation_uuid, None).await;
abort_handle.abort();
}
Err(error) => tracing::warn!(
error = &error as &dyn std::error::Error,
"failed to listen to Ctrl-C signal, invocation won't be canceled on Ctrl-C"
),
}
}
pub async fn mark_as_failed(&self, invocation_uuid: Uuid, failure_reason: Option<String>) {
if let DashboardClient::Client(client) = self {
let response = client
.post("cancel-invocation")
.json(&json!({
"invocation_uuid": invocation_uuid,
"failure_reason": failure_reason,
}))
.send()
.await;
let response = match response {
Ok(response) => response,
Err(response_error) => {
tracing::error!(error = &response_error as &dyn std::error::Error, %invocation_uuid, "could not mark invocation as failed");
return;
}
};
if !response.status().is_success() {
tracing::error!(
%invocation_uuid,
"could not mark invocation as failed: {}",
response.text().await.unwrap()
);
return;
}
}
tracing::warn!(%invocation_uuid, "marked invocation as failed or canceled");
} }
Ok(())
} }

View File

@ -50,6 +50,10 @@ pub struct BenchDeriveArgs {
#[arg(long, default_value_t = default_dashboard_url())] #[arg(long, default_value_t = default_dashboard_url())]
dashboard_url: String, dashboard_url: String,
/// Don't actually send results to the dashboard
#[arg(long)]
no_dashboard: bool,
/// Directory to output reports. /// Directory to output reports.
#[arg(long, default_value_t = default_report_folder())] #[arg(long, default_value_t = default_report_folder())]
report_folder: String, report_folder: String,
@ -103,11 +107,11 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
let assets_client = let assets_client =
Client::new(None, args.assets_key.as_deref(), Some(std::time::Duration::from_secs(3600)))?; // 1h Client::new(None, args.assets_key.as_deref(), Some(std::time::Duration::from_secs(3600)))?; // 1h
let dashboard_client = Client::new( let dashboard_client = if args.no_dashboard {
Some(format!("{}/api/v1", args.dashboard_url)), dashboard::DashboardClient::new_dry()
args.api_key.as_deref(), } else {
Some(std::time::Duration::from_secs(60)), dashboard::DashboardClient::new(&args.dashboard_url, args.api_key.as_deref())?
)?; };
// reporting uses its own client because keeping the stream open to wait for entries // reporting uses its own client because keeping the stream open to wait for entries
// blocks any other requests // blocks any other requests
@ -127,12 +131,12 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
// enter runtime // enter runtime
rt.block_on(async { rt.block_on(async {
dashboard::send_machine_info(&dashboard_client, &env).await?; dashboard_client.send_machine_info(&env).await?;
let commit_message = build_info.commit_msg.context("missing commit message")?.split('\n').next().unwrap(); let commit_message = build_info.commit_msg.context("missing commit message")?.split('\n').next().unwrap();
let max_workloads = args.workload_file.len(); let max_workloads = args.workload_file.len();
let reason: Option<&str> = args.reason.as_deref(); let reason: Option<&str> = args.reason.as_deref();
let invocation_uuid = dashboard::create_invocation(&dashboard_client, build_info, commit_message, env, max_workloads, reason).await?; let invocation_uuid = dashboard_client.create_invocation( build_info, commit_message, env, max_workloads, reason).await?;
tracing::info!(workload_count = args.workload_file.len(), "handling workload files"); tracing::info!(workload_count = args.workload_file.len(), "handling workload files");
@ -167,7 +171,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
let abort_handle = workload_runs.abort_handle(); let abort_handle = workload_runs.abort_handle();
tokio::spawn({ tokio::spawn({
let dashboard_client = dashboard_client.clone(); let dashboard_client = dashboard_client.clone();
dashboard::cancel_on_ctrl_c(invocation_uuid, dashboard_client, abort_handle) dashboard_client.cancel_on_ctrl_c(invocation_uuid, abort_handle)
}); });
// wait for the end of the main task, handle result // wait for the end of the main task, handle result
@ -178,7 +182,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
} }
Ok(Err(error)) => { Ok(Err(error)) => {
tracing::error!(%invocation_uuid, error = %error, "invocation failed, attempting to report the failure to dashboard"); tracing::error!(%invocation_uuid, error = %error, "invocation failed, attempting to report the failure to dashboard");
dashboard::mark_as_failed(dashboard_client, invocation_uuid, Some(error.to_string())).await; dashboard_client.mark_as_failed(invocation_uuid, Some(error.to_string())).await;
tracing::warn!(%invocation_uuid, "invocation marked as failed following error"); tracing::warn!(%invocation_uuid, "invocation marked as failed following error");
Err(error) Err(error)
}, },
@ -186,7 +190,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
match join_error.try_into_panic() { match join_error.try_into_panic() {
Ok(panic) => { Ok(panic) => {
tracing::error!("invocation panicked, attempting to report the failure to dashboard"); tracing::error!("invocation panicked, attempting to report the failure to dashboard");
dashboard::mark_as_failed(dashboard_client, invocation_uuid, Some("Panicked".into())).await; dashboard_client.mark_as_failed( invocation_uuid, Some("Panicked".into())).await;
std::panic::resume_unwind(panic) std::panic::resume_unwind(panic)
} }
Err(_) => { Err(_) => {

View File

@ -12,8 +12,9 @@ use uuid::Uuid;
use super::assets::Asset; use super::assets::Asset;
use super::client::Client; use super::client::Client;
use super::command::SyncMode; use super::command::SyncMode;
use super::dashboard::DashboardClient;
use super::BenchDeriveArgs; use super::BenchDeriveArgs;
use crate::bench::{assets, dashboard, meili_process}; use crate::bench::{assets, meili_process};
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct Workload { pub struct Workload {
@ -25,7 +26,7 @@ pub struct Workload {
} }
async fn run_commands( async fn run_commands(
dashboard_client: &Client, dashboard_client: &DashboardClient,
logs_client: &Client, logs_client: &Client,
meili_client: &Client, meili_client: &Client,
workload_uuid: Uuid, workload_uuid: Uuid,
@ -64,7 +65,7 @@ async fn run_commands(
#[tracing::instrument(skip(assets_client, dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = workload.name))] #[tracing::instrument(skip(assets_client, dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = workload.name))]
pub async fn execute( pub async fn execute(
assets_client: &Client, assets_client: &Client,
dashboard_client: &Client, dashboard_client: &DashboardClient,
logs_client: &Client, logs_client: &Client,
meili_client: &Client, meili_client: &Client,
invocation_uuid: Uuid, invocation_uuid: Uuid,
@ -74,8 +75,7 @@ pub async fn execute(
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
assets::fetch_assets(assets_client, &workload.assets, &args.asset_folder).await?; assets::fetch_assets(assets_client, &workload.assets, &args.asset_folder).await?;
let workload_uuid = let workload_uuid = dashboard_client.create_workload(invocation_uuid, &workload).await?;
dashboard::create_workload(dashboard_client, invocation_uuid, &workload).await?;
let mut tasks = Vec::new(); let mut tasks = Vec::new();
@ -113,7 +113,7 @@ pub async fn execute(
#[allow(clippy::too_many_arguments)] // not best code quality, but this is a benchmark runner #[allow(clippy::too_many_arguments)] // not best code quality, but this is a benchmark runner
#[tracing::instrument(skip(dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = %workload.name))] #[tracing::instrument(skip(dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = %workload.name))]
async fn execute_run( async fn execute_run(
dashboard_client: &Client, dashboard_client: &DashboardClient,
logs_client: &Client, logs_client: &Client,
meili_client: &Client, meili_client: &Client,
workload_uuid: Uuid, workload_uuid: Uuid,
@ -202,7 +202,7 @@ async fn start_report(
} }
async fn stop_report( async fn stop_report(
dashboard_client: &Client, dashboard_client: &DashboardClient,
logs_client: &Client, logs_client: &Client,
workload_uuid: Uuid, workload_uuid: Uuid,
filename: String, filename: String,
@ -232,7 +232,7 @@ async fn stop_report(
.context("could not convert trace to report")?; .context("could not convert trace to report")?;
let context = || format!("writing report to {filename}"); let context = || format!("writing report to {filename}");
dashboard::create_run(dashboard_client, workload_uuid, &report).await?; dashboard_client.create_run(workload_uuid, &report).await?;
let mut output_file = std::io::BufWriter::new( let mut output_file = std::io::BufWriter::new(
std::fs::File::options() std::fs::File::options()