From def737edeeef8c37056a0644f8b3cb5e0e598396 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 22 Sep 2021 10:49:59 +0200 Subject: [PATCH] refactor uuid resolver --- .../src/index_controller/dump_actor/actor.rs | 13 +- .../dump_actor/handle_impl.rs | 4 +- .../index_controller/dump_actor/loaders/v1.rs | 3 +- .../index_controller/dump_actor/loaders/v2.rs | 2 +- .../src/index_controller/dump_actor/mod.rs | 13 +- meilisearch-lib/src/index_controller/error.rs | 5 + meilisearch-lib/src/index_controller/mod.rs | 38 +++--- .../index_controller/uuid_resolver/actor.rs | 98 ------------- .../index_controller/uuid_resolver/error.rs | 16 +++ .../uuid_resolver/handle_impl.rs | 87 ------------ .../index_controller/uuid_resolver/message.rs | 58 +++++++- .../src/index_controller/uuid_resolver/mod.rs | 129 ++++++++++++++---- .../index_controller/uuid_resolver/store.rs | 3 +- 13 files changed, 223 insertions(+), 246 deletions(-) delete mode 100644 meilisearch-lib/src/index_controller/uuid_resolver/actor.rs delete mode 100644 meilisearch-lib/src/index_controller/uuid_resolver/handle_impl.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/actor.rs b/meilisearch-lib/src/index_controller/dump_actor/actor.rs index eee733c4a..f82101bc1 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/actor.rs @@ -8,17 +8,17 @@ use futures::{lock::Mutex, stream::StreamExt}; use log::{error, trace}; use tokio::sync::{mpsc, oneshot, RwLock}; use update_actor::UpdateActorHandle; -use uuid_resolver::UuidResolverHandle; use super::error::{DumpActorError, Result}; use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask}; -use crate::index_controller::{update_actor, uuid_resolver}; +use crate::index_controller::uuid_resolver::UuidResolverSender; +use crate::index_controller::update_actor; pub const CONCURRENT_DUMP_MSG: usize = 10; -pub struct DumpActor { +pub struct DumpActor { inbox: Option>, - uuid_resolver: UuidResolver, + uuid_resolver: UuidResolverSender, update: Update, dump_path: PathBuf, lock: Arc>, @@ -32,14 +32,13 @@ fn generate_uid() -> String { Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() } -impl DumpActor +impl DumpActor where - UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static, Update: UpdateActorHandle + Send + Sync + Clone + 'static, { pub fn new( inbox: mpsc::Receiver, - uuid_resolver: UuidResolver, + uuid_resolver: UuidResolverSender, update: Update, dump_path: impl AsRef, index_db_size: usize, diff --git a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs index 649d82405..544cb89c6 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs @@ -2,6 +2,8 @@ use std::path::Path; use tokio::sync::{mpsc, oneshot}; +use crate::index_controller::uuid_resolver::UuidResolverSender; + use super::error::Result; use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg}; @@ -30,7 +32,7 @@ impl DumpActorHandle for DumpActorHandleImpl { impl DumpActorHandleImpl { pub fn new( path: impl AsRef, - uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl, + uuid_resolver: UuidResolverSender, update: crate::index_controller::update_actor::UpdateActorHandleImpl, index_db_size: usize, update_db_size: usize, diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs index 584828b4e..b489b2107 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs @@ -7,7 +7,8 @@ use milli::update::Setting; use serde::{Deserialize, Deserializer, Serialize}; use uuid::Uuid; -use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata}; +use crate::index_controller::uuid_resolver::store::HeedUuidStore; +use crate::index_controller::{self, IndexMetadata}; use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; use crate::{ index::Unchecked, diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs index c39da1e44..7b7a8236c 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs @@ -5,7 +5,7 @@ use log::info; use serde::{Deserialize, Serialize}; use crate::index::Index; -use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore}; +use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::store::HeedUuidStore}; use crate::options::IndexerOpts; #[derive(Serialize, Deserialize, Debug)] diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index e0f9535f3..445966a56 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -16,8 +16,10 @@ pub use actor::DumpActor; pub use handle_impl::*; pub use message::DumpMsg; -use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle}; +use super::update_actor::UpdateActorHandle; +use super::uuid_resolver::UuidResolverSender; use crate::index_controller::dump_actor::error::DumpActorError; +use crate::index_controller::uuid_resolver::UuidResolverMsg; use crate::options::IndexerOpts; use error::Result; @@ -149,18 +151,17 @@ pub fn load_dump( Ok(()) } -struct DumpTask { +struct DumpTask

{ path: PathBuf, - uuid_resolver: U, + uuid_resolver: UuidResolverSender, update_handle: P, uid: String, update_db_size: usize, index_db_size: usize, } -impl DumpTask +impl

DumpTask

where - U: UuidResolverHandle + Send + Sync + Clone + 'static, P: UpdateActorHandle + Send + Sync + Clone + 'static, { @@ -179,7 +180,7 @@ where let mut meta_file = File::create(&meta_path)?; serde_json::to_writer(&mut meta_file, &meta)?; - let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?; + let uuids = UuidResolverMsg::dump(&self.uuid_resolver, temp_dump_path.clone()).await?; self.update_handle .dump(uuids, temp_dump_path.clone()) diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs index 00f6b8656..d3be7d7b7 100644 --- a/meilisearch-lib/src/index_controller/error.rs +++ b/meilisearch-lib/src/index_controller/error.rs @@ -1,3 +1,5 @@ +use std::error::Error; + use meilisearch_error::Code; use meilisearch_error::ErrorCode; @@ -24,6 +26,8 @@ pub enum IndexControllerError { DumpActor(#[from] DumpActorError), #[error("{0}")] IndexError(#[from] IndexError), + #[error("Internal error: {0}")] + Internal(Box), } impl ErrorCode for IndexControllerError { @@ -35,6 +39,7 @@ impl ErrorCode for IndexControllerError { IndexControllerError::UpdateActor(e) => e.error_code(), IndexControllerError::DumpActor(e) => e.error_code(), IndexControllerError::IndexError(e) => e.error_code(), + IndexControllerError::Internal(_) => Code::Internal, } } } diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 73df4eee6..f22fec33f 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -20,13 +20,14 @@ use index_actor::IndexActorHandle; use snapshot::load_snapshot; use update_actor::UpdateActorHandle; pub use updates::*; -use uuid_resolver::{error::UuidResolverError, UuidResolverHandle}; +use uuid_resolver::error::UuidResolverError; use crate::options::IndexerOpts; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use error::Result; use self::dump_actor::load_dump; +use self::uuid_resolver::UuidResolverMsg; mod dump_actor; pub mod error; @@ -71,7 +72,7 @@ pub struct IndexStats { #[derive(Clone)] pub struct IndexController { - uuid_resolver: uuid_resolver::UuidResolverHandleImpl, + uuid_resolver: uuid_resolver::UuidResolverSender, index_handle: index_actor::IndexActorHandleImpl, update_handle: update_actor::UpdateActorHandleImpl, dump_handle: dump_actor::DumpActorHandleImpl, @@ -136,7 +137,7 @@ impl IndexControllerBuilder { std::fs::create_dir_all(db_path.as_ref())?; - let uuid_resolver = uuid_resolver::UuidResolverHandleImpl::new(&db_path)?; + let uuid_resolver = uuid_resolver::create_uuid_resolver(&db_path)?; let index_handle = index_actor::IndexActorHandleImpl::new(&db_path, index_size, &indexer_options)?; let update_handle = update_actor::UpdateActorHandleImpl::new( @@ -231,7 +232,8 @@ impl IndexController { } pub async fn register_update(&self, uid: &str, update: Update) -> Result { - match self.uuid_resolver.get(uid.to_string()).await { + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.to_string()).await; + match uuid { Ok(uuid) => { let update_result = self.update_handle.update(uuid, update).await?; Ok(update_result) @@ -241,7 +243,8 @@ impl IndexController { let update_result = self.update_handle.update(uuid, update).await?; // ignore if index creation fails now, since it may already have been created let _ = self.index_handle.create_index(uuid, None).await; - self.uuid_resolver.insert(name, uuid).await?; + UuidResolverMsg::insert(&self.uuid_resolver, uuid, name).await?; + Ok(update_result) } Err(e) => Err(e.into()), @@ -374,22 +377,20 @@ impl IndexController { //} pub async fn update_status(&self, uid: String, id: u64) -> Result { - let uuid = self.uuid_resolver.get(uid).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let result = self.update_handle.update_status(uuid, id).await?; Ok(result) } pub async fn all_update_status(&self, uid: String) -> Result> { - let uuid = self.uuid_resolver.get(uid).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let result = self.update_handle.get_all_updates_status(uuid).await?; Ok(result) } pub async fn list_indexes(&self) -> Result> { - let uuids = self.uuid_resolver.list().await?; - + let uuids = UuidResolverMsg::list(&self.uuid_resolver).await?; let mut ret = Vec::new(); - for (uid, uuid) in uuids { let meta = self.index_handle.get_index_meta(uuid).await?; let meta = IndexMetadata { @@ -405,7 +406,7 @@ impl IndexController { } pub async fn settings(&self, uid: String) -> Result> { - let uuid = self.uuid_resolver.get(uid.clone()).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let settings = self.index_handle.settings(uuid).await?; Ok(settings) } @@ -417,7 +418,7 @@ impl IndexController { limit: usize, attributes_to_retrieve: Option>, ) -> Result> { - let uuid = self.uuid_resolver.get(uid.clone()).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let documents = self .index_handle .documents(uuid, offset, limit, attributes_to_retrieve) @@ -431,7 +432,7 @@ impl IndexController { doc_id: String, attributes_to_retrieve: Option>, ) -> Result { - let uuid = self.uuid_resolver.get(uid.clone()).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let document = self .index_handle .document(uuid, doc_id, attributes_to_retrieve) @@ -448,7 +449,7 @@ impl IndexController { index_settings.uid.take(); } - let uuid = self.uuid_resolver.get(uid.clone()).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?; let meta = IndexMetadata { uuid, @@ -460,13 +461,13 @@ impl IndexController { } pub async fn search(&self, uid: String, query: SearchQuery) -> Result { - let uuid = self.uuid_resolver.get(uid).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let result = self.index_handle.search(uuid, query).await?; Ok(result) } pub async fn get_index(&self, uid: String) -> Result { - let uuid = self.uuid_resolver.get(uid.clone()).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?; let meta = self.index_handle.get_index_meta(uuid).await?; let meta = IndexMetadata { uuid, @@ -478,11 +479,12 @@ impl IndexController { } pub async fn get_uuids_size(&self) -> Result { - Ok(self.uuid_resolver.get_size().await?) + let size = UuidResolverMsg::get_size(&self.uuid_resolver).await?; + Ok(size) } pub async fn get_index_stats(&self, uid: String) -> Result { - let uuid = self.uuid_resolver.get(uid).await?; + let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let update_infos = self.update_handle.get_info().await?; let mut stats = self.index_handle.get_index_stats(uuid).await?; // Check if the currently indexing update is from out index. diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/actor.rs b/meilisearch-lib/src/index_controller/uuid_resolver/actor.rs deleted file mode 100644 index d221bd4f2..000000000 --- a/meilisearch-lib/src/index_controller/uuid_resolver/actor.rs +++ /dev/null @@ -1,98 +0,0 @@ -use std::{collections::HashSet, path::PathBuf}; - -use log::{trace, warn}; -use tokio::sync::mpsc; -use uuid::Uuid; - -use super::{error::UuidResolverError, Result, UuidResolveMsg, UuidStore}; - -pub struct UuidResolverActor { - inbox: mpsc::Receiver, - store: S, -} - -impl UuidResolverActor { - pub fn new(inbox: mpsc::Receiver, store: S) -> Self { - Self { inbox, store } - } - - pub async fn run(mut self) { - use UuidResolveMsg::*; - - trace!("uuid resolver started"); - - loop { - match self.inbox.recv().await { - Some(Get { uid: name, ret }) => { - let _ = ret.send(self.handle_get(name).await); - } - Some(Delete { uid: name, ret }) => { - let _ = ret.send(self.handle_delete(name).await); - } - Some(List { ret }) => { - let _ = ret.send(self.handle_list().await); - } - Some(Insert { ret, uuid, name }) => { - let _ = ret.send(self.handle_insert(name, uuid).await); - } - Some(SnapshotRequest { path, ret }) => { - let _ = ret.send(self.handle_snapshot(path).await); - } - Some(GetSize { ret }) => { - let _ = ret.send(self.handle_get_size().await); - } - Some(DumpRequest { path, ret }) => { - let _ = ret.send(self.handle_dump(path).await); - } - // all senders have been dropped, need to quit. - None => break, - } - } - - warn!("exiting uuid resolver loop"); - } - - async fn handle_get(&self, uid: String) -> Result { - self.store - .get_uuid(uid.clone()) - .await? - .ok_or(UuidResolverError::UnexistingIndex(uid)) - } - - async fn handle_delete(&self, uid: String) -> Result { - self.store - .delete(uid.clone()) - .await? - .ok_or(UuidResolverError::UnexistingIndex(uid)) - } - - async fn handle_list(&self) -> Result> { - let result = self.store.list().await?; - Ok(result) - } - - async fn handle_snapshot(&self, path: PathBuf) -> Result> { - self.store.snapshot(path).await - } - - async fn handle_dump(&self, path: PathBuf) -> Result> { - self.store.dump(path).await - } - - async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { - if !is_index_uid_valid(&uid) { - return Err(UuidResolverError::BadlyFormatted(uid)); - } - self.store.insert(uid, uuid).await?; - Ok(()) - } - - async fn handle_get_size(&self) -> Result { - self.store.get_size().await - } -} - -fn is_index_uid_valid(uid: &str) -> bool { - uid.chars() - .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') -} diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/error.rs b/meilisearch-lib/src/index_controller/uuid_resolver/error.rs index de3dc662e..8f32fa35d 100644 --- a/meilisearch-lib/src/index_controller/uuid_resolver/error.rs +++ b/meilisearch-lib/src/index_controller/uuid_resolver/error.rs @@ -1,4 +1,8 @@ +use std::fmt; + use meilisearch_error::{Code, ErrorCode}; +use tokio::sync::mpsc::error::SendError as MpscSendError; +use tokio::sync::oneshot::error::RecvError as OneshotRecvError; pub type Result = std::result::Result; @@ -22,6 +26,18 @@ internal_error!( serde_json::Error ); +impl From> for UuidResolverError { + fn from(other: MpscSendError) -> Self { + Self::Internal(Box::new(other)) + } +} + +impl From for UuidResolverError { + fn from(other: OneshotRecvError) -> Self { + Self::Internal(Box::new(other)) + } +} + impl ErrorCode for UuidResolverError { fn error_code(&self) -> Code { match self { diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-lib/src/index_controller/uuid_resolver/handle_impl.rs deleted file mode 100644 index 1296264e0..000000000 --- a/meilisearch-lib/src/index_controller/uuid_resolver/handle_impl.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::collections::HashSet; -use std::path::{Path, PathBuf}; - -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; - -use super::{HeedUuidStore, Result, UuidResolveMsg, UuidResolverActor, UuidResolverHandle}; - -#[derive(Clone)] -pub struct UuidResolverHandleImpl { - sender: mpsc::Sender, -} - -impl UuidResolverHandleImpl { - pub fn new(path: impl AsRef) -> Result { - let (sender, reveiver) = mpsc::channel(100); - let store = HeedUuidStore::new(path)?; - let actor = UuidResolverActor::new(reveiver, store); - tokio::spawn(actor.run()); - Ok(Self { sender }) - } -} - -#[async_trait::async_trait] -impl UuidResolverHandle for UuidResolverHandleImpl { - async fn get(&self, name: String) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Get { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - async fn delete(&self, name: String) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Delete { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - async fn list(&self) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::List { ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Insert { ret, name, uuid }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - async fn snapshot(&self, path: PathBuf) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::SnapshotRequest { path, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - async fn get_size(&self) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::GetSize { ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - async fn dump(&self, path: PathBuf) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::DumpRequest { ret, path }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } -} diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/message.rs b/meilisearch-lib/src/index_controller/uuid_resolver/message.rs index 46d9b585f..e9da56d5e 100644 --- a/meilisearch-lib/src/index_controller/uuid_resolver/message.rs +++ b/meilisearch-lib/src/index_controller/uuid_resolver/message.rs @@ -1,12 +1,13 @@ use std::collections::HashSet; use std::path::PathBuf; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use super::Result; +use super::error::Result; -pub enum UuidResolveMsg { +#[derive(Debug)] +pub enum UuidResolverMsg { Get { uid: String, ret: oneshot::Sender>, @@ -35,3 +36,54 @@ pub enum UuidResolveMsg { ret: oneshot::Sender>>, }, } + +impl UuidResolverMsg { + pub async fn get(channel: &mpsc::Sender, uid: String) -> Result { + let (ret, recv) = oneshot::channel(); + let msg = Self::Get { uid, ret }; + channel.send(msg).await?; + recv.await? + } + + pub async fn insert(channel: &mpsc::Sender, uuid: Uuid, name: String) -> Result<()> { + let (ret, recv) = oneshot::channel(); + let msg = Self::Insert { name, uuid, ret }; + channel.send(msg).await?; + recv.await? + } + + pub async fn list(channel: &mpsc::Sender) -> Result> { + let (ret, recv) = oneshot::channel(); + let msg = Self::List { ret }; + channel.send(msg).await?; + recv.await? + } + + pub async fn get_size(channel: &mpsc::Sender) -> Result { + let (ret, recv) = oneshot::channel(); + let msg = Self::GetSize { ret }; + channel.send(msg).await?; + recv.await? + } + + pub async fn dump(channel: &mpsc::Sender, path: PathBuf) -> Result> { + let (ret, recv) = oneshot::channel(); + let msg = Self::DumpRequest { ret, path }; + channel.send(msg).await?; + recv.await? + } + + pub async fn snapshot(channel: &mpsc::Sender, path: PathBuf) -> Result> { + let (ret, recv) = oneshot::channel(); + let msg = Self::SnapshotRequest { ret, path }; + channel.send(msg).await?; + recv.await? + } + + pub async fn delete(channel: &mpsc::Sender, uid: String) -> Result { + let (ret, recv) = oneshot::channel(); + let msg = Self::Delete { ret, uid }; + channel.send(msg).await?; + recv.await? + } +} diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/mod.rs b/meilisearch-lib/src/index_controller/uuid_resolver/mod.rs index da6c1264d..7157c1b41 100644 --- a/meilisearch-lib/src/index_controller/uuid_resolver/mod.rs +++ b/meilisearch-lib/src/index_controller/uuid_resolver/mod.rs @@ -1,35 +1,118 @@ -mod actor; pub mod error; -mod handle_impl; mod message; pub mod store; -use std::collections::HashSet; -use std::path::PathBuf; +use std::path::Path; +use std::{collections::HashSet, path::PathBuf}; +use log::{trace, warn}; +use tokio::sync::mpsc; use uuid::Uuid; -use actor::UuidResolverActor; -use error::Result; -use message::UuidResolveMsg; -use store::UuidStore; +pub use self::error::UuidResolverError; +pub use self::message::UuidResolverMsg; +pub use self::store::{HeedUuidStore, UuidStore}; +use self::error::Result; -#[cfg(test)] -use mockall::automock; - -pub use handle_impl::UuidResolverHandleImpl; -pub use store::HeedUuidStore; +pub type UuidResolverSender = mpsc::Sender; const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB -#[async_trait::async_trait] -#[cfg_attr(test, automock)] -pub trait UuidResolverHandle { - async fn get(&self, name: String) -> Result; - async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; - async fn delete(&self, name: String) -> Result; - async fn list(&self) -> Result>; - async fn snapshot(&self, path: PathBuf) -> Result>; - async fn get_size(&self) -> Result; - async fn dump(&self, path: PathBuf) -> Result>; +pub fn create_uuid_resolver(path: impl AsRef) -> Result> { + let (sender, reveiver) = mpsc::channel(100); + let store = HeedUuidStore::new(path)?; + let actor = UuidResolver::new(reveiver, store); + tokio::spawn(actor.run()); + Ok(sender) +} + +pub struct UuidResolver { + inbox: mpsc::Receiver, + store: S, +} + +impl UuidResolver { + pub fn new(inbox: mpsc::Receiver, store: S) -> Self { + Self { inbox, store } + } + + pub async fn run(mut self) { + use UuidResolverMsg::*; + + trace!("uuid resolver started"); + + loop { + match self.inbox.recv().await { + Some(Get { uid: name, ret }) => { + let _ = ret.send(self.handle_get(name).await); + } + Some(Delete { uid: name, ret }) => { + let _ = ret.send(self.handle_delete(name).await); + } + Some(List { ret }) => { + let _ = ret.send(self.handle_list().await); + } + Some(Insert { ret, uuid, name }) => { + let _ = ret.send(self.handle_insert(name, uuid).await); + } + Some(SnapshotRequest { path, ret }) => { + let _ = ret.send(self.handle_snapshot(path).await); + } + Some(GetSize { ret }) => { + let _ = ret.send(self.handle_get_size().await); + } + Some(DumpRequest { path, ret }) => { + let _ = ret.send(self.handle_dump(path).await); + } + // all senders have been dropped, need to quit. + None => break, + } + } + + warn!("exiting uuid resolver loop"); + } + + async fn handle_get(&self, uid: String) -> Result { + self.store + .get_uuid(uid.clone()) + .await? + .ok_or(UuidResolverError::UnexistingIndex(uid)) + } + + async fn handle_delete(&self, uid: String) -> Result { + self.store + .delete(uid.clone()) + .await? + .ok_or(UuidResolverError::UnexistingIndex(uid)) + } + + async fn handle_list(&self) -> Result> { + let result = self.store.list().await?; + Ok(result) + } + + async fn handle_snapshot(&self, path: PathBuf) -> Result> { + self.store.snapshot(path).await + } + + async fn handle_dump(&self, path: PathBuf) -> Result> { + self.store.dump(path).await + } + + async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { + if !is_index_uid_valid(&uid) { + return Err(UuidResolverError::BadlyFormatted(uid)); + } + self.store.insert(uid, uuid).await?; + Ok(()) + } + + async fn handle_get_size(&self) -> Result { + self.store.get_size().await + } +} + +fn is_index_uid_valid(uid: &str) -> bool { + uid.chars() + .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') } diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/store.rs b/meilisearch-lib/src/index_controller/uuid_resolver/store.rs index 5457ab91d..34ba8ced5 100644 --- a/meilisearch-lib/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-lib/src/index_controller/uuid_resolver/store.rs @@ -8,7 +8,8 @@ use heed::{CompactionOption, Database, Env, EnvOpenOptions}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::{error::UuidResolverError, Result, UUID_STORE_SIZE}; +use super::UUID_STORE_SIZE; +use super::error::{UuidResolverError, Result}; use crate::EnvSizer; #[derive(Serialize, Deserialize)]