diff --git a/meilisearch-http/src/analytics.rs b/meilisearch-http/src/analytics.rs index c10b65457..60f03e2d5 100644 --- a/meilisearch-http/src/analytics.rs +++ b/meilisearch-http/src/analytics.rs @@ -1,5 +1,7 @@ use actix_web::HttpRequest; +use meilisearch_lib::index::SearchQuery; use serde_json::Value; +use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::fs::read_to_string; @@ -11,12 +13,14 @@ mod segment { use crate::analytics::Analytics; use actix_web::http::header::USER_AGENT; use actix_web::HttpRequest; + use meilisearch_lib::index::SearchQuery; use meilisearch_lib::index_controller::Stats; use meilisearch_lib::MeiliSearch; use once_cell::sync::Lazy; use segment::message::{Identify, Track, User}; use segment::{AutoBatcher, Batcher, HttpClient}; use serde_json::{json, Value}; + use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::fs; use std::time::{Duration, Instant}; @@ -32,6 +36,8 @@ mod segment { user: User, opt: Opt, batcher: Mutex, + post_search_batcher: Mutex, + get_search_batcher: Mutex, } impl SegmentAnalytics { @@ -103,6 +109,8 @@ mod segment { user, opt: opt.clone(), batcher, + post_search_batcher: Mutex::new(SearchBatcher::default()), + get_search_batcher: Mutex::new(SearchBatcher::default()), }); let segment = Box::leak(segment); @@ -141,6 +149,92 @@ mod segment { } }); } + + fn start_search( + &'static self, + getter: impl Fn(&'static Self) -> &'static Mutex + Send + Sync + 'static, + query: &SearchQuery, + request: &HttpRequest, + ) { + let user_agent = SearchBatcher::extract_user_agents(request); + let sorted = query.sort.is_some() as usize; + let sort_with_geo_point = query + .sort + .as_ref() + .map_or(false, |s| s.iter().any(|s| s.contains("_geoPoint("))); + let sort_criteria_terms = query.sort.as_ref().map_or(0, |s| s.len()); + + // since there is quite a bit of computation made on the filter we are going to do that in the async task + let filter = query.filter.clone(); + let queried = query.q.is_some(); + let nb_terms = query.q.as_ref().map_or(0, |s| s.split_whitespace().count()); + + let max_limit = query.limit; + let max_offset = query.offset.unwrap_or_default(); + + // to avoid blocking the search we are going to do the heavier computation in an async task + // and take the mutex in the same task + tokio::spawn(async move { + let filtered = filter.is_some() as usize; + let syntax = match filter.as_ref() { + Some(Value::String(_)) => "string".to_string(), + Some(Value::Array(values)) => { + if values.iter().map(|v| v.to_string()).any(|s| { + s.contains(['=', '<', '>', '!'].as_ref()) + || s.contains("_geoRadius") + || s.contains("TO") + }) { + "mixed".to_string() + } else { + "array".to_string() + } + } + _ => "none".to_string(), + }; + let stringified_filters = filter.map_or(String::new(), |v| v.to_string()); + let filter_with_geo_radius = stringified_filters.contains("_geoRadius("); + let filter_number_of_criteria = stringified_filters + .split("!=") + .map(|s| s.split("<=")) + .flatten() + .map(|s| s.split(">=")) + .flatten() + .map(|s| s.split(['=', '<', '>', '!'].as_ref())) + .flatten() + .map(|s| s.split("_geoRadius(")) + .flatten() + .map(|s| s.split("TO")) + .flatten() + .count() + - 1; + + println!("Batching a search"); + let mut search_batcher = getter(self).lock().await; + user_agent.into_iter().for_each(|ua| { + search_batcher.user_agents.insert(ua); + }); + search_batcher.total_received += 1; + + // sort + search_batcher.sort_with_geo_point |= sort_with_geo_point; + search_batcher.sort_sum_of_criteria_terms += sort_criteria_terms; + search_batcher.sort_total_number_of_criteria += sorted; + + // filter + search_batcher.filter_with_geo_radius |= filter_with_geo_radius; + search_batcher.filter_sum_of_criteria_terms += filter_number_of_criteria; + search_batcher.filter_total_number_of_criteria += filtered as usize; + *search_batcher.used_syntax.entry(syntax).or_insert(0) += 1; + + // q + search_batcher.sum_of_terms_count += nb_terms; + search_batcher.total_number_of_q += queried as usize; + + // pagination + search_batcher.max_limit = search_batcher.max_limit.max(max_limit); + search_batcher.max_offset = search_batcher.max_offset.max(max_offset); + }); + } } #[async_trait::async_trait] @@ -160,7 +254,7 @@ mod segment { .push(Track { user: self.user.clone(), event: event_name.clone(), - context: content_type.map(|user_agent| json!({ "user-agent": user_agent.split(";").map(|u| u.trim()).collect::>() })), + context: content_type.map(|user_agent| json!({ "user-agent": user_agent.split(";").map(str::trim).collect::>() })), properties: send, ..Default::default() }) @@ -168,6 +262,30 @@ mod segment { println!("ANALYTICS {} pushed", event_name); }); } + + fn start_get_search(&'static self, query: &SearchQuery, request: &HttpRequest) { + self.start_search(|s| &s.get_search_batcher, query, request) + } + + fn end_get_search(&'static self, process_time: usize) { + tokio::spawn(async move { + let mut search_batcher = self.get_search_batcher.lock().await; + search_batcher.total_succeeded += 1; + search_batcher.time_spent.push(process_time); + }); + } + + fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest) { + self.start_search(|s| &s.post_search_batcher, query, request) + } + + fn end_post_search(&'static self, process_time: usize) { + tokio::spawn(async move { + let mut search_batcher = self.get_search_batcher.lock().await; + search_batcher.total_succeeded += 1; + search_batcher.time_spent.push(process_time); + }); + } } impl Display for SegmentAnalytics { @@ -175,6 +293,96 @@ mod segment { write!(f, "{}", self.user) } } + + #[derive(Default)] + pub struct SearchBatcher { + // context + user_agents: HashSet, + + // requests + total_received: usize, + total_succeeded: usize, + time_spent: Vec, + + // sort + sort_with_geo_point: bool, + // everytime a request has a filter, this field must be incremented by the number of terms it contains + sort_sum_of_criteria_terms: usize, + // everytime a request has a filter, this field must be incremented by one + sort_total_number_of_criteria: usize, + + // filter + filter_with_geo_radius: bool, + // everytime a request has a filter, this field must be incremented by the number of terms it contains + filter_sum_of_criteria_terms: usize, + // everytime a request has a filter, this field must be incremented by one + filter_total_number_of_criteria: usize, + used_syntax: HashMap, + + // q + // everytime a request has a q field, this field must be incremented by the number of terms + sum_of_terms_count: usize, + // everytime a request has a q field, this field must be incremented by one + total_number_of_q: usize, + + // pagination + max_limit: usize, + max_offset: usize, + } + + impl SearchBatcher { + pub fn extract_user_agents(request: &HttpRequest) -> Vec { + request + .headers() + .get(USER_AGENT) + .map(|header| header.to_str().ok()) + .flatten() + .unwrap_or("unknown") + .split(";") + .map(str::trim) + .map(ToString::to_string) + .collect() + } + + pub fn into_event(mut self, user: User, event_name: String) -> Track { + let context = Some(json!({ "user-agent": self.user_agents})); + let percentile_99th = 0.99 * (self.total_succeeded as f64 - 1.) + 1.; + self.time_spent.drain(percentile_99th as usize..); + + let properties = json!({ + "requests": { + "99th_response_time": self.time_spent.len() as f64 / self.time_spent.iter().sum::() as f64, + "total_succeeded": self.total_succeeded, + "total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panics + "total_received": self.total_received, + }, + "sort": { + "with_geoPoint": self.sort_with_geo_point, + "avg_criteria_number": self.sort_total_number_of_criteria as f64 / self.sort_sum_of_criteria_terms as f64, + }, + "filter": { + "with_geoRadius": self.filter_with_geo_radius, + "avg_criteria_number": self.filter_total_number_of_criteria as f64 / self.filter_sum_of_criteria_terms as f64, + "most_used_syntax": self.used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)), + }, + "q": { + "avg_terms_number": self.total_number_of_q as f64 / self.sum_of_terms_count as f64, + }, + "pagination": { + "max_limit": self.max_limit, + "max_offset": self.max_offset, + }, + }); + + Track { + user, + event: event_name, + context, + properties, + ..Default::default() + } + } + } } // if we are in debug mode OR the analytics feature is disabled @@ -199,8 +407,12 @@ impl MockAnalytics { #[async_trait::async_trait] impl Analytics for MockAnalytics { - /// This is a noop and should be optimized out + // These methods are noop and should be optimized out fn publish(&'static self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {} + fn start_get_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {} + fn end_get_search(&'static self, _process_time: usize) {} + fn start_post_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {} + fn end_post_search(&'static self, _process_time: usize) {} } impl Display for MockAnalytics { @@ -211,5 +423,16 @@ impl Display for MockAnalytics { #[async_trait::async_trait] pub trait Analytics: Display + Sync + Send { + /// The method used to publish most analytics that do not need to be batched every hours fn publish(&'static self, event_name: String, send: Value, request: Option<&HttpRequest>); + + /// This method should be called to batch a get search request + fn start_get_search(&'static self, query: &SearchQuery, request: &HttpRequest); + /// This method should be called once a get search request has succeeded + fn end_get_search(&'static self, process_time: usize); + + /// This method should be called to batch a get search request + fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest); + /// This method should be called once a post search request has succeeded + fn end_post_search(&'static self, process_time: usize); }