From f1884d691066db5cd151d4ff97b50086fd29c35b Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 16 Nov 2022 18:44:58 +0100 Subject: [PATCH] batch the tasks seen events --- .../src/analytics/mock_analytics.rs | 2 + meilisearch-http/src/analytics/mod.rs | 4 + .../src/analytics/segment_analytics.rs | 100 ++++++++++++++++++ meilisearch-http/src/routes/mod.rs | 2 +- meilisearch-http/src/routes/tasks.rs | 54 ++++------ 5 files changed, 126 insertions(+), 36 deletions(-) diff --git a/meilisearch-http/src/analytics/mock_analytics.rs b/meilisearch-http/src/analytics/mock_analytics.rs index 3ab13cd34..ad45a1ac8 100644 --- a/meilisearch-http/src/analytics/mock_analytics.rs +++ b/meilisearch-http/src/analytics/mock_analytics.rs @@ -7,6 +7,7 @@ use serde_json::Value; use super::{find_user_id, Analytics, DocumentDeletionKind}; use crate::routes::indexes::documents::UpdateDocumentsQuery; +use crate::routes::tasks::TasksFilterQueryRaw; use crate::Opt; pub struct MockAnalytics { @@ -57,5 +58,6 @@ impl Analytics for MockAnalytics { _request: &HttpRequest, ) { } + fn get_tasks(&self, _query: &TasksFilterQueryRaw, _request: &HttpRequest) {} fn health_seen(&self, _request: &HttpRequest) {} } diff --git a/meilisearch-http/src/analytics/mod.rs b/meilisearch-http/src/analytics/mod.rs index 734efff5d..46c4b2090 100644 --- a/meilisearch-http/src/analytics/mod.rs +++ b/meilisearch-http/src/analytics/mod.rs @@ -15,6 +15,7 @@ use platform_dirs::AppDirs; use serde_json::Value; use crate::routes::indexes::documents::UpdateDocumentsQuery; +use crate::routes::tasks::TasksFilterQueryRaw; // if we are in debug mode OR the analytics feature is disabled // the `SegmentAnalytics` point to the mock instead of the real analytics @@ -92,6 +93,9 @@ pub trait Analytics: Sync + Send { request: &HttpRequest, ); + // this method should be called to aggregate the get tasks requests. + fn get_tasks(&self, query: &TasksFilterQueryRaw, request: &HttpRequest); + // this method should be called to aggregate a add documents request fn health_seen(&self, request: &HttpRequest); } diff --git a/meilisearch-http/src/analytics/segment_analytics.rs b/meilisearch-http/src/analytics/segment_analytics.rs index b0dbe13f7..1bed8a47d 100644 --- a/meilisearch-http/src/analytics/segment_analytics.rs +++ b/meilisearch-http/src/analytics/segment_analytics.rs @@ -27,6 +27,7 @@ use super::{config_user_id_path, DocumentDeletionKind, MEILISEARCH_CONFIG_PATH}; use crate::analytics::Analytics; use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, SchedulerConfig}; use crate::routes::indexes::documents::UpdateDocumentsQuery; +use crate::routes::tasks::TasksFilterQueryRaw; use crate::routes::{create_all_stats, Stats}; use crate::search::{ SearchQuery, SearchResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, @@ -70,6 +71,7 @@ pub enum AnalyticsMsg { AggregateAddDocuments(DocumentsAggregator), AggregateDeleteDocuments(DocumentsDeletionAggregator), AggregateUpdateDocuments(DocumentsAggregator), + AggregateTasks(TasksAggregator), AggregateHealth(HealthAggregator), } @@ -131,6 +133,7 @@ impl SegmentAnalytics { add_documents_aggregator: DocumentsAggregator::default(), delete_documents_aggregator: DocumentsDeletionAggregator::default(), update_documents_aggregator: DocumentsAggregator::default(), + get_tasks_aggregator: TasksAggregator::default(), health_aggregator: HealthAggregator::default(), }); tokio::spawn(segment.run(index_scheduler.clone())); @@ -192,6 +195,11 @@ impl super::Analytics for SegmentAnalytics { let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate)); } + fn get_tasks(&self, query: &TasksFilterQueryRaw, request: &HttpRequest) { + let aggregate = TasksAggregator::from_query(query, request); + let _ = self.sender.try_send(AnalyticsMsg::AggregateTasks(aggregate)); + } + fn health_seen(&self, request: &HttpRequest) { let aggregate = HealthAggregator::from_query(request); let _ = self.sender.try_send(AnalyticsMsg::AggregateHealth(aggregate)); @@ -326,6 +334,7 @@ pub struct Segment { add_documents_aggregator: DocumentsAggregator, delete_documents_aggregator: DocumentsDeletionAggregator, update_documents_aggregator: DocumentsAggregator, + get_tasks_aggregator: TasksAggregator, health_aggregator: HealthAggregator, } @@ -382,6 +391,7 @@ impl Segment { Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg), + Some(AnalyticsMsg::AggregateTasks(agreg)) => self.get_tasks_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg), None => (), } @@ -416,6 +426,8 @@ impl Segment { .into_event(&self.user, "Documents Deleted"); let update_documents = std::mem::take(&mut self.update_documents_aggregator) .into_event(&self.user, "Documents Updated"); + let get_tasks = + std::mem::take(&mut self.get_tasks_aggregator).into_event(&self.user, "Tasks Seen"); let health = std::mem::take(&mut self.health_aggregator).into_event(&self.user, "Health Seen"); @@ -434,6 +446,9 @@ impl Segment { if let Some(update_documents) = update_documents { let _ = self.batcher.push(update_documents).await; } + if let Some(get_tasks) = get_tasks { + let _ = self.batcher.push(get_tasks).await; + } if let Some(health) = health { let _ = self.batcher.push(health).await; } @@ -840,6 +855,91 @@ impl DocumentsDeletionAggregator { } } +#[derive(Default, Serialize)] +pub struct TasksAggregator { + #[serde(skip)] + timestamp: Option, + + // context + #[serde(rename = "user-agent")] + user_agents: HashSet, + + filtered_by_uid: bool, + filtered_by_index_uid: bool, + filtered_by_type: bool, + filtered_by_status: bool, + filtered_by_canceled_by: bool, + filtered_by_before_enqueued_at: bool, + filtered_by_after_enqueued_at: bool, + filtered_by_before_started_at: bool, + filtered_by_after_started_at: bool, + filtered_by_before_finished_at: bool, + filtered_by_after_finished_at: bool, + total_received: usize, +} + +impl TasksAggregator { + pub fn from_query(query: &TasksFilterQueryRaw, request: &HttpRequest) -> Self { + Self { + timestamp: Some(OffsetDateTime::now_utc()), + user_agents: extract_user_agents(request).into_iter().collect(), + filtered_by_uid: query.common.uids.is_some(), + filtered_by_index_uid: query.common.index_uids.is_some(), + filtered_by_type: query.common.types.is_some(), + filtered_by_status: query.common.statuses.is_some(), + filtered_by_canceled_by: query.common.canceled_by.is_some(), + filtered_by_before_enqueued_at: query.dates.before_enqueued_at.is_some(), + filtered_by_after_enqueued_at: query.dates.after_enqueued_at.is_some(), + filtered_by_before_started_at: query.dates.before_started_at.is_some(), + filtered_by_after_started_at: query.dates.after_started_at.is_some(), + filtered_by_before_finished_at: query.dates.before_finished_at.is_some(), + filtered_by_after_finished_at: query.dates.after_finished_at.is_some(), + total_received: 1, + } + } + + /// Aggregate one [DocumentsAggregator] into another. + pub fn aggregate(&mut self, other: Self) { + if self.timestamp.is_none() { + self.timestamp = other.timestamp; + } + + // we can't create a union because there is no `into_union` method + for user_agent in other.user_agents { + self.user_agents.insert(user_agent); + } + + self.filtered_by_uid |= other.filtered_by_uid; + self.filtered_by_index_uid |= other.filtered_by_index_uid; + self.filtered_by_type |= other.filtered_by_type; + self.filtered_by_status |= other.filtered_by_status; + self.filtered_by_canceled_by |= other.filtered_by_canceled_by; + self.filtered_by_before_enqueued_at |= other.filtered_by_before_enqueued_at; + self.filtered_by_after_enqueued_at |= other.filtered_by_after_enqueued_at; + self.filtered_by_before_started_at |= other.filtered_by_before_started_at; + self.filtered_by_after_started_at |= other.filtered_by_after_started_at; + self.filtered_by_before_finished_at |= other.filtered_by_before_finished_at; + self.filtered_by_after_finished_at |= other.filtered_by_after_finished_at; + self.filtered_by_after_finished_at |= other.filtered_by_after_finished_at; + + self.total_received = self.total_received.saturating_add(other.total_received); + } + + pub fn into_event(self, user: &User, event_name: &str) -> Option { + // if we had no timestamp it means we never encountered any events and + // thus we don't need to send this event. + let timestamp = self.timestamp?; + + Some(Track { + timestamp: Some(timestamp), + user: user.clone(), + event: event_name.to_string(), + properties: serde_json::to_value(self).ok()?, + ..Default::default() + }) + } +} + #[derive(Default, Serialize)] pub struct HealthAggregator { #[serde(skip)] diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 658b30449..9fcb1c4b7 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -21,7 +21,7 @@ mod api_key; mod dump; pub mod indexes; mod swap_indexes; -mod tasks; +pub mod tasks; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::scope("/tasks").configure(tasks::configure)) diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index 3419e6e8a..7002f290b 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -162,11 +162,11 @@ impl From
for DetailsView { #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct TaskCommonQueryRaw { - uids: Option>, - canceled_by: Option>, - types: Option>>, - statuses: Option>>, - index_uids: Option>>, + pub uids: Option>, + pub canceled_by: Option>, + pub types: Option>>, + pub statuses: Option>>, + pub index_uids: Option>>, } impl TaskCommonQueryRaw { fn validate(self) -> Result { @@ -261,12 +261,12 @@ impl TaskCommonQueryRaw { #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct TaskDateQueryRaw { - after_enqueued_at: Option, - before_enqueued_at: Option, - after_started_at: Option, - before_started_at: Option, - after_finished_at: Option, - before_finished_at: Option, + pub after_enqueued_at: Option, + pub before_enqueued_at: Option, + pub after_started_at: Option, + pub before_started_at: Option, + pub after_finished_at: Option, + pub before_finished_at: Option, } impl TaskDateQueryRaw { fn validate(self) -> Result { @@ -339,21 +339,21 @@ impl TaskDateQueryRaw { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct TasksFilterQueryRaw { #[serde(flatten)] - common: TaskCommonQueryRaw, + pub common: TaskCommonQueryRaw, #[serde(default = "DEFAULT_LIMIT")] - limit: u32, - from: Option, + pub limit: u32, + pub from: Option, #[serde(flatten)] - dates: TaskDateQueryRaw, + pub dates: TaskDateQueryRaw, } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct TaskDeletionOrCancelationQueryRaw { #[serde(flatten)] - common: TaskCommonQueryRaw, + pub common: TaskCommonQueryRaw, #[serde(flatten)] - dates: TaskDateQueryRaw, + pub dates: TaskDateQueryRaw, } impl TasksFilterQueryRaw { @@ -597,6 +597,8 @@ async fn get_tasks( req: HttpRequest, analytics: web::Data, ) -> Result { + analytics.get_tasks(¶ms, &req); + let TasksFilterQuery { common: TaskCommonQuery { types, uids, canceled_by, statuses, index_uids }, limit, @@ -612,24 +614,6 @@ async fn get_tasks( }, } = params.into_inner().validate()?; - analytics.publish( - "Tasks Seen".to_string(), - json!({ - "filtered_by_uid": uids.is_some(), - "filtered_by_index_uid": index_uids.is_some(), - "filtered_by_type": types.is_some(), - "filtered_by_status": statuses.is_some(), - "filtered_by_canceled_by": canceled_by.is_some(), - "filtered_by_before_enqueued_at": before_enqueued_at.is_some(), - "filtered_by_after_enqueued_at": after_enqueued_at.is_some(), - "filtered_by_before_started_at": before_started_at.is_some(), - "filtered_by_after_started_at": after_started_at.is_some(), - "filtered_by_before_finished_at": before_finished_at.is_some(), - "filtered_by_after_finished_at": after_finished_at.is_some(), - }), - Some(&req), - ); - // We +1 just to know if there is more after this "page" or not. let limit = limit.saturating_add(1);