diff --git a/Cargo.lock b/Cargo.lock index 7f3b20e89..5f1134379 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "actix-codec" version = "0.3.0" @@ -428,7 +430,7 @@ checksum = "afddf7f520a80dbf76e6f50a35bca42a2331ef227a28b3b6dc5c2e2338d114b1" [[package]] name = "assert-json-diff" version = "1.0.1" -source = "git+https://github.com/qdequele/assert-json-diff#9012a0c8866d0f2db0ef9a6242e4a19d1e8c67e4" +source = "git+https://github.com/qdequele/assert-json-diff?branch=master#9012a0c8866d0f2db0ef9a6242e4a19d1e8c67e4" dependencies = [ "serde", "serde_json", diff --git a/src/index_controller/index_actor.rs b/src/index_controller/index_actor.rs index 59f000575..0e74d6665 100644 --- a/src/index_controller/index_actor.rs +++ b/src/index_controller/index_actor.rs @@ -11,6 +11,8 @@ use log::info; use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; +use std::future::Future; +use futures::pin_mut; use super::update_handler::UpdateHandler; use crate::index::UpdateResult as UResult; @@ -61,7 +63,8 @@ enum IndexMsg { } struct IndexActor { - inbox: Option>, + read_receiver: Option>, + write_receiver: Option>, update_handler: Arc, store: S, } @@ -82,60 +85,96 @@ trait IndexStore { } impl IndexActor { - fn new(inbox: mpsc::Receiver, store: S) -> Self { + fn new( + read_receiver: mpsc::Receiver, + write_receiver: mpsc::Receiver, + store: S + ) -> Self { let options = IndexerOpts::default(); let update_handler = UpdateHandler::new(&options).unwrap(); let update_handler = Arc::new(update_handler); - let inbox = Some(inbox); + let read_receiver = Some(read_receiver); + let write_receiver = Some(write_receiver); Self { - inbox, + read_receiver, + write_receiver, store, update_handler, } } + /// `run` poll the write_receiver and read_receiver concurrently, but while messages send + /// through the read channel are processed concurrently, the messages sent through the write + /// channel are processed one at a time. async fn run(mut self) { - use IndexMsg::*; - - let mut inbox = self - .inbox + let mut read_receiver = self + .read_receiver .take() .expect("Index Actor must have a inbox at this point."); - let stream = stream! { + let read_stream = stream! { loop { - match inbox.recv().await { + match read_receiver.recv().await { Some(msg) => yield msg, None => break, } } }; - let fut = stream.for_each_concurrent(Some(10), |msg| async { - match msg { - CreateIndex { - uuid, - primary_key, - ret, - } => self.handle_create_index(uuid, primary_key, ret).await, - Update { ret, meta, data } => self.handle_update(meta, data, ret).await, - Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await, - Settings { ret, uuid } => self.handle_settings(uuid, ret).await, - Documents { - ret, - uuid, - attributes_to_retrieve, - offset, - limit, - } => { - self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) - .await - } - Document { uuid, attributes_to_retrieve, doc_id, ret } => self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret).await, - } - }); + let mut write_receiver = self + .write_receiver + .take() + .expect("Index Actor must have a inbox at this point."); - fut.await; + let write_stream = stream! { + loop { + match write_receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + + pin_mut!(write_stream); + pin_mut!(read_stream); + + let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)); + let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg)); + + let fut1: Box + Unpin + Send> = Box::new(fut1); + let fut2: Box + Unpin + Send> = Box::new(fut2); + + //let futures = futures::stream::futures_unordered::FuturesUnordered::new(); + //futures.push(fut1); + //futures.push(fut2); + //futures.for_each(f) + tokio::join!(fut1, fut2); + + } + + async fn handle_message(&self, msg: IndexMsg) { + use IndexMsg::*; + match msg { + CreateIndex { + uuid, + primary_key, + ret, + } => self.handle_create_index(uuid, primary_key, ret).await, + Update { ret, meta, data } => self.handle_update(meta, data, ret).await, + Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await, + Settings { ret, uuid } => self.handle_settings(uuid, ret).await, + Documents { + ret, + uuid, + attributes_to_retrieve, + offset, + limit, + } => { + self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) + .await + } + Document { uuid, attributes_to_retrieve, doc_id, ret } => self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret).await, + } } async fn handle_search( @@ -221,17 +260,19 @@ impl IndexActor { #[derive(Clone)] pub struct IndexActorHandle { - sender: mpsc::Sender, + read_sender: mpsc::Sender, + write_sender: mpsc::Sender, } impl IndexActorHandle { pub fn new(path: impl AsRef) -> Self { - let (sender, receiver) = mpsc::channel(100); + let (read_sender, read_receiver) = mpsc::channel(100); + let (write_sender, write_receiver) = mpsc::channel(100); let store = MapIndexStore::new(path); - let actor = IndexActor::new(receiver, store); + let actor = IndexActor::new(read_receiver, write_receiver, store); tokio::task::spawn(actor.run()); - Self { sender } + Self { read_sender, write_sender } } pub async fn create_index( @@ -245,28 +286,28 @@ impl IndexActorHandle { uuid, primary_key, }; - let _ = self.sender.send(msg).await; + let _ = self.read_sender.send(msg).await; receiver.await.expect("IndexActor has been killed") } pub async fn update(&self, meta: Processing, data: std::fs::File) -> UpdateResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Update { ret, meta, data }; - let _ = self.sender.send(msg).await; + let _ = self.read_sender.send(msg).await; receiver.await.expect("IndexActor has been killed") } pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Search { uuid, query, ret }; - let _ = self.sender.send(msg).await; + let _ = self.read_sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } pub async fn settings(&self, uuid: Uuid) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Settings { uuid, ret }; - let _ = self.sender.send(msg).await; + let _ = self.read_sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } @@ -285,7 +326,7 @@ impl IndexActorHandle { attributes_to_retrieve, limit, }; - let _ = self.sender.send(msg).await; + let _ = self.read_sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } @@ -302,7 +343,7 @@ impl IndexActorHandle { doc_id, attributes_to_retrieve, }; - let _ = self.sender.send(msg).await; + let _ = self.read_sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } } diff --git a/src/index_controller/update_actor.rs b/src/index_controller/update_actor.rs index 2f3c058bc..a9e2a412a 100644 --- a/src/index_controller/update_actor.rs +++ b/src/index_controller/update_actor.rs @@ -1,17 +1,18 @@ use std::fs::create_dir_all; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::collections::{HashMap, hash_map::Entry}; -use log::info; use super::index_actor::IndexActorHandle; +use log::info; use thiserror::Error; -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; use tokio::fs::File; use tokio::io::AsyncWriteExt; +use tokio::sync::{mpsc, oneshot, RwLock}; +use uuid::Uuid; -use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index::UpdateResult; +use crate::index_controller::{UpdateMeta, UpdateStatus}; pub type Result = std::result::Result; type UpdateStore = super::update_store::UpdateStore; @@ -28,33 +29,42 @@ enum UpdateMsg { uuid: Uuid, meta: UpdateMeta, data: mpsc::Receiver>, - ret: oneshot::Sender> + ret: oneshot::Sender>, }, ListUpdates { uuid: Uuid, ret: oneshot::Sender>>, - } + }, } -struct UpdateActor { +struct UpdateActor { path: PathBuf, - store: Arc, + store: S, inbox: mpsc::Receiver>, - index_handle: IndexActorHandle, } -impl UpdateActor -where D: AsRef<[u8]> + Sized + 'static, +#[async_trait::async_trait] +trait UpdateStoreStore { + async fn get_or_create(&self, uuid: Uuid) -> Result>; +} + +impl UpdateActor +where + D: AsRef<[u8]> + Sized + 'static, + S: UpdateStoreStore, { fn new( - store: Arc, + store: S, inbox: mpsc::Receiver>, - index_handle: IndexActorHandle, path: impl AsRef, - ) -> Self { + ) -> Self { let path = path.as_ref().to_owned().join("update_files"); create_dir_all(&path).unwrap(); - Self { store, inbox, index_handle, path } + Self { + store, + inbox, + path, + } } async fn run(mut self) { @@ -64,15 +74,26 @@ where D: AsRef<[u8]> + Sized + 'static, loop { match self.inbox.recv().await { - Some(Update { uuid, meta, data, ret }) => self.handle_update(uuid, meta, data, ret).await, + Some(Update { + uuid, + meta, + data, + ret, + }) => self.handle_update(uuid, meta, data, ret).await, Some(ListUpdates { uuid, ret }) => self.handle_list_updates(uuid, ret).await, None => {} } } } - async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver>, ret: oneshot::Sender>) { - let store = self.store.clone(); + async fn handle_update( + &self, + uuid: Uuid, + meta: UpdateMeta, + mut payload: mpsc::Receiver>, + ret: oneshot::Sender>, + ) { + let update_store = self.store.get_or_create(uuid).await.unwrap(); let update_file_id = uuid::Uuid::new_v4(); let path = self.path.join(format!("update_{}", update_file_id)); let mut file = File::create(&path).await.unwrap(); @@ -84,7 +105,7 @@ where D: AsRef<[u8]> + Sized + 'static, } Err(e) => { ret.send(Err(UpdateError::Error(e))); - return + return; } } } @@ -94,15 +115,20 @@ where D: AsRef<[u8]> + Sized + 'static, let file = file.into_std().await; let result = tokio::task::spawn_blocking(move || { - let result = store + let result = update_store .register_update(meta, path, uuid) .map(|pending| UpdateStatus::Pending(pending)) .map_err(|e| UpdateError::Error(Box::new(e))); let _ = ret.send(result); - }).await; + }) + .await; } - async fn handle_list_updates(&self, uuid: Uuid, ret: oneshot::Sender>>) { + async fn handle_list_updates( + &self, + uuid: Uuid, + ret: oneshot::Sender>>, + ) { todo!() } } @@ -113,29 +139,26 @@ pub struct UpdateActorHandle { } impl UpdateActorHandle -where D: AsRef<[u8]> + Sized + 'static, +where + D: AsRef<[u8]> + Sized + 'static, { pub fn new(index_handle: IndexActorHandle, path: impl AsRef) -> Self { + let path = path.as_ref().to_owned().join("updates"); let (sender, receiver) = mpsc::channel(100); - let mut options = heed::EnvOpenOptions::new(); - options.map_size(4096 * 100_000); + let store = MapUpdateStoreStore::new(index_handle, &path); + let actor = UpdateActor::new(store, receiver, path); - let path = path - .as_ref() - .to_owned() - .join("updates"); - - create_dir_all(&path).unwrap(); - let index_handle_clone = index_handle.clone(); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle_clone.update(meta, file)) - }).unwrap(); - let actor = UpdateActor::new(store, receiver, index_handle, path); tokio::task::spawn_local(actor.run()); + Self { sender } } - pub async fn update(&self, meta: UpdateMeta, data: mpsc::Receiver>, uuid: Uuid) -> Result { + pub async fn update( + &self, + meta: UpdateMeta, + data: mpsc::Receiver>, + uuid: Uuid, + ) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Update { uuid, @@ -143,7 +166,46 @@ where D: AsRef<[u8]> + Sized + 'static, meta, ret, }; - let _ = self.sender.send(msg).await; + let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } } + +struct MapUpdateStoreStore { + db: Arc>>>, + index_handle: IndexActorHandle, + path: PathBuf, +} + +impl MapUpdateStoreStore { + fn new(index_handle: IndexActorHandle, path: impl AsRef) -> Self { + let db = Arc::new(RwLock::new(HashMap::new())); + let path = path.as_ref().to_owned(); + Self { db, index_handle, path } + } +} + +#[async_trait::async_trait] +impl UpdateStoreStore for MapUpdateStoreStore { + async fn get_or_create(&self, uuid: Uuid) -> Result> { + match self.db.write().await.entry(uuid) { + Entry::Vacant(e) => { + let mut options = heed::EnvOpenOptions::new(); + options.map_size(4096 * 100_000); + let path = self.path.clone().join(format!("updates-{}", e.key())); + create_dir_all(&path).unwrap(); + let index_handle = self.index_handle.clone(); + let store = UpdateStore::open(options, &path, move |meta, file| { + futures::executor::block_on(index_handle.update(meta, file)) + }).unwrap(); + let store = e.insert(store); + Ok(store.clone()) + } + Entry::Occupied(e) => { + Ok(e.get().clone()) + } + } + } +} + +