attach index name in errors

This commit is contained in:
airycanon 2024-11-14 15:59:52 +08:00
parent 94fb55bb6f
commit a476f1aeb6
14 changed files with 210 additions and 134 deletions

View File

@ -448,20 +448,20 @@ impl IndexScheduler {
match (document_import, settings) { match (document_import, settings) {
( (
Some(Batch::IndexOperation { Some(Batch::IndexOperation {
op: op:
IndexOperation::DocumentOperation { IndexOperation::DocumentOperation {
primary_key, primary_key,
documents_counts, documents_counts,
operations, operations,
tasks: document_import_tasks, tasks: document_import_tasks,
.. ..
}, },
.. ..
}), }),
Some(Batch::IndexOperation { Some(Batch::IndexOperation {
op: IndexOperation::Settings { settings, tasks: settings_tasks, .. }, op: IndexOperation::Settings { settings, tasks: settings_tasks, .. },
.. ..
}), }),
) => Ok(Some(Batch::IndexOperation { ) => Ok(Some(Batch::IndexOperation {
op: IndexOperation::SettingsAndDocumentOperation { op: IndexOperation::SettingsAndDocumentOperation {
index_uid, index_uid,
@ -618,7 +618,12 @@ impl IndexScheduler {
/// The list of tasks that were processed. The metadata of each task in the returned /// The list of tasks that were processed. The metadata of each task in the returned
/// list is updated accordingly, with the exception of the its date fields /// list is updated accordingly, with the exception of the its date fields
/// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at). /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at).
#[tracing::instrument(level = "trace", skip(self, batch), target = "indexing::scheduler", fields(batch=batch.to_string()))] #[tracing::instrument(
level = "trace",
skip(self, batch),
target = "indexing::scheduler",
fields(batch=batch.to_string())
)]
pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> { pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
#[cfg(test)] #[cfg(test)]
{ {
@ -627,6 +632,8 @@ impl IndexScheduler {
self.breakpoint(crate::Breakpoint::InsideProcessBatch); self.breakpoint(crate::Breakpoint::InsideProcessBatch);
} }
let index_uid = batch.index_uid().map(String::from);
match batch { match batch {
Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => { Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
@ -649,10 +656,10 @@ impl IndexScheduler {
task.status = Status::Succeeded; task.status = Status::Succeeded;
match &mut task.details { match &mut task.details {
Some(Details::TaskCancelation { Some(Details::TaskCancelation {
matched_tasks: _, matched_tasks: _,
canceled_tasks, canceled_tasks,
original_filter: _, original_filter: _,
}) => { }) => {
*canceled_tasks = Some(canceled_tasks_content_uuids.len() as u64); *canceled_tasks = Some(canceled_tasks_content_uuids.len() as u64);
} }
_ => unreachable!(), _ => unreachable!(),
@ -706,10 +713,10 @@ impl IndexScheduler {
match &mut task.details { match &mut task.details {
Some(Details::TaskDeletion { Some(Details::TaskDeletion {
matched_tasks: _, matched_tasks: _,
deleted_tasks, deleted_tasks,
original_filter: _, original_filter: _,
}) => { }) => {
*deleted_tasks = Some(deleted_tasks_count); *deleted_tasks = Some(deleted_tasks_count);
} }
_ => unreachable!(), _ => unreachable!(),
@ -765,7 +772,8 @@ impl IndexScheduler {
let index = self.index_mapper.index(&rtxn, name)?; let index = self.index_mapper.index(&rtxn, name)?;
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string()); let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?; fs::create_dir_all(&dst)?;
index.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; index.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
} }
drop(rtxn); drop(rtxn);
@ -843,7 +851,6 @@ impl IndexScheduler {
let (_, mut t) = ret?; let (_, mut t) = ret?;
let status = t.status; let status = t.status;
let content_file = t.content_uuid(); let content_file = t.content_uuid();
// In the case we're dumping ourselves we want to be marked as finished // In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely. // to not loop over ourselves indefinitely.
if t.uid == task.uid { if t.uid == task.uid {
@ -867,18 +874,24 @@ impl IndexScheduler {
let content_file = self.file_store.get_update(content_file)?; let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file) let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?; .map_err(|e| Error::Milli {
error: e.into(),
index_name: index_uid.clone(),
})?;
let (mut cursor, documents_batch_index) = let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index(); reader.into_cursor_and_fields_index();
while let Some(doc) = while let Some(doc) = cursor.next_document()
cursor.next_document().map_err(milli::Error::from)? .map_err(|e| Error::Milli {
error: e.into(),
index_name: index_uid.clone(),
})?
{ {
dump_content_file.push_document(&obkv_to_object( dump_content_file.push_document(&obkv_to_object(
&doc, &doc,
&documents_batch_index, &documents_batch_index,
)?)?; ).map_err(|e| Error::from_milli(e, index_uid.clone()))?)?;
} }
dump_content_file.flush()?; dump_content_file.flush()?;
} }
@ -892,27 +905,35 @@ impl IndexScheduler {
let metadata = IndexMetadata { let metadata = IndexMetadata {
uid: uid.to_owned(), uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from), primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?, created_at: index.created_at(&rtxn)
updated_at: index.updated_at(&rtxn)?, .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?,
updated_at: index.updated_at(&rtxn)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?,
}; };
let mut index_dumper = dump.create_index(uid, &metadata)?; let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?; let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let embedding_configs = index.embedding_configs(&rtxn)?; let embedding_configs = index.embedding_configs(&rtxn)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
let documents = index.all_documents(&rtxn)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
// 3.1. Dump the documents // 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? { for ret in documents {
if self.must_stop_processing.get() { if self.must_stop_processing.get() {
return Err(Error::AbortedTask); return Err(Error::AbortedTask);
} }
let (id, doc) = ret?; let (id, doc) = ret
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?; let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
'inject_vectors: { 'inject_vectors: {
let embeddings = index.embeddings(&rtxn, id)?; let embeddings = index.embeddings(&rtxn, id)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
if embeddings.is_empty() { if embeddings.is_empty() {
break 'inject_vectors; break 'inject_vectors;
@ -923,7 +944,7 @@ impl IndexScheduler {
.or_insert(serde_json::Value::Object(Default::default())); .or_insert(serde_json::Value::Object(Default::default()));
let serde_json::Value::Object(vectors) = vectors else { let serde_json::Value::Object(vectors) = vectors else {
return Err(milli::Error::UserError( return Err(Error::from_milli(milli::Error::UserError(
milli::UserError::InvalidVectorsMapType { milli::UserError::InvalidVectorsMapType {
document_id: { document_id: {
if let Ok(Some(Ok(index))) = index if let Ok(Some(Ok(index))) = index
@ -937,8 +958,7 @@ impl IndexScheduler {
}, },
value: vectors.clone(), value: vectors.clone(),
}, },
) ), Some(uid.to_string())));
.into());
}; };
for (embedder_name, embeddings) in embeddings { for (embedder_name, embeddings) in embeddings {
@ -968,7 +988,7 @@ impl IndexScheduler {
index, index,
&rtxn, &rtxn,
meilisearch_types::settings::SecretPolicy::RevealSecrets, meilisearch_types::settings::SecretPolicy::RevealSecrets,
)?; ).map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
index_dumper.settings(&settings)?; index_dumper.settings(&settings)?;
Ok(()) Ok(())
})?; })?;
@ -1018,7 +1038,8 @@ impl IndexScheduler {
// the entire batch. // the entire batch.
let res = || -> Result<()> { let res = || -> Result<()> {
let index_rtxn = index.read_txn()?; let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?; let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?; self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
wtxn.commit()?; wtxn.commit()?;
@ -1060,7 +1081,7 @@ impl IndexScheduler {
builder.execute( builder.execute(
|indexing_step| tracing::debug!(update = ?indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
index_wtxn.commit()?; index_wtxn.commit()?;
} }
@ -1077,7 +1098,8 @@ impl IndexScheduler {
let res = || -> Result<()> { let res = || -> Result<()> {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let index_rtxn = index.read_txn()?; let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?; let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?; self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
wtxn.commit()?; wtxn.commit()?;
Ok(()) Ok(())
@ -1100,9 +1122,11 @@ impl IndexScheduler {
let number_of_documents = || -> Result<u64> { let number_of_documents = || -> Result<u64> {
let index = self.index_mapper.index(&wtxn, &index_uid)?; let index = self.index_mapper.index(&wtxn, &index_uid)?;
let index_rtxn = index.read_txn()?; let index_rtxn = index.read_txn()?;
Ok(index.number_of_documents(&index_rtxn)?) Ok(index.number_of_documents(&index_rtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?
)
}() }()
.unwrap_or_default(); .unwrap_or_default();
// The write transaction is directly owned and committed inside. // The write transaction is directly owned and committed inside.
match self.index_mapper.delete_index(wtxn, &index_uid) { match self.index_mapper.delete_index(wtxn, &index_uid) {
@ -1219,8 +1243,9 @@ impl IndexScheduler {
operation: IndexOperation, operation: IndexOperation,
) -> Result<Vec<Task>> { ) -> Result<Vec<Task>> {
match operation { match operation {
IndexOperation::DocumentClear { mut tasks, .. } => { IndexOperation::DocumentClear { index_uid, mut tasks, .. } => {
let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?; let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()
.map_err(|e| Error::from_milli(e, Some(index_uid)))?;
let mut first_clear_found = false; let mut first_clear_found = false;
for task in &mut tasks { for task in &mut tasks {
@ -1240,7 +1265,7 @@ impl IndexScheduler {
Ok(tasks) Ok(tasks)
} }
IndexOperation::DocumentOperation { IndexOperation::DocumentOperation {
index_uid: _, index_uid,
primary_key, primary_key,
method, method,
documents_counts: _, documents_counts: _,
@ -1258,10 +1283,11 @@ impl IndexScheduler {
// but to a different value, we can make the whole batch fail. // but to a different value, we can make the whole batch fail.
Some(pk) => { Some(pk) => {
if primary_key != pk { if primary_key != pk {
return Err(milli::Error::from( return Err(Error::from_milli(
milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()), milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()).into(),
Some(index_uid.clone()),
) )
.into()); .into());
} }
} }
// if the primary key was set and there was no primary key set for this index // if the primary key was set and there was no primary key set for this index
@ -1273,7 +1299,7 @@ impl IndexScheduler {
builder.execute( builder.execute(
|indexing_step| tracing::debug!(update = ?indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.clone().get(), || must_stop_processing.clone().get(),
)?; ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
primary_key_has_been_set = true; primary_key_has_been_set = true;
} }
} }
@ -1281,9 +1307,10 @@ impl IndexScheduler {
let config = IndexDocumentsConfig { update_method: method, ..Default::default() }; let config = IndexDocumentsConfig { update_method: method, ..Default::default() };
let embedder_configs = index.embedding_configs(index_wtxn)?; let embedder_configs = index.embedding_configs(index_wtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
// TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense) // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense)
let embedders = self.embedders(embedder_configs)?; let embedders = self.embedders(index_uid.clone(), embedder_configs)?;
let mut builder = milli::update::IndexDocuments::new( let mut builder = milli::update::IndexDocuments::new(
index_wtxn, index_wtxn,
@ -1292,24 +1319,25 @@ impl IndexScheduler {
config, config,
|indexing_step| tracing::trace!(?indexing_step, "Update"), |indexing_step| tracing::trace!(?indexing_step, "Update"),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
match operation { match operation {
DocumentOperation::Add(content_uuid) => { DocumentOperation::Add(content_uuid) => {
let content_file = self.file_store.get_update(content_uuid)?; let content_file = self.file_store.get_update(content_uuid)?;
let reader = DocumentsBatchReader::from_reader(content_file) let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?; .map_err(|e| Error::from_milli(e.into(), Some(index_uid.clone())))?;
let (new_builder, user_result) = builder.add_documents(reader)?; let (new_builder, user_result) = builder.add_documents(reader)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
builder = new_builder; builder = new_builder;
builder = builder.with_embedders(embedders.clone()); builder = builder.with_embedders(embedders.clone());
let received_documents = let received_documents =
if let Some(Details::DocumentAdditionOrUpdate { if let Some(Details::DocumentAdditionOrUpdate {
received_documents, received_documents,
.. ..
}) = task.details }) = task.details
{ {
received_documents received_documents
} else { } else {
@ -1337,7 +1365,8 @@ impl IndexScheduler {
} }
DocumentOperation::Delete(document_ids) => { DocumentOperation::Delete(document_ids) => {
let (new_builder, user_result) = let (new_builder, user_result) =
builder.remove_documents(document_ids)?; builder.remove_documents(document_ids)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
builder = new_builder; builder = new_builder;
// Uses Invariant: remove documents actually always returns Ok for the inner result // Uses Invariant: remove documents actually always returns Ok for the inner result
let count = user_result.unwrap(); let count = user_result.unwrap();
@ -1361,7 +1390,7 @@ impl IndexScheduler {
} }
if !tasks.iter().all(|res| res.error.is_some()) { if !tasks.iter().all(|res| res.error.is_some()) {
let addition = builder.execute()?; let addition = builder.execute().map_err(|e| Error::from_milli(e, Some(index_uid)))?;
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} else if primary_key_has_been_set { } else if primary_key_has_been_set {
// Everything failed but we've set a primary key. // Everything failed but we've set a primary key.
@ -1372,12 +1401,12 @@ impl IndexScheduler {
builder.execute( builder.execute(
|indexing_step| tracing::trace!(update = ?indexing_step), |indexing_step| tracing::trace!(update = ?indexing_step),
|| must_stop_processing.clone().get(), || must_stop_processing.clone().get(),
)?; ).map_err(|e| Error::from_milli(e, Some(index_uid)))?;
} }
Ok(tasks) Ok(tasks)
} }
IndexOperation::DocumentEdition { mut task, .. } => { IndexOperation::DocumentEdition { index_uid, mut task } => {
let (filter, context, function) = let (filter, context, function) =
if let KindWithContent::DocumentEdition { if let KindWithContent::DocumentEdition {
filter_expr, context, function, .. filter_expr, context, function, ..
@ -1395,6 +1424,7 @@ impl IndexScheduler {
self.index_mapper.indexer_config(), self.index_mapper.indexer_config(),
self.must_stop_processing.clone(), self.must_stop_processing.clone(),
index, index,
index_uid
); );
let (original_filter, context, function) = if let Some(Details::DocumentEdition { let (original_filter, context, function) = if let Some(Details::DocumentEdition {
original_filter, original_filter,
@ -1435,7 +1465,7 @@ impl IndexScheduler {
Ok(vec![task]) Ok(vec![task])
} }
IndexOperation::DocumentDeletion { mut tasks, index_uid: _ } => { IndexOperation::DocumentDeletion { mut tasks, index_uid } => {
let mut to_delete = RoaringBitmap::new(); let mut to_delete = RoaringBitmap::new();
let external_documents_ids = index.external_documents_ids(); let external_documents_ids = index.external_documents_ids();
@ -1456,7 +1486,7 @@ impl IndexScheduler {
deleted_documents: Some(will_be_removed), deleted_documents: Some(will_be_removed),
}); });
} }
KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => { KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr } => {
let before = to_delete.len(); let before = to_delete.len();
let filter = match Filter::from_json(filter_expr) { let filter = match Filter::from_json(filter_expr) {
Ok(filter) => filter, Ok(filter) => filter,
@ -1467,7 +1497,7 @@ impl IndexScheduler {
milli::Error::UserError( milli::Error::UserError(
milli::UserError::InvalidFilterExpression { .. }, milli::UserError::InvalidFilterExpression { .. },
) => Some( ) => Some(
Error::from(err) Error::from_milli(err, Some(index_uid.clone()))
.with_custom_error_code(Code::InvalidDocumentFilter) .with_custom_error_code(Code::InvalidDocumentFilter)
.into(), .into(),
), ),
@ -1481,9 +1511,9 @@ impl IndexScheduler {
filter.evaluate(index_wtxn, index).map_err(|err| match err { filter.evaluate(index_wtxn, index).map_err(|err| match err {
milli::Error::UserError( milli::Error::UserError(
milli::UserError::InvalidFilter(_), milli::UserError::InvalidFilter(_),
) => Error::from(err) ) => Error::from_milli(err, Some(index_uid.clone()))
.with_custom_error_code(Code::InvalidDocumentFilter), .with_custom_error_code(Code::InvalidDocumentFilter),
e => e.into(), e => Error::from_milli(e, Some(index_uid.clone())),
}); });
match candidates { match candidates {
Ok(candidates) => to_delete |= candidates, Ok(candidates) => to_delete |= candidates,
@ -1522,17 +1552,18 @@ impl IndexScheduler {
config, config,
|indexing_step| tracing::debug!(update = ?indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
let (new_builder, _count) = let (new_builder, _count) =
builder.remove_documents_from_db_no_batch(&to_delete)?; builder.remove_documents_from_db_no_batch(&to_delete)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
builder = new_builder; builder = new_builder;
let _ = builder.execute()?; let _ = builder.execute().map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
Ok(tasks) Ok(tasks)
} }
IndexOperation::Settings { index_uid: _, settings, mut tasks } => { IndexOperation::Settings { index_uid, settings, mut tasks } => {
let indexer_config = self.index_mapper.indexer_config(); let indexer_config = self.index_mapper.indexer_config();
let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config);
@ -1550,7 +1581,7 @@ impl IndexScheduler {
builder.execute( builder.execute(
|indexing_step| tracing::debug!(update = ?indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; ).map_err(|e| Error::from_milli(e, Some(index_uid)))?;
Ok(tasks) Ok(tasks)
} }
@ -1742,16 +1773,17 @@ fn edit_documents_by_function<'a>(
indexer_config: &IndexerConfig, indexer_config: &IndexerConfig,
must_stop_processing: MustStopProcessing, must_stop_processing: MustStopProcessing,
index: &'a Index, index: &'a Index,
index_uid: String,
) -> Result<(u64, u64)> { ) -> Result<(u64, u64)> {
let candidates = match filter.as_ref().map(Filter::from_json) { let candidates = match filter.as_ref().map(Filter::from_json) {
Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err { Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) Error::from_milli(err, Some(index_uid.clone())).with_custom_error_code(Code::InvalidDocumentFilter)
} }
e => e.into(), e => Error::from_milli(e.into(), Some(index_uid.clone())),
})?, })?,
None | Some(Ok(None)) => index.documents_ids(wtxn)?, None | Some(Ok(None)) => index.documents_ids(wtxn)?,
Some(Err(e)) => return Err(e.into()), Some(Err(e)) => return Err(Error::from_milli(e.into(), Some(index_uid.clone()))),
}; };
let config = IndexDocumentsConfig { let config = IndexDocumentsConfig {
@ -1766,11 +1798,15 @@ fn edit_documents_by_function<'a>(
config, config,
|indexing_step| tracing::debug!(update = ?indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; ).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
let (new_builder, count) = builder.edit_documents(&candidates, context, code)?; let (new_builder, count) = builder.edit_documents(
&candidates,
context,
code
).map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
builder = new_builder; builder = new_builder;
let _ = builder.execute()?; let _ = builder.execute().map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
Ok(count.unwrap()) Ok(count.unwrap())
} }

View File

@ -117,8 +117,11 @@ pub enum Error {
Dump(#[from] dump::Error), Dump(#[from] dump::Error),
#[error(transparent)] #[error(transparent)]
Heed(#[from] heed::Error), Heed(#[from] heed::Error),
#[error(transparent)] #[error("{}", match .index_name {
Milli(#[from] milli::Error), Some(name) if !name.is_empty() => format!("Index `{}`: {error}", name),
_ => format!("{error}")
})]
Milli { error: milli::Error, index_name: Option<String> },
#[error("An unexpected crash occurred when processing the task.")] #[error("An unexpected crash occurred when processing the task.")]
ProcessBatchPanicked, ProcessBatchPanicked,
#[error(transparent)] #[error(transparent)]
@ -183,7 +186,7 @@ impl Error {
| Error::AbortedTask | Error::AbortedTask
| Error::Dump(_) | Error::Dump(_)
| Error::Heed(_) | Error::Heed(_)
| Error::Milli(_) | Error::Milli { .. }
| Error::ProcessBatchPanicked | Error::ProcessBatchPanicked
| Error::FileStore(_) | Error::FileStore(_)
| Error::IoError(_) | Error::IoError(_)
@ -202,6 +205,10 @@ impl Error {
pub fn with_custom_error_code(self, code: Code) -> Self { pub fn with_custom_error_code(self, code: Code) -> Self {
Self::WithCustomErrorCode(code, Box::new(self)) Self::WithCustomErrorCode(code, Box::new(self))
} }
pub fn from_milli(error: milli::Error, index_name: Option<String>) -> Self {
Self::Milli { error, index_name }
}
} }
impl ErrorCode for Error { impl ErrorCode for Error {
@ -227,7 +234,7 @@ impl ErrorCode for Error {
// TODO: not sure of the Code to use // TODO: not sure of the Code to use
Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice, Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
Error::Dump(e) => e.error_code(), Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(), Error::Milli { error, .. } => error.error_code(),
Error::ProcessBatchPanicked => Code::Internal, Error::ProcessBatchPanicked => Code::Internal,
Error::Heed(e) => e.error_code(), Error::Heed(e) => e.error_code(),
Error::HeedTransaction(e) => e.error_code(), Error::HeedTransaction(e) => e.error_code(),

View File

@ -3,13 +3,13 @@ use std::path::Path;
use std::time::Duration; use std::time::Duration;
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions}; use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
use meilisearch_types::milli::Index; use meilisearch_types::milli::{Index, Result};
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing}; use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing};
use crate::lru::{InsertionOutcome, LruMap}; use crate::lru::{InsertionOutcome, LruMap};
use crate::{clamp_to_page_size, Result}; use crate::{clamp_to_page_size};
/// Keep an internally consistent view of the open indexes in memory. /// Keep an internally consistent view of the open indexes in memory.
/// ///

View File

@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
use tracing::error; use tracing::error;
use uuid::Uuid; use uuid::Uuid;
use meilisearch_types::milli;
use self::index_map::IndexMap; use self::index_map::IndexMap;
use self::IndexStatus::{Available, BeingDeleted, Closing, Missing}; use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
use crate::uuid_codec::UuidCodec; use crate::uuid_codec::UuidCodec;
@ -121,7 +121,7 @@ impl IndexStats {
/// # Parameters /// # Parameters
/// ///
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`. /// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
pub fn new(index: &Index, rtxn: &RoTxn) -> Result<Self> { pub fn new(index: &Index, rtxn: &RoTxn) -> milli::Result<Self> {
Ok(IndexStats { Ok(IndexStats {
number_of_documents: index.number_of_documents(rtxn)?, number_of_documents: index.number_of_documents(rtxn)?,
database_size: index.on_disk_size()?, database_size: index.on_disk_size()?,
@ -189,7 +189,7 @@ impl IndexMapper {
date, date,
self.enable_mdb_writemap, self.enable_mdb_writemap,
self.index_base_map_size, self.index_base_map_size,
)?; ).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
wtxn.commit()?; wtxn.commit()?;
@ -357,7 +357,8 @@ impl IndexMapper {
}; };
let index_path = self.base_path.join(uuid.to_string()); let index_path = self.base_path.join(uuid.to_string());
// take the lock to reopen the environment. // take the lock to reopen the environment.
reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?; reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)
.map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
continue; continue;
} }
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
@ -378,7 +379,7 @@ impl IndexMapper {
None, None,
self.enable_mdb_writemap, self.enable_mdb_writemap,
self.index_base_map_size, self.index_base_map_size,
)?; ).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
} }
Available(index) => break index, Available(index) => break index,
Closing(_) => { Closing(_) => {
@ -459,7 +460,7 @@ impl IndexMapper {
None => { None => {
let index = self.index(rtxn, index_uid)?; let index = self.index(rtxn, index_uid)?;
let index_rtxn = index.read_txn()?; let index_rtxn = index.read_txn()?;
IndexStats::new(&index, &index_rtxn) IndexStats::new(&index, &index_rtxn).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))
} }
} }
} }

View File

@ -1210,9 +1210,9 @@ impl IndexScheduler {
tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks."); tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks.");
} }
// If we have an abortion error we must stop the tick here and re-schedule tasks. // If we have an abortion error we must stop the tick here and re-schedule tasks.
Err(Error::Milli(milli::Error::InternalError( Err(Error::Milli{
milli::InternalError::AbortedIndexation, error: milli::Error::InternalError(milli::InternalError::AbortedIndexation), ..
))) })
| Err(Error::AbortedTask) => { | Err(Error::AbortedTask) => {
#[cfg(test)] #[cfg(test)]
self.breakpoint(Breakpoint::AbortedIndexation); self.breakpoint(Breakpoint::AbortedIndexation);
@ -1231,9 +1231,9 @@ impl IndexScheduler {
// 2. close the associated environment // 2. close the associated environment
// 3. resize it // 3. resize it
// 4. re-schedule tasks // 4. re-schedule tasks
Err(Error::Milli(milli::Error::UserError( Err(Error::Milli {
milli::UserError::MaxDatabaseSizeReached, error: milli::Error::UserError(milli::UserError::MaxDatabaseSizeReached), ..
))) if index_uid.is_some() => { }) if index_uid.is_some() => {
// fixme: add index_uid to match to avoid the unwrap // fixme: add index_uid to match to avoid the unwrap
let index_uid = index_uid.unwrap(); let index_uid = index_uid.unwrap();
// fixme: handle error more gracefully? not sure when this could happen // fixme: handle error more gracefully? not sure when this could happen
@ -1470,6 +1470,7 @@ impl IndexScheduler {
// TODO: consider using a type alias or a struct embedder/template // TODO: consider using a type alias or a struct embedder/template
pub fn embedders( pub fn embedders(
&self, &self,
index_uid: String,
embedding_configs: Vec<IndexEmbeddingConfig>, embedding_configs: Vec<IndexEmbeddingConfig>,
) -> Result<EmbeddingConfigs> { ) -> Result<EmbeddingConfigs> {
let res: Result<_> = embedding_configs let res: Result<_> = embedding_configs
@ -1481,7 +1482,10 @@ impl IndexScheduler {
.. ..
}| { }| {
let prompt = let prompt =
Arc::new(prompt.try_into().map_err(meilisearch_types::milli::Error::from)?); Arc::new(prompt.try_into()
.map_err(meilisearch_types::milli::Error::from)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?
);
// optimistically return existing embedder // optimistically return existing embedder
{ {
let embedders = self.embedders.read().unwrap(); let embedders = self.embedders.read().unwrap();
@ -1497,7 +1501,8 @@ impl IndexScheduler {
let embedder = Arc::new( let embedder = Arc::new(
Embedder::new(embedder_options.clone()) Embedder::new(embedder_options.clone())
.map_err(meilisearch_types::milli::vector::Error::from) .map_err(meilisearch_types::milli::vector::Error::from)
.map_err(meilisearch_types::milli::Error::from)?, .map_err(meilisearch_types::milli::Error::from)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
); );
{ {
let mut embedders = self.embedders.write().unwrap(); let mut embedders = self.embedders.write().unwrap();

View File

@ -7,6 +7,7 @@ use meilisearch_types::index_uid::{IndexUid, IndexUidFormatError};
use meilisearch_types::milli::OrderBy; use meilisearch_types::milli::OrderBy;
use serde_json::Value; use serde_json::Value;
use tokio::task::JoinError; use tokio::task::JoinError;
use meilisearch_types::milli;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum MeilisearchHttpError { pub enum MeilisearchHttpError {
@ -62,8 +63,11 @@ pub enum MeilisearchHttpError {
HeedError(#[from] meilisearch_types::heed::Error), HeedError(#[from] meilisearch_types::heed::Error),
#[error(transparent)] #[error(transparent)]
IndexScheduler(#[from] index_scheduler::Error), IndexScheduler(#[from] index_scheduler::Error),
#[error(transparent)] #[error("{}", match .index_name {
Milli(#[from] meilisearch_types::milli::Error), Some(name) if !name.is_empty() => format!("Index `{}`: {error}", name),
_ => format!("{error}")
})]
Milli { error: meilisearch_types::milli::Error, index_name: Option<String> },
#[error(transparent)] #[error(transparent)]
Payload(#[from] PayloadError), Payload(#[from] PayloadError),
#[error(transparent)] #[error(transparent)]
@ -76,6 +80,12 @@ pub enum MeilisearchHttpError {
MissingSearchHybrid, MissingSearchHybrid,
} }
impl MeilisearchHttpError {
pub(crate) fn from_milli(error: milli::Error, index_name: Option<String>) -> Self {
Self::Milli { error, index_name }
}
}
impl ErrorCode for MeilisearchHttpError { impl ErrorCode for MeilisearchHttpError {
fn error_code(&self) -> Code { fn error_code(&self) -> Code {
match self { match self {
@ -95,7 +105,7 @@ impl ErrorCode for MeilisearchHttpError {
MeilisearchHttpError::SerdeJson(_) => Code::Internal, MeilisearchHttpError::SerdeJson(_) => Code::Internal,
MeilisearchHttpError::HeedError(_) => Code::Internal, MeilisearchHttpError::HeedError(_) => Code::Internal,
MeilisearchHttpError::IndexScheduler(e) => e.error_code(), MeilisearchHttpError::IndexScheduler(e) => e.error_code(),
MeilisearchHttpError::Milli(e) => e.error_code(), MeilisearchHttpError::Milli{error, ..} => error.error_code(),
MeilisearchHttpError::Payload(e) => e.error_code(), MeilisearchHttpError::Payload(e) => e.error_code(),
MeilisearchHttpError::FileStore(_) => Code::Internal, MeilisearchHttpError::FileStore(_) => Code::Internal,
MeilisearchHttpError::DocumentFormat(e) => e.error_code(), MeilisearchHttpError::DocumentFormat(e) => e.error_code(),

View File

@ -395,6 +395,7 @@ fn import_dump(
for index_reader in dump_reader.indexes()? { for index_reader in dump_reader.indexes()? {
let mut index_reader = index_reader?; let mut index_reader = index_reader?;
let metadata = index_reader.metadata(); let metadata = index_reader.metadata();
let uid = metadata.uid.clone();
tracing::info!("Importing index `{}`.", metadata.uid); tracing::info!("Importing index `{}`.", metadata.uid);
let date = Some((metadata.created_at, metadata.updated_at)); let date = Some((metadata.created_at, metadata.updated_at));
@ -432,7 +433,7 @@ fn import_dump(
let reader = DocumentsBatchReader::from_reader(reader)?; let reader = DocumentsBatchReader::from_reader(reader)?;
let embedder_configs = index.embedding_configs(&wtxn)?; let embedder_configs = index.embedding_configs(&wtxn)?;
let embedders = index_scheduler.embedders(embedder_configs)?; let embedders = index_scheduler.embedders(uid, embedder_configs)?;
let builder = milli::update::IndexDocuments::new( let builder = milli::update::IndexDocuments::new(
&mut wtxn, &mut wtxn,

View File

@ -185,7 +185,7 @@ pub async fn search(
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features(); let features = index_scheduler.features();
let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?; let search_kind = search_kind(&search_query, &index_scheduler, index_uid.to_string(), &index, features)?;
let permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_facet_search( perform_facet_search(

View File

@ -5,7 +5,7 @@ use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse}; use actix_web::{web, HttpRequest, HttpResponse};
use deserr::actix_web::{AwebJson, AwebQueryParameter}; use deserr::actix_web::{AwebJson, AwebQueryParameter};
use deserr::{DeserializeError, Deserr, ValuePointerRef}; use deserr::{DeserializeError, Deserr, ValuePointerRef};
use index_scheduler::IndexScheduler; use index_scheduler::{Error, IndexScheduler};
use meilisearch_types::deserr::query_params::Param; use meilisearch_types::deserr::query_params::Param;
use meilisearch_types::deserr::{immutable_field_error, DeserrJsonError, DeserrQueryParamError}; use meilisearch_types::deserr::{immutable_field_error, DeserrJsonError, DeserrQueryParamError};
use meilisearch_types::error::deserr_codes::*; use meilisearch_types::error::deserr_codes::*;
@ -107,7 +107,7 @@ pub async fn list_indexes(
if !filters.is_index_authorized(uid) { if !filters.is_index_authorized(uid) {
return Ok(None); return Ok(None);
} }
Ok(Some(IndexView::new(uid.to_string(), index)?)) Ok(Some(IndexView::new(uid.to_string(), index).map_err(|e| Error::from_milli(e, Some(uid.to_string())))?))
})?; })?;
// Won't cause to open all indexes because IndexView doesn't keep the `Index` opened. // Won't cause to open all indexes because IndexView doesn't keep the `Index` opened.
let indexes: Vec<IndexView> = indexes.into_iter().flatten().collect(); let indexes: Vec<IndexView> = indexes.into_iter().flatten().collect();

View File

@ -243,11 +243,11 @@ pub async fn search_with_url_query(
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features(); let features = index_scheduler.features();
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let search_kind = search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index, features)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?; let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?;
let permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features()) perform_search(index_uid.to_string(), &index, query, search_kind, retrieve_vector, index_scheduler.features())
}) })
.await; .await;
permit.drop().await; permit.drop().await;
@ -287,12 +287,12 @@ pub async fn search_with_post(
let features = index_scheduler.features(); let features = index_scheduler.features();
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?; let search_kind = search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index, features)?;
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?; let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?;
let permit = search_queue.try_get_search_permit().await?; let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features()) perform_search(index_uid.to_string(), &index, query, search_kind, retrieve_vectors, index_scheduler.features())
}) })
.await; .await;
permit.drop().await; permit.drop().await;
@ -314,6 +314,7 @@ pub async fn search_with_post(
pub fn search_kind( pub fn search_kind(
query: &SearchQuery, query: &SearchQuery,
index_scheduler: &IndexScheduler, index_scheduler: &IndexScheduler,
index_uid: String,
index: &milli::Index, index: &milli::Index,
features: RoFeatures, features: RoFeatures,
) -> Result<SearchKind, ResponseError> { ) -> Result<SearchKind, ResponseError> {
@ -332,7 +333,7 @@ pub fn search_kind(
(None, _, None) => Ok(SearchKind::KeywordOnly), (None, _, None) => Ok(SearchKind::KeywordOnly),
// hybrid.semantic_ratio == 1.0 => vector // hybrid.semantic_ratio == 1.0 => vector
(_, Some(HybridQuery { semantic_ratio, embedder }), v) if **semantic_ratio == 1.0 => { (_, Some(HybridQuery { semantic_ratio, embedder }), v) if **semantic_ratio == 1.0 => {
SearchKind::semantic(index_scheduler, index, embedder, v.map(|v| v.len())) SearchKind::semantic(index_scheduler, index_uid, index, embedder, v.map(|v| v.len()))
} }
// hybrid.semantic_ratio == 0.0 => keyword // hybrid.semantic_ratio == 0.0 => keyword
(_, Some(HybridQuery { semantic_ratio, embedder: _ }), _) if **semantic_ratio == 0.0 => { (_, Some(HybridQuery { semantic_ratio, embedder: _ }), _) if **semantic_ratio == 0.0 => {
@ -340,13 +341,14 @@ pub fn search_kind(
} }
// no query, hybrid, vector => semantic // no query, hybrid, vector => semantic
(None, Some(HybridQuery { semantic_ratio: _, embedder }), Some(v)) => { (None, Some(HybridQuery { semantic_ratio: _, embedder }), Some(v)) => {
SearchKind::semantic(index_scheduler, index, embedder, Some(v.len())) SearchKind::semantic(index_scheduler, index_uid, index, embedder, Some(v.len()))
} }
// query, no hybrid, no vector => keyword // query, no hybrid, no vector => keyword
(Some(_), None, None) => Ok(SearchKind::KeywordOnly), (Some(_), None, None) => Ok(SearchKind::KeywordOnly),
// query, hybrid, maybe vector => hybrid // query, hybrid, maybe vector => hybrid
(Some(_), Some(HybridQuery { semantic_ratio, embedder }), v) => SearchKind::hybrid( (Some(_), Some(HybridQuery { semantic_ratio, embedder }), v) => SearchKind::hybrid(
index_scheduler, index_scheduler,
index_uid,
index, index,
embedder, embedder,
**semantic_ratio, **semantic_ratio,

View File

@ -104,7 +104,7 @@ async fn similar(
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let (embedder_name, embedder, quantized) = let (embedder_name, embedder, quantized) =
SearchKind::embedder(&index_scheduler, &index, &query.embedder, None)?; SearchKind::embedder(&index_scheduler, index_uid.to_string(), &index, &query.embedder, None)?;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
perform_similar( perform_similar(

View File

@ -125,14 +125,16 @@ pub async fn multi_search_with_post(
}) })
.with_index(query_index)?; .with_index(query_index)?;
let index_uid_str = index_uid.to_string();
let search_kind = let search_kind =
search_kind(&query, index_scheduler.get_ref(), &index, features) search_kind(&query, index_scheduler.get_ref(), index_uid_str.clone(), &index, features)
.with_index(query_index)?; .with_index(query_index)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features) let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)
.with_index(query_index)?; .with_index(query_index)?;
let search_result = tokio::task::spawn_blocking(move || { let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vector, features) perform_search(index_uid_str.clone(), &index, query, search_kind, retrieve_vector, features)
}) })
.await .await
.with_index(query_index)?; .with_index(query_index)?;

View File

@ -560,7 +560,7 @@ pub fn perform_federated_search(
// use an immediately invoked lambda to capture the result without returning from the function // use an immediately invoked lambda to capture the result without returning from the function
let res: Result<(), ResponseError> = (|| { let res: Result<(), ResponseError> = (|| {
let search_kind = search_kind(&query, index_scheduler, &index, features)?; let search_kind = search_kind(&query, index_scheduler, index_uid.to_string(), &index, features)?;
let canonicalization_kind = match (&search_kind, &query.q) { let canonicalization_kind = match (&search_kind, &query.q) {
(SearchKind::SemanticOnly { .. }, _) => { (SearchKind::SemanticOnly { .. }, _) => {
@ -636,7 +636,7 @@ pub fn perform_federated_search(
search.offset(0); search.offset(0);
search.limit(required_hit_count); search.limit(required_hit_count);
let (result, _semantic_hit_count) = super::search_from_kind(search_kind, search)?; let (result, _semantic_hit_count) = super::search_from_kind(index_uid.to_string(), search_kind, search)?;
let format = AttributesFormat { let format = AttributesFormat {
attributes_to_retrieve: query.attributes_to_retrieve, attributes_to_retrieve: query.attributes_to_retrieve,
retrieve_vectors, retrieve_vectors,
@ -670,7 +670,8 @@ pub fn perform_federated_search(
let formatter_builder = HitMaker::formatter_builder(matching_words, tokenizer); let formatter_builder = HitMaker::formatter_builder(matching_words, tokenizer);
let hit_maker = HitMaker::new(&index, &rtxn, format, formatter_builder)?; let hit_maker = HitMaker::new(&index, &rtxn, format, formatter_builder)
.map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid.to_string())))?;
results_by_query.push(SearchResultByQuery { results_by_query.push(SearchResultByQuery {
federation_options, federation_options,

View File

@ -19,7 +19,7 @@ use meilisearch_types::locales::Locale;
use meilisearch_types::milli::score_details::{ScoreDetails, ScoringStrategy}; use meilisearch_types::milli::score_details::{ScoreDetails, ScoringStrategy};
use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors; use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors;
use meilisearch_types::milli::vector::Embedder; use meilisearch_types::milli::vector::Embedder;
use meilisearch_types::milli::{FacetValueHit, OrderBy, SearchForFacetValues, TimeBudget}; use meilisearch_types::milli::{FacetValueHit, InternalError, OrderBy, SearchForFacetValues, TimeBudget};
use meilisearch_types::settings::DEFAULT_PAGINATION_MAX_TOTAL_HITS; use meilisearch_types::settings::DEFAULT_PAGINATION_MAX_TOTAL_HITS;
use meilisearch_types::{milli, Document}; use meilisearch_types::{milli, Document};
use milli::tokenizer::{Language, TokenizerBuilder}; use milli::tokenizer::{Language, TokenizerBuilder};
@ -281,35 +281,38 @@ pub enum SearchKind {
impl SearchKind { impl SearchKind {
pub(crate) fn semantic( pub(crate) fn semantic(
index_scheduler: &index_scheduler::IndexScheduler, index_scheduler: &index_scheduler::IndexScheduler,
index_uid: String,
index: &Index, index: &Index,
embedder_name: &str, embedder_name: &str,
vector_len: Option<usize>, vector_len: Option<usize>,
) -> Result<Self, ResponseError> { ) -> Result<Self, ResponseError> {
let (embedder_name, embedder, quantized) = let (embedder_name, embedder, quantized) =
Self::embedder(index_scheduler, index, embedder_name, vector_len)?; Self::embedder(index_scheduler, index_uid, index, embedder_name, vector_len)?;
Ok(Self::SemanticOnly { embedder_name, embedder, quantized }) Ok(Self::SemanticOnly { embedder_name, embedder, quantized })
} }
pub(crate) fn hybrid( pub(crate) fn hybrid(
index_scheduler: &index_scheduler::IndexScheduler, index_scheduler: &index_scheduler::IndexScheduler,
index_uid: String,
index: &Index, index: &Index,
embedder_name: &str, embedder_name: &str,
semantic_ratio: f32, semantic_ratio: f32,
vector_len: Option<usize>, vector_len: Option<usize>,
) -> Result<Self, ResponseError> { ) -> Result<Self, ResponseError> {
let (embedder_name, embedder, quantized) = let (embedder_name, embedder, quantized) =
Self::embedder(index_scheduler, index, embedder_name, vector_len)?; Self::embedder(index_scheduler, index_uid, index, embedder_name, vector_len)?;
Ok(Self::Hybrid { embedder_name, embedder, quantized, semantic_ratio }) Ok(Self::Hybrid { embedder_name, embedder, quantized, semantic_ratio })
} }
pub(crate) fn embedder( pub(crate) fn embedder(
index_scheduler: &index_scheduler::IndexScheduler, index_scheduler: &index_scheduler::IndexScheduler,
index_uid: String,
index: &Index, index: &Index,
embedder_name: &str, embedder_name: &str,
vector_len: Option<usize>, vector_len: Option<usize>,
) -> Result<(String, Arc<Embedder>, bool), ResponseError> { ) -> Result<(String, Arc<Embedder>, bool), ResponseError> {
let embedder_configs = index.embedding_configs(&index.read_txn()?)?; let embedder_configs = index.embedding_configs(&index.read_txn()?)?;
let embedders = index_scheduler.embedders(embedder_configs)?; let embedders = index_scheduler.embedders(index_uid, embedder_configs)?;
let (embedder, _, quantized) = embedders let (embedder, _, quantized) = embedders
.get(embedder_name) .get(embedder_name)
@ -888,6 +891,7 @@ fn prepare_search<'t>(
} }
pub fn perform_search( pub fn perform_search(
index_uid: String,
index: &Index, index: &Index,
query: SearchQuery, query: SearchQuery,
search_kind: SearchKind, search_kind: SearchKind,
@ -914,7 +918,7 @@ pub fn perform_search(
used_negative_operator, used_negative_operator,
}, },
semantic_hit_count, semantic_hit_count,
) = search_from_kind(search_kind, search)?; ) = search_from_kind(index_uid, search_kind, search)?;
let SearchQuery { let SearchQuery {
q, q,
@ -1067,17 +1071,24 @@ fn compute_facet_distribution_stats<S: AsRef<str>>(
} }
pub fn search_from_kind( pub fn search_from_kind(
index_uid: String,
search_kind: SearchKind, search_kind: SearchKind,
search: milli::Search<'_>, search: milli::Search<'_>,
) -> Result<(milli::SearchResult, Option<u32>), MeilisearchHttpError> { ) -> Result<(milli::SearchResult, Option<u32>), MeilisearchHttpError> {
let (milli_result, semantic_hit_count) = match &search_kind { let (milli_result, semantic_hit_count) = match &search_kind {
SearchKind::KeywordOnly => (search.execute()?, None), SearchKind::KeywordOnly => {
let results = search.execute()
.map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid.to_string())))?;
(results, None)
},
SearchKind::SemanticOnly { .. } => { SearchKind::SemanticOnly { .. } => {
let results = search.execute()?; let results = search.execute()
.map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid.to_string())))?;
let semantic_hit_count = results.document_scores.len() as u32; let semantic_hit_count = results.document_scores.len() as u32;
(results, Some(semantic_hit_count)) (results, Some(semantic_hit_count))
} }
SearchKind::Hybrid { semantic_ratio, .. } => search.execute_hybrid(*semantic_ratio)?, SearchKind::Hybrid { semantic_ratio, .. } => search.execute_hybrid(*semantic_ratio)
.map_err(|e| MeilisearchHttpError::from_milli(e, Some(index_uid)))?,
}; };
Ok((milli_result, semantic_hit_count)) Ok((milli_result, semantic_hit_count))
} }
@ -1179,7 +1190,7 @@ impl<'a> HitMaker<'a> {
rtxn: &'a RoTxn<'a>, rtxn: &'a RoTxn<'a>,
format: AttributesFormat, format: AttributesFormat,
mut formatter_builder: MatcherBuilder<'a>, mut formatter_builder: MatcherBuilder<'a>,
) -> Result<Self, MeilisearchHttpError> { ) -> milli::Result<Self> {
formatter_builder.crop_marker(format.crop_marker); formatter_builder.crop_marker(format.crop_marker);
formatter_builder.highlight_prefix(format.highlight_pre_tag); formatter_builder.highlight_prefix(format.highlight_pre_tag);
formatter_builder.highlight_suffix(format.highlight_post_tag); formatter_builder.highlight_suffix(format.highlight_post_tag);
@ -1278,7 +1289,7 @@ impl<'a> HitMaker<'a> {
&self, &self,
id: u32, id: u32,
score: &[ScoreDetails], score: &[ScoreDetails],
) -> Result<SearchHit, MeilisearchHttpError> { ) -> milli::Result<SearchHit> {
let (_, obkv) = let (_, obkv) =
self.index.iter_documents(self.rtxn, std::iter::once(id))?.next().unwrap()?; self.index.iter_documents(self.rtxn, std::iter::once(id))?.next().unwrap()?;
@ -1321,7 +1332,7 @@ impl<'a> HitMaker<'a> {
.is_some_and(|conf| conf.user_provided.contains(id)); .is_some_and(|conf| conf.user_provided.contains(id));
let embeddings = let embeddings =
ExplicitVectors { embeddings: Some(vector.into()), regenerate: !user_provided }; ExplicitVectors { embeddings: Some(vector.into()), regenerate: !user_provided };
vectors.insert(name, serde_json::to_value(embeddings)?); vectors.insert(name, serde_json::to_value(embeddings).map_err(InternalError::SerdeJson)?);
} }
document.insert("_vectors".into(), vectors.into()); document.insert("_vectors".into(), vectors.into());
} }
@ -1367,7 +1378,7 @@ fn make_hits<'a>(
format: AttributesFormat, format: AttributesFormat,
matching_words: milli::MatchingWords, matching_words: milli::MatchingWords,
documents_ids_scores: impl Iterator<Item = (u32, &'a Vec<ScoreDetails>)> + 'a, documents_ids_scores: impl Iterator<Item = (u32, &'a Vec<ScoreDetails>)> + 'a,
) -> Result<Vec<SearchHit>, MeilisearchHttpError> { ) -> milli::Result<Vec<SearchHit>> {
let mut documents = Vec::new(); let mut documents = Vec::new();
let dictionary = index.dictionary(rtxn)?; let dictionary = index.dictionary(rtxn)?;
@ -1688,12 +1699,12 @@ fn make_document(
displayed_attributes: &BTreeSet<FieldId>, displayed_attributes: &BTreeSet<FieldId>,
field_ids_map: &FieldsIdsMap, field_ids_map: &FieldsIdsMap,
obkv: obkv::KvReaderU16, obkv: obkv::KvReaderU16,
) -> Result<Document, MeilisearchHttpError> { ) -> milli::Result<Document> {
let mut document = serde_json::Map::new(); let mut document = serde_json::Map::new();
// recreate the original json // recreate the original json
for (key, value) in obkv.iter() { for (key, value) in obkv.iter() {
let value = serde_json::from_slice(value)?; let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
let key = field_ids_map.name(key).expect("Missing field name").to_string(); let key = field_ids_map.name(key).expect("Missing field name").to_string();
document.insert(key, value); document.insert(key, value);
@ -1718,7 +1729,7 @@ fn format_fields(
displayable_ids: &BTreeSet<FieldId>, displayable_ids: &BTreeSet<FieldId>,
locales: Option<&[Language]>, locales: Option<&[Language]>,
localized_attributes: &[LocalizedAttributesRule], localized_attributes: &[LocalizedAttributesRule],
) -> Result<(Option<MatchesPosition>, Document), MeilisearchHttpError> { ) -> milli::Result<(Option<MatchesPosition>, Document)> {
let mut matches_position = compute_matches.then(BTreeMap::new); let mut matches_position = compute_matches.then(BTreeMap::new);
let mut document = document.clone(); let mut document = document.clone();
@ -1926,7 +1937,7 @@ fn parse_filter_array(arr: &[Value]) -> Result<Option<Filter>, MeilisearchHttpEr
} }
} }
Ok(Filter::from_array(ands)?) Ok(Filter::from_array(ands).map_err(|e|MeilisearchHttpError::from_milli(e,None))?)
} }
#[cfg(test)] #[cfg(test)]