async update store

This commit is contained in:
mpostma 2021-03-04 17:25:02 +01:00
parent a955e04ab6
commit 6a0a9fec6b
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
2 changed files with 99 additions and 57 deletions

View File

@ -24,15 +24,15 @@ pub enum UpdateError {
} }
enum UpdateMsg<D> { enum UpdateMsg<D> {
CreateIndex{
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
Update { Update {
uuid: Uuid, uuid: Uuid,
meta: UpdateMeta, meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<D>>, data: mpsc::Receiver<PayloadData<D>>,
ret: oneshot::Sender<Result<UpdateStatus>> ret: oneshot::Sender<Result<UpdateStatus>>
},
ListUpdates {
uuid: Uuid,
ret: oneshot::Sender<Result<Vec<UpdateStatus>>>,
} }
} }
@ -58,12 +58,14 @@ where D: AsRef<[u8]> + Sized + 'static,
} }
async fn run(mut self) { async fn run(mut self) {
use UpdateMsg::*;
info!("started update actor."); info!("started update actor.");
loop { loop {
match self.inbox.recv().await { match self.inbox.recv().await {
Some(UpdateMsg::Update { uuid, meta, data, ret }) => self.handle_update(uuid, meta, data, ret).await, Some(Update { uuid, meta, data, ret }) => self.handle_update(uuid, meta, data, ret).await,
Some(_) => {} Some(ListUpdates { uuid, ret }) => self.handle_list_updates(uuid, ret).await,
None => {} None => {}
} }
} }
@ -99,6 +101,10 @@ where D: AsRef<[u8]> + Sized + 'static,
let _ = ret.send(result); let _ = ret.send(result);
}).await; }).await;
} }
async fn handle_list_updates(&self, uuid: Uuid, ret: oneshot::Sender<Result<Vec<UpdateStatus>>>) {
todo!()
}
} }
#[derive(Clone)] #[derive(Clone)]

View File

