diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 903ec1217..176bdd410 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -448,20 +448,20 @@ impl IndexScheduler { match (document_import, settings) { ( Some(Batch::IndexOperation { - op: - IndexOperation::DocumentOperation { - primary_key, - documents_counts, - operations, - tasks: document_import_tasks, - .. - }, - .. - }), + op: + IndexOperation::DocumentOperation { + primary_key, + documents_counts, + operations, + tasks: document_import_tasks, + .. + }, + .. + }), Some(Batch::IndexOperation { - op: IndexOperation::Settings { settings, tasks: settings_tasks, .. }, - .. - }), + op: IndexOperation::Settings { settings, tasks: settings_tasks, .. }, + .. + }), ) => Ok(Some(Batch::IndexOperation { op: IndexOperation::SettingsAndDocumentOperation { index_uid, @@ -618,7 +618,12 @@ impl IndexScheduler { /// The list of tasks that were processed. The metadata of each task in the returned /// list is updated accordingly, with the exception of the its date fields /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at). - #[tracing::instrument(level = "trace", skip(self, batch), target = "indexing::scheduler", fields(batch=batch.to_string()))] + #[tracing::instrument( + level = "trace", + skip(self, batch), + target = "indexing::scheduler", + fields(batch=batch.to_string()) + )] pub(crate) fn process_batch(&self, batch: Batch) -> Result> { #[cfg(test)] { @@ -627,6 +632,8 @@ impl IndexScheduler { self.breakpoint(crate::Breakpoint::InsideProcessBatch); } + let index_uid = batch.index_uid().map(String::from); + match batch { Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => { // 1. Retrieve the tasks that matched the query at enqueue-time. @@ -649,10 +656,10 @@ impl IndexScheduler { task.status = Status::Succeeded; match &mut task.details { Some(Details::TaskCancelation { - matched_tasks: _, - canceled_tasks, - original_filter: _, - }) => { + matched_tasks: _, + canceled_tasks, + original_filter: _, + }) => { *canceled_tasks = Some(canceled_tasks_content_uuids.len() as u64); } _ => unreachable!(), @@ -706,10 +713,10 @@ impl IndexScheduler { match &mut task.details { Some(Details::TaskDeletion { - matched_tasks: _, - deleted_tasks, - original_filter: _, - }) => { + matched_tasks: _, + deleted_tasks, + original_filter: _, + }) => { *deleted_tasks = Some(deleted_tasks_count); } _ => unreachable!(), @@ -765,7 +772,8 @@ impl IndexScheduler { let index = self.index_mapper.index(&rtxn, name)?; let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string()); fs::create_dir_all(&dst)?; - index.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; + index.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled) + .map_err(|e| Error::from_milli(e, Some(name.to_string())))?; } drop(rtxn); @@ -843,7 +851,6 @@ impl IndexScheduler { let (_, mut t) = ret?; let status = t.status; let content_file = t.content_uuid(); - // In the case we're dumping ourselves we want to be marked as finished // to not loop over ourselves indefinitely. if t.uid == task.uid { @@ -867,18 +874,24 @@ impl IndexScheduler { let content_file = self.file_store.get_update(content_file)?; let reader = DocumentsBatchReader::from_reader(content_file) - .map_err(milli::Error::from)?; + .map_err(|e| Error::Milli { + error: e.into(), + index_name: index_uid.clone(), + })?; let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index(); - while let Some(doc) = - cursor.next_document().map_err(milli::Error::from)? + while let Some(doc) = cursor.next_document() + .map_err(|e| Error::Milli { + error: e.into(), + index_name: index_uid.clone(), + })? { dump_content_file.push_document(&obkv_to_object( &doc, &documents_batch_index, - )?)?; + ).map_err(|e| Error::from_milli(e, index_uid.clone()))?)?; } dump_content_file.flush()?; } @@ -892,27 +905,35 @@ impl IndexScheduler { let metadata = IndexMetadata { uid: uid.to_owned(), primary_key: index.primary_key(&rtxn)?.map(String::from), - created_at: index.created_at(&rtxn)?, - updated_at: index.updated_at(&rtxn)?, + created_at: index.created_at(&rtxn) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?, + updated_at: index.updated_at(&rtxn) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?, }; let mut index_dumper = dump.create_index(uid, &metadata)?; let fields_ids_map = index.fields_ids_map(&rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); - let embedding_configs = index.embedding_configs(&rtxn)?; + let embedding_configs = index.embedding_configs(&rtxn) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + let documents = index.all_documents(&rtxn) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; // 3.1. Dump the documents - for ret in index.all_documents(&rtxn)? { + for ret in documents { if self.must_stop_processing.get() { return Err(Error::AbortedTask); } - let (id, doc) = ret?; + let (id, doc) = ret + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; - let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?; + let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; 'inject_vectors: { - let embeddings = index.embeddings(&rtxn, id)?; + let embeddings = index.embeddings(&rtxn, id) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; if embeddings.is_empty() { break 'inject_vectors; @@ -923,7 +944,7 @@ impl IndexScheduler { .or_insert(serde_json::Value::Object(Default::default())); let serde_json::Value::Object(vectors) = vectors else { - return Err(milli::Error::UserError( + return Err(Error::from_milli(milli::Error::UserError( milli::UserError::InvalidVectorsMapType { document_id: { if let Ok(Some(Ok(index))) = index @@ -937,8 +958,7 @@ impl IndexScheduler { }, value: vectors.clone(), }, - ) - .into()); + ), Some(uid.to_string()))); }; for (embedder_name, embeddings) in embeddings { @@ -968,7 +988,7 @@ impl IndexScheduler { index, &rtxn, meilisearch_types::settings::SecretPolicy::RevealSecrets, - )?; + ).map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; index_dumper.settings(&settings)?; Ok(()) })?; @@ -1018,7 +1038,8 @@ impl IndexScheduler { // the entire batch. let res = || -> Result<()> { let index_rtxn = index.read_txn()?; - let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?; + let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; let mut wtxn = self.env.write_txn()?; self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?; wtxn.commit()?; @@ -1060,7 +1081,7 @@ impl IndexScheduler { builder.execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), - )?; + ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; index_wtxn.commit()?; } @@ -1077,7 +1098,8 @@ impl IndexScheduler { let res = || -> Result<()> { let mut wtxn = self.env.write_txn()?; let index_rtxn = index.read_txn()?; - let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?; + let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?; wtxn.commit()?; Ok(()) @@ -1100,9 +1122,11 @@ impl IndexScheduler { let number_of_documents = || -> Result { let index = self.index_mapper.index(&wtxn, &index_uid)?; let index_rtxn = index.read_txn()?; - Ok(index.number_of_documents(&index_rtxn)?) + Ok(index.number_of_documents(&index_rtxn) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))? + ) }() - .unwrap_or_default(); + .unwrap_or_default(); // The write transaction is directly owned and committed inside. match self.index_mapper.delete_index(wtxn, &index_uid) { @@ -1219,8 +1243,9 @@ impl IndexScheduler { operation: IndexOperation, ) -> Result> { match operation { - IndexOperation::DocumentClear { mut tasks, .. } => { - let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?; + IndexOperation::DocumentClear { index_uid, mut tasks, .. } => { + let count = milli::update::ClearDocuments::new(index_wtxn, index).execute() + .map_err(|e| Error::from_milli(e, Some(index_uid)))?; let mut first_clear_found = false; for task in &mut tasks { @@ -1240,7 +1265,7 @@ impl IndexScheduler { Ok(tasks) } IndexOperation::DocumentOperation { - index_uid: _, + index_uid, primary_key, method, documents_counts: _, @@ -1258,10 +1283,11 @@ impl IndexScheduler { // but to a different value, we can make the whole batch fail. Some(pk) => { if primary_key != pk { - return Err(milli::Error::from( - milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()), + return Err(Error::from_milli( + milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()).into(), + Some(index_uid.clone()), ) - .into()); + .into()); } } // if the primary key was set and there was no primary key set for this index @@ -1273,7 +1299,7 @@ impl IndexScheduler { builder.execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.clone().get(), - )?; + ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; primary_key_has_been_set = true; } } @@ -1281,9 +1307,10 @@ impl IndexScheduler { let config = IndexDocumentsConfig { update_method: method, ..Default::default() }; - let embedder_configs = index.embedding_configs(index_wtxn)?; + let embedder_configs = index.embedding_configs(index_wtxn) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense) - let embedders = self.embedders(embedder_configs)?; + let embedders = self.embedders(index_uid.clone(), embedder_configs)?; let mut builder = milli::update::IndexDocuments::new( index_wtxn, @@ -1292,24 +1319,25 @@ impl IndexScheduler { config, |indexing_step| tracing::trace!(?indexing_step, "Update"), || must_stop_processing.get(), - )?; + ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { match operation { DocumentOperation::Add(content_uuid) => { let content_file = self.file_store.get_update(content_uuid)?; let reader = DocumentsBatchReader::from_reader(content_file) - .map_err(milli::Error::from)?; - let (new_builder, user_result) = builder.add_documents(reader)?; + .map_err(|e| Error::from_milli(e.into(), Some(index_uid.clone())))?; + let (new_builder, user_result) = builder.add_documents(reader) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; builder = new_builder; builder = builder.with_embedders(embedders.clone()); let received_documents = if let Some(Details::DocumentAdditionOrUpdate { - received_documents, - .. - }) = task.details + received_documents, + .. + }) = task.details { received_documents } else { @@ -1337,7 +1365,8 @@ impl IndexScheduler { } DocumentOperation::Delete(document_ids) => { let (new_builder, user_result) = - builder.remove_documents(document_ids)?; + builder.remove_documents(document_ids) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; builder = new_builder; // Uses Invariant: remove documents actually always returns Ok for the inner result let count = user_result.unwrap(); @@ -1361,7 +1390,7 @@ impl IndexScheduler { } if !tasks.iter().all(|res| res.error.is_some()) { - let addition = builder.execute()?; + let addition = builder.execute().map_err(|e| Error::from_milli(e, Some(index_uid)))?; tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); } else if primary_key_has_been_set { // Everything failed but we've set a primary key. @@ -1372,12 +1401,12 @@ impl IndexScheduler { builder.execute( |indexing_step| tracing::trace!(update = ?indexing_step), || must_stop_processing.clone().get(), - )?; + ).map_err(|e| Error::from_milli(e, Some(index_uid)))?; } Ok(tasks) } - IndexOperation::DocumentEdition { mut task, .. } => { + IndexOperation::DocumentEdition { index_uid, mut task } => { let (filter, context, function) = if let KindWithContent::DocumentEdition { filter_expr, context, function, .. @@ -1395,6 +1424,7 @@ impl IndexScheduler { self.index_mapper.indexer_config(), self.must_stop_processing.clone(), index, + index_uid ); let (original_filter, context, function) = if let Some(Details::DocumentEdition { original_filter, @@ -1435,7 +1465,7 @@ impl IndexScheduler { Ok(vec![task]) } - IndexOperation::DocumentDeletion { mut tasks, index_uid: _ } => { + IndexOperation::DocumentDeletion { mut tasks, index_uid } => { let mut to_delete = RoaringBitmap::new(); let external_documents_ids = index.external_documents_ids(); @@ -1456,7 +1486,7 @@ impl IndexScheduler { deleted_documents: Some(will_be_removed), }); } - KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => { + KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr } => { let before = to_delete.len(); let filter = match Filter::from_json(filter_expr) { Ok(filter) => filter, @@ -1467,7 +1497,7 @@ impl IndexScheduler { milli::Error::UserError( milli::UserError::InvalidFilterExpression { .. }, ) => Some( - Error::from(err) + Error::from_milli(err, Some(index_uid.clone())) .with_custom_error_code(Code::InvalidDocumentFilter) .into(), ), @@ -1481,9 +1511,9 @@ impl IndexScheduler { filter.evaluate(index_wtxn, index).map_err(|err| match err { milli::Error::UserError( milli::UserError::InvalidFilter(_), - ) => Error::from(err) + ) => Error::from_milli(err, Some(index_uid.clone())) .with_custom_error_code(Code::InvalidDocumentFilter), - e => e.into(), + e => Error::from_milli(e, Some(index_uid.clone())), }); match candidates { Ok(candidates) => to_delete |= candidates, @@ -1522,17 +1552,18 @@ impl IndexScheduler { config, |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), - )?; + ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; let (new_builder, _count) = - builder.remove_documents_from_db_no_batch(&to_delete)?; + builder.remove_documents_from_db_no_batch(&to_delete) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; builder = new_builder; - let _ = builder.execute()?; + let _ = builder.execute().map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; Ok(tasks) } - IndexOperation::Settings { index_uid: _, settings, mut tasks } => { + IndexOperation::Settings { index_uid, settings, mut tasks } => { let indexer_config = self.index_mapper.indexer_config(); let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); @@ -1550,7 +1581,7 @@ impl IndexScheduler { builder.execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), - )?; + ).map_err(|e| Error::from_milli(e, Some(index_uid)))?; Ok(tasks) } @@ -1742,16 +1773,17 @@ fn edit_documents_by_function<'a>( indexer_config: &IndexerConfig, must_stop_processing: MustStopProcessing, index: &'a Index, + index_uid: String, ) -> Result<(u64, u64)> { let candidates = match filter.as_ref().map(Filter::from_json) { Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err { milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { - Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) + Error::from_milli(err, Some(index_uid.clone())).with_custom_error_code(Code::InvalidDocumentFilter) } - e => e.into(), + e => Error::from_milli(e.into(), Some(index_uid.clone())), })?, None | Some(Ok(None)) => index.documents_ids(wtxn)?, - Some(Err(e)) => return Err(e.into()), + Some(Err(e)) => return Err(Error::from_milli(e.into(), Some(index_uid.clone()))), }; let config = IndexDocumentsConfig { @@ -1766,11 +1798,15 @@ fn edit_documents_by_function<'a>( config, |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), - )?; + ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; - let (new_builder, count) = builder.edit_documents(&candidates, context, code)?; + let (new_builder, count) = builder.edit_documents( + &candidates, + context, + code + ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; builder = new_builder; - let _ = builder.execute()?; + let _ = builder.execute().map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; Ok(count.unwrap()) } diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs index 3bd378fd6..91579fd68 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -117,8 +117,11 @@ pub enum Error { Dump(#[from] dump::Error), #[error(transparent)] Heed(#[from] heed::Error), - #[error(transparent)] - Milli(#[from] milli::Error), + #[error("{}", match .index_name { + Some(name) if !name.is_empty() => format!("Index `{}`: {error}", name), + _ => format!("{error}") + })] + Milli { error: milli::Error, index_name: Option }, #[error("An unexpected crash occurred when processing the task.")] ProcessBatchPanicked, #[error(transparent)] @@ -183,7 +186,7 @@ impl Error { | Error::AbortedTask | Error::Dump(_) | Error::Heed(_) - | Error::Milli(_) + | Error::Milli { .. } | Error::ProcessBatchPanicked | Error::FileStore(_) | Error::IoError(_) @@ -202,6 +205,10 @@ impl Error { pub fn with_custom_error_code(self, code: Code) -> Self { Self::WithCustomErrorCode(code, Box::new(self)) } + + pub fn from_milli(error: milli::Error, index_name: Option) -> Self { + Self::Milli { error, index_name } + } } impl ErrorCode for Error { @@ -227,7 +234,7 @@ impl ErrorCode for Error { // TODO: not sure of the Code to use Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice, Error::Dump(e) => e.error_code(), - Error::Milli(e) => e.error_code(), + Error::Milli { error, .. } => error.error_code(), Error::ProcessBatchPanicked => Code::Internal, Error::Heed(e) => e.error_code(), Error::HeedTransaction(e) => e.error_code(), diff --git a/crates/index-scheduler/src/index_mapper/index_map.rs b/crates/index-scheduler/src/index_mapper/index_map.rs index f8080d23b..c20782068 100644 --- a/crates/index-scheduler/src/index_mapper/index_map.rs +++ b/crates/index-scheduler/src/index_mapper/index_map.rs @@ -3,13 +3,13 @@ use std::path::Path; use std::time::Duration; use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions}; -use meilisearch_types::milli::Index; +use meilisearch_types::milli::{Index, Result}; use time::OffsetDateTime; use uuid::Uuid; use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing}; use crate::lru::{InsertionOutcome, LruMap}; -use crate::{clamp_to_page_size, Result}; +use crate::{clamp_to_page_size}; /// Keep an internally consistent view of the open indexes in memory. /// diff --git a/crates/index-scheduler/src/index_mapper/mod.rs b/crates/index-scheduler/src/index_mapper/mod.rs index 3cccb5a69..500e4cf83 100644 --- a/crates/index-scheduler/src/index_mapper/mod.rs +++ b/crates/index-scheduler/src/index_mapper/mod.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use tracing::error; use uuid::Uuid; - +use meilisearch_types::milli; use self::index_map::IndexMap; use self::IndexStatus::{Available, BeingDeleted, Closing, Missing}; use crate::uuid_codec::UuidCodec; @@ -121,7 +121,7 @@ impl IndexStats { /// # Parameters /// /// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`. - pub fn new(index: &Index, rtxn: &RoTxn) -> Result { + pub fn new(index: &Index, rtxn: &RoTxn) -> milli::Result { Ok(IndexStats { number_of_documents: index.number_of_documents(rtxn)?, database_size: index.on_disk_size()?, @@ -189,7 +189,7 @@ impl IndexMapper { date, self.enable_mdb_writemap, self.index_base_map_size, - )?; + ).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?; wtxn.commit()?; @@ -357,7 +357,8 @@ impl IndexMapper { }; let index_path = self.base_path.join(uuid.to_string()); // take the lock to reopen the environment. - reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?; + reopen.reopen(&mut self.index_map.write().unwrap(), &index_path) + .map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?; continue; } BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), @@ -378,7 +379,7 @@ impl IndexMapper { None, self.enable_mdb_writemap, self.index_base_map_size, - )?; + ).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?; } Available(index) => break index, Closing(_) => { @@ -459,7 +460,7 @@ impl IndexMapper { None => { let index = self.index(rtxn, index_uid)?; let index_rtxn = index.read_txn()?; - IndexStats::new(&index, &index_rtxn) + IndexStats::new(&index, &index_rtxn).map_err(|e| Error::from_milli(e, Some(uuid.to_string()))) } } } diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index e0e2bfb75..3a2c532d4 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -1210,9 +1210,9 @@ impl IndexScheduler { tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks."); } // If we have an abortion error we must stop the tick here and re-schedule tasks. - Err(Error::Milli(milli::Error::InternalError( - milli::InternalError::AbortedIndexation, - ))) + Err(Error::Milli{ + error: milli::Error::InternalError(milli::InternalError::AbortedIndexation), .. + }) | Err(Error::AbortedTask) => { #[cfg(test)] self.breakpoint(Breakpoint::AbortedIndexation); @@ -1231,9 +1231,9 @@ impl IndexScheduler { // 2. close the associated environment // 3. resize it // 4. re-schedule tasks - Err(Error::Milli(milli::Error::UserError( - milli::UserError::MaxDatabaseSizeReached, - ))) if index_uid.is_some() => { + Err(Error::Milli { + error: milli::Error::UserError(milli::UserError::MaxDatabaseSizeReached), .. + }) if index_uid.is_some() => { // fixme: add index_uid to match to avoid the unwrap let index_uid = index_uid.unwrap(); // fixme: handle error more gracefully? not sure when this could happen @@ -1470,6 +1470,7 @@ impl IndexScheduler { // TODO: consider using a type alias or a struct embedder/template pub fn embedders( &self, + index_uid: String, embedding_configs: Vec, ) -> Result { let res: Result<_> = embedding_configs @@ -1481,7 +1482,10 @@ impl IndexScheduler { .. }| { let prompt = - Arc::new(prompt.try_into().map_err(meilisearch_types::milli::Error::from)?); + Arc::new(prompt.try_into() + .map_err(meilisearch_types::milli::Error::from) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))? + ); // optimistically return existing embedder { let embedders = self.embedders.read().unwrap(); @@ -1497,7 +1501,8 @@ impl IndexScheduler { let embedder = Arc::new( Embedder::new(embedder_options.clone()) .map_err(meilisearch_types::milli::vector::Error::from) - .map_err(meilisearch_types::milli::Error::from)?, + .map_err(meilisearch_types::milli::Error::from) + .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?, ); { let mut embedders = self.embedders.write().unwrap(); diff --git a/crates/meilisearch/src/error.rs b/crates/meilisearch/src/error.rs index 5c4ce171f..6e7283a18 100644 --- a/crates/meilisearch/src/error.rs +++ b/crates/meilisearch/src/error.rs @@ -7,6 +7,7 @@ use meilisearch_types::index_uid::{IndexUid, IndexUidFormatError}; use meilisearch_types::milli::OrderBy; use serde_json::Value; use tokio::task::JoinError; +use meilisearch_types::milli; #[derive(Debug, thiserror::Error)] pub enum MeilisearchHttpError { @@ -62,8 +63,11 @@ pub enum MeilisearchHttpError { HeedError(#[from] meilisearch_types::heed::Error), #[error(transparent)] IndexScheduler(#[from] index_scheduler::Error), - #[error(transparent)] - Milli(#[from] meilisearch_types::milli::Error), + #[error("{}", match .index_name { + Some(name) if !name.is_empty() => format!("Index `{}`: {error}", name), + _ => format!("{error}") + })] + Milli { error: meilisearch_types::milli::Error, index_name: Option }, #[error(transparent)] Payload(#[from] PayloadError), #[error(transparent)] @@ -76,6 +80,12 @@ pub enum MeilisearchHttpError { MissingSearchHybrid, } +impl MeilisearchHttpError { + pub(crate) fn from_milli(error: milli::Error, index_name: Option) -> Self { + Self::Milli { error, index_name } + } +} + impl ErrorCode for MeilisearchHttpError { fn error_code(&self) -> Code { match self { @@ -95,7 +105,7 @@ impl ErrorCode for MeilisearchHttpError { MeilisearchHttpError::SerdeJson(_) => Code::Internal, MeilisearchHttpError::HeedError(_) => Code::Internal, MeilisearchHttpError::IndexScheduler(e) => e.error_code(), - MeilisearchHttpError::Milli(e) => e.error_code(), + MeilisearchHttpError::Milli{error, ..} => error.error_code(), MeilisearchHttpError::Payload(e) => e.error_code(), MeilisearchHttpError::FileStore(_) => Code::Internal, MeilisearchHttpError::DocumentFormat(e) => e.error_code(), diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 633ad2776..779af63f2 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -395,6 +395,7 @@ fn import_dump( for index_reader in dump_reader.indexes()? { let mut index_reader = index_reader?; let metadata = index_reader.metadata(); + let uid = metadata.uid.clone(); tracing::info!("Importing index `{}`.", metadata.uid); let date = Some((metadata.created_at, metadata.updated_at)); @@ -432,7 +433,7 @@ fn import_dump( let reader = DocumentsBatchReader::from_reader(reader)?; let embedder_configs = index.embedding_configs(&wtxn)?; - let embedders = index_scheduler.embedders(embedder_configs)?; + let embedders = index_scheduler.embedders(uid, embedder_configs)?; let builder = milli::update::IndexDocuments::new( &mut wtxn, diff --git a/crates/meilisearch/src/routes/indexes/facet_search.rs b/crates/meilisearch/src/routes/indexes/facet_search.rs index 99a4a4f28..fc29d3406 100644 --- a/crates/meilisearch/src/routes/indexes/facet_search.rs +++ b/crates/meilisearch/src/routes/indexes/facet_search.rs @@ -185,7 +185,7 @@ pub async fn search( let index = index_scheduler.index(&index_uid)?; let features = index_scheduler.features(); - let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?; + let search_kind = search_kind(&search_query, &index_scheduler, index_uid.to_string(), &index, features)?; let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { perform_facet_search( diff --git a/crates/meilisearch/src/routes/indexes/mod.rs b/crates/meilisearch/src/routes/indexes/mod.rs index 7d073ec5f..1dda27a98 100644 --- a/crates/meilisearch/src/routes/indexes/mod.rs +++ b/crates/meilisearch/src/routes/indexes/mod.rs @@ -5,7 +5,7 @@ use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; use deserr::actix_web::{AwebJson, AwebQueryParameter}; use deserr::{DeserializeError, Deserr, ValuePointerRef}; -use index_scheduler::IndexScheduler; +use index_scheduler::{Error, IndexScheduler}; use meilisearch_types::deserr::query_params::Param; use meilisearch_types::deserr::{immutable_field_error, DeserrJsonError, DeserrQueryParamError}; use meilisearch_types::error::deserr_codes::*; @@ -107,7 +107,7 @@ pub async fn list_indexes( if !filters.is_index_authorized(uid) { return Ok(None); } - Ok(Some(IndexView::new(uid.to_string(), index)?)) + Ok(Some(IndexView::new(uid.to_string(), index).map_err(|e| Error::from_milli(e, Some(uid.to_string())))?)) })?; // Won't cause to open all indexes because IndexView doesn't keep the `Index` opened. let indexes: Vec = indexes.into_iter().flatten().collect(); diff --git a/crates/meilisearch/src/routes/indexes/search.rs b/crates/meilisearch/src/routes/indexes/search.rs index 2f5cb4a36..609439b4a 100644 --- a/crates/meilisearch/src/routes/indexes/search.rs +++ b/crates/meilisearch/src/routes/indexes/search.rs @@ -243,11 +243,11 @@ pub async fn search_with_url_query( let index = index_scheduler.index(&index_uid)?; let features = index_scheduler.features(); - let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; + let search_kind = search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index, features)?; let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?; let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { - perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features()) + perform_search(index_uid.to_string(), &index, query, search_kind, retrieve_vector, index_scheduler.features()) }) .await; permit.drop().await; @@ -287,12 +287,12 @@ pub async fn search_with_post( let features = index_scheduler.features(); - let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; + let search_kind = search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index, features)?; let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?; let permit = search_queue.try_get_search_permit().await?; let search_result = tokio::task::spawn_blocking(move || { - perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features()) + perform_search(index_uid.to_string(), &index, query, search_kind, retrieve_vectors, index_scheduler.features()) }) .await; permit.drop().await; @@ -314,6 +314,7 @@ pub async fn search_with_post( pub fn search_kind( query: &SearchQuery, index_scheduler: &IndexScheduler, + index_uid: String, index: &milli::Index, features: RoFeatures, ) -> Result { @@ -332,7 +333,7 @@ pub fn search_kind( (None, _, None) => Ok(SearchKind::KeywordOnly), // hybrid.semantic_ratio == 1.0 => vector (_, Some(HybridQuery { semantic_ratio, embedder }), v) if **semantic_ratio == 1.0 => { - SearchKind::semantic(index_scheduler, index, embedder, v.map(|v| v.len())) + SearchKind::semantic(index_scheduler, index_uid, index, embedder, v.map(|v| v.len())) } // hybrid.semantic_ratio == 0.0 => keyword (_, Some(HybridQuery { semantic_ratio, embedder: _ }), _) if **semantic_ratio == 0.0 => { @@ -340,13 +341,14 @@ pub fn search_kind( } // no query, hybrid, vector => semantic (None, Some(HybridQuery { semantic_ratio: _, embedder }), Some(v)) => { - SearchKind::semantic(index_scheduler, index, embedder, Some(v.len())) + SearchKind::semantic(index_scheduler, index_uid, index, embedder, Some(v.len())) } // query, no hybrid, no vector => keyword (Some(_), None, None) => Ok(SearchKind::KeywordOnly), // query, hybrid, maybe vector => hybrid (Some(_), Some(HybridQuery { semantic_ratio, embedder }), v) => SearchKind::hybrid( index_scheduler, + index_uid, index, embedder, **semantic_ratio, diff --git a/crates/meilisearch/src/routes/indexes/similar.rs b/crates/meilisearch/src/routes/indexes/similar.rs index 79f42f0aa..a0fccff52 100644 --- a/crates/meilisearch/src/routes/indexes/similar.rs +++ b/crates/meilisearch/src/routes/indexes/similar.rs @@ -104,7 +104,7 @@ async fn similar( let index = index_scheduler.index(&index_uid)?; let (embedder_name, embedder, quantized) = - SearchKind::embedder(&index_scheduler, &index, &query.embedder, None)?; + SearchKind::embedder(&index_scheduler, index_uid.to_string(), &index, &query.embedder, None)?; tokio::task::spawn_blocking(move || { perform_similar( diff --git a/crates/meilisearch/src/routes/multi_search.rs b/crates/meilisearch/src/routes/multi_search.rs index f8b1bc6ee..c4496e41c 100644 --- a/crates/meilisearch/src/routes/multi_search.rs +++ b/crates/meilisearch/src/routes/multi_search.rs @@ -125,14 +125,16 @@ pub async fn multi_search_with_post( }) .with_index(query_index)?; + let index_uid_str = index_uid.to_string(); + let search_kind = - search_kind(&query, index_scheduler.get_ref(), &index, features) + search_kind(&query, index_scheduler.get_ref(), index_uid_str.clone(), &index, features) .with_index(query_index)?; let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features) .with_index(query_index)?; let search_result = tokio::task::spawn_blocking(move || { - perform_search(&index, query, search_kind, retrieve_vector, features) + perform_search(index_uid_str.clone(), &index, query, search_kind, retrieve_vector, features) }) .await .with_index(query_index)?; diff --git a/crates/meilisearch/src/search/federated.rs b/crates/meilisearch/src/search/federated.rs index 5279c26bb..5aae82c66 100644 --- a/crates/meilisearch/src/search/federated.rs +++ b/crates/meilisearch/src/search/federated.rs @@ -560,7 +560,7 @@ pub fn perform_federated_search( // use an immediately invoked lambda to capture the result without returning from the function let res: Result<(), ResponseError> = (|| { - let search_kind = search_kind(&query, index_scheduler, &index, features)?; + let search_kind = search_kind(&query, index_scheduler, index_uid.to_string(), &index, features)?; let canonicalization_kind = match (&search_kind, &query.q) { (SearchKind::SemanticOnly { .. }, _) => { @@ -636,7 +636,7 @@ pub fn perform_federated_search( search.offset(0); search.limit(required_hit_count); - let (result, _semantic_hit_count) = super::search_from_kind(search_kind, search)?; + let (result, _semantic_hit_count) = super::search_from_kind(index_uid.to_string(), search_kind, search)?; let format = AttributesFormat { attributes_to_retrieve: query.attributes_to_retrieve, retrieve_vectors, @@ -670,7 +670,8 @@ pub fn perform_federated_search( let formatter_builder = HitMaker::formatter_builder(matching_words, tokenizer); - let hit_maker = HitMaker::new(&index, &rtxn, format, formatter_builder)?; + let hit_maker = HitMaker::new(&index, &rtxn, format, formatter_builder) + .map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid.to_string())))?; results_by_query.push(SearchResultByQuery { federation_options, diff --git a/crates/meilisearch/src/search/mod.rs b/crates/meilisearch/src/search/mod.rs index 7832c1761..9c386eeb2 100644 --- a/crates/meilisearch/src/search/mod.rs +++ b/crates/meilisearch/src/search/mod.rs @@ -19,7 +19,7 @@ use meilisearch_types::locales::Locale; use meilisearch_types::milli::score_details::{ScoreDetails, ScoringStrategy}; use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors; use meilisearch_types::milli::vector::Embedder; -use meilisearch_types::milli::{FacetValueHit, OrderBy, SearchForFacetValues, TimeBudget}; +use meilisearch_types::milli::{FacetValueHit, InternalError, OrderBy, SearchForFacetValues, TimeBudget}; use meilisearch_types::settings::DEFAULT_PAGINATION_MAX_TOTAL_HITS; use meilisearch_types::{milli, Document}; use milli::tokenizer::{Language, TokenizerBuilder}; @@ -281,35 +281,38 @@ pub enum SearchKind { impl SearchKind { pub(crate) fn semantic( index_scheduler: &index_scheduler::IndexScheduler, + index_uid: String, index: &Index, embedder_name: &str, vector_len: Option, ) -> Result { let (embedder_name, embedder, quantized) = - Self::embedder(index_scheduler, index, embedder_name, vector_len)?; + Self::embedder(index_scheduler, index_uid, index, embedder_name, vector_len)?; Ok(Self::SemanticOnly { embedder_name, embedder, quantized }) } pub(crate) fn hybrid( index_scheduler: &index_scheduler::IndexScheduler, + index_uid: String, index: &Index, embedder_name: &str, semantic_ratio: f32, vector_len: Option, ) -> Result { let (embedder_name, embedder, quantized) = - Self::embedder(index_scheduler, index, embedder_name, vector_len)?; + Self::embedder(index_scheduler, index_uid, index, embedder_name, vector_len)?; Ok(Self::Hybrid { embedder_name, embedder, quantized, semantic_ratio }) } pub(crate) fn embedder( index_scheduler: &index_scheduler::IndexScheduler, + index_uid: String, index: &Index, embedder_name: &str, vector_len: Option, ) -> Result<(String, Arc, bool), ResponseError> { let embedder_configs = index.embedding_configs(&index.read_txn()?)?; - let embedders = index_scheduler.embedders(embedder_configs)?; + let embedders = index_scheduler.embedders(index_uid, embedder_configs)?; let (embedder, _, quantized) = embedders .get(embedder_name) @@ -888,6 +891,7 @@ fn prepare_search<'t>( } pub fn perform_search( + index_uid: String, index: &Index, query: SearchQuery, search_kind: SearchKind, @@ -914,7 +918,7 @@ pub fn perform_search( used_negative_operator, }, semantic_hit_count, - ) = search_from_kind(search_kind, search)?; + ) = search_from_kind(index_uid, search_kind, search)?; let SearchQuery { q, @@ -1067,17 +1071,24 @@ fn compute_facet_distribution_stats>( } pub fn search_from_kind( + index_uid: String, search_kind: SearchKind, search: milli::Search<'_>, ) -> Result<(milli::SearchResult, Option), MeilisearchHttpError> { let (milli_result, semantic_hit_count) = match &search_kind { - SearchKind::KeywordOnly => (search.execute()?, None), + SearchKind::KeywordOnly => { + let results = search.execute() + .map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid.to_string())))?; + (results, None) + }, SearchKind::SemanticOnly { .. } => { - let results = search.execute()?; + let results = search.execute() + .map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid.to_string())))?; let semantic_hit_count = results.document_scores.len() as u32; (results, Some(semantic_hit_count)) } - SearchKind::Hybrid { semantic_ratio, .. } => search.execute_hybrid(*semantic_ratio)?, + SearchKind::Hybrid { semantic_ratio, .. } => search.execute_hybrid(*semantic_ratio) + .map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid)))?, }; Ok((milli_result, semantic_hit_count)) } @@ -1179,7 +1190,7 @@ impl<'a> HitMaker<'a> { rtxn: &'a RoTxn<'a>, format: AttributesFormat, mut formatter_builder: MatcherBuilder<'a>, - ) -> Result { + ) -> milli::Result { formatter_builder.crop_marker(format.crop_marker); formatter_builder.highlight_prefix(format.highlight_pre_tag); formatter_builder.highlight_suffix(format.highlight_post_tag); @@ -1278,7 +1289,7 @@ impl<'a> HitMaker<'a> { &self, id: u32, score: &[ScoreDetails], - ) -> Result { + ) -> milli::Result { let (_, obkv) = self.index.iter_documents(self.rtxn, std::iter::once(id))?.next().unwrap()?; @@ -1321,7 +1332,7 @@ impl<'a> HitMaker<'a> { .is_some_and(|conf| conf.user_provided.contains(id)); let embeddings = ExplicitVectors { embeddings: Some(vector.into()), regenerate: !user_provided }; - vectors.insert(name, serde_json::to_value(embeddings)?); + vectors.insert(name, serde_json::to_value(embeddings).map_err(InternalError::SerdeJson)?); } document.insert("_vectors".into(), vectors.into()); } @@ -1367,7 +1378,7 @@ fn make_hits<'a>( format: AttributesFormat, matching_words: milli::MatchingWords, documents_ids_scores: impl Iterator)> + 'a, -) -> Result, MeilisearchHttpError> { +) -> milli::Result> { let mut documents = Vec::new(); let dictionary = index.dictionary(rtxn)?; @@ -1688,12 +1699,12 @@ fn make_document( displayed_attributes: &BTreeSet, field_ids_map: &FieldsIdsMap, obkv: obkv::KvReaderU16, -) -> Result { +) -> milli::Result { let mut document = serde_json::Map::new(); // recreate the original json for (key, value) in obkv.iter() { - let value = serde_json::from_slice(value)?; + let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?; let key = field_ids_map.name(key).expect("Missing field name").to_string(); document.insert(key, value); @@ -1718,7 +1729,7 @@ fn format_fields( displayable_ids: &BTreeSet, locales: Option<&[Language]>, localized_attributes: &[LocalizedAttributesRule], -) -> Result<(Option, Document), MeilisearchHttpError> { +) -> milli::Result<(Option, Document)> { let mut matches_position = compute_matches.then(BTreeMap::new); let mut document = document.clone(); @@ -1926,7 +1937,7 @@ fn parse_filter_array(arr: &[Value]) -> Result, MeilisearchHttpEr } } - Ok(Filter::from_array(ands)?) + Ok(Filter::from_array(ands).map_err(|e|MeilisearchHttpError::from_milli(e,None))?) } #[cfg(test)]