Store analytics for the documents deletions

This commit is contained in:
Tamo 2022-11-15 17:51:54 +01:00
parent 51be75a264
commit b44c381c2a
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
4 changed files with 104 additions and 3 deletions

View File

@ -5,7 +5,7 @@ use actix_web::HttpRequest;
use meilisearch_types::InstanceUid; use meilisearch_types::InstanceUid;
use serde_json::Value; use serde_json::Value;
use super::{find_user_id, Analytics}; use super::{find_user_id, Analytics, DocumentDeletionKind};
use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::Opt; use crate::Opt;
@ -49,6 +49,7 @@ impl Analytics for MockAnalytics {
_request: &HttpRequest, _request: &HttpRequest,
) { ) {
} }
fn delete_documents(&self, _kind: DocumentDeletionKind, _request: &HttpRequest) {}
fn update_documents( fn update_documents(
&self, &self,
_documents_query: &UpdateDocumentsQuery, _documents_query: &UpdateDocumentsQuery,

View File

@ -54,6 +54,13 @@ fn find_user_id(db_path: &Path) -> Option<InstanceUid> {
.and_then(|uid| InstanceUid::from_str(&uid).ok()) .and_then(|uid| InstanceUid::from_str(&uid).ok())
} }
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DocumentDeletionKind {
PerDocumentId,
ClearAll,
PerBatch,
}
pub trait Analytics: Sync + Send { pub trait Analytics: Sync + Send {
fn instance_uid(&self) -> Option<&InstanceUid>; fn instance_uid(&self) -> Option<&InstanceUid>;
@ -73,6 +80,10 @@ pub trait Analytics: Sync + Send {
index_creation: bool, index_creation: bool,
request: &HttpRequest, request: &HttpRequest,
); );
// this method should be called to aggregate a add documents request
fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest);
// this method should be called to batch a update documents request // this method should be called to batch a update documents request
fn update_documents( fn update_documents(
&self, &self,

View File

@ -23,7 +23,7 @@ use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::mpsc::{self, Receiver, Sender};
use uuid::Uuid; use uuid::Uuid;
use super::{config_user_id_path, MEILISEARCH_CONFIG_PATH}; use super::{config_user_id_path, DocumentDeletionKind, MEILISEARCH_CONFIG_PATH};
use crate::analytics::Analytics; use crate::analytics::Analytics;
use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, SchedulerConfig}; use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, SchedulerConfig};
use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::routes::indexes::documents::UpdateDocumentsQuery;
@ -68,6 +68,7 @@ pub enum AnalyticsMsg {
AggregateGetSearch(SearchAggregator), AggregateGetSearch(SearchAggregator),
AggregatePostSearch(SearchAggregator), AggregatePostSearch(SearchAggregator),
AggregateAddDocuments(DocumentsAggregator), AggregateAddDocuments(DocumentsAggregator),
AggregateDeleteDocuments(DocumentsDeletionAggregator),
AggregateUpdateDocuments(DocumentsAggregator), AggregateUpdateDocuments(DocumentsAggregator),
} }
@ -127,6 +128,7 @@ impl SegmentAnalytics {
post_search_aggregator: SearchAggregator::default(), post_search_aggregator: SearchAggregator::default(),
get_search_aggregator: SearchAggregator::default(), get_search_aggregator: SearchAggregator::default(),
add_documents_aggregator: DocumentsAggregator::default(), add_documents_aggregator: DocumentsAggregator::default(),
delete_documents_aggregator: DocumentsDeletionAggregator::default(),
update_documents_aggregator: DocumentsAggregator::default(), update_documents_aggregator: DocumentsAggregator::default(),
}); });
tokio::spawn(segment.run(index_scheduler.clone())); tokio::spawn(segment.run(index_scheduler.clone()));
@ -173,6 +175,11 @@ impl super::Analytics for SegmentAnalytics {
let _ = self.sender.try_send(AnalyticsMsg::AggregateAddDocuments(aggregate)); let _ = self.sender.try_send(AnalyticsMsg::AggregateAddDocuments(aggregate));
} }
fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest) {
let aggregate = DocumentsDeletionAggregator::from_query(kind, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateDeleteDocuments(aggregate));
}
fn update_documents( fn update_documents(
&self, &self,
documents_query: &UpdateDocumentsQuery, documents_query: &UpdateDocumentsQuery,
@ -310,6 +317,7 @@ pub struct Segment {
get_search_aggregator: SearchAggregator, get_search_aggregator: SearchAggregator,
post_search_aggregator: SearchAggregator, post_search_aggregator: SearchAggregator,
add_documents_aggregator: DocumentsAggregator, add_documents_aggregator: DocumentsAggregator,
delete_documents_aggregator: DocumentsDeletionAggregator,
update_documents_aggregator: DocumentsAggregator, update_documents_aggregator: DocumentsAggregator,
} }
@ -364,6 +372,7 @@ impl Segment {
Some(AnalyticsMsg::AggregateGetSearch(agreg)) => self.get_search_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateGetSearch(agreg)) => self.get_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostSearch(agreg)) => self.post_search_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregatePostSearch(agreg)) => self.post_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
None => (), None => (),
} }
@ -394,6 +403,8 @@ impl Segment {
.into_event(&self.user, "Documents Searched POST"); .into_event(&self.user, "Documents Searched POST");
let add_documents = std::mem::take(&mut self.add_documents_aggregator) let add_documents = std::mem::take(&mut self.add_documents_aggregator)
.into_event(&self.user, "Documents Added"); .into_event(&self.user, "Documents Added");
let delete_documents = std::mem::take(&mut self.delete_documents_aggregator)
.into_event(&self.user, "Documents Deleted");
let update_documents = std::mem::take(&mut self.update_documents_aggregator) let update_documents = std::mem::take(&mut self.update_documents_aggregator)
.into_event(&self.user, "Documents Updated"); .into_event(&self.user, "Documents Updated");
@ -406,6 +417,9 @@ impl Segment {
if let Some(add_documents) = add_documents { if let Some(add_documents) = add_documents {
let _ = self.batcher.push(add_documents).await; let _ = self.batcher.push(add_documents).await;
} }
if let Some(delete_documents) = delete_documents {
let _ = self.batcher.push(delete_documents).await;
}
if let Some(update_documents) = update_documents { if let Some(update_documents) = update_documents {
let _ = self.batcher.push(update_documents).await; let _ = self.batcher.push(update_documents).await;
} }
@ -717,3 +731,65 @@ impl DocumentsAggregator {
} }
} }
} }
#[derive(Default, Serialize)]
pub struct DocumentsDeletionAggregator {
#[serde(skip)]
timestamp: Option<OffsetDateTime>,
// context
#[serde(rename = "user-agent")]
user_agents: HashSet<String>,
total_received: usize,
per_document_id: bool,
clear_all: bool,
per_batch: bool,
}
impl DocumentsDeletionAggregator {
pub fn from_query(kind: DocumentDeletionKind, request: &HttpRequest) -> Self {
let mut ret = Self::default();
ret.timestamp = Some(OffsetDateTime::now_utc());
ret.user_agents = extract_user_agents(request).into_iter().collect();
ret.total_received = 1;
match kind {
DocumentDeletionKind::PerDocumentId => ret.per_document_id = true,
DocumentDeletionKind::ClearAll => ret.clear_all = true,
DocumentDeletionKind::PerBatch => ret.per_batch = true,
}
ret
}
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in other.user_agents {
self.user_agents.insert(user_agent);
}
self.total_received = self.total_received.saturating_add(other.total_received);
self.per_document_id |= other.per_document_id;
self.clear_all |= other.clear_all;
self.per_batch |= other.per_batch;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
// if we had no timestamp it means we never encountered any events and
// thus we don't need to send this event.
let timestamp = self.timestamp?;
Some(Track {
timestamp: Some(timestamp),
user: user.clone(),
event: event_name.to_string(),
properties: serde_json::to_value(self).ok()?,
..Default::default()
})
}
}

