add actors/ support index creation

This commit is contained in:
mpostma 2021-02-26 09:10:36 +01:00
parent 61ce749122
commit 672a4b5400
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
4 changed files with 357 additions and 0 deletions

View File

@ -0,0 +1,132 @@
use uuid::Uuid;
use std::path::{PathBuf, Path};
use chrono::Utc;
use tokio::sync::{mpsc, oneshot, RwLock};
use thiserror::Error;
use std::collections::HashMap;
use std::sync::Arc;
use milli::Index;
use std::collections::hash_map::Entry;
use std::fs::create_dir_all;
use heed::EnvOpenOptions;
use crate::index_controller::IndexMetadata;
/// Result alias for this module: the error type is always [`IndexError`].
pub type Result<T> = std::result::Result<T, IndexError>;
/// A `HashMap` wrapped in `Arc<RwLock<..>>`, where `RwLock` is the tokio lock imported above.
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
/// Messages consumed by [`IndexActor`] from its inbox.
enum IndexMsg {
    /// Request the creation of the index identified by `uuid`.
    ///
    /// The outcome of the store call is sent back through `ret`.
    CreateIndex {
        uuid: Uuid,
        primary_key: Option<String>,
        ret: oneshot::Sender<Result<IndexMetadata>>,
    },
}
/// Actor that owns an index store `S` and serves the [`IndexMsg`]s it receives.
struct IndexActor<S> {
    /// Receiving half of the channel that the handle sends messages on.
    inbox: mpsc::Receiver<IndexMsg>,
    /// Backend that performs the requested operations.
    store: S,
}
/// Errors returned by index actor operations.
#[derive(Error, Debug)]
pub enum IndexError {
    /// Any underlying failure, carried as an `anyhow::Error`.
    #[error("error with index: {0}")]
    Error(#[from] anyhow::Error),

    /// An index is already registered under the requested uuid.
    #[error("index already exists")]
    IndexAlreadyExists,
}
/// Storage backend that [`IndexActor`] delegates to.
#[async_trait::async_trait]
trait IndexStore {
    /// Creates the index identified by `uuid`, with an optional primary key,
    /// and returns its metadata.
    async fn create_index(
        &self,
        uuid: Uuid,
        primary_key: Option<String>,
    ) -> Result<IndexMetadata>;
}
impl<S: IndexStore> IndexActor<S> {
    /// Builds an actor that reads from `inbox` and serves requests with `store`.
    fn new(inbox: mpsc::Receiver<IndexMsg>, store: S) -> Self {
        Self { inbox, store }
    }

    /// Handles messages one at a time until `recv` yields `None`.
    async fn run(mut self) {
        while let Some(msg) = self.inbox.recv().await {
            match msg {
                IndexMsg::CreateIndex { uuid, primary_key, ret } => {
                    self.handle_create_index(uuid, primary_key, ret).await
                }
            }
        }
    }

    /// Asks the store to create the index and forwards the outcome on `ret`.
    async fn handle_create_index(
        &self,
        uuid: Uuid,
        primary_key: Option<String>,
        ret: oneshot::Sender<Result<IndexMetadata>>,
    ) {
        let outcome = self.store.create_index(uuid, primary_key).await;
        // A failed send is deliberately ignored: there is nobody left to notify.
        let _ = ret.send(outcome);
    }
}
/// Cloneable handle used to send requests to a spawned [`IndexActor`].
#[derive(Clone)]
pub struct IndexActorHandle {
    /// Sending half of the actor's inbox channel.
    sender: mpsc::Sender<IndexMsg>,
}
impl IndexActorHandle {
    /// Spawns an [`IndexActor`] backed by a [`MapIndexStore`] rooted at `data.ms`
    /// and returns a handle to it. The inbox channel has a capacity of 100.
    pub fn new() -> Self {
        let (sender, inbox) = mpsc::channel(100);
        let actor = IndexActor::new(inbox, MapIndexStore::new("data.ms"));
        tokio::task::spawn(actor.run());
        Self { sender }
    }

