attach index name in errors

# Conflicts:
#	crates/index-scheduler/src/batch.rs

# Conflicts:
#	crates/index-scheduler/src/batch.rs
#	crates/meilisearch/src/search/mod.rs
This commit is contained in:
airycanon 2024-11-22 14:11:56 +08:00
parent dd76eaaaec
commit bb3291fd49
12 changed files with 68 additions and 39 deletions

View File

@ -122,8 +122,11 @@ pub enum Error {
Dump(#[from] dump::Error), Dump(#[from] dump::Error),
#[error(transparent)] #[error(transparent)]
Heed(#[from] heed::Error), Heed(#[from] heed::Error),
#[error(transparent)] #[error("{}", match .index_name {
Milli(#[from] milli::Error), Some(name) if !name.is_empty() => format!("Index `{}`: {error}", name),
_ => format!("{error}")
})]
Milli { error: milli::Error, index_name: Option<String> },
#[error("An unexpected crash occurred when processing the task.")] #[error("An unexpected crash occurred when processing the task.")]
ProcessBatchPanicked, ProcessBatchPanicked,
#[error(transparent)] #[error(transparent)]
@ -190,7 +193,7 @@ impl Error {
| Error::AbortedTask | Error::AbortedTask
| Error::Dump(_) | Error::Dump(_)
| Error::Heed(_) | Error::Heed(_)
| Error::Milli(_) | Error::Milli { .. }
| Error::ProcessBatchPanicked | Error::ProcessBatchPanicked
| Error::FileStore(_) | Error::FileStore(_)
| Error::IoError(_) | Error::IoError(_)
@ -209,6 +212,10 @@ impl Error {
pub fn with_custom_error_code(self, code: Code) -> Self { pub fn with_custom_error_code(self, code: Code) -> Self {
Self::WithCustomErrorCode(code, Box::new(self)) Self::WithCustomErrorCode(code, Box::new(self))
} }
pub fn from_milli(error: milli::Error, index_name: Option<String>) -> Self {
Self::Milli { error, index_name }
}
} }
impl ErrorCode for Error { impl ErrorCode for Error {
@ -236,7 +243,7 @@ impl ErrorCode for Error {
// TODO: not sure of the Code to use // TODO: not sure of the Code to use
Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice, Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
Error::Dump(e) => e.error_code(), Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(), Error::Milli { error, .. } => error.error_code(),
Error::ProcessBatchPanicked => Code::Internal, Error::ProcessBatchPanicked => Code::Internal,
Error::Heed(e) => e.error_code(), Error::Heed(e) => e.error_code(),
Error::HeedTransaction(e) => e.error_code(), Error::HeedTransaction(e) => e.error_code(),

View File

@ -3,13 +3,13 @@ use std::path::Path;
use std::time::Duration; use std::time::Duration;
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions}; use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
use meilisearch_types::milli::Index; use meilisearch_types::milli::{Index, Result};
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing}; use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing};
use crate::lru::{InsertionOutcome, LruMap}; use crate::lru::{InsertionOutcome, LruMap};
use crate::{clamp_to_page_size, Result}; use crate::{clamp_to_page_size};
/// Keep an internally consistent view of the open indexes in memory. /// Keep an internally consistent view of the open indexes in memory.
/// ///

View File

@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
use tracing::error; use tracing::error;
use uuid::Uuid; use uuid::Uuid;
use meilisearch_types::milli;
use self::index_map::IndexMap; use self::index_map::IndexMap;
use self::IndexStatus::{Available, BeingDeleted, Closing, Missing}; use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
use crate::uuid_codec::UuidCodec; use crate::uuid_codec::UuidCodec;
@ -121,7 +121,7 @@ impl IndexStats {
/// # Parameters /// # Parameters
/// ///
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`. /// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
pub fn new(index: &Index, rtxn: &RoTxn) -> Result<Self> { pub fn new(index: &Index, rtxn: &RoTxn) -> milli::Result<Self> {
Ok(IndexStats { Ok(IndexStats {
number_of_documents: index.number_of_documents(rtxn)?, number_of_documents: index.number_of_documents(rtxn)?,
database_size: index.on_disk_size()?, database_size: index.on_disk_size()?,
@ -189,7 +189,7 @@ impl IndexMapper {
date, date,
self.enable_mdb_writemap, self.enable_mdb_writemap,
self.index_base_map_size, self.index_base_map_size,
)?; ).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
wtxn.commit()?; wtxn.commit()?;
@ -357,7 +357,8 @@ impl IndexMapper {
}; };
let index_path = self.base_path.join(uuid.to_string()); let index_path = self.base_path.join(uuid.to_string());
// take the lock to reopen the environment. // take the lock to reopen the environment.
reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?; reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)
.map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
continue; continue;
} }
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
@ -378,7 +379,7 @@ impl IndexMapper {
None, None,
self.enable_mdb_writemap, self.enable_mdb_writemap,
self.index_base_map_size, self.index_base_map_size,
)?; ).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
} }
Available(index) => break index, Available(index) => break index,
Closing(_) => { Closing(_) => {
@ -459,7 +460,7 @@ impl IndexMapper {
None => { None => {
let index = self.index(rtxn, index_uid)?; let index = self.index(rtxn, index_uid)?;
let index_rtxn = index.read_txn()?; let index_rtxn = index.read_txn()?;
IndexStats::new(&index, &index_rtxn) IndexStats::new(&index, &index_rtxn).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))
} }
} }
} }

View File

@ -1678,9 +1678,9 @@ impl IndexScheduler {
tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks."); tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks.");
} }
// If we have an abortion error we must stop the tick here and re-schedule tasks. // If we have an abortion error we must stop the tick here and re-schedule tasks.
Err(Error::Milli(milli::Error::InternalError( Err(Error::Milli{
milli::InternalError::AbortedIndexation, error: milli::Error::InternalError(milli::InternalError::AbortedIndexation), ..
))) })
| Err(Error::AbortedTask) => { | Err(Error::AbortedTask) => {
#[cfg(test)] #[cfg(test)]
self.breakpoint(Breakpoint::AbortedIndexation); self.breakpoint(Breakpoint::AbortedIndexation);
@ -1699,9 +1699,9 @@ impl IndexScheduler {
// 2. close the associated environment // 2. close the associated environment
// 3. resize it // 3. resize it
// 4. re-schedule tasks // 4. re-schedule tasks
Err(Error::Milli(milli::Error::UserError( Err(Error::Milli {
milli::UserError::MaxDatabaseSizeReached, error: milli::Error::UserError(milli::UserError::MaxDatabaseSizeReached), ..
))) if index_uid.is_some() => { }) if index_uid.is_some() => {
// fixme: add index_uid to match to avoid the unwrap // fixme: add index_uid to match to avoid the unwrap
let index_uid = index_uid.unwrap(); let index_uid = index_uid.unwrap();
// fixme: handle error more gracefully? not sure when this could happen // fixme: handle error more gracefully? not sure when this could happen
@ -1942,6 +1942,7 @@ impl IndexScheduler {
// TODO: consider using a type alias or a struct embedder/template // TODO: consider using a type alias or a struct embedder/template
pub fn embedders( pub fn embedders(
&self, &self,
index_uid: String,
embedding_configs: Vec<IndexEmbeddingConfig>, embedding_configs: Vec<IndexEmbeddingConfig>,
) -> Result<EmbeddingConfigs> { ) -> Result<EmbeddingConfigs> {
let res: Result<_> = embedding_configs let res: Result<_> = embedding_configs
@ -1953,7 +1954,10 @@ impl IndexScheduler {
.. ..
}| { }| {
let prompt = let prompt =
Arc::new(prompt.try_into().map_err(meilisearch_types::milli::Error::from)?); Arc::new(prompt.try_into()
.map_err(meilisearch_types::milli::Error::from)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?
);
// optimistically return existing embedder // optimistically return existing embedder
{ {
let embedders = self.embedders.read().unwrap(); let embedders = self.embedders.read().unwrap();
@ -1969,7 +1973,8 @@ impl IndexScheduler {
let embedder = Arc::new( let embedder = Arc::new(
Embedder::new(embedder_options.clone()) Embedder::new(embedder_options.clone())
.map_err(meilisearch_types::milli::vector::Error::from) .map_err(meilisearch_types::milli::vector::Error::from)
.map_err(meilisearch_types::milli::Error::from)?, .map_err(meilisearch_types::milli::Error::from)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
); );
{ {
let mut embedders = self.embedders.write().unwrap(); let mut embedders = self.embedders.write().unwrap();

View File

@ -7,6 +7,7 @@ use meilisearch_types::index_uid::{IndexUid, IndexUidFormatError};
use meilisearch_types::milli::OrderBy; use meilisearch_types::milli::OrderBy;
use serde_json::Value; use serde_json::Value;
use tokio::task::JoinError; use tokio::task::JoinError;
use meilisearch_types::milli;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum MeilisearchHttpError { pub enum MeilisearchHttpError {
@ -62,8 +63,11 @@ pub enum MeilisearchHttpError {
HeedError(#[from] meilisearch_types::heed::Error), HeedError(#[from] meilisearch_types::heed::Error),
#[error(transparent)] #[error(transparent)]
IndexScheduler(#[from] index_scheduler::Error), IndexScheduler(#[from] index_scheduler::Error),
#[error(transparent)] #[error("{}", match .index_name {
Milli(#[from] meilisearch_types::milli::Error), Some(name) if !name.is_empty() => format!("Index `{}`: {error}", name),
_ => format!("{error}")
})]
Milli { error: meilisearch_types::milli::Error, index_name: Option<String> },
#[error(transparent)] #[error(transparent)]
Payload(#[from] PayloadError), Payload(#[from] PayloadError),
#[error(transparent)] #[error(transparent)]
@ -76,6 +80,12 @@ pub enum MeilisearchHttpError {
MissingSearchHybrid, MissingSearchHybrid,
} }
impl MeilisearchHttpError {
pub(crate) fn from_milli(error: milli::Error, index_name: Option<String>) -> Self {
Self::Milli { error, index_name }
}
}
impl ErrorCode for MeilisearchHttpError { impl ErrorCode for MeilisearchHttpError {
fn error_code(&self) -> Code { fn error_code(&self) -> Code {
match self { match self {
@ -95,7 +105,7 @@ impl ErrorCode for MeilisearchHttpError {
MeilisearchHttpError::SerdeJson(_) => Code::Internal, MeilisearchHttpError::SerdeJson(_) => Code::Internal,
MeilisearchHttpError::HeedError(_) => Code::Internal, MeilisearchHttpError::HeedError(_) => Code::Internal,
MeilisearchHttpError::IndexScheduler(e) => e.error_code(), MeilisearchHttpError::IndexScheduler(e) => e.error_code(),
MeilisearchHttpError::Milli(e) => e.error_code(), MeilisearchHttpError::Milli{error, ..} => error.error_code(),
MeilisearchHttpError::Payload(e) => e.error_code(), MeilisearchHttpError::Payload(e) => e.error_code(),
MeilisearchHttpError::FileStore(_) => Code::Internal, MeilisearchHttpError::FileStore(_) => Code::Internal,
MeilisearchHttpError::DocumentFormat(e) => e.error_code(), MeilisearchHttpError::DocumentFormat(e) => e.error_code(),

View File

@ -395,6 +395,7 @@ fn import_dump(
for index_reader in dump_reader.indexes()? { for index_reader in dump_reader.indexes()? {
let mut index_reader = index_reader?; let mut index_reader = index_reader?;
let metadata = index_reader.metadata(); let metadata = index_reader.metadata();
let uid = metadata.uid.clone();
tracing::info!("Importing index `{}`.", metadata.uid); tracing::info!("Importing index `{}`.", metadata.uid);
let date = Some((metadata.created_at, metadata.updated_at)); let date = Some((metadata.created_at, metadata.updated_at));
@ -432,7 +433,7 @@ fn import_dump(
let reader = DocumentsBatchReader::from_reader(reader)?; let reader = DocumentsBatchReader::from_reader(reader)?;
let embedder_configs = index.embedding_configs(&wtxn)?; let embedder_configs = index.embedding_configs(&wtxn)?;
let embedders = index_scheduler.embedders(embedder_configs)?; let embedders = index_scheduler.embedders(uid, embedder_configs)?;
let builder = milli::update::IndexDocuments::new( let builder = milli::update::IndexDocuments::new(
&mut wtxn, &mut wtxn,

View File

@ -185,7 +185,7 @@ pub async fn search(
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features(); let features = index_scheduler.features();
let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?; let search_kind = search_kind(&search_query, &index_scheduler, index_uid.to_string(), &index, features)?;
let permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_facet_search( perform_facet_search(

View File

@ -5,7 +5,7 @@ use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse}; use actix_web::{web, HttpRequest, HttpResponse};
use deserr::actix_web::{AwebJson, AwebQueryParameter}; use deserr::actix_web::{AwebJson, AwebQueryParameter};
use deserr::{DeserializeError, Deserr, ValuePointerRef}; use deserr::{DeserializeError, Deserr, ValuePointerRef};
use index_scheduler::IndexScheduler; use index_scheduler::{Error, IndexScheduler};
use meilisearch_types::deserr::query_params::Param; use meilisearch_types::deserr::query_params::Param;
use meilisearch_types::deserr::{immutable_field_error, DeserrJsonError, DeserrQueryParamError}; use meilisearch_types::deserr::{immutable_field_error, DeserrJsonError, DeserrQueryParamError};
use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::deserr_codes::*;
@ -107,7 +107,7 @@ pub async fn list_indexes(
if !filters.is_index_authorized(uid) { if !filters.is_index_authorized(uid) {
return Ok(None); return Ok(None);
} }
Ok(Some(IndexView::new(uid.to_string(), index)?)) Ok(Some(IndexView::new(uid.to_string(), index).map_err(|e| Error::from_milli(e, Some(uid.to_string())))?))
})?; })?;
// Won't cause to open all indexes because IndexView doesn't keep the `Index` opened. // Won't cause to open all indexes because IndexView doesn't keep the `Index` opened.
let indexes: Vec<IndexView> = indexes.into_iter().flatten().collect(); let indexes: Vec<IndexView> = indexes.into_iter().flatten().collect();

View File

@ -243,11 +243,11 @@ pub async fn search_with_url_query(
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features(); let features = index_scheduler.features();
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let search_kind = search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index, features)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?; let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?;
let permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features()) perform_search(index_uid.to_string(), &index, query, search_kind, retrieve_vector, index_scheduler.features())
}) })
.await; .await;
permit.drop().await; permit.drop().await;
@ -287,12 +287,12 @@ pub async fn search_with_post(
let features = index_scheduler.features(); let features = index_scheduler.features();
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let search_kind = search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index, features)?;
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?; let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?;
let permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features()) perform_search(index_uid.to_string(), &index, query, search_kind, retrieve_vectors, index_scheduler.features())
}) })
.await; .await;
permit.drop().await; permit.drop().await;
@ -314,6 +314,7 @@ pub async fn search_with_post(
pub fn search_kind( pub fn search_kind(
query: &SearchQuery, query: &SearchQuery,
index_scheduler: &IndexScheduler, index_scheduler: &IndexScheduler,
index_uid: String,
index: &milli::Index, index: &milli::Index,
features: RoFeatures, features: RoFeatures,
) -> Result<SearchKind, ResponseError> { ) -> Result<SearchKind, ResponseError> {
@ -332,7 +333,7 @@ pub fn search_kind(
(None, _, None) => Ok(SearchKind::KeywordOnly), (None, _, None) => Ok(SearchKind::KeywordOnly),
// hybrid.semantic_ratio == 1.0 => vector // hybrid.semantic_ratio == 1.0 => vector
(_, Some(HybridQuery { semantic_ratio, embedder }), v) if **semantic_ratio == 1.0 => { (_, Some(HybridQuery { semantic_ratio, embedder }), v) if **semantic_ratio == 1.0 => {
SearchKind::semantic(index_scheduler, index, embedder, v.map(|v| v.len())) SearchKind::semantic(index_scheduler, index_uid, index, embedder, v.map(|v| v.len()))
} }
// hybrid.semantic_ratio == 0.0 => keyword // hybrid.semantic_ratio == 0.0 => keyword
(_, Some(HybridQuery { semantic_ratio, embedder: _ }), _) if **semantic_ratio == 0.0 => { (_, Some(HybridQuery { semantic_ratio, embedder: _ }), _) if **semantic_ratio == 0.0 => {
@ -340,13 +341,14 @@ pub fn search_kind(
} }
// no query, hybrid, vector => semantic // no query, hybrid, vector => semantic
(None, Some(HybridQuery { semantic_ratio: _, embedder }), Some(v)) => { (None, Some(HybridQuery { semantic_ratio: _, embedder }), Some(v)) => {
SearchKind::semantic(index_scheduler, index, embedder, Some(v.len())) SearchKind::semantic(index_scheduler, index_uid, index, embedder, Some(v.len()))
} }
// query, no hybrid, no vector => keyword // query, no hybrid, no vector => keyword
(Some(_), None, None) => Ok(SearchKind::KeywordOnly), (Some(_), None, None) => Ok(SearchKind::KeywordOnly),
// query, hybrid, maybe vector => hybrid // query, hybrid, maybe vector => hybrid
(Some(_), Some(HybridQuery { semantic_ratio, embedder }), v) => SearchKind::hybrid( (Some(_), Some(HybridQuery { semantic_ratio, embedder }), v) => SearchKind::hybrid(
index_scheduler, index_scheduler,
index_uid,
index, index,
embedder, embedder,
**semantic_ratio, **semantic_ratio,

View File

@ -104,7 +104,7 @@ async fn similar(
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let (embedder_name, embedder, quantized) = let (embedder_name, embedder, quantized) =
SearchKind::embedder(&index_scheduler, &index, &query.embedder, None)?; SearchKind::embedder(&index_scheduler, index_uid.to_string(), &index, &query.embedder, None)?;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
perform_similar( perform_similar(

View File

@ -125,14 +125,16 @@ pub async fn multi_search_with_post(
}) })
.with_index(query_index)?; .with_index(query_index)?;
let index_uid_str = index_uid.to_string();
let search_kind = let search_kind =
search_kind(&query, index_scheduler.get_ref(), &index, features) search_kind(&query, index_scheduler.get_ref(), index_uid_str.clone(), &index, features)
.with_index(query_index)?; .with_index(query_index)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features) let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)
.with_index(query_index)?; .with_index(query_index)?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vector, features) perform_search(index_uid_str.clone(), &index, query, search_kind, retrieve_vector, features)
}) })
.await .await
.with_index(query_index)?; .with_index(query_index)?;

View File

@ -560,7 +560,7 @@ pub fn perform_federated_search(
// use an immediately invoked lambda to capture the result without returning from the function // use an immediately invoked lambda to capture the result without returning from the function
let res: Result<(), ResponseError> = (|| { let res: Result<(), ResponseError> = (|| {
let search_kind = search_kind(&query, index_scheduler, &index, features)?; let search_kind = search_kind(&query, index_scheduler, index_uid.to_string(), &index, features)?;
let canonicalization_kind = match (&search_kind, &query.q) { let canonicalization_kind = match (&search_kind, &query.q) {
(SearchKind::SemanticOnly { .. }, _) => { (SearchKind::SemanticOnly { .. }, _) => {
@ -636,7 +636,7 @@ pub fn perform_federated_search(
search.offset(0); search.offset(0);
search.limit(required_hit_count); search.limit(required_hit_count);
let (result, _semantic_hit_count) = super::search_from_kind(search_kind, search)?; let (result, _semantic_hit_count) = super::search_from_kind(index_uid.to_string(), search_kind, search)?;
let format = AttributesFormat { let format = AttributesFormat {
attributes_to_retrieve: query.attributes_to_retrieve, attributes_to_retrieve: query.attributes_to_retrieve,
retrieve_vectors, retrieve_vectors,
@ -670,7 +670,8 @@ pub fn perform_federated_search(
let formatter_builder = HitMaker::formatter_builder(matching_words, tokenizer); let formatter_builder = HitMaker::formatter_builder(matching_words, tokenizer);
let hit_maker = HitMaker::new(&index, &rtxn, format, formatter_builder)?; let hit_maker = HitMaker::new(&index, &rtxn, format, formatter_builder)
.map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid.to_string())))?;
results_by_query.push(SearchResultByQuery { results_by_query.push(SearchResultByQuery {
federation_options, federation_options,