implements all routes

Tamo 2024-10-16 17:16:33 +02:00
parent e66fccc3f2
commit fdeb47fb54
14 changed files with 337 additions and 295 deletions

View File

@ -16,7 +16,9 @@ use serde::Serialize;
// if the analytics feature is enabled we use the real analytics
pub type SegmentAnalytics = segment_analytics::SegmentAnalytics;
pub use segment_analytics::SearchAggregator;
pub type SimilarAggregator = segment_analytics::SimilarAggregator;
pub use segment_analytics::SimilarAggregator;
use self::segment_analytics::extract_user_agents;
pub type MultiSearchAggregator = segment_analytics::MultiSearchAggregator;
pub type FacetSearchAggregator = segment_analytics::FacetSearchAggregator;
@ -32,14 +34,11 @@ macro_rules! empty_analytics {
$event_name
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
fn aggregate(self, _other: Self) -> Self {
self
}
fn into_event(self) -> serde_json::Value {
fn into_event(self) -> impl serde::Serialize {
serde_json::json!({})
}
}
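For orientation, this is the trait shape these hunks migrate to, reconstructed from the impls in this commit (a sketch only; the real definition may carry extra bounds such as Send or 'static):

use serde::Serialize;

/// Reconstructed from the impls in this commit, not copied verbatim.
pub trait Aggregate {
    /// The name of the Segment event this aggregate produces.
    fn event_name(&self) -> &'static str;

    /// Merging now consumes both sides by value instead of mutating
    /// `&mut self`, which is what lets `std::mem::take` disappear below.
    fn aggregate(self, other: Self) -> Self
    where
        Self: Sized;

    /// Serialization is deferred: implementors can return `self` directly
    /// (as the features.rs hunk does below) instead of eagerly building a
    /// `serde_json::Value` with `serde_json::to_value(..).unwrap()`.
    fn into_event(self) -> impl Serialize
    where
        Self: Sized;
}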
@ -150,7 +149,8 @@ impl Analytics {
}
/// The method used to publish most analytics that do not need to be batched every hour
pub fn publish(&self, send: impl Aggregate, request: Option<&HttpRequest>) {
pub fn publish(&self, send: impl Aggregate, request: &HttpRequest) {
let Some(segment) = self.inner else { return };
let user_agents = extract_user_agents(request);
}
}
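Putting the two changes together, a minimal end-to-end usage sketch of the new API (PingAnalytics is a hypothetical implementor invented for illustration; the publish call mirrors the real handler hunks later in this commit):

use serde::Serialize;

#[derive(Default, Serialize)]
struct PingAnalytics {
    total_received: usize,
}

impl Aggregate for PingAnalytics {
    fn event_name(&self) -> &'static str {
        "Ping Received"
    }

    fn aggregate(self, other: Self) -> Self {
        Self { total_received: self.total_received.saturating_add(other.total_received) }
    }

    fn into_event(self) -> impl Serialize {
        self
    }
}

// In a handler, the request is now passed directly, no `Some(&req)` wrapper:
// analytics.publish(PingAnalytics { total_received: 1 }, &req);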

View File

@ -71,25 +71,8 @@ pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
.collect()
}
pub enum AnalyticsMsg {
BatchMessage(Track),
AggregateGetSearch(SearchAggregator),
AggregatePostSearch(SearchAggregator),
AggregateGetSimilar(SimilarAggregator),
AggregatePostSimilar(SimilarAggregator),
AggregatePostMultiSearch(MultiSearchAggregator),
AggregatePostFacetSearch(FacetSearchAggregator),
AggregateAddDocuments(DocumentsAggregator),
AggregateDeleteDocuments(DocumentsDeletionAggregator),
AggregateUpdateDocuments(DocumentsAggregator),
AggregateEditDocumentsByFunction(EditDocumentsByFunctionAggregator),
AggregateGetFetchDocuments(DocumentsFetchAggregator),
AggregatePostFetchDocuments(DocumentsFetchAggregator),
}
pub struct SegmentAnalytics {
pub instance_uid: InstanceUid,
sender: Sender<AnalyticsMsg>,
pub user: User,
}
@ -1083,8 +1066,6 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
#[derive(Default)]
pub struct MultiSearchAggregator {
timestamp: Option<OffsetDateTime>,
// requests
total_received: usize,
total_succeeded: usize,
@ -1103,9 +1084,6 @@ pub struct MultiSearchAggregator {
// federation
use_federation: bool,
// context
user_agents: HashSet<String>,
}
impl MultiSearchAggregator {
@ -1113,10 +1091,6 @@ impl MultiSearchAggregator {
federated_search: &FederatedSearch,
request: &HttpRequest,
) -> Self {
let timestamp = Some(OffsetDateTime::now_utc());
let user_agents = extract_user_agents(request).into_iter().collect();
let use_federation = federated_search.federation.is_some();
let distinct_indexes: HashSet<_> = federated_search
@ -1166,7 +1140,6 @@ impl MultiSearchAggregator {
federated_search.queries.iter().any(|query| query.show_ranking_score_details);
Self {
timestamp,
total_received: 1,
total_succeeded: 0,
total_distinct_index_count: distinct_indexes.len(),
@ -1174,7 +1147,6 @@ impl MultiSearchAggregator {
total_search_count: federated_search.queries.len(),
show_ranking_score,
show_ranking_score_details,
user_agents,
use_federation,
}
}
@ -1182,15 +1154,20 @@ impl MultiSearchAggregator {
pub fn succeed(&mut self) {
self.total_succeeded = self.total_succeeded.saturating_add(1);
}
}
impl Aggregate for MultiSearchAggregator {
fn event_name(&self) -> &'static str {
"Documents Searched by Multi-Search POST"
}
/// Aggregate one [MultiSearchAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
fn aggregate(mut self, other: Self) -> Self {
// write the aggregate in a way that will cause a compilation error if a field is added.
// get ownership of self, replacing it by a default value.
let this = std::mem::take(self);
let this = self;
let timestamp = this.timestamp.or(other.timestamp);
let total_received = this.total_received.saturating_add(other.total_received);
let total_succeeded = this.total_succeeded.saturating_add(other.total_succeeded);
let total_distinct_index_count =
@ -1207,44 +1184,31 @@ impl MultiSearchAggregator {
user_agents.insert(user_agent);
}
// need all fields or compile error
let mut aggregated = Self {
timestamp,
Self {
total_received,
total_succeeded,
total_distinct_index_count,
total_single_index,
total_search_count,
user_agents,
show_ranking_score,
show_ranking_score_details,
use_federation,
// do not add _ or ..Default::default() here
};
// replace the default self with the aggregated value
std::mem::swap(self, &mut aggregated);
}
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
fn into_event(self) -> impl Serialize {
let Self {
timestamp,
total_received,
total_succeeded,
total_distinct_index_count,
total_single_index,
total_search_count,
user_agents,
show_ranking_score,
show_ranking_score_details,
use_federation,
} = self;
if total_received == 0 {
None
} else {
let properties = json!({
"user-agent": user_agents,
json!({
"requests": {
"total_succeeded": total_succeeded,
"total_failed": total_received.saturating_sub(total_succeeded), // just to be sure we never panics
@ -1266,18 +1230,9 @@ impl MultiSearchAggregator {
"federation": {
"use_federation": use_federation,
}
});
Some(Track {
timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
}
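The recurring "// need all fields or compile error" comment refers to the exhaustive-destructuring idiom that the new by-value aggregate enables: both sides are destructured completely and the result is rebuilt field by field, so adding a struct field without updating aggregate fails to compile. A reduced illustration with placeholder fields:

struct Stats {
    total_received: usize,
    total_succeeded: usize,
}

impl Stats {
    fn aggregate(self, other: Self) -> Self {
        // Exhaustive pattern: a new field on `Stats` makes this
        // destructuring, and the struct literal below, fail to compile.
        let Stats { total_received, total_succeeded } = other;
        Stats {
            total_received: self.total_received.saturating_add(total_received),
            total_succeeded: self.total_succeeded.saturating_add(total_succeeded),
            // deliberately no `..Default::default()` here
        }
    }
}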
#[derive(Default)]
pub struct FacetSearchAggregator {
@ -1752,13 +1707,13 @@ impl DocumentsFetchAggregator {
}
}
aggregate_methods!(
SimilarPOST => "Similar POST",
SimilarGET => "Similar GET",
);
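aggregate_methods! pairs a zero-sized marker type with a Segment event name so that one generic aggregator can serve both routes. The macro definition is not part of these hunks; a plausible expansion, inferred from how the markers are used, would be:

// Assumed expansion, inferred from usage; the real macro may differ.
pub trait AggregateMethod: Default + 'static {
    fn event_name() -> &'static str;
}

#[derive(Default)]
pub struct SimilarPOST; // zero-sized marker, only used as a type parameter

#[derive(Default)]
pub struct SimilarGET;

impl AggregateMethod for SimilarPOST {
    fn event_name() -> &'static str {
        "Similar POST"
    }
}

impl AggregateMethod for SimilarGET {
    fn event_name() -> &'static str {
        "Similar GET"
    }
}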
#[derive(Default)]
pub struct SimilarAggregator {
timestamp: Option<OffsetDateTime>,
// context
user_agents: HashSet<String>,
pub struct SimilarAggregator<Method: AggregateMethod> {
// requests
total_received: usize,
total_succeeded: usize,
@ -1787,9 +1742,11 @@ pub struct SimilarAggregator {
show_ranking_score: bool,
show_ranking_score_details: bool,
ranking_score_threshold: bool,
marker: std::marker::PhantomData<Method>,
}
impl SimilarAggregator {
impl<Method: AggregateMethod> SimilarAggregator<Method> {
#[allow(clippy::field_reassign_with_default)]
pub fn from_query(query: &SimilarQuery, request: &HttpRequest) -> Self {
let SimilarQuery {
@ -1854,12 +1811,16 @@ impl SimilarAggregator {
self.time_spent.push(*processing_time_ms as usize);
}
}
impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
fn event_name(&self) -> &'static str {
Method::event_name()
}
/// Aggregate one [SimilarAggregator] into another.
pub fn aggregate(&mut self, mut other: Self) {
fn aggregate(mut self, mut other: Self) -> Self {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
ref mut time_spent,
@ -1875,17 +1836,9 @@ impl SimilarAggregator {
show_ranking_score_details,
ranking_score_threshold,
retrieve_vectors,
marker: _,
} = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
// context
for user_agent in user_agents.into_iter() {
self.user_agents.insert(user_agent);
}
// request
self.total_received = self.total_received.saturating_add(total_received);
self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
@ -1917,12 +1870,12 @@ impl SimilarAggregator {
self.show_ranking_score |= show_ranking_score;
self.show_ranking_score_details |= show_ranking_score_details;
self.ranking_score_threshold |= ranking_score_threshold;
self
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
fn into_event(self) -> impl Serialize {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
time_spent,
@ -1938,11 +1891,9 @@ impl SimilarAggregator {
show_ranking_score_details,
ranking_score_threshold,
retrieve_vectors,
marker: _,
} = self;
if total_received == 0 {
None
} else {
// we get all the values in sorted order
let time_spent = time_spent.into_sorted_vec();
// the index of the 99th percentile value
@ -1950,8 +1901,7 @@ impl SimilarAggregator {
// We are only interested in the slowest value among the 99% fastest results
let time_spent = time_spent.get(percentile_99th);
let properties = json!({
"user-agent": user_agents,
json!({
"requests": {
"99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
"total_succeeded": total_succeeded,
@ -1978,16 +1928,7 @@ impl SimilarAggregator {
"show_ranking_score": show_ranking_score,
"show_ranking_score_details": show_ranking_score_details,
"ranking_score_threshold": ranking_score_threshold,
},
});
Some(Track {
timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
}
})
}
}
}
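One line of the percentile computation falls outside the hunks above: the definition of percentile_99th. A plausible reconstruction of the full step, assuming time_spent is a BinaryHeap of millisecond counts (the index formula is an assumption, since the diff elides it):

// `into_sorted_vec` returns ascending order, so this index points at
// the slowest request among the fastest 99%.
let time_spent = time_spent.into_sorted_vec();
let percentile_99th = time_spent.len() * 99 / 100; // assumed, elided by the hunk
let time_spent = time_spent.get(percentile_99th);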

View File

@ -26,7 +26,7 @@ pub async fn create_dump(
opt: web::Data<Opt>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
analytics.publish(DumpAnalytics::default(), Some(&req));
analytics.publish(DumpAnalytics::default(), &req);
let task = KindWithContent::DumpCreation {
keys: auth_controller.list_keys()?,

View File

@ -35,7 +35,7 @@ async fn get_features(
) -> HttpResponse {
let features = index_scheduler.features();
analytics.publish(GetExperimentalFeatureAnalytics::default(), Some(&req));
analytics.publish(GetExperimentalFeatureAnalytics::default(), &req);
let features = features.runtime_features();
debug!(returns = ?features, "Get features");
HttpResponse::Ok().json(features)
@ -83,8 +83,8 @@ impl Aggregate for PatchExperimentalFeatureAnalytics {
}
}
fn into_event(self) -> serde_json::Value {
serde_json::to_value(self).unwrap()
fn into_event(self) -> impl Serialize {
self
}
}
@ -131,7 +131,7 @@ async fn patch_features(
edit_documents_by_function,
contains_filter,
},
Some(&req),
&req,
);
index_scheduler.put_runtime_features(new_features)?;
debug!(returns = ?new_features, "Patch features");

View File

@ -194,7 +194,7 @@ pub async fn get_document(
retrieve_vectors: param_retrieve_vectors.0,
..Default::default()
},
Some(&req),
&req,
);
let index = index_scheduler.index(&index_uid)?;
@ -253,7 +253,7 @@ pub async fn delete_document(
per_document_id: true,
..Default::default()
},
Some(&req),
&req,
);
let task = KindWithContent::DocumentDeletion {
@ -319,7 +319,7 @@ pub async fn documents_by_query_post(
max_offset: body.offset,
..Default::default()
},
Some(&req),
&req,
);
documents_by_query(&index_scheduler, index_uid, body)
@ -361,7 +361,7 @@ pub async fn get_documents(
max_offset: query.offset,
..Default::default()
},
Some(&req),
&req,
);
documents_by_query(&index_scheduler, index_uid, query)
@ -486,7 +486,7 @@ pub async fn replace_documents(
index_creation: index_scheduler.index_exists(&index_uid).map_or(true, |x| !x),
method: PhantomData,
},
Some(&req),
&req,
);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
@ -543,7 +543,7 @@ pub async fn update_documents(
index_creation: index_scheduler.index_exists(&index_uid).map_or(true, |x| !x),
method: PhantomData,
},
Some(&req),
&req,
);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
@ -718,7 +718,7 @@ pub async fn delete_documents_batch(
analytics.publish(
DocumentsDeletionAggregator { total_received: 1, per_batch: true, ..Default::default() },
Some(&req),
&req,
);
let ids = body
@ -761,7 +761,7 @@ pub async fn delete_documents_by_filter(
analytics.publish(
DocumentsDeletionAggregator { total_received: 1, per_filter: true, ..Default::default() },
Some(&req),
&req,
);
// we ensure the filter is well formed before enqueuing it
@ -847,7 +847,7 @@ pub async fn edit_documents_by_function(
with_context: params.context.is_some(),
index_creation: index_scheduler.index(&index_uid).is_err(),
},
Some(&req),
&req,
);
let DocumentEditionByFunction { filter, context, function } = params;
@ -902,7 +902,7 @@ pub async fn clear_all_documents(
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
analytics.publish(
DocumentsDeletionAggregator { total_received: 1, clear_all: true, ..Default::default() },
Some(&req),
&req,
);
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };

View File

@ -200,7 +200,7 @@ pub async fn search(
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
}
analytics.publish(aggregate, Some(&req));
analytics.publish(aggregate, &req);
let search_result = search_result?;

View File

@ -160,7 +160,7 @@ pub async fn create_index(
if allow_index_creation {
analytics.publish(
IndexCreatedAggregate { primary_key: primary_key.iter().cloned().collect() },
Some(&req),
&req,
);
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
@ -247,7 +247,7 @@ pub async fn update_index(
let body = body.into_inner();
analytics.publish(
IndexUpdatedAggregate { primary_key: body.primary_key.iter().cloned().collect() },
Some(&req),
&req,
);
let task = KindWithContent::IndexUpdate {

View File

@ -255,7 +255,7 @@ pub async fn search_with_url_query(
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
}
analytics.publish(aggregate, Some(&req));
analytics.publish(aggregate, &req);
let search_result = search_result?;
@ -303,7 +303,7 @@ pub async fn search_with_post(
MEILISEARCH_DEGRADED_SEARCH_REQUESTS.inc();
}
}
analytics.publish(aggregate, Some(&req));
analytics.publish(aggregate, &req);
let search_result = search_result?;

View File

@ -8,6 +8,7 @@ use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::ResponseError;
use meilisearch_types::facet_values_sort::FacetValuesSort;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::locales::Locale;
use meilisearch_types::milli::update::Setting;
use meilisearch_types::settings::{
settings, ProximityPrecisionView, RankingRuleView, SecretPolicy, Settings, Unchecked,
@ -94,7 +95,7 @@ macro_rules! make_setting_route {
#[allow(clippy::redundant_closure_call)]
analytics.publish(
$crate::routes::indexes::settings::$analytics::new(body.as_ref()).to_settings(),
Some(&req),
&req,
);
let new_settings = Settings {
@ -491,11 +492,11 @@ impl Aggregate for SettingsAnalytics {
has_geo: self.filterable_attributes.has_geo.or(other.filterable_attributes.has_geo),
},
distinct_attribute: DistinctAttributeAnalytics {
set: self.distinct_attribute.set.or(other.distinct_attribute.set),
set: self.distinct_attribute.set | other.distinct_attribute.set,
},
proximity_precision: ProximityPrecisionAnalytics {
set: self.proximity_precision.set(other.proximity_precision.set),
value: self.proximity_precision.value(other.proximity_precision.value),
set: self.proximity_precision.set | other.proximity_precision.set,
value: self.proximity_precision.value.or(other.proximity_precision.value),
},
typo_tolerance: TypoToleranceAnalytics {
enabled: self.typo_tolerance.enabled.or(other.typo_tolerance.enabled),
@ -542,7 +543,7 @@ impl Aggregate for SettingsAnalytics {
sources: match (self.embedders.sources, other.embedders.sources) {
(None, None) => None,
(Some(sources), None) | (None, Some(sources)) => Some(sources),
(Some(this), Some(other)) => Some(this.union(&other).collect()),
(Some(this), Some(other)) => Some(this.union(&other).cloned().collect()),
},
document_template_used: match (
self.embedders.document_template_used,
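The sources fix in this hunk is easy to miss: HashSet::union yields references (&T), so collecting into an owned HashSet<String> requires .cloned(). A reduced, self-contained example of the corrected line:

use std::collections::HashSet;

fn merge(this: HashSet<String>, other: HashSet<String>) -> HashSet<String> {
    // `union` iterates `&String`; `.cloned()` turns the references back
    // into owned `String`s so the result can outlive both inputs.
    this.union(&other).cloned().collect()
}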
@ -598,45 +599,70 @@ impl Aggregate for SettingsAnalytics {
#[derive(Serialize, Default)]
struct RankingRulesAnalytics {
words_position: Option<bool>,
typo_position: Option<bool>,
proximity_position: Option<bool>,
attribute_position: Option<bool>,
sort_position: Option<bool>,
exactness_position: Option<bool>,
values: Option<bool>,
words_position: Option<usize>,
typo_position: Option<usize>,
proximity_position: Option<usize>,
attribute_position: Option<usize>,
sort_position: Option<usize>,
exactness_position: Option<usize>,
values: Option<String>,
}
impl RankingRulesAnalytics {
pub fn new(rr: Option<&Vec<RankingRuleView>>) -> Self {
RankingRulesAnalytics {
words_position: rr.as_ref().map(|rr| {
rr.iter()
.position(|s| matches!(s, meilisearch_types::settings::RankingRuleView::Words))
}),
typo_position: rr.as_ref().map(|rr| {
rr.iter()
.position(|s| matches!(s, meilisearch_types::settings::RankingRuleView::Typo))
}),
proximity_position: rr.as_ref().map(|rr| {
words_position: rr
.as_ref()
.map(|rr| {
rr.iter().position(|s| {
matches!(s, meilisearch_types::settings::RankingRuleView::Words)
})
})
.flatten(),
typo_position: rr
.as_ref()
.map(|rr| {
rr.iter().position(|s| {
matches!(s, meilisearch_types::settings::RankingRuleView::Typo)
})
})
.flatten(),
proximity_position: rr
.as_ref()
.map(|rr| {
rr.iter().position(|s| {
matches!(s, meilisearch_types::settings::RankingRuleView::Proximity)
})
}),
attribute_position: rr.as_ref().map(|rr| {
})
.flatten(),
attribute_position: rr
.as_ref()
.map(|rr| {
rr.iter().position(|s| {
matches!(s, meilisearch_types::settings::RankingRuleView::Attribute)
})
}),
sort_position: rr.as_ref().map(|rr| {
rr.iter()
.position(|s| matches!(s, meilisearch_types::settings::RankingRuleView::Sort))
}),
exactness_position: rr.as_ref().map(|rr| {
})
.flatten(),
sort_position: rr
.as_ref()
.map(|rr| {
rr.iter().position(|s| {
matches!(s, meilisearch_types::settings::RankingRuleView::Sort)
})
})
.flatten(),
exactness_position: rr
.as_ref()
.map(|rr| {
rr.iter().position(|s| {
matches!(s, meilisearch_types::settings::RankingRuleView::Exactness)
})
}),
})
.flatten(),
values: rr.as_ref().map(|rr| {
rr.iter()
.filter(|s| {
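A style note on the reformatted ranking-rule blocks above: for Option, .map(f).flatten() is equivalent to .and_then(f), so each position lookup could also be written in one step. A reduced sketch (rule names are placeholders):

fn position_of(rules: Option<&[&str]>, needle: &str) -> Option<usize> {
    // Equivalent to: rules.map(|rr| rr.iter().position(|s| *s == needle)).flatten()
    rules.and_then(|rr| rr.iter().position(|s| *s == needle))
}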
@ -661,7 +687,7 @@ impl RankingRulesAnalytics {
#[derive(Serialize, Default)]
struct SearchableAttributesAnalytics {
total: Option<usize>,
with_wildcard: bool,
with_wildcard: Option<bool>,
}
impl SearchableAttributesAnalytics {
@ -681,8 +707,8 @@ impl SearchableAttributesAnalytics {
#[derive(Serialize, Default)]
struct DisplayedAttributesAnalytics {
total: usize,
with_wildcard: bool,
total: Option<usize>,
with_wildcard: Option<bool>,
}
impl DisplayedAttributesAnalytics {
@ -702,8 +728,8 @@ impl DisplayedAttributesAnalytics {
#[derive(Serialize, Default)]
struct SortableAttributesAnalytics {
total: usize,
has_geo: bool,
total: Option<usize>,
has_geo: Option<bool>,
}
impl SortableAttributesAnalytics {
@ -721,15 +747,15 @@ impl SortableAttributesAnalytics {
#[derive(Serialize, Default)]
struct FilterableAttributesAnalytics {
total: usize,
has_geo: bool,
total: Option<usize>,
has_geo: Option<bool>,
}
impl FilterableAttributesAnalytics {
pub fn new(setting: Option<&std::collections::BTreeSet<String>>) -> Self {
Self {
total: setting.as_ref().map(|filter| filter.len()).unwrap_or(0),
has_geo: setting.as_ref().map(|filter| filter.contains("_geo")).unwrap_or(false),
total: setting.as_ref().map(|filter| filter.len()),
has_geo: setting.as_ref().map(|filter| filter.contains("_geo")),
}
}
@ -761,7 +787,7 @@ struct ProximityPrecisionAnalytics {
impl ProximityPrecisionAnalytics {
pub fn new(precision: Option<&meilisearch_types::settings::ProximityPrecisionView>) -> Self {
Self { set: precision.is_some(), value: precision.unwrap_or_default() }
Self { set: precision.is_some(), value: precision.cloned() }
}
pub fn to_settings(self) -> SettingsAnalytics {
@ -774,8 +800,8 @@ struct TypoToleranceAnalytics {
enabled: Option<bool>,
disable_on_attributes: Option<bool>,
disable_on_words: Option<bool>,
min_word_size_for_one_typo: Option<bool>,
min_word_size_for_two_typos: Option<bool>,
min_word_size_for_one_typo: Option<u8>,
min_word_size_for_two_typos: Option<u8>,
}
impl TypoToleranceAnalytics {
@ -805,9 +831,9 @@ impl TypoToleranceAnalytics {
#[derive(Serialize, Default)]
struct FacetingAnalytics {
max_values_per_facet: Option<bool>,
max_values_per_facet: Option<usize>,
sort_facet_values_by_star_count: Option<bool>,
sort_facet_values_by_total: Option<bool>,
sort_facet_values_by_total: Option<usize>,
}
impl FacetingAnalytics {
@ -833,7 +859,7 @@ impl FacetingAnalytics {
#[derive(Serialize, Default)]
struct PaginationAnalytics {
max_total_hits: Option<bool>,
max_total_hits: Option<usize>,
}
impl PaginationAnalytics {
@ -909,18 +935,18 @@ impl EmbeddersAnalytics {
{
use meilisearch_types::milli::vector::settings::EmbedderSource;
match source {
EmbedderSource::OpenAi => sources.insert("openAi"),
EmbedderSource::HuggingFace => sources.insert("huggingFace"),
EmbedderSource::UserProvided => sources.insert("userProvided"),
EmbedderSource::Ollama => sources.insert("ollama"),
EmbedderSource::Rest => sources.insert("rest"),
EmbedderSource::OpenAi => sources.insert("openAi".to_string()),
EmbedderSource::HuggingFace => sources.insert("huggingFace".to_string()),
EmbedderSource::UserProvided => sources.insert("userProvided".to_string()),
EmbedderSource::Ollama => sources.insert("ollama".to_string()),
EmbedderSource::Rest => sources.insert("rest".to_string()),
};
}
};
Self {
total: setting.as_ref().map(|s| s.len()),
sources,
sources: Some(sources),
document_template_used: setting.as_ref().map(|map| {
map.values()
.filter_map(|config| config.clone().set())
@ -953,7 +979,7 @@ struct SearchCutoffMsAnalytics {
impl SearchCutoffMsAnalytics {
pub fn new(setting: Option<&u64>) -> Self {
Self { search_cutoff_ms: setting }
Self { search_cutoff_ms: setting.copied() }
}
pub fn to_settings(self) -> SettingsAnalytics {
@ -964,7 +990,7 @@ impl SearchCutoffMsAnalytics {
#[derive(Serialize, Default)]
#[serde(transparent)]
struct LocalesAnalytics {
locales: BTreeSet<String>,
locales: Option<BTreeSet<Locale>>,
}
impl LocalesAnalytics {
@ -988,7 +1014,7 @@ impl LocalesAnalytics {
#[derive(Serialize, Default)]
struct DictionaryAnalytics {
total: usize,
total: Option<usize>,
}
impl DictionaryAnalytics {
@ -1003,7 +1029,7 @@ impl DictionaryAnalytics {
#[derive(Serialize, Default)]
struct SeparatorTokensAnalytics {
total: usize,
total: Option<usize>,
}
impl SeparatorTokensAnalytics {
@ -1018,7 +1044,7 @@ impl SeparatorTokensAnalytics {
#[derive(Serialize, Default)]
struct NonSeparatorTokensAnalytics {
total: usize,
total: Option<usize>,
}
impl NonSeparatorTokensAnalytics {
@ -1088,7 +1114,7 @@ pub async fn update_all(
new_settings.non_separator_tokens.as_ref().set(),
),
},
Some(&req),
&req,
);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);

View File

@ -13,6 +13,7 @@ use serde_json::Value;
use tracing::debug;
use super::ActionPolicy;
use crate::analytics::segment_analytics::{SimilarGET, SimilarPOST};
use crate::analytics::{Analytics, SimilarAggregator};
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
@ -34,13 +35,13 @@ pub async fn similar_get(
index_uid: web::Path<String>,
params: AwebQueryParameter<SimilarQueryGet, DeserrQueryParamError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let query = params.0.try_into()?;
let mut aggregate = SimilarAggregator::from_query(&query, &req);
let mut aggregate = SimilarAggregator::<SimilarGET>::from_query(&query, &req);
debug!(parameters = ?query, "Similar get");
@ -49,7 +50,7 @@ pub async fn similar_get(
if let Ok(similar) = &similar {
aggregate.succeed(similar);
}
analytics.get_similar(aggregate);
analytics.publish(aggregate, &req);
let similar = similar?;
@ -62,21 +63,21 @@ pub async fn similar_post(
index_uid: web::Path<String>,
params: AwebJson<SimilarQuery, DeserrJsonError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let query = params.into_inner();
debug!(parameters = ?query, "Similar post");
let mut aggregate = SimilarAggregator::from_query(&query, &req);
let mut aggregate = SimilarAggregator::<SimilarPOST>::from_query(&query, &req);
let similar = similar(index_scheduler, index_uid, query).await;
if let Ok(similar) = &similar {
aggregate.succeed(similar);
}
analytics.post_similar(aggregate);
analytics.publish(aggregate, &req);
let similar = similar?;

View File

@ -35,7 +35,7 @@ pub async fn multi_search_with_post(
search_queue: Data<SearchQueue>,
params: AwebJson<FederatedSearch, DeserrJsonError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
// Since we don't want to process half of the search requests and then get a permit refused
// we're going to get one permit for the whole duration of the multi-search request.
@ -87,7 +87,7 @@ pub async fn multi_search_with_post(
multi_aggregate.succeed();
}
analytics.post_multi_search(multi_aggregate);
analytics.publish(multi_aggregate, &req);
HttpResponse::Ok().json(search_result??)
}
None => {
@ -149,7 +149,7 @@ pub async fn multi_search_with_post(
if search_results.is_ok() {
multi_aggregate.succeed();
}
analytics.post_multi_search(multi_aggregate);
analytics.publish(multi_aggregate, &req);
let search_results = search_results.map_err(|(mut err, query_index)| {
// Add the query index that failed as context for the error message.

View File

@ -3,7 +3,6 @@ use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::IndexScheduler;
use meilisearch_types::error::ResponseError;
use meilisearch_types::tasks::KindWithContent;
use serde_json::json;
use tracing::debug;
use crate::analytics::Analytics;
@ -17,13 +16,15 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot))));
}
crate::empty_analytics!(SnapshotAnalytics, "Snapshot Created");
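Given the empty_analytics! body shown in the first file of this commit, the invocation above should expand to roughly the following (pieced together from that hunk, so approximate):

#[derive(Default)]
struct SnapshotAnalytics {}

impl Aggregate for SnapshotAnalytics {
    fn event_name(&self) -> &'static str {
        "Snapshot Created"
    }

    fn aggregate(self, _other: Self) -> Self {
        self
    }

    fn into_event(self) -> impl serde::Serialize {
        serde_json::json!({})
    }
}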
pub async fn create_snapshot(
index_scheduler: GuardedData<ActionPolicy<{ actions::SNAPSHOTS_CREATE }>, Data<IndexScheduler>>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req));
analytics.publish(SnapshotAnalytics::default(), &req);
let task = KindWithContent::SnapshotCreation;
let uid = get_task_id(&req, &opt)?;

View File

@ -8,10 +8,11 @@ use meilisearch_types::error::deserr_codes::InvalidSwapIndexes;
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
use serde::Serialize;
use serde_json::json;
use super::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::analytics::Analytics;
use crate::analytics::{Aggregate, Analytics};
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
@ -29,21 +30,34 @@ pub struct SwapIndexesPayload {
indexes: Vec<IndexUid>,
}
#[derive(Serialize)]
struct IndexSwappedAnalytics {
swap_operation_number: usize,
}
impl Aggregate for IndexSwappedAnalytics {
fn event_name(&self) -> &'static str {
"Indexes Swapped"
}
fn aggregate(self, other: Self) -> Self {
Self { swap_operation_number: self.swap_operation_number.max(other.swap_operation_number) }
}
fn into_event(self) -> impl Serialize {
self
}
}
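Note the aggregation strategy: unlike the counters elsewhere in this commit, swap events keep the maximum operation count seen in a batching window, preserving the "// Return the max ever encountered" intent of the removed json! call. For example:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn swaps_aggregate_to_the_max() {
        let a = IndexSwappedAnalytics { swap_operation_number: 2 };
        let b = IndexSwappedAnalytics { swap_operation_number: 5 };
        // The merged event reports the largest request seen, not the sum.
        assert_eq!(a.aggregate(b).swap_operation_number, 5);
    }
}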
pub async fn swap_indexes(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_SWAP }>, Data<IndexScheduler>>,
params: AwebJson<Vec<SwapIndexesPayload>, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner();
analytics.publish(
"Indexes Swapped".to_string(),
json!({
"swap_operation_number": params.len(), // Return the max ever encountered
}),
Some(&req),
);
analytics.publish(IndexSwappedAnalytics { swap_operation_number: params.len() }, &req);
let filters = index_scheduler.filters();
let mut swaps = vec![];

View File

@ -12,18 +12,17 @@ use meilisearch_types::star_or::{OptionStarOr, OptionStarOrList};
use meilisearch_types::task_view::TaskView;
use meilisearch_types::tasks::{Kind, KindWithContent, Status};
use serde::Serialize;
use serde_json::json;
use time::format_description::well_known::Rfc3339;
use time::macros::format_description;
use time::{Date, Duration, OffsetDateTime, Time};
use tokio::task;
use super::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::analytics::Analytics;
use crate::analytics::{Aggregate, AggregateMethod, Analytics};
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::Opt;
use crate::{aggregate_methods, Opt};
const DEFAULT_LIMIT: u32 = 20;
@ -158,12 +157,69 @@ impl TaskDeletionOrCancelationQuery {
}
}
aggregate_methods!(
CancelTasks => "Tasks Canceled",
DeleteTasks => "Tasks Deleted",
);
#[derive(Serialize)]
struct TaskFilterAnalytics<Method: AggregateMethod> {
filtered_by_uid: bool,
filtered_by_index_uid: bool,
filtered_by_type: bool,
filtered_by_status: bool,
filtered_by_canceled_by: bool,
filtered_by_before_enqueued_at: bool,
filtered_by_after_enqueued_at: bool,
filtered_by_before_started_at: bool,
filtered_by_after_started_at: bool,
filtered_by_before_finished_at: bool,
filtered_by_after_finished_at: bool,
#[serde(skip)]
marker: std::marker::PhantomData<Method>,
}
impl<Method: AggregateMethod> Aggregate for TaskFilterAnalytics<Method> {
fn event_name(&self) -> &'static str {
Method::event_name()
}
fn aggregate(self, other: Self) -> Self {
Self {
filtered_by_uid: self.filtered_by_uid | other.filtered_by_uid,
filtered_by_index_uid: self.filtered_by_index_uid | other.filtered_by_index_uid,
filtered_by_type: self.filtered_by_type | other.filtered_by_type,
filtered_by_status: self.filtered_by_status | other.filtered_by_status,
filtered_by_canceled_by: self.filtered_by_canceled_by | other.filtered_by_canceled_by,
filtered_by_before_enqueued_at: self.filtered_by_before_enqueued_at
| other.filtered_by_before_enqueued_at,
filtered_by_after_enqueued_at: self.filtered_by_after_enqueued_at
| other.filtered_by_after_enqueued_at,
filtered_by_before_started_at: self.filtered_by_before_started_at
| other.filtered_by_before_started_at,
filtered_by_after_started_at: self.filtered_by_after_started_at
| other.filtered_by_after_started_at,
filtered_by_before_finished_at: self.filtered_by_before_finished_at
| other.filtered_by_before_finished_at,
filtered_by_after_finished_at: self.filtered_by_after_finished_at
| other.filtered_by_after_finished_at,
marker: std::marker::PhantomData,
}
}
fn into_event(self) -> impl Serialize {
self
}
}
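The #[serde(skip)] on marker matters because into_event now serializes the struct itself: without it, the derive would require Method: Serialize and PhantomData would leak into the Segment payload as a field. A reduced sketch of the pattern:

use serde::Serialize;
use std::marker::PhantomData;

#[derive(Serialize)]
struct FilteredBy<Method> {
    filtered_by_uid: bool,
    // Skipped: keeps the marker out of the JSON payload and avoids the
    // implicit `Method: Serialize` bound the derive would otherwise add.
    #[serde(skip)]
    marker: PhantomData<Method>,
}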
async fn cancel_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_CANCEL }>, Data<IndexScheduler>>,
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner();
@ -172,21 +228,22 @@ async fn cancel_tasks(
}
analytics.publish(
"Tasks Canceled".to_string(),
json!({
"filtered_by_uid": params.uids.is_some(),
"filtered_by_index_uid": params.index_uids.is_some(),
"filtered_by_type": params.types.is_some(),
"filtered_by_status": params.statuses.is_some(),
"filtered_by_canceled_by": params.canceled_by.is_some(),
"filtered_by_before_enqueued_at": params.before_enqueued_at.is_some(),
"filtered_by_after_enqueued_at": params.after_enqueued_at.is_some(),
"filtered_by_before_started_at": params.before_started_at.is_some(),
"filtered_by_after_started_at": params.after_started_at.is_some(),
"filtered_by_before_finished_at": params.before_finished_at.is_some(),
"filtered_by_after_finished_at": params.after_finished_at.is_some(),
}),
Some(&req),
TaskFilterAnalytics::<CancelTasks> {
filtered_by_uid: params.uids.is_some(),
filtered_by_index_uid: params.index_uids.is_some(),
filtered_by_type: params.types.is_some(),
filtered_by_status: params.statuses.is_some(),
filtered_by_canceled_by: params.canceled_by.is_some(),
filtered_by_before_enqueued_at: params.before_enqueued_at.is_some(),
filtered_by_after_enqueued_at: params.after_enqueued_at.is_some(),
filtered_by_before_started_at: params.before_started_at.is_some(),
filtered_by_after_started_at: params.after_started_at.is_some(),
filtered_by_before_finished_at: params.before_finished_at.is_some(),
filtered_by_after_finished_at: params.after_finished_at.is_some(),
marker: std::marker::PhantomData,
},
&req,
);
let query = params.into_query();
@ -214,7 +271,7 @@ async fn delete_tasks(
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner();
@ -223,22 +280,24 @@ async fn delete_tasks(
}
analytics.publish(
"Tasks Deleted".to_string(),
json!({
"filtered_by_uid": params.uids.is_some(),
"filtered_by_index_uid": params.index_uids.is_some(),
"filtered_by_type": params.types.is_some(),
"filtered_by_status": params.statuses.is_some(),
"filtered_by_canceled_by": params.canceled_by.is_some(),
"filtered_by_before_enqueued_at": params.before_enqueued_at.is_some(),
"filtered_by_after_enqueued_at": params.after_enqueued_at.is_some(),
"filtered_by_before_started_at": params.before_started_at.is_some(),
"filtered_by_after_started_at": params.after_started_at.is_some(),
"filtered_by_before_finished_at": params.before_finished_at.is_some(),
"filtered_by_after_finished_at": params.after_finished_at.is_some(),
}),
Some(&req),
TaskFilterAnalytics::<DeleteTasks> {
filtered_by_uid: params.uids.is_some(),
filtered_by_index_uid: params.index_uids.is_some(),
filtered_by_type: params.types.is_some(),
filtered_by_status: params.statuses.is_some(),
filtered_by_canceled_by: params.canceled_by.is_some(),
filtered_by_before_enqueued_at: params.before_enqueued_at.is_some(),
filtered_by_after_enqueued_at: params.after_enqueued_at.is_some(),
filtered_by_before_started_at: params.before_started_at.is_some(),
filtered_by_after_started_at: params.after_started_at.is_some(),
filtered_by_before_finished_at: params.before_finished_at.is_some(),
filtered_by_after_finished_at: params.after_finished_at.is_some(),
marker: std::marker::PhantomData,
},
&req,
);
let query = params.into_query();
let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes(