From b44c381c2ae77ec98223dcc390ac8b7543e98850 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 15 Nov 2022 17:51:54 +0100 Subject: [PATCH] Store analytics for the documents deletions --- .../src/analytics/mock_analytics.rs | 3 +- meilisearch-http/src/analytics/mod.rs | 11 +++ .../src/analytics/segment_analytics.rs | 78 ++++++++++++++++++- .../src/routes/indexes/documents.rs | 15 +++- 4 files changed, 104 insertions(+), 3 deletions(-) diff --git a/meilisearch-http/src/analytics/mock_analytics.rs b/meilisearch-http/src/analytics/mock_analytics.rs index ab93f5edc..82460be72 100644 --- a/meilisearch-http/src/analytics/mock_analytics.rs +++ b/meilisearch-http/src/analytics/mock_analytics.rs @@ -5,7 +5,7 @@ use actix_web::HttpRequest; use meilisearch_types::InstanceUid; use serde_json::Value; -use super::{find_user_id, Analytics}; +use super::{find_user_id, Analytics, DocumentDeletionKind}; use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::Opt; @@ -49,6 +49,7 @@ impl Analytics for MockAnalytics { _request: &HttpRequest, ) { } + fn delete_documents(&self, _kind: DocumentDeletionKind, _request: &HttpRequest) {} fn update_documents( &self, _documents_query: &UpdateDocumentsQuery, diff --git a/meilisearch-http/src/analytics/mod.rs b/meilisearch-http/src/analytics/mod.rs index ffebaea77..2fe5d81a4 100644 --- a/meilisearch-http/src/analytics/mod.rs +++ b/meilisearch-http/src/analytics/mod.rs @@ -54,6 +54,13 @@ fn find_user_id(db_path: &Path) -> Option { .and_then(|uid| InstanceUid::from_str(&uid).ok()) } +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum DocumentDeletionKind { + PerDocumentId, + ClearAll, + PerBatch, +} + pub trait Analytics: Sync + Send { fn instance_uid(&self) -> Option<&InstanceUid>; @@ -73,6 +80,10 @@ pub trait Analytics: Sync + Send { index_creation: bool, request: &HttpRequest, ); + + // this method should be called to aggregate a delete documents request + fn delete_documents(&self, kind: DocumentDeletionKind, request: 
&HttpRequest); + // this method should be called to batch a update documents request fn update_documents( &self, diff --git a/meilisearch-http/src/analytics/segment_analytics.rs b/meilisearch-http/src/analytics/segment_analytics.rs index d5bc4cf0d..8028aee36 100644 --- a/meilisearch-http/src/analytics/segment_analytics.rs +++ b/meilisearch-http/src/analytics/segment_analytics.rs @@ -23,7 +23,7 @@ use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; use uuid::Uuid; -use super::{config_user_id_path, MEILISEARCH_CONFIG_PATH}; +use super::{config_user_id_path, DocumentDeletionKind, MEILISEARCH_CONFIG_PATH}; use crate::analytics::Analytics; use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, SchedulerConfig}; use crate::routes::indexes::documents::UpdateDocumentsQuery; @@ -68,6 +68,7 @@ pub enum AnalyticsMsg { AggregateGetSearch(SearchAggregator), AggregatePostSearch(SearchAggregator), AggregateAddDocuments(DocumentsAggregator), + AggregateDeleteDocuments(DocumentsDeletionAggregator), AggregateUpdateDocuments(DocumentsAggregator), } @@ -127,6 +128,7 @@ impl SegmentAnalytics { post_search_aggregator: SearchAggregator::default(), get_search_aggregator: SearchAggregator::default(), add_documents_aggregator: DocumentsAggregator::default(), + delete_documents_aggregator: DocumentsDeletionAggregator::default(), update_documents_aggregator: DocumentsAggregator::default(), }); tokio::spawn(segment.run(index_scheduler.clone())); @@ -173,6 +175,11 @@ impl super::Analytics for SegmentAnalytics { let _ = self.sender.try_send(AnalyticsMsg::AggregateAddDocuments(aggregate)); } + fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest) { + let aggregate = DocumentsDeletionAggregator::from_query(kind, request); + let _ = self.sender.try_send(AnalyticsMsg::AggregateDeleteDocuments(aggregate)); + } + fn update_documents( &self, documents_query: &UpdateDocumentsQuery, @@ -310,6 +317,7 @@ pub struct Segment { get_search_aggregator: 
SearchAggregator, post_search_aggregator: SearchAggregator, add_documents_aggregator: DocumentsAggregator, + delete_documents_aggregator: DocumentsDeletionAggregator, update_documents_aggregator: DocumentsAggregator, } @@ -364,6 +372,7 @@ impl Segment { Some(AnalyticsMsg::AggregateGetSearch(agreg)) => self.get_search_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregatePostSearch(agreg)) => self.post_search_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg), + Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg), None => (), } @@ -394,6 +403,8 @@ impl Segment { .into_event(&self.user, "Documents Searched POST"); let add_documents = std::mem::take(&mut self.add_documents_aggregator) .into_event(&self.user, "Documents Added"); + let delete_documents = std::mem::take(&mut self.delete_documents_aggregator) + .into_event(&self.user, "Documents Deleted"); let update_documents = std::mem::take(&mut self.update_documents_aggregator) .into_event(&self.user, "Documents Updated"); @@ -406,6 +417,9 @@ impl Segment { if let Some(add_documents) = add_documents { let _ = self.batcher.push(add_documents).await; } + if let Some(delete_documents) = delete_documents { + let _ = self.batcher.push(delete_documents).await; + } if let Some(update_documents) = update_documents { let _ = self.batcher.push(update_documents).await; } @@ -717,3 +731,65 @@ impl DocumentsAggregator { } } } + +#[derive(Default, Serialize)] +pub struct DocumentsDeletionAggregator { + #[serde(skip)] + timestamp: Option, + + // context + #[serde(rename = "user-agent")] + user_agents: HashSet, + + total_received: usize, + per_document_id: bool, + clear_all: bool, + per_batch: bool, +} + +impl DocumentsDeletionAggregator { + pub fn from_query(kind: DocumentDeletionKind, request: 
&HttpRequest) -> Self { + let mut ret = Self::default(); + ret.timestamp = Some(OffsetDateTime::now_utc()); + + ret.user_agents = extract_user_agents(request).into_iter().collect(); + ret.total_received = 1; + match kind { + DocumentDeletionKind::PerDocumentId => ret.per_document_id = true, + DocumentDeletionKind::ClearAll => ret.clear_all = true, + DocumentDeletionKind::PerBatch => ret.per_batch = true, + } + + ret + } + + /// Aggregate one [DocumentsDeletionAggregator] into another. + pub fn aggregate(&mut self, other: Self) { + if self.timestamp.is_none() { + self.timestamp = other.timestamp; + } + + // we can't create a union because there is no `into_union` method + for user_agent in other.user_agents { + self.user_agents.insert(user_agent); + } + self.total_received = self.total_received.saturating_add(other.total_received); + self.per_document_id |= other.per_document_id; + self.clear_all |= other.clear_all; + self.per_batch |= other.per_batch; + } + + pub fn into_event(self, user: &User, event_name: &str) -> Option { + // if we had no timestamp it means we never encountered any events and + // thus we don't need to send this event. 
+ let timestamp = self.timestamp?; + + Some(Track { + timestamp: Some(timestamp), + user: user.clone(), + event: event_name.to_string(), + properties: serde_json::to_value(self).ok()?, + ..Default::default() + }) + } +} diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 0cdb11e8a..0fe3cf102 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -21,7 +21,7 @@ use serde::Deserialize; use serde_cs::vec::CS; use serde_json::Value; -use crate::analytics::Analytics; +use crate::analytics::{Analytics, DocumentDeletionKind}; use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; @@ -95,7 +95,11 @@ pub async fn get_document( pub async fn delete_document( index_scheduler: GuardedData, Data>, path: web::Path, + req: HttpRequest, + analytics: web::Data, ) -> Result { + analytics.delete_documents(DocumentDeletionKind::PerDocumentId, &req); + let DocumentParam { document_id, index_uid } = path.into_inner(); let task = KindWithContent::DocumentDeletion { index_uid, documents_ids: vec![document_id] }; let task: SummarizedTaskView = @@ -296,8 +300,13 @@ pub async fn delete_documents( index_scheduler: GuardedData, Data>, path: web::Path, body: web::Json>, + req: HttpRequest, + analytics: web::Data, ) -> Result { debug!("called with params: {:?}", body); + + analytics.delete_documents(DocumentDeletionKind::PerBatch, &req); + let ids = body .iter() .map(|v| v.as_str().map(String::from).unwrap_or_else(|| v.to_string())) @@ -315,7 +324,11 @@ pub async fn delete_documents( pub async fn clear_all_documents( index_scheduler: GuardedData, Data>, path: web::Path, + req: HttpRequest, + analytics: web::Data, ) -> Result { + analytics.delete_documents(DocumentDeletionKind::ClearAll, &req); + let task = KindWithContent::DocumentClear { index_uid: path.into_inner() }; let task: 
SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();