index error handling

This commit is contained in:
Marin Postma 2021-05-24 17:20:44 +02:00
parent 2185fb8367
commit 7ad553670f
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
3 changed files with 63 additions and 62 deletions

View File

@ -30,10 +30,14 @@ pub struct IndexActor<S> {
impl<S: IndexStore + Sync + Send> IndexActor<S> { impl<S: IndexStore + Sync + Send> IndexActor<S> {
pub fn new(receiver: mpsc::Receiver<IndexMsg>, store: S) -> IndexResult<Self> { pub fn new(receiver: mpsc::Receiver<IndexMsg>, store: S) -> IndexResult<Self> {
let options = IndexerOpts::default(); let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; let update_handler = UpdateHandler::new(&options)?;
let update_handler = Arc::new(update_handler); let update_handler = Arc::new(update_handler);
let receiver = Some(receiver); let receiver = Some(receiver);
Ok(Self { receiver, update_handler, store }) Ok(Self {
receiver,
update_handler,
store,
})
} }
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send /// `run` poll the write_receiver and read_receiver concurrently, but while messages send
@ -122,7 +126,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Snapshot { uuid, path, ret } => { Snapshot { uuid, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuid, path).await); let _ = ret.send(self.handle_snapshot(uuid, path).await);
} }
Dump { uid, uuid, path, ret } => { Dump {
uid,
uuid,
path,
ret,
} => {
let _ = ret.send(self.handle_dump(&uid, uuid, path).await); let _ = ret.send(self.handle_dump(&uid, uuid, path).await);
} }
GetStats { uuid, ret } => { GetStats { uuid, ret } => {
@ -146,9 +155,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
primary_key: Option<String>, primary_key: Option<String>,
) -> IndexResult<IndexMeta> { ) -> IndexResult<IndexMeta> {
let index = self.store.create(uuid, primary_key).await?; let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index)) let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
.await
.map_err(|e| IndexError::Error(e.into()))??;
Ok(meta) Ok(meta)
} }
@ -165,9 +172,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
None => self.store.create(uuid, None).await?, None => self.store.create(uuid, None).await?,
}; };
spawn_blocking(move || update_handler.handle_update(meta, data, index)) let result =
.await spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?;
.map_err(|e| IndexError::Error(e.into())) Ok(result)
} }
async fn handle_settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>> { async fn handle_settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>> {
@ -176,9 +183,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid) .get(uuid)
.await? .await?
.ok_or(IndexError::UnexistingIndex)?; .ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.settings().map_err(IndexError::Error)) let result = spawn_blocking(move || index.settings()).await??;
.await Ok(result)
.map_err(|e| IndexError::Error(e.into()))?
} }
async fn handle_fetch_documents( async fn handle_fetch_documents(
@ -193,13 +199,11 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid) .get(uuid)
.await? .await?
.ok_or(IndexError::UnexistingIndex)?; .ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || { let result =
index spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve))
.retrieve_documents(offset, limit, attributes_to_retrieve) .await??;
.map_err(IndexError::Error)
}) Ok(result)
.await
.map_err(|e| IndexError::Error(e.into()))?
} }
async fn handle_fetch_document( async fn handle_fetch_document(
@ -213,13 +217,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid) .get(uuid)
.await? .await?
.ok_or(IndexError::UnexistingIndex)?; .ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index let result =
.retrieve_document(doc_id, attributes_to_retrieve) spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve))
.map_err(IndexError::Error) .await??;
})
.await Ok(result)
.map_err(|e| IndexError::Error(e.into()))?
} }
async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> { async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> {
@ -242,9 +245,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> { async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
match self.store.get(uuid).await? { match self.store.get(uuid).await? {
Some(index) => { Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index)) let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
.await
.map_err(|e| IndexError::Error(e.into()))??;
Ok(meta) Ok(meta)
} }
None => Err(IndexError::UnexistingIndex), None => Err(IndexError::UnexistingIndex),
@ -262,7 +263,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.await? .await?
.ok_or(IndexError::UnexistingIndex)?; .ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || match index_settings.primary_key { let result = spawn_blocking(move || match index_settings.primary_key {
Some(ref primary_key) => { Some(ref primary_key) => {
let mut txn = index.write_txn()?; let mut txn = index.write_txn()?;
if index.primary_key(&txn)?.is_some() { if index.primary_key(&txn)?.is_some() {
@ -278,23 +279,22 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(meta) Ok(meta)
} }
}) })
.await .await??;
.map_err(|e| IndexError::Error(e.into()))?
Ok(result)
} }
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> { async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> {
use tokio::fs::create_dir_all; use tokio::fs::create_dir_all;
path.push("indexes"); path.push("indexes");
create_dir_all(&path) create_dir_all(&path).await?;
.await
.map_err(|e| IndexError::Error(e.into()))?;
if let Some(index) = self.store.get(uuid).await? { if let Some(index) = self.store.get(uuid).await? {
let mut index_path = path.join(format!("index-{}", uuid)); let mut index_path = path.join(format!("index-{}", uuid));
create_dir_all(&index_path)
.await create_dir_all(&index_path).await?;
.map_err(|e| IndexError::Error(e.into()))?;
index_path.push("data.mdb"); index_path.push("data.mdb");
spawn_blocking(move || -> anyhow::Result<()> { spawn_blocking(move || -> anyhow::Result<()> {
// Get write txn to wait for ongoing write transaction before snapshot. // Get write txn to wait for ongoing write transaction before snapshot.
@ -304,9 +304,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.copy_to_path(index_path, CompactionOption::Enabled)?; .copy_to_path(index_path, CompactionOption::Enabled)?;
Ok(()) Ok(())
}) })
.await .await??;
.map_err(|e| IndexError::Error(e.into()))?
.map_err(IndexError::Error)?;
} }
Ok(()) Ok(())
@ -318,9 +316,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
use std::io::prelude::*; use std::io::prelude::*;
use tokio::fs::create_dir_all; use tokio::fs::create_dir_all;
create_dir_all(&path) create_dir_all(&path).await?;
.await
.map_err(|e| IndexError::Error(e.into()))?;
if let Some(index) = self.store.get(uuid).await? { if let Some(index) = self.store.get(uuid).await? {
let documents_path = path.join(uid).join("documents.jsonl"); let documents_path = path.join(uid).join("documents.jsonl");
@ -354,9 +350,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(()) Ok(())
}) })
.await .await??;
.map_err(|e| IndexError::Error(e.into()))?
.map_err(IndexError::Error)?;
} }
Ok(()) Ok(())
@ -379,7 +373,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
fields_distribution: index.fields_distribution(&rtxn)?, fields_distribution: index.fields_distribution(&rtxn)?,
}) })
}) })
.await .await?
.map_err(|e| IndexError::Error(e.into()))?
} }
} }

