Merge b2abef6522b87487ccb6c37155391a03472d29fc into 4224edea28bc66e8de30d6ad69cdab5aaa922335

This commit is contained in:
Louis Dureuil 2025-01-29 16:58:54 +00:00 committed by GitHub
commit 88d2c9b652
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 1655 additions and 451 deletions

View File

@ -229,7 +229,7 @@ pub(crate) mod test {
use big_s::S;
use maplit::{btreemap, btreeset};
use meilisearch_types::facet_values_sort::FacetValuesSort;
use meilisearch_types::features::RuntimeTogglableFeatures;
use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::keys::{Action, Key};
use meilisearch_types::milli;
@ -455,6 +455,10 @@ pub(crate) mod test {
dump.create_experimental_features(features).unwrap();
// ========== network
let network = create_test_network();
dump.create_network(network).unwrap();
// create the dump
let mut file = tempfile::tempfile().unwrap();
dump.persist_to(&mut file).unwrap();
@ -467,6 +471,13 @@ pub(crate) mod test {
RuntimeTogglableFeatures::default()
}
fn create_test_network() -> Network {
Network {
local: Some("myself".to_string()),
remotes: maplit::btreemap! {"other".to_string() => Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()) }},
}
}
#[test]
fn test_creating_and_read_dump() {
let mut file = create_test_dump();
@ -515,5 +526,9 @@ pub(crate) mod test {
// ==== checking the features
let expected = create_test_features();
assert_eq!(dump.features().unwrap().unwrap(), expected);
// ==== checking the network
let expected = create_test_network();
assert_eq!(&expected, dump.network().unwrap().unwrap());
}
}

View File

