From 4e1597bd1dbbc9c1b10a85124f9a748be6031e26 Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 8 Mar 2021 16:27:29 +0100 Subject: [PATCH] clean Uuid resolver actor --- src/index_controller/mod.rs | 38 ++++------- src/index_controller/uuid_resolver.rs | 93 +++++++++++++++++---------- 2 files changed, 74 insertions(+), 57 deletions(-) diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 0bae7f42d..a824852bf 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -10,7 +10,6 @@ use std::sync::Arc; use std::time::Duration; use actix_web::web::{Bytes, Payload}; -use anyhow::Context; use futures::stream::StreamExt; use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::{Serialize, Deserialize}; @@ -108,7 +107,7 @@ impl IndexController { } pub async fn clear_documents(&self, index: String) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(index).await?.unwrap(); + let uuid = self.uuid_resolver.resolve(index).await?; let meta = UpdateMeta::ClearDocuments; let (_, receiver) = mpsc::channel(1); let status = self.update_handle.update(meta, receiver, uuid).await?; @@ -116,7 +115,7 @@ impl IndexController { } pub async fn delete_documents(&self, index: String, document_ids: Vec) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(index).await.unwrap().unwrap(); + let uuid = self.uuid_resolver.resolve(index).await?; let meta = UpdateMeta::DeleteDocuments; let (sender, receiver) = mpsc::channel(10); @@ -153,8 +152,7 @@ impl IndexController { pub async fn delete_index(&self, index_uid: String) -> anyhow::Result<()> { let uuid = self.uuid_resolver .delete(index_uid) - .await? - .context("index not found")?; + .await?; self.update_handle.delete(uuid.clone()).await?; self.index_handle.delete(uuid).await?; Ok(()) @@ -163,16 +161,14 @@ impl IndexController { pub async fn update_status(&self, index: String, id: u64) -> anyhow::Result> { let uuid = self.uuid_resolver .resolve(index) - .await? - .context("index not found")?; + .await?; let result = self.update_handle.update_status(uuid, id).await?; Ok(result) } pub async fn all_update_status(&self, index: String) -> anyhow::Result> { let uuid = self.uuid_resolver - .resolve(index).await? - .context("index not found")?; + .resolve(index).await?; let result = self.update_handle.get_all_updates_status(uuid).await?; Ok(result) } @@ -195,8 +191,7 @@ impl IndexController { pub async fn settings(&self, index: String) -> anyhow::Result { let uuid = self.uuid_resolver .resolve(index.clone()) - .await? - .with_context(|| format!("Index {:?} doesn't exist", index))?; + .await?; let settings = self.index_handle.settings(uuid).await?; Ok(settings) } @@ -210,8 +205,7 @@ impl IndexController { ) -> anyhow::Result> { let uuid = self.uuid_resolver .resolve(index.clone()) - .await? - .with_context(|| format!("Index {:?} doesn't exist", index))?; + .await?; let documents = self.index_handle.documents(uuid, offset, limit, attributes_to_retrieve).await?; Ok(documents) } @@ -224,8 +218,7 @@ impl IndexController { ) -> anyhow::Result { let uuid = self.uuid_resolver .resolve(index.clone()) - .await? - .with_context(|| format!("Index {:?} doesn't exist", index))?; + .await?; let document = self.index_handle.document(uuid, doc_id, attributes_to_retrieve).await?; Ok(document) } @@ -235,21 +228,18 @@ impl IndexController { } pub async fn search(&self, name: String, query: SearchQuery) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(name).await.unwrap().unwrap(); + let uuid = self.uuid_resolver.resolve(name).await?; let result = self.index_handle.search(uuid, query).await?; Ok(result) } pub async fn get_index(&self, name: String) -> anyhow::Result> { let uuid = self.uuid_resolver.resolve(name.clone()).await?; - if let Some(uuid) = uuid { - let result = self.index_handle - .get_index_meta(uuid) - .await? - .map(|meta| IndexMetadata { name, meta }); - return Ok(result) - } - Ok(None) + let result = self.index_handle + .get_index_meta(uuid) + .await? + .map(|meta| IndexMetadata { name, meta }); + Ok(result) } } diff --git a/src/index_controller/uuid_resolver.rs b/src/index_controller/uuid_resolver.rs index e2539fdb2..8d33edef4 100644 --- a/src/index_controller/uuid_resolver.rs +++ b/src/index_controller/uuid_resolver.rs @@ -1,10 +1,10 @@ -use thiserror::Error; -use tokio::sync::{RwLock, mpsc, oneshot}; -use uuid::Uuid; +use log::{info, warn}; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; -use std::collections::hash_map::Entry; -use log::{info, warn}; +use thiserror::Error; +use tokio::sync::{mpsc, oneshot, RwLock}; +use uuid::Uuid; pub type Result = std::result::Result; @@ -12,7 +12,7 @@ pub type Result = std::result::Result; enum UuidResolveMsg { Resolve { name: String, - ret: oneshot::Sender>>, + ret: oneshot::Sender>, }, GetOrCreate { name: String, @@ -24,7 +24,7 @@ enum UuidResolveMsg { }, Delete { name: String, - ret: oneshot::Sender>>, + ret: oneshot::Sender>, }, List { ret: oneshot::Sender>>, @@ -46,13 +46,20 @@ impl UuidResolverActor { info!("uuid resolver started"); - // TODO: benchmark and use buffered streams to improve throughput. loop { match self.inbox.recv().await { - Some(Create { name, ret }) => self.handle_create(name, ret).await, - Some(GetOrCreate { name, ret }) => self.handle_get_or_create(name, ret).await, - Some(Resolve { name, ret }) => self.handle_resolve(name, ret).await, - Some(Delete { name, ret }) => self.handle_delete(name, ret).await, + Some(Create { name, ret }) => { + let _ = ret.send(self.handle_create(name).await); + } + Some(GetOrCreate { name, ret }) => { + let _ = ret.send(self.handle_get_or_create(name).await); + } + Some(Resolve { name, ret }) => { + let _ = ret.send(self.handle_resolve(name).await); + } + Some(Delete { name, ret }) => { + let _ = ret.send(self.handle_delete(name).await); + } Some(List { ret }) => { let _ = ret.send(self.handle_list().await); } @@ -64,24 +71,26 @@ impl UuidResolverActor { warn!("exiting uuid resolver loop"); } - async fn handle_create(&self, name: String, ret: oneshot::Sender>) { - let result = self.store.create_uuid(name, true).await; - let _ = ret.send(result); + async fn handle_create(&self, name: String) -> Result { + self.store.create_uuid(name, true).await } - async fn handle_get_or_create(&self, name: String, ret: oneshot::Sender>) { - let result = self.store.create_uuid(name, false).await; - let _ = ret.send(result); + async fn handle_get_or_create(&self, name: String) -> Result { + self.store.create_uuid(name, false).await } - async fn handle_resolve(&self, name: String, ret: oneshot::Sender>>) { - let result = self.store.get_uuid(&name).await; - let _ = ret.send(result); + async fn handle_resolve(&self, name: String) -> Result { + self.store + .get_uuid(&name) + .await? + .ok_or(UuidError::UnexistingIndex(name)) } - async fn handle_delete(&self, name: String, ret: oneshot::Sender>>) { - let result = self.store.delete(&name).await; - let _ = ret.send(result); + async fn handle_delete(&self, name: String) -> Result { + self.store + .delete(&name) + .await? + .ok_or(UuidError::UnexistingIndex(name)) } async fn handle_list(&self) -> Result> { @@ -104,39 +113,49 @@ impl UuidResolverHandle { Self { sender } } - pub async fn resolve(&self, name: String) -> anyhow::Result> { + pub async fn resolve(&self, name: String) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::Resolve { name, ret }; let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) } pub async fn get_or_create(&self, name: String) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::GetOrCreate { name, ret }; let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) } pub async fn create(&self, name: String) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::Create { name, ret }; let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) } - pub async fn delete(&self, name: String) -> anyhow::Result> { + pub async fn delete(&self, name: String) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::Delete { name, ret }; let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) } pub async fn list(&self) -> anyhow::Result> { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::List { ret }; let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) } } @@ -144,6 +163,8 @@ impl UuidResolverHandle { pub enum UuidError { #[error("Name already exist.")] NameAlreadyExist, + #[error("Index \"{0}\" doesn't exist.")] + UnexistingIndex(String), } #[async_trait::async_trait] @@ -168,7 +189,7 @@ impl UuidStore for MapUuidStore { } else { Ok(entry.get().clone()) } - }, + } Entry::Vacant(entry) => { let uuid = Uuid::new_v4(); let uuid = entry.insert(uuid); @@ -186,7 +207,13 @@ impl UuidStore for MapUuidStore { } async fn list(&self) -> Result> { - let list = self.0.read().await.iter().map(|(name, uuid)| (name.to_owned(), uuid.clone())).collect(); + let list = self + .0 + .read() + .await + .iter() + .map(|(name, uuid)| (name.to_owned(), uuid.clone())) + .collect(); Ok(list) } }