View File

@ -21,7 +21,7 @@ use serde::Deserialize;
use serde_cs::vec::CS; use serde_cs::vec::CS;
use serde_json::Value; use serde_json::Value;
use crate::analytics::Analytics; use crate::analytics::{Analytics, DocumentDeletionKind};
use crate::error::MeilisearchHttpError; use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
@ -95,7 +95,11 @@ pub async fn get_document(
pub async fn delete_document( pub async fn delete_document(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<DocumentParam>, path: web::Path<DocumentParam>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
analytics.delete_documents(DocumentDeletionKind::PerDocumentId, &req);
let DocumentParam { document_id, index_uid } = path.into_inner(); let DocumentParam { document_id, index_uid } = path.into_inner();
let task = KindWithContent::DocumentDeletion { index_uid, documents_ids: vec![document_id] }; let task = KindWithContent::DocumentDeletion { index_uid, documents_ids: vec![document_id] };
let task: SummarizedTaskView = let task: SummarizedTaskView =
@ -296,8 +300,13 @@ pub async fn delete_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<String>, path: web::Path<String>,
body: web::Json<Vec<Value>>, body: web::Json<Vec<Value>>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", body); debug!("called with params: {:?}", body);
analytics.delete_documents(DocumentDeletionKind::PerBatch, &req);
let ids = body let ids = body
.iter() .iter()
.map(|v| v.as_str().map(String::from).unwrap_or_else(|| v.to_string())) .map(|v| v.as_str().map(String::from).unwrap_or_else(|| v.to_string()))
@ -315,7 +324,11 @@ pub async fn delete_documents(
pub async fn clear_all_documents( pub async fn clear_all_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<String>, path: web::Path<String>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
let task = KindWithContent::DocumentClear { index_uid: path.into_inner() }; let task = KindWithContent::DocumentClear { index_uid: path.into_inner() };
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();