    /// Sends a `CreateIndex` request to the actor and waits for its reply.
    ///
    /// # Panics
    ///
    /// Panics if the reply channel is closed without an answer.
    pub async fn create_index(
        &self,
        uuid: Uuid,
        primary_key: Option<String>,
    ) -> Result<IndexMetadata> {
        let (ret, response) = oneshot::channel();
        // A send failure is not reported here; it shows up as a closed reply channel below.
        let _ = self
            .sender
            .send(IndexMsg::CreateIndex { ret, uuid, primary_key })
            .await;
        response.await.expect("IndexActor has been killed")
    }
}
/// [`IndexStore`] implementation that keeps metadata and opened indexes in in-memory maps.
struct MapIndexStore {
    /// Directory under which each index gets its own `index-<uuid>` sub-directory.
    root: PathBuf,
    /// Metadata of the indexes, keyed by uuid.
    meta_store: AsyncMap<Uuid, IndexMetadata>,
    /// Opened indexes, keyed by uuid.
    index_store: AsyncMap<Uuid, Index>,
}
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMetadata> {
let meta = match self.meta_store.write().await.entry(uuid.clone()) {
Entry::Vacant(entry) => {
let meta = IndexMetadata {
uuid,
created_at: Utc::now(),
updated_at: Utc::now(),
primary_key,
};
entry.insert(meta).clone()
}
Entry::Occupied(_) => return Err(IndexError::IndexAlreadyExists),
};
let db_path = self.root.join(format!("index-{}", meta.uuid));
println!("before blocking");
let index: Result<Index> = tokio::task::spawn_blocking(move || {
create_dir_all(&db_path).expect("can't create db");
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100_000);
let index = Index::new(options, &db_path)
.map_err(|e| IndexError::Error(e))?;
Ok(index)
}).await.expect("thread died");
println!("after blocking");
self.index_store.write().await.insert(meta.uuid.clone(), index?);
Ok(meta)
}
}
impl MapIndexStore {
    /// Creates an empty store whose indexes live under `<root>/indexes/`.
    fn new(root: impl AsRef<Path>) -> Self {
        Self {
            meta_store: Arc::new(RwLock::new(HashMap::new())),
            index_store: Arc::new(RwLock::new(HashMap::new())),
            root: root.as_ref().join("indexes/"),
        }
    }
}

View File

@ -0,0 +1,93 @@
mod index_actor;
mod update_actor;
mod uuid_resolver;
use tokio::fs::File;
use tokio::sync::oneshot;
use super::IndexController;
use uuid::Uuid;
use super::IndexMetadata;
/// [`IndexController`] implementation that delegates to actor handles.
pub struct ActorIndexController {
    /// Handle to the actor that creates uuids for index names.
    uuid_resolver: uuid_resolver::UuidResolverHandle,
    /// Handle to the actor that creates indexes.
    index_actor: index_actor::IndexActorHandle,
}
impl ActorIndexController {
    /// Creates the uuid resolver handle, then the index actor handle, and wraps both.
    pub fn new() -> Self {
        Self {
            uuid_resolver: uuid_resolver::UuidResolverHandle::new(),
            index_actor: index_actor::IndexActorHandle::new(),
        }
    }
}
/// Message type for the index controller.
///
/// NOTE(review): nothing in the visible code constructs or receives these messages.
enum IndexControllerMsg {
    /// Request to create an index; the outcome is meant to be sent back on `ret`.
    CreateIndex {
        uuid: Uuid,
        primary_key: Option<String>,
        ret: oneshot::Sender<anyhow::Result<IndexMetadata>>,
    },
    /// Shutdown request.
    Shutdown,
}
/// Only `create_index` is implemented; every other method is still a `todo!()` stub
/// and panics when called.
#[async_trait::async_trait]
impl IndexController for ActorIndexController {
    async fn add_documents(
        &self,
        index: String,
        method: milli::update::IndexDocumentsMethod,
        format: milli::update::UpdateFormat,
        data: File,
        primary_key: Option<String>,
    ) -> anyhow::Result<super::UpdateStatus> {
        todo!()
    }

    fn clear_documents(&self, index: String) -> anyhow::Result<super::UpdateStatus> {
        todo!()
    }

    fn delete_documents(&self, index: String, document_ids: Vec<String>) -> anyhow::Result<super::UpdateStatus> {
        todo!()
    }

    fn update_settings(&self, index_uid: String, settings: super::Settings) -> anyhow::Result<super::UpdateStatus> {
        todo!()
    }

