conccurrent update run loop

This commit is contained in:
marin postma 2021-06-02 17:45:28 +02:00
parent 1e659bb17b
commit 0f767e3743
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
2 changed files with 302 additions and 267 deletions

483
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,8 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use async_stream::stream;
use futures::StreamExt;
use log::info; use log::info;
use oxidized_json_checker::JsonChecker; use oxidized_json_checker::JsonChecker;
use tokio::fs; use tokio::fs;
@ -18,7 +20,7 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
pub struct UpdateActor<D, I> { pub struct UpdateActor<D, I> {
path: PathBuf, path: PathBuf,
store: Arc<UpdateStore>, store: Arc<UpdateStore>,
inbox: mpsc::Receiver<UpdateMsg<D>>, inbox: Option<mpsc::Receiver<UpdateMsg<D>>>,
index_handle: I, index_handle: I,
must_exit: Arc<AtomicBool>, must_exit: Arc<AtomicBool>,
} }
@ -45,7 +47,7 @@ where
let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?; let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;
std::fs::create_dir_all(path.join("update_files"))?; std::fs::create_dir_all(path.join("update_files"))?;
let inbox = Some(inbox);
Ok(Self { Ok(Self {
path, path,
store, store,
@ -60,43 +62,59 @@ where
info!("Started update actor."); info!("Started update actor.");
loop { let mut inbox = self
let msg = self.inbox.recv().await; .inbox
.take()
.expect("A receiver should be present by now.");
if self.must_exit.load(std::sync::atomic::Ordering::Relaxed) { let must_exit = self.must_exit.clone();
let stream = stream! {
loop {
let msg = inbox.recv().await;
if must_exit.load(std::sync::atomic::Ordering::Relaxed) {
break; break;
} }
match msg { match msg {
Some(Update { Some(msg) => yield msg,
None => break,
}
}
};
stream
.for_each_concurrent(Some(10), |msg| async {
match msg {
Update {
uuid, uuid,
meta, meta,
data, data,
ret, ret,
}) => { } => {
let _ = ret.send(self.handle_update(uuid, meta, data).await); let _ = ret.send(self.handle_update(uuid, meta, data).await);
} }
Some(ListUpdates { uuid, ret }) => { ListUpdates { uuid, ret } => {
let _ = ret.send(self.handle_list_updates(uuid).await); let _ = ret.send(self.handle_list_updates(uuid).await);
} }
Some(GetUpdate { uuid, ret, id }) => { GetUpdate { uuid, ret, id } => {
let _ = ret.send(self.handle_get_update(uuid, id).await); let _ = ret.send(self.handle_get_update(uuid, id).await);
} }
Some(Delete { uuid, ret }) => { Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await); let _ = ret.send(self.handle_delete(uuid).await);
} }
Some(Snapshot { uuids, path, ret }) => { Snapshot { uuids, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuids, path).await); let _ = ret.send(self.handle_snapshot(uuids, path).await);
} }
Some(Dump { uuids, path, ret }) => { GetInfo { ret } => {
let _ = ret.send(self.handle_dump(uuids, path).await);
}
Some(GetInfo { ret }) => {
let _ = ret.send(self.handle_get_info().await); let _ = ret.send(self.handle_get_info().await);
} }
None => break, Dump { uuids, path, ret } => {
let _ = ret.send(self.handle_dump(uuids, path).await);
} }
} }
})
.await;
} }
async fn handle_update( async fn handle_update(