remove the Task Seen analytic

This commit is contained in:
Tamo 2024-04-16 18:48:10 +02:00
parent 70ce0095ea
commit abae31aee0
4 changed files with 0 additions and 146 deletions

View File

@ -7,7 +7,6 @@ use serde_json::Value;
use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind}; use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind};
use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::routes::tasks::TasksFilterQuery;
use crate::Opt; use crate::Opt;
pub struct MockAnalytics { pub struct MockAnalytics {
@ -86,6 +85,5 @@ impl Analytics for MockAnalytics {
} }
fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {} fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {} fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
fn get_tasks(&self, _query: &TasksFilterQuery, _request: &HttpRequest) {}
fn health_seen(&self, _request: &HttpRequest) {} fn health_seen(&self, _request: &HttpRequest) {}
} }

View File

@ -14,7 +14,6 @@ use platform_dirs::AppDirs;
use serde_json::Value; use serde_json::Value;
use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::routes::tasks::TasksFilterQuery;
// if the analytics feature is disabled // if the analytics feature is disabled
// the `SegmentAnalytics` point to the mock instead of the real analytics // the `SegmentAnalytics` point to the mock instead of the real analytics
@ -118,9 +117,6 @@ pub trait Analytics: Sync + Send {
request: &HttpRequest, request: &HttpRequest,
); );
// this method should be called to aggregate the get tasks requests.
fn get_tasks(&self, query: &TasksFilterQuery, request: &HttpRequest);
// this method should be called to aggregate a add documents request // this method should be called to aggregate a add documents request
fn health_seen(&self, request: &HttpRequest); fn health_seen(&self, request: &HttpRequest);
} }

View File