    /// Creates a uuid for the index name, then asks the index actor to create the index.
    ///
    /// # Errors
    ///
    /// Returns an error if `index_settings.name` is `None`, if the uuid resolver
    /// rejects the name, or if the index actor fails to create the index.
    async fn create_index(&self, index_settings: super::IndexSettings) -> anyhow::Result<super::IndexMetadata> {
        let super::IndexSettings { name, primary_key } = index_settings;
        // A missing name comes from the caller's input, so report it instead of panicking.
        let name = name.ok_or_else(|| anyhow::anyhow!("an index name is required to create an index"))?;
        let uuid = self.uuid_resolver.create(name).await?;
        // NOTE(review): if this step fails, the uuid created above stays registered
        // for the name; no removal operation exists on the resolver yet.
        let index_meta = self.index_actor.create_index(uuid, primary_key).await?;
        Ok(index_meta)
    }

    fn delete_index(&self, index_uid: String) -> anyhow::Result<()> {
        todo!()
    }

    fn swap_indices(&self, index1_uid: String, index2_uid: String) -> anyhow::Result<()> {
        todo!()
    }

    fn index(&self, name: String) -> anyhow::Result<Option<std::sync::Arc<milli::Index>>> {
        todo!()
    }

    fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<super::UpdateStatus>> {
        todo!()
    }

    fn all_update_status(&self, index: String) -> anyhow::Result<Vec<super::UpdateStatus>> {
        todo!()
    }

    fn list_indexes(&self) -> anyhow::Result<Vec<super::IndexMetadata>> {
        todo!()
    }

    fn update_index(&self, name: String, index_settings: super::IndexSettings) -> anyhow::Result<super::IndexMetadata> {
        todo!()
    }
}

View File

@ -0,0 +1,16 @@
use super::index_actor::IndexActorHandle;
use uuid::Uuid;
use tokio::sync::{mpsc, oneshot};
/// Messages for [`UpdateActor`].
enum UpdateMsg {
    /// Request concerning the index identified by `uuid`; the outcome is meant to be
    /// sent back on `ret`.
    CreateIndex {
        uuid: Uuid,
        ret: oneshot::Sender<anyhow::Result<()>>,
    },
}
/// Actor state for update handling, generic over an update store `S`.
///
/// NOTE(review): no behaviour is implemented for this type in the visible code.
struct UpdateActor<S> {
    /// Backend holding the updates.
    update_store: S,
    /// Receiving half of the channel carrying [`UpdateMsg`]s.
    inbox: mpsc::Receiver<UpdateMsg>,
    /// Handle to the index actor.
    index_actor: IndexActorHandle,
}

View File

@ -0,0 +1,116 @@
use thiserror::Error;
use tokio::sync::{RwLock, mpsc, oneshot};
use uuid::Uuid;
use std::collections::HashMap;
use std::sync::Arc;
use std::collections::hash_map::Entry;
use log::info;
/// Result alias for this module: the error type is always [`UuidError`].
pub type Result<T> = std::result::Result<T, UuidError>;
/// Messages consumed by [`UuidResolverActor`] from its inbox.
#[derive(Debug)]
enum UuidResolveMsg {
    /// Look up the uuid registered for `name`; the answer goes back on `ret`.
    Resolve {
        name: String,
        ret: oneshot::Sender<Result<Option<Uuid>>>,
    },
    /// Create a uuid for `name`; the answer goes back on `ret`.
    Create {
        name: String,
        ret: oneshot::Sender<Result<Uuid>>,
    },
    /// Shutdown request.
    Shutdown,
}
/// Actor that owns a uuid store `S` and serves the [`UuidResolveMsg`]s it receives.
struct UuidResolverActor<S> {
    /// Receiving half of the channel that the handle sends messages on.
    inbox: mpsc::Receiver<UuidResolveMsg>,
    /// Backend mapping names to uuids.
    store: S,
}
impl<S: UuidStore> UuidResolverActor<S> {
    /// Builds an actor that reads from `inbox` and serves requests with `store`.
    fn new(inbox: mpsc::Receiver<UuidResolveMsg>, store: S) -> Self {
        Self { inbox, store }
    }

    /// Handles messages one at a time until `recv` yields `None`.
    async fn run(mut self) {
        use UuidResolveMsg::*;

        info!("uuid resolver started");

        // TODO: benchmark and use buffered streams to improve throughput.
        loop {
            match self.inbox.recv().await {
                Some(Create { name, ret }) => self.handle_create(name, ret).await,
                // Answer lookups: dropping `ret` unanswered closes the reply channel
                // and makes the requester's `expect` on the reply panic.
                Some(Resolve { name, ret }) => self.handle_resolve(name, ret).await,
                // No shutdown behaviour exists yet; the message is still ignored.
                Some(Shutdown) => (),
                // all senders have been dropped, need to quit.
                None => break,
            }
        }
    }