@ -196,6 +196,10 @@ impl CompatV5ToV6 {
pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> {
Ok(None)
}
pub fn network(&self) -> Result<Option<&v6::Network>> {
Ok(None)
}
}
pub enum CompatIndexV5ToV6 {

View File

@ -23,6 +23,7 @@ mod v6;
pub type Document = serde_json::Map<String, serde_json::Value>;
pub type UpdateFile = dyn Iterator<Item = Result<Document>>;
#[allow(clippy::large_enum_variant)]
pub enum DumpReader {
Current(V6Reader),
Compat(CompatV5ToV6),
@ -114,6 +115,13 @@ impl DumpReader {
DumpReader::Compat(compat) => compat.features(),
}
}
pub fn network(&self) -> Result<Option<&v6::Network>> {
match self {
DumpReader::Current(current) => Ok(current.network()),
DumpReader::Compat(compat) => compat.network(),
}
}
}
impl From<V6Reader> for DumpReader {
@ -328,6 +336,7 @@ pub(crate) mod test {
}
assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
assert_eq!(dump.network().unwrap(), None);
}
#[test]
@ -373,6 +382,27 @@ pub(crate) mod test {
assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
}
#[test]
fn import_dump_v6_network() {
let dump = File::open("tests/assets/v6-with-network.dump").unwrap();
let dump = DumpReader::open(dump).unwrap();
// top level infos
insta::assert_snapshot!(dump.date().unwrap(), @"2025-01-29 15:45:32.738676 +00:00:00");
insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");
// network
let network = dump.network().unwrap().unwrap();
insta::assert_snapshot!(network.local.as_ref().unwrap(), @"ms-0");
insta::assert_snapshot!(network.remotes.get("ms-0").as_ref().unwrap().url, @"http://localhost:7700");
insta::assert_snapshot!(network.remotes.get("ms-0").as_ref().unwrap().search_api_key.is_none(), @"true");
insta::assert_snapshot!(network.remotes.get("ms-1").as_ref().unwrap().url, @"http://localhost:7701");
insta::assert_snapshot!(network.remotes.get("ms-1").as_ref().unwrap().search_api_key.is_none(), @"true");
insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().url, @"http://ms-5679.example.meilisearch.io");
insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().search_api_key.as_ref().unwrap(), @"foo");
}
#[test]
fn import_dump_v5() {
let dump = File::open("tests/assets/v5.dump").unwrap();

View File

@ -20,6 +20,7 @@ pub type Unchecked = meilisearch_types::settings::Unchecked;
pub type Task = crate::TaskDump;
pub type Key = meilisearch_types::keys::Key;
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
pub type Network = meilisearch_types::features::Network;
// ===== Other types to clarify the code of the compat module
// everything related to the tasks
@ -50,6 +51,7 @@ pub struct V6Reader {
tasks: BufReader<File>,
keys: BufReader<File>,
features: Option<RuntimeTogglableFeatures>,
network: Option<Network>,
}
impl V6Reader {
@ -78,12 +80,30 @@ impl V6Reader {
None
};
let network_file = match fs::read(dump.path().join("network.json")) {
Ok(network_file) => Some(network_file),
Err(error) => match error.kind() {
// Allows the file to be missing, this will only result in all experimental features disabled.
ErrorKind::NotFound => {
debug!("`network.json` not found in dump");
None
}
_ => return Err(error.into()),
},
};
let network = if let Some(network_file) = network_file {
Some(serde_json::from_reader(&*network_file)?)
} else {
None
};
Ok(V6Reader {
metadata: serde_json::from_reader(&*meta_file)?,
instance_uid,
tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
features,
network,
dump,
})
}
@ -154,6 +174,10 @@ impl V6Reader {
pub fn features(&self) -> Option<RuntimeTogglableFeatures> {
self.features
}
pub fn network(&self) -> Option<&Network> {
self.network.as_ref()
}
}
pub struct UpdateFile {

View File

@ -4,7 +4,7 @@ use std::path::PathBuf;
use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::features::RuntimeTogglableFeatures;
use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
use meilisearch_types::keys::Key;
use meilisearch_types::settings::{Checked, Settings};
use serde_json::{Map, Value};
@ -61,6 +61,10 @@ impl DumpWriter {
)?)
}
pub fn create_network(&self, network: Network) -> Result<()> {
Ok(std::fs::write(self.dir.path().join("network.json"), serde_json::to_string(&network)?)?)
}
pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
let mut tar_encoder = tar::Builder::new(gz_encoder);
@ -295,7 +299,8 @@ pub(crate) mod test {
---- experimental-features.json
---- instance_uid.uuid
---- keys.jsonl
---- metadata.json
---- metadata.json
---- network.json
"###);
// ==== checking the top level infos

View File

@ -1,6 +1,6 @@
use std::sync::{Arc, RwLock};
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RwTxn};
@ -14,10 +14,16 @@ mod db_name {
pub const EXPERIMENTAL_FEATURES: &str = "experimental-features";
}
mod db_keys {
pub const EXPERIMENTAL_FEATURES: &str = "experimental-features";
pub const NETWORK: &str = "network";
}
#[derive(Clone)]
pub(crate) struct FeatureData {
persisted: Database<Str, SerdeJson<RuntimeTogglableFeatures>>,
runtime: Arc<RwLock<RuntimeTogglableFeatures>>,
network: Arc<RwLock<Network>>,
}
#[derive(Debug, Clone, Copy)]
@ -86,6 +92,19 @@ impl RoFeatures {
.into())
}
}
pub fn check_proxy_search(&self, disabled_action: &'static str) -> Result<()> {
if self.runtime.proxy_search {
Ok(())
} else {
Err(FeatureNotEnabledError {
disabled_action,
feature: "proxy search",
issue_link: "https://github.com/orgs/meilisearch/discussions/805",
}
.into())
}
}
}
impl FeatureData {
@ -102,7 +121,7 @@ impl FeatureData {
env.create_database(wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?;
let persisted_features: RuntimeTogglableFeatures =
runtime_features_db.get(wtxn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default();
runtime_features_db.get(wtxn, db_keys::EXPERIMENTAL_FEATURES)?.unwrap_or_default();
let InstanceTogglableFeatures { metrics, logs_route, contains_filter } = instance_features;
let runtime = Arc::new(RwLock::new(RuntimeTogglableFeatures {
metrics: metrics || persisted_features.metrics,
@ -111,7 +130,14 @@ impl FeatureData {
..persisted_features
}));
Ok(Self { persisted: runtime_features_db, runtime })
let network_db = runtime_features_db.remap_data_type::<SerdeJson<Network>>();
let network: Network = network_db.get(wtxn, db_keys::NETWORK)?.unwrap_or_default();
Ok(Self {
persisted: runtime_features_db,
runtime,
network: Arc::new(RwLock::new(network)),
})
}
pub fn put_runtime_features(
@ -119,7 +145,7 @@ impl FeatureData {
mut wtxn: RwTxn,
features: RuntimeTogglableFeatures,
) -> Result<()> {
self.persisted.put(&mut wtxn, db_name::EXPERIMENTAL_FEATURES, &features)?;
self.persisted.put(&mut wtxn, db_keys::EXPERIMENTAL_FEATURES, &features)?;
wtxn.commit()?;
// safe to unwrap, the lock will only fail if:
@ -140,4 +166,21 @@ impl FeatureData {
pub fn features(&self) -> RoFeatures {
RoFeatures::new(self)
}
pub fn put_network(&self, mut wtxn: RwTxn, new_network: Network) -> Result<()> {
self.persisted.remap_data_type::<SerdeJson<Network>>().put(
&mut wtxn,
db_keys::NETWORK,
&new_network,
)?;
wtxn.commit()?;
let mut network = self.network.write().unwrap();
*network = new_network;
Ok(())
}
pub fn network(&self) -> Network {
Network::clone(&*self.network.read().unwrap())
}
}

View File

@ -51,7 +51,7 @@ pub use features::RoFeatures;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use meilisearch_types::batches::Batch;
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::I128;
use meilisearch_types::heed::{self, Env, RoTxn};
@ -770,7 +770,16 @@ impl IndexScheduler {
Ok(())
}
// TODO: consider using a type alias or a struct embedder/template
pub fn put_network(&self, network: Network) -> Result<()> {
let wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
self.features.put_network(wtxn, network)?;
Ok(())
}
pub fn network(&self) -> Network {
self.features.network()
}
pub fn embedders(
&self,
index_uid: String,

View File

@ -326,7 +326,7 @@ fn test_auto_deletion_of_tasks() {
fn test_task_queue_is_full() {
let (index_scheduler, mut handle) = IndexScheduler::test_with_custom_config(vec![], |config| {
// that's the minimum map size possible
config.task_db_size = 1048576;
config.task_db_size = 1048576 * 3;
None
});

View File

@ -219,6 +219,8 @@ impl IndexScheduler {
progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
let features = self.features().runtime_features();
dump.create_experimental_features(features)?;
let network = self.network();
dump.create_network(network)?;
let dump_uid = started_at.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"

View File

@ -260,6 +260,8 @@ InvalidMultiSearchMergeFacets , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchQueryFacets , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchQueryPagination , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchQueryRankingRules , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchQueryPosition , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchRemote , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchWeight , InvalidRequest , BAD_REQUEST ;
InvalidSearchAttributesToSearchOn , InvalidRequest , BAD_REQUEST ;
InvalidSearchAttributesToCrop , InvalidRequest , BAD_REQUEST ;
@ -357,6 +359,12 @@ MissingSwapIndexes , InvalidRequest , BAD_REQUEST ;
MissingTaskFilters , InvalidRequest , BAD_REQUEST ;
NoSpaceLeftOnDevice , System , UNPROCESSABLE_ENTITY;
PayloadTooLarge , InvalidRequest , PAYLOAD_TOO_LARGE ;
ProxyBadResponse , System , BAD_GATEWAY ;
ProxyBadRequest , InvalidRequest , BAD_REQUEST ;
ProxyCouldNotSendRequest , System , BAD_GATEWAY ;
ProxyInvalidApiKey , Auth , FORBIDDEN ;
ProxyRemoteError , System , BAD_GATEWAY ;
ProxyTimeout , System , BAD_GATEWAY ;
TooManySearchRequests , System , SERVICE_UNAVAILABLE ;
TaskNotFound , InvalidRequest , NOT_FOUND ;
BatchNotFound , InvalidRequest , NOT_FOUND ;

View File

@ -1,3 +1,5 @@
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Default, PartialEq, Eq)]
@ -7,6 +9,7 @@ pub struct RuntimeTogglableFeatures {
pub logs_route: bool,
pub edit_documents_by_function: bool,
pub contains_filter: bool,
pub proxy_search: bool,
}
#[derive(Default, Debug, Clone, Copy)]
@ -15,3 +18,18 @@ pub struct InstanceTogglableFeatures {
pub logs_route: bool,
pub contains_filter: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Remote {
pub url: String,
pub search_api_key: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct Network {
#[serde(rename = "self")]
pub local: Option<String>,
pub remotes: BTreeMap<String, Remote>,
}

View File

@ -4,13 +4,14 @@ use std::fmt;
use std::str::FromStr;
use deserr::Deserr;
use serde::Serialize;
use utoipa::ToSchema;
use crate::error::{Code, ErrorCode};
/// An index uid is composed of only ascii alphanumeric characters, - and _, between 1 and 400
/// bytes long
#[derive(Debug, Clone, PartialEq, Eq, Deserr, PartialOrd, Ord, ToSchema)]
#[derive(Debug, Clone, PartialEq, Eq, Deserr, PartialOrd, Ord, Serialize, ToSchema)]
#[deserr(try_from(String) = IndexUid::try_from -> IndexUidFormatError)]
#[schema(value_type = String, example = "movies")]
pub struct IndexUid(String);

View File

@ -302,6 +302,12 @@ pub enum Action {
#[serde(rename = "experimental.update")]
#[deserr(rename = "experimental.update")]
ExperimentalFeaturesUpdate,
#[serde(rename = "network.get")]
#[deserr(rename = "network.get")]
NetworkGet,
#[serde(rename = "network.update")]
#[deserr(rename = "network.update")]
NetworkUpdate,
}
impl Action {
@ -341,6 +347,8 @@ impl Action {
KEYS_DELETE => Some(Self::KeysDelete),
EXPERIMENTAL_FEATURES_GET => Some(Self::ExperimentalFeaturesGet),
EXPERIMENTAL_FEATURES_UPDATE => Some(Self::ExperimentalFeaturesUpdate),
NETWORK_GET => Some(Self::NetworkGet),
NETWORK_UPDATE => Some(Self::NetworkUpdate),
_otherwise => None,
}
}
@ -386,4 +394,7 @@ pub mod actions {
pub const KEYS_DELETE: u8 = KeysDelete.repr();
pub const EXPERIMENTAL_FEATURES_GET: u8 = ExperimentalFeaturesGet.repr();
pub const EXPERIMENTAL_FEATURES_UPDATE: u8 = ExperimentalFeaturesUpdate.repr();
pub const NETWORK_GET: u8 = NetworkGet.repr();
pub const NETWORK_UPDATE: u8 = NetworkUpdate.repr();
}

View File

@ -195,6 +195,7 @@ struct Infos {
experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize,
experimental_limit_batched_tasks_total_size: u64,
experimental_proxy_search: bool,
gpu_enabled: bool,
db_path: bool,
import_dump: bool,
@ -285,6 +286,7 @@ impl Infos {
logs_route,
edit_documents_by_function,
contains_filter,
proxy_search,
} = features;
// We're going to override every sensible information.
@ -302,6 +304,7 @@ impl Infos {
experimental_replication_parameters,
experimental_enable_logs_route: experimental_enable_logs_route | logs_route,
experimental_reduce_indexing_memory_usage,
experimental_proxy_search: proxy_search,
gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
db_path: db_path != PathBuf::from("./data.ms"),
import_dump: import_dump.is_some(),

View File

@ -431,10 +431,13 @@ fn import_dump(
keys.push(key);
}
// 3. Import the runtime features.
// 3. Import the runtime features and network
let features = dump_reader.features()?.unwrap_or_default();
index_scheduler.put_runtime_features(features)?;
let network = dump_reader.network()?.cloned().unwrap_or_default();
index_scheduler.put_network(network)?;
let indexer_config = index_scheduler.indexer_config();
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might

View File

@ -50,6 +50,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
logs_route: Some(false),
edit_documents_by_function: Some(false),
contains_filter: Some(false),
proxy_search: Some(false),
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
@ -88,6 +89,8 @@ pub struct RuntimeTogglableFeatures {
pub edit_documents_by_function: Option<bool>,
#[deserr(default)]
pub contains_filter: Option<bool>,
#[deserr(default)]
pub proxy_search: Option<bool>,
}
impl From<meilisearch_types::features::RuntimeTogglableFeatures> for RuntimeTogglableFeatures {
@ -97,6 +100,7 @@ impl From<meilisearch_types::features::RuntimeTogglableFeatures> for RuntimeTogg
logs_route,
edit_documents_by_function,
contains_filter,
proxy_search,
} = value;
Self {
@ -104,6 +108,7 @@ impl From<meilisearch_types::features::RuntimeTogglableFeatures> for RuntimeTogg
logs_route: Some(logs_route),
edit_documents_by_function: Some(edit_documents_by_function),
contains_filter: Some(contains_filter),
proxy_search: Some(proxy_search),
}
}
}
@ -114,6 +119,7 @@ pub struct PatchExperimentalFeatureAnalytics {
logs_route: bool,
edit_documents_by_function: bool,
contains_filter: bool,
proxy_search: bool,
}
impl Aggregate for PatchExperimentalFeatureAnalytics {
@ -127,6 +133,7 @@ impl Aggregate for PatchExperimentalFeatureAnalytics {
logs_route: new.logs_route,
edit_documents_by_function: new.edit_documents_by_function,
contains_filter: new.contains_filter,
proxy_search: new.proxy_search,
})
}
@ -149,6 +156,7 @@ impl Aggregate for PatchExperimentalFeatureAnalytics {
logs_route: Some(false),
edit_documents_by_function: Some(false),
contains_filter: Some(false),
proxy_search: Some(false),
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
@ -181,16 +189,18 @@ async fn patch_features(
.edit_documents_by_function
.unwrap_or(old_features.edit_documents_by_function),
contains_filter: new_features.0.contains_filter.unwrap_or(old_features.contains_filter),
proxy_search: new_features.0.proxy_search.unwrap_or(old_features.proxy_search),
};
// explicitly destructure for analytics rather than using the `Serialize` implementation, because
// the it renames to camelCase, which we don't want for analytics.
// it renames to camelCase, which we don't want for analytics.
// **Do not** ignore fields with `..` or `_` here, because we want to add them in the future.
let meilisearch_types::features::RuntimeTogglableFeatures {
metrics,
logs_route,
edit_documents_by_function,
contains_filter,
proxy_search,
} = new_features;
analytics.publish(
@ -199,6 +209,7 @@ async fn patch_features(
logs_route,
edit_documents_by_function,
contains_filter,
proxy_search,
},
&req,
);

View File

@ -54,6 +54,7 @@ mod logs;
mod metrics;
mod multi_search;
mod multi_search_analytics;
pub mod network;
mod open_api_utils;
mod snapshot;
mod swap_indexes;
@ -75,6 +76,7 @@ pub mod tasks;
(path = "/multi-search", api = multi_search::MultiSearchApi),
(path = "/swap-indexes", api = swap_indexes::SwapIndexesApi),
(path = "/experimental-features", api = features::ExperimentalFeaturesApi),
(path = "/network", api = network::NetworkApi),
),
paths(get_health, get_version, get_stats),
tags(
@ -103,7 +105,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::scope("/multi-search").configure(multi_search::configure))
.service(web::scope("/swap-indexes").configure(swap_indexes::configure))
.service(web::scope("/metrics").configure(metrics::configure))
.service(web::scope("/experimental-features").configure(features::configure));
.service(web::scope("/experimental-features").configure(features::configure))
.service(web::scope("/network").configure(network::configure));
#[cfg(feature = "swagger")]
{

View File

@ -20,6 +20,7 @@ use crate::routes::indexes::search::search_kind;
use crate::search::{
add_search_rules, perform_federated_search, perform_search, FederatedSearch,
FederatedSearchResult, RetrieveVectors, SearchQueryWithIndex, SearchResultWithIndex,
PROXY_SEARCH_HEADER, PROXY_SEARCH_HEADER_VALUE,
};
use crate::search_queue::SearchQueue;
@ -186,18 +187,22 @@ pub async fn multi_search_with_post(
let response = match federation {
Some(federation) => {
let search_result = tokio::task::spawn_blocking(move || {
perform_federated_search(&index_scheduler, queries, federation, features)
})
.await;
// check remote header
let is_proxy = req
.headers()
.get(PROXY_SEARCH_HEADER)
.is_some_and(|value| value.as_bytes() == PROXY_SEARCH_HEADER_VALUE.as_bytes());
let search_result =
perform_federated_search(&index_scheduler, queries, federation, features, is_proxy)
.await;
permit.drop().await;
if let Ok(Ok(_)) = search_result {
if search_result.is_ok() {
multi_aggregate.succeed();
}
analytics.publish(multi_aggregate, &req);
HttpResponse::Ok().json(search_result??)
HttpResponse::Ok().json(search_result?)
}
None => {
// Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only,

View File

@ -0,0 +1,246 @@
use std::collections::BTreeMap;
use actix_web::web::{self, Data};
use actix_web::{HttpRequest, HttpResponse};
use deserr::actix_web::AwebJson;
use deserr::Deserr;
use index_scheduler::IndexScheduler;
use itertools::{EitherOrBoth, Itertools};
use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::ResponseError;
use meilisearch_types::features::{Network as DbNetwork, Remote as DbRemote};
use meilisearch_types::keys::actions;
use meilisearch_types::milli::update::Setting;
use serde::Serialize;
use tracing::debug;
use utoipa::{OpenApi, ToSchema};
use crate::analytics::{Aggregate, Analytics};
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
#[derive(OpenApi)]
#[openapi(
paths(get_network, patch_network),
tags((
name = "Network",
description = "The `/network` route allows you to describe the topology of a network of Meilisearch instances.
This route is **synchronous**. This means that no task object will be returned, and any change to the network will be made available immediately.",
external_docs(url = "https://www.meilisearch.com/docs/reference/api/network"),
)),
)]
pub struct NetworkApi;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("")
.route(web::get().to(get_network))
.route(web::patch().to(SeqHandler(patch_network))),
);
}
/// Get network topology
///
/// Get a list of all Meilisearch instances currently known to this instance.
#[utoipa::path(
get,
path = "",
tag = "Network",
security(("Bearer" = ["network.get", "network.*", "*"])),
responses(
(status = OK, description = "Known nodes are returned", body = Network, content_type = "application/json", example = json!(
{
"self": "ms-0",
"remotes": {
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
}
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
"message": "The Authorization header is missing. It must use the bearer authorization method.",
"code": "missing_authorization_header",
"type": "auth",
"link": "https://docs.meilisearch.com/errors#missing_authorization_header"
}
)),
)
)]
async fn get_network(
index_scheduler: GuardedData<ActionPolicy<{ actions::NETWORK_GET }>, Data<IndexScheduler>>,
) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_proxy_search("Using the /network route")?;
let network = index_scheduler.network();
debug!(returns = ?network, "Get network");
Ok(HttpResponse::Ok().json(network))
}
#[derive(Debug, Deserr, ToSchema, Serialize)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct Remote {
#[schema(value_type = Option<String>, example = json!("http://localhost:7700"))]
#[deserr(default)]
#[serde(default)]
pub url: Setting<String>,
#[schema(value_type = Option<String>, example = json!("XWnBI8QHUc-4IlqbKPLUDuhftNq19mQtjc6JvmivzJU"))]
#[deserr(default)]
#[serde(default)]
pub search_api_key: Setting<String>,
}
#[derive(Debug, Deserr, ToSchema, Serialize)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct Network {
#[schema(value_type = Option<BTreeMap<String, Remote>>, example = json!("http://localhost:7700"))]
#[deserr(default)]
#[serde(default)]
pub remotes: Setting<BTreeMap<String, Option<Remote>>>,
#[schema(value_type = Option<String>, example = json!("ms-00"), rename = "self")]
#[serde(default, rename = "self")]
#[deserr(default, rename = "self")]
pub local: Setting<String>,
}
impl Remote {
pub fn try_into_db_node(self, name: &str) -> Result<DbRemote, ResponseError> {
Ok(DbRemote {
url: self.url.set().ok_or(ResponseError::from_msg(
format!("Missing field `{name}.url`"),
meilisearch_types::error::Code::BadRequest,
))?,
search_api_key: self.search_api_key.set(),
})
}
}
#[derive(Serialize)]
pub struct PatchNetworkAnalytics {
network_size: usize,
network_has_self: bool,
}
impl Aggregate for PatchNetworkAnalytics {
fn event_name(&self) -> &'static str {
"Network Updated"
}
fn aggregate(self: Box<Self>, new: Box<Self>) -> Box<Self> {
Box::new(Self { network_size: new.network_size, network_has_self: new.network_has_self })
}
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}
/// Configure Network
///
/// Add or remove nodes from network.
#[utoipa::path(
patch,
path = "",
tag = "Network",
security(("Bearer" = ["network.update", "network.*", "*"])),
responses(
(status = OK, description = "New network state is returned", body = Network, content_type = "application/json", example = json!(
{
"self": "ms-0",
"remotes": {
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
}
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
"message": "The Authorization header is missing. It must use the bearer authorization method.",
"code": "missing_authorization_header",
"type": "auth",
"link": "https://docs.meilisearch.com/errors#missing_authorization_header"
}
)),
)
)]
async fn patch_network(
index_scheduler: GuardedData<ActionPolicy<{ actions::NETWORK_UPDATE }>, Data<IndexScheduler>>,
new_network: AwebJson<Network, DeserrJsonError>,
req: HttpRequest,
analytics: Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_proxy_search("Using the /network route")?;
let new_network = new_network.0;
let old_network = index_scheduler.network();
debug!(parameters = ?new_network, "Patch network");
let merged_self = match new_network.local {
Setting::Set(new_self) => Some(new_self),
Setting::Reset => None,
Setting::NotSet => old_network.local,
};
let merged_remotes = match new_network.remotes {
Setting::Set(new_remotes) => {
let mut merged_remotes = BTreeMap::new();
for either_or_both in old_network
.remotes
.into_iter()
.merge_join_by(new_remotes.into_iter(), |left, right| left.0.cmp(&right.0))
{
match either_or_both {
EitherOrBoth::Both((key, old), (_, Some(new))) => {
let DbRemote { url: old_url, search_api_key: old_search_api_key } = old;
let Remote { url: new_url, search_api_key: new_search_api_key } = new;
let merged = DbRemote {
url: match new_url {
Setting::Set(new_url) => new_url,
Setting::Reset => todo!(),
Setting::NotSet => old_url,
},
search_api_key: match new_search_api_key {
Setting::Set(new_search_api_key) => Some(new_search_api_key),
Setting::Reset => None,
Setting::NotSet => old_search_api_key,
},
};
merged_remotes.insert(key, merged);
}
EitherOrBoth::Both((_, _), (_, None)) | EitherOrBoth::Right((_, None)) => {}
EitherOrBoth::Left((key, node)) => {
merged_remotes.insert(key, node);
}
EitherOrBoth::Right((key, Some(node))) => {
let node = node.try_into_db_node(&key)?;
merged_remotes.insert(key, node);
}
}
}
merged_remotes
}
Setting::Reset => BTreeMap::new(),
Setting::NotSet => old_network.remotes,
};
analytics.publish(
PatchNetworkAnalytics {
network_size: merged_remotes.len(),
network_has_self: merged_self.is_some(),
},
&req,
);
let merged_network = DbNetwork { local: merged_self, remotes: merged_remotes };
index_scheduler.put_network(merged_network.clone())?;
debug!(returns = ?merged_network, "Patch network");
Ok(HttpResponse::Ok().json(merged_network))
}

View File

@ -0,0 +1,10 @@
mod perform;
mod proxy;
mod types;
mod weighted_scores;
pub use perform::perform_federated_search;
pub use proxy::{PROXY_SEARCH_HEADER, PROXY_SEARCH_HEADER_VALUE};
pub use types::{
FederatedSearch, FederatedSearchResult, Federation, FederationOptions, MergeFacets,
};

View File

@ -1,6 +1,5 @@
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt;
use std::iter::Zip;
use std::rc::Rc;
use std::str::FromStr as _;
@ -9,262 +8,45 @@ use std::vec::{IntoIter, Vec};
use actix_http::StatusCode;
use index_scheduler::{IndexScheduler, RoFeatures};
use indexmap::IndexMap;
use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::deserr_codes::{
InvalidMultiSearchFacetsByIndex, InvalidMultiSearchMaxValuesPerFacet,
InvalidMultiSearchMergeFacets, InvalidMultiSearchWeight, InvalidSearchLimit,
InvalidSearchOffset,
};
use itertools::Itertools;
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::milli::score_details::{ScoreDetails, ScoreValue};
use meilisearch_types::features::Remote;
use meilisearch_types::milli::score_details::{ScoreDetails, WeightedScoreValue};
use meilisearch_types::milli::{self, DocumentId, OrderBy, TimeBudget};
use roaring::RoaringBitmap;
use serde::Serialize;
use utoipa::ToSchema;
use super::ranking_rules::{self, RankingRules};
use super::{
compute_facet_distribution_stats, prepare_search, AttributesFormat, ComputedFacets, FacetStats,
HitMaker, HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchQuery, SearchQueryWithIndex,
use super::super::ranking_rules::{self, RankingRules};
use super::super::{
compute_facet_distribution_stats, prepare_search, AttributesFormat, ComputedFacets, HitMaker,
HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchQuery, SearchQueryWithIndex,
};
use super::proxy::{proxy_search, ProxySearchError, ProxySearchParams};
use super::types::{
FederatedFacets, FederatedSearchResult, Federation, FederationOptions, MergeFacets, Weight,
FEDERATION_HIT, FEDERATION_REMOTE, WEIGHTED_SCORE_VALUES,
};
use super::weighted_scores;
use crate::error::MeilisearchHttpError;
use crate::routes::indexes::search::search_kind;
pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0;
#[derive(Debug, Default, Clone, Copy, PartialEq, deserr::Deserr, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct FederationOptions {
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchWeight>)]
#[schema(value_type = f64)]
pub weight: Weight,
}
#[derive(Debug, Clone, Copy, PartialEq, deserr::Deserr)]
#[deserr(try_from(f64) = TryFrom::try_from -> InvalidMultiSearchWeight)]
pub struct Weight(f64);
impl Default for Weight {
fn default() -> Self {
Weight(DEFAULT_FEDERATED_WEIGHT)
}
}
impl std::convert::TryFrom<f64> for Weight {
type Error = InvalidMultiSearchWeight;
fn try_from(f: f64) -> Result<Self, Self::Error> {
if f < 0.0 {
Err(InvalidMultiSearchWeight)
} else {
Ok(Weight(f))
}
}
}
impl std::ops::Deref for Weight {
type Target = f64;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug, deserr::Deserr, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[schema(rename_all = "camelCase")]
pub struct Federation {
#[deserr(default = super::DEFAULT_SEARCH_LIMIT(), error = DeserrJsonError<InvalidSearchLimit>)]
pub limit: usize,
#[deserr(default = super::DEFAULT_SEARCH_OFFSET(), error = DeserrJsonError<InvalidSearchOffset>)]
pub offset: usize,
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchFacetsByIndex>)]
pub facets_by_index: BTreeMap<IndexUid, Option<Vec<String>>>,
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchMergeFacets>)]
pub merge_facets: Option<MergeFacets>,
}
#[derive(Copy, Clone, Debug, deserr::Deserr, Default, ToSchema)]
#[deserr(error = DeserrJsonError<InvalidMultiSearchMergeFacets>, rename_all = camelCase, deny_unknown_fields)]
#[schema(rename_all = "camelCase")]
pub struct MergeFacets {
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchMaxValuesPerFacet>)]
pub max_values_per_facet: Option<usize>,
}
#[derive(Debug, deserr::Deserr, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[schema(rename_all = "camelCase")]
pub struct FederatedSearch {
pub queries: Vec<SearchQueryWithIndex>,
#[deserr(default)]
pub federation: Option<Federation>,
}
#[derive(Serialize, Clone, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct FederatedSearchResult {
pub hits: Vec<SearchHit>,
pub processing_time_ms: u128,
#[serde(flatten)]
pub hits_info: HitsInfo,
#[serde(skip_serializing_if = "Option::is_none")]
pub semantic_hit_count: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<BTreeMap<String, BTreeMap<String, u64>>>)]
pub facet_distribution: Option<BTreeMap<String, IndexMap<String, u64>>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub facet_stats: Option<BTreeMap<String, FacetStats>>,
#[serde(skip_serializing_if = "FederatedFacets::is_empty")]
pub facets_by_index: FederatedFacets,
// These fields are only used for analytics purposes
#[serde(skip)]
pub degraded: bool,
#[serde(skip)]
pub used_negative_operator: bool,
}
impl fmt::Debug for FederatedSearchResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let FederatedSearchResult {
hits,
processing_time_ms,
hits_info,
semantic_hit_count,
degraded,
used_negative_operator,
facet_distribution,
facet_stats,
facets_by_index,
} = self;
let mut debug = f.debug_struct("SearchResult");
// The most important thing when looking at a search result is the time it took to process
debug.field("processing_time_ms", &processing_time_ms);
debug.field("hits", &format!("[{} hits returned]", hits.len()));
debug.field("hits_info", &hits_info);
if *used_negative_operator {
debug.field("used_negative_operator", used_negative_operator);
}
if *degraded {
debug.field("degraded", degraded);
}
if let Some(facet_distribution) = facet_distribution {
debug.field("facet_distribution", &facet_distribution);
}
if let Some(facet_stats) = facet_stats {
debug.field("facet_stats", &facet_stats);
}
if let Some(semantic_hit_count) = semantic_hit_count {
debug.field("semantic_hit_count", &semantic_hit_count);
}
if !facets_by_index.is_empty() {
debug.field("facets_by_index", &facets_by_index);
}
debug.finish()
}
}
struct WeightedScore<'a> {
details: &'a [ScoreDetails],
weight: f64,
}
impl<'a> WeightedScore<'a> {
pub fn new(details: &'a [ScoreDetails], weight: f64) -> Self {
Self { details, weight }
}
pub fn weighted_global_score(&self) -> f64 {
ScoreDetails::global_score(self.details.iter()) * self.weight
}
pub fn compare_weighted_global_scores(&self, other: &Self) -> Ordering {
self.weighted_global_score()
.partial_cmp(&other.weighted_global_score())
// both are numbers, possibly infinite
.unwrap()
}
pub fn compare(&self, other: &Self) -> Ordering {
let mut left_it = ScoreDetails::score_values(self.details.iter());
let mut right_it = ScoreDetails::score_values(other.details.iter());
loop {
let left = left_it.next();
let right = right_it.next();
match (left, right) {
(None, None) => return Ordering::Equal,
(None, Some(_)) => return Ordering::Less,
(Some(_), None) => return Ordering::Greater,
(Some(ScoreValue::Score(left)), Some(ScoreValue::Score(right))) => {
let left = left * self.weight;
let right = right * other.weight;
if (left - right).abs() <= f64::EPSILON {
continue;
}
return left.partial_cmp(&right).unwrap();
}
(Some(ScoreValue::Sort(left)), Some(ScoreValue::Sort(right))) => {
match left.partial_cmp(right) {
Some(Ordering::Equal) => continue,
Some(order) => return order,
None => return self.compare_weighted_global_scores(other),
}
}
(Some(ScoreValue::GeoSort(left)), Some(ScoreValue::GeoSort(right))) => {
match left.partial_cmp(right) {
Some(Ordering::Equal) => continue,
Some(order) => return order,
None => {
return self.compare_weighted_global_scores(other);
}
}
}
// not comparable details, use global
(Some(ScoreValue::Score(_)), Some(_))
| (Some(_), Some(ScoreValue::Score(_)))
| (Some(ScoreValue::GeoSort(_)), Some(ScoreValue::Sort(_)))
| (Some(ScoreValue::Sort(_)), Some(ScoreValue::GeoSort(_))) => {
let left_count = left_it.count();
let right_count = right_it.count();
// compare how many remaining groups of rules each side has.
// the group with the most remaining groups wins.
return left_count
.cmp(&right_count)
// breaks ties with the global ranking score
.then_with(|| self.compare_weighted_global_scores(other));
}
}
}
}
}
use crate::search::federated::types::{INDEX_UID, QUERIES_POSITION, WEIGHTED_RANKING_SCORE};
struct QueryByIndex {
query: SearchQuery,
federation_options: FederationOptions,
weight: Weight,
query_index: usize,
}
struct SearchResultByQuery<'a> {
documents_ids: Vec<DocumentId>,
document_scores: Vec<Vec<ScoreDetails>>,
federation_options: FederationOptions,
weight: Weight,
hit_maker: HitMaker<'a>,
query_index: usize,
}
struct SearchResultByQueryIter<'a> {
it: Zip<IntoIter<DocumentId>, IntoIter<Vec<ScoreDetails>>>,
federation_options: FederationOptions,
weight: Weight,
hit_maker: Rc<HitMaker<'a>>,
query_index: usize,
}
@ -272,22 +54,22 @@ struct SearchResultByQueryIter<'a> {
impl<'a> SearchResultByQueryIter<'a> {
fn new(
SearchResultByQuery {
documents_ids,
document_scores,
federation_options,
hit_maker,
query_index,
}: SearchResultByQuery<'a>,
documents_ids,
document_scores,
weight,
hit_maker,
query_index,
}: SearchResultByQuery<'a>,
) -> Self {
let it = documents_ids.into_iter().zip(document_scores);
Self { it, federation_options, hit_maker: Rc::new(hit_maker), query_index }
Self { it, weight, hit_maker: Rc::new(hit_maker), query_index }
}
}
struct SearchResultByQueryIterItem<'a> {
docid: DocumentId,
score: Vec<ScoreDetails>,
federation_options: FederationOptions,
weight: Weight,
hit_maker: Rc<HitMaker<'a>>,
query_index: usize,
}
@ -298,10 +80,12 @@ fn merge_index_local_results(
itertools::kmerge_by(
results_by_query.into_iter().map(SearchResultByQueryIter::new),
|left: &SearchResultByQueryIterItem, right: &SearchResultByQueryIterItem| {
let left_score = WeightedScore::new(&left.score, *left.federation_options.weight);
let right_score = WeightedScore::new(&right.score, *right.federation_options.weight);
match left_score.compare(&right_score) {
match weighted_scores::compare(
ScoreDetails::weighted_score_values(left.score.iter(), *left.weight),
ScoreDetails::global_score(left.score.iter()) * *left.weight,
ScoreDetails::weighted_score_values(right.score.iter(), *right.weight),
ScoreDetails::global_score(right.score.iter()) * *right.weight,
) {
// the biggest score goes first
Ordering::Greater => true,
// break ties using query index
@ -314,24 +98,134 @@ fn merge_index_local_results(
fn merge_index_global_results(
results_by_index: Vec<SearchResultByIndex>,
) -> impl Iterator<Item = SearchHitByIndex> {
remote_results: &mut [FederatedSearchResult],
) -> impl Iterator<Item = MergedSearchHit> + '_ {
itertools::kmerge_by(
results_by_index.into_iter().map(|result_by_index| result_by_index.hits.into_iter()),
|left: &SearchHitByIndex, right: &SearchHitByIndex| {
let left_score = WeightedScore::new(&left.score, *left.federation_options.weight);
let right_score = WeightedScore::new(&right.score, *right.federation_options.weight);
// local results
results_by_index
.into_iter()
.map(|result_by_index| {
either::Either::Left(result_by_index.hits.into_iter().map(MergedSearchHit::Local))
})
// remote results
.chain(remote_results.iter_mut().map(|x| either::Either::Right(iter_remote_hits(x)))),
|left: &MergedSearchHit, right: &MergedSearchHit| {
let (left_it, left_weighted_global_score, left_query_index) = left.to_score();
let (right_it, right_weighted_global_score, right_query_index) = right.to_score();
match left_score.compare(&right_score) {
match weighted_scores::compare(
left_it,
left_weighted_global_score,
right_it,
right_weighted_global_score,
) {
// the biggest score goes first
Ordering::Greater => true,
// break ties using query index
Ordering::Equal => left.query_index < right.query_index,
Ordering::Equal => left_query_index < right_query_index,
Ordering::Less => false,
}
},
)
}
enum MergedSearchHit {
Local(SearchHitByIndex),
Remote {
hit: SearchHit,
score: Vec<WeightedScoreValue>,
global_weighted_score: f64,
query_index: usize,
},
}
impl MergedSearchHit {
fn remote(mut hit: SearchHit) -> Result<Self, ProxySearchError> {
let federation = hit
.document
.get_mut(FEDERATION_HIT)
.ok_or(ProxySearchError::MissingPathInResponse("._federation"))?;
let federation = match federation.as_object_mut() {
Some(federation) => federation,
None => {
return Err(ProxySearchError::UnexpectedValueInPath {
path: "._federation",
expected_type: "map",
received_value: federation.to_string(),
});
}
};
let global_weighted_score = federation
.get(WEIGHTED_RANKING_SCORE)
.ok_or(ProxySearchError::MissingPathInResponse("._federation.weightedRankingScore"))?;
let global_weighted_score = global_weighted_score.as_f64().ok_or_else(|| {
ProxySearchError::UnexpectedValueInPath {
path: "._federation.weightedRankingScore",
expected_type: "number",
received_value: global_weighted_score.to_string(),
}
})?;
let score: Vec<WeightedScoreValue> =
serde_json::from_value(federation.remove(WEIGHTED_SCORE_VALUES).ok_or(
ProxySearchError::MissingPathInResponse("._federation.weightedScoreValues"),
)?)
.map_err(ProxySearchError::CouldNotParseWeightedScoreValues)?;
let query_index = federation
.get(QUERIES_POSITION)
.ok_or(ProxySearchError::MissingPathInResponse("._federation.queriesPosition"))?;
let query_index =
query_index.as_u64().ok_or_else(|| ProxySearchError::UnexpectedValueInPath {
path: "._federation.queriesPosition",
expected_type: "integer",
received_value: query_index.to_string(),
})? as usize;
Ok(Self::Remote { hit, score, global_weighted_score, query_index })
}
fn hit(self) -> SearchHit {
match self {
MergedSearchHit::Local(search_hit_by_index) => search_hit_by_index.hit,
MergedSearchHit::Remote { hit, .. } => hit,
}
}
fn to_score(&self) -> (impl Iterator<Item = WeightedScoreValue> + '_, f64, usize) {
match self {
MergedSearchHit::Local(search_hit_by_index) => (
either::Left(ScoreDetails::weighted_score_values(
search_hit_by_index.score.iter(),
*search_hit_by_index.weight,
)),
ScoreDetails::global_score(search_hit_by_index.score.iter())
* *search_hit_by_index.weight,
search_hit_by_index.query_index,
),
MergedSearchHit::Remote { hit: _, score, global_weighted_score, query_index } => {
let global_weighted_score = *global_weighted_score;
let query_index = *query_index;
(either::Right(score.iter().cloned()), global_weighted_score, query_index)
}
}
}
}
fn iter_remote_hits(
results_by_host: &mut FederatedSearchResult,
) -> impl Iterator<Item = MergedSearchHit> + '_ {
// have a per node registry of failed hits
results_by_host.hits.drain(..).filter_map(|hit| match MergedSearchHit::remote(hit) {
Ok(hit) => Some(hit),
Err(err) => {
tracing::warn!("skipping remote hit due to error: {err}");
None
}
})
}
impl<'a> Iterator for SearchResultByQueryIter<'a> {
type Item = SearchResultByQueryIterItem<'a>;
@ -340,7 +234,7 @@ impl<'a> Iterator for SearchResultByQueryIter<'a> {
Some(SearchResultByQueryIterItem {
docid,
score,
federation_options: self.federation_options,
weight: self.weight,
hit_maker: Rc::clone(&self.hit_maker),
query_index: self.query_index,
})
@ -350,7 +244,7 @@ impl<'a> Iterator for SearchResultByQueryIter<'a> {
struct SearchHitByIndex {
hit: SearchHit,
score: Vec<ScoreDetails>,
federation_options: FederationOptions,
weight: Weight,
query_index: usize,
}
@ -363,106 +257,31 @@ struct SearchResultByIndex {
facets: Option<ComputedFacets>,
}
#[derive(Debug, Clone, Default, Serialize, ToSchema)]
pub struct FederatedFacets(pub BTreeMap<String, ComputedFacets>);
impl FederatedFacets {
pub fn insert(&mut self, index: String, facets: Option<ComputedFacets>) {
if let Some(facets) = facets {
self.0.insert(index, facets);
}
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn merge(
self,
MergeFacets { max_values_per_facet }: MergeFacets,
facet_order: BTreeMap<String, (String, OrderBy)>,
) -> Option<ComputedFacets> {
if self.is_empty() {
return None;
}
let mut distribution: BTreeMap<String, _> = Default::default();
let mut stats: BTreeMap<String, FacetStats> = Default::default();
for facets_by_index in self.0.into_values() {
for (facet, index_distribution) in facets_by_index.distribution {
match distribution.entry(facet) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(index_distribution);
}
std::collections::btree_map::Entry::Occupied(mut entry) => {
let distribution = entry.get_mut();
for (value, index_count) in index_distribution {
distribution
.entry(value)
.and_modify(|count| *count += index_count)
.or_insert(index_count);
}
}
}
}
for (facet, index_stats) in facets_by_index.stats {
match stats.entry(facet) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(index_stats);
}
std::collections::btree_map::Entry::Occupied(mut entry) => {
let stats = entry.get_mut();
stats.min = f64::min(stats.min, index_stats.min);
stats.max = f64::max(stats.max, index_stats.max);
}
}
}
}
// fixup order
for (facet, values) in &mut distribution {
let order_by = facet_order.get(facet).map(|(_, order)| *order).unwrap_or_default();
match order_by {
OrderBy::Lexicographic => {
values.sort_unstable_by(|left, _, right, _| left.cmp(right))
}
OrderBy::Count => {
values.sort_unstable_by(|_, left, _, right| {
left.cmp(right)
// biggest first
.reverse()
})
}
}
if let Some(max_values_per_facet) = max_values_per_facet {
values.truncate(max_values_per_facet)
};
}
Some(ComputedFacets { distribution, stats })
}
}
pub fn perform_federated_search(
pub async fn perform_federated_search(
index_scheduler: &IndexScheduler,
queries: Vec<SearchQueryWithIndex>,
mut federation: Federation,
features: RoFeatures,
is_proxy: bool,
) -> Result<FederatedSearchResult, ResponseError> {
if is_proxy {
index_scheduler.features().check_proxy_search("Performing a proxy search")?;
}
let before_search = std::time::Instant::now();
let deadline = before_search + std::time::Duration::from_secs(9);
let network = index_scheduler.network();
// this implementation partition the queries by index to guarantee an important property:
// - all the queries to a particular index use the same read transaction.
// This is an important property, otherwise we cannot guarantee the self-consistency of the results.
// 1. partition queries by index
let mut queries_by_index: BTreeMap<String, Vec<QueryByIndex>> = Default::default();
// 1. partition queries by host and index
let mut remote_queries_by_host: BTreeMap<String, (Remote, Vec<SearchQueryWithIndex>)> =
Default::default();
let mut local_queries_by_index: BTreeMap<String, Vec<QueryByIndex>> = Default::default();
let mut has_remote = false;
let mut has_query_position = false;
for (query_index, federated_query) in queries.into_iter().enumerate() {
if let Some(pagination_field) = federated_query.has_pagination() {
return Err(MeilisearchHttpError::PaginationInFederatedQuery(
@ -484,11 +303,68 @@ pub fn perform_federated_search(
let (index_uid, query, federation_options) = federated_query.into_index_query_federation();
queries_by_index.entry(index_uid.into_inner()).or_default().push(QueryByIndex {
query,
federation_options: federation_options.unwrap_or_default(),
query_index,
})
let federation_options = federation_options.unwrap_or_default();
// local or remote node?
'local_query: {
let queries_by_index = match federation_options.remote {
None => local_queries_by_index.entry(index_uid.into_inner()).or_default(),
Some(remote_name) => {
has_remote = true;
match &network.local {
Some(local) if local == &remote_name => {
local_queries_by_index.entry(index_uid.into_inner()).or_default()
}
_ => {
// node from the network
let Some(remote) = network.remotes.get(&remote_name) else {
return Err(ResponseError::from_msg(format!("Invalid `queries[{query_index}].federation_options.remote`: remote `{remote_name}` is not registered"),
meilisearch_types::error::Code::InvalidMultiSearchRemote));
};
let query = SearchQueryWithIndex::from_index_query_federation(
index_uid,
query,
Some(FederationOptions {
weight: federation_options.weight,
// do not pass the `remote` to not require the remote instance to have itself has a local node
remote: None,
// pass an explicit query index
query_position: Some(query_index),
}),
);
remote_queries_by_host
.entry(remote_name)
.or_insert_with(|| (remote.clone(), Default::default()))
.1
.push(query);
break 'local_query;
}
}
}
};
queries_by_index.push(QueryByIndex {
query,
weight: federation_options.weight,
// override query index here with the one in federation.
// this will fix-up error messages to refer to the global query index of the original request.
query_index: if let Some(query_index) = federation_options.query_position {
has_query_position = true;
query_index
} else {
query_index
},
})
}
}
if has_remote {
index_scheduler.features().check_proxy_search("Performing a proxy search")?;
}
if has_query_position {
index_scheduler.features().check_proxy_search("Using `federationOptions.queryPosition`")?;
}
// 2. perform queries, merge and make hits index by index
@ -497,7 +373,7 @@ pub fn perform_federated_search(
// In step (2), semantic_hit_count will be set to Some(0) if any search kind uses semantic
// Then in step (3), we'll update its value if there is any semantic search
let mut semantic_hit_count = None;
let mut results_by_index = Vec::with_capacity(queries_by_index.len());
let mut results_by_index = Vec::with_capacity(local_queries_by_index.len());
let mut previous_query_data: Option<(RankingRules, usize, String)> = None;
// remember the order and name of first index for each facet when merging with index settings
@ -508,7 +384,29 @@ pub fn perform_federated_search(
_ => None,
};
for (index_uid, queries) in queries_by_index {
// run remote queries
let mut in_flight_remote_queries = BTreeMap::new();
let client = reqwest::ClientBuilder::new()
.connect_timeout(std::time::Duration::from_millis(200))
.build()
.unwrap();
let params =
ProxySearchParams { deadline: Some(deadline), try_count: 3, client: client.clone() };
for (node_name, (node, queries)) in remote_queries_by_host {
// spawn one task per host
in_flight_remote_queries.insert(
node_name,
tokio::spawn({
let mut federation = federation.clone();
// never merge distant facets
federation.merge_facets = None;
let params = params.clone();
async move { proxy_search(&node, queries, federation, &params).await }
}),
);
}
for (index_uid, queries) in local_queries_by_index {
let first_query_index = queries.first().map(|query| query.query_index);
let index = match index_scheduler.index(&index_uid) {
@ -564,7 +462,7 @@ pub fn perform_federated_search(
// 2.1. Compute all candidates for each query in the index
let mut results_by_query = Vec::with_capacity(queries.len());
for QueryByIndex { query, federation_options, query_index } in queries {
for QueryByIndex { query, weight, query_index } in queries {
// use an immediately invoked lambda to capture the result without returning from the function
let res: Result<(), ResponseError> = (|| {
@ -646,7 +544,7 @@ pub fn perform_federated_search(
search.limit(required_hit_count);
let (result, _semantic_hit_count) =
super::search_from_kind(index_uid.to_string(), search_kind, search)?;
super::super::search_from_kind(index_uid.to_string(), search_kind, search)?;
let format = AttributesFormat {
attributes_to_retrieve: query.attributes_to_retrieve,
retrieve_vectors,
@ -686,7 +584,7 @@ pub fn perform_federated_search(
})?;
results_by_query.push(SearchResultByQuery {
federation_options,
weight,
hit_maker,
query_index,
documents_ids,
@ -712,23 +610,38 @@ pub fn perform_federated_search(
|SearchResultByQueryIterItem {
docid,
score,
federation_options,
weight,
hit_maker,
query_index,
}| {
let mut hit = hit_maker.make_hit(docid, &score)?;
let weighted_score =
ScoreDetails::global_score(score.iter()) * (*federation_options.weight);
let weighted_score = ScoreDetails::global_score(score.iter()) * (*weight);
let _federation = serde_json::json!(
let mut _federation = serde_json::json!(
{
"indexUid": index_uid,
"queriesPosition": query_index,
"weightedRankingScore": weighted_score,
INDEX_UID: index_uid,
QUERIES_POSITION: query_index,
WEIGHTED_RANKING_SCORE: weighted_score,
}
);
hit.document.insert("_federation".to_string(), _federation);
Ok(SearchHitByIndex { hit, score, federation_options, query_index })
if has_remote && !is_proxy {
_federation.as_object_mut().unwrap().insert(
FEDERATION_REMOTE.to_string(),
network.local.clone().into(),
);
}
if is_proxy {
_federation.as_object_mut().unwrap().insert(
WEIGHTED_SCORE_VALUES.to_string(),
serde_json::json!(ScoreDetails::weighted_score_values(
score.iter(),
*weight
)
.collect_vec()),
);
}
hit.document.insert(FEDERATION_HIT.to_string(), _federation);
Ok(SearchHitByIndex { hit, score, weight, query_index })
},
)
.collect();
@ -744,7 +657,7 @@ pub fn perform_federated_search(
&index,
&rtxn,
candidates,
super::Route::MultiSearch,
super::super::Route::MultiSearch,
)
})
.transpose()
@ -781,9 +694,9 @@ pub fn perform_federated_search(
// here the resource not found is not part of the URL.
err.code = StatusCode::BAD_REQUEST;
err.message = format!(
"Inside `.federation.facetsByIndex.{index_uid}`: {}\n - Note: index `{index_uid}` is not used in queries",
err.message
);
"Inside `.federation.facetsByIndex.{index_uid}`: {}\n - Note: index `{index_uid}` is not used in queries",
err.message
);
return Err(err);
}
};
@ -795,8 +708,8 @@ pub fn perform_federated_search(
check_facet_order(&mut facet_order, &index_uid, &facets, &index, &rtxn)
{
error.message = format!(
"Inside `.federation.facetsByIndex.{index_uid}`: {error}\n - Note: index `{index_uid}` is not used in queries",
);
"Inside `.federation.facetsByIndex.{index_uid}`: {error}\n - Note: index `{index_uid}` is not used in queries",
);
return Err(error);
}
@ -806,17 +719,71 @@ pub fn perform_federated_search(
&index,
&rtxn,
Default::default(),
super::Route::MultiSearch,
super::super::Route::MultiSearch,
) {
error.message =
format!("Inside `.federation.facetsByIndex.{index_uid}`: {}\n - Note: index `{index_uid}` is not used in queries", error.message);
format!("Inside `.federation.facetsByIndex.{index_uid}`: {}\n - Note: index `{index_uid}` is not used in queries", error.message);
return Err(error);
}
}
}
// 3. merge hits and metadata across indexes
// 3.1 merge metadata
// 3. merge hits and metadata across indexes and hosts
// 3.1. Wait for proxy search requests to complete
let mut remote_results = Vec::with_capacity(in_flight_remote_queries.len());
let mut remote_errors: BTreeMap<String, ResponseError> = BTreeMap::new();
'remote_queries: for (node_name, handle) in in_flight_remote_queries {
match handle.await {
Ok(Ok(mut res)) => {
for hit in &mut res.hits {
let Some(federation) = hit.document.get_mut(FEDERATION_HIT) else {
let error = ProxySearchError::MissingPathInResponse("._federation");
remote_errors.insert(node_name, error.as_response_error());
continue 'remote_queries;
};
let Some(federation) = federation.as_object_mut() else {
let error = ProxySearchError::UnexpectedValueInPath {
path: "._federation",
expected_type: "map",
received_value: federation.to_string(),
};
remote_errors.insert(node_name, error.as_response_error());
continue 'remote_queries;
};
federation.insert(
FEDERATION_REMOTE.to_string(),
serde_json::Value::String(node_name.clone()),
);
}
remote_results.push(res);
}
Ok(Err(error)) => {
remote_errors.insert(node_name, error.as_response_error());
}
Err(panic) => match panic.try_into_panic() {
Ok(panic) => {
let msg = match panic.downcast_ref::<&'static str>() {
Some(s) => *s,
None => match panic.downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
remote_errors.insert(
node_name,
ResponseError::from_msg(
msg.to_string(),
meilisearch_types::error::Code::Internal,
),
);
}
Err(_) => tracing::error!("proxy search task was unexpectedly cancelled"),
},
}
}
// 3.2. merge metadata
let (estimated_total_hits, degraded, used_negative_operator, facets) = {
let mut estimated_total_hits = 0;
let mut degraded = false;
@ -843,21 +810,43 @@ pub fn perform_federated_search(
facets.insert(index, facets_by_index);
}
for FederatedSearchResult {
hits: _,
processing_time_ms: _,
hits_info,
semantic_hit_count: _,
facet_distribution: _,
facet_stats: _,
facets_by_index: facets_by_index_by_host,
degraded: degraded_for_host,
used_negative_operator: host_used_negative_operator,
remote_errors: _,
} in &remote_results
{
estimated_total_hits += match hits_info {
HitsInfo::Pagination { total_hits: estimated_total_hits, .. }
| HitsInfo::OffsetLimit { estimated_total_hits, .. } => estimated_total_hits,
};
degraded |= degraded_for_host;
used_negative_operator |= host_used_negative_operator;
}
(estimated_total_hits, degraded, used_negative_operator, facets)
};
// 3.2 merge hits
let merged_hits: Vec<_> = merge_index_global_results(results_by_index)
// 3.3. merge hits
let merged_hits: Vec<_> = merge_index_global_results(results_by_index, &mut remote_results)
.skip(federation.offset)
.take(federation.limit)
.inspect(|hit| {
if let Some(semantic_hit_count) = &mut semantic_hit_count {
if hit.score.iter().any(|score| matches!(&score, ScoreDetails::Vector(_))) {
if hit.to_score().0.any(|score| matches!(&score, WeightedScoreValue::VectorSort(_)))
{
*semantic_hit_count += 1;
}
}
})
.map(|hit| hit.hit)
.map(|hit| hit.hit())
.collect();
let (facet_distribution, facet_stats, facets_by_index) =
@ -888,6 +877,7 @@ pub fn perform_federated_search(
facet_distribution,
facet_stats,
facets_by_index,
remote_errors: has_remote.then_some(remote_errors),
};
Ok(search_result)

View File

@ -0,0 +1,268 @@
pub use error::ProxySearchError;
use error::ReqwestErrorWithoutUrl;
use meilisearch_types::features::Remote;
use rand::Rng as _;
use reqwest::{Client, Response, StatusCode};
use serde::de::DeserializeOwned;
use serde_json::Value;
use super::types::{FederatedSearch, FederatedSearchResult, Federation};
use crate::search::SearchQueryWithIndex;
pub const PROXY_SEARCH_HEADER: &str = "Meili-Proxy-Search";
pub const PROXY_SEARCH_HEADER_VALUE: &str = "true";
mod error {
use meilisearch_types::error::ResponseError;
use reqwest::StatusCode;
#[derive(Debug, thiserror::Error)]
pub enum ProxySearchError {
#[error("{0}")]
CouldNotSendRequest(ReqwestErrorWithoutUrl),
#[error("could not authenticate against the remote host")]
AuthenticationError,
#[error(
"could not parse response from the remote host as a federated search response{}",
response_from_remote(response)
)]
CouldNotParseResponse { response: Result<String, ReqwestErrorWithoutUrl> },
#[error("remote host responded with code {}{}", status_code.as_u16(), response_from_remote(response))]
BadRequest { status_code: StatusCode, response: Result<String, ReqwestErrorWithoutUrl> },
#[error("remote host did not answer before the deadline")]
Timeout,
#[error("remote hit does not contain `{0}`")]
MissingPathInResponse(&'static str),
#[error("remote host responded with code {}{}", status_code.as_u16(), response_from_remote(response))]
RemoteError { status_code: StatusCode, response: Result<String, ReqwestErrorWithoutUrl> },
#[error("remote hit contains an unexpected value at path `{path}`: expected {expected_type}, received `{received_value}`")]
UnexpectedValueInPath {
path: &'static str,
expected_type: &'static str,
received_value: String,
},
#[error("could not parse weighted score values in the remote hit: {0}")]
CouldNotParseWeightedScoreValues(serde_json::Error),
}
impl ProxySearchError {
pub fn as_response_error(&self) -> ResponseError {
use meilisearch_types::error::Code;
let message = self.to_string();
let code = match self {
ProxySearchError::CouldNotSendRequest(_) => Code::ProxyCouldNotSendRequest,
ProxySearchError::AuthenticationError => Code::ProxyInvalidApiKey,
ProxySearchError::BadRequest { .. } => Code::ProxyBadRequest,
ProxySearchError::Timeout => Code::ProxyTimeout,
ProxySearchError::RemoteError { .. } => Code::ProxyRemoteError,
ProxySearchError::CouldNotParseResponse { .. }
| ProxySearchError::MissingPathInResponse(_)
| ProxySearchError::UnexpectedValueInPath { .. }
| ProxySearchError::CouldNotParseWeightedScoreValues(_) => Code::ProxyBadResponse,
};
ResponseError::from_msg(message, code)
}
}
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct ReqwestErrorWithoutUrl(reqwest::Error);
impl ReqwestErrorWithoutUrl {
pub fn new(inner: reqwest::Error) -> Self {
Self(inner.without_url())
}
}
fn response_from_remote(response: &Result<String, ReqwestErrorWithoutUrl>) -> String {
match response {
Ok(response) => {
// unwrap: to_string of a value should not fail
format!(":\n - response from remote: {}", response)
}
Err(error) => {
format!(":\n - additionally, could not retrieve response from remote: {error}")
}
}
}
}
#[derive(Clone)]
pub struct ProxySearchParams {
pub deadline: Option<std::time::Instant>,
pub try_count: u32,
pub client: reqwest::Client,
}
/// Performs a federated search on a remote host and returns the results
pub async fn proxy_search(
node: &Remote,
queries: Vec<SearchQueryWithIndex>,
federation: Federation,
params: &ProxySearchParams,
) -> Result<FederatedSearchResult, ProxySearchError> {
let url = format!("{}/multi-search", node.url);
let federated = FederatedSearch { queries, federation: Some(federation) };
let search_api_key = node.search_api_key.as_deref();
let max_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
let deadline = if let Some(deadline) = params.deadline {
std::time::Instant::min(deadline, max_deadline)
} else {
max_deadline
};
for i in 0..=params.try_count {
match try_proxy_search(&url, search_api_key, &federated, &params.client, deadline).await {
Ok(response) => return Ok(response),
Err(retry) => {
let duration = retry.into_duration(i)?;
tokio::time::sleep(duration).await;
}
}
}
try_proxy_search(&url, search_api_key, &federated, &params.client, deadline)
.await
.map_err(Retry::into_error)
}
async fn try_proxy_search(
url: &str,
search_api_key: Option<&str>,
federated: &FederatedSearch,
client: &Client,
deadline: std::time::Instant,
) -> Result<FederatedSearchResult, Retry> {
let timeout = deadline.saturating_duration_since(std::time::Instant::now());
let request = client.post(url).json(&federated).timeout(timeout);
let request = if let Some(search_api_key) = search_api_key {
request.bearer_auth(search_api_key)
} else {
request
};
let request = request.header(PROXY_SEARCH_HEADER, PROXY_SEARCH_HEADER_VALUE);
let response = request.send().await;
let response = match response {
Ok(response) => response,
Err(error) if error.is_timeout() => return Err(Retry::give_up(ProxySearchError::Timeout)),
Err(error) => {
return Err(Retry::retry_later(ProxySearchError::CouldNotSendRequest(
ReqwestErrorWithoutUrl::new(error),
)))
}
};
match response.status() {
status_code if status_code.is_success() => (),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
return Err(Retry::give_up(ProxySearchError::AuthenticationError))
}
status_code if status_code.is_client_error() => {
let response = parse_error(response).await;
return Err(Retry::give_up(ProxySearchError::BadRequest { status_code, response }));
}
status_code if status_code.is_server_error() => {
let response = parse_error(response).await;
return Err(Retry::retry_later(ProxySearchError::RemoteError {
status_code,
response,
}));
}
status_code => {
tracing::debug!(
status_code = status_code.as_u16(),
"remote replied with unexpected status code"
);
}
}
let response = match parse_response(response).await {
Ok(response) => response,
Err(response) => {
return Err(Retry::retry_later(ProxySearchError::CouldNotParseResponse { response }))
}
};
Ok(response)
}
/// Always parse the body of the response of a failed request as JSON.
async fn parse_error(response: Response) -> Result<String, ReqwestErrorWithoutUrl> {
let bytes = match response.bytes().await {
Ok(bytes) => bytes,
Err(error) => return Err(ReqwestErrorWithoutUrl::new(error)),
};
Ok(parse_bytes_as_error(&bytes))
}
fn parse_bytes_as_error(bytes: &[u8]) -> String {
match serde_json::from_slice::<Value>(bytes) {
Ok(value) => value.to_string(),
Err(_) => String::from_utf8_lossy(bytes).into_owned(),
}
}
async fn parse_response<T: DeserializeOwned>(
response: Response,
) -> Result<T, Result<String, ReqwestErrorWithoutUrl>> {
let bytes = match response.bytes().await {
Ok(bytes) => bytes,
Err(error) => return Err(Err(ReqwestErrorWithoutUrl::new(error))),
};
match serde_json::from_slice::<T>(&bytes) {
Ok(value) => Ok(value),
Err(_) => Err(Ok(parse_bytes_as_error(&bytes))),
}
}
pub struct Retry {
error: ProxySearchError,
strategy: RetryStrategy,
}
pub enum RetryStrategy {
GiveUp,
Retry,
}
impl Retry {
pub fn give_up(error: ProxySearchError) -> Self {
Self { error, strategy: RetryStrategy::GiveUp }
}
pub fn retry_later(error: ProxySearchError) -> Self {
Self { error, strategy: RetryStrategy::Retry }
}
pub fn into_duration(self, attempt: u32) -> Result<std::time::Duration, ProxySearchError> {
match self.strategy {
RetryStrategy::GiveUp => Err(self.error),
RetryStrategy::Retry => {
let retry_duration = std::time::Duration::from_millis((10u64).pow(attempt));
let retry_duration = retry_duration.min(std::time::Duration::from_secs(1)); // don't wait more than a minute
// randomly up to double the retry duration
let retry_duration = retry_duration
+ rand::thread_rng().gen_range(std::time::Duration::ZERO..retry_duration);
tracing::warn!(
"Attempt #{}, failed with {}, retrying after {}ms.",
attempt,
self.error,
retry_duration.as_millis()
);
Ok(retry_duration)
}
}
}
pub fn into_error(self) -> ProxySearchError {
self.error
}
}

View File

@ -0,0 +1,270 @@
use std::collections::BTreeMap;
use std::fmt;
use std::vec::Vec;
use indexmap::IndexMap;
use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::deserr_codes::{
InvalidMultiSearchFacetsByIndex, InvalidMultiSearchMaxValuesPerFacet,
InvalidMultiSearchMergeFacets, InvalidMultiSearchQueryPosition, InvalidMultiSearchRemote,
InvalidMultiSearchWeight, InvalidSearchLimit, InvalidSearchOffset,
};
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::milli::OrderBy;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex};
pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0;
// fields in the response
pub const FEDERATION_HIT: &str = "_federation";
pub const INDEX_UID: &str = "indexUid";
pub const QUERIES_POSITION: &str = "queriesPosition";
pub const WEIGHTED_RANKING_SCORE: &str = "weightedRankingScore";
pub const WEIGHTED_SCORE_VALUES: &str = "weightedScoreValues";
pub const FEDERATION_REMOTE: &str = "remote";
#[derive(Debug, Default, Clone, PartialEq, Serialize, deserr::Deserr, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub struct FederationOptions {
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchWeight>)]
#[schema(value_type = f64)]
pub weight: Weight,
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchRemote>)]
pub remote: Option<String>,
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchQueryPosition>)]
pub query_position: Option<usize>,
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, deserr::Deserr)]
#[deserr(try_from(f64) = TryFrom::try_from -> InvalidMultiSearchWeight)]
pub struct Weight(f64);
impl Default for Weight {
fn default() -> Self {
Weight(DEFAULT_FEDERATED_WEIGHT)
}
}
impl std::convert::TryFrom<f64> for Weight {
type Error = InvalidMultiSearchWeight;
fn try_from(f: f64) -> Result<Self, Self::Error> {
if f < 0.0 {
Err(InvalidMultiSearchWeight)
} else {
Ok(Weight(f))
}
}
}
impl std::ops::Deref for Weight {
type Target = f64;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug, Clone, deserr::Deserr, Serialize, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[schema(rename_all = "camelCase")]
#[serde(rename_all = "camelCase")]
pub struct Federation {
#[deserr(default = super::super::DEFAULT_SEARCH_LIMIT(), error = DeserrJsonError<InvalidSearchLimit>)]
pub limit: usize,
#[deserr(default = super::super::DEFAULT_SEARCH_OFFSET(), error = DeserrJsonError<InvalidSearchOffset>)]
pub offset: usize,
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchFacetsByIndex>)]
pub facets_by_index: BTreeMap<IndexUid, Option<Vec<String>>>,
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchMergeFacets>)]
pub merge_facets: Option<MergeFacets>,
}
#[derive(Copy, Clone, Debug, deserr::Deserr, Serialize, Default, ToSchema)]
#[deserr(error = DeserrJsonError<InvalidMultiSearchMergeFacets>, rename_all = camelCase, deny_unknown_fields)]
#[schema(rename_all = "camelCase")]
#[serde(rename_all = "camelCase")]
pub struct MergeFacets {
#[deserr(default, error = DeserrJsonError<InvalidMultiSearchMaxValuesPerFacet>)]
pub max_values_per_facet: Option<usize>,
}
#[derive(Debug, deserr::Deserr, Serialize, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[schema(rename_all = "camelCase")]
#[serde(rename_all = "camelCase")]
pub struct FederatedSearch {
pub queries: Vec<SearchQueryWithIndex>,
#[deserr(default)]
pub federation: Option<Federation>,
}
#[derive(Serialize, Deserialize, Clone, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct FederatedSearchResult {
pub hits: Vec<SearchHit>,
pub processing_time_ms: u128,
#[serde(flatten)]
pub hits_info: HitsInfo,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub semantic_hit_count: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<BTreeMap<String, BTreeMap<String, u64>>>)]
pub facet_distribution: Option<BTreeMap<String, IndexMap<String, u64>>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub facet_stats: Option<BTreeMap<String, FacetStats>>,
#[serde(default, skip_serializing_if = "FederatedFacets::is_empty")]
pub facets_by_index: FederatedFacets,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub remote_errors: Option<BTreeMap<String, ResponseError>>,
// These fields are only used for analytics purposes
#[serde(skip)]
pub degraded: bool,
#[serde(skip)]
pub used_negative_operator: bool,
}
impl fmt::Debug for FederatedSearchResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let FederatedSearchResult {
hits,
processing_time_ms,
hits_info,
semantic_hit_count,
degraded,
used_negative_operator,
facet_distribution,
facet_stats,
facets_by_index,
remote_errors,
} = self;
let mut debug = f.debug_struct("SearchResult");
// The most important thing when looking at a search result is the time it took to process
debug.field("processing_time_ms", &processing_time_ms);
debug.field("hits", &format!("[{} hits returned]", hits.len()));
debug.field("hits_info", &hits_info);
if *used_negative_operator {
debug.field("used_negative_operator", used_negative_operator);
}
if *degraded {
debug.field("degraded", degraded);
}
if let Some(facet_distribution) = facet_distribution {
debug.field("facet_distribution", &facet_distribution);
}
if let Some(facet_stats) = facet_stats {
debug.field("facet_stats", &facet_stats);
}
if let Some(semantic_hit_count) = semantic_hit_count {
debug.field("semantic_hit_count", &semantic_hit_count);
}
if !facets_by_index.is_empty() {
debug.field("facets_by_index", &facets_by_index);
}
if !remote_errors.is_none() {
debug.field("remote_errors", &remote_errors);
}
debug.finish()
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)]
pub struct FederatedFacets(pub BTreeMap<String, ComputedFacets>);
impl FederatedFacets {
pub fn insert(&mut self, index: String, facets: Option<ComputedFacets>) {
if let Some(facets) = facets {
self.0.insert(index, facets);
}
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn merge(
self,
MergeFacets { max_values_per_facet }: MergeFacets,
facet_order: BTreeMap<String, (String, OrderBy)>,
) -> Option<ComputedFacets> {
if self.is_empty() {
return None;
}
let mut distribution: BTreeMap<String, _> = Default::default();
let mut stats: BTreeMap<String, FacetStats> = Default::default();
for facets_by_index in self.0.into_values() {
for (facet, index_distribution) in facets_by_index.distribution {
match distribution.entry(facet) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(index_distribution);
}
std::collections::btree_map::Entry::Occupied(mut entry) => {
let distribution = entry.get_mut();
for (value, index_count) in index_distribution {
distribution
.entry(value)
.and_modify(|count| *count += index_count)
.or_insert(index_count);
}
}
}
}
for (facet, index_stats) in facets_by_index.stats {
match stats.entry(facet) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(index_stats);
}
std::collections::btree_map::Entry::Occupied(mut entry) => {
let stats = entry.get_mut();
stats.min = f64::min(stats.min, index_stats.min);
stats.max = f64::max(stats.max, index_stats.max);
}
}
}
}
// fixup order
for (facet, values) in &mut distribution {
let order_by = facet_order.get(facet).map(|(_, order)| *order).unwrap_or_default();
match order_by {
OrderBy::Lexicographic => {
values.sort_unstable_by(|left, _, right, _| left.cmp(right))
}
OrderBy::Count => {
values.sort_unstable_by(|_, left, _, right| {
left.cmp(right)
// biggest first
.reverse()
})
}
}
if let Some(max_values_per_facet) = max_values_per_facet {
values.truncate(max_values_per_facet)
};
}
Some(ComputedFacets { distribution, stats })
}
}