View File

@ -50,18 +50,30 @@ impl IndexMeta {
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum IndexError { pub enum IndexError {
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
#[error("index already exists")] #[error("index already exists")]
IndexAlreadyExists, IndexAlreadyExists,
#[error("Index doesn't exists")] #[error("Index doesn't exists")]
UnexistingIndex, UnexistingIndex,
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
#[error("Existing primary key")] #[error("Existing primary key")]
ExistingPrimaryKey, ExistingPrimaryKey,
#[error("Internal Index Error: {0}")]
Internal(String)
} }
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for IndexError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(anyhow::Error, heed::Error, tokio::task::JoinError, std::io::Error);
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, automock)] #[cfg_attr(test, automock)]
pub trait IndexActorHandle { pub trait IndexActorHandle {

View File

@ -56,8 +56,7 @@ impl IndexStore for MapIndexStore {
} }
Ok(index) Ok(index)
}) })
.await .await??;
.map_err(|e| IndexError::Error(e.into()))??;
self.index_store.write().await.insert(uuid, index.clone()); self.index_store.write().await.insert(uuid, index.clone());
@ -78,8 +77,7 @@ impl IndexStore for MapIndexStore {
let index_size = self.index_size; let index_size = self.index_size;
let index = spawn_blocking(move || open_index(path, index_size)) let index = spawn_blocking(move || open_index(path, index_size))
.await .await??;
.map_err(|e| IndexError::Error(e.into()))??;
self.index_store.write().await.insert(uuid, index.clone()); self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index)) Ok(Some(index))
} }
@ -88,18 +86,16 @@ impl IndexStore for MapIndexStore {
async fn delete(&self, uuid: Uuid) -> IndexResult<Option<Index>> { async fn delete(&self, uuid: Uuid) -> IndexResult<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid)); let db_path = self.path.join(format!("index-{}", uuid));
fs::remove_dir_all(db_path) fs::remove_dir_all(db_path).await?;
.await
.map_err(|e| IndexError::Error(e.into()))?;
let index = self.index_store.write().await.remove(&uuid); let index = self.index_store.write().await.remove(&uuid);
Ok(index) Ok(index)
} }
} }
fn open_index(path: impl AsRef<Path>, size: usize) -> IndexResult<Index> { fn open_index(path: impl AsRef<Path>, size: usize) -> IndexResult<Index> {
std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; std::fs::create_dir_all(&path)?;
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(size); options.map_size(size);
let index = milli::Index::new(options, &path).map_err(IndexError::Error)?; let index = milli::Index::new(options, &path)?;
Ok(Index(Arc::new(index))) Ok(Index(Arc::new(index)))
} }