@ -1,12 +1,12 @@
use std::fs::remove_file;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::fs::remove_file;
use crossbeam_channel::Sender; use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
use heed::types::{OwnedType, DecodeIgnore, SerdeJson}; use heed::{Database, Env, EnvOpenOptions};
use heed::{EnvOpenOptions, Env, Database}; use serde::{Deserialize, Serialize};
use serde::{Serialize, Deserialize};
use std::fs::File; use std::fs::File;
use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use crate::index_controller::updates::*; use crate::index_controller::updates::*;
@ -22,17 +22,26 @@ pub struct UpdateStore<M, N, E> {
failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>, failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>, aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
processing: Arc<RwLock<Option<Processing<M>>>>, processing: Arc<RwLock<Option<Processing<M>>>>,
notification_sender: Sender<()>, notification_sender: mpsc::Sender<()>,
} }
pub trait HandleUpdate<M, N, E> { pub trait HandleUpdate<M, N, E> {
fn handle_update(&mut self, meta: Processing<M>, content: File) -> Result<Processed<M, N>, Failed<M, E>>; fn handle_update(
&mut self,
meta: Processing<M>,
content: File,
) -> Result<Processed<M, N>, Failed<M, E>>;
} }
impl<M, N, E, F> HandleUpdate<M, N, E> for F impl<M, N, E, F> HandleUpdate<M, N, E> for F
where F: FnMut(Processing<M>, File) -> Result<Processed<M, N>, Failed<M, E>> where
F: FnMut(Processing<M>, File) -> Result<Processed<M, N>, Failed<M, E>>,
{ {
fn handle_update(&mut self, meta: Processing<M>, content: File) -> Result<Processed<M, N>, Failed<M, E>> { fn handle_update(
&mut self,
meta: Processing<M>,
content: File,
) -> Result<Processed<M, N>, Failed<M, E>> {
self(meta, content) self(meta, content)
} }
} }
@ -46,11 +55,11 @@ where
pub fn open<P, U>( pub fn open<P, U>(
mut options: EnvOpenOptions, mut options: EnvOpenOptions,
path: P, path: P,
mut update_handler: U, update_handler: U,
) -> heed::Result<Arc<Self>> ) -> heed::Result<Arc<Self>>
where where
P: AsRef<Path>, P: AsRef<Path>,
U: HandleUpdate<M, N, E> + Send + 'static, U: HandleUpdate<M, N, E> + Sync + Clone + Send + 'static,
{ {
options.max_dbs(5); options.max_dbs(5);
@ -62,7 +71,7 @@ where
let failed_meta = env.create_database(Some("failed-meta"))?; let failed_meta = env.create_database(Some("failed-meta"))?;
let processing = Arc::new(RwLock::new(None)); let processing = Arc::new(RwLock::new(None));
let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); let (notification_sender, mut notification_receiver) = mpsc::channel(10);
// Send a first notification to trigger the process. // Send a first notification to trigger the process.
let _ = notification_sender.send(()); let _ = notification_sender.send(());
@ -80,13 +89,19 @@ where
// We need a weak reference so we can take ownership on the arc later when we // We need a weak reference so we can take ownership on the arc later when we
// want to close the index. // want to close the index.
let update_store_weak = Arc::downgrade(&update_store); let update_store_weak = Arc::downgrade(&update_store);
std::thread::spawn(move || { tokio::task::spawn(async move {
// Block and wait for something to process. // Block and wait for something to process.
'outer: for _ in notification_receiver { 'outer: while let Some(_) = notification_receiver.recv().await {
loop { loop {
match update_store_weak.upgrade() { match update_store_weak.upgrade() {
Some(update_store) => { Some(update_store) => {
match update_store.process_pending_update(&mut update_handler) { let handler = update_handler.clone();
let res = tokio::task::spawn_blocking(move || {
update_store.process_pending_update(handler)
})
.await
.unwrap();
match res {
Ok(Some(_)) => (), Ok(Some(_)) => (),
Ok(None) => break, Ok(None) => break,
Err(e) => eprintln!("error while processing update: {}", e), Err(e) => eprintln!("error while processing update: {}", e),
@ -108,17 +123,20 @@ where
/// Returns the new biggest id to use to store the new update. /// Returns the new biggest id to use to store the new update.
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> { fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
let last_pending = self.pending_meta let last_pending = self
.pending_meta
.remap_data_type::<DecodeIgnore>() .remap_data_type::<DecodeIgnore>()
.last(txn)? .last(txn)?
.map(|(k, _)| k.get()); .map(|(k, _)| k.get());
let last_processed = self.processed_meta let last_processed = self
.processed_meta
.remap_data_type::<DecodeIgnore>() .remap_data_type::<DecodeIgnore>()
.last(txn)? .last(txn)?
.map(|(k, _)| k.get()); .map(|(k, _)| k.get());
let last_aborted = self.aborted_meta let last_aborted = self
.aborted_meta
.remap_data_type::<DecodeIgnore>() .remap_data_type::<DecodeIgnore>()
.last(txn)? .last(txn)?
.map(|(k, _)| k.get()); .map(|(k, _)| k.get());
@ -154,21 +172,22 @@ where
let meta = Pending::new(meta, update_id, index_uuid); let meta = Pending::new(meta, update_id, index_uuid);
self.pending_meta.put(&mut wtxn, &update_key, &meta)?; self.pending_meta.put(&mut wtxn, &update_key, &meta)?;
self.pending.put(&mut wtxn, &update_key, &content.as_ref().to_owned())?; self.pending
.put(&mut wtxn, &update_key, &content.as_ref().to_owned())?;
wtxn.commit()?; wtxn.commit()?;
if let Err(e) = self.notification_sender.try_send(()) { self.notification_sender
assert!(!e.is_disconnected(), "update notification channel is disconnected"); .blocking_send(())
} .expect("Update store loop exited.");
Ok(meta) Ok(meta)
} }
/// Executes the user provided function on the next pending update (the one with the lowest id). /// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and /// This is asynchronous as it let the user process the update with a read-only txn and
/// only writing the result meta to the processed-meta store *after* it has been processed. /// only writing the result meta to the processed-meta store *after* it has been processed.
fn process_pending_update<U>(&self, handler: &mut U) -> heed::Result<Option<()>> fn process_pending_update<U>(&self, mut handler: U) -> heed::Result<Option<()>>
where where
U: HandleUpdate<M, N, E> + Send + 'static, U: HandleUpdate<M, N, E>,
{ {
// Create a read transaction to be able to retrieve the pending update in order. // Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
@ -178,7 +197,8 @@ where
// a reader while processing it, not a writer. // a reader while processing it, not a writer.
match first_meta { match first_meta {
Some((first_id, pending)) => { Some((first_id, pending)) => {
let content_path = self.pending let content_path = self
.pending
.get(&rtxn, &first_id)? .get(&rtxn, &first_id)?
.expect("associated update content"); .expect("associated update content");
@ -186,10 +206,7 @@ where
// to the update handler. Processing store is non persistent to be able recover // to the update handler. Processing store is non persistent to be able recover
// from a failure // from a failure
let processing = pending.processing(); let processing = pending.processing();
self.processing self.processing.write().unwrap().replace(processing.clone());
.write()
.unwrap()
.replace(processing.clone());
let file = File::open(&content_path)?; let file = File::open(&content_path)?;
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let result = handler.handle_update(processing, file); let result = handler.handle_update(processing, file);
@ -199,10 +216,7 @@ where
// we must remove the content from the pending and processing stores and // we must remove the content from the pending and processing stores and
// write the *new* meta to the processed-meta store and commit. // write the *new* meta to the processed-meta store and commit.
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
self.processing self.processing.write().unwrap().take();
.write()
.unwrap()
.take();
self.pending_meta.delete(&mut wtxn, &first_id)?; self.pending_meta.delete(&mut wtxn, &first_id)?;
remove_file(&content_path)?; remove_file(&content_path)?;
self.pending.delete(&mut wtxn, &first_id)?; self.pending.delete(&mut wtxn, &first_id)?;
@ -213,8 +227,8 @@ where
wtxn.commit()?; wtxn.commit()?;
Ok(Some(())) Ok(Some(()))
}, }
None => Ok(None) None => Ok(None),
} }
} }
@ -241,7 +255,13 @@ where
let failed_iter = self.failed_meta.iter(&rtxn)?; let failed_iter = self.failed_meta.iter(&rtxn)?;
// We execute the user defined function with both iterators. // We execute the user defined function with both iterators.
(f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter) (f)(
processing,
processed_iter,
aborted_iter,
pending_iter,
failed_iter,
)
} }
/// Returns the update associated meta or `None` if the update doesn't exist. /// Returns the update associated meta or `None` if the update doesn't exist.
@ -340,22 +360,33 @@ mod tests {
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
impl<M, N, F, E> HandleUpdate<M, N, E> for F impl<M, N, F, E> HandleUpdate<M, N, E> for F
where F: FnMut(Processing<M>, &[u8]) -> Result<Processed<M, N>, Failed<M, E>> + Send + 'static { where
fn handle_update(&mut self, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>> { F: FnMut(Processing<M>, &[u8]) -> Result<Processed<M, N>, Failed<M, E>> + Send + 'static,
self(meta, content) {
} fn handle_update(
&mut self,
meta: Processing<M>,
content: &[u8],
) -> Result<Processed<M, N>, Failed<M, E>> {
self(meta, content)
} }
}
#[test] #[test]
fn simple() { fn simple() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100); options.map_size(4096 * 100);
let update_store = UpdateStore::open(options, dir, |meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> { let update_store = UpdateStore::open(
let new_meta = meta.meta().to_string() + " processed"; options,
let processed = meta.process(new_meta); dir,
Ok(processed) |meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
}).unwrap(); let new_meta = meta.meta().to_string() + " processed";
let processed = meta.process(new_meta);
Ok(processed)
},
)
.unwrap();
let meta = String::from("kiki"); let meta = String::from("kiki");
let update = update_store.register_update(meta, &[]).unwrap(); let update = update_store.register_update(meta, &[]).unwrap();
@ -374,12 +405,17 @@ mod tests {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100); options.map_size(4096 * 100);
let update_store = UpdateStore::open(options, dir, |meta: Processing<String>, _content:&_| -> Result<_, Failed<_, ()>> { let update_store = UpdateStore::open(
thread::sleep(Duration::from_millis(400)); options,
let new_meta = meta.meta().to_string() + "processed"; dir,
let processed = meta.process(new_meta); |meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
Ok(processed) thread::sleep(Duration::from_millis(400));
}).unwrap(); let new_meta = meta.meta().to_string() + "processed";
let processed = meta.process(new_meta);
Ok(processed)
},
)
.unwrap();
let before_register = Instant::now(); let before_register = Instant::now();