View File

@ -0,0 +1,88 @@
use std::cmp::Ordering;
use meilisearch_types::milli::score_details::{self, WeightedScoreValue};
pub fn compare(
mut left_it: impl Iterator<Item = WeightedScoreValue>,
left_weighted_global_score: f64,
mut right_it: impl Iterator<Item = WeightedScoreValue>,
right_weighted_global_score: f64,
) -> Ordering {
loop {
let left = left_it.next();
let right = right_it.next();
match (left, right) {
(None, None) => return Ordering::Equal,
(None, Some(_)) => return Ordering::Less,
(Some(_), None) => return Ordering::Greater,
(
Some(
WeightedScoreValue::WeightedScore(left) | WeightedScoreValue::VectorSort(left),
),
Some(
WeightedScoreValue::WeightedScore(right)
| WeightedScoreValue::VectorSort(right),
),
) => {
if (left - right).abs() <= f64::EPSILON {
continue;
}
return left.partial_cmp(&right).unwrap();
}
(
Some(WeightedScoreValue::Sort { asc: left_asc, value: left }),
Some(WeightedScoreValue::Sort { asc: right_asc, value: right }),
) => {
if left_asc != right_asc {
return left_weighted_global_score
.partial_cmp(&right_weighted_global_score)
.unwrap();
}
match score_details::compare_sort_values(left_asc, &left, &right) {
Ordering::Equal => continue,
order => return order,
}
}
(
Some(WeightedScoreValue::GeoSort { asc: left_asc, distance: left }),
Some(WeightedScoreValue::GeoSort { asc: right_asc, distance: right }),
) => {
if left_asc != right_asc {
continue;
}
match (left, right) {
(None, None) => continue,
(None, Some(_)) => return Ordering::Less,
(Some(_), None) => return Ordering::Greater,
(Some(left), Some(right)) => {
if (left - right).abs() <= f64::EPSILON {
continue;
}
return left.partial_cmp(&right).unwrap();
}
}
}
// not comparable details, use global
(Some(WeightedScoreValue::WeightedScore(_)), Some(_))
| (Some(_), Some(WeightedScoreValue::WeightedScore(_)))
| (Some(WeightedScoreValue::VectorSort(_)), Some(_))
| (Some(_), Some(WeightedScoreValue::VectorSort(_)))
| (Some(WeightedScoreValue::GeoSort { .. }), Some(WeightedScoreValue::Sort { .. }))
| (Some(WeightedScoreValue::Sort { .. }), Some(WeightedScoreValue::GeoSort { .. })) => {
let left_count = left_it.count();
let right_count = right_it.count();
// compare how many remaining groups of rules each side has.
// the group with the most remaining groups wins.
return left_count
.cmp(&right_count)
// breaks ties with the global ranking score
.then_with(|| {
left_weighted_global_score
.partial_cmp(&right_weighted_global_score)
.unwrap()
});
}
}
}
}

