mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
Changes to multi_search route
This commit is contained in:
parent
5c323cecc7
commit
83d71662aa
@ -10,12 +10,14 @@ use serde::Serialize;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::analytics::{Analytics, MultiSearchAggregator};
|
||||
use crate::error::MeilisearchHttpError;
|
||||
use crate::extractors::authentication::policies::ActionPolicy;
|
||||
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
||||
use crate::extractors::sequential_extractor::SeqHandler;
|
||||
use crate::routes::indexes::search::search_kind;
|
||||
use crate::search::{
|
||||
add_search_rules, perform_search, RetrieveVectors, SearchQueryWithIndex, SearchResultWithIndex,
|
||||
add_search_rules, perform_federated_search, perform_search, FederatedSearch, RetrieveVectors,
|
||||
SearchQueryWithIndex, SearchResultWithIndex,
|
||||
};
|
||||
use crate::search_queue::SearchQueue;
|
||||
|
||||
@ -28,85 +30,44 @@ struct SearchResults {
|
||||
results: Vec<SearchResultWithIndex>,
|
||||
}
|
||||
|
||||
#[derive(Debug, deserr::Deserr)]
|
||||
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
||||
pub struct SearchQueries {
|
||||
queries: Vec<SearchQueryWithIndex>,
|
||||
}
|
||||
|
||||
pub async fn multi_search_with_post(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
|
||||
search_queue: Data<SearchQueue>,
|
||||
params: AwebJson<SearchQueries, DeserrJsonError>,
|
||||
params: AwebJson<FederatedSearch, DeserrJsonError>,
|
||||
req: HttpRequest,
|
||||
analytics: web::Data<dyn Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let queries = params.into_inner().queries;
|
||||
|
||||
let mut multi_aggregate = MultiSearchAggregator::from_queries(&queries, &req);
|
||||
let features = index_scheduler.features();
|
||||
|
||||
// Since we don't want to process half of the search requests and then get a permit refused
|
||||
// we're going to get one permit for the whole duration of the multi-search request.
|
||||
let _permit = search_queue.try_get_search_permit().await?;
|
||||
|
||||
// Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only,
|
||||
// so that `?` doesn't work if it doesn't use `with_index`, ensuring that it is not forgotten in case of code
|
||||
// changes.
|
||||
let search_results: Result<_, (ResponseError, usize)> = async {
|
||||
let mut search_results = Vec::with_capacity(queries.len());
|
||||
for (query_index, (index_uid, mut query)) in
|
||||
queries.into_iter().map(SearchQueryWithIndex::into_index_query).enumerate()
|
||||
{
|
||||
debug!(on_index = query_index, parameters = ?query, "Multi-search");
|
||||
let federated_search = params.into_inner();
|
||||
|
||||
let mut multi_aggregate = MultiSearchAggregator::from_federated_search(&federated_search, &req);
|
||||
|
||||
let FederatedSearch { mut queries, federation } = federated_search;
|
||||
|
||||
let features = index_scheduler.features();
|
||||
|
||||
// regardless of federation, check authorization and apply search rules
|
||||
let auth = 'check_authorization: {
|
||||
for (query_index, federated_query) in queries.iter_mut().enumerate() {
|
||||
let index_uid = federated_query.index_uid.as_str();
|
||||
// Check index from API key
|
||||
if !index_scheduler.filters().is_index_authorized(&index_uid) {
|
||||
return Err(AuthenticationError::InvalidToken).with_index(query_index);
|
||||
if !index_scheduler.filters().is_index_authorized(index_uid) {
|
||||
break 'check_authorization Err(AuthenticationError::InvalidToken)
|
||||
.with_index(query_index);
|
||||
}
|
||||
// Apply search rules from tenant token
|
||||
if let Some(search_rules) = index_scheduler.filters().get_index_search_rules(&index_uid)
|
||||
if let Some(search_rules) = index_scheduler.filters().get_index_search_rules(index_uid)
|
||||
{
|
||||
add_search_rules(&mut query.filter, search_rules);
|
||||
add_search_rules(&mut federated_query.filter, search_rules);
|
||||
}
|
||||
|
||||
let index = index_scheduler
|
||||
.index(&index_uid)
|
||||
.map_err(|err| {
|
||||
let mut err = ResponseError::from(err);
|
||||
// Patch the HTTP status code to 400 as it defaults to 404 for `index_not_found`, but
|
||||
// here the resource not found is not part of the URL.
|
||||
err.code = StatusCode::BAD_REQUEST;
|
||||
err
|
||||
})
|
||||
.with_index(query_index)?;
|
||||
|
||||
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)
|
||||
.with_index(query_index)?;
|
||||
let retrieve_vector =
|
||||
RetrieveVectors::new(query.retrieve_vectors, features).with_index(query_index)?;
|
||||
|
||||
let search_result = tokio::task::spawn_blocking(move || {
|
||||
perform_search(&index, query, search_kind, retrieve_vector)
|
||||
})
|
||||
.await
|
||||
.with_index(query_index)?;
|
||||
|
||||
search_results.push(SearchResultWithIndex {
|
||||
index_uid: index_uid.into_inner(),
|
||||
result: search_result.with_index(query_index)?,
|
||||
});
|
||||
}
|
||||
Ok(search_results)
|
||||
}
|
||||
.await;
|
||||
Ok(())
|
||||
};
|
||||
|
||||
if search_results.is_ok() {
|
||||
multi_aggregate.succeed();
|
||||
}
|
||||
analytics.post_multi_search(multi_aggregate);
|
||||
|
||||
let search_results = search_results.map_err(|(mut err, query_index)| {
|
||||
auth.map_err(|(mut err, query_index)| {
|
||||
// Add the query index that failed as context for the error message.
|
||||
// We're doing it only here and not directly in the `WithIndex` trait so that the `with_index` function returns a different type
|
||||
// of result and we can benefit from static typing.
|
||||
@ -114,9 +75,95 @@ pub async fn multi_search_with_post(
|
||||
err
|
||||
})?;
|
||||
|
||||
debug!(returns = ?search_results, "Multi-search");
|
||||
let response = match federation {
|
||||
Some(federation) => {
|
||||
let search_result = tokio::task::spawn_blocking(move || {
|
||||
perform_federated_search(&index_scheduler, queries, federation, features)
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(HttpResponse::Ok().json(SearchResults { results: search_results }))
|
||||
if let Ok(Ok(_)) = search_result {
|
||||
multi_aggregate.succeed();
|
||||
}
|
||||
|
||||
analytics.post_multi_search(multi_aggregate);
|
||||
HttpResponse::Ok().json(search_result??)
|
||||
}
|
||||
None => {
|
||||
// Explicitly expect a `(ResponseError, usize)` for the error type rather than `ResponseError` only,
|
||||
// so that `?` doesn't work if it doesn't use `with_index`, ensuring that it is not forgotten in case of code
|
||||
// changes.
|
||||
let search_results: Result<_, (ResponseError, usize)> = async {
|
||||
let mut search_results = Vec::with_capacity(queries.len());
|
||||
for (query_index, (index_uid, query, federation_options)) in queries
|
||||
.into_iter()
|
||||
.map(SearchQueryWithIndex::into_index_query_federation)
|
||||
.enumerate()
|
||||
{
|
||||
debug!(on_index = query_index, parameters = ?query, "Multi-search");
|
||||
|
||||
if federation_options.is_some() {
|
||||
return Err((
|
||||
MeilisearchHttpError::FederationOptionsInNonFederatedRequest(
|
||||
query_index,
|
||||
)
|
||||
.into(),
|
||||
query_index,
|
||||
));
|
||||
}
|
||||
|
||||
let index = index_scheduler
|
||||
.index(&index_uid)
|
||||
.map_err(|err| {
|
||||
let mut err = ResponseError::from(err);
|
||||
// Patch the HTTP status code to 400 as it defaults to 404 for `index_not_found`, but
|
||||
// here the resource not found is not part of the URL.
|
||||
err.code = StatusCode::BAD_REQUEST;
|
||||
err
|
||||
})
|
||||
.with_index(query_index)?;
|
||||
|
||||
let search_kind =
|
||||
search_kind(&query, index_scheduler.get_ref(), &index, features)
|
||||
.with_index(query_index)?;
|
||||
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)
|
||||
.with_index(query_index)?;
|
||||
|
||||
let search_result = tokio::task::spawn_blocking(move || {
|
||||
perform_search(&index, query, search_kind, retrieve_vector)
|
||||
})
|
||||
.await
|
||||
.with_index(query_index)?;
|
||||
|
||||
search_results.push(SearchResultWithIndex {
|
||||
index_uid: index_uid.into_inner(),
|
||||
result: search_result.with_index(query_index)?,
|
||||
});
|
||||
}
|
||||
Ok(search_results)
|
||||
}
|
||||
.await;
|
||||
|
||||
if search_results.is_ok() {
|
||||
multi_aggregate.succeed();
|
||||
}
|
||||
analytics.post_multi_search(multi_aggregate);
|
||||
|
||||
let search_results = search_results.map_err(|(mut err, query_index)| {
|
||||
// Add the query index that failed as context for the error message.
|
||||
// We're doing it only here and not directly in the `WithIndex` trait so that the `with_index` function returns a different type
|
||||
// of result and we can benefit from static typing.
|
||||
err.message = format!("Inside `.queries[{query_index}]`: {}", err.message);
|
||||
err
|
||||
})?;
|
||||
|
||||
debug!(returns = ?search_results, "Multi-search");
|
||||
|
||||
HttpResponse::Ok().json(SearchResults { results: search_results })
|
||||
}
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Local `Result` extension trait to avoid `map_err` boilerplate.
|
||||
|
Loading…
Reference in New Issue
Block a user