diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index 70ab30d34..485ccf972 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -27,7 +27,7 @@ actix-http = { version = "=3.0.0-beta.6" } actix-service = "2.0.0" actix-web = { version = "=4.0.0-beta.6", features = ["rustls"] } actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "6db8c3e", optional = true } -anyhow = "1.0.36" +#anyhow = "1.0.36" async-stream = "0.3.0" async-trait = "0.1.42" arc-swap = "1.2.0" diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index 9f8a688bc..634216ade 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -6,6 +6,7 @@ use sha2::Digest; use crate::index::{Checked, Settings}; use crate::index_controller::{ DumpInfo, IndexController, IndexMetadata, IndexSettings, IndexStats, Stats, + error::Result }; use crate::option::Opt; @@ -56,7 +57,7 @@ impl ApiKeys { } impl Data { - pub fn new(options: Opt) -> anyhow::Result { + pub fn new(options: Opt) -> std::result::Result> { let path = options.db_path.clone(); let index_controller = IndexController::new(&path, &options)?; @@ -79,15 +80,15 @@ impl Data { Ok(Data { inner }) } - pub async fn settings(&self, uid: String) -> anyhow::Result> { + pub async fn settings(&self, uid: String) -> Result> { self.index_controller.settings(uid).await } - pub async fn list_indexes(&self) -> anyhow::Result> { + pub async fn list_indexes(&self) -> Result> { self.index_controller.list_indexes().await } - pub async fn index(&self, uid: String) -> anyhow::Result { + pub async fn index(&self, uid: String) -> Result { self.index_controller.get_index(uid).await } @@ -95,7 +96,7 @@ impl Data { &self, uid: String, primary_key: Option, - ) -> anyhow::Result { + ) -> Result { let settings = IndexSettings { uid: Some(uid), primary_key, @@ -105,19 +106,19 @@ impl Data { Ok(meta) } - pub async fn get_index_stats(&self, uid: String) -> anyhow::Result { + pub async fn get_index_stats(&self, uid: String) -> Result { Ok(self.index_controller.get_index_stats(uid).await?) } - pub async fn get_all_stats(&self) -> anyhow::Result { + pub async fn get_all_stats(&self) -> Result { Ok(self.index_controller.get_all_stats().await?) } - pub async fn create_dump(&self) -> anyhow::Result { + pub async fn create_dump(&self) -> Result { Ok(self.index_controller.create_dump().await?) } - pub async fn dump_status(&self, uid: String) -> anyhow::Result { + pub async fn dump_status(&self, uid: String) -> Result { Ok(self.index_controller.dump_info(uid).await?) } diff --git a/meilisearch-http/src/data/search.rs b/meilisearch-http/src/data/search.rs index 1a998b997..30645cfaf 100644 --- a/meilisearch-http/src/data/search.rs +++ b/meilisearch-http/src/data/search.rs @@ -2,13 +2,14 @@ use serde_json::{Map, Value}; use super::Data; use crate::index::{SearchQuery, SearchResult}; +use crate::index_controller::error::Result; impl Data { pub async fn search( &self, index: String, search_query: SearchQuery, - ) -> anyhow::Result { + ) -> Result { self.index_controller.search(index, search_query).await } @@ -18,7 +19,7 @@ impl Data { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> anyhow::Result>> { + ) -> Result>> { self.index_controller .documents(index, offset, limit, attributes_to_retrieve) .await @@ -29,7 +30,7 @@ impl Data { index: String, document_id: String, attributes_to_retrieve: Option>, - ) -> anyhow::Result> { + ) -> Result> { self.index_controller .document(index, document_id, attributes_to_retrieve) .await diff --git a/meilisearch-http/src/data/updates.rs b/meilisearch-http/src/data/updates.rs index 23949aa86..e2b9f6629 100644 --- a/meilisearch-http/src/data/updates.rs +++ b/meilisearch-http/src/data/updates.rs @@ -3,7 +3,7 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat}; use super::Data; use crate::index::{Checked, Settings}; -use crate::index_controller::{IndexMetadata, IndexSettings, UpdateStatus}; +use crate::index_controller::{IndexMetadata, IndexSettings, UpdateStatus, error::Result}; impl Data { pub async fn add_documents( @@ -13,7 +13,7 @@ impl Data { format: UpdateFormat, stream: Payload, primary_key: Option, - ) -> anyhow::Result { + ) -> Result { let update_status = self .index_controller .add_documents(index, method, format, stream, primary_key) @@ -26,7 +26,7 @@ impl Data { index: String, settings: Settings, create: bool, - ) -> anyhow::Result { + ) -> Result { let update = self .index_controller .update_settings(index, settings, create) @@ -34,7 +34,7 @@ impl Data { Ok(update) } - pub async fn clear_documents(&self, index: String) -> anyhow::Result { + pub async fn clear_documents(&self, index: String) -> Result { let update = self.index_controller.clear_documents(index).await?; Ok(update) } @@ -43,7 +43,7 @@ impl Data { &self, index: String, document_ids: Vec, - ) -> anyhow::Result { + ) -> Result { let update = self .index_controller .delete_documents(index, document_ids) @@ -51,16 +51,16 @@ impl Data { Ok(update) } - pub async fn delete_index(&self, index: String) -> anyhow::Result<()> { + pub async fn delete_index(&self, index: String) -> Result<()> { self.index_controller.delete_index(index).await?; Ok(()) } - pub async fn get_update_status(&self, index: String, uid: u64) -> anyhow::Result { + pub async fn get_update_status(&self, index: String, uid: u64) -> Result { self.index_controller.update_status(index, uid).await } - pub async fn get_updates_status(&self, index: String) -> anyhow::Result> { + pub async fn get_updates_status(&self, index: String) -> Result> { self.index_controller.all_update_status(index).await } @@ -69,7 +69,7 @@ impl Data { uid: String, primary_key: Option, new_uid: Option, - ) -> anyhow::Result { + ) -> Result { let settings = IndexSettings { uid: new_uid, primary_key, diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index 07bd96fb9..04e2c52ca 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -1,15 +1,33 @@ use std::error; +use std::error::Error; use std::fmt; use actix_web as aweb; use actix_web::body::Body; use actix_web::dev::BaseHttpResponseBuilder; -use actix_web::error::{JsonPayloadError, QueryPayloadError}; -use actix_web::http::Error as HttpError; use actix_web::http::StatusCode; use meilisearch_error::{Code, ErrorCode}; use serde::ser::{Serialize, SerializeStruct, Serializer}; +use crate::index_controller::error::IndexControllerError; + +#[derive(Debug, thiserror::Error)] +pub enum AuthenticationError { + #[error("You must have an authorization token")] + MissingAuthorizationHeader, + #[error("Invalid API key")] + InvalidToken(String), +} + +impl ErrorCode for AuthenticationError { + fn error_code(&self) -> Code { + match self { + AuthenticationError ::MissingAuthorizationHeader => Code::MissingAuthorizationHeader, + AuthenticationError::InvalidToken(_) => Code::InvalidToken, + } + } +} + #[derive(Debug)] pub struct ResponseError { inner: Box, @@ -29,30 +47,26 @@ impl fmt::Display for ResponseError { } } -// TODO: remove this when implementing actual error handling -impl From for ResponseError { - fn from(other: anyhow::Error) -> ResponseError { - ResponseError { - inner: Box::new(Error::NotFound(other.to_string())), - } - } +macro_rules! response_error { + ($($other:path), *) => { + $( + impl From<$other> for ResponseError { + fn from(error: $other) -> ResponseError { + ResponseError { + inner: Box::new(error), + } + } + } + + )* + }; } -impl From for ResponseError { - fn from(error: Error) -> ResponseError { - ResponseError { - inner: Box::new(error), - } - } -} +response_error!( + IndexControllerError, + AuthenticationError +); -impl From for ResponseError { - fn from(err: FacetCountError) -> ResponseError { - ResponseError { - inner: Box::new(err), - } - } -} impl Serialize for ResponseError { fn serialize(&self, serializer: S) -> Result @@ -83,239 +97,35 @@ impl aweb::error::ResponseError for ResponseError { } #[derive(Debug)] -pub enum Error { - BadParameter(String, String), - BadRequest(String), - CreateIndex(String), - DocumentNotFound(String), - IndexNotFound(String), - IndexAlreadyExists(String), - Internal(String), - InvalidIndexUid, - InvalidToken(String), - MissingAuthorizationHeader, - NotFound(String), - OpenIndex(String), - RetrieveDocument(u32, String), - SearchDocuments(String), - PayloadTooLarge, - UnsupportedMediaType, - DumpAlreadyInProgress, - DumpProcessFailed(String), -} +struct PayloadError(E); -impl error::Error for Error {} - -impl ErrorCode for Error { - fn error_code(&self) -> Code { - use Error::*; - match self { - BadParameter(_, _) => Code::BadParameter, - BadRequest(_) => Code::BadRequest, - CreateIndex(_) => Code::CreateIndex, - DocumentNotFound(_) => Code::DocumentNotFound, - IndexNotFound(_) => Code::IndexNotFound, - IndexAlreadyExists(_) => Code::IndexAlreadyExists, - Internal(_) => Code::Internal, - InvalidIndexUid => Code::InvalidIndexUid, - InvalidToken(_) => Code::InvalidToken, - MissingAuthorizationHeader => Code::MissingAuthorizationHeader, - NotFound(_) => Code::NotFound, - OpenIndex(_) => Code::OpenIndex, - RetrieveDocument(_, _) => Code::RetrieveDocument, - SearchDocuments(_) => Code::SearchDocuments, - PayloadTooLarge => Code::PayloadTooLarge, - UnsupportedMediaType => Code::UnsupportedMediaType, - _ => unreachable!() - //DumpAlreadyInProgress => Code::DumpAlreadyInProgress, - //DumpProcessFailed(_) => Code::DumpProcessFailed, - } - } -} - -#[derive(Debug)] -pub enum FacetCountError { - AttributeNotSet(String), - SyntaxError(String), - UnexpectedToken { - found: String, - expected: &'static [&'static str], - }, - NoFacetSet, -} - -impl error::Error for FacetCountError {} - -impl ErrorCode for FacetCountError { - fn error_code(&self) -> Code { - Code::BadRequest - } -} - -impl FacetCountError { - pub fn unexpected_token( - found: impl ToString, - expected: &'static [&'static str], - ) -> FacetCountError { - let found = found.to_string(); - FacetCountError::UnexpectedToken { expected, found } - } -} - -impl From for FacetCountError { - fn from(other: serde_json::error::Error) -> FacetCountError { - FacetCountError::SyntaxError(other.to_string()) - } -} - -impl fmt::Display for FacetCountError { +impl fmt::Display for PayloadError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use FacetCountError::*; + std::fmt::Display::fmt(&self.0, f) + } +} - match self { - AttributeNotSet(attr) => write!(f, "Attribute {} is not set as facet", attr), - SyntaxError(msg) => write!(f, "Syntax error: {}", msg), - UnexpectedToken { expected, found } => { - write!(f, "Unexpected {} found, expected {:?}", found, expected) - } - NoFacetSet => write!(f, "Can't perform facet count, as no facet is set"), +impl Error for PayloadError {} + +impl ErrorCode for PayloadError { + fn error_code(&self) -> Code { + Code::Internal + } +} + +impl From> for ResponseError +where E: Error + Sync + Send + 'static +{ + fn from(other: PayloadError) -> Self { + ResponseError { + inner: Box::new(other), } } } -impl Error { - pub fn internal(err: impl fmt::Display) -> Error { - Error::Internal(err.to_string()) - } - - pub fn bad_request(err: impl fmt::Display) -> Error { - Error::BadRequest(err.to_string()) - } - - pub fn missing_authorization_header() -> Error { - Error::MissingAuthorizationHeader - } - - pub fn invalid_token(err: impl fmt::Display) -> Error { - Error::InvalidToken(err.to_string()) - } - - pub fn not_found(err: impl fmt::Display) -> Error { - Error::NotFound(err.to_string()) - } - - pub fn index_not_found(err: impl fmt::Display) -> Error { - Error::IndexNotFound(err.to_string()) - } - - pub fn document_not_found(err: impl fmt::Display) -> Error { - Error::DocumentNotFound(err.to_string()) - } - - pub fn bad_parameter(param: impl fmt::Display, err: impl fmt::Display) -> Error { - Error::BadParameter(param.to_string(), err.to_string()) - } - - pub fn open_index(err: impl fmt::Display) -> Error { - Error::OpenIndex(err.to_string()) - } - - pub fn create_index(err: impl fmt::Display) -> Error { - Error::CreateIndex(err.to_string()) - } - - pub fn invalid_index_uid() -> Error { - Error::InvalidIndexUid - } - - pub fn retrieve_document(doc_id: u32, err: impl fmt::Display) -> Error { - Error::RetrieveDocument(doc_id, err.to_string()) - } - - pub fn search_documents(err: impl fmt::Display) -> Error { - Error::SearchDocuments(err.to_string()) - } - - pub fn dump_conflict() -> Error { - Error::DumpAlreadyInProgress - } - - pub fn dump_failed(message: String) -> Error { - Error::DumpProcessFailed(message) - } -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::BadParameter(param, err) => write!(f, "Url parameter {} error: {}", param, err), - Self::BadRequest(err) => f.write_str(err), - Self::CreateIndex(err) => write!(f, "Impossible to create index; {}", err), - Self::DocumentNotFound(document_id) => write!(f, "Document with id {} not found", document_id), - Self::IndexNotFound(index_uid) => write!(f, "Index {} not found", index_uid), - Self::IndexAlreadyExists(index_uid) => write!(f, "Index {} already exists", index_uid), - Self::Internal(err) => f.write_str(err), - Self::InvalidIndexUid => f.write_str("Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_)."), - Self::InvalidToken(err) => write!(f, "Invalid API key: {}", err), - Self::MissingAuthorizationHeader => f.write_str("You must have an authorization token"), - Self::NotFound(err) => write!(f, "{} not found", err), - Self::OpenIndex(err) => write!(f, "Impossible to open index; {}", err), - Self::RetrieveDocument(id, err) => write!(f, "Impossible to retrieve the document with id: {}; {}", id, err), - Self::SearchDocuments(err) => write!(f, "Impossible to search documents; {}", err), - Self::PayloadTooLarge => f.write_str("Payload too large"), - Self::UnsupportedMediaType => f.write_str("Unsupported media type"), - Self::DumpAlreadyInProgress => f.write_str("Another dump is already in progress"), - Self::DumpProcessFailed(message) => write!(f, "Dump process failed: {}", message), - } - } -} - -impl From for Error { - fn from(err: std::io::Error) -> Error { - Error::Internal(err.to_string()) - } -} - -impl From for Error { - fn from(err: HttpError) -> Error { - Error::Internal(err.to_string()) - } -} - -impl From for Error { - fn from(err: serde_json::error::Error) -> Error { - Error::Internal(err.to_string()) - } -} - -impl From for Error { - fn from(err: JsonPayloadError) -> Error { - match err { - JsonPayloadError::Deserialize(err) => { - Error::BadRequest(format!("Invalid JSON: {}", err)) - } - JsonPayloadError::Overflow => Error::PayloadTooLarge, - JsonPayloadError::ContentType => Error::UnsupportedMediaType, - JsonPayloadError::Payload(err) => { - Error::BadRequest(format!("Problem while decoding the request: {}", err)) - } - e => Error::Internal(format!("Unexpected Json error: {}", e)), - } - } -} - -impl From for Error { - fn from(err: QueryPayloadError) -> Error { - match err { - QueryPayloadError::Deserialize(err) => { - Error::BadRequest(format!("Invalid query parameters: {}", err)) - } - e => Error::Internal(format!("Unexpected query payload error: {}", e)), - } - } -} - -pub fn payload_error_handler>(err: E) -> ResponseError { - let error: Error = err.into(); +pub fn payload_error_handler(err: E) -> ResponseError +where E: Error + Sync + Send + 'static +{ + let error = PayloadError(err); error.into() } diff --git a/meilisearch-http/src/helpers/authentication.rs b/meilisearch-http/src/helpers/authentication.rs index 54d5488f4..5577b9c1d 100644 --- a/meilisearch-http/src/helpers/authentication.rs +++ b/meilisearch-http/src/helpers/authentication.rs @@ -9,7 +9,7 @@ use futures::future::{ok, Future, Ready}; use futures::ready; use pin_project::pin_project; -use crate::error::{Error, ResponseError}; +use crate::error::{ResponseError, AuthenticationError}; use crate::Data; #[derive(Clone, Copy)] @@ -117,7 +117,7 @@ where AuthProj::NoHeader(req) => { match req.take() { Some(req) => { - let response = ResponseError::from(Error::MissingAuthorizationHeader); + let response = ResponseError::from(AuthenticationError::MissingAuthorizationHeader); let response = response.error_response(); let response = req.into_response(response); Poll::Ready(Ok(response)) @@ -134,7 +134,7 @@ where .get("X-Meili-API-Key") .map(|h| h.to_str().map(String::from).unwrap_or_default()) .unwrap_or_default(); - let response = ResponseError::from(Error::InvalidToken(bad_token)); + let response = ResponseError::from(AuthenticationError::InvalidToken(bad_token)); let response = response.error_response(); let response = req.into_response(response); Poll::Ready(Ok(response)) diff --git a/meilisearch-http/src/helpers/compression.rs b/meilisearch-http/src/helpers/compression.rs index c4747cb21..55861ca53 100644 --- a/meilisearch-http/src/helpers/compression.rs +++ b/meilisearch-http/src/helpers/compression.rs @@ -5,7 +5,7 @@ use std::path::Path; use flate2::{read::GzDecoder, write::GzEncoder, Compression}; use tar::{Archive, Builder}; -pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { +pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> Result<(), Box> { let mut f = File::create(dest)?; let gz_encoder = GzEncoder::new(&mut f, Compression::default()); let mut tar_encoder = Builder::new(gz_encoder); @@ -16,7 +16,7 @@ pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Resul Ok(()) } -pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { +pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> Result<(), Box> { let f = File::open(&src)?; let gz = GzDecoder::new(f); let mut ar = Archive::new(gz); diff --git a/meilisearch-http/src/index/dump.rs b/meilisearch-http/src/index/dump.rs index 13e6cbc02..05202be2d 100644 --- a/meilisearch-http/src/index/dump.rs +++ b/meilisearch-http/src/index/dump.rs @@ -3,7 +3,6 @@ use std::io::{BufRead, BufReader, Write}; use std::path::Path; use std::sync::Arc; -use anyhow::{bail, Context}; use heed::RoTxn; use indexmap::IndexMap; use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream}; @@ -12,6 +11,7 @@ use serde::{Deserialize, Serialize}; use crate::option::IndexerOpts; use super::{update_handler::UpdateHandler, Index, Settings, Unchecked}; +use super::error::{IndexError, Result}; #[derive(Serialize, Deserialize)] struct DumpMeta { @@ -23,7 +23,7 @@ const META_FILE_NAME: &str = "meta.json"; const DATA_FILE_NAME: &str = "documents.jsonl"; impl Index { - pub fn dump(&self, path: impl AsRef) -> anyhow::Result<()> { + pub fn dump(&self, path: impl AsRef) -> Result<()> { // acquire write txn make sure any ongoing write is finished before we start. let txn = self.env.write_txn()?; @@ -33,11 +33,12 @@ impl Index { Ok(()) } - fn dump_documents(&self, txn: &RoTxn, path: impl AsRef) -> anyhow::Result<()> { + fn dump_documents(&self, txn: &RoTxn, path: impl AsRef) -> Result<()> { let document_file_path = path.as_ref().join(DATA_FILE_NAME); let mut document_file = File::create(&document_file_path)?; - let documents = self.all_documents(txn)?; + let documents = self.all_documents(txn) + .map_err(|e| IndexError::Internal(e.into()))?; let fields_ids_map = self.fields_ids_map(txn)?; // dump documents @@ -60,7 +61,7 @@ impl Index { Ok(()) } - fn dump_meta(&self, txn: &RoTxn, path: impl AsRef) -> anyhow::Result<()> { + fn dump_meta(&self, txn: &RoTxn, path: impl AsRef) -> Result<()> { let meta_file_path = path.as_ref().join(META_FILE_NAME); let mut meta_file = File::create(&meta_file_path)?; @@ -81,11 +82,13 @@ impl Index { dst: impl AsRef, size: usize, indexing_options: &IndexerOpts, - ) -> anyhow::Result<()> { + ) -> std::result::Result<(), Box> { let dir_name = src .as_ref() .file_name() - .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; + // TODO: remove + //.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; + .unwrap(); let dst_dir_path = dst.as_ref().join("indexes").join(dir_name); create_dir_all(&dst_dir_path)?; @@ -124,7 +127,7 @@ impl Index { match Arc::try_unwrap(index.0) { Ok(inner) => inner.prepare_for_closing().wait(), - Err(_) => bail!("Could not close index properly."), + Err(_) => todo!("Could not close index properly."), } Ok(()) diff --git a/meilisearch-http/src/index/error.rs b/meilisearch-http/src/index/error.rs new file mode 100644 index 000000000..cfda813a1 --- /dev/null +++ b/meilisearch-http/src/index/error.rs @@ -0,0 +1,60 @@ +use std::error::Error; + +use meilisearch_error::{Code, ErrorCode}; +use serde_json::Value; + +pub type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum IndexError { + #[error("Internal error: {0}")] + Internal(Box), + #[error("Document with id {0} not found.")] + DocumentNotFound(String), + #[error("error with facet: {0}")] + Facet(#[from] FacetError), +} + +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for IndexError { + fn from(other: $other) -> Self { + Self::Internal(Box::new(other)) + } + } + )* + } +} + +internal_error!( + std::io::Error, + heed::Error, + fst::Error, + serde_json::Error +); + +impl ErrorCode for IndexError { + fn error_code(&self) -> Code { + match self { + IndexError::Internal(_) => Code::Internal, + IndexError::DocumentNotFound(_) => Code::DocumentNotFound, + IndexError::Facet(e) => e.error_code(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum FacetError { + #[error("Invalid facet expression, expected {}, found: {1}", .0.join(", "))] + InvalidExpression(&'static [&'static str], Value) +} + +impl ErrorCode for FacetError { + fn error_code(&self) -> Code { + match self { + FacetError::InvalidExpression(_, _) => Code::Facet, + } + } +} + diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs index 56958760a..816629524 100644 --- a/meilisearch-http/src/index/mod.rs +++ b/meilisearch-http/src/index/mod.rs @@ -5,19 +5,24 @@ use std::ops::Deref; use std::path::Path; use std::sync::Arc; -use anyhow::{bail, Context}; use heed::{EnvOpenOptions, RoTxn}; use milli::obkv_to_json; +use serde::{de::Deserializer, Deserialize}; use serde_json::{Map, Value}; use crate::helpers::EnvSizer; +use error::Result; + pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; -use serde::{de::Deserializer, Deserialize}; pub use updates::{Checked, Facets, Settings, Unchecked}; +use self::error::IndexError; + +pub mod error; +pub mod update_handler; + mod dump; mod search; -pub mod update_handler; mod updates; pub type Document = Map; @@ -33,7 +38,7 @@ impl Deref for Index { } } -pub fn deserialize_some<'de, T, D>(deserializer: D) -> Result, D::Error> +pub fn deserialize_some<'de, T, D>(deserializer: D) -> std::result::Result, D::Error> where T: Deserialize<'de>, D: Deserializer<'de>, @@ -42,20 +47,21 @@ where } impl Index { - pub fn open(path: impl AsRef, size: usize) -> anyhow::Result { + pub fn open(path: impl AsRef, size: usize) -> Result { create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); options.map_size(size); - let index = milli::Index::new(options, &path)?; + let index = + milli::Index::new(options, &path).map_err(|e| IndexError::Internal(e.into()))?; Ok(Index(Arc::new(index))) } - pub fn settings(&self) -> anyhow::Result> { + pub fn settings(&self) -> Result> { let txn = self.read_txn()?; self.settings_txn(&txn) } - pub fn settings_txn(&self, txn: &RoTxn) -> anyhow::Result> { + pub fn settings_txn(&self, txn: &RoTxn) -> Result> { let displayed_attributes = self .displayed_fields(&txn)? .map(|fields| fields.into_iter().map(String::from).collect()); @@ -65,7 +71,8 @@ impl Index { .map(|fields| fields.into_iter().map(String::from).collect()); let faceted_attributes = self - .faceted_fields(&txn)? + .faceted_fields(&txn) + .map_err(|e| IndexError::Internal(Box::new(e)))? .into_iter() .collect(); @@ -76,8 +83,9 @@ impl Index { .collect(); let stop_words = self - .stop_words(&txn)? - .map(|stop_words| -> anyhow::Result> { + .stop_words(&txn) + .map_err(|e| IndexError::Internal(e.into()))? + .map(|stop_words| -> Result> { Ok(stop_words.stream().into_strs()?.into_iter().collect()) }) .transpose()? @@ -114,12 +122,13 @@ impl Index { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> anyhow::Result>> { + ) -> Result>> { let txn = self.read_txn()?; let fields_ids_map = self.fields_ids_map(&txn)?; - let fields_to_display = - self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; + let fields_to_display = self + .fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map) + .map_err(|e| IndexError::Internal(e.into()))?; let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit); @@ -127,7 +136,8 @@ impl Index { for entry in iter { let (_id, obkv) = entry?; - let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?; + let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv) + .map_err(|e| IndexError::Internal(e.into()))?; documents.push(object); } @@ -138,28 +148,35 @@ impl Index { &self, doc_id: String, attributes_to_retrieve: Option>, - ) -> anyhow::Result> { + ) -> Result> { let txn = self.read_txn()?; let fields_ids_map = self.fields_ids_map(&txn)?; - let fields_to_display = - self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; + let fields_to_display = self + .fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map) + .map_err(|e| IndexError::Internal(e.into()))?; let internal_id = self - .external_documents_ids(&txn)? + .external_documents_ids(&txn) + .map_err(|e| IndexError::Internal(e.into()))? .get(doc_id.as_bytes()) - .with_context(|| format!("Document with id {} not found", doc_id))?; + .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?; let document = self - .documents(&txn, std::iter::once(internal_id))? + .documents(&txn, std::iter::once(internal_id)) + .map_err(|e| IndexError::Internal(e.into()))? .into_iter() .next() .map(|(_, d)| d); match document { - Some(document) => Ok(obkv_to_json(&fields_to_display, &fields_ids_map, document)?), - None => bail!("Document with id {} not found", doc_id), + Some(document) => { + let document = obkv_to_json(&fields_to_display, &fields_ids_map, document) + .map_err(|e| IndexError::Internal(e.into()))?; + Ok(document) + } + None => Err(IndexError::DocumentNotFound(doc_id)), } } @@ -172,8 +189,9 @@ impl Index { txn: &heed::RoTxn, attributes_to_retrieve: &Option>, fields_ids_map: &milli::FieldsIdsMap, - ) -> anyhow::Result> { - let mut displayed_fields_ids = match self.displayed_fields_ids(&txn)? { + ) -> Result> { + let mut displayed_fields_ids = match self.displayed_fields_ids(&txn) + .map_err(|e| IndexError::Internal(Box::new(e)))? { Some(ids) => ids.into_iter().collect::>(), None => fields_ids_map.iter().map(|(id, _)| id).collect(), }; diff --git a/meilisearch-http/src/index/search.rs b/meilisearch-http/src/index/search.rs index fbc47cd4e..3122f784e 100644 --- a/meilisearch-http/src/index/search.rs +++ b/meilisearch-http/src/index/search.rs @@ -2,7 +2,6 @@ use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::time::Instant; -use anyhow::bail; use either::Either; use heed::RoTxn; use indexmap::IndexMap; @@ -11,6 +10,9 @@ use milli::{FilterCondition, FieldId, FieldsIdsMap, MatchingWords}; use serde::{Deserialize, Serialize}; use serde_json::Value; +use crate::index::error::FacetError; + +use super::error::{IndexError, Result}; use super::Index; pub type Document = IndexMap; @@ -71,7 +73,7 @@ struct FormatOptions { } impl Index { - pub fn perform_search(&self, query: SearchQuery) -> anyhow::Result { + pub fn perform_search(&self, query: SearchQuery) -> Result { let before_search = Instant::now(); let rtxn = self.read_txn()?; @@ -95,12 +97,14 @@ impl Index { matching_words, candidates, .. - } = search.execute()?; - let mut documents = Vec::new(); + } = search + .execute() + .map_err(|e| IndexError::Internal(e.into()))?; let fields_ids_map = self.fields_ids_map(&rtxn).unwrap(); let displayed_ids = self .displayed_fields_ids(&rtxn)? + .map_err(|e| IndexError::Internal(Box::new(e)))? .map(|fields| fields.into_iter().collect::>()) .unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect()); @@ -158,6 +162,8 @@ impl Index { let formatter = Formatter::new(&stop_words, (String::from(""), String::from(""))); + let mut documents = Vec::new(); + for (_id, obkv) in self.documents(&rtxn, documents_ids)? { let document = make_document(&to_retrieve_ids, &fields_ids_map, obkv)?; let formatted = format_fields( @@ -167,6 +173,7 @@ impl Index { &matching_words, &formatted_options, )?; + let hit = SearchHit { document, formatted, @@ -182,7 +189,12 @@ impl Index { if fields.iter().all(|f| f != "*") { facet_distribution.facets(fields); } - Some(facet_distribution.candidates(candidates).execute()?) + let distribution = facet_distribution + .candidates(candidates) + .execute() + .map_err(|e| IndexError::Internal(e.into()))?; + + Some(distribution) } None => None, }; @@ -326,7 +338,7 @@ fn make_document( attributes_to_retrieve: &BTreeSet, field_ids_map: &FieldsIdsMap, obkv: obkv::KvReader, -) -> anyhow::Result { +) -> Result { let mut document = Document::new(); for attr in attributes_to_retrieve { if let Some(value) = obkv.get(*attr) { @@ -351,7 +363,7 @@ fn format_fields>( formatter: &Formatter, matching_words: &impl Matcher, formatted_options: &BTreeMap, -) -> anyhow::Result { +) -> Result { let mut document = Document::new(); for (id, format) in formatted_options { @@ -513,15 +525,15 @@ impl<'a, A: AsRef<[u8]>> Formatter<'a, A> { } } -fn parse_filter( - facets: &Value, - index: &Index, - txn: &RoTxn, -) -> anyhow::Result> { +fn parse_filter(facets: &Value, index: &Index, txn: &RoTxn) -> Result> { match facets { - Value::String(expr) => Ok(Some(FilterCondition::from_str(txn, index, expr)?)), + Value::String(expr) => { + let condition = FilterCondition::from_str(txn, index, expr) + .map_err(|e| IndexError::Internal(e.into()))?; + Ok(Some(condition)) + } Value::Array(arr) => parse_filter_array(txn, index, arr), - v => bail!("Invalid facet expression, expected Array, found: {:?}", v), + v => return Err(FacetError::InvalidExpression(&["Array"], v.clone()).into()), } } @@ -529,7 +541,7 @@ fn parse_filter_array( txn: &RoTxn, index: &Index, arr: &[Value], -) -> anyhow::Result> { +) -> Result> { let mut ands = Vec::new(); for value in arr { match value { @@ -539,19 +551,22 @@ fn parse_filter_array( for value in arr { match value { Value::String(s) => ors.push(s.clone()), - v => bail!("Invalid facet expression, expected String, found: {:?}", v), + v => { + return Err(FacetError::InvalidExpression(&["String"], v.clone()).into()) + } } } ands.push(Either::Left(ors)); } - v => bail!( - "Invalid facet expression, expected String or [String], found: {:?}", - v - ), + v => { + return Err( + FacetError::InvalidExpression(&["String", "[String]"], v.clone()).into(), + ) + } } } - Ok(FilterCondition::from_array(txn, &index.0, ands)?) + FilterCondition::from_array(txn, &index.0, ands).map_err(|e| IndexError::Internal(Box::new(e))) } #[cfg(test)] diff --git a/meilisearch-http/src/index/update_handler.rs b/meilisearch-http/src/index/update_handler.rs index 63a074abb..ce6a7e48d 100644 --- a/meilisearch-http/src/index/update_handler.rs +++ b/meilisearch-http/src/index/update_handler.rs @@ -1,7 +1,6 @@ use std::fs::File; use crate::index::Index; -use anyhow::Result; use grenad::CompressionType; use milli::update::UpdateBuilder; use rayon::ThreadPool; @@ -22,7 +21,7 @@ pub struct UpdateHandler { } impl UpdateHandler { - pub fn new(opt: &IndexerOpts) -> anyhow::Result { + pub fn new(opt: &IndexerOpts) -> std::result::Result> { let thread_pool = rayon::ThreadPoolBuilder::new() .num_threads(opt.indexing_jobs.unwrap_or(0)) .build()?; diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index ce327520e..04e90f4f4 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -8,11 +8,16 @@ use log::info; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use serde::{Deserialize, Serialize, Serializer}; +use crate::index::error::IndexError; use crate::index_controller::UpdateResult; +use super::error::Result; use super::{deserialize_some, Index}; -fn serialize_with_wildcard(field: &Option>>, s: S) -> Result +fn serialize_with_wildcard( + field: &Option>>, + s: S, +) -> std::result::Result where S: Serializer, { @@ -174,7 +179,7 @@ impl Index { content: Option, update_builder: UpdateBuilder, primary_key: Option<&str>, - ) -> anyhow::Result { + ) -> Result { let mut txn = self.write_txn()?; let result = self.update_documents_txn( &mut txn, @@ -196,7 +201,7 @@ impl Index { content: Option, update_builder: UpdateBuilder, primary_key: Option<&str>, - ) -> anyhow::Result { + ) -> Result { info!("performing document addition"); // Set the primary key if not set already, ignore if already set. @@ -204,7 +209,8 @@ impl Index { let mut builder = UpdateBuilder::new(0) .settings(txn, &self); builder.set_primary_key(primary_key.to_string()); - builder.execute(|_, _| ())?; + builder.execute(|_, _| ()) + .map_err(|e| IndexError::Internal(Box::new(e)))?; } let mut builder = update_builder.index_documents(txn, self); @@ -216,11 +222,15 @@ impl Index { let gzipped = false; let addition = match content { - Some(content) if gzipped => { - builder.execute(GzDecoder::new(content), indexing_callback)? - } - Some(content) => builder.execute(content, indexing_callback)?, - None => builder.execute(std::io::empty(), indexing_callback)?, + Some(content) if gzipped => builder + .execute(GzDecoder::new(content), indexing_callback) + .map_err(|e| IndexError::Internal(e.into()))?, + Some(content) => builder + .execute(content, indexing_callback) + .map_err(|e| IndexError::Internal(e.into()))?, + None => builder + .execute(std::io::empty(), indexing_callback) + .map_err(|e| IndexError::Internal(e.into()))?, }; info!("document addition done: {:?}", addition); @@ -228,7 +238,7 @@ impl Index { Ok(UpdateResult::DocumentsAddition(addition)) } - pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result { + pub fn clear_documents(&self, update_builder: UpdateBuilder) -> Result { // We must use the write transaction of the update here. let mut wtxn = self.write_txn()?; let builder = update_builder.clear_documents(&mut wtxn, self); @@ -238,7 +248,7 @@ impl Index { .commit() .and(Ok(UpdateResult::Other)) .map_err(Into::into), - Err(e) => Err(e.into()), + Err(e) => Err(IndexError::Internal(Box::new(e))), } } @@ -247,7 +257,7 @@ impl Index { txn: &mut heed::RwTxn<'a, 'b>, settings: &Settings, update_builder: UpdateBuilder, - ) -> anyhow::Result { + ) -> Result { // We must use the write transaction of the update here. let mut builder = update_builder.settings(txn, self); @@ -300,7 +310,8 @@ impl Index { builder.execute(|indexing_step, update_id| { info!("update {}: {:?}", update_id, indexing_step) - })?; + }) + .map_err(|e| IndexError::Internal(e.into()))?; Ok(UpdateResult::Other) } @@ -309,7 +320,7 @@ impl Index { &self, settings: &Settings, update_builder: UpdateBuilder, - ) -> anyhow::Result { + ) -> Result { let mut txn = self.write_txn()?; let result = self.update_settings_txn(&mut txn, settings, update_builder)?; txn.commit()?; @@ -320,9 +331,10 @@ impl Index { &self, document_ids: &[String], update_builder: UpdateBuilder, - ) -> anyhow::Result { + ) -> Result { let mut txn = self.write_txn()?; - let mut builder = update_builder.delete_documents(&mut txn, self)?; + let mut builder = update_builder.delete_documents(&mut txn, self) + .map_err(|e| IndexError::Internal(e.into()))?; // We ignore unexisting document ids document_ids.iter().for_each(|id| { @@ -334,7 +346,7 @@ impl Index { .commit() .and(Ok(UpdateResult::DocumentDeletion { deleted })) .map_err(Into::into), - Err(e) => Err(e.into()), + Err(e) => Err(IndexError::Internal(Box::new(e))), } } } diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index c78079de6..a534a3a69 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -10,8 +10,9 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use update_actor::UpdateActorHandle; use uuid_resolver::UuidResolverHandle; -use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask}; +use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask}; use crate::index_controller::{update_actor, uuid_resolver}; +use super::error::{DumpActorError, Result}; pub const CONCURRENT_DUMP_MSG: usize = 10; @@ -95,14 +96,14 @@ where } } - async fn handle_create_dump(&self, ret: oneshot::Sender>) { + async fn handle_create_dump(&self, ret: oneshot::Sender>) { let uid = generate_uid(); let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); let _lock = match self.lock.try_lock() { Some(lock) => lock, None => { - ret.send(Err(DumpError::DumpAlreadyRunning)) + ret.send(Err(DumpActorError::DumpAlreadyRunning)) .expect("Dump actor is dead"); return; } @@ -147,10 +148,10 @@ where }; } - async fn handle_dump_info(&self, uid: String) -> DumpResult { + async fn handle_dump_info(&self, uid: String) -> Result { match self.dump_infos.read().await.get(&uid) { Some(info) => Ok(info.clone()), - _ => Err(DumpError::DumpDoesNotExist(uid)), + _ => Err(DumpActorError::DumpDoesNotExist(uid)), } } } diff --git a/meilisearch-http/src/index_controller/dump_actor/error.rs b/meilisearch-http/src/index_controller/dump_actor/error.rs new file mode 100644 index 000000000..c0f373181 --- /dev/null +++ b/meilisearch-http/src/index_controller/dump_actor/error.rs @@ -0,0 +1,51 @@ +use meilisearch_error::{Code, ErrorCode}; + +use crate::index_controller::{update_actor::error::UpdateActorError, uuid_resolver::UuidResolverError}; + +pub type Result = std::result::Result; + +#[derive(thiserror::Error, Debug)] +pub enum DumpActorError { + #[error("dump already running")] + DumpAlreadyRunning, + #[error("dump `{0}` does not exist")] + DumpDoesNotExist(String), + #[error("Internal error: {0}")] + Internal(Box), + #[error("error while dumping uuids: {0}")] + UuidResolver(#[from] UuidResolverError), + #[error("error while dumping updates: {0}")] + UpdateActor(#[from] UpdateActorError), +} + +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for DumpActorError { + fn from(other: $other) -> Self { + Self::Internal(Box::new(other)) + } + } + )* + } +} + +internal_error!( + heed::Error, + std::io::Error, + tokio::task::JoinError, + serde_json::error::Error, + tempfile::PersistError +); + +impl ErrorCode for DumpActorError { + fn error_code(&self) -> Code { + match self { + DumpActorError::DumpAlreadyRunning => Code::DumpAlreadyInProgress, + DumpActorError::DumpDoesNotExist(_) => Code::DocumentNotFound, + DumpActorError::Internal(_) => Code::Internal, + DumpActorError::UuidResolver(e) => e.error_code(), + DumpActorError::UpdateActor(e) => e.error_code(), + } + } +} diff --git a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs index ab91aeae6..b233a4e61 100644 --- a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs @@ -3,7 +3,8 @@ use std::path::Path; use actix_web::web::Bytes; use tokio::sync::{mpsc, oneshot}; -use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult}; +use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg}; +use super::error::Result; #[derive(Clone)] pub struct DumpActorHandleImpl { @@ -12,14 +13,14 @@ pub struct DumpActorHandleImpl { #[async_trait::async_trait] impl DumpActorHandle for DumpActorHandleImpl { - async fn create_dump(&self) -> DumpResult { + async fn create_dump(&self) -> Result { let (ret, receiver) = oneshot::channel(); let msg = DumpMsg::CreateDump { ret }; let _ = self.sender.send(msg).await; receiver.await.expect("IndexActor has been killed") } - async fn dump_info(&self, uid: String) -> DumpResult { + async fn dump_info(&self, uid: String) -> Result { let (ret, receiver) = oneshot::channel(); let msg = DumpMsg::DumpInfo { ret, uid }; let _ = self.sender.send(msg).await; @@ -34,7 +35,7 @@ impl DumpActorHandleImpl { update: crate::index_controller::update_actor::UpdateActorHandleImpl, index_db_size: usize, update_db_size: usize, - ) -> anyhow::Result { + ) -> std::result::Result> { let (sender, receiver) = mpsc::channel(10); let actor = DumpActor::new( receiver, diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs index a7f1aa8d1..502a6333e 100644 --- a/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs +++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs @@ -31,7 +31,7 @@ impl MetadataV1 { dst: impl AsRef, size: usize, indexer_options: &IndexerOpts, - ) -> anyhow::Result<()> { + ) -> std::result::Result<(), Box> { info!( "Loading dump, dump database version: {}, dump version: V1", self.db_version @@ -83,7 +83,7 @@ fn load_index( primary_key: Option<&str>, size: usize, indexer_options: &IndexerOpts, -) -> anyhow::Result<()> { +) -> std::result::Result<(), Box> { let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid)); create_dir_all(&index_path)?; @@ -172,7 +172,7 @@ impl From for index_controller::Settings { } /// Extract Settings from `settings.json` file present at provided `dir_path` -fn import_settings(dir_path: impl AsRef) -> anyhow::Result { +fn import_settings(dir_path: impl AsRef) -> std::result::Result> { let path = dir_path.as_ref().join("settings.json"); let file = File::open(path)?; let reader = std::io::BufReader::new(file); diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs index eddd8a3b7..7f5420d23 100644 --- a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs +++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs @@ -34,7 +34,7 @@ impl MetadataV2 { index_db_size: usize, update_db_size: usize, indexing_options: &IndexerOpts, - ) -> anyhow::Result<()> { + ) -> std::result::Result<(), Box> { info!( "Loading dump from {}, dump database version: {}, dump version: V2", self.dump_date, self.db_version diff --git a/meilisearch-http/src/index_controller/dump_actor/message.rs b/meilisearch-http/src/index_controller/dump_actor/message.rs index dff9f5954..b34314eff 100644 --- a/meilisearch-http/src/index_controller/dump_actor/message.rs +++ b/meilisearch-http/src/index_controller/dump_actor/message.rs @@ -1,13 +1,14 @@ use tokio::sync::oneshot; -use super::{DumpInfo, DumpResult}; +use super::DumpInfo; +use super::error::Result; pub enum DumpMsg { CreateDump { - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, DumpInfo { uid: String, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, } diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs index 66f081e87..35daa0da8 100644 --- a/meilisearch-http/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs @@ -1,13 +1,11 @@ use std::fs::File; use std::path::{Path, PathBuf}; -use anyhow::Context; use chrono::{DateTime, Utc}; -use log::{error, info, warn}; +use log::{info, warn}; #[cfg(test)] use mockall::automock; use serde::{Deserialize, Serialize}; -use thiserror::Error; use tokio::fs::create_dir_all; use loaders::v1::MetadataV1; @@ -18,39 +16,28 @@ pub use handle_impl::*; pub use message::DumpMsg; use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle}; +use crate::index_controller::dump_actor::error::DumpActorError; use crate::{helpers::compression, option::IndexerOpts}; +use error::Result; mod actor; mod handle_impl; mod loaders; mod message; +pub mod error; const META_FILE_NAME: &str = "metadata.json"; -pub type DumpResult = std::result::Result; - -#[derive(Error, Debug)] -pub enum DumpError { - #[error("error with index: {0}")] - Error(#[from] anyhow::Error), - #[error("Heed error: {0}")] - HeedError(#[from] heed::Error), - #[error("dump already running")] - DumpAlreadyRunning, - #[error("dump `{0}` does not exist")] - DumpDoesNotExist(String), -} - #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait DumpActorHandle { /// Start the creation of a dump /// Implementation: [handle_impl::DumpActorHandleImpl::create_dump] - async fn create_dump(&self) -> DumpResult; + async fn create_dump(&self) -> Result; /// Return the status of an already created dump /// Implementation: [handle_impl::DumpActorHandleImpl::dump_status] - async fn dump_info(&self, uid: String) -> DumpResult; + async fn dump_info(&self, uid: String) -> Result; } #[derive(Debug, Serialize, Deserialize)] @@ -120,7 +107,7 @@ pub fn load_dump( index_db_size: usize, update_db_size: usize, indexer_opts: &IndexerOpts, -) -> anyhow::Result<()> { +) -> std::result::Result<(), Box> { let tmp_src = tempfile::tempdir_in(".")?; let tmp_src_path = tmp_src.path(); @@ -133,7 +120,9 @@ pub fn load_dump( let dst_dir = dst_path .as_ref() .parent() - .with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?; + // TODO + //.with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?; + .unwrap(); let tmp_dst = tempfile::tempdir_in(dst_dir)?; @@ -175,7 +164,7 @@ where U: UuidResolverHandle + Send + Sync + Clone + 'static, P: UpdateActorHandle + Send + Sync + Clone + 'static, { - async fn run(self) -> anyhow::Result<()> { + async fn run(self) -> Result<()> { info!("Performing dump."); create_dir_all(&self.path).await?; @@ -196,9 +185,10 @@ where .dump(uuids, temp_dump_path.clone()) .await?; - let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { + let dump_path = tokio::task::spawn_blocking(move || -> Result { let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; - compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?; + compression::to_tar_gz(temp_dump_path, temp_dump_file.path()) + .map_err(|e| DumpActorError::Internal(e))?; let dump_path = self.path.join(self.uid).with_extension("dump"); temp_dump_file.persist(&dump_path)?; diff --git a/meilisearch-http/src/index_controller/error.rs b/meilisearch-http/src/index_controller/error.rs new file mode 100644 index 000000000..2931981ba --- /dev/null +++ b/meilisearch-http/src/index_controller/error.rs @@ -0,0 +1,40 @@ +use std::error::Error; + +use meilisearch_error::Code; +use meilisearch_error::ErrorCode; + +use super::dump_actor::error::DumpActorError; +use super::index_actor::error::IndexActorError; +use super::update_actor::error::UpdateActorError; +use super::uuid_resolver::UuidResolverError; + +pub type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum IndexControllerError { + #[error("Internal error: {0}")] + Internal(Box), + #[error("Missing index uid")] + MissingUid, + #[error("error resolving index uid: {0}")] + Uuid(#[from] UuidResolverError), + #[error("error with index: {0}")] + IndexActor(#[from] IndexActorError), + #[error("error with update: {0}")] + UpdateActor(#[from] UpdateActorError), + #[error("error with dump: {0}")] + DumpActor(#[from] DumpActorError), +} + +impl ErrorCode for IndexControllerError { + fn error_code(&self) -> Code { + match self { + IndexControllerError::Internal(_) => Code::Internal, + IndexControllerError::MissingUid => Code::InvalidIndexUid, + IndexControllerError::Uuid(e) => e.error_code(), + IndexControllerError::IndexActor(e) => e.error_code(), + IndexControllerError::UpdateActor(e) => e.error_code(), + IndexControllerError::DumpActor(e) => e.error_code(), + } + } +} diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index c35e685a8..c714cfd25 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -19,7 +19,8 @@ use crate::index_controller::{ }; use crate::option::IndexerOpts; -use super::{IndexError, IndexMeta, IndexMsg, IndexResult, IndexSettings, IndexStore}; +use super::{IndexMeta, IndexMsg, IndexSettings, IndexStore}; +use super::error::{Result, IndexActorError}; pub const CONCURRENT_INDEX_MSG: usize = 10; @@ -30,7 +31,7 @@ pub struct IndexActor { } impl IndexActor { - pub fn new(receiver: mpsc::Receiver, store: S) -> IndexResult { + pub fn new(receiver: mpsc::Receiver, store: S) -> std::result::Result> { let options = IndexerOpts::default(); let update_handler = UpdateHandler::new(&options)?; let update_handler = Arc::new(update_handler); @@ -137,20 +138,22 @@ impl IndexActor { } } - async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result { + async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> Result { let index = self .store .get(uuid) .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || index.perform_search(query)).await? + .ok_or(IndexActorError::UnexistingIndex)?; + let result = spawn_blocking(move || index.perform_search(query)).await??; + Ok(result) + } async fn handle_create_index( &self, uuid: Uuid, primary_key: Option, - ) -> IndexResult { + ) -> Result { let index = self.store.create(uuid, primary_key).await?; let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; Ok(meta) @@ -161,7 +164,7 @@ impl IndexActor { uuid: Uuid, meta: Processing, data: Option, - ) -> IndexResult> { + ) -> Result> { debug!("Processing update {}", meta.id()); let update_handler = self.update_handler.clone(); let index = match self.store.get(uuid).await? { @@ -172,12 +175,12 @@ impl IndexActor { Ok(spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?) } - async fn handle_settings(&self, uuid: Uuid) -> IndexResult> { + async fn handle_settings(&self, uuid: Uuid) -> Result> { let index = self .store .get(uuid) .await? - .ok_or(IndexError::UnexistingIndex)?; + .ok_or(IndexActorError::UnexistingIndex)?; let result = spawn_blocking(move || index.settings()).await??; Ok(result) } @@ -188,12 +191,12 @@ impl IndexActor { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> IndexResult> { + ) -> Result> { let index = self .store .get(uuid) .await? - .ok_or(IndexError::UnexistingIndex)?; + .ok_or(IndexActorError::UnexistingIndex)?; let result = spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)) .await??; @@ -206,12 +209,12 @@ impl IndexActor { uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> IndexResult { + ) -> Result { let index = self .store .get(uuid) .await? - .ok_or(IndexError::UnexistingIndex)?; + .ok_or(IndexActorError::UnexistingIndex)?; let result = spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)) @@ -220,7 +223,7 @@ impl IndexActor { Ok(result) } - async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> { + async fn handle_delete(&self, uuid: Uuid) -> Result<()> { let index = self.store.delete(uuid).await?; if let Some(index) = index { @@ -237,13 +240,13 @@ impl IndexActor { Ok(()) } - async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult { + async fn handle_get_meta(&self, uuid: Uuid) -> Result { match self.store.get(uuid).await? { Some(index) => { let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; Ok(meta) } - None => Err(IndexError::UnexistingIndex), + None => Err(IndexActorError::UnexistingIndex), } } @@ -251,23 +254,23 @@ impl IndexActor { &self, uuid: Uuid, index_settings: IndexSettings, - ) -> IndexResult { + ) -> Result { let index = self .store .get(uuid) .await? - .ok_or(IndexError::UnexistingIndex)?; + .ok_or(IndexActorError::UnexistingIndex)?; let result = spawn_blocking(move || match index_settings.primary_key { Some(primary_key) => { let mut txn = index.write_txn()?; if index.primary_key(&txn)?.is_some() { - return Err(IndexError::ExistingPrimaryKey); + return Err(IndexActorError::ExistingPrimaryKey); } let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); builder.set_primary_key(primary_key); builder.execute(|_, _| ()) - .map_err(|e| IndexError::Internal(e.to_string()))?; + .map_err(|e| IndexActorError::Internal(Box::new(e)))?; let meta = IndexMeta::new_txn(&index, &txn)?; txn.commit()?; Ok(meta) @@ -282,7 +285,7 @@ impl IndexActor { Ok(result) } - async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> { + async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { use tokio::fs::create_dir_all; path.push("indexes"); @@ -294,7 +297,7 @@ impl IndexActor { create_dir_all(&index_path).await?; index_path.push("data.mdb"); - spawn_blocking(move || -> anyhow::Result<()> { + spawn_blocking(move || -> Result<()> { // Get write txn to wait for ongoing write transaction before snapshot. let _txn = index.write_txn()?; index @@ -310,12 +313,12 @@ impl IndexActor { /// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the /// documents and all the settings. - async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let index = self .store .get(uuid) .await? - .ok_or(IndexError::UnexistingIndex)?; + .ok_or(IndexActorError::UnexistingIndex)?; let path = path.join(format!("indexes/index-{}/", uuid)); fs::create_dir_all(&path).await?; @@ -325,12 +328,12 @@ impl IndexActor { Ok(()) } - async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult { + async fn handle_get_stats(&self, uuid: Uuid) -> Result { let index = self .store .get(uuid) .await? - .ok_or(IndexError::UnexistingIndex)?; + .ok_or(IndexActorError::UnexistingIndex)?; spawn_blocking(move || { let rtxn = index.read_txn()?; @@ -338,9 +341,10 @@ impl IndexActor { Ok(IndexStats { size: index.size(), number_of_documents: index.number_of_documents(&rtxn) - .map_err(|e| IndexError::Internal(e.to_string()))?, + .map_err(|e| IndexActorError::Internal(Box::new(e)))?, is_indexing: None, - fields_distribution: index.fields_distribution(&rtxn)?, + fields_distribution: index.fields_distribution(&rtxn) + .map_err(|e| IndexActorError::Internal(e.into()))?, }) }) .await? diff --git a/meilisearch-http/src/index_controller/index_actor/error.rs b/meilisearch-http/src/index_controller/index_actor/error.rs new file mode 100644 index 000000000..a124d8b1e --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/error.rs @@ -0,0 +1,49 @@ +use meilisearch_error::{Code, ErrorCode}; + +use crate::index::error::IndexError; + +pub type Result = std::result::Result; + +#[derive(thiserror::Error, Debug)] +pub enum IndexActorError { + #[error("index error: {0}")] + IndexError(#[from] IndexError), + #[error("index already exists")] + IndexAlreadyExists, + #[error("Index doesn't exists")] + UnexistingIndex, + #[error("Existing primary key")] + ExistingPrimaryKey, + #[error("Internal Index Error: {0}")] + Internal(Box), +} + +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for IndexActorError { + fn from(other: $other) -> Self { + Self::Internal(Box::new(other)) + } + } + )* + } +} + +internal_error!( + heed::Error, + tokio::task::JoinError, + std::io::Error +); + +impl ErrorCode for IndexActorError { + fn error_code(&self) -> Code { + match self { + IndexActorError::IndexError(e) => e.error_code(), + IndexActorError::IndexAlreadyExists => Code::IndexAlreadyExists, + IndexActorError::UnexistingIndex => Code::IndexNotFound, + IndexActorError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent, + IndexActorError::Internal(_) => Code::Internal, + } + } +} diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 6bf83c647..6afd409d2 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -12,7 +12,8 @@ use crate::{ index_controller::{Failed, Processed}, }; -use super::{IndexActor, IndexActorHandle, IndexMeta, IndexMsg, IndexResult, MapIndexStore}; +use super::{IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore}; +use super::error::Result; #[derive(Clone)] pub struct IndexActorHandleImpl { @@ -25,7 +26,7 @@ impl IndexActorHandle for IndexActorHandleImpl { &self, uuid: Uuid, primary_key: Option, - ) -> IndexResult { + ) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::CreateIndex { ret, @@ -41,7 +42,7 @@ impl IndexActorHandle for IndexActorHandleImpl { uuid: Uuid, meta: Processing, data: Option, - ) -> anyhow::Result> { + ) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Update { ret, @@ -53,14 +54,14 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult { + async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Search { uuid, query, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn settings(&self, uuid: Uuid) -> IndexResult> { + async fn settings(&self, uuid: Uuid) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Settings { uuid, ret }; let _ = self.sender.send(msg).await; @@ -73,7 +74,7 @@ impl IndexActorHandle for IndexActorHandleImpl { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> IndexResult> { + ) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Documents { uuid, @@ -91,7 +92,7 @@ impl IndexActorHandle for IndexActorHandleImpl { uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> IndexResult { + ) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Document { uuid, @@ -103,14 +104,14 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn delete(&self, uuid: Uuid) -> IndexResult<()> { + async fn delete(&self, uuid: Uuid) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Delete { uuid, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn get_index_meta(&self, uuid: Uuid) -> IndexResult { + async fn get_index_meta(&self, uuid: Uuid) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetMeta { uuid, ret }; let _ = self.sender.send(msg).await; @@ -121,7 +122,7 @@ impl IndexActorHandle for IndexActorHandleImpl { &self, uuid: Uuid, index_settings: IndexSettings, - ) -> IndexResult { + ) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::UpdateIndex { uuid, @@ -132,21 +133,21 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Snapshot { uuid, path, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Dump { uuid, path, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { + async fn get_index_stats(&self, uuid: Uuid) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetStats { uuid, ret }; let _ = self.sender.send(msg).await; @@ -155,7 +156,7 @@ impl IndexActorHandle for IndexActorHandleImpl { } impl IndexActorHandleImpl { - pub fn new(path: impl AsRef, index_size: usize) -> anyhow::Result { + pub fn new(path: impl AsRef, index_size: usize) -> std::result::Result> { let (sender, receiver) = mpsc::channel(100); let store = MapIndexStore::new(path, index_size); diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index e7304d56c..d9c0dabd9 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -5,8 +5,9 @@ use uuid::Uuid; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index_controller::{Failed, IndexStats, Processed, Processing}; +use super::error::Result as IndexResult; -use super::{IndexMeta, IndexResult, IndexSettings}; +use super::{IndexMeta, IndexSettings}; #[allow(clippy::large_enum_variant)] pub enum IndexMsg { @@ -24,7 +25,7 @@ pub enum IndexMsg { Search { uuid: Uuid, query: SearchQuery, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Settings { uuid: Uuid, diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index b54a676b0..a66f1b9ea 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -5,7 +5,6 @@ use chrono::{DateTime, Utc}; #[cfg(test)] use mockall::automock; use serde::{Deserialize, Serialize}; -use thiserror::Error; use uuid::Uuid; use actor::IndexActor; @@ -16,6 +15,9 @@ use store::{IndexStore, MapIndexStore}; use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings}; use crate::index_controller::{Failed, IndexStats, Processed, Processing}; +use error::Result; + +use self::error::IndexActorError; use super::IndexSettings; @@ -23,8 +25,7 @@ mod actor; mod handle_impl; mod message; mod store; - -pub type IndexResult = std::result::Result; +pub mod error; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ -35,18 +36,18 @@ pub struct IndexMeta { } impl IndexMeta { - fn new(index: &Index) -> IndexResult { + fn new(index: &Index) -> Result { let txn = index.read_txn()?; Self::new_txn(index, &txn) } - fn new_txn(index: &Index, txn: &heed::RoTxn) -> IndexResult { + fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { let created_at = index .created_at(&txn) - .map_err(|e| IndexError::Internal(e.to_string()))?; + .map_err(|e| IndexActorError::Internal(Box::new(e)))?; let updated_at = index .updated_at(&txn) - .map_err(|e| IndexError::Internal(e.to_string()))?; + .map_err(|e| IndexActorError::Internal(Box::new(e)))?; let primary_key = index.primary_key(&txn)?.map(String::from); Ok(Self { created_at, @@ -56,50 +57,19 @@ impl IndexMeta { } } -#[derive(Error, Debug)] -pub enum IndexError { - #[error("index already exists")] - IndexAlreadyExists, - #[error("Index doesn't exists")] - UnexistingIndex, - #[error("Existing primary key")] - ExistingPrimaryKey, - #[error("Internal Index Error: {0}")] - Internal(String), -} - -macro_rules! internal_error { - ($($other:path), *) => { - $( - impl From<$other> for IndexError { - fn from(other: $other) -> Self { - Self::Internal(other.to_string()) - } - } - )* - } -} - -internal_error!( - anyhow::Error, - heed::Error, - tokio::task::JoinError, - std::io::Error -); - #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait IndexActorHandle { async fn create_index(&self, uuid: Uuid, primary_key: Option) - -> IndexResult; + -> Result; async fn update( &self, uuid: Uuid, meta: Processing, data: Option, - ) -> anyhow::Result>; - async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult; - async fn settings(&self, uuid: Uuid) -> IndexResult>; + ) -> Result>; + async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result; + async fn settings(&self, uuid: Uuid) -> Result>; async fn documents( &self, @@ -107,23 +77,23 @@ pub trait IndexActorHandle { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> IndexResult>; + ) -> Result>; async fn document( &self, uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> IndexResult; - async fn delete(&self, uuid: Uuid) -> IndexResult<()>; - async fn get_index_meta(&self, uuid: Uuid) -> IndexResult; + ) -> Result; + async fn delete(&self, uuid: Uuid) -> Result<()>; + async fn get_index_meta(&self, uuid: Uuid) -> Result; async fn update_index( &self, uuid: Uuid, index_settings: IndexSettings, - ) -> IndexResult; - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; - async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; - async fn get_index_stats(&self, uuid: Uuid) -> IndexResult; + ) -> Result; + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; + async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()>; + async fn get_index_stats(&self, uuid: Uuid) -> Result; } #[cfg(test)] @@ -139,7 +109,7 @@ mod test { &self, uuid: Uuid, primary_key: Option, - ) -> IndexResult { + ) -> Result { self.as_ref().create_index(uuid, primary_key).await } @@ -148,15 +118,15 @@ mod test { uuid: Uuid, meta: Processing, data: Option, - ) -> anyhow::Result> { + ) -> Result> { self.as_ref().update(uuid, meta, data).await } - async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult { + async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { self.as_ref().search(uuid, query).await } - async fn settings(&self, uuid: Uuid) -> IndexResult> { + async fn settings(&self, uuid: Uuid) -> Result> { self.as_ref().settings(uuid).await } @@ -166,7 +136,7 @@ mod test { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> IndexResult> { + ) -> Result> { self.as_ref() .documents(uuid, offset, limit, attributes_to_retrieve) .await @@ -177,17 +147,17 @@ mod test { uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> IndexResult { + ) -> Result { self.as_ref() .document(uuid, doc_id, attributes_to_retrieve) .await } - async fn delete(&self, uuid: Uuid) -> IndexResult<()> { + async fn delete(&self, uuid: Uuid) -> Result<()> { self.as_ref().delete(uuid).await } - async fn get_index_meta(&self, uuid: Uuid) -> IndexResult { + async fn get_index_meta(&self, uuid: Uuid) -> Result { self.as_ref().get_index_meta(uuid).await } @@ -195,19 +165,19 @@ mod test { &self, uuid: Uuid, index_settings: IndexSettings, - ) -> IndexResult { + ) -> Result { self.as_ref().update_index(uuid, index_settings).await } - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { self.as_ref().snapshot(uuid, path).await } - async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { self.as_ref().dump(uuid, path).await } - async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { + async fn get_index_stats(&self, uuid: Uuid) -> Result { self.as_ref().get_index_stats(uuid).await } } diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 39a6e64a6..4c2aed622 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -8,16 +8,16 @@ use tokio::sync::RwLock; use tokio::task::spawn_blocking; use uuid::Uuid; -use super::{IndexError, IndexResult}; +use super::error::{IndexActorError, Result}; use crate::index::Index; type AsyncMap = Arc>>; #[async_trait::async_trait] pub trait IndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> IndexResult; - async fn get(&self, uuid: Uuid) -> IndexResult>; - async fn delete(&self, uuid: Uuid) -> IndexResult>; + async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; + async fn get(&self, uuid: Uuid) -> Result>; + async fn delete(&self, uuid: Uuid) -> Result>; } pub struct MapIndexStore { @@ -40,7 +40,7 @@ impl MapIndexStore { #[async_trait::async_trait] impl IndexStore for MapIndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> IndexResult { + async fn create(&self, uuid: Uuid, primary_key: Option) -> Result { // We need to keep the lock until we are sure the db file has been opened correclty, to // ensure that another db is not created at the same time. let mut lock = self.index_store.write().await; @@ -50,11 +50,11 @@ impl IndexStore for MapIndexStore { } let path = self.path.join(format!("index-{}", uuid)); if path.exists() { - return Err(IndexError::IndexAlreadyExists); + return Err(IndexActorError::IndexAlreadyExists); } let index_size = self.index_size; - let index = spawn_blocking(move || -> IndexResult { + let index = spawn_blocking(move || -> Result { let index = Index::open(path, index_size)?; if let Some(primary_key) = primary_key { let mut txn = index.write_txn()?; @@ -62,7 +62,7 @@ impl IndexStore for MapIndexStore { let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); builder.set_primary_key(primary_key); builder.execute(|_, _| ()) - .map_err(|e| IndexError::Internal(e.to_string()))?; + .map_err(|e| IndexActorError::Internal(Box::new(e)))?; txn.commit()?; } @@ -75,7 +75,7 @@ impl IndexStore for MapIndexStore { Ok(index) } - async fn get(&self, uuid: Uuid) -> IndexResult> { + async fn get(&self, uuid: Uuid) -> Result> { let guard = self.index_store.read().await; match guard.get(&uuid) { Some(index) => Ok(Some(index.clone())), @@ -95,7 +95,7 @@ impl IndexStore for MapIndexStore { } } - async fn delete(&self, uuid: Uuid) -> IndexResult> { + async fn delete(&self, uuid: Uuid) -> Result> { let db_path = self.path.join(format!("index-{}", uuid)); fs::remove_dir_all(db_path).await?; let index = self.index_store.write().await.remove(&uuid); diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 0c801558b..0046bd6a5 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; use actix_web::web::{Bytes, Payload}; -use anyhow::bail; use chrono::{DateTime, Utc}; use futures::stream::StreamExt; use log::info; @@ -24,8 +23,10 @@ use uuid_resolver::{UuidResolverError, UuidResolverHandle}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::option::Opt; +use error::Result; use self::dump_actor::load_dump; +use self::error::IndexControllerError; mod dump_actor; mod index_actor; @@ -33,6 +34,7 @@ mod snapshot; mod update_actor; mod updates; mod uuid_resolver; +pub mod error; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ -81,7 +83,7 @@ pub struct Stats { } impl IndexController { - pub fn new(path: impl AsRef, options: &Opt) -> anyhow::Result { + pub fn new(path: impl AsRef, options: &Opt) -> std::result::Result> { let index_size = options.max_mdb_size.get_bytes() as usize; let update_store_size = options.max_udb_size.get_bytes() as usize; @@ -151,7 +153,7 @@ impl IndexController { format: milli::update::UpdateFormat, payload: Payload, primary_key: Option, - ) -> anyhow::Result { + ) -> Result { let perform_update = |uuid| async move { let meta = UpdateMeta::DocumentsAddition { method, @@ -189,7 +191,7 @@ impl IndexController { } } - pub async fn clear_documents(&self, uid: String) -> anyhow::Result { + pub async fn clear_documents(&self, uid: String) -> Result { let uuid = self.uuid_resolver.get(uid).await?; let meta = UpdateMeta::ClearDocuments; let (_, receiver) = mpsc::channel(1); @@ -201,7 +203,7 @@ impl IndexController { &self, uid: String, documents: Vec, - ) -> anyhow::Result { + ) -> Result { let uuid = self.uuid_resolver.get(uid).await?; let meta = UpdateMeta::DeleteDocuments { ids: documents }; let (_, receiver) = mpsc::channel(1); @@ -214,7 +216,7 @@ impl IndexController { uid: String, settings: Settings, create: bool, - ) -> anyhow::Result { + ) -> Result { let perform_udpate = |uuid| async move { let meta = UpdateMeta::Settings(settings.into_unchecked()); // Nothing so send, drop the sender right away, as not to block the update actor. @@ -239,9 +241,9 @@ impl IndexController { pub async fn create_index( &self, index_settings: IndexSettings, - ) -> anyhow::Result { + ) -> Result { let IndexSettings { uid, primary_key } = index_settings; - let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?; + let uid = uid.ok_or(IndexControllerError::MissingUid)?; let uuid = Uuid::new_v4(); let meta = self.index_handle.create_index(uuid, primary_key).await?; self.uuid_resolver.insert(uid.clone(), uuid).await?; @@ -255,26 +257,26 @@ impl IndexController { Ok(meta) } - pub async fn delete_index(&self, uid: String) -> anyhow::Result<()> { + pub async fn delete_index(&self, uid: String) -> Result<()> { let uuid = self.uuid_resolver.delete(uid).await?; self.update_handle.delete(uuid).await?; self.index_handle.delete(uuid).await?; Ok(()) } - pub async fn update_status(&self, uid: String, id: u64) -> anyhow::Result { + pub async fn update_status(&self, uid: String, id: u64) -> Result { let uuid = self.uuid_resolver.get(uid).await?; let result = self.update_handle.update_status(uuid, id).await?; Ok(result) } - pub async fn all_update_status(&self, uid: String) -> anyhow::Result> { + pub async fn all_update_status(&self, uid: String) -> Result> { let uuid = self.uuid_resolver.get(uid).await?; let result = self.update_handle.get_all_updates_status(uuid).await?; Ok(result) } - pub async fn list_indexes(&self) -> anyhow::Result> { + pub async fn list_indexes(&self) -> Result> { let uuids = self.uuid_resolver.list().await?; let mut ret = Vec::new(); @@ -293,7 +295,7 @@ impl IndexController { Ok(ret) } - pub async fn settings(&self, uid: String) -> anyhow::Result> { + pub async fn settings(&self, uid: String) -> Result> { let uuid = self.uuid_resolver.get(uid.clone()).await?; let settings = self.index_handle.settings(uuid).await?; Ok(settings) @@ -305,7 +307,7 @@ impl IndexController { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> anyhow::Result> { + ) -> Result> { let uuid = self.uuid_resolver.get(uid.clone()).await?; let documents = self .index_handle @@ -319,7 +321,7 @@ impl IndexController { uid: String, doc_id: String, attributes_to_retrieve: Option>, - ) -> anyhow::Result { + ) -> Result { let uuid = self.uuid_resolver.get(uid.clone()).await?; let document = self .index_handle @@ -332,9 +334,9 @@ impl IndexController { &self, uid: String, index_settings: IndexSettings, - ) -> anyhow::Result { + ) -> Result { if index_settings.uid.is_some() { - bail!("Can't change the index uid.") + todo!("Can't change the index uid.") } let uuid = self.uuid_resolver.get(uid.clone()).await?; @@ -348,13 +350,13 @@ impl IndexController { Ok(meta) } - pub async fn search(&self, uid: String, query: SearchQuery) -> anyhow::Result { + pub async fn search(&self, uid: String, query: SearchQuery) -> Result { let uuid = self.uuid_resolver.get(uid).await?; let result = self.index_handle.search(uuid, query).await?; Ok(result) } - pub async fn get_index(&self, uid: String) -> anyhow::Result { + pub async fn get_index(&self, uid: String) -> Result { let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.get_index_meta(uuid).await?; let meta = IndexMetadata { @@ -366,11 +368,11 @@ impl IndexController { Ok(meta) } - pub async fn get_uuids_size(&self) -> anyhow::Result { + pub async fn get_uuids_size(&self) -> Result { Ok(self.uuid_resolver.get_size().await?) } - pub async fn get_index_stats(&self, uid: String) -> anyhow::Result { + pub async fn get_index_stats(&self, uid: String) -> Result { let uuid = self.uuid_resolver.get(uid).await?; let update_infos = self.update_handle.get_info().await?; let mut stats = self.index_handle.get_index_stats(uuid).await?; @@ -379,7 +381,7 @@ impl IndexController { Ok(stats) } - pub async fn get_all_stats(&self) -> anyhow::Result { + pub async fn get_all_stats(&self) -> Result { let update_infos = self.update_handle.get_info().await?; let mut database_size = self.get_uuids_size().await? + update_infos.size; let mut last_update: Option> = None; @@ -405,11 +407,11 @@ impl IndexController { }) } - pub async fn create_dump(&self) -> anyhow::Result { + pub async fn create_dump(&self) -> Result { Ok(self.dump_handle.create_dump().await?) } - pub async fn dump_info(&self, uid: String) -> anyhow::Result { + pub async fn dump_info(&self, uid: String) -> Result { Ok(self.dump_handle.dump_info(uid).await?) } } diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index daef7d582..171c08eee 100644 --- a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -1,7 +1,6 @@ use std::path::{Path, PathBuf}; use std::time::Duration; -use anyhow::bail; use log::{error, info}; use tokio::fs; use tokio::task::spawn_blocking; @@ -53,7 +52,7 @@ where } } - async fn perform_snapshot(&self) -> anyhow::Result<()> { + async fn perform_snapshot(&self) -> std::result::Result<(), Box> { info!("Performing snapshot."); let snapshot_dir = self.snapshot_path.clone(); @@ -78,7 +77,7 @@ where let snapshot_path = self .snapshot_path .join(format!("{}.snapshot", self.db_name)); - let snapshot_path = spawn_blocking(move || -> anyhow::Result { + let snapshot_path = spawn_blocking(move || -> Result> { let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; @@ -98,7 +97,7 @@ pub fn load_snapshot( snapshot_path: impl AsRef, ignore_snapshot_if_db_exists: bool, ignore_missing_snapshot: bool, -) -> anyhow::Result<()> { +) -> std::result::Result<(), Box> { if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() { match compression::from_tar_gz(snapshot_path, &db_path) { Ok(()) => Ok(()), @@ -109,7 +108,7 @@ pub fn load_snapshot( } } } else if db_path.as_ref().exists() && !ignore_snapshot_if_db_exists { - bail!( + todo!( "database already exists at {:?}, try to delete it or rename it", db_path .as_ref() @@ -117,7 +116,7 @@ pub fn load_snapshot( .unwrap_or_else(|_| db_path.as_ref().to_owned()) ) } else if !snapshot_path.as_ref().exists() && !ignore_missing_snapshot { - bail!( + todo!( "snapshot doesn't exist at {:?}", snapshot_path .as_ref() @@ -142,7 +141,7 @@ mod test { use super::*; use crate::index_controller::index_actor::MockIndexActorHandle; use crate::index_controller::update_actor::{ - MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError, + MockUpdateActorHandle, UpdateActorHandleImpl, error::UpdateActorError, }; use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidResolverError}; @@ -224,7 +223,7 @@ mod test { update_handle .expect_snapshot() // abitrary error - .returning(|_, _| Box::pin(err(UpdateError::UnexistingUpdate(0)))); + .returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); let snapshot_path = tempfile::tempdir_in(".").unwrap(); let snapshot_service = SnapshotService::new( diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index eebbf6247..fa8251a0c 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -13,7 +13,8 @@ use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; use uuid::Uuid; -use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; +use super::{PayloadData, UpdateMsg, UpdateStore, UpdateStoreInfo}; +use super::error::{Result, UpdateActorError}; use crate::index_controller::index_actor::IndexActorHandle; use crate::index_controller::{UpdateMeta, UpdateStatus}; @@ -35,7 +36,7 @@ where inbox: mpsc::Receiver>, path: impl AsRef, index_handle: I, - ) -> anyhow::Result { + ) -> std::result::Result> { let path = path.as_ref().join("updates"); std::fs::create_dir_all(&path)?; @@ -202,7 +203,7 @@ where tokio::task::spawn_blocking(move || { let result = store .meta(uuid, id)? - .ok_or(UpdateError::UnexistingUpdate(id))?; + .ok_or(UpdateActorError::UnexistingUpdate(id))?; Ok(result) }) .await? @@ -230,7 +231,7 @@ where let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); - tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + tokio::task::spawn_blocking(move || -> Result<()> { update_store.dump(&uuids, path.to_path_buf(), index_handle)?; Ok(()) }) @@ -241,7 +242,7 @@ where async fn handle_get_info(&self) -> Result { let update_store = self.store.clone(); - let info = tokio::task::spawn_blocking(move || -> anyhow::Result { + let info = tokio::task::spawn_blocking(move || -> Result { let info = update_store.get_info()?; Ok(info) }) diff --git a/meilisearch-http/src/index_controller/update_actor/error.rs b/meilisearch-http/src/index_controller/update_actor/error.rs new file mode 100644 index 000000000..ee68dd997 --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/error.rs @@ -0,0 +1,64 @@ +use std::error::Error; + +use meilisearch_error::{Code, ErrorCode}; + +use crate::index_controller::index_actor::error::IndexActorError; + +pub type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum UpdateActorError { + #[error("Update {0} doesn't exist.")] + UnexistingUpdate(u64), + #[error("Internal error processing update: {0}")] + Internal(Box), + #[error("error with index: {0}")] + IndexActor(#[from] IndexActorError), + #[error( + "Update store was shut down due to a fatal error, please check your logs for more info." + )] + FatalUpdateStoreError, +} + +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for UpdateActorError { + fn from(other: $other) -> Self { + Self::Internal(Box::new(other)) + } + } + )* + } +} + +impl From> for UpdateActorError { + fn from(_: tokio::sync::mpsc::error::SendError) -> Self { + Self::FatalUpdateStoreError + } +} + +impl From for UpdateActorError { + fn from(_: tokio::sync::oneshot::error::RecvError) -> Self { + Self::FatalUpdateStoreError + } +} + +internal_error!( + heed::Error, + std::io::Error, + serde_json::Error, + actix_http::error::PayloadError, + tokio::task::JoinError +); + +impl ErrorCode for UpdateActorError { + fn error_code(&self) -> Code { + match self { + UpdateActorError::UnexistingUpdate(_) => Code::NotFound, + UpdateActorError::Internal(_) => Code::Internal, + UpdateActorError::IndexActor(e) => e.error_code(), + UpdateActorError::FatalUpdateStoreError => Code::Internal, + } + } +} diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index 7844bf855..f5331554b 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -6,10 +6,8 @@ use uuid::Uuid; use crate::index_controller::{IndexActorHandle, UpdateStatus}; -use super::{ - PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateError, UpdateMeta, UpdateMsg, - UpdateStoreInfo, -}; +use super::error::Result; +use super::{PayloadData, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo}; #[derive(Clone)] pub struct UpdateActorHandleImpl { @@ -24,7 +22,7 @@ where index_handle: I, path: impl AsRef, update_store_size: usize, - ) -> anyhow::Result + ) -> std::result::Result> where I: IndexActorHandle + Clone + Send + Sync + 'static, { @@ -48,72 +46,42 @@ where async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::ListUpdates { uuid, ret }; - self.sender - .send(msg) - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)?; - receiver - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)? + self.sender.send(msg).await?; + receiver.await? } async fn update_status(&self, uuid: Uuid, id: u64) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::GetUpdate { uuid, id, ret }; - self.sender - .send(msg) - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)?; - receiver - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)? + self.sender.send(msg).await?; + receiver.await? } async fn delete(&self, uuid: Uuid) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Delete { uuid, ret }; - self.sender - .send(msg) - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)?; - receiver - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)? + self.sender.send(msg).await?; + receiver.await? } async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Snapshot { uuids, path, ret }; - self.sender - .send(msg) - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)?; - receiver - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)? + self.sender.send(msg).await?; + receiver.await? } async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Dump { uuids, path, ret }; - self.sender - .send(msg) - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)?; - receiver - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)? + self.sender.send(msg).await?; + receiver.await? } async fn get_info(&self) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::GetInfo { ret }; - self.sender - .send(msg) - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)?; - receiver - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)? + self.sender.send(msg).await?; + receiver.await? } async fn update( @@ -129,12 +97,7 @@ where meta, ret, }; - self.sender - .send(msg) - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)?; - receiver - .await - .map_err(|_| UpdateError::FatalUpdateStoreError)? + self.sender.send(msg).await?; + receiver.await? } } diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index 37df2af32..48a88b097 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -4,7 +4,8 @@ use std::path::PathBuf; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use super::{PayloadData, Result, UpdateMeta, UpdateStatus, UpdateStoreInfo}; +use super::{PayloadData, UpdateMeta, UpdateStatus, UpdateStoreInfo}; +use super::error::Result; pub enum UpdateMsg { Update { diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index b854cca70..42be99cf8 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -1,12 +1,6 @@ -mod actor; -mod handle_impl; -mod message; -pub mod store; - use std::{collections::HashSet, path::PathBuf}; use actix_http::error::PayloadError; -use thiserror::Error; use tokio::sync::mpsc; use uuid::Uuid; @@ -14,49 +8,22 @@ use crate::index_controller::{UpdateMeta, UpdateStatus}; use actor::UpdateActor; use message::UpdateMsg; +use error::Result; pub use handle_impl::UpdateActorHandleImpl; pub use store::{UpdateStore, UpdateStoreInfo}; -pub type Result = std::result::Result; +mod actor; +mod handle_impl; +mod message; +pub mod error; +pub mod store; + type PayloadData = std::result::Result; #[cfg(test)] use mockall::automock; -#[derive(Debug, Error)] -pub enum UpdateError { - #[error("Update {0} doesn't exist.")] - UnexistingUpdate(u64), - #[error("Internal error processing update: {0}")] - Internal(String), - #[error( - "Update store was shut down due to a fatal error, please check your logs for more info." - )] - FatalUpdateStoreError, -} - -macro_rules! internal_error { - ($($other:path), *) => { - $( - impl From<$other> for UpdateError { - fn from(other: $other) -> Self { - Self::Internal(other.to_string()) - } - } - )* - } -} - -internal_error!( - heed::Error, - std::io::Error, - serde_json::Error, - PayloadError, - tokio::task::JoinError, - anyhow::Error -); - #[async_trait::async_trait] #[cfg_attr(test, automock(type Data=Vec;))] pub trait UpdateActorHandle { diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs index e7f36a2a1..a621feebe 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/dump.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs @@ -9,7 +9,7 @@ use heed::{EnvOpenOptions, RoTxn}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::{State, UpdateStore}; +use super::{State, UpdateStore, Result}; use crate::index_controller::{ index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued, UpdateStatus, @@ -27,7 +27,7 @@ impl UpdateStore { uuids: &HashSet, path: PathBuf, handle: impl IndexActorHandle, - ) -> anyhow::Result<()> { + ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Dumping); @@ -52,7 +52,7 @@ impl UpdateStore { txn: &RoTxn, uuids: &HashSet, path: impl AsRef, - ) -> anyhow::Result<()> { + ) -> Result<()> { let dump_data_path = path.as_ref().join("data.jsonl"); let mut dump_data_file = File::create(dump_data_path)?; @@ -71,7 +71,7 @@ impl UpdateStore { uuids: &HashSet, mut file: &mut File, dst_path: impl AsRef, - ) -> anyhow::Result<()> { + ) -> Result<()> { let pendings = self.pending_queue.iter(txn)?.lazily_decode_data(); for pending in pendings { @@ -103,7 +103,7 @@ impl UpdateStore { txn: &RoTxn, uuids: &HashSet, mut file: &mut File, - ) -> anyhow::Result<()> { + ) -> Result<()> { let updates = self.updates.iter(txn)?.lazily_decode_data(); for update in updates { @@ -125,7 +125,7 @@ impl UpdateStore { src: impl AsRef, dst: impl AsRef, db_size: usize, - ) -> anyhow::Result<()> { + ) -> std::result::Result<(), Box> { let dst_update_path = dst.as_ref().join("updates/"); create_dir_all(&dst_update_path)?; @@ -175,7 +175,7 @@ async fn dump_indexes( uuids: &HashSet, handle: impl IndexActorHandle, path: impl AsRef, -) -> anyhow::Result<()> { +) -> Result<()> { for uuid in uuids { handle.dump(*uuid, path.as_ref().to_owned()).await?; } diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index e7b719fc9..429ce362b 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -24,8 +24,9 @@ use uuid::Uuid; use codec::*; use super::UpdateMeta; +use super::error::Result; use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; -use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult}; +use crate::helpers::EnvSizer; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; @@ -109,7 +110,7 @@ impl UpdateStore { fn new( mut options: EnvOpenOptions, path: impl AsRef, - ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { + ) -> std::result::Result<(Self, mpsc::Receiver<()>), Box> { options.max_dbs(5); let env = options.open(&path)?; @@ -140,7 +141,7 @@ impl UpdateStore { path: impl AsRef, index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static, must_exit: Arc, - ) -> anyhow::Result> { + ) -> std::result::Result, Box> { let (update_store, mut notification_receiver) = Self::new(options, path)?; let update_store = Arc::new(update_store); @@ -285,7 +286,7 @@ impl UpdateStore { fn process_pending_update( &self, index_handle: impl IndexActorHandle, - ) -> anyhow::Result> { + ) -> Result> { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; let first_meta = self.pending_queue.first(&rtxn)?; @@ -320,7 +321,7 @@ impl UpdateStore { index_handle: impl IndexActorHandle, index_uuid: Uuid, global_id: u64, - ) -> anyhow::Result> { + ) -> Result> { let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid)); let update_id = processing.id(); @@ -368,7 +369,7 @@ impl UpdateStore { } /// List the updates for `index_uuid`. - pub fn list(&self, index_uuid: Uuid) -> anyhow::Result> { + pub fn list(&self, index_uuid: Uuid) -> Result> { let mut update_list = BTreeMap::::new(); let txn = self.env.read_txn()?; @@ -437,7 +438,7 @@ impl UpdateStore { } /// Delete all updates for an index from the update store. - pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> { + pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> { let mut txn = self.env.write_txn()?; // Contains all the content file paths that we need to be removed if the deletion was successful. let mut uuids_to_remove = Vec::new(); @@ -488,7 +489,7 @@ impl UpdateStore { uuids: &HashSet, path: impl AsRef, handle: impl IndexActorHandle + Clone, - ) -> anyhow::Result<()> { + ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Snapshoting); @@ -535,13 +536,13 @@ impl UpdateStore { while let Some(res) = stream.next().await { res?; } - Ok(()) as IndexResult<()> + Ok(()) as Result<()> })?; Ok(()) } - pub fn get_info(&self) -> anyhow::Result { + pub fn get_info(&self) -> Result { let mut size = self.env.size(); let txn = self.env.read_txn()?; for entry in self.pending_queue.iter(&txn)? { diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs index af710dd87..1296264e0 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs @@ -12,7 +12,7 @@ pub struct UuidResolverHandleImpl { } impl UuidResolverHandleImpl { - pub fn new(path: impl AsRef) -> anyhow::Result { + pub fn new(path: impl AsRef) -> Result { let (sender, reveiver) = mpsc::channel(100); let store = HeedUuidStore::new(path)?; let actor = UuidResolverActor::new(reveiver, store); @@ -32,7 +32,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl { .expect("Uuid resolver actor has been killed")?) } - async fn delete(&self, name: String) -> anyhow::Result { + async fn delete(&self, name: String) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::Delete { uid: name, ret }; let _ = self.sender.send(msg).await; @@ -41,7 +41,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl { .expect("Uuid resolver actor has been killed")?) } - async fn list(&self) -> anyhow::Result> { + async fn list(&self) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::List { ret }; let _ = self.sender.send(msg).await; @@ -50,7 +50,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl { .expect("Uuid resolver actor has been killed")?) } - async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> { + async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::Insert { ret, name, uuid }; let _ = self.sender.send(msg).await; diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs index 3c3b5fd06..67d77d411 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs @@ -6,6 +6,8 @@ pub mod store; use std::collections::HashSet; use std::path::PathBuf; +use meilisearch_error::Code; +use meilisearch_error::ErrorCode; use thiserror::Error; use uuid::Uuid; @@ -27,9 +29,9 @@ pub type Result = std::result::Result; #[cfg_attr(test, automock)] pub trait UuidResolverHandle { async fn get(&self, name: String) -> Result; - async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()>; - async fn delete(&self, name: String) -> anyhow::Result; - async fn list(&self) -> anyhow::Result>; + async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; + async fn delete(&self, name: String) -> Result; + async fn list(&self) -> Result>; async fn snapshot(&self, path: PathBuf) -> Result>; async fn get_size(&self) -> Result; async fn dump(&self, path: PathBuf) -> Result>; @@ -44,7 +46,7 @@ pub enum UuidResolverError { #[error("Badly formatted index uid: {0}")] BadlyFormatted(String), #[error("Internal error resolving index uid: {0}")] - Internal(String), + Internal(Box), } macro_rules! internal_error { @@ -52,7 +54,7 @@ macro_rules! internal_error { $( impl From<$other> for UuidResolverError { fn from(other: $other) -> Self { - Self::Internal(other.to_string()) + Self::Internal(Box::new(other)) } } )* @@ -66,3 +68,14 @@ internal_error!( tokio::task::JoinError, serde_json::Error ); + +impl ErrorCode for UuidResolverError { + fn error_code(&self) -> Code { + match self { + UuidResolverError::NameAlreadyExist => Code::IndexAlreadyExists, + UuidResolverError::UnexistingIndex(_) => Code::IndexNotFound, + UuidResolverError::BadlyFormatted(_) => Code::InvalidIndexUid, + UuidResolverError::Internal(_) => Code::Internal, + } + } +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs index bab223bb3..88d707797 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -39,7 +39,7 @@ pub struct HeedUuidStore { } impl HeedUuidStore { - pub fn new(path: impl AsRef) -> anyhow::Result { + pub fn new(path: impl AsRef) -> Result { let path = path.as_ref().join(UUIDS_DB_PATH); create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); @@ -153,7 +153,7 @@ impl HeedUuidStore { Ok(uuids) } - pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { + pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> Result<()> { let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH); std::fs::create_dir_all(&uuid_resolver_path)?; diff --git a/meilisearch-http/src/routes/search.rs b/meilisearch-http/src/routes/search.rs index 36f5bdf4d..3ae38bb21 100644 --- a/meilisearch-http/src/routes/search.rs +++ b/meilisearch-http/src/routes/search.rs @@ -31,9 +31,9 @@ pub struct SearchQueryGet { } impl TryFrom for SearchQuery { - type Error = anyhow::Error; + type Error = Box; - fn try_from(other: SearchQueryGet) -> anyhow::Result { + fn try_from(other: SearchQueryGet) -> Result { let attributes_to_retrieve = other .attributes_to_retrieve .map(|attrs| attrs.split(',').map(String::from).collect::>());