Add an Authorization Header along with the webhook calls

This commit is contained in:
Clément Renault 2023-12-19 12:18:45 +01:00
parent 19736cefe8
commit fa2b96b9a5
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
5 changed files with 33 additions and 1 deletions

View File

@ -38,6 +38,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
auth_path: _, auth_path: _,
version_file_path: _, version_file_path: _,
webhook_url: _, webhook_url: _,
webhook_authorization_header: _,
test_breakpoint_sdr: _, test_breakpoint_sdr: _,
planned_failures: _, planned_failures: _,
run_loop_iteration: _, run_loop_iteration: _,

View File

@ -245,7 +245,10 @@ pub struct IndexSchedulerOptions {
pub snapshots_path: PathBuf, pub snapshots_path: PathBuf,
/// The path to the folder containing the dumps. /// The path to the folder containing the dumps.
pub dumps_path: PathBuf, pub dumps_path: PathBuf,
/// The URL on which we must send the tasks statuses
pub webhook_url: Option<String>, pub webhook_url: Option<String>,
/// The value we will send into the Authorization HTTP header on the webhook URL
pub webhook_authorization_header: Option<String>,
/// The maximum size, in bytes, of the task index. /// The maximum size, in bytes, of the task index.
pub task_db_size: usize, pub task_db_size: usize,
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index. /// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
@ -330,6 +333,8 @@ pub struct IndexScheduler {
/// The webhook url we should send tasks to after processing every batches. /// The webhook url we should send tasks to after processing every batches.
pub(crate) webhook_url: Option<String>, pub(crate) webhook_url: Option<String>,
/// The Authorization header to send to the webhook URL.
pub(crate) webhook_authorization_header: Option<String>,
/// A frame to output the indexation profiling files to disk. /// A frame to output the indexation profiling files to disk.
pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>, pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
@ -397,6 +402,7 @@ impl IndexScheduler {
auth_path: self.auth_path.clone(), auth_path: self.auth_path.clone(),
version_file_path: self.version_file_path.clone(), version_file_path: self.version_file_path.clone(),
webhook_url: self.webhook_url.clone(), webhook_url: self.webhook_url.clone(),
webhook_authorization_header: self.webhook_authorization_header.clone(),
currently_updating_index: self.currently_updating_index.clone(), currently_updating_index: self.currently_updating_index.clone(),
embedders: self.embedders.clone(), embedders: self.embedders.clone(),
#[cfg(test)] #[cfg(test)]
@ -497,6 +503,7 @@ impl IndexScheduler {
auth_path: options.auth_path, auth_path: options.auth_path,
version_file_path: options.version_file_path, version_file_path: options.version_file_path,
webhook_url: options.webhook_url, webhook_url: options.webhook_url,
webhook_authorization_header: options.webhook_authorization_header,
currently_updating_index: Arc::new(RwLock::new(None)), currently_updating_index: Arc::new(RwLock::new(None)),
embedders: Default::default(), embedders: Default::default(),
@ -1338,8 +1345,15 @@ impl IndexScheduler {
written: 0, written: 0,
}; };
// let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default()); let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
if let Err(e) = ureq::post(url).set("Content-Encoding", "gzip").send(reader) { let request = ureq::post(url).set("Content-Encoding", "gzip");
let request = match &self.webhook_authorization_header {
Some(header) => request.set("Authorization", header),
None => request,
};
if let Err(e) = request.send(reader) {
log::error!("While sending data to the webhook: {e}"); log::error!("While sending data to the webhook: {e}");
} }
} }
@ -1761,6 +1775,7 @@ mod tests {
snapshots_path: tempdir.path().join("snapshots"), snapshots_path: tempdir.path().join("snapshots"),
dumps_path: tempdir.path().join("dumps"), dumps_path: tempdir.path().join("dumps"),
webhook_url: None, webhook_url: None,
webhook_authorization_header: None,
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
enable_mdb_writemap: false, enable_mdb_writemap: false,

View File

@ -265,6 +265,7 @@ struct Infos {
http_addr: bool, http_addr: bool,
http_payload_size_limit: Byte, http_payload_size_limit: Byte,
task_queue_webhook: bool, task_queue_webhook: bool,
task_webhook_authorization_header: bool,
log_level: String, log_level: String,
max_indexing_memory: MaxMemory, max_indexing_memory: MaxMemory,
max_indexing_threads: MaxThreads, max_indexing_threads: MaxThreads,
@ -292,6 +293,7 @@ impl From<Opt> for Infos {
master_key: _, master_key: _,
env, env,
task_webhook_url, task_webhook_url,
task_webhook_authorization_header,
max_index_size: _, max_index_size: _,
max_task_db_size: _, max_task_db_size: _,
http_payload_size_limit, http_payload_size_limit,
@ -346,6 +348,7 @@ impl From<Opt> for Infos {
http_payload_size_limit, http_payload_size_limit,
experimental_max_number_of_batched_tasks, experimental_max_number_of_batched_tasks,
task_queue_webhook: task_webhook_url.is_some(), task_queue_webhook: task_webhook_url.is_some(),
task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
log_level: log_level.to_string(), log_level: log_level.to_string(),
max_indexing_memory, max_indexing_memory,
max_indexing_threads, max_indexing_threads,

View File

@ -229,6 +229,7 @@ fn open_or_create_database_unchecked(
snapshots_path: opt.snapshot_dir.clone(), snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(), dumps_path: opt.dump_dir.clone(),
webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()), webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()),
webhook_authorization_header: opt.task_webhook_authorization_header.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize, task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize, index_base_map_size: opt.max_index_size.get_bytes() as usize,
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,

View File

@ -30,6 +30,7 @@ const MEILI_HTTP_ADDR: &str = "MEILI_HTTP_ADDR";
const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY"; const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY";
const MEILI_ENV: &str = "MEILI_ENV"; const MEILI_ENV: &str = "MEILI_ENV";
const MEILI_TASK_WEBHOOK_URL: &str = "MEILI_TASK_WEBHOOK_URL"; const MEILI_TASK_WEBHOOK_URL: &str = "MEILI_TASK_WEBHOOK_URL";
const MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER: &str = "MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER";
#[cfg(feature = "analytics")] #[cfg(feature = "analytics")]
const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS"; const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS";
const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT"; const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT";
@ -162,6 +163,10 @@ pub struct Opt {
#[clap(long, env = MEILI_TASK_WEBHOOK_URL)] #[clap(long, env = MEILI_TASK_WEBHOOK_URL)]
pub task_webhook_url: Option<Url>, pub task_webhook_url: Option<Url>,
/// The Authorization header to send on the webhook URL whenever a task finishes so a third party can be notified.
#[clap(long, env = MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER)]
pub task_webhook_authorization_header: Option<String>,
/// Deactivates Meilisearch's built-in telemetry when provided. /// Deactivates Meilisearch's built-in telemetry when provided.
/// ///
/// Meilisearch automatically collects data from all instances that do not opt out using this flag. /// Meilisearch automatically collects data from all instances that do not opt out using this flag.
@ -382,6 +387,7 @@ impl Opt {
master_key, master_key,
env, env,
task_webhook_url, task_webhook_url,
task_webhook_authorization_header,
max_index_size: _, max_index_size: _,
max_task_db_size: _, max_task_db_size: _,
http_payload_size_limit, http_payload_size_limit,
@ -419,6 +425,12 @@ impl Opt {
if let Some(task_webhook_url) = task_webhook_url { if let Some(task_webhook_url) = task_webhook_url {
export_to_env_if_not_present(MEILI_TASK_WEBHOOK_URL, task_webhook_url.to_string()); export_to_env_if_not_present(MEILI_TASK_WEBHOOK_URL, task_webhook_url.to_string());
} }
if let Some(task_webhook_authorization_header) = task_webhook_authorization_header {
export_to_env_if_not_present(
MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER,
task_webhook_authorization_header,
);
}
#[cfg(feature = "analytics")] #[cfg(feature = "analytics")]
{ {