Mirror of https://github.com/meilisearch/meilisearch.git — synced 2024-11-30 00:55:00 +08:00
Merge #3738
3738: Add analytics on the get documents resource r=dureuill a=irevoire

# Pull Request

## Related issue

Fixes https://github.com/meilisearch/meilisearch/issues/3737
Related spec: https://github.com/meilisearch/specifications/pull/234

## What does this PR do?

Add the analytics for the following routes:
- `GET` - `/indexes/:uid/documents`
- `GET` - `/indexes/:uid/documents/:doc_id`
- `POST` - `/indexes/:uid/documents/fetch`

These analytics are aggregated into two events:
- `Documents Fetched GET`
- `Documents Fetched POST`

Both events share the same payload:

| Property name | Description | Example |
|---------------|-------------|---------|
| `requests.total_received` | Total number of requests received in this batch | 325 |
| `per_document_id` | `true` if the `GET /indexes/:uid/documents/:doc_id` endpoint was used in this batch, otherwise `false` | false |
| `per_filter` | `true` if the `POST /indexes/:uid/documents/fetch` endpoint was used with a filter in this batch, otherwise `false` | false |
| `pagination.max_limit` | Highest value given for the `limit` parameter in this batch | 60 |
| `pagination.max_offset` | Highest value given for the `offset` parameter in this batch | 1000 |

Co-authored-by: Tamo <tamo@meilisearch.com>
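The batching semantics implied by the payload table — request counts sum, the `per_*` booleans are OR-ed, and the pagination values keep the maximum seen across the batch — can be sketched as below. This is a simplified, self-contained stand-in (a hypothetical `FetchBatch` type), not the actual `DocumentsFetchAggregator` added in the diff that follows:

```rust
/// Minimal stand-in for the aggregation described above (hypothetical
/// simplified type, not meilisearch's real `DocumentsFetchAggregator`).
#[derive(Debug, Default)]
struct FetchBatch {
    total_received: usize, // sums across all requests in the batch
    per_document_id: bool, // OR-ed: some request hit /documents/:doc_id
    per_filter: bool,      // OR-ed: some POST /documents/fetch used a filter
    max_limit: usize,      // highest `limit` seen in the batch
    max_offset: usize,     // highest `offset` seen in the batch
}

impl FetchBatch {
    /// Fold one observed request into the running batch.
    fn record(&mut self, per_document_id: bool, per_filter: bool, limit: usize, offset: usize) {
        self.total_received = self.total_received.saturating_add(1);
        self.per_document_id |= per_document_id;
        self.per_filter |= per_filter;
        self.max_limit = self.max_limit.max(limit);
        self.max_offset = self.max_offset.max(offset);
    }
}

fn main() {
    let mut batch = FetchBatch::default();
    // GET /indexes/movies/documents?limit=60&offset=1000
    batch.record(false, false, 60, 1000);
    // POST /indexes/movies/documents/fetch with a `filter` in the body
    batch.record(false, true, 20, 0);
    // -> total_received = 2, per_filter = true, max_limit = 60, max_offset = 1000
    println!("{batch:?}");
}
```

On each flush, one event per aggregator is turned into a Segment `Track` and pushed to the batcher, which is exactly what the diff below wires up.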
Commit: 6ce1ce77e6
meilisearch/src/analytics/mock_analytics.rs

@@ -5,7 +5,7 @@ use actix_web::HttpRequest;
 use meilisearch_types::InstanceUid;
 use serde_json::Value;
 
-use super::{find_user_id, Analytics, DocumentDeletionKind};
+use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind};
 use crate::routes::indexes::documents::UpdateDocumentsQuery;
 use crate::routes::tasks::TasksFilterQuery;
 use crate::Opt;
@@ -71,6 +71,8 @@ impl Analytics for MockAnalytics {
         _request: &HttpRequest,
     ) {
     }
+    fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
+    fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
     fn get_tasks(&self, _query: &TasksFilterQuery, _request: &HttpRequest) {}
     fn health_seen(&self, _request: &HttpRequest) {}
 }
meilisearch/src/analytics/mod.rs

@@ -67,6 +67,12 @@ pub enum DocumentDeletionKind {
     PerFilter,
 }
 
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum DocumentFetchKind {
+    PerDocumentId,
+    Normal { with_filter: bool, limit: usize, offset: usize },
+}
+
 pub trait Analytics: Sync + Send {
     fn instance_uid(&self) -> Option<&InstanceUid>;
 
@@ -90,6 +96,12 @@ pub trait Analytics: Sync + Send {
         request: &HttpRequest,
     );
 
+    // this method should be called to aggregate a fetch documents request
+    fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest);
+
+    // this method should be called to aggregate a fetch documents request
+    fn post_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest);
+
     // this method should be called to aggregate a add documents request
     fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest);
 
meilisearch/src/analytics/segment_analytics.rs

@@ -23,7 +23,9 @@ use tokio::select;
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use uuid::Uuid;
 
-use super::{config_user_id_path, DocumentDeletionKind, MEILISEARCH_CONFIG_PATH};
+use super::{
+    config_user_id_path, DocumentDeletionKind, DocumentFetchKind, MEILISEARCH_CONFIG_PATH,
+};
 use crate::analytics::Analytics;
 use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, ScheduleSnapshot};
 use crate::routes::indexes::documents::UpdateDocumentsQuery;
@@ -72,6 +74,8 @@ pub enum AnalyticsMsg {
     AggregateAddDocuments(DocumentsAggregator),
     AggregateDeleteDocuments(DocumentsDeletionAggregator),
     AggregateUpdateDocuments(DocumentsAggregator),
+    AggregateGetFetchDocuments(DocumentsFetchAggregator),
+    AggregatePostFetchDocuments(DocumentsFetchAggregator),
     AggregateTasks(TasksAggregator),
     AggregateHealth(HealthAggregator),
 }
@@ -139,6 +143,8 @@ impl SegmentAnalytics {
             add_documents_aggregator: DocumentsAggregator::default(),
             delete_documents_aggregator: DocumentsDeletionAggregator::default(),
             update_documents_aggregator: DocumentsAggregator::default(),
+            get_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
+            post_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
             get_tasks_aggregator: TasksAggregator::default(),
             health_aggregator: HealthAggregator::default(),
         });
@@ -205,6 +211,16 @@ impl super::Analytics for SegmentAnalytics {
         let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
     }
 
+    fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
+        let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
+        let _ = self.sender.try_send(AnalyticsMsg::AggregateGetFetchDocuments(aggregate));
+    }
+
+    fn post_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
+        let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
+        let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFetchDocuments(aggregate));
+    }
+
     fn get_tasks(&self, query: &TasksFilterQuery, request: &HttpRequest) {
         let aggregate = TasksAggregator::from_query(query, request);
         let _ = self.sender.try_send(AnalyticsMsg::AggregateTasks(aggregate));
@@ -341,6 +357,8 @@ pub struct Segment {
     add_documents_aggregator: DocumentsAggregator,
     delete_documents_aggregator: DocumentsDeletionAggregator,
     update_documents_aggregator: DocumentsAggregator,
+    get_fetch_documents_aggregator: DocumentsFetchAggregator,
+    post_fetch_documents_aggregator: DocumentsFetchAggregator,
     get_tasks_aggregator: TasksAggregator,
     health_aggregator: HealthAggregator,
 }
@@ -403,6 +421,8 @@ impl Segment {
                 Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
+                Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg),
+                Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateTasks(agreg)) => self.get_tasks_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg),
                 None => (),
@@ -453,6 +473,10 @@ impl Segment {
             .into_event(&self.user, "Documents Deleted");
         let update_documents = std::mem::take(&mut self.update_documents_aggregator)
             .into_event(&self.user, "Documents Updated");
+        let get_fetch_documents = std::mem::take(&mut self.get_fetch_documents_aggregator)
+            .into_event(&self.user, "Documents Fetched GET");
+        let post_fetch_documents = std::mem::take(&mut self.post_fetch_documents_aggregator)
+            .into_event(&self.user, "Documents Fetched POST");
         let get_tasks =
             std::mem::take(&mut self.get_tasks_aggregator).into_event(&self.user, "Tasks Seen");
         let health =
@@ -476,6 +500,12 @@ impl Segment {
         if let Some(update_documents) = update_documents {
             let _ = self.batcher.push(update_documents).await;
         }
+        if let Some(get_fetch_documents) = get_fetch_documents {
+            let _ = self.batcher.push(get_fetch_documents).await;
+        }
+        if let Some(post_fetch_documents) = post_fetch_documents {
+            let _ = self.batcher.push(post_fetch_documents).await;
+        }
         if let Some(get_tasks) = get_tasks {
             let _ = self.batcher.push(get_tasks).await;
         }
@@ -1138,3 +1168,76 @@ impl HealthAggregator {
         })
     }
 }
+
+#[derive(Default, Serialize)]
+pub struct DocumentsFetchAggregator {
+    #[serde(skip)]
+    timestamp: Option<OffsetDateTime>,
+
+    // context
+    #[serde(rename = "user-agent")]
+    user_agents: HashSet<String>,
+
+    #[serde(rename = "requests.max_limit")]
+    total_received: usize,
+
+    // a call on ../documents/:doc_id
+    per_document_id: bool,
+    // if a filter was used
+    per_filter: bool,
+
+    // pagination
+    #[serde(rename = "pagination.max_limit")]
+    max_limit: usize,
+    #[serde(rename = "pagination.max_offset")]
+    max_offset: usize,
+}
+
+impl DocumentsFetchAggregator {
+    pub fn from_query(query: &DocumentFetchKind, request: &HttpRequest) -> Self {
+        let (limit, offset) = match query {
+            DocumentFetchKind::PerDocumentId => (1, 0),
+            DocumentFetchKind::Normal { limit, offset, .. } => (*limit, *offset),
+        };
+        Self {
+            timestamp: Some(OffsetDateTime::now_utc()),
+            user_agents: extract_user_agents(request).into_iter().collect(),
+            total_received: 1,
+            per_document_id: matches!(query, DocumentFetchKind::PerDocumentId),
+            per_filter: matches!(query, DocumentFetchKind::Normal { with_filter, .. } if *with_filter),
+            max_limit: limit,
+            max_offset: offset,
+        }
+    }
+
+    /// Aggregate one [DocumentsFetchAggregator] into another.
+    pub fn aggregate(&mut self, other: Self) {
+        if self.timestamp.is_none() {
+            self.timestamp = other.timestamp;
+        }
+        for user_agent in other.user_agents {
+            self.user_agents.insert(user_agent);
+        }
+
+        self.total_received = self.total_received.saturating_add(other.total_received);
+        self.per_document_id |= other.per_document_id;
+        self.per_filter |= other.per_filter;
+
+        self.max_limit = self.max_limit.max(other.max_limit);
+        self.max_offset = self.max_offset.max(other.max_offset);
+    }
+
+    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
+        // if we had no timestamp it means we never encountered any events and
+        // thus we don't need to send this event.
+        let timestamp = self.timestamp?;
+
+        Some(Track {
+            timestamp: Some(timestamp),
+            user: user.clone(),
+            event: event_name.to_string(),
+            properties: serde_json::to_value(self).ok()?,
+            ..Default::default()
+        })
+    }
+}
meilisearch/src/routes/indexes/documents.rs

@@ -29,7 +29,7 @@ use tempfile::tempfile;
 use tokio::fs::File;
 use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
 
-use crate::analytics::{Analytics, DocumentDeletionKind};
+use crate::analytics::{Analytics, DocumentDeletionKind, DocumentFetchKind};
 use crate::error::MeilisearchHttpError;
 use crate::error::PayloadError::ReceivePayload;
 use crate::extractors::authentication::policies::*;
@@ -97,10 +97,14 @@ pub async fn get_document(
     index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
     document_param: web::Path<DocumentParam>,
     params: AwebQueryParameter<GetDocument, DeserrQueryParamError>,
+    req: HttpRequest,
+    analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     let DocumentParam { index_uid, document_id } = document_param.into_inner();
     let index_uid = IndexUid::try_from(index_uid)?;
 
+    analytics.get_fetch_documents(&DocumentFetchKind::PerDocumentId, &req);
+
     let GetDocument { fields } = params.into_inner();
     let attributes_to_retrieve = fields.merge_star_and_none();
 
@@ -161,16 +165,31 @@ pub async fn documents_by_query_post(
     index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
     body: AwebJson<BrowseQuery, DeserrJsonError>,
+    req: HttpRequest,
+    analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     debug!("called with body: {:?}", body);
 
-    documents_by_query(&index_scheduler, index_uid, body.into_inner())
+    let body = body.into_inner();
+
+    analytics.post_fetch_documents(
+        &DocumentFetchKind::Normal {
+            with_filter: body.filter.is_some(),
+            limit: body.limit,
+            offset: body.offset,
+        },
+        &req,
+    );
+
+    documents_by_query(&index_scheduler, index_uid, body)
 }
 
 pub async fn get_documents(
     index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
     params: AwebQueryParameter<BrowseQueryGet, DeserrQueryParamError>,
+    req: HttpRequest,
+    analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     debug!("called with params: {:?}", params);
 
@@ -191,6 +210,15 @@ pub async fn get_documents(
         filter,
     };
 
+    analytics.get_fetch_documents(
+        &DocumentFetchKind::Normal {
+            with_filter: query.filter.is_some(),
+            limit: query.limit,
+            offset: query.offset,
+        },
+        &req,
+    );
+
     documents_by_query(&index_scheduler, index_uid, query)
 }
 