    /// Asks the store to create a uuid for `name` and forwards the outcome on `ret`.
    async fn handle_create(&self, name: String, ret: oneshot::Sender<Result<Uuid>>) {
        let result = self.store.create_uuid(name).await;
        // A failed send is deliberately ignored: there is nobody left to notify.
        let _ = ret.send(result);
    }

    /// Asks the store for the uuid registered for `name` and forwards the outcome on `ret`.
    async fn handle_resolve(&self, name: String, ret: oneshot::Sender<Result<Option<Uuid>>>) {
        let result = self.store.get_uuid(name).await;
        // A failed send is deliberately ignored: there is nobody left to notify.
        let _ = ret.send(result);
    }
}
/// Cloneable handle used to send requests to a spawned [`UuidResolverActor`].
#[derive(Clone)]
pub struct UuidResolverHandle {
    /// Sending half of the actor's inbox channel.
    sender: mpsc::Sender<UuidResolveMsg>,
}
impl UuidResolverHandle {
    /// Spawns a [`UuidResolverActor`] backed by an empty [`MapUuidStore`] and returns
    /// a handle to it. The inbox channel has a capacity of 100.
    pub fn new() -> Self {
        let (sender, inbox) = mpsc::channel(100);
        let store = MapUuidStore(Arc::new(RwLock::new(HashMap::new())));
        tokio::spawn(UuidResolverActor::new(inbox, store).run());
        Self { sender }
    }

    /// Sends a `Resolve` request for `name` and waits for the reply.
    ///
    /// # Panics
    ///
    /// Panics if the reply channel is closed without an answer.
    pub async fn resolve(&self, name: String) -> anyhow::Result<Option<Uuid>> {
        let (ret, response) = oneshot::channel();
        // A send failure is not reported here; it shows up as a closed reply channel below.
        let _ = self.sender.send(UuidResolveMsg::Resolve { name, ret }).await;
        let resolved = response.await.expect("Uuid resolver actor has been killed")?;
        Ok(resolved)
    }

    /// Sends a `Create` request for `name` and waits for the reply.
    ///
    /// # Panics
    ///
    /// Panics if the reply channel is closed without an answer.
    pub async fn create(&self, name: String) -> anyhow::Result<Uuid> {
        let (ret, response) = oneshot::channel();
        // A send failure is not reported here; it shows up as a closed reply channel below.
        let _ = self.sender.send(UuidResolveMsg::Create { name, ret }).await;
        let created = response.await.expect("Uuid resolver actor has been killed")?;
        Ok(created)
    }
}
/// Errors returned by the uuid resolver.
#[derive(Clone, Debug, Error)]
pub enum UuidError {
    /// A uuid is already registered for the requested name.
    #[error("Name already exist.")]
    NameAlreadyExist,
}
/// Storage backend that [`UuidResolverActor`] delegates to.
#[async_trait::async_trait]
trait UuidStore {
    /// Creates a uuid for `name` and returns it.
    async fn create_uuid(&self, name: String) -> Result<Uuid>;

    /// Returns the uuid registered for `name`, if there is one.
    async fn get_uuid(&self, name: String) -> Result<Option<Uuid>>;
}
/// [`UuidStore`] backed by an in-memory name-to-uuid `HashMap` behind an `Arc<RwLock<..>>`.
struct MapUuidStore(Arc<RwLock<HashMap<String, Uuid>>>);
#[async_trait::async_trait]
impl UuidStore for MapUuidStore {
    /// Generates a v4 uuid for `name` and stores it.
    ///
    /// Returns [`UuidError::NameAlreadyExist`] if the name already has a uuid.
    async fn create_uuid(&self, name: String) -> Result<Uuid> {
        let mut names = self.0.write().await;
        if let Entry::Vacant(slot) = names.entry(name) {
            return Ok(*slot.insert(Uuid::new_v4()));
        }
        Err(UuidError::NameAlreadyExist)
    }

    /// Returns the uuid stored for `name`, or `None` if the name is unknown.
    async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
        let names = self.0.read().await;
        Ok(names.get(&name).copied())
    }
}