diff --git a/Cargo.lock b/Cargo.lock
index edcd26bf9..496710399 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1677,9 +1677,9 @@ dependencies = [
 [[package]]
 name = "flate2"
-version = "1.0.26"
+version = "1.0.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
+checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e"
 dependencies = [
  "crc32fast",
  "miniz_oxide",
@@ -1701,9 +1701,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 [[package]]
 name = "form_urlencoded"
-version = "1.2.0"
+version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
+checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
 dependencies = [
  "percent-encoding",
 ]
@@ -2753,9 +2753,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
 [[package]]
 name = "idna"
-version = "0.4.0"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
+checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
 dependencies = [
  "unicode-bidi",
  "unicode-normalization",
@@ -2774,6 +2774,7 @@ dependencies = [
  "dump",
  "enum-iterator",
  "file-store",
+ "flate2",
  "insta",
  "log",
  "meili-snap",
@@ -2789,6 +2790,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "time",
+ "ureq",
  "uuid 1.5.0",
 ]
@@ -3559,6 +3561,7 @@ dependencies = [
  "tokio",
  "tokio-stream",
  "toml",
+ "url",
  "urlencoding",
  "uuid 1.5.0",
  "vergen",
@@ -4067,9 +4070,9 @@ dependencies = [
 [[package]]
 name = "percent-encoding"
-version = "2.3.0"
+version = "2.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
+checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 [[package]]
 name = "permissive-json-pointer"
@@ -5597,13 +5600,14 @@ dependencies = [
 [[package]]
 name = "url"
-version = "2.4.0"
+version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
+checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
 dependencies = [
  "form_urlencoded",
  "idna",
  "percent-encoding",
+ "serde",
 ]
 [[package]]
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index c4a37b7d6..a8c19f435 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -18,6 +18,7 @@ derive_builder = "0.12.0"
 dump = { path = "../dump" }
 enum-iterator = "1.4.0"
 file-store = { path = "../file-store" }
+flate2 = "1.0.28"
 log = "0.4.17"
 meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
@@ -30,6 +31,7 @@ synchronoise = "1.0.1"
 tempfile = "3.5.0"
 thiserror = "1.0.40"
 time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
+ureq = "2.9.1"
 uuid = { version = "1.3.1", features = ["serde", "v4"] }

 [dev-dependencies]
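The two dependencies added to `index-scheduler` above are what the webhook sender introduced later in this diff is built on: `flate2` for gzip compression and `ureq` for a small blocking HTTP client. The relevant detail is that `flate2::bufread::GzEncoder` wraps a reader and yields gzip-compressed bytes as it is read, which is why it can later be handed to `ureq` as a streaming request body. A minimal sketch of that behavior (illustration only, not part of this diff):

```rust
// Illustration only: reading from a bufread::GzEncoder yields the gzip-compressed
// form of whatever the wrapped reader produces.
use std::io::Read;

use flate2::bufread::GzEncoder;
use flate2::Compression;

fn main() -> std::io::Result<()> {
    let payload: &[u8] = b"{\"uid\":0}\n{\"uid\":1}\n"; // &[u8] already implements BufRead
    let mut encoder = GzEncoder::new(payload, Compression::default());

    let mut compressed = Vec::new();
    encoder.read_to_end(&mut compressed)?;
    assert!(compressed.starts_with(&[0x1f, 0x8b])); // gzip magic number
    Ok(())
}
```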
diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs
index ddb9e934a..0adda43ff 100644
--- a/index-scheduler/src/insta_snapshot.rs
+++ b/index-scheduler/src/insta_snapshot.rs
@@ -37,6 +37,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         snapshots_path: _,
         auth_path: _,
         version_file_path: _,
+        webhook_url: _,
+        webhook_authorization_header: _,
         test_breakpoint_sdr: _,
         planned_failures: _,
         run_loop_iteration: _,
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index b9b360fa4..296f8add1 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -34,6 +34,7 @@ pub type TaskId = u32;
 use std::collections::{BTreeMap, HashMap};
 use std::fs::File;
+use std::io::{self, BufReader, Read};
 use std::ops::{Bound, RangeBounds};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicBool;
@@ -45,6 +46,8 @@ use dump::{KindDump, TaskDump, UpdateFile};
 pub use error::Error;
 pub use features::RoFeatures;
 use file_store::FileStore;
+use flate2::bufread::GzEncoder;
+use flate2::Compression;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
 use meilisearch_types::heed::byteorder::BE;
@@ -54,6 +57,7 @@ use meilisearch_types::milli::documents::DocumentsBatchBuilder;
 use meilisearch_types::milli::update::IndexerConfig;
 use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
 use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
+use meilisearch_types::task_view::TaskView;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use puffin::FrameView;
 use roaring::RoaringBitmap;
@@ -170,8 +174,8 @@ impl ProcessingTasks {
     }

     /// Set the processing tasks to an empty list
-    fn stop_processing(&mut self) {
-        self.processing = RoaringBitmap::new();
+    fn stop_processing(&mut self) -> RoaringBitmap {
+        std::mem::take(&mut self.processing)
     }

     /// Returns `true` if there, at least, is one task that is currently processing that we must stop.
@@ -241,6 +245,10 @@ pub struct IndexSchedulerOptions {
     pub snapshots_path: PathBuf,
     /// The path to the folder containing the dumps.
     pub dumps_path: PathBuf,
+    /// The URL to which we must send the task statuses.
+    pub webhook_url: Option<String>,
+    /// The value we will send in the Authorization HTTP header on the webhook URL.
+    pub webhook_authorization_header: Option<String>,
     /// The maximum size, in bytes, of the task index.
     pub task_db_size: usize,
     /// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
@@ -323,6 +331,11 @@ pub struct IndexScheduler {
     /// The maximum number of tasks that will be batched together.
     pub(crate) max_number_of_batched_tasks: usize,

+    /// The webhook URL we should send tasks to after processing every batch.
+    pub(crate) webhook_url: Option<String>,
+    /// The Authorization header to send to the webhook URL.
+    pub(crate) webhook_authorization_header: Option<String>,
+
     /// A frame to output the indexation profiling files to disk.
     pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
@@ -388,6 +401,8 @@ impl IndexScheduler {
             dumps_path: self.dumps_path.clone(),
             auth_path: self.auth_path.clone(),
             version_file_path: self.version_file_path.clone(),
+            webhook_url: self.webhook_url.clone(),
+            webhook_authorization_header: self.webhook_authorization_header.clone(),
             currently_updating_index: self.currently_updating_index.clone(),
             embedders: self.embedders.clone(),
             #[cfg(test)]
@@ -487,6 +502,8 @@ impl IndexScheduler {
             snapshots_path: options.snapshots_path,
             auth_path: options.auth_path,
             version_file_path: options.version_file_path,
+            webhook_url: options.webhook_url,
+            webhook_authorization_header: options.webhook_authorization_header,
             currently_updating_index: Arc::new(RwLock::new(None)),
             embedders: Default::default(),

@@ -1251,19 +1268,99 @@ impl IndexScheduler {
             }
         }

-        self.processing_tasks.write().unwrap().stop_processing();
+        let processed = self.processing_tasks.write().unwrap().stop_processing();

         #[cfg(test)]
         self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;

         wtxn.commit().map_err(Error::HeedTransaction)?;

+        // We shouldn't crash the tick function if we can't send data to the webhook.
+        let _ = self.notify_webhook(&processed);
+
         #[cfg(test)]
         self.breakpoint(Breakpoint::AfterProcessing);

         Ok(TickOutcome::TickAgain(processed_tasks))
     }

+    /// Once the task changes have been committed, we must send all the tasks that were updated to our webhook, if there is one.
+    fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
+        if let Some(ref url) = self.webhook_url {
+            struct TaskReader<'a, 'b> {
+                rtxn: &'a RoTxn<'a>,
+                index_scheduler: &'a IndexScheduler,
+                tasks: &'b mut roaring::bitmap::Iter<'b>,
+                buffer: Vec<u8>,
+                written: usize,
+            }
+
+            impl<'a, 'b> Read for TaskReader<'a, 'b> {
+                fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
+                    if self.buffer.is_empty() {
+                        match self.tasks.next() {
+                            None => return Ok(0),
+                            Some(task_id) => {
+                                let task = self
+                                    .index_scheduler
+                                    .get_task(self.rtxn, task_id)
+                                    .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
+                                    .ok_or_else(|| {
+                                        io::Error::new(
+                                            io::ErrorKind::Other,
+                                            Error::CorruptedTaskQueue,
+                                        )
+                                    })?;
+
+                                serde_json::to_writer(
+                                    &mut self.buffer,
+                                    &TaskView::from_task(&task),
+                                )?;
+                                self.buffer.push(b'\n');
+                            }
+                        }
+                    }
+
+                    let mut to_write = &self.buffer[self.written..];
+                    let wrote = io::copy(&mut to_write, &mut buf)?;
+                    self.written += wrote as usize;
+
+                    // we wrote everything and must refresh our buffer on the next call
+                    if self.written == self.buffer.len() {
+                        self.written = 0;
+                        self.buffer.clear();
+                    }
+
+                    Ok(wrote as usize)
+                }
+            }
+
+            let rtxn = self.env.read_txn()?;
+
+            let task_reader = TaskReader {
+                rtxn: &rtxn,
+                index_scheduler: self,
+                tasks: &mut updated.into_iter(),
+                buffer: Vec::with_capacity(50), // on average a task is around ~100 bytes
+                written: 0,
+            };
+
+            let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
+            let request = ureq::post(url).set("Content-Encoding", "gzip");
+            let request = match &self.webhook_authorization_header {
+                Some(header) => request.set("Authorization", header),
+                None => request,
+            };
+
+            if let Err(e) = request.send(reader) {
+                log::error!("While sending data to the webhook: {e}");
+            }
+        }
+
+        Ok(())
+    }
+
     /// Register a task to cleanup the task queue if needed
     fn cleanup_task_queue(&self) -> Result<()> {
         let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
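The payload produced by `notify_webhook` above is gzip-compressed NDJSON: one `TaskView` serialized as JSON per line, sent with a `Content-Encoding: gzip` header. Because the POST is made synchronously from `tick` through `ureq`, a slow webhook endpoint delays the next scheduling iteration, so receivers should answer quickly. A sketch of how a receiver might decode such a payload (hypothetical, not part of this change), assuming it sees the raw gzip bytes rather than a body its framework already decompressed:

```rust
// Hypothetical receiver-side decoding (not part of this diff): gunzip the body,
// then parse one JSON task per line.
use std::io::{BufRead, BufReader};

use flate2::read::GzDecoder;

fn decode_webhook_payload(compressed: &[u8]) -> std::io::Result<Vec<serde_json::Value>> {
    let reader = BufReader::new(GzDecoder::new(compressed));
    let mut tasks = Vec::new();
    for line in reader.lines() {
        let line = line?;
        if line.is_empty() {
            continue; // trailing newline at the end of the stream
        }
        // Each non-empty line is one task, e.g. {"uid":0,"status":"succeeded",...}
        let task: serde_json::Value = serde_json::from_str(&line)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        tasks.push(task);
    }
    Ok(tasks)
}
```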
@@ -1677,6 +1774,8 @@ mod tests {
             indexes_path: tempdir.path().join("indexes"),
             snapshots_path: tempdir.path().join("snapshots"),
             dumps_path: tempdir.path().join("dumps"),
+            webhook_url: None,
+            webhook_authorization_header: None,
             task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
             index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
             enable_mdb_writemap: false,
diff --git a/meilisearch-types/src/lib.rs b/meilisearch-types/src/lib.rs
index b0762563a..e4f5cbeb4 100644
--- a/meilisearch-types/src/lib.rs
+++ b/meilisearch-types/src/lib.rs
@@ -9,6 +9,7 @@ pub mod index_uid_pattern;
 pub mod keys;
 pub mod settings;
 pub mod star_or;
+pub mod task_view;
 pub mod tasks;
 pub mod versioning;

 pub use milli::{heed, Index};
diff --git a/meilisearch-types/src/task_view.rs b/meilisearch-types/src/task_view.rs
new file mode 100644
index 000000000..02be91a88
--- /dev/null
+++ b/meilisearch-types/src/task_view.rs
@@ -0,0 +1,139 @@
+use serde::Serialize;
+use time::{Duration, OffsetDateTime};
+
+use crate::error::ResponseError;
+use crate::settings::{Settings, Unchecked};
+use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId};
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskView {
+    pub uid: TaskId,
+    #[serde(default)]
+    pub index_uid: Option<String>,
+    pub status: Status,
+    #[serde(rename = "type")]
+    pub kind: Kind,
+    pub canceled_by: Option<TaskId>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub details: Option<DetailsView>,
+    pub error: Option<ResponseError>,
+    #[serde(serialize_with = "serialize_duration", default)]
+    pub duration: Option<Duration>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub enqueued_at: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339::option", default)]
+    pub started_at: Option<OffsetDateTime>,
+    #[serde(with = "time::serde::rfc3339::option", default)]
+    pub finished_at: Option<OffsetDateTime>,
+}
+
+impl TaskView {
+    pub fn from_task(task: &Task) -> TaskView {
+        TaskView {
+            uid: task.uid,
+            index_uid: task.index_uid().map(ToOwned::to_owned),
+            status: task.status,
+            kind: task.kind.as_kind(),
+            canceled_by: task.canceled_by,
+            details: task.details.clone().map(DetailsView::from),
+            error: task.error.clone(),
+            duration: task.started_at.zip(task.finished_at).map(|(start, end)| end - start),
+            enqueued_at: task.enqueued_at,
+            started_at: task.started_at,
+            finished_at: task.finished_at,
+        }
+    }
+}
+
+#[derive(Default, Debug, PartialEq, Eq, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct DetailsView {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub received_documents: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub indexed_documents: Option<Option<u64>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub primary_key: Option<Option<String>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub provided_ids: Option<usize>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub deleted_documents: Option<Option<u64>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub matched_tasks: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub canceled_tasks: Option<Option<u64>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub deleted_tasks: Option<Option<u64>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub original_filter: Option<Option<String>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub dump_uid: Option<Option<String>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(flatten)]
+    pub settings: Option<Box<Settings<Unchecked>>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub swaps: Option<Vec<IndexSwap>>,
+}
+
+impl From<Details> for DetailsView {
+    fn from(details: Details) -> Self {
+        match details {
+            Details::DocumentAdditionOrUpdate { received_documents, indexed_documents } => {
+                DetailsView {
+                    received_documents: Some(received_documents),
+                    indexed_documents: Some(indexed_documents),
+                    ..DetailsView::default()
+                }
+            }
+            Details::SettingsUpdate { settings } => {
+                DetailsView { settings: Some(settings), ..DetailsView::default() }
+            }
+            Details::IndexInfo { primary_key } => {
+                DetailsView { primary_key: Some(primary_key), ..DetailsView::default() }
+            }
+            Details::DocumentDeletion {
+                provided_ids: received_document_ids,
+                deleted_documents,
+            } => DetailsView {
+                provided_ids: Some(received_document_ids),
+                deleted_documents: Some(deleted_documents),
+                original_filter: Some(None),
+                ..DetailsView::default()
+            },
+            Details::DocumentDeletionByFilter { original_filter, deleted_documents } => {
+                DetailsView {
+                    provided_ids: Some(0),
+                    original_filter: Some(Some(original_filter)),
+                    deleted_documents: Some(deleted_documents),
+                    ..DetailsView::default()
+                }
+            }
+            Details::ClearAll { deleted_documents } => {
+                DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() }
+            }
+            Details::TaskCancelation { matched_tasks, canceled_tasks, original_filter } => {
+                DetailsView {
+                    matched_tasks: Some(matched_tasks),
+                    canceled_tasks: Some(canceled_tasks),
+                    original_filter: Some(Some(original_filter)),
+                    ..DetailsView::default()
+                }
+            }
+            Details::TaskDeletion { matched_tasks, deleted_tasks, original_filter } => {
+                DetailsView {
+                    matched_tasks: Some(matched_tasks),
+                    deleted_tasks: Some(deleted_tasks),
+                    original_filter: Some(Some(original_filter)),
+                    ..DetailsView::default()
+                }
+            }
+            Details::Dump { dump_uid } => {
+                DetailsView { dump_uid: Some(dump_uid), ..DetailsView::default() }
+            }
+            Details::IndexSwap { swaps } => {
+                DetailsView { swaps: Some(swaps), ..Default::default() }
+            }
+        }
+    }
+}
diff --git a/meilisearch/Cargo.toml b/meilisearch/Cargo.toml
index ceadbb1f1..5d2423b95 100644
--- a/meilisearch/Cargo.toml
+++ b/meilisearch/Cargo.toml
@@ -104,6 +104,7 @@ walkdir = "2.3.3"
 yaup = "0.2.1"
 serde_urlencoded = "0.7.1"
 termcolor = "1.2.0"
+url = { version = "2.5.0", features = ["serde"] }

 [dev-dependencies]
 actix-rt = "2.8.0"
diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs
index 1ad277c28..86a5eddb9 100644
--- a/meilisearch/src/analytics/segment_analytics.rs
+++ b/meilisearch/src/analytics/segment_analytics.rs
@@ -264,6 +264,8 @@ struct Infos {
     ignore_snapshot_if_db_exists: bool,
     http_addr: bool,
     http_payload_size_limit: Byte,
+    task_queue_webhook: bool,
+    task_webhook_authorization_header: bool,
     log_level: String,
     max_indexing_memory: MaxMemory,
     max_indexing_threads: MaxThreads,
@@ -290,6 +292,8 @@ impl From<Opt> for Infos {
             http_addr,
             master_key: _,
             env,
+            task_webhook_url,
+            task_webhook_authorization_header,
             max_index_size: _,
             max_task_db_size: _,
             http_payload_size_limit,
@@ -343,6 +347,8 @@ impl From<Opt> for Infos {
             http_addr: http_addr != default_http_addr(),
             http_payload_size_limit,
             experimental_max_number_of_batched_tasks,
+            task_queue_webhook: task_webhook_url.is_some(),
+            task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
             log_level: log_level.to_string(),
             max_indexing_memory,
             max_indexing_threads,
diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs
index e0f488eea..f1111962c 100644
--- a/meilisearch/src/lib.rs
+++ b/meilisearch/src/lib.rs
@@ -228,6 +228,8 @@ fn open_or_create_database_unchecked(
         indexes_path: opt.db_path.join("indexes"),
         snapshots_path: opt.snapshot_dir.clone(),
         dumps_path: opt.dump_dir.clone(),
+        webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()),
+        webhook_authorization_header: opt.task_webhook_authorization_header.clone(),
         task_db_size: opt.max_task_db_size.get_bytes() as usize,
         index_base_map_size: opt.max_index_size.get_bytes() as usize,
         enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs
index 1ed20f5b5..a0672c9cf 100644
--- a/meilisearch/src/option.rs
+++ b/meilisearch/src/option.rs
@@ -21,6 +21,7 @@ use rustls::RootCertStore;
 use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
 use serde::{Deserialize, Serialize};
 use sysinfo::{RefreshKind, System, SystemExt};
+use url::Url;

 const POSSIBLE_ENV: [&str; 2] = ["development", "production"];

@@ -28,6 +29,8 @@ const MEILI_DB_PATH: &str = "MEILI_DB_PATH";
 const MEILI_HTTP_ADDR: &str = "MEILI_HTTP_ADDR";
 const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY";
 const MEILI_ENV: &str = "MEILI_ENV";
+const MEILI_TASK_WEBHOOK_URL: &str = "MEILI_TASK_WEBHOOK_URL";
+const MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER: &str = "MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER";
 #[cfg(feature = "analytics")]
 const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS";
 const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT";
@@ -156,6 +159,14 @@ pub struct Opt {
     #[serde(default = "default_env")]
     pub env: String,

+    /// A URL called whenever a task finishes, so a third party can be notified.
+    #[clap(long, env = MEILI_TASK_WEBHOOK_URL)]
+    pub task_webhook_url: Option<Url>,
+
+    /// The Authorization header to send to the webhook URL whenever a task finishes, so a third party can be notified.
+    #[clap(long, env = MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER)]
+    pub task_webhook_authorization_header: Option<String>,
+
     /// Deactivates Meilisearch's built-in telemetry when provided.
     ///
     /// Meilisearch automatically collects data from all instances that do not opt out using this flag.
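Because `task_webhook_url` is typed as `url::Url` rather than `String`, a malformed value is rejected when the flag or `MEILI_TASK_WEBHOOK_URL` is parsed at startup instead of failing later, when the first task finishes. A small sketch of the validation this buys (illustration only; `parse_webhook_url` is a hypothetical helper, not part of this diff):

```rust
// Illustration only: Url::parse is essentially the validation applied when the
// --task-webhook-url flag or MEILI_TASK_WEBHOOK_URL env var is parsed.
use url::Url;

fn parse_webhook_url(raw: &str) -> Result<Url, url::ParseError> {
    Url::parse(raw)
}

fn main() {
    assert!(parse_webhook_url("https://example.com/hooks/meilisearch").is_ok());
    assert!(parse_webhook_url("not-a-url").is_err());
}
```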
@@ -375,6 +386,8 @@ impl Opt {
             http_addr,
             master_key,
             env,
+            task_webhook_url,
+            task_webhook_authorization_header,
             max_index_size: _,
             max_task_db_size: _,
             http_payload_size_limit,
@@ -409,6 +422,16 @@ impl Opt {
             export_to_env_if_not_present(MEILI_MASTER_KEY, master_key);
         }
         export_to_env_if_not_present(MEILI_ENV, env);
+        if let Some(task_webhook_url) = task_webhook_url {
+            export_to_env_if_not_present(MEILI_TASK_WEBHOOK_URL, task_webhook_url.to_string());
+        }
+        if let Some(task_webhook_authorization_header) = task_webhook_authorization_header {
+            export_to_env_if_not_present(
+                MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER,
+                task_webhook_authorization_header,
+            );
+        }
+
         #[cfg(feature = "analytics")]
         {
             export_to_env_if_not_present(MEILI_NO_ANALYTICS, no_analytics.to_string());
diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs
index f7d4c44d7..03b63001d 100644
--- a/meilisearch/src/routes/tasks.rs
+++ b/meilisearch/src/routes/tasks.rs
@@ -8,11 +8,9 @@ use meilisearch_types::deserr::DeserrQueryParamError;
 use meilisearch_types::error::deserr_codes::*;
 use meilisearch_types::error::{InvalidTaskDateError, ResponseError};
 use meilisearch_types::index_uid::IndexUid;
-use meilisearch_types::settings::{Settings, Unchecked};
 use meilisearch_types::star_or::{OptionStarOr, OptionStarOrList};
-use meilisearch_types::tasks::{
-    serialize_duration, Details, IndexSwap, Kind, KindWithContent, Status, Task,
-};
+use meilisearch_types::task_view::TaskView;
+use meilisearch_types::tasks::{Kind, KindWithContent, Status};
 use serde::Serialize;
 use serde_json::json;
 use time::format_description::well_known::Rfc3339;
@@ -37,140 +35,6 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
         .service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks))))
         .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
 }
-
-#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct TaskView {
-    pub uid: TaskId,
-    #[serde(default)]
-    pub index_uid: Option<String>,
-    pub status: Status,
-    #[serde(rename = "type")]
-    pub kind: Kind,
-    pub canceled_by: Option<TaskId>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub details: Option<DetailsView>,
-    pub error: Option<ResponseError>,
-    #[serde(serialize_with = "serialize_duration", default)]
-    pub duration: Option<Duration>,
-    #[serde(with = "time::serde::rfc3339")]
-    pub enqueued_at: OffsetDateTime,
-    #[serde(with = "time::serde::rfc3339::option", default)]
-    pub started_at: Option<OffsetDateTime>,
-    #[serde(with = "time::serde::rfc3339::option", default)]
-    pub finished_at: Option<OffsetDateTime>,
-}
-
-impl TaskView {
-    pub fn from_task(task: &Task) -> TaskView {
-        TaskView {
-            uid: task.uid,
-            index_uid: task.index_uid().map(ToOwned::to_owned),
-            status: task.status,
-            kind: task.kind.as_kind(),
-            canceled_by: task.canceled_by,
-            details: task.details.clone().map(DetailsView::from),
-            error: task.error.clone(),
-            duration: task.started_at.zip(task.finished_at).map(|(start, end)| end - start),
-            enqueued_at: task.enqueued_at,
-            started_at: task.started_at,
-            finished_at: task.finished_at,
-        }
-    }
-}
-
-#[derive(Default, Debug, PartialEq, Eq, Clone, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct DetailsView {
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub received_documents: Option<u64>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub indexed_documents: Option<Option<u64>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub primary_key: Option<Option<String>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub provided_ids: Option<usize>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub deleted_documents: Option<Option<u64>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub matched_tasks: Option<u64>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub canceled_tasks: Option<Option<u64>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub deleted_tasks: Option<Option<u64>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub original_filter: Option<Option<String>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub dump_uid: Option<Option<String>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(flatten)]
-    pub settings: Option<Box<Settings<Unchecked>>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub swaps: Option<Vec<IndexSwap>>,
-}
-
-impl From<Details> for DetailsView {
-    fn from(details: Details) -> Self {
-        match details {
-            Details::DocumentAdditionOrUpdate { received_documents, indexed_documents } => {
-                DetailsView {
-                    received_documents: Some(received_documents),
-                    indexed_documents: Some(indexed_documents),
-                    ..DetailsView::default()
-                }
-            }
-            Details::SettingsUpdate { settings } => {
-                DetailsView { settings: Some(settings), ..DetailsView::default() }
-            }
-            Details::IndexInfo { primary_key } => {
-                DetailsView { primary_key: Some(primary_key), ..DetailsView::default() }
-            }
-            Details::DocumentDeletion {
-                provided_ids: received_document_ids,
-                deleted_documents,
-            } => DetailsView {
-                provided_ids: Some(received_document_ids),
-                deleted_documents: Some(deleted_documents),
-                original_filter: Some(None),
-                ..DetailsView::default()
-            },
-            Details::DocumentDeletionByFilter { original_filter, deleted_documents } => {
-                DetailsView {
-                    provided_ids: Some(0),
-                    original_filter: Some(Some(original_filter)),
-                    deleted_documents: Some(deleted_documents),
-                    ..DetailsView::default()
-                }
-            }
-            Details::ClearAll { deleted_documents } => {
-                DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() }
-            }
-            Details::TaskCancelation { matched_tasks, canceled_tasks, original_filter } => {
-                DetailsView {
-                    matched_tasks: Some(matched_tasks),
-                    canceled_tasks: Some(canceled_tasks),
-                    original_filter: Some(Some(original_filter)),
-                    ..DetailsView::default()
-                }
-            }
-            Details::TaskDeletion { matched_tasks, deleted_tasks, original_filter } => {
-                DetailsView {
-                    matched_tasks: Some(matched_tasks),
-                    deleted_tasks: Some(deleted_tasks),
-                    original_filter: Some(Some(original_filter)),
-                    ..DetailsView::default()
-                }
-            }
-            Details::Dump { dump_uid } => {
-                DetailsView { dump_uid: Some(dump_uid), ..DetailsView::default() }
-            }
-            Details::IndexSwap { swaps } => {
-                DetailsView { swaps: Some(swaps), ..Default::default() }
-            }
-        }
-    }
-}
-
 #[derive(Debug, Deserr)]
 #[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
 pub struct TasksFilterQuery {
diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs
index 7a5fa6388..ed387224e 100644
--- a/meilisearch/tests/tasks/mod.rs
+++ b/meilisearch/tests/tasks/mod.rs
@@ -1,4 +1,5 @@
 mod errors;
+mod webhook;

 use meili_snap::insta::assert_json_snapshot;
 use time::format_description::well_known::Rfc3339;
diff --git a/meilisearch/tests/tasks/webhook.rs b/meilisearch/tests/tasks/webhook.rs
new file mode 100644
index 000000000..6979ff294
--- /dev/null
+++ b/meilisearch/tests/tasks/webhook.rs
@@ -0,0 +1,123 @@
+//! To test the webhook, we need to spawn a new server with a URL listening for
+//! POST requests. The webhook handle starts a server and forwards all the
+//! received requests into a channel for you to handle.
+
+use std::sync::Arc;
+
+use actix_http::body::MessageBody;
+use actix_web::dev::{ServiceFactory, ServiceResponse};
+use actix_web::web::{Bytes, Data};
+use actix_web::{post, App, HttpResponse, HttpServer};
+use meili_snap::{json_string, snapshot};
+use meilisearch::Opt;
+use tokio::sync::mpsc;
+use url::Url;
+
+use crate::common::{default_settings, Server};
+use crate::json;
+
+#[post("/")]
+async fn forward_body(sender: Data<mpsc::UnboundedSender<Vec<u8>>>, body: Bytes) -> HttpResponse {
+    let body = body.to_vec();
+    sender.send(body).unwrap();
+    HttpResponse::Ok().into()
+}
+
+fn create_app(
+    sender: Arc<mpsc::UnboundedSender<Vec<u8>>>,
+) -> actix_web::App<
+    impl ServiceFactory<
+        actix_web::dev::ServiceRequest,
+        Config = (),
+        Response = ServiceResponse<impl MessageBody>,
+        Error = actix_web::Error,
+        InitError = (),
+    >,
+> {
+    App::new().service(forward_body).app_data(Data::from(sender))
+}
+
+struct WebhookHandle {
+    pub server_handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,
+    pub url: String,
+    pub receiver: mpsc::UnboundedReceiver<Vec<u8>>,
+}
+
+async fn create_webhook_server() -> WebhookHandle {
+    let mut log_builder = env_logger::Builder::new();
+    log_builder.parse_filters("info");
+    log_builder.init();
+
+    let (sender, receiver) = mpsc::unbounded_channel();
+    let sender = Arc::new(sender);
+
+    // By listening on port 0, the system will give us any available port.
+    let server =
+        HttpServer::new(move || create_app(sender.clone())).bind(("127.0.0.1", 0)).unwrap();
+    let (ip, scheme) = server.addrs_with_scheme()[0];
+    let url = format!("{scheme}://{ip}/");
+
+    let server_handle = tokio::spawn(server.run());
+    WebhookHandle { server_handle, url, receiver }
+}
+
+#[actix_web::test]
+async fn test_basic_webhook() {
+    let WebhookHandle { server_handle, url, mut receiver } = create_webhook_server().await;
+
+    let db_path = tempfile::tempdir().unwrap();
+    let server = Server::new_with_options(Opt {
+        task_webhook_url: Some(Url::parse(&url).unwrap()),
+        ..default_settings(db_path.path())
+    })
+    .await
+    .unwrap();
+
+    let index = server.index("tamo");
+    // May be flaky: we're relying on the fact that while the first document addition is processed, the
+    // other operations will be received and batched together. If that doesn't happen, it's not a problem:
+    // the rest of the test doesn't assume anything about the number of tasks per batch.
+    for i in 0..5 {
+        let (_, _status) = index.add_documents(json!({ "id": i, "doggo": "bone" }), None).await;
+    }
+
+    let mut nb_tasks = 0;
+    while let Some(payload) = receiver.recv().await {
+        let payload = String::from_utf8(payload).unwrap();
+        let jsonl = payload.split('\n');
+        for json in jsonl {
+            if json.is_empty() {
+                break; // we reached EOF
+            }
+            nb_tasks += 1;
+            let json: serde_json::Value = serde_json::from_str(json).unwrap();
+            snapshot!(
+                json_string!(json, { ".uid" => "[uid]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
+                @r###"
+            {
+              "uid": "[uid]",
+              "indexUid": "tamo",
+              "status": "succeeded",
+              "type": "documentAdditionOrUpdate",
+              "canceledBy": null,
+              "details": {
+                "receivedDocuments": 1,
+                "indexedDocuments": 1
+              },
+              "error": null,
+              "duration": "[duration]",
+              "enqueuedAt": "[date]",
+              "startedAt": "[date]",
+              "finishedAt": "[date]"
+            }
+            "###);
+        }
+        if nb_tasks == 5 {
+            break;
+        }
+    }
+
+    assert!(nb_tasks == 5, "We should have received the 5 tasks but only received {nb_tasks}");
+
+    server_handle.abort();
+}
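The test above exercises the webhook URL but not `task_webhook_authorization_header`. If one also wanted to assert on the header sent by the scheduler, the forwarding handler could capture it alongside the body; a sketch of such a variant (hypothetical, not part of this diff):

```rust
// Hypothetical variant (not part of this diff): a forwarding handler that also
// captures the Authorization header sent with each webhook call.
use actix_web::web::{Bytes, Data};
use actix_web::{post, HttpRequest, HttpResponse};
use tokio::sync::mpsc;

#[post("/")]
async fn forward_body_with_auth(
    req: HttpRequest,
    sender: Data<mpsc::UnboundedSender<(Option<String>, Vec<u8>)>>,
    body: Bytes,
) -> HttpResponse {
    // The scheduler only sets this header when --task-webhook-authorization-header is configured.
    let auth = req
        .headers()
        .get("Authorization")
        .and_then(|value| value.to_str().ok())
        .map(ToOwned::to_owned);
    sender.send((auth, body.to_vec())).unwrap();
    HttpResponse::Ok().into()
}
```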