From 10ab5f6a588409d514572360220a03589e675a5f Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 16 Nov 2022 13:06:10 +0100 Subject: [PATCH] implements the analytics on the health and version routes --- .../src/analytics/mock_analytics.rs | 1 + meilisearch-http/src/analytics/mod.rs | 3 + .../src/analytics/segment_analytics.rs | 64 +++++++++++++++++++ meilisearch-http/src/routes/mod.rs | 11 +++- 4 files changed, 78 insertions(+), 1 deletion(-) diff --git a/meilisearch-http/src/analytics/mock_analytics.rs b/meilisearch-http/src/analytics/mock_analytics.rs index 82460be72..3ab13cd34 100644 --- a/meilisearch-http/src/analytics/mock_analytics.rs +++ b/meilisearch-http/src/analytics/mock_analytics.rs @@ -57,4 +57,5 @@ impl Analytics for MockAnalytics { _request: &HttpRequest, ) { } + fn health_seen(&self, _request: &HttpRequest) {} } diff --git a/meilisearch-http/src/analytics/mod.rs b/meilisearch-http/src/analytics/mod.rs index 2fe5d81a4..734efff5d 100644 --- a/meilisearch-http/src/analytics/mod.rs +++ b/meilisearch-http/src/analytics/mod.rs @@ -91,4 +91,7 @@ pub trait Analytics: Sync + Send { index_creation: bool, request: &HttpRequest, ); + + // this method should be called to aggregate a health request + fn health_seen(&self, request: &HttpRequest); } diff --git a/meilisearch-http/src/analytics/segment_analytics.rs b/meilisearch-http/src/analytics/segment_analytics.rs index 4a9d7aee7..b0dbe13f7 100644 --- a/meilisearch-http/src/analytics/segment_analytics.rs +++ b/meilisearch-http/src/analytics/segment_analytics.rs @@ -70,6 +70,7 @@ pub enum AnalyticsMsg { AggregateAddDocuments(DocumentsAggregator), AggregateDeleteDocuments(DocumentsDeletionAggregator), AggregateUpdateDocuments(DocumentsAggregator), + AggregateHealth(HealthAggregator), } pub struct SegmentAnalytics { @@ -130,6 +131,7 @@ impl SegmentAnalytics { add_documents_aggregator: DocumentsAggregator::default(), delete_documents_aggregator: DocumentsDeletionAggregator::default(), update_documents_aggregator: 
DocumentsAggregator::default(), + health_aggregator: HealthAggregator::default(), }); tokio::spawn(segment.run(index_scheduler.clone())); @@ -189,6 +191,11 @@ impl super::Analytics for SegmentAnalytics { let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request); let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate)); } + + fn health_seen(&self, request: &HttpRequest) { + let aggregate = HealthAggregator::from_query(request); + let _ = self.sender.try_send(AnalyticsMsg::AggregateHealth(aggregate)); + } } /// This structure represent the `infos` field we send in the analytics. @@ -319,6 +326,7 @@ pub struct Segment { add_documents_aggregator: DocumentsAggregator, delete_documents_aggregator: DocumentsDeletionAggregator, update_documents_aggregator: DocumentsAggregator, + health_aggregator: HealthAggregator, } impl Segment { @@ -374,6 +382,7 @@ impl Segment { Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg), + Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg), None => (), } } @@ -407,6 +416,8 @@ impl Segment { .into_event(&self.user, "Documents Deleted"); let update_documents = std::mem::take(&mut self.update_documents_aggregator) .into_event(&self.user, "Documents Updated"); + let health = + std::mem::take(&mut self.health_aggregator).into_event(&self.user, "Health Seen"); if let Some(get_search) = get_search { let _ = self.batcher.push(get_search).await; @@ -423,6 +434,9 @@ impl Segment { if let Some(update_documents) = update_documents { let _ = self.batcher.push(update_documents).await; } + if let Some(health) = health { + let _ = self.batcher.push(health).await; + } let _ = self.batcher.flush().await; } } @@ -825,3 
+839,53 @@ impl DocumentsDeletionAggregator { }) } } + +#[derive(Default, Serialize)] +pub struct HealthAggregator { + #[serde(skip)] + timestamp: Option<OffsetDateTime>, + + // context + #[serde(rename = "user-agent")] + user_agents: HashSet<String>, + + total_received: usize, +} + +impl HealthAggregator { + pub fn from_query(request: &HttpRequest) -> Self { + let mut ret = Self::default(); + ret.timestamp = Some(OffsetDateTime::now_utc()); + + ret.user_agents = extract_user_agents(request).into_iter().collect(); + ret.total_received = 1; + ret + } + + /// Aggregate one [HealthAggregator] into another. + pub fn aggregate(&mut self, other: Self) { + if self.timestamp.is_none() { + self.timestamp = other.timestamp; + } + + // we can't create a union because there is no `into_union` method + for user_agent in other.user_agents { + self.user_agents.insert(user_agent); + } + self.total_received = self.total_received.saturating_add(other.total_received); + } + + pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> { + // if we had no timestamp it means we never encountered any events and + // thus we don't need to send this event. 
+ let timestamp = self.timestamp?; + + Some(Track { + timestamp: Some(timestamp), + user: user.clone(), + event: event_name.to_string(), + properties: serde_json::to_value(self).ok()?, + ..Default::default() + }) + } +} diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 8cf4af718..658b30449 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -308,7 +308,11 @@ struct VersionResponse { async fn get_version( _index_scheduler: GuardedData, Data>, + req: HttpRequest, + analytics: web::Data, ) -> HttpResponse { + analytics.publish("Version Seen".to_string(), json!(null), Some(&req)); + let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown"); let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown"); @@ -325,6 +329,11 @@ struct KeysResponse { public: Option, } -pub async fn get_health() -> Result { +pub async fn get_health( + req: HttpRequest, + analytics: web::Data, +) -> Result { + analytics.health_seen(&req); + Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "available" }))) }