From 7ad553670fe0bedac02982dce047b76848761713 Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Mon, 24 May 2021 17:20:44 +0200 Subject: [PATCH] index error handling --- .../src/index_controller/index_actor/actor.rs | 91 +++++++++---------- .../src/index_controller/index_actor/mod.rs | 20 +++- .../src/index_controller/index_actor/store.rs | 14 +-- 3 files changed, 63 insertions(+), 62 deletions(-) diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 1f0091265..0e2e63468 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -30,10 +30,14 @@ pub struct IndexActor { impl IndexActor { pub fn new(receiver: mpsc::Receiver, store: S) -> IndexResult { let options = IndexerOpts::default(); - let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; + let update_handler = UpdateHandler::new(&options)?; let update_handler = Arc::new(update_handler); let receiver = Some(receiver); - Ok(Self { receiver, update_handler, store }) + Ok(Self { + receiver, + update_handler, + store, + }) } /// `run` poll the write_receiver and read_receiver concurrently, but while messages send @@ -122,7 +126,12 @@ impl IndexActor { Snapshot { uuid, path, ret } => { let _ = ret.send(self.handle_snapshot(uuid, path).await); } - Dump { uid, uuid, path, ret } => { + Dump { + uid, + uuid, + path, + ret, + } => { let _ = ret.send(self.handle_dump(&uid, uuid, path).await); } GetStats { uuid, ret } => { @@ -146,9 +155,7 @@ impl IndexActor { primary_key: Option, ) -> IndexResult { let index = self.store.create(uuid, primary_key).await?; - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; + let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; Ok(meta) } @@ -165,9 +172,9 @@ impl IndexActor { None => self.store.create(uuid, None).await?, }; - spawn_blocking(move || update_handler.handle_update(meta, data, index)) - .await - .map_err(|e| IndexError::Error(e.into())) + let result = + spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?; + Ok(result) } async fn handle_settings(&self, uuid: Uuid) -> IndexResult> { @@ -176,9 +183,8 @@ impl IndexActor { .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || index.settings().map_err(IndexError::Error)) - .await - .map_err(|e| IndexError::Error(e.into()))? + let result = spawn_blocking(move || index.settings()).await??; + Ok(result) } async fn handle_fetch_documents( @@ -193,13 +199,11 @@ impl IndexActor { .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_documents(offset, limit, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? + let result = + spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)) + .await??; + + Ok(result) } async fn handle_fetch_document( @@ -213,13 +217,12 @@ impl IndexActor { .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_document(doc_id, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? + + let result = + spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)) + .await??; + + Ok(result) } async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> { @@ -242,9 +245,7 @@ impl IndexActor { async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult { match self.store.get(uuid).await? { Some(index) => { - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; + let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; Ok(meta) } None => Err(IndexError::UnexistingIndex), @@ -262,7 +263,7 @@ impl IndexActor { .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || match index_settings.primary_key { + let result = spawn_blocking(move || match index_settings.primary_key { Some(ref primary_key) => { let mut txn = index.write_txn()?; if index.primary_key(&txn)?.is_some() { @@ -278,23 +279,22 @@ impl IndexActor { Ok(meta) } }) - .await - .map_err(|e| IndexError::Error(e.into()))? + .await??; + + Ok(result) } async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> { use tokio::fs::create_dir_all; path.push("indexes"); - create_dir_all(&path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + create_dir_all(&path).await?; if let Some(index) = self.store.get(uuid).await? { let mut index_path = path.join(format!("index-{}", uuid)); - create_dir_all(&index_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + + create_dir_all(&index_path).await?; + index_path.push("data.mdb"); spawn_blocking(move || -> anyhow::Result<()> { // Get write txn to wait for ongoing write transaction before snapshot. @@ -304,9 +304,7 @@ impl IndexActor { .copy_to_path(index_path, CompactionOption::Enabled)?; Ok(()) }) - .await - .map_err(|e| IndexError::Error(e.into()))? - .map_err(IndexError::Error)?; + .await??; } Ok(()) @@ -318,9 +316,7 @@ impl IndexActor { use std::io::prelude::*; use tokio::fs::create_dir_all; - create_dir_all(&path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + create_dir_all(&path).await?; if let Some(index) = self.store.get(uuid).await? { let documents_path = path.join(uid).join("documents.jsonl"); @@ -354,9 +350,7 @@ impl IndexActor { Ok(()) }) - .await - .map_err(|e| IndexError::Error(e.into()))? - .map_err(IndexError::Error)?; + .await??; } Ok(()) @@ -379,7 +373,6 @@ impl IndexActor { fields_distribution: index.fields_distribution(&rtxn)?, }) }) - .await - .map_err(|e| IndexError::Error(e.into()))? + .await? } } diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index 3b92b1078..fd1d59e8f 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -50,18 +50,30 @@ impl IndexMeta { #[derive(Error, Debug)] pub enum IndexError { - #[error("error with index: {0}")] - Error(#[from] anyhow::Error), #[error("index already exists")] IndexAlreadyExists, #[error("Index doesn't exists")] UnexistingIndex, - #[error("Heed error: {0}")] - HeedError(#[from] heed::Error), #[error("Existing primary key")] ExistingPrimaryKey, + #[error("Internal Index Error: {0}")] + Internal(String) } +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for IndexError { + fn from(other: $other) -> Self { + Self::Internal(other.to_string()) + } + } + )* + } +} + +internal_error!(anyhow::Error, heed::Error, tokio::task::JoinError, std::io::Error); + #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait IndexActorHandle { diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 44f076f2f..3dee166a9 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -56,8 +56,7 @@ impl IndexStore for MapIndexStore { } Ok(index) }) - .await - .map_err(|e| IndexError::Error(e.into()))??; + .await??; self.index_store.write().await.insert(uuid, index.clone()); @@ -78,8 +77,7 @@ impl IndexStore for MapIndexStore { let index_size = self.index_size; let index = spawn_blocking(move || open_index(path, index_size)) - .await - .map_err(|e| IndexError::Error(e.into()))??; + .await??; self.index_store.write().await.insert(uuid, index.clone()); Ok(Some(index)) } @@ -88,18 +86,16 @@ impl IndexStore for MapIndexStore { async fn delete(&self, uuid: Uuid) -> IndexResult> { let db_path = self.path.join(format!("index-{}", uuid)); - fs::remove_dir_all(db_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + fs::remove_dir_all(db_path).await?; let index = self.index_store.write().await.remove(&uuid); Ok(index) } } fn open_index(path: impl AsRef, size: usize) -> IndexResult { - std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; + std::fs::create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); options.map_size(size); - let index = milli::Index::new(options, &path).map_err(IndexError::Error)?; + let index = milli::Index::new(options, &path)?; Ok(Index(Arc::new(index))) }