View File

@ -30,7 +30,7 @@ use milli::{
MatchBounds, MatcherBuilder, SortError, TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET,
};
use regex::Regex;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
#[cfg(test)]
mod mod_test;
@ -41,7 +41,7 @@ use crate::error::MeilisearchHttpError;
mod federated;
pub use federated::{
perform_federated_search, FederatedSearch, FederatedSearchResult, Federation,
FederationOptions, MergeFacets,
FederationOptions, MergeFacets, PROXY_SEARCH_HEADER, PROXY_SEARCH_HEADER_VALUE,
};
mod ranking_rules;
@ -119,7 +119,7 @@ pub struct SearchQuery {
pub locales: Option<Vec<Locale>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Deserr, ToSchema)]
#[derive(Debug, Clone, Copy, PartialEq, Deserr, ToSchema, Serialize)]
#[deserr(try_from(f64) = TryFrom::try_from -> InvalidSearchRankingScoreThreshold)]
pub struct RankingScoreThreshold(f64);
impl std::convert::TryFrom<f64> for RankingScoreThreshold {
@ -275,11 +275,13 @@ impl fmt::Debug for SearchQuery {
}
}
#[derive(Debug, Clone, Default, PartialEq, Deserr, ToSchema)]
#[derive(Debug, Clone, Default, PartialEq, Deserr, ToSchema, Serialize)]
#[deserr(error = DeserrJsonError<InvalidSearchHybridQuery>, rename_all = camelCase, deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub struct HybridQuery {
#[deserr(default, error = DeserrJsonError<InvalidSearchSemanticRatio>, default)]
#[schema(value_type = f32, default)]
#[serde(default)]
pub semantic_ratio: SemanticRatio,
#[deserr(error = DeserrJsonError<InvalidSearchEmbedder>)]
pub embedder: String,
@ -369,7 +371,7 @@ impl SearchKind {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Deserr)]
#[derive(Debug, Clone, Copy, PartialEq, Deserr, Serialize)]
#[deserr(try_from(f32) = TryFrom::try_from -> InvalidSearchSemanticRatio)]
pub struct SemanticRatio(f32);
@ -411,8 +413,9 @@ impl SearchQuery {
// This struct contains the fields of `SearchQuery` inline.
// This is because neither deserr nor serde support `flatten` when using `deny_unknown_fields.
// The `From<SearchQueryWithIndex>` implementation ensures both structs remain up to date.
#[derive(Debug, Clone, PartialEq, Deserr, ToSchema)]
#[derive(Debug, Clone, Serialize, PartialEq, Deserr, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct SearchQueryWithIndex {
#[deserr(error = DeserrJsonError<InvalidIndexUid>, missing_field_error = DeserrJsonError::missing_index_uid)]
@ -493,6 +496,72 @@ impl SearchQueryWithIndex {
self.facets.as_deref().filter(|v| !v.is_empty())
}
pub fn from_index_query_federation(
index_uid: IndexUid,
query: SearchQuery,
federation_options: Option<FederationOptions>,
) -> Self {
let SearchQuery {
q,
vector,
hybrid,
offset,
limit,
page,
hits_per_page,
attributes_to_retrieve,
retrieve_vectors,
attributes_to_crop,
crop_length,
attributes_to_highlight,
show_matches_position,
show_ranking_score,
show_ranking_score_details,
filter,
sort,
distinct,
facets,
highlight_pre_tag,
highlight_post_tag,
crop_marker,
matching_strategy,
attributes_to_search_on,
ranking_score_threshold,
locales,
} = query;
SearchQueryWithIndex {
index_uid,
q,
vector,
hybrid,
offset: if offset == DEFAULT_SEARCH_OFFSET() { None } else { Some(offset) },
limit: if limit == DEFAULT_SEARCH_LIMIT() { None } else { Some(limit) },
page,
hits_per_page,
attributes_to_retrieve,
retrieve_vectors,
attributes_to_crop,
crop_length,
attributes_to_highlight,
show_ranking_score,
show_ranking_score_details,
show_matches_position,
filter,
sort,
distinct,
facets,
highlight_pre_tag,
highlight_post_tag,
crop_marker,
matching_strategy,
attributes_to_search_on,
ranking_score_threshold,
locales,
federation_options,
}
}
pub fn into_index_query_federation(self) -> (IndexUid, SearchQuery, Option<FederationOptions>) {
let SearchQueryWithIndex {
index_uid,
@ -620,8 +689,9 @@ impl TryFrom<Value> for ExternalDocumentId {
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserr, ToSchema)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserr, ToSchema, Serialize)]
#[deserr(rename_all = camelCase)]
#[serde(rename_all = "camelCase")]
pub enum MatchingStrategy {
/// Remove query words from last to first
Last,
@ -667,19 +737,19 @@ impl From<FacetValuesSort> for OrderBy {
}
}
#[derive(Debug, Clone, Serialize, PartialEq, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
pub struct SearchHit {
#[serde(flatten)]
#[schema(additional_properties, inline, value_type = HashMap<String, Value>)]
pub document: Document,
#[serde(rename = "_formatted", skip_serializing_if = "Document::is_empty")]
#[serde(default, rename = "_formatted", skip_serializing_if = "Document::is_empty")]
#[schema(additional_properties, value_type = HashMap<String, Value>)]
pub formatted: Document,
#[serde(rename = "_matchesPosition", skip_serializing_if = "Option::is_none")]
#[serde(default, rename = "_matchesPosition", skip_serializing_if = "Option::is_none")]
pub matches_position: Option<MatchesPosition>,
#[serde(rename = "_rankingScore", skip_serializing_if = "Option::is_none")]
#[serde(default, rename = "_rankingScore", skip_serializing_if = "Option::is_none")]
pub ranking_score: Option<f64>,
#[serde(rename = "_rankingScoreDetails", skip_serializing_if = "Option::is_none")]
#[serde(default, rename = "_rankingScoreDetails", skip_serializing_if = "Option::is_none")]
pub ranking_score_details: Option<serde_json::Map<String, serde_json::Value>>,
}
@ -767,7 +837,7 @@ pub struct SearchResultWithIndex {
pub result: SearchResult,
}
#[derive(Serialize, Debug, Clone, PartialEq, Eq, ToSchema)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, ToSchema)]
#[serde(untagged)]
pub enum HitsInfo {
#[serde(rename_all = "camelCase")]
@ -778,7 +848,7 @@ pub enum HitsInfo {
OffsetLimit { limit: usize, offset: usize, estimated_total_hits: usize },
}
#[derive(Serialize, Debug, Clone, PartialEq, ToSchema)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)]
pub struct FacetStats {
pub min: f64,
pub max: f64,
@ -1061,7 +1131,7 @@ pub fn perform_search(
Ok(result)
}
#[derive(Debug, Clone, Default, Serialize, ToSchema)]
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)]
pub struct ComputedFacets {
#[schema(value_type = BTreeMap<String, BTreeMap<String, u64>>)]
pub distribution: BTreeMap<String, IndexMap<String, u64>>,

View File

@ -421,7 +421,7 @@ async fn error_add_api_key_invalid_parameters_actions() {
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response, { ".createdAt" => "[ignored]", ".updatedAt" => "[ignored]" }), @r###"
{
"message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`",
"message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `network.get`, `network.update`",
"code": "invalid_api_key_actions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_actions"

View File

@ -68,6 +68,8 @@ pub static AUTHORIZATIONS: Lazy<HashMap<(&'static str, &'static str), HashSet<&'
("GET", "/keys") => hashset!{"keys.get", "*"},
("GET", "/experimental-features") => hashset!{"experimental.get", "*"},
("PATCH", "/experimental-features") => hashset!{"experimental.update", "*"},
("GET", "/network") => hashset!{"network.get", "*"},
("PATCH", "/network") => hashset!{"network.update", "*"},
};
authorizations

View File

@ -93,7 +93,7 @@ async fn create_api_key_bad_actions() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###"
{
"message": "Unknown value `doggo` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`",
"message": "Unknown value `doggo` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `network.get`, `network.update`",
"code": "invalid_api_key_actions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_actions"

View File

@ -1908,7 +1908,8 @@ async fn import_dump_v6_containing_experimental_features() {
"metrics": false,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -2069,7 +2070,8 @@ async fn generate_and_import_dump_containing_vectors() {
"metrics": false,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);

View File

@ -21,7 +21,8 @@ async fn experimental_features() {
"metrics": false,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -33,7 +34,8 @@ async fn experimental_features() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -45,7 +47,8 @@ async fn experimental_features() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -58,7 +61,8 @@ async fn experimental_features() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -71,7 +75,8 @@ async fn experimental_features() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
}
@ -91,7 +96,8 @@ async fn experimental_feature_metrics() {
"metrics": true,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false
"containsFilter": false,
"proxySearch": false
}
"###);
@ -146,7 +152,7 @@ async fn errors() {
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`",
"message": "Unknown field `NotAFeature`: expected one of `metrics`, `logsRoute`, `editDocumentsByFunction`, `containsFilter`, `proxySearch`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"

View File

@ -1,7 +1,7 @@
use std::cmp::Ordering;
use itertools::Itertools;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use crate::distance_between_two_points;
@ -36,6 +36,15 @@ enum RankOrValue<'a> {
Score(f64),
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum WeightedScoreValue {
WeightedScore(f64),
Sort { asc: bool, value: serde_json::Value },
GeoSort { asc: bool, distance: Option<f64> },
VectorSort(f64),
}
impl ScoreDetails {
pub fn local_score(&self) -> Option<f64> {
self.rank().map(Rank::local_score)
@ -87,6 +96,30 @@ impl ScoreDetails {
})
}
pub fn weighted_score_values<'a>(
details: impl Iterator<Item = &'a Self> + 'a,
weight: f64,
) -> impl Iterator<Item = WeightedScoreValue> + 'a {
details
.map(ScoreDetails::rank_or_value)
.coalesce(|left, right| match (left, right) {
(RankOrValue::Rank(left), RankOrValue::Rank(right)) => {
Ok(RankOrValue::Rank(Rank::merge(left, right)))
}
(left, right) => Err((left, right)),
})
.map(move |rank_or_value| match rank_or_value {
RankOrValue::Rank(r) => WeightedScoreValue::WeightedScore(r.local_score() * weight),
RankOrValue::Sort(s) => {
WeightedScoreValue::Sort { asc: s.ascending, value: s.value.clone() }
}
RankOrValue::GeoSort(g) => {
WeightedScoreValue::GeoSort { asc: g.ascending, distance: g.distance() }
}
RankOrValue::Score(s) => WeightedScoreValue::VectorSort(s * weight),
})
}
fn rank_or_value(&self) -> RankOrValue<'_> {
match self {
ScoreDetails::Words(w) => RankOrValue::Rank(w.rank()),
@ -423,34 +456,58 @@ pub struct Sort {
pub value: serde_json::Value,
}
pub fn compare_sort_values(
ascending: bool,
left: &serde_json::Value,
right: &serde_json::Value,
) -> Ordering {
use serde_json::Value::*;
match (left, right) {
(Null, Null) => Ordering::Equal,
(Null, _) => Ordering::Less,
(_, Null) => Ordering::Greater,
// numbers are always before strings
(Number(_), String(_)) => Ordering::Greater,
(String(_), Number(_)) => Ordering::Less,
(Number(left), Number(right)) => {
// FIXME: unwrap permitted here?
let order = left
.as_f64()
.unwrap()
.partial_cmp(&right.as_f64().unwrap())
.unwrap_or(Ordering::Equal);
// 12 < 42, and when ascending, we want to see 12 first, so the smallest.
// Hence, when ascending, smaller is better
if ascending {
order.reverse()
} else {
order
}
}
(String(left), String(right)) => {
let order = left.cmp(right);
// Taking e.g. "a" and "z"
// "a" < "z", and when ascending, we want to see "a" first, so the smallest.
// Hence, when ascending, smaller is better
if ascending {
order.reverse()
} else {
order
}
}
(left, right) => {
tracing::warn!(%left, %right, "sort values that are neither numbers, strings or null, handling as equal");
Ordering::Equal
}
}
}
impl PartialOrd for Sort {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
if self.ascending != other.ascending {
return None;
}
match (&self.value, &other.value) {
(serde_json::Value::Null, serde_json::Value::Null) => Some(Ordering::Equal),
(serde_json::Value::Null, _) => Some(Ordering::Less),
(_, serde_json::Value::Null) => Some(Ordering::Greater),
// numbers are always before strings
(serde_json::Value::Number(_), serde_json::Value::String(_)) => Some(Ordering::Greater),
(serde_json::Value::String(_), serde_json::Value::Number(_)) => Some(Ordering::Less),
(serde_json::Value::Number(left), serde_json::Value::Number(right)) => {
// FIXME: unwrap permitted here?
let order = left.as_f64().unwrap().partial_cmp(&right.as_f64().unwrap())?;
// 12 < 42, and when ascending, we want to see 12 first, so the smallest.
// Hence, when ascending, smaller is better
Some(if self.ascending { order.reverse() } else { order })
}
(serde_json::Value::String(left), serde_json::Value::String(right)) => {
let order = left.cmp(right);
// Taking e.g. "a" and "z"
// "a" < "z", and when ascending, we want to see "a" first, so the smallest.
// Hence, when ascending, smaller is better
Some(if self.ascending { order.reverse() } else { order })
}
_ => None,
}
Some(compare_sort_values(self.ascending, &self.value, &other.value))
}
}

View File

@ -11,7 +11,7 @@ use either::Either;
pub use matching_words::MatchingWords;
use matching_words::{MatchType, PartialMatch};
use r#match::{Match, MatchPosition};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use simple_token_kind::SimpleTokenKind;
use utoipa::ToSchema;
@ -101,11 +101,11 @@ impl FormatOptions {
}
}
#[derive(Serialize, Debug, Clone, PartialEq, Eq, ToSchema)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, ToSchema)]
pub struct MatchBounds {
pub start: usize,
pub length: usize,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none", default)]
pub indices: Option<Vec<usize>>,
}