@ -33,7 +33,6 @@ use crate::option::{
}; };
use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::routes::indexes::facet_search::FacetSearchQuery; use crate::routes::indexes::facet_search::FacetSearchQuery;
use crate::routes::tasks::TasksFilterQuery;
use crate::routes::{create_all_stats, Stats}; use crate::routes::{create_all_stats, Stats};
use crate::search::{ use crate::search::{
FacetSearchResult, MatchingStrategy, SearchQuery, SearchQueryWithIndex, SearchResult, FacetSearchResult, MatchingStrategy, SearchQuery, SearchQueryWithIndex, SearchResult,
@ -81,7 +80,6 @@ pub enum AnalyticsMsg {
AggregateUpdateDocuments(DocumentsAggregator), AggregateUpdateDocuments(DocumentsAggregator),
AggregateGetFetchDocuments(DocumentsFetchAggregator), AggregateGetFetchDocuments(DocumentsFetchAggregator),
AggregatePostFetchDocuments(DocumentsFetchAggregator), AggregatePostFetchDocuments(DocumentsFetchAggregator),
AggregateTasks(TasksAggregator),
AggregateHealth(HealthAggregator), AggregateHealth(HealthAggregator),
} }
@ -152,7 +150,6 @@ impl SegmentAnalytics {
update_documents_aggregator: DocumentsAggregator::default(), update_documents_aggregator: DocumentsAggregator::default(),
get_fetch_documents_aggregator: DocumentsFetchAggregator::default(), get_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
post_fetch_documents_aggregator: DocumentsFetchAggregator::default(), post_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
get_tasks_aggregator: TasksAggregator::default(),
health_aggregator: HealthAggregator::default(), health_aggregator: HealthAggregator::default(),
}); });
tokio::spawn(segment.run(index_scheduler.clone(), auth_controller.clone())); tokio::spawn(segment.run(index_scheduler.clone(), auth_controller.clone()));
@ -232,11 +229,6 @@ impl super::Analytics for SegmentAnalytics {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFetchDocuments(aggregate)); let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFetchDocuments(aggregate));
} }
fn get_tasks(&self, query: &TasksFilterQuery, request: &HttpRequest) {
let aggregate = TasksAggregator::from_query(query, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateTasks(aggregate));
}
fn health_seen(&self, request: &HttpRequest) { fn health_seen(&self, request: &HttpRequest) {
let aggregate = HealthAggregator::from_query(request); let aggregate = HealthAggregator::from_query(request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateHealth(aggregate)); let _ = self.sender.try_send(AnalyticsMsg::AggregateHealth(aggregate));
@ -394,7 +386,6 @@ pub struct Segment {
update_documents_aggregator: DocumentsAggregator, update_documents_aggregator: DocumentsAggregator,
get_fetch_documents_aggregator: DocumentsFetchAggregator, get_fetch_documents_aggregator: DocumentsFetchAggregator,
post_fetch_documents_aggregator: DocumentsFetchAggregator, post_fetch_documents_aggregator: DocumentsFetchAggregator,
get_tasks_aggregator: TasksAggregator,
health_aggregator: HealthAggregator, health_aggregator: HealthAggregator,
} }
@ -458,7 +449,6 @@ impl Segment {
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateTasks(agreg)) => self.get_tasks_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg),
None => (), None => (),
} }
@ -513,7 +503,6 @@ impl Segment {
update_documents_aggregator, update_documents_aggregator,
get_fetch_documents_aggregator, get_fetch_documents_aggregator,
post_fetch_documents_aggregator, post_fetch_documents_aggregator,
get_tasks_aggregator,
health_aggregator, health_aggregator,
} = self; } = self;
@ -562,9 +551,6 @@ impl Segment {
{ {
let _ = self.batcher.push(post_fetch_documents).await; let _ = self.batcher.push(post_fetch_documents).await;
} }
if let Some(get_tasks) = take(get_tasks_aggregator).into_event(user, "Tasks Seen") {
let _ = self.batcher.push(get_tasks).await;
}
if let Some(health) = take(health_aggregator).into_event(user, "Health Seen") { if let Some(health) = take(health_aggregator).into_event(user, "Health Seen") {
let _ = self.batcher.push(health).await; let _ = self.batcher.push(health).await;
} }
@ -1503,124 +1489,6 @@ impl DocumentsDeletionAggregator {
} }
} }
#[derive(Default, Serialize)]
pub struct TasksAggregator {
#[serde(skip)]
timestamp: Option<OffsetDateTime>,
// context
#[serde(rename = "user-agent")]
user_agents: HashSet<String>,
filtered_by_uid: bool,
filtered_by_index_uid: bool,
filtered_by_type: bool,
filtered_by_status: bool,
filtered_by_canceled_by: bool,
filtered_by_before_enqueued_at: bool,
filtered_by_after_enqueued_at: bool,
filtered_by_before_started_at: bool,
filtered_by_after_started_at: bool,
filtered_by_before_finished_at: bool,
filtered_by_after_finished_at: bool,
total_received: usize,
}
impl TasksAggregator {
pub fn from_query(query: &TasksFilterQuery, request: &HttpRequest) -> Self {
let TasksFilterQuery {
limit: _,
from: _,
uids,
index_uids,
types,
statuses,
canceled_by,
before_enqueued_at,
after_enqueued_at,
before_started_at,
after_started_at,
before_finished_at,
after_finished_at,
} = query;
Self {
timestamp: Some(OffsetDateTime::now_utc()),
user_agents: extract_user_agents(request).into_iter().collect(),
filtered_by_uid: uids.is_some(),
filtered_by_index_uid: index_uids.is_some(),
filtered_by_type: types.is_some(),
filtered_by_status: statuses.is_some(),
filtered_by_canceled_by: canceled_by.is_some(),
filtered_by_before_enqueued_at: before_enqueued_at.is_some(),
filtered_by_after_enqueued_at: after_enqueued_at.is_some(),
filtered_by_before_started_at: before_started_at.is_some(),
filtered_by_after_started_at: after_started_at.is_some(),
filtered_by_before_finished_at: before_finished_at.is_some(),
filtered_by_after_finished_at: after_finished_at.is_some(),
total_received: 1,
}
}
/// Aggregate one [TasksAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self {
timestamp,
user_agents,
total_received,
filtered_by_uid,
filtered_by_index_uid,
filtered_by_type,
filtered_by_status,
filtered_by_canceled_by,
filtered_by_before_enqueued_at,
filtered_by_after_enqueued_at,
filtered_by_before_started_at,
filtered_by_after_started_at,
filtered_by_before_finished_at,
filtered_by_after_finished_at,
} = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.filtered_by_uid |= filtered_by_uid;
self.filtered_by_index_uid |= filtered_by_index_uid;
self.filtered_by_type |= filtered_by_type;
self.filtered_by_status |= filtered_by_status;
self.filtered_by_canceled_by |= filtered_by_canceled_by;
self.filtered_by_before_enqueued_at |= filtered_by_before_enqueued_at;
self.filtered_by_after_enqueued_at |= filtered_by_after_enqueued_at;
self.filtered_by_before_started_at |= filtered_by_before_started_at;
self.filtered_by_after_started_at |= filtered_by_after_started_at;
self.filtered_by_before_finished_at |= filtered_by_before_finished_at;
self.filtered_by_after_finished_at |= filtered_by_after_finished_at;
self.filtered_by_after_finished_at |= filtered_by_after_finished_at;
self.total_received = self.total_received.saturating_add(total_received);
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
// if we had no timestamp it means we never encountered any events and
// thus we don't need to send this event.
let timestamp = self.timestamp?;
Some(Track {
timestamp: Some(timestamp),
user: user.clone(),
event: event_name.to_string(),
properties: serde_json::to_value(self).ok()?,
..Default::default()
})
}
}
#[derive(Default, Serialize)] #[derive(Default, Serialize)]
pub struct HealthAggregator { pub struct HealthAggregator {
#[serde(skip)] #[serde(skip)]

View File

@ -270,12 +270,8 @@ pub struct AllTasks {
async fn get_tasks( async fn get_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
params: AwebQueryParameter<TasksFilterQuery, DeserrQueryParamError>, params: AwebQueryParameter<TasksFilterQuery, DeserrQueryParamError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let mut params = params.into_inner(); let mut params = params.into_inner();
analytics.get_tasks(&params, &req);
// We +1 just to know if there is more after this "page" or not. // We +1 just to know if there is more after this "page" or not.
params.limit.0 = params.limit.0.saturating_add(1); params.limit.0 = params.limit.0.saturating_add(1);
let limit = params.limit.0; let limit = params.limit.0;
@ -298,8 +294,6 @@ async fn get_tasks(
async fn get_task( async fn get_task(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
task_uid: web::Path<String>, task_uid: web::Path<String>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let task_uid_string = task_uid.into_inner(); let task_uid_string = task_uid.into_inner();
@ -310,8 +304,6 @@ async fn get_task(
} }
}; };
analytics.publish("Tasks Seen".to_string(), json!({ "per_task_uid": true }), Some(&req));
let query = index_scheduler::Query { uids: Some(vec![task_uid]), ..Query::default() }; let query = index_scheduler::Query { uids: Some(vec![task_uid]), ..Query::default() };
let filters = index_scheduler.filters(); let filters = index_scheduler.filters();
let (tasks, _) = index_scheduler.get_tasks_from_authorized_indexes(query, filters)?; let (tasks, _) = index_scheduler.get_tasks_from_authorized_indexes(query, filters)?;