implements the analytics on the health and version routes

This commit is contained in:
Tamo 2022-11-16 13:06:10 +01:00
parent 684b90066d
commit 10ab5f6a58
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
4 changed files with 78 additions and 1 deletions

View File

@ -57,4 +57,5 @@ impl Analytics for MockAnalytics {
_request: &HttpRequest, _request: &HttpRequest,
) { ) {
} }
fn health_seen(&self, _request: &HttpRequest) {}
} }

View File

@ -91,4 +91,7 @@ pub trait Analytics: Sync + Send {
index_creation: bool, index_creation: bool,
request: &HttpRequest, request: &HttpRequest,
); );
// this method should be called to aggregate a add documents request
fn health_seen(&self, request: &HttpRequest);
} }

View File

@ -70,6 +70,7 @@ pub enum AnalyticsMsg {
AggregateAddDocuments(DocumentsAggregator), AggregateAddDocuments(DocumentsAggregator),
AggregateDeleteDocuments(DocumentsDeletionAggregator), AggregateDeleteDocuments(DocumentsDeletionAggregator),
AggregateUpdateDocuments(DocumentsAggregator), AggregateUpdateDocuments(DocumentsAggregator),
AggregateHealth(HealthAggregator),
} }
pub struct SegmentAnalytics { pub struct SegmentAnalytics {
@ -130,6 +131,7 @@ impl SegmentAnalytics {
add_documents_aggregator: DocumentsAggregator::default(), add_documents_aggregator: DocumentsAggregator::default(),
delete_documents_aggregator: DocumentsDeletionAggregator::default(), delete_documents_aggregator: DocumentsDeletionAggregator::default(),
update_documents_aggregator: DocumentsAggregator::default(), update_documents_aggregator: DocumentsAggregator::default(),
health_aggregator: HealthAggregator::default(),
}); });
tokio::spawn(segment.run(index_scheduler.clone())); tokio::spawn(segment.run(index_scheduler.clone()));
@ -189,6 +191,11 @@ impl super::Analytics for SegmentAnalytics {
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request); let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate)); let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
} }
fn health_seen(&self, request: &HttpRequest) {
let aggregate = HealthAggregator::from_query(request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateHealth(aggregate));
}
} }
/// This structure represent the `infos` field we send in the analytics. /// This structure represent the `infos` field we send in the analytics.
@ -319,6 +326,7 @@ pub struct Segment {
add_documents_aggregator: DocumentsAggregator, add_documents_aggregator: DocumentsAggregator,
delete_documents_aggregator: DocumentsDeletionAggregator, delete_documents_aggregator: DocumentsDeletionAggregator,
update_documents_aggregator: DocumentsAggregator, update_documents_aggregator: DocumentsAggregator,
health_aggregator: HealthAggregator,
} }
impl Segment { impl Segment {
@ -374,6 +382,7 @@ impl Segment {
Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg),
None => (), None => (),
} }
} }
@ -407,6 +416,8 @@ impl Segment {
.into_event(&self.user, "Documents Deleted"); .into_event(&self.user, "Documents Deleted");
let update_documents = std::mem::take(&mut self.update_documents_aggregator) let update_documents = std::mem::take(&mut self.update_documents_aggregator)
.into_event(&self.user, "Documents Updated"); .into_event(&self.user, "Documents Updated");
let health =
std::mem::take(&mut self.health_aggregator).into_event(&self.user, "Health Seen");
if let Some(get_search) = get_search { if let Some(get_search) = get_search {
let _ = self.batcher.push(get_search).await; let _ = self.batcher.push(get_search).await;
@ -423,6 +434,9 @@ impl Segment {
if let Some(update_documents) = update_documents { if let Some(update_documents) = update_documents {
let _ = self.batcher.push(update_documents).await; let _ = self.batcher.push(update_documents).await;
} }
if let Some(health) = health {
let _ = self.batcher.push(health).await;
}
let _ = self.batcher.flush().await; let _ = self.batcher.flush().await;
} }
} }
@ -825,3 +839,53 @@ impl DocumentsDeletionAggregator {
}) })
} }
} }
#[derive(Default, Serialize)]
pub struct HealthAggregator {
#[serde(skip)]
timestamp: Option<OffsetDateTime>,
// context
#[serde(rename = "user-agent")]
user_agents: HashSet<String>,
total_received: usize,
}
impl HealthAggregator {
pub fn from_query(request: &HttpRequest) -> Self {
let mut ret = Self::default();
ret.timestamp = Some(OffsetDateTime::now_utc());
ret.user_agents = extract_user_agents(request).into_iter().collect();
ret.total_received = 1;
ret
}
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in other.user_agents {
self.user_agents.insert(user_agent);
}
self.total_received = self.total_received.saturating_add(other.total_received);
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
// if we had no timestamp it means we never encountered any events and
// thus we don't need to send this event.
let timestamp = self.timestamp?;
Some(Track {
timestamp: Some(timestamp),
user: user.clone(),
event: event_name.to_string(),
properties: serde_json::to_value(self).ok()?,
..Default::default()
})
}
}

View File

@ -308,7 +308,11 @@ struct VersionResponse {
async fn get_version( async fn get_version(
_index_scheduler: GuardedData<ActionPolicy<{ actions::VERSION }>, Data<IndexScheduler>>, _index_scheduler: GuardedData<ActionPolicy<{ actions::VERSION }>, Data<IndexScheduler>>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> HttpResponse { ) -> HttpResponse {
analytics.publish("Version Seen".to_string(), json!(null), Some(&req));
let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown"); let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown"); let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown");
@ -325,6 +329,11 @@ struct KeysResponse {
public: Option<String>, public: Option<String>,
} }
pub async fn get_health() -> Result<HttpResponse, ResponseError> { pub async fn get_health(
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
analytics.health_seen(&req);
Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "available" }))) Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "available" })))
} }