mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-30 00:55:00 +08:00
Merge #3065
3065: Implement the analytics on the health and version routes r=Kerollmops a=irevoire Fix https://github.com/meilisearch/meilisearch/issues/2955 Must be merged after https://github.com/meilisearch/meilisearch/pull/3063 Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
b4d0403518
@ -57,4 +57,5 @@ impl Analytics for MockAnalytics {
|
|||||||
_request: &HttpRequest,
|
_request: &HttpRequest,
|
||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
fn health_seen(&self, _request: &HttpRequest) {}
|
||||||
}
|
}
|
||||||
|
@ -91,4 +91,7 @@ pub trait Analytics: Sync + Send {
|
|||||||
index_creation: bool,
|
index_creation: bool,
|
||||||
request: &HttpRequest,
|
request: &HttpRequest,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// this method should be called to aggregate a add documents request
|
||||||
|
fn health_seen(&self, request: &HttpRequest);
|
||||||
}
|
}
|
||||||
|
@ -70,6 +70,7 @@ pub enum AnalyticsMsg {
|
|||||||
AggregateAddDocuments(DocumentsAggregator),
|
AggregateAddDocuments(DocumentsAggregator),
|
||||||
AggregateDeleteDocuments(DocumentsDeletionAggregator),
|
AggregateDeleteDocuments(DocumentsDeletionAggregator),
|
||||||
AggregateUpdateDocuments(DocumentsAggregator),
|
AggregateUpdateDocuments(DocumentsAggregator),
|
||||||
|
AggregateHealth(HealthAggregator),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SegmentAnalytics {
|
pub struct SegmentAnalytics {
|
||||||
@ -130,6 +131,7 @@ impl SegmentAnalytics {
|
|||||||
add_documents_aggregator: DocumentsAggregator::default(),
|
add_documents_aggregator: DocumentsAggregator::default(),
|
||||||
delete_documents_aggregator: DocumentsDeletionAggregator::default(),
|
delete_documents_aggregator: DocumentsDeletionAggregator::default(),
|
||||||
update_documents_aggregator: DocumentsAggregator::default(),
|
update_documents_aggregator: DocumentsAggregator::default(),
|
||||||
|
health_aggregator: HealthAggregator::default(),
|
||||||
});
|
});
|
||||||
tokio::spawn(segment.run(index_scheduler.clone()));
|
tokio::spawn(segment.run(index_scheduler.clone()));
|
||||||
|
|
||||||
@ -189,6 +191,11 @@ impl super::Analytics for SegmentAnalytics {
|
|||||||
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
|
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
|
||||||
let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
|
let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn health_seen(&self, request: &HttpRequest) {
|
||||||
|
let aggregate = HealthAggregator::from_query(request);
|
||||||
|
let _ = self.sender.try_send(AnalyticsMsg::AggregateHealth(aggregate));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This structure represent the `infos` field we send in the analytics.
|
/// This structure represent the `infos` field we send in the analytics.
|
||||||
@ -319,6 +326,7 @@ pub struct Segment {
|
|||||||
add_documents_aggregator: DocumentsAggregator,
|
add_documents_aggregator: DocumentsAggregator,
|
||||||
delete_documents_aggregator: DocumentsDeletionAggregator,
|
delete_documents_aggregator: DocumentsDeletionAggregator,
|
||||||
update_documents_aggregator: DocumentsAggregator,
|
update_documents_aggregator: DocumentsAggregator,
|
||||||
|
health_aggregator: HealthAggregator,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Segment {
|
impl Segment {
|
||||||
@ -374,6 +382,7 @@ impl Segment {
|
|||||||
Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
|
Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
|
||||||
Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
|
Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
|
||||||
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
|
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
|
||||||
|
Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg),
|
||||||
None => (),
|
None => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -407,6 +416,8 @@ impl Segment {
|
|||||||
.into_event(&self.user, "Documents Deleted");
|
.into_event(&self.user, "Documents Deleted");
|
||||||
let update_documents = std::mem::take(&mut self.update_documents_aggregator)
|
let update_documents = std::mem::take(&mut self.update_documents_aggregator)
|
||||||
.into_event(&self.user, "Documents Updated");
|
.into_event(&self.user, "Documents Updated");
|
||||||
|
let health =
|
||||||
|
std::mem::take(&mut self.health_aggregator).into_event(&self.user, "Health Seen");
|
||||||
|
|
||||||
if let Some(get_search) = get_search {
|
if let Some(get_search) = get_search {
|
||||||
let _ = self.batcher.push(get_search).await;
|
let _ = self.batcher.push(get_search).await;
|
||||||
@ -423,6 +434,9 @@ impl Segment {
|
|||||||
if let Some(update_documents) = update_documents {
|
if let Some(update_documents) = update_documents {
|
||||||
let _ = self.batcher.push(update_documents).await;
|
let _ = self.batcher.push(update_documents).await;
|
||||||
}
|
}
|
||||||
|
if let Some(health) = health {
|
||||||
|
let _ = self.batcher.push(health).await;
|
||||||
|
}
|
||||||
let _ = self.batcher.flush().await;
|
let _ = self.batcher.flush().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -825,3 +839,53 @@ impl DocumentsDeletionAggregator {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Serialize)]
|
||||||
|
pub struct HealthAggregator {
|
||||||
|
#[serde(skip)]
|
||||||
|
timestamp: Option<OffsetDateTime>,
|
||||||
|
|
||||||
|
// context
|
||||||
|
#[serde(rename = "user-agent")]
|
||||||
|
user_agents: HashSet<String>,
|
||||||
|
|
||||||
|
total_received: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HealthAggregator {
|
||||||
|
pub fn from_query(request: &HttpRequest) -> Self {
|
||||||
|
let mut ret = Self::default();
|
||||||
|
ret.timestamp = Some(OffsetDateTime::now_utc());
|
||||||
|
|
||||||
|
ret.user_agents = extract_user_agents(request).into_iter().collect();
|
||||||
|
ret.total_received = 1;
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Aggregate one [DocumentsAggregator] into another.
|
||||||
|
pub fn aggregate(&mut self, other: Self) {
|
||||||
|
if self.timestamp.is_none() {
|
||||||
|
self.timestamp = other.timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we can't create a union because there is no `into_union` method
|
||||||
|
for user_agent in other.user_agents {
|
||||||
|
self.user_agents.insert(user_agent);
|
||||||
|
}
|
||||||
|
self.total_received = self.total_received.saturating_add(other.total_received);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
|
||||||
|
// if we had no timestamp it means we never encountered any events and
|
||||||
|
// thus we don't need to send this event.
|
||||||
|
let timestamp = self.timestamp?;
|
||||||
|
|
||||||
|
Some(Track {
|
||||||
|
timestamp: Some(timestamp),
|
||||||
|
user: user.clone(),
|
||||||
|
event: event_name.to_string(),
|
||||||
|
properties: serde_json::to_value(self).ok()?,
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -308,7 +308,11 @@ struct VersionResponse {
|
|||||||
|
|
||||||
async fn get_version(
|
async fn get_version(
|
||||||
_index_scheduler: GuardedData<ActionPolicy<{ actions::VERSION }>, Data<IndexScheduler>>,
|
_index_scheduler: GuardedData<ActionPolicy<{ actions::VERSION }>, Data<IndexScheduler>>,
|
||||||
|
req: HttpRequest,
|
||||||
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> HttpResponse {
|
) -> HttpResponse {
|
||||||
|
analytics.publish("Version Seen".to_string(), json!(null), Some(&req));
|
||||||
|
|
||||||
let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
|
let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
|
||||||
let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown");
|
let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown");
|
||||||
|
|
||||||
@ -325,6 +329,11 @@ struct KeysResponse {
|
|||||||
public: Option<String>,
|
public: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_health() -> Result<HttpResponse, ResponseError> {
|
pub async fn get_health(
|
||||||
|
req: HttpRequest,
|
||||||
|
analytics: web::Data<dyn Analytics>,
|
||||||
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
|
analytics.health_seen(&req);
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "available" })))
|
Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "available" })))
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user