simplify index actor run loop

This commit is contained in:
Marin Postma 2021-04-13 17:46:39 +02:00
parent 9ce68d11a7
commit b626d02ffe
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
2 changed files with 24 additions and 55 deletions

View File

@ -1,10 +1,8 @@
use std::fs::File; use std::fs::File;
use std::future::Future;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use async_stream::stream; use async_stream::stream;
use futures::pin_mut;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use heed::CompactionOption; use heed::CompactionOption;
use log::debug; use log::debug;
@ -22,8 +20,7 @@ use crate::option::IndexerOpts;
use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult}; use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult};
pub struct IndexActor<S> { pub struct IndexActor<S> {
read_receiver: Option<mpsc::Receiver<IndexMsg>>, receiver: Option<mpsc::Receiver<IndexMsg>>,
write_receiver: Option<mpsc::Receiver<IndexMsg>>,
update_handler: Arc<UpdateHandler>, update_handler: Arc<UpdateHandler>,
processing: RwLock<Option<Uuid>>, processing: RwLock<Option<Uuid>>,
store: S, store: S,
@ -31,18 +28,16 @@ pub struct IndexActor<S> {
impl<S: IndexStore + Sync + Send> IndexActor<S> { impl<S: IndexStore + Sync + Send> IndexActor<S> {
pub fn new( pub fn new(
read_receiver: mpsc::Receiver<IndexMsg>, receiver: mpsc::Receiver<IndexMsg>,
write_receiver: mpsc::Receiver<IndexMsg>,
store: S, store: S,
) -> Result<Self> { ) -> Result<Self> {
let options = IndexerOpts::default(); let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?;
let update_handler = Arc::new(update_handler); let update_handler = Arc::new(update_handler);
let read_receiver = Some(read_receiver); let receiver = Some(receiver);
let write_receiver = Some(write_receiver);
Ok(Self { Ok(Self {
read_receiver, receiver,
write_receiver, store,
update_handler, update_handler,
processing: RwLock::new(None), processing: RwLock::new(None),
store, store,
@ -53,44 +48,21 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
/// through the read channel are processed concurrently, the messages sent through the write /// through the read channel are processed concurrently, the messages sent through the write
/// channel are processed one at a time. /// channel are processed one at a time.
pub async fn run(mut self) { pub async fn run(mut self) {
let mut read_receiver = self let mut receiver = self
.read_receiver .receiver
.take() .take()
.expect("Index Actor must have a inbox at this point."); .expect("Index Actor must have a inbox at this point.");
let read_stream = stream! { let stream = stream! {
loop { loop {
match read_receiver.recv().await { match receiver.recv().await {
Some(msg) => yield msg, Some(msg) => yield msg,
None => break, None => break,
} }
} }
}; };
let mut write_receiver = self stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)).await;
.write_receiver
.take()
.expect("Index Actor must have a inbox at this point.");
let write_stream = stream! {
loop {
match write_receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
pin_mut!(write_stream);
pin_mut!(read_stream);
let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg));
let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg));
let fut1: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut1);
let fut2: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut2);
tokio::join!(fut1, fut2);
} }
async fn handle_message(&self, msg: IndexMsg) { async fn handle_message(&self, msg: IndexMsg) {

View File

@ -13,8 +13,7 @@ use super::{
#[derive(Clone)] #[derive(Clone)]
pub struct IndexActorHandleImpl { pub struct IndexActorHandleImpl {
read_sender: mpsc::Sender<IndexMsg>, sender: mpsc::Sender<IndexMsg>,
write_sender: mpsc::Sender<IndexMsg>,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -26,7 +25,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
uuid, uuid,
primary_key, primary_key,
}; };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed") receiver.await.expect("IndexActor has been killed")
} }
@ -38,21 +37,21 @@ impl IndexActorHandle for IndexActorHandleImpl {
) -> anyhow::Result<UpdateResult> { ) -> anyhow::Result<UpdateResult> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Update { ret, meta, data, uuid }; let msg = IndexMsg::Update { ret, meta, data, uuid };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> { async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Search { uuid, query, ret }; let msg = IndexMsg::Search { uuid, query, ret };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
async fn settings(&self, uuid: Uuid) -> Result<Settings> { async fn settings(&self, uuid: Uuid) -> Result<Settings> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Settings { uuid, ret }; let msg = IndexMsg::Settings { uuid, ret };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
@ -71,7 +70,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
attributes_to_retrieve, attributes_to_retrieve,
limit, limit,
}; };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
@ -88,21 +87,21 @@ impl IndexActorHandle for IndexActorHandleImpl {
doc_id, doc_id,
attributes_to_retrieve, attributes_to_retrieve,
}; };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
async fn delete(&self, uuid: Uuid) -> Result<()> { async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Delete { uuid, ret }; let msg = IndexMsg::Delete { uuid, ret };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> { async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetMeta { uuid, ret }; let msg = IndexMsg::GetMeta { uuid, ret };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
@ -113,14 +112,14 @@ impl IndexActorHandle for IndexActorHandleImpl {
index_settings, index_settings,
ret, ret,
}; };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Snapshot { uuid, path, ret }; let msg = IndexMsg::Snapshot { uuid, path, ret };
let _ = self.read_sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
@ -134,15 +133,13 @@ impl IndexActorHandle for IndexActorHandleImpl {
impl IndexActorHandleImpl { impl IndexActorHandleImpl {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> { pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
let (read_sender, read_receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let (write_sender, write_receiver) = mpsc::channel(100);
let store = MapIndexStore::new(path, index_size); let store = MapIndexStore::new(path, index_size);
let actor = IndexActor::new(read_receiver, write_receiver, store)?; let actor = IndexActor::new(receiver, store)?;
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
Ok(Self { Ok(Self {
read_sender, sender,
write_sender,
}) })
} }
} }