batch the tasks seen events

Tamo 2022-11-16 18:44:58 +01:00
parent 0e6394fafc
commit f1884d6910
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
5 changed files with 126 additions and 36 deletions

View File

@@ -7,6 +7,7 @@ use serde_json::Value;
 use super::{find_user_id, Analytics, DocumentDeletionKind};
 use crate::routes::indexes::documents::UpdateDocumentsQuery;
+use crate::routes::tasks::TasksFilterQueryRaw;
 use crate::Opt;

 pub struct MockAnalytics {
@@ -57,5 +58,6 @@ impl Analytics for MockAnalytics {
         _request: &HttpRequest,
     ) {
     }
+    fn get_tasks(&self, _query: &TasksFilterQueryRaw, _request: &HttpRequest) {}
     fn health_seen(&self, _request: &HttpRequest) {}
 }

View File

@@ -15,6 +15,7 @@ use platform_dirs::AppDirs;
 use serde_json::Value;

 use crate::routes::indexes::documents::UpdateDocumentsQuery;
+use crate::routes::tasks::TasksFilterQueryRaw;

 // if we are in debug mode OR the analytics feature is disabled
 // the `SegmentAnalytics` points to the mock instead of the real analytics
@@ -92,6 +93,9 @@ pub trait Analytics: Sync + Send {
         request: &HttpRequest,
     );

+    // this method should be called to aggregate the get tasks requests.
+    fn get_tasks(&self, query: &TasksFilterQueryRaw, request: &HttpRequest);
+
     // this method should be called to aggregate a health request
     fn health_seen(&self, request: &HttpRequest);
 }

View File

@@ -27,6 +27,7 @@ use super::{config_user_id_path, DocumentDeletionKind, MEILISEARCH_CONFIG_PATH};
 use crate::analytics::Analytics;
 use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, SchedulerConfig};
 use crate::routes::indexes::documents::UpdateDocumentsQuery;
+use crate::routes::tasks::TasksFilterQueryRaw;
 use crate::routes::{create_all_stats, Stats};
 use crate::search::{
     SearchQuery, SearchResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER,
@@ -70,6 +71,7 @@ pub enum AnalyticsMsg {
     AggregateAddDocuments(DocumentsAggregator),
     AggregateDeleteDocuments(DocumentsDeletionAggregator),
     AggregateUpdateDocuments(DocumentsAggregator),
+    AggregateTasks(TasksAggregator),
     AggregateHealth(HealthAggregator),
 }
@@ -131,6 +133,7 @@ impl SegmentAnalytics {
             add_documents_aggregator: DocumentsAggregator::default(),
             delete_documents_aggregator: DocumentsDeletionAggregator::default(),
             update_documents_aggregator: DocumentsAggregator::default(),
+            get_tasks_aggregator: TasksAggregator::default(),
             health_aggregator: HealthAggregator::default(),
         });
         tokio::spawn(segment.run(index_scheduler.clone()));
@@ -192,6 +195,11 @@ impl super::Analytics for SegmentAnalytics {
         let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
     }

+    fn get_tasks(&self, query: &TasksFilterQueryRaw, request: &HttpRequest) {
+        let aggregate = TasksAggregator::from_query(query, request);
+        let _ = self.sender.try_send(AnalyticsMsg::AggregateTasks(aggregate));
+    }
+
     fn health_seen(&self, request: &HttpRequest) {
         let aggregate = HealthAggregator::from_query(request);
         let _ = self.sender.try_send(AnalyticsMsg::AggregateHealth(aggregate));
@@ -326,6 +334,7 @@ pub struct Segment {
     add_documents_aggregator: DocumentsAggregator,
     delete_documents_aggregator: DocumentsDeletionAggregator,
     update_documents_aggregator: DocumentsAggregator,
+    get_tasks_aggregator: TasksAggregator,
     health_aggregator: HealthAggregator,
 }
@@ -382,6 +391,7 @@ impl Segment {
                 Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
+                Some(AnalyticsMsg::AggregateTasks(agreg)) => self.get_tasks_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg),
                 None => (),
             }
@@ -416,6 +426,8 @@ impl Segment {
             .into_event(&self.user, "Documents Deleted");
         let update_documents = std::mem::take(&mut self.update_documents_aggregator)
             .into_event(&self.user, "Documents Updated");
+        let get_tasks =
+            std::mem::take(&mut self.get_tasks_aggregator).into_event(&self.user, "Tasks Seen");
         let health =
             std::mem::take(&mut self.health_aggregator).into_event(&self.user, "Health Seen");
@@ -434,6 +446,9 @@ impl Segment {
         if let Some(update_documents) = update_documents {
             let _ = self.batcher.push(update_documents).await;
         }
+        if let Some(get_tasks) = get_tasks {
+            let _ = self.batcher.push(get_tasks).await;
+        }
         if let Some(health) = health {
             let _ = self.batcher.push(health).await;
         }
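
The flush loop above is the whole point of the commit: per-request "Tasks Seen" aggregates pile up in `get_tasks_aggregator`, and on each tick `std::mem::take` drains the aggregator (leaving a fresh `Default` behind) and turns it into at most one Segment `Track` event for the batcher. A self-contained sketch of that take-and-flush pattern, using stand-in types rather than the real meilisearch ones:

// Stand-in sketch (not the real meilisearch types): the aggregator is
// swapped out for its Default value, and the accumulated state becomes
// at most one event per flush.
#[derive(Default)]
struct Agg {
    seen: u64,
}

impl Agg {
    fn aggregate(&mut self, other: Agg) {
        self.seen += other.seen;
    }

    // None means "nothing was aggregated, so send nothing".
    fn into_event(self) -> Option<String> {
        (self.seen > 0).then(|| format!("Tasks Seen x{}", self.seen))
    }
}

fn main() {
    let mut pending = Agg::default();
    // Each incoming request contributes one single-shot aggregate...
    for _ in 0..3 {
        pending.aggregate(Agg { seen: 1 });
    }
    // ...and the periodic tick drains them all in one go, resetting the state.
    let batch = std::mem::take(&mut pending);
    assert_eq!(batch.into_event().as_deref(), Some("Tasks Seen x3"));
    assert_eq!(pending.into_event(), None); // nothing left after the take
}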
@@ -840,6 +855,91 @@ impl DocumentsDeletionAggregator {
     }
 }

+#[derive(Default, Serialize)]
+pub struct TasksAggregator {
+    #[serde(skip)]
+    timestamp: Option<OffsetDateTime>,
+
+    // context
+    #[serde(rename = "user-agent")]
+    user_agents: HashSet<String>,
+
+    filtered_by_uid: bool,
+    filtered_by_index_uid: bool,
+    filtered_by_type: bool,
+    filtered_by_status: bool,
+    filtered_by_canceled_by: bool,
+    filtered_by_before_enqueued_at: bool,
+    filtered_by_after_enqueued_at: bool,
+    filtered_by_before_started_at: bool,
+    filtered_by_after_started_at: bool,
+    filtered_by_before_finished_at: bool,
+    filtered_by_after_finished_at: bool,
+
+    total_received: usize,
+}
+
+impl TasksAggregator {
+    pub fn from_query(query: &TasksFilterQueryRaw, request: &HttpRequest) -> Self {
+        Self {
+            timestamp: Some(OffsetDateTime::now_utc()),
+            user_agents: extract_user_agents(request).into_iter().collect(),
+            filtered_by_uid: query.common.uids.is_some(),
+            filtered_by_index_uid: query.common.index_uids.is_some(),
+            filtered_by_type: query.common.types.is_some(),
+            filtered_by_status: query.common.statuses.is_some(),
+            filtered_by_canceled_by: query.common.canceled_by.is_some(),
+            filtered_by_before_enqueued_at: query.dates.before_enqueued_at.is_some(),
+            filtered_by_after_enqueued_at: query.dates.after_enqueued_at.is_some(),
+            filtered_by_before_started_at: query.dates.before_started_at.is_some(),
+            filtered_by_after_started_at: query.dates.after_started_at.is_some(),
+            filtered_by_before_finished_at: query.dates.before_finished_at.is_some(),
+            filtered_by_after_finished_at: query.dates.after_finished_at.is_some(),
+            total_received: 1,
+        }
+    }
+
+    /// Aggregate one [TasksAggregator] into another.
+    pub fn aggregate(&mut self, other: Self) {
+        if self.timestamp.is_none() {
+            self.timestamp = other.timestamp;
+        }
+
+        // we can't create a union because there is no `into_union` method
+        for user_agent in other.user_agents {
+            self.user_agents.insert(user_agent);
+        }
+
+        self.filtered_by_uid |= other.filtered_by_uid;
+        self.filtered_by_index_uid |= other.filtered_by_index_uid;
+        self.filtered_by_type |= other.filtered_by_type;
+        self.filtered_by_status |= other.filtered_by_status;
+        self.filtered_by_canceled_by |= other.filtered_by_canceled_by;
+        self.filtered_by_before_enqueued_at |= other.filtered_by_before_enqueued_at;
+        self.filtered_by_after_enqueued_at |= other.filtered_by_after_enqueued_at;
+        self.filtered_by_before_started_at |= other.filtered_by_before_started_at;
+        self.filtered_by_after_started_at |= other.filtered_by_after_started_at;
+        self.filtered_by_before_finished_at |= other.filtered_by_before_finished_at;
+        self.filtered_by_after_finished_at |= other.filtered_by_after_finished_at;
+
+        self.total_received = self.total_received.saturating_add(other.total_received);
+    }
+
+    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
+        // if we had no timestamp it means we never encountered any events and
+        // thus we don't need to send this event.
+        let timestamp = self.timestamp?;
+
+        Some(Track {
+            timestamp: Some(timestamp),
+            user: user.clone(),
+            event: event_name.to_string(),
+            properties: serde_json::to_value(self).ok()?,
+            ..Default::default()
+        })
+    }
+}
 #[derive(Default, Serialize)]
 pub struct HealthAggregator {
     #[serde(skip)]
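
The merge semantics of `aggregate` above are worth spelling out: each boolean `filtered_by_*` flag ORs together, so after a flush it answers "did at least one request in the batch use this filter?", `user_agents` grows as a hand-rolled set union, and `total_received` adds with saturation. A reduced, self-contained sketch of those rules (a stand-in struct, not the real `TasksAggregator`):

#[derive(Default)]
struct Flags {
    filtered_by_uid: bool,
    filtered_by_status: bool,
    total_received: usize,
}

impl Flags {
    fn aggregate(&mut self, other: Self) {
        // booleans OR: "was this filter used by any request in the batch?"
        self.filtered_by_uid |= other.filtered_by_uid;
        self.filtered_by_status |= other.filtered_by_status;
        // counters add, saturating instead of overflowing
        self.total_received = self.total_received.saturating_add(other.total_received);
    }
}

fn main() {
    // request 1 filtered by uid, request 2 filtered by status
    let mut batch = Flags { filtered_by_uid: true, total_received: 1, ..Default::default() };
    batch.aggregate(Flags { filtered_by_status: true, total_received: 1, ..Default::default() });
    assert!(batch.filtered_by_uid && batch.filtered_by_status);
    assert_eq!(batch.total_received, 2);
}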

View File

@@ -21,7 +21,7 @@ mod api_key;
 mod dump;
 pub mod indexes;
 mod swap_indexes;
-mod tasks;
+pub mod tasks;

 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(web::scope("/tasks").configure(tasks::configure))

View File

@@ -162,11 +162,11 @@ impl From<Details> for DetailsView {
 #[derive(Deserialize, Debug)]
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
 pub struct TaskCommonQueryRaw {
-    uids: Option<CS<String>>,
-    canceled_by: Option<CS<String>>,
-    types: Option<CS<StarOr<String>>>,
-    statuses: Option<CS<StarOr<String>>>,
-    index_uids: Option<CS<StarOr<String>>>,
+    pub uids: Option<CS<String>>,
+    pub canceled_by: Option<CS<String>>,
+    pub types: Option<CS<StarOr<String>>>,
+    pub statuses: Option<CS<StarOr<String>>>,
+    pub index_uids: Option<CS<StarOr<String>>>,
 }
 impl TaskCommonQueryRaw {
     fn validate(self) -> Result<TaskCommonQuery, ResponseError> {
@@ -261,12 +261,12 @@ impl TaskCommonQueryRaw {
 #[derive(Deserialize, Debug)]
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
 pub struct TaskDateQueryRaw {
-    after_enqueued_at: Option<String>,
-    before_enqueued_at: Option<String>,
-    after_started_at: Option<String>,
-    before_started_at: Option<String>,
-    after_finished_at: Option<String>,
-    before_finished_at: Option<String>,
+    pub after_enqueued_at: Option<String>,
+    pub before_enqueued_at: Option<String>,
+    pub after_started_at: Option<String>,
+    pub before_started_at: Option<String>,
+    pub after_finished_at: Option<String>,
+    pub before_finished_at: Option<String>,
 }
 impl TaskDateQueryRaw {
     fn validate(self) -> Result<TaskDateQuery, ResponseError> {
@@ -339,21 +339,21 @@ impl TaskDateQueryRaw {
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
 pub struct TasksFilterQueryRaw {
     #[serde(flatten)]
-    common: TaskCommonQueryRaw,
+    pub common: TaskCommonQueryRaw,
     #[serde(default = "DEFAULT_LIMIT")]
-    limit: u32,
-    from: Option<TaskId>,
+    pub limit: u32,
+    pub from: Option<TaskId>,
     #[serde(flatten)]
-    dates: TaskDateQueryRaw,
+    pub dates: TaskDateQueryRaw,
 }

 #[derive(Deserialize, Debug)]
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
 pub struct TaskDeletionOrCancelationQueryRaw {
     #[serde(flatten)]
-    common: TaskCommonQueryRaw,
+    pub common: TaskCommonQueryRaw,
     #[serde(flatten)]
-    dates: TaskDateQueryRaw,
+    pub dates: TaskDateQueryRaw,
 }

 impl TasksFilterQueryRaw {
@@ -597,6 +597,8 @@ async fn get_tasks(
     req: HttpRequest,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
+    analytics.get_tasks(&params, &req);
+
     let TasksFilterQuery {
         common: TaskCommonQuery { types, uids, canceled_by, statuses, index_uids },
         limit,
@@ -612,24 +614,6 @@ async fn get_tasks(
         },
     } = params.into_inner().validate()?;

-    analytics.publish(
-        "Tasks Seen".to_string(),
-        json!({
-            "filtered_by_uid": uids.is_some(),
-            "filtered_by_index_uid": index_uids.is_some(),
-            "filtered_by_type": types.is_some(),
-            "filtered_by_status": statuses.is_some(),
-            "filtered_by_canceled_by": canceled_by.is_some(),
-            "filtered_by_before_enqueued_at": before_enqueued_at.is_some(),
-            "filtered_by_after_enqueued_at": after_enqueued_at.is_some(),
-            "filtered_by_before_started_at": before_started_at.is_some(),
-            "filtered_by_after_started_at": after_started_at.is_some(),
-            "filtered_by_before_finished_at": before_finished_at.is_some(),
-            "filtered_by_after_finished_at": after_finished_at.is_some(),
-        }),
-        Some(&req),
-    );
-
     // We +1 just to know if there is more after this "page" or not.
     let limit = limit.saturating_add(1);
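
Two details of the handler change are easy to miss. First, `analytics.get_tasks(&params, &req)` now runs on the raw query before `validate()`, so a request that later fails validation is still counted toward the batch. Second, the per-request `analytics.publish("Tasks Seen", ...)` call is gone entirely; the event is now emitted once per flush, carrying `total_received`. A self-contained sketch of the new ordering, with stand-in types in place of the actix-web handler:

use std::cell::Cell;

#[derive(Default)]
struct RawQuery {
    uids: Option<String>, // raw, unvalidated filter
}

impl RawQuery {
    fn validate(self) -> Result<(), String> {
        match self.uids.as_deref() {
            Some("oops") => Err("invalid uid".into()),
            _ => Ok(()),
        }
    }
}

// Counterpart of Analytics::get_tasks: cheap, infallible, fire-and-forget.
struct Analytics {
    received: Cell<usize>,
}

impl Analytics {
    fn get_tasks(&self, _query: &RawQuery) {
        self.received.set(self.received.get() + 1);
    }
}

fn get_tasks_handler(analytics: &Analytics, query: RawQuery) -> Result<(), String> {
    analytics.get_tasks(&query); // recorded even if validation fails below
    query.validate()?;
    Ok(()) // ...fetch and return the tasks here
}

fn main() {
    let analytics = Analytics { received: Cell::new(0) };
    assert!(get_tasks_handler(&analytics, RawQuery { uids: Some("oops".into()) }).is_err());
    assert!(get_tasks_handler(&analytics, RawQuery::default()).is_ok());
    assert_eq!(analytics.received.get(), 2); // both requests were counted
}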