From 06403a57081a63805baee44ade6232e2500df5e8 Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 8 Mar 2021 15:53:16 +0100 Subject: [PATCH] clean index actor unwraps --- src/data/mod.rs | 4 +- src/index_controller/index_actor.rs | 188 +++++++++++++++------------ src/index_controller/mod.rs | 6 +- src/index_controller/update_store.rs | 12 +- 4 files changed, 113 insertions(+), 97 deletions(-) diff --git a/src/data/mod.rs b/src/data/mod.rs index 0f7722657..b011c18b3 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -59,10 +59,8 @@ impl Data { pub fn new(options: Opt) -> anyhow::Result { let path = options.db_path.clone(); - //let indexer_opts = options.indexer_options.clone(); - create_dir_all(&path)?; - let index_controller = IndexController::new(&path); + let index_controller = IndexController::new(&path)?; let mut api_keys = ApiKeys { master: options.clone().master_key, diff --git a/src/index_controller/index_actor.rs b/src/index_controller/index_actor.rs index c85224bb4..1ddc041a1 100644 --- a/src/index_controller/index_actor.rs +++ b/src/index_controller/index_actor.rs @@ -5,15 +5,15 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use async_stream::stream; -use chrono::{Utc, DateTime}; +use chrono::{DateTime, Utc}; use futures::pin_mut; use futures::stream::StreamExt; use heed::EnvOpenOptions; use log::info; +use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; -use serde::{Serialize, Deserialize}; use super::get_arc_ownership_blocking; use super::update_handler::UpdateHandler; @@ -31,7 +31,7 @@ type UpdateResult = std::result::Result, Failed, updated_at: DateTime, @@ -47,7 +47,7 @@ enum IndexMsg { Update { meta: Processing, data: std::fs::File, - ret: oneshot::Sender, + ret: oneshot::Sender>, }, Search { uuid: Uuid, @@ -102,8 +102,9 @@ pub enum IndexError { trait IndexStore { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; async fn update_index(&self, uuid: Uuid, f: F) -> Result - where F: FnOnce(Index) -> Result + Send + Sync + 'static, - R: Sync + Send + 'static; + where + F: FnOnce(Index) -> Result + Send + Sync + 'static, + R: Sync + Send + 'static; async fn get_or_create(&self, uuid: Uuid, primary_key: Option) -> Result; async fn get(&self, uuid: Uuid) -> Result>; async fn delete(&self, uuid: &Uuid) -> Result>; @@ -115,18 +116,19 @@ impl IndexActor { read_receiver: mpsc::Receiver, write_receiver: mpsc::Receiver, store: S, - ) -> Self { + ) -> Result { let options = IndexerOpts::default(); - let update_handler = UpdateHandler::new(&options).unwrap(); + let update_handler = UpdateHandler::new(&options) + .map_err(|e| IndexError::Error(e.into()))?; let update_handler = Arc::new(update_handler); let read_receiver = Some(read_receiver); let write_receiver = Some(write_receiver); - Self { + Ok(Self { read_receiver, write_receiver, store, update_handler, - } + }) } /// `run` poll the write_receiver and read_receiver concurrently, but while messages send @@ -180,10 +182,18 @@ impl IndexActor { uuid, primary_key, ret, - } => self.handle_create_index(uuid, primary_key, ret).await, - Update { ret, meta, data } => self.handle_update(meta, data, ret).await, - Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await, - Settings { ret, uuid } => self.handle_settings(uuid, ret).await, + } => { + let _ = ret.send(self.handle_create_index(uuid, primary_key).await); + } + Update { ret, meta, data } => { + let _ = ret.send(self.handle_update(meta, data).await); + } + Search { ret, query, uuid } => { + let _ = ret.send(self.handle_search(uuid, query).await); + } + Settings { ret, uuid } => { + let _ = ret.send(self.handle_settings(uuid).await); + } Documents { ret, uuid, @@ -191,8 +201,10 @@ impl IndexActor { offset, limit, } => { - self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) - .await + let _ = ret.send( + self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve) + .await, + ); } Document { uuid, @@ -200,8 +212,10 @@ impl IndexActor { doc_id, ret, } => { - self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret) - .await + let _ = ret.send( + self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve) + .await, + ); } Delete { uuid, ret } => { let _ = ret.send(self.handle_delete(uuid).await); @@ -212,56 +226,51 @@ impl IndexActor { } } - async fn handle_search( - &self, - uuid: Uuid, - query: SearchQuery, - ret: oneshot::Sender>, - ) { - let index = self.store.get(uuid).await.unwrap().unwrap(); - tokio::task::spawn_blocking(move || { - let result = index.perform_search(query); - ret.send(result) - }); + async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result { + let index = self.store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + tokio::task::spawn_blocking(move || index.perform_search(query)).await? } async fn handle_create_index( &self, uuid: Uuid, primary_key: Option, - ret: oneshot::Sender>, - ) { - let result = self.store.create_index(uuid, primary_key).await; - let _ = ret.send(result); + ) -> Result { + self.store.create_index(uuid, primary_key).await } async fn handle_update( &self, meta: Processing, data: File, - ret: oneshot::Sender, - ) { + ) -> Result { info!("Processing update {}", meta.id()); let uuid = meta.index_uuid().clone(); let update_handler = self.update_handler.clone(); - let handle = self.store.update_index(uuid, |index| { - let handle = tokio::task::spawn_blocking(move || { - let result = update_handler.handle_update(meta, data, index); - let _ = ret.send(result); - }); - Ok(handle) - }); + let handle = self + .store + .update_index(uuid, |index| { + let handle = tokio::task::spawn_blocking(move || { + update_handler.handle_update(meta, data, index) + }); + Ok(handle) + }) + .await?; - handle.await; + handle.await.map_err(|e| IndexError::Error(e.into())) } - async fn handle_settings(&self, uuid: Uuid, ret: oneshot::Sender>) { - let index = self.store.get(uuid).await.unwrap().unwrap(); - tokio::task::spawn_blocking(move || { - let result = index.settings().map_err(|e| IndexError::Error(e)); - let _ = ret.send(result); - }) - .await; + async fn handle_settings(&self, uuid: Uuid) -> Result { + let index = self.store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + tokio::task::spawn_blocking(move || index.settings().map_err(|e| IndexError::Error(e))) + .await + .map_err(|e| IndexError::Error(e.into()))? } async fn handle_fetch_documents( @@ -270,16 +279,17 @@ impl IndexActor { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ret: oneshot::Sender>>, - ) { - let index = self.store.get(uuid).await.unwrap().unwrap(); + ) -> Result> { + let index = self.store.get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; tokio::task::spawn_blocking(move || { - let result = index + index .retrieve_documents(offset, limit, attributes_to_retrieve) - .map_err(|e| IndexError::Error(e)); - let _ = ret.send(result); + .map_err(|e| IndexError::Error(e)) }) - .await; + .await + .map_err(|e| IndexError::Error(e.into()))? } async fn handle_fetch_document( @@ -287,16 +297,19 @@ impl IndexActor { uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ret: oneshot::Sender>, - ) { - let index = self.store.get(uuid).await.unwrap().unwrap(); + ) -> Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; tokio::task::spawn_blocking(move || { - let result = index + index .retrieve_document(doc_id, attributes_to_retrieve) - .map_err(|e| IndexError::Error(e)); - let _ = ret.send(result); + .map_err(|e| IndexError::Error(e)) }) - .await; + .await + .map_err(|e| IndexError::Error(e.into()))? } async fn handle_delete(&self, uuid: Uuid) -> Result<()> { @@ -329,24 +342,20 @@ pub struct IndexActorHandle { } impl IndexActorHandle { - pub fn new(path: impl AsRef) -> Self { + pub fn new(path: impl AsRef) -> anyhow::Result { let (read_sender, read_receiver) = mpsc::channel(100); let (write_sender, write_receiver) = mpsc::channel(100); let store = MapIndexStore::new(path); - let actor = IndexActor::new(read_receiver, write_receiver, store); + let actor = IndexActor::new(read_receiver, write_receiver, store)?; tokio::task::spawn(actor.run()); - Self { + Ok(Self { read_sender, write_sender, - } + }) } - pub async fn create_index( - &self, - uuid: Uuid, - primary_key: Option, - ) -> Result { + pub async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::CreateIndex { ret, @@ -357,11 +366,15 @@ impl IndexActorHandle { receiver.await.expect("IndexActor has been killed") } - pub async fn update(&self, meta: Processing, data: std::fs::File) -> UpdateResult { + pub async fn update( + &self, + meta: Processing, + data: std::fs::File, + ) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Update { ret, meta, data }; let _ = self.read_sender.send(msg).await; - receiver.await.expect("IndexActor has been killed") + Ok(receiver.await.expect("IndexActor has been killed")?) } pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { @@ -441,7 +454,7 @@ impl IndexStore for MapIndexStore { let meta = match self.meta_store.write().await.entry(uuid.clone()) { Entry::Vacant(entry) => { let now = Utc::now(); - let meta = IndexMeta{ + let meta = IndexMeta { uuid, created_at: now.clone(), updated_at: now, @@ -478,7 +491,7 @@ impl IndexStore for MapIndexStore { Entry::Vacant(index_entry) => match self.meta_store.write().await.entry(uuid.clone()) { Entry::Vacant(meta_entry) => { let now = Utc::now(); - let meta = IndexMeta{ + let meta = IndexMeta { uuid, created_at: now.clone(), updated_at: now, @@ -491,12 +504,13 @@ impl IndexStore for MapIndexStore { create_dir_all(&db_path).expect("can't create db"); let mut options = EnvOpenOptions::new(); options.map_size(4096 * 100_000); - let index = milli::Index::new(options, &db_path).map_err(|e| IndexError::Error(e))?; + let index = milli::Index::new(options, &db_path) + .map_err(|e| IndexError::Error(e))?; let index = Index(Arc::new(index)); Ok(index) }) .await - .expect("thread died"); + .expect("thread died"); Ok(index_entry.insert(index?).clone()) } @@ -508,12 +522,13 @@ impl IndexStore for MapIndexStore { create_dir_all(&db_path).expect("can't create db"); let mut options = EnvOpenOptions::new(); options.map_size(4096 * 100_000); - let index = milli::Index::new(options, &db_path).map_err(|e| IndexError::Error(e))?; + let index = milli::Index::new(options, &db_path) + .map_err(|e| IndexError::Error(e))?; let index = Index(Arc::new(index)); Ok(index) }) .await - .expect("thread died"); + .expect("thread died"); Ok(index_entry.insert(index?).clone()) } @@ -540,11 +555,14 @@ impl IndexStore for MapIndexStore { } async fn update_index(&self, uuid: Uuid, f: F) -> Result - where F: FnOnce(Index) -> Result + Send + Sync + 'static, - R: Sync + Send + 'static, + where + F: FnOnce(Index) -> Result + Send + Sync + 'static, + R: Sync + Send + 'static, { let index = self.get_or_create(uuid.clone(), None).await?; - let mut meta = self.get_meta(&uuid).await? + let mut meta = self + .get_meta(&uuid) + .await? .ok_or(IndexError::UnexistingIndex)?; match f(index) { Ok(r) => { @@ -552,7 +570,7 @@ impl IndexStore for MapIndexStore { self.meta_store.write().await.insert(uuid, meta); Ok(r) } - Err(e) => Err(e) + Err(e) => Err(e), } } } diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index bcf663ec8..0bae7f42d 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -69,11 +69,11 @@ enum IndexControllerMsg { } impl IndexController { - pub fn new(path: impl AsRef) -> Self { + pub fn new(path: impl AsRef) -> anyhow::Result { let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); - let index_actor = index_actor::IndexActorHandle::new(&path); + let index_actor = index_actor::IndexActorHandle::new(&path)?; let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path); - Self { uuid_resolver, index_handle: index_actor, update_handle } + Ok(Self { uuid_resolver, index_handle: index_actor, update_handle }) } pub async fn add_documents( diff --git a/src/index_controller/update_store.rs b/src/index_controller/update_store.rs index 02c15ed8c..b8f174a34 100644 --- a/src/index_controller/update_store.rs +++ b/src/index_controller/update_store.rs @@ -30,18 +30,18 @@ pub trait HandleUpdate { &mut self, meta: Processing, content: File, - ) -> Result, Failed>; + ) -> anyhow::Result, Failed>>; } impl HandleUpdate for F where - F: FnMut(Processing, File) -> Result, Failed>, + F: FnMut(Processing, File) -> anyhow::Result, Failed>>, { fn handle_update( &mut self, meta: Processing, content: File, - ) -> Result, Failed> { + ) -> anyhow::Result, Failed>> { self(meta, content) } } @@ -100,7 +100,7 @@ where update_store.process_pending_update(handler) }) .await - .unwrap(); + .expect("Fatal error processing update."); match res { Ok(Some(_)) => (), Ok(None) => break, @@ -185,7 +185,7 @@ where /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, mut handler: U) -> heed::Result> + fn process_pending_update(&self, mut handler: U) -> anyhow::Result> where U: HandleUpdate, { @@ -209,7 +209,7 @@ where self.processing.write().unwrap().replace(processing.clone()); let file = File::open(&content_path)?; // Process the pending update using the provided user function. - let result = handler.handle_update(processing, file); + let result = handler.handle_update(processing, file)?; drop(rtxn); // Once the pending update have been successfully processed