diff --git a/meilisearch/src/analytics/mod.rs b/meilisearch/src/analytics/mod.rs
index a3b8d6d1d..d08f3307c 100644
--- a/meilisearch/src/analytics/mod.rs
+++ b/meilisearch/src/analytics/mod.rs
@@ -22,9 +22,7 @@ pub use segment_analytics::SimilarAggregator;
 
 use crate::Opt;
 
-use self::segment_analytics::extract_user_agents;
-pub type MultiSearchAggregator = segment_analytics::MultiSearchAggregator;
-pub type FacetSearchAggregator = segment_analytics::FacetSearchAggregator;
+pub use self::segment_analytics::MultiSearchAggregator;
 
 /// A macro used to quickly define events that don't aggregate or send anything besides an empty event with its name.
 #[macro_export]
diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs
index 00a3adaaf..1edfa1bdd 100644
--- a/meilisearch/src/analytics/segment_analytics.rs
+++ b/meilisearch/src/analytics/segment_analytics.rs
@@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
-use actix_web::http::header::{CONTENT_TYPE, USER_AGENT};
+use actix_web::http::header::USER_AGENT;
 use actix_web::HttpRequest;
 use byte_unit::Byte;
 use index_scheduler::IndexScheduler;
@@ -24,21 +24,15 @@ use tokio::select;
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use uuid::Uuid;
 
-use super::{
-    config_user_id_path, Aggregate, AggregateMethod, DocumentDeletionKind, DocumentFetchKind,
-    MEILISEARCH_CONFIG_PATH,
-};
+use super::{config_user_id_path, Aggregate, AggregateMethod, MEILISEARCH_CONFIG_PATH};
 use crate::option::{
     default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot,
 };
-use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery};
-use crate::routes::indexes::facet_search::FacetSearchQuery;
 use crate::routes::{create_all_stats, Stats};
 use crate::search::{
-    FacetSearchResult, FederatedSearch, MatchingStrategy, SearchQuery, SearchQueryWithIndex,
-    SearchResult, SimilarQuery, SimilarResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER,
-    DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT,
-    DEFAULT_SEMANTIC_RATIO,
+    FederatedSearch, SearchQuery, SearchQueryWithIndex, SearchResult, SimilarQuery, SimilarResult,
+    DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
+    DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEMANTIC_RATIO,
 };
 use crate::{aggregate_methods, Opt};
 
@@ -75,6 +69,7 @@ pub struct Message {
     // Thus we have to send it in the message directly.
     type_id: TypeId,
     // Same for the aggregate function.
+    #[allow(clippy::type_complexity)]
     aggregator_function: fn(Box<dyn Aggregate>, Box<dyn Aggregate>) -> Option<Box<dyn Aggregate>>,
     event: Event,
 }
@@ -169,97 +164,6 @@ impl SegmentAnalytics {
     }
 }
 
-/*
-impl super::Analytics for SegmentAnalytics {
-    fn instance_uid(&self) -> Option<&InstanceUid> {
-        Some(&self.instance_uid)
-    }
-
-    fn publish(&self, event_name: String, mut send: Value, request: Option<&HttpRequest>) {
-        let user_agent = request.map(extract_user_agents);
-
-        send["user-agent"] = json!(user_agent);
-        let event = Track {
-            user: self.user.clone(),
-            event: event_name.clone(),
-            properties: send,
-            ..Default::default()
-        };
-        let _ = self.sender.try_send(AnalyticsMsg::BatchMessage(event));
-    }
-
-    fn get_search(&self, aggregate: SearchAggregator) {
-        let _ = self.sender.try_send(AnalyticsMsg::AggregateGetSearch(aggregate));
-    }
-
-    fn post_search(&self, aggregate: SearchAggregator) {
-        let _ = self.sender.try_send(AnalyticsMsg::AggregatePostSearch(aggregate));
-    }
-
-    fn get_similar(&self, aggregate: SimilarAggregator) {
-        let _ = self.sender.try_send(AnalyticsMsg::AggregateGetSimilar(aggregate));
-    }
-
-    fn post_similar(&self, aggregate: SimilarAggregator) {
-        let _ = self.sender.try_send(AnalyticsMsg::AggregatePostSimilar(aggregate));
-    }
-
-    fn post_facet_search(&self, aggregate: FacetSearchAggregator) {
-        let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFacetSearch(aggregate));
-    }
-
-    fn post_multi_search(&self, aggregate: MultiSearchAggregator) {
-        let _ = self.sender.try_send(AnalyticsMsg::AggregatePostMultiSearch(aggregate));
-    }
-
-    fn add_documents(
-        &self,
-        documents_query: &UpdateDocumentsQuery,
-        index_creation: bool,
-        request: &HttpRequest,
-    ) {
-        let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
-        let _ = self.sender.try_send(AnalyticsMsg::AggregateAddDocuments(aggregate));
-    }
-
-    fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest) {
-        let aggregate = DocumentsDeletionAggregator::from_query(kind, request);
-        let _ = self.sender.try_send(AnalyticsMsg::AggregateDeleteDocuments(aggregate));
-    }
-
-    fn update_documents(
-        &self,
-        documents_query: &UpdateDocumentsQuery,
-        index_creation: bool,
-        request: &HttpRequest,
-    ) {
-        let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
-        let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
-    }
-
-    fn update_documents_by_function(
-        &self,
-        documents_query: &DocumentEditionByFunction,
-        index_creation: bool,
-        request: &HttpRequest,
-    ) {
-        let aggregate =
-            EditDocumentsByFunctionAggregator::from_query(documents_query, index_creation, request);
-        let _ = self.sender.try_send(AnalyticsMsg::AggregateEditDocumentsByFunction(aggregate));
-    }
-
-    fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
-        let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
-        let _ = self.sender.try_send(AnalyticsMsg::AggregateGetFetchDocuments(aggregate));
-    }
-
-    fn post_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
-        let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
-        let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFetchDocuments(aggregate));
-    }
-}
-*/
-
 /// This structure represent the `infos` field we send in the analytics.
 /// It's quite close to the `Opt` structure except all sensitive informations
 /// have been simplified to a boolean.
@@ -536,13 +440,16 @@ impl Segment {
                 properties["requests"]["total_received"] = total.into();
             };
 
-            self.batcher.push(Track {
-                user: self.user.clone(),
-                event: name.to_string(),
-                properties,
-                timestamp: Some(timestamp),
-                ..Default::default()
-            });
+            let _ = self
+                .batcher
+                .push(Track {
+                    user: self.user.clone(),
+                    event: name.to_string(),
+                    properties,
+                    timestamp: Some(timestamp),
+                    ..Default::default()
+                })
+                .await;
         }
 
         let _ = self.batcher.flush().await;
@@ -1181,479 +1088,6 @@ impl Aggregate for MultiSearchAggregator {
     }
 }
 
-#[derive(Default)]
-pub struct FacetSearchAggregator {
-    timestamp: Option<OffsetDateTime>,
-
-    // context
-    user_agents: HashSet<String>,
-
-    // requests
-    total_received: usize,
-    total_succeeded: usize,
-    time_spent: BinaryHeap<usize>,
-
-    // The set of all facetNames that were used
-    facet_names: HashSet<String>,
-
-    // As there been any other parameter than the facetName or facetQuery ones?
-    additional_search_parameters_provided: bool,
-}
-
-impl FacetSearchAggregator {
-    #[allow(clippy::field_reassign_with_default)]
-    pub fn from_query(query: &FacetSearchQuery, request: &HttpRequest) -> Self {
-        let FacetSearchQuery {
-            facet_query: _,
-            facet_name,
-            vector,
-            q,
-            filter,
-            matching_strategy,
-            attributes_to_search_on,
-            hybrid,
-            ranking_score_threshold,
-            locales,
-        } = query;
-
-        let mut ret = Self::default();
-        ret.timestamp = Some(OffsetDateTime::now_utc());
-
-        ret.total_received = 1;
-        ret.user_agents = extract_user_agents(request).into_iter().collect();
-        ret.facet_names = Some(facet_name.clone()).into_iter().collect();
-
-        ret.additional_search_parameters_provided = q.is_some()
-            || vector.is_some()
-            || filter.is_some()
-            || *matching_strategy != MatchingStrategy::default()
-            || attributes_to_search_on.is_some()
-            || hybrid.is_some()
-            || ranking_score_threshold.is_some()
-            || locales.is_some();
-
-        ret
-    }
-
-    pub fn succeed(&mut self, result: &FacetSearchResult) {
-        let FacetSearchResult { facet_hits: _, facet_query: _, processing_time_ms } = result;
-        self.total_succeeded = self.total_succeeded.saturating_add(1);
-        self.time_spent.push(*processing_time_ms as usize);
-    }
-
-    /// Aggregate one [FacetSearchAggregator] into another.
-    pub fn aggregate(&mut self, mut other: Self) {
-        let Self {
-            timestamp,
-            user_agents,
-            total_received,
-            total_succeeded,
-            ref mut time_spent,
-            facet_names,
-            additional_search_parameters_provided,
-        } = other;
-
-        if self.timestamp.is_none() {
-            self.timestamp = timestamp;
-        }
-
-        // context
-        for user_agent in user_agents.into_iter() {
-            self.user_agents.insert(user_agent);
-        }
-
-        // request
-        self.total_received = self.total_received.saturating_add(total_received);
-        self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
-        self.time_spent.append(time_spent);
-
-        // facet_names
-        for facet_name in facet_names.into_iter() {
-            self.facet_names.insert(facet_name);
-        }
-
-        // additional_search_parameters_provided
-        self.additional_search_parameters_provided |= additional_search_parameters_provided;
-    }
-
-    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
-        let Self {
-            timestamp,
-            user_agents,
-            total_received,
-            total_succeeded,
-            time_spent,
-            facet_names,
-            additional_search_parameters_provided,
-        } = self;
-
-        if total_received == 0 {
-            None
-        } else {
-            // the index of the 99th percentage of value
-            let percentile_99th = 0.99 * (total_succeeded as f64 - 1.) + 1.;
-            // we get all the values in a sorted manner
-            let time_spent = time_spent.into_sorted_vec();
-            // We are only interested by the slowest value of the 99th fastest results
-            let time_spent = time_spent.get(percentile_99th as usize);
-
-            let properties = json!({
-                "user-agent": user_agents,
-                "requests": {
-                    "99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
-                    "total_succeeded": total_succeeded,
-                    "total_failed": total_received.saturating_sub(total_succeeded), // just to be sure we never panics
-                    "total_received": total_received,
-                },
-                "facets": {
-                    "total_distinct_facet_count": facet_names.len(),
-                    "additional_search_parameters_provided": additional_search_parameters_provided,
-                },
-            });
-
-            Some(Track {
-                timestamp,
-                user: user.clone(),
-                event: event_name.to_string(),
-                properties,
-                ..Default::default()
-            })
-        }
-    }
-}
-
-#[derive(Default)]
-pub struct DocumentsAggregator {
-    timestamp: Option<OffsetDateTime>,
-
-    // set to true when at least one request was received
-    updated: bool,
-
-    // context
-    user_agents: HashSet<String>,
-
-    content_types: HashSet<String>,
-    primary_keys: HashSet<String>,
-    index_creation: bool,
-}
-
-impl DocumentsAggregator {
-    pub fn from_query(
-        documents_query: &UpdateDocumentsQuery,
-        index_creation: bool,
-        request: &HttpRequest,
-    ) -> Self {
-        let UpdateDocumentsQuery { primary_key, csv_delimiter: _ } = documents_query;
-
-        let mut primary_keys = HashSet::new();
-        if let Some(primary_key) = primary_key.clone() {
-            primary_keys.insert(primary_key);
-        }
-
-        let mut content_types = HashSet::new();
-        let content_type = request
-            .headers()
-            .get(CONTENT_TYPE)
-            .and_then(|s| s.to_str().ok())
-            .unwrap_or("unknown")
-            .to_string();
-        content_types.insert(content_type);
-
-        Self {
-            timestamp: Some(OffsetDateTime::now_utc()),
-            updated: true,
-            user_agents: extract_user_agents(request).into_iter().collect(),
-            content_types,
-            primary_keys,
-            index_creation,
-        }
-    }
-
-    /// Aggregate one [DocumentsAggregator] into another.
-    pub fn aggregate(&mut self, other: Self) {
-        let Self { timestamp, user_agents, primary_keys, content_types, index_creation, updated } =
-            other;
-
-        if self.timestamp.is_none() {
-            self.timestamp = timestamp;
-        }
-
-        self.updated |= updated;
-        // we can't create a union because there is no `into_union` method
-        for user_agent in user_agents {
-            self.user_agents.insert(user_agent);
-        }
-        for primary_key in primary_keys {
-            self.primary_keys.insert(primary_key);
-        }
-        for content_type in content_types {
-            self.content_types.insert(content_type);
-        }
-        self.index_creation |= index_creation;
-    }
-
-    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
-        let Self { timestamp, user_agents, primary_keys, content_types, index_creation, updated } =
-            self;
-
-        if !updated {
-            None
-        } else {
-            let properties = json!({
-                "user-agent": user_agents,
-                "payload_type": content_types,
-                "primary_key": primary_keys,
-                "index_creation": index_creation,
-            });
-
-            Some(Track {
-                timestamp,
-                user: user.clone(),
-                event: event_name.to_string(),
-                properties,
-                ..Default::default()
-            })
-        }
-    }
-}
-
-#[derive(Default)]
-pub struct EditDocumentsByFunctionAggregator {
-    timestamp: Option<OffsetDateTime>,
-
-    // Set to true if at least one request was filtered
-    filtered: bool,
-    // Set to true if at least one request contained a context
-    with_context: bool,
-
-    // context
-    user_agents: HashSet<String>,
-
-    index_creation: bool,
-}
-
-impl EditDocumentsByFunctionAggregator {
-    pub fn from_query(
-        documents_query: &DocumentEditionByFunction,
-        index_creation: bool,
-        request: &HttpRequest,
-    ) -> Self {
-        let DocumentEditionByFunction { filter, context, function: _ } = documents_query;
-
-        Self {
-            timestamp: Some(OffsetDateTime::now_utc()),
-            user_agents: extract_user_agents(request).into_iter().collect(),
-            filtered: filter.is_some(),
-            with_context: context.is_some(),
-            index_creation,
-        }
-    }
-
-    /// Aggregate one [DocumentsAggregator] into another.
-    pub fn aggregate(&mut self, other: Self) {
-        let Self { timestamp, user_agents, index_creation, filtered, with_context } = other;
-
-        if self.timestamp.is_none() {
-            self.timestamp = timestamp;
-        }
-
-        // we can't create a union because there is no `into_union` method
-        for user_agent in user_agents {
-            self.user_agents.insert(user_agent);
-        }
-        self.index_creation |= index_creation;
-        self.filtered |= filtered;
-        self.with_context |= with_context;
-    }
-
-    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
-        let Self { timestamp, user_agents, index_creation, filtered, with_context } = self;
-
-        // if we had no timestamp it means we never encountered any events and
-        // thus we don't need to send this event.
-        let timestamp = timestamp?;
-
-        let properties = json!({
-            "user-agent": user_agents,
-            "filtered": filtered,
-            "with_context": with_context,
-            "index_creation": index_creation,
-        });
-
-        Some(Track {
-            timestamp: Some(timestamp),
-            user: user.clone(),
-            event: event_name.to_string(),
-            properties,
-            ..Default::default()
-        })
-    }
-}
-
-#[derive(Default, Serialize)]
-pub struct DocumentsDeletionAggregator {
-    #[serde(skip)]
-    timestamp: Option<OffsetDateTime>,
-
-    // context
-    #[serde(rename = "user-agent")]
-    user_agents: HashSet<String>,
-
-    #[serde(rename = "requests.total_received")]
-    total_received: usize,
-    per_document_id: bool,
-    clear_all: bool,
-    per_batch: bool,
-    per_filter: bool,
-}
-
-impl DocumentsDeletionAggregator {
-    pub fn from_query(kind: DocumentDeletionKind, request: &HttpRequest) -> Self {
-        Self {
-            timestamp: Some(OffsetDateTime::now_utc()),
-            user_agents: extract_user_agents(request).into_iter().collect(),
-            total_received: 1,
-            per_document_id: matches!(kind, DocumentDeletionKind::PerDocumentId),
-            clear_all: matches!(kind, DocumentDeletionKind::ClearAll),
-            per_batch: matches!(kind, DocumentDeletionKind::PerBatch),
-            per_filter: matches!(kind, DocumentDeletionKind::PerFilter),
-        }
-    }
-
-    /// Aggregate one [DocumentsAggregator] into another.
-    pub fn aggregate(&mut self, other: Self) {
-        let Self {
-            timestamp,
-            user_agents,
-            total_received,
-            per_document_id,
-            clear_all,
-            per_batch,
-            per_filter,
-        } = other;
-
-        if self.timestamp.is_none() {
-            self.timestamp = timestamp;
-        }
-
-        // we can't create a union because there is no `into_union` method
-        for user_agent in user_agents {
-            self.user_agents.insert(user_agent);
-        }
-        self.total_received = self.total_received.saturating_add(total_received);
-        self.per_document_id |= per_document_id;
-        self.clear_all |= clear_all;
-        self.per_batch |= per_batch;
-        self.per_filter |= per_filter;
-    }
-
-    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
-        // if we had no timestamp it means we never encountered any events and
-        // thus we don't need to send this event.
-        let timestamp = self.timestamp?;
-
-        Some(Track {
-            timestamp: Some(timestamp),
-            user: user.clone(),
-            event: event_name.to_string(),
-            properties: serde_json::to_value(self).ok()?,
-            ..Default::default()
-        })
-    }
-}
-
-#[derive(Default, Serialize)]
-pub struct DocumentsFetchAggregator {
-    #[serde(skip)]
-    timestamp: Option<OffsetDateTime>,
-
-    // context
-    #[serde(rename = "user-agent")]
-    user_agents: HashSet<String>,
-
-    #[serde(rename = "requests.total_received")]
-    total_received: usize,
-
-    // a call on ../documents/:doc_id
-    per_document_id: bool,
-    // if a filter was used
-    per_filter: bool,
-
-    #[serde(rename = "vector.retrieve_vectors")]
-    retrieve_vectors: bool,
-
-    // pagination
-    #[serde(rename = "pagination.max_limit")]
-    max_limit: usize,
-    #[serde(rename = "pagination.max_offset")]
-    max_offset: usize,
-}
-
-impl DocumentsFetchAggregator {
-    pub fn from_query(query: &DocumentFetchKind, request: &HttpRequest) -> Self {
-        let (limit, offset, retrieve_vectors) = match query {
-            DocumentFetchKind::PerDocumentId { retrieve_vectors } => (1, 0, *retrieve_vectors),
-            DocumentFetchKind::Normal { limit, offset, retrieve_vectors, .. } => {
-                (*limit, *offset, *retrieve_vectors)
-            }
-        };
-        Self {
-            timestamp: Some(OffsetDateTime::now_utc()),
-            user_agents: extract_user_agents(request).into_iter().collect(),
-            total_received: 1,
-            per_document_id: matches!(query, DocumentFetchKind::PerDocumentId { .. }),
-            per_filter: matches!(query, DocumentFetchKind::Normal { with_filter, .. } if *with_filter),
-            max_limit: limit,
-            max_offset: offset,
-            retrieve_vectors,
-        }
-    }
-
-    /// Aggregate one [DocumentsFetchAggregator] into another.
-    pub fn aggregate(&mut self, other: Self) {
-        let Self {
-            timestamp,
-            user_agents,
-            total_received,
-            per_document_id,
-            per_filter,
-            max_limit,
-            max_offset,
-            retrieve_vectors,
-        } = other;
-
-        if self.timestamp.is_none() {
-            self.timestamp = timestamp;
-        }
-        for user_agent in user_agents {
-            self.user_agents.insert(user_agent);
-        }
-
-        self.total_received = self.total_received.saturating_add(total_received);
-        self.per_document_id |= per_document_id;
-        self.per_filter |= per_filter;
-
-        self.max_limit = self.max_limit.max(max_limit);
-        self.max_offset = self.max_offset.max(max_offset);
-
-        self.retrieve_vectors |= retrieve_vectors;
-    }
-
-    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
-        // if we had no timestamp it means we never encountered any events and
-        // thus we don't need to send this event.
-        let timestamp = self.timestamp?;
-
-        Some(Track {
-            timestamp: Some(timestamp),
-            user: user.clone(),
-            event: event_name.to_string(),
-            properties: serde_json::to_value(self).ok()?,
-            ..Default::default()
-        })
-    }
-}
-
 aggregate_methods!(
     SimilarPOST => "Similar POST",
     SimilarGET => "Similar GET",
diff --git a/meilisearch/src/routes/indexes/facet_search.rs b/meilisearch/src/routes/indexes/facet_search.rs
index 715eaaaa7..8e40397c7 100644
--- a/meilisearch/src/routes/indexes/facet_search.rs
+++ b/meilisearch/src/routes/indexes/facet_search.rs
@@ -9,7 +9,6 @@ use meilisearch_types::error::deserr_codes::*;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::index_uid::IndexUid;
 use meilisearch_types::locales::Locale;
-use serde::Serialize;
 use serde_json::Value;
 use tracing::debug;
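
The most interesting piece of this diff is the `aggregator_function: fn(Box<dyn Aggregate>, Box<dyn Aggregate>) -> Option<Box<dyn Aggregate>>` field added to `Message`: because the concrete aggregate type is erased when the event crosses the channel, each message carries its `TypeId` plus a function pointer monomorphized at the send site, and the receiver uses that pointer to merge two type-erased payloads without ever naming the concrete type. Below is a minimal, self-contained sketch of that pattern, not the Meilisearch implementation: the `Aggregate` trait shape, `SearchCount`, and `event_name`/`payload` fields are stand-ins, and it erases to `Box<dyn Any>` from the standard library instead of whatever downcasting mechanism the real `dyn Aggregate` code relies on, so the example compiles on its own.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical stand-in for the `Aggregate` trait referenced by the diff: an
/// event knows its name and how to fold another value of the *same* type into
/// itself. `aggregate` consumes `Box<Self>`, so it is only callable where the
/// concrete type is known (`where Self: Sized`).
trait Aggregate: Any + Send {
    fn event_name(&self) -> &'static str;
    fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self>
    where
        Self: Sized;
}

/// Monomorphized merge function. One such `fn` pointer per concrete type is
/// stored in each `Message`, letting the receiver merge two type-erased
/// payloads without knowing `T` itself.
fn downcast_aggregate<T: Aggregate>(old: Box<dyn Any>, new: Box<dyn Any>) -> Option<Box<dyn Any>> {
    let old = old.downcast::<T>().ok()?;
    let new = new.downcast::<T>().ok()?;
    let merged: Box<dyn Any> = old.aggregate(new);
    Some(merged)
}

/// Mirrors the shape of the `Message` struct in the diff: since the concrete
/// type is erased, the `TypeId` and the merge function travel with the payload.
struct Message {
    type_id: TypeId,
    aggregator_function: fn(Box<dyn Any>, Box<dyn Any>) -> Option<Box<dyn Any>>,
    event_name: &'static str,
    payload: Box<dyn Any>,
}

impl Message {
    fn new<T: Aggregate>(payload: T) -> Self {
        Message {
            type_id: TypeId::of::<T>(),
            // `T` is still concrete here, so the right merge function is
            // captured as a plain fn pointer at the send site.
            aggregator_function: downcast_aggregate::<T>,
            event_name: payload.event_name(),
            payload: Box::new(payload),
        }
    }
}

/// A toy aggregate standing in for the search/documents aggregators.
struct SearchCount {
    total_received: usize,
}

impl Aggregate for SearchCount {
    fn event_name(&self) -> &'static str {
        "Documents Searched GET"
    }

    fn aggregate(mut self: Box<Self>, other: Box<Self>) -> Box<Self> {
        self.total_received = self.total_received.saturating_add(other.total_received);
        self
    }
}

fn main() {
    // The receiving end keys pending aggregates by `TypeId` and merges on
    // arrival, which is the role the analytics loop plays for channel messages.
    let mut pending: HashMap<TypeId, Message> = HashMap::new();
    let incoming = vec![
        Message::new(SearchCount { total_received: 1 }),
        Message::new(SearchCount { total_received: 2 }),
    ];

    for msg in incoming {
        let Message { type_id, aggregator_function, event_name, payload } = msg;
        let payload = match pending.remove(&type_id) {
            // Same event type already pending: merge through the fn pointer.
            Some(prev) => aggregator_function(prev.payload, payload)
                .expect("the TypeIds matched, so both downcasts succeed"),
            None => payload,
        };
        pending.insert(type_id, Message { type_id, aggregator_function, event_name, payload });
    }

    for msg in pending.values() {
        if let Some(agg) = msg.payload.downcast_ref::<SearchCount>() {
            println!("{}: {} request(s)", msg.event_name, agg.total_received);
        }
    }
}
```

The `fn` pointer is what makes one channel able to carry every aggregator type that used to need its own `AnalyticsMsg` variant: the generic merge logic is instantiated where the type is still known, and the `TypeId` key guarantees two payloads are only ever merged when they came from the same concrete aggregate. This sketch also makes the clippy annotation in the diff understandable: the fully spelled-out pointer type is verbose enough that `#[allow(clippy::type_complexity)]` is needed to keep the lint quiet.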