From ef77c7699b21422b4857878d072494e1bfc49d6b Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 17 Oct 2024 09:06:23 +0200 Subject: [PATCH] add the required shared values between all the events and fix the timestamp --- meilisearch/src/analytics/mod.rs | 6 +- .../src/analytics/segment_analytics.rs | 75 +++++++++++++------ 2 files changed, 57 insertions(+), 24 deletions(-) diff --git a/meilisearch/src/analytics/mod.rs b/meilisearch/src/analytics/mod.rs index 91139e1dd..a3b8d6d1d 100644 --- a/meilisearch/src/analytics/mod.rs +++ b/meilisearch/src/analytics/mod.rs @@ -166,8 +166,8 @@ impl Analytics { /// The method used to publish most analytics that do not need to be batched every hours pub fn publish(&self, event: T, request: &HttpRequest) { - let Some(ref segment) = self.segment else { return }; - let user_agents = extract_user_agents(request); - let _ = segment.sender.try_send(segment_analytics::Message::new(event)); + if let Some(ref segment) = self.segment { + let _ = segment.sender.try_send(segment_analytics::Message::new(event, request)); + } } } diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs index 3496853ff..00a3adaaf 100644 --- a/meilisearch/src/analytics/segment_analytics.rs +++ b/meilisearch/src/analytics/segment_analytics.rs @@ -28,7 +28,6 @@ use super::{ config_user_id_path, Aggregate, AggregateMethod, DocumentDeletionKind, DocumentFetchKind, MEILISEARCH_CONFIG_PATH, }; -use crate::analytics::Analytics; use crate::option::{ default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot, }; @@ -58,7 +57,7 @@ fn write_user_id(db_path: &Path, user_id: &InstanceUid) { const SEGMENT_API_KEY: &str = "P3FWhhEsJiEDCuEHpmcN9DHcK4hVfBvb"; -pub fn extract_user_agents(request: &HttpRequest) -> Vec { +pub fn extract_user_agents(request: &HttpRequest) -> HashSet { request .headers() .get(ANALYTICS_HEADER) @@ -77,14 +76,26 @@ pub struct Message { type_id: TypeId, // Same for the aggregate function. aggregator_function: fn(Box, Box) -> Option>, - event: Box, + event: Event, +} + +pub struct Event { + original: Box, + timestamp: OffsetDateTime, + user_agents: HashSet, + total: usize, } impl Message { - pub fn new(event: T) -> Self { + pub fn new(event: T, request: &HttpRequest) -> Self { Self { type_id: TypeId::of::(), - event: Box::new(event), + event: Event { + original: Box::new(event), + timestamp: OffsetDateTime::now_utc(), + user_agents: extract_user_agents(request), + total: 1, + }, aggregator_function: T::downcast_aggregate, } } @@ -400,7 +411,7 @@ pub struct Segment { user: User, opt: Opt, batcher: AutoBatcher, - events: HashMap>, + events: HashMap, } impl Segment { @@ -451,22 +462,34 @@ impl Segment { _ = interval.tick() => { self.tick(index_scheduler.clone(), auth_controller.clone()).await; }, - msg = self.inbox.recv() => { - match msg { - Some(Message { type_id, event, aggregator_function }) => { - let new_event = match self.events.remove(&type_id) { - Some(old) => (aggregator_function)(old, event).unwrap(), - None => event, - }; - self.events.insert(type_id, new_event); - }, - None => (), - } - } + Some(msg) = self.inbox.recv() => { + self.handle_msg(msg); + } } } } + fn handle_msg(&mut self, Message { type_id, aggregator_function, event }: Message) { + let new_event = match self.events.remove(&type_id) { + Some(old) => { + // The function should never fail since we retrieved the corresponding TypeId in the map. But in the unfortunate + // case it could happens we're going to silently ignore the error + let Some(original) = (aggregator_function)(old.original, event.original) else { + return; + }; + Event { + original, + // We always want to return the FIRST timestamp ever encountered + timestamp: old.timestamp, + user_agents: old.user_agents.union(&event.user_agents).cloned().collect(), + total: old.total.saturating_add(event.total), + } + } + None => event, + }; + self.events.insert(type_id, new_event); + } + async fn tick( &mut self, index_scheduler: Arc, @@ -503,11 +526,21 @@ impl Segment { let events = std::mem::take(&mut self.events); for (_, event) in events { + let Event { original, timestamp, user_agents, total } = event; + let name = original.event_name(); + let mut properties = original.into_event(); + if properties["user-agent"].is_null() { + properties["user-agent"] = json!(user_agents); + }; + if properties["requests"]["total_received"].is_null() { + properties["requests"]["total_received"] = total.into(); + }; + self.batcher.push(Track { user: self.user.clone(), - event: event.event_name().to_string(), - properties: event.into_event(), - timestamp: todo!(), + event: name.to_string(), + properties, + timestamp: Some(timestamp), ..Default::default() }); }