diff --git a/src/index_controller/update_actor.rs b/src/index_controller/update_actor.rs index 384c52098..2f3c058bc 100644 --- a/src/index_controller/update_actor.rs +++ b/src/index_controller/update_actor.rs @@ -24,15 +24,15 @@ pub enum UpdateError { } enum UpdateMsg { - CreateIndex{ - uuid: Uuid, - ret: oneshot::Sender>, - }, Update { uuid: Uuid, meta: UpdateMeta, data: mpsc::Receiver>, ret: oneshot::Sender> + }, + ListUpdates { + uuid: Uuid, + ret: oneshot::Sender>>, } } @@ -58,12 +58,14 @@ where D: AsRef<[u8]> + Sized + 'static, } async fn run(mut self) { + use UpdateMsg::*; + info!("started update actor."); loop { match self.inbox.recv().await { - Some(UpdateMsg::Update { uuid, meta, data, ret }) => self.handle_update(uuid, meta, data, ret).await, - Some(_) => {} + Some(Update { uuid, meta, data, ret }) => self.handle_update(uuid, meta, data, ret).await, + Some(ListUpdates { uuid, ret }) => self.handle_list_updates(uuid, ret).await, None => {} } } @@ -99,6 +101,10 @@ where D: AsRef<[u8]> + Sized + 'static, let _ = ret.send(result); }).await; } + + async fn handle_list_updates(&self, uuid: Uuid, ret: oneshot::Sender>>) { + todo!() + } } #[derive(Clone)] diff --git a/src/index_controller/update_store.rs b/src/index_controller/update_store.rs index ae4bfb8d8..02c15ed8c 100644 --- a/src/index_controller/update_store.rs +++ b/src/index_controller/update_store.rs @@ -1,12 +1,12 @@ +use std::fs::remove_file; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use std::fs::remove_file; -use crossbeam_channel::Sender; -use heed::types::{OwnedType, DecodeIgnore, SerdeJson}; -use heed::{EnvOpenOptions, Env, Database}; -use serde::{Serialize, Deserialize}; +use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; +use heed::{Database, Env, EnvOpenOptions}; +use serde::{Deserialize, Serialize}; use std::fs::File; +use tokio::sync::mpsc; use uuid::Uuid; use crate::index_controller::updates::*; @@ -22,17 +22,26 @@ pub struct UpdateStore { failed_meta: Database, SerdeJson>>, aborted_meta: Database, SerdeJson>>, processing: Arc>>>, - notification_sender: Sender<()>, + notification_sender: mpsc::Sender<()>, } pub trait HandleUpdate { - fn handle_update(&mut self, meta: Processing, content: File) -> Result, Failed>; + fn handle_update( + &mut self, + meta: Processing, + content: File, + ) -> Result, Failed>; } impl HandleUpdate for F -where F: FnMut(Processing, File) -> Result, Failed> +where + F: FnMut(Processing, File) -> Result, Failed>, { - fn handle_update(&mut self, meta: Processing, content: File) -> Result, Failed> { + fn handle_update( + &mut self, + meta: Processing, + content: File, + ) -> Result, Failed> { self(meta, content) } } @@ -46,11 +55,11 @@ where pub fn open( mut options: EnvOpenOptions, path: P, - mut update_handler: U, + update_handler: U, ) -> heed::Result> where P: AsRef, - U: HandleUpdate + Send + 'static, + U: HandleUpdate + Sync + Clone + Send + 'static, { options.max_dbs(5); @@ -62,7 +71,7 @@ where let failed_meta = env.create_database(Some("failed-meta"))?; let processing = Arc::new(RwLock::new(None)); - let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); + let (notification_sender, mut notification_receiver) = mpsc::channel(10); // Send a first notification to trigger the process. let _ = notification_sender.send(()); @@ -80,13 +89,19 @@ where // We need a weak reference so we can take ownership on the arc later when we // want to close the index. let update_store_weak = Arc::downgrade(&update_store); - std::thread::spawn(move || { + tokio::task::spawn(async move { // Block and wait for something to process. - 'outer: for _ in notification_receiver { + 'outer: while let Some(_) = notification_receiver.recv().await { loop { match update_store_weak.upgrade() { Some(update_store) => { - match update_store.process_pending_update(&mut update_handler) { + let handler = update_handler.clone(); + let res = tokio::task::spawn_blocking(move || { + update_store.process_pending_update(handler) + }) + .await + .unwrap(); + match res { Ok(Some(_)) => (), Ok(None) => break, Err(e) => eprintln!("error while processing update: {}", e), @@ -108,17 +123,20 @@ where /// Returns the new biggest id to use to store the new update. fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { - let last_pending = self.pending_meta + let last_pending = self + .pending_meta .remap_data_type::() .last(txn)? .map(|(k, _)| k.get()); - let last_processed = self.processed_meta + let last_processed = self + .processed_meta .remap_data_type::() .last(txn)? .map(|(k, _)| k.get()); - let last_aborted = self.aborted_meta + let last_aborted = self + .aborted_meta .remap_data_type::() .last(txn)? .map(|(k, _)| k.get()); @@ -154,21 +172,22 @@ where let meta = Pending::new(meta, update_id, index_uuid); self.pending_meta.put(&mut wtxn, &update_key, &meta)?; - self.pending.put(&mut wtxn, &update_key, &content.as_ref().to_owned())?; + self.pending + .put(&mut wtxn, &update_key, &content.as_ref().to_owned())?; wtxn.commit()?; - if let Err(e) = self.notification_sender.try_send(()) { - assert!(!e.is_disconnected(), "update notification channel is disconnected"); - } + self.notification_sender + .blocking_send(()) + .expect("Update store loop exited."); Ok(meta) } /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, handler: &mut U) -> heed::Result> + fn process_pending_update(&self, mut handler: U) -> heed::Result> where - U: HandleUpdate + Send + 'static, + U: HandleUpdate, { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; @@ -178,7 +197,8 @@ where // a reader while processing it, not a writer. match first_meta { Some((first_id, pending)) => { - let content_path = self.pending + let content_path = self + .pending .get(&rtxn, &first_id)? .expect("associated update content"); @@ -186,10 +206,7 @@ where // to the update handler. Processing store is non persistent to be able recover // from a failure let processing = pending.processing(); - self.processing - .write() - .unwrap() - .replace(processing.clone()); + self.processing.write().unwrap().replace(processing.clone()); let file = File::open(&content_path)?; // Process the pending update using the provided user function. let result = handler.handle_update(processing, file); @@ -199,10 +216,7 @@ where // we must remove the content from the pending and processing stores and // write the *new* meta to the processed-meta store and commit. let mut wtxn = self.env.write_txn()?; - self.processing - .write() - .unwrap() - .take(); + self.processing.write().unwrap().take(); self.pending_meta.delete(&mut wtxn, &first_id)?; remove_file(&content_path)?; self.pending.delete(&mut wtxn, &first_id)?; @@ -213,8 +227,8 @@ where wtxn.commit()?; Ok(Some(())) - }, - None => Ok(None) + } + None => Ok(None), } } @@ -241,7 +255,13 @@ where let failed_iter = self.failed_meta.iter(&rtxn)?; // We execute the user defined function with both iterators. - (f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter) + (f)( + processing, + processed_iter, + aborted_iter, + pending_iter, + failed_iter, + ) } /// Returns the update associated meta or `None` if the update doesn't exist. @@ -340,22 +360,33 @@ mod tests { use std::time::{Duration, Instant}; impl HandleUpdate for F - where F: FnMut(Processing, &[u8]) -> Result, Failed> + Send + 'static { - fn handle_update(&mut self, meta: Processing, content: &[u8]) -> Result, Failed> { - self(meta, content) - } + where + F: FnMut(Processing, &[u8]) -> Result, Failed> + Send + 'static, + { + fn handle_update( + &mut self, + meta: Processing, + content: &[u8], + ) -> Result, Failed> { + self(meta, content) } + } #[test] fn simple() { let dir = tempfile::tempdir().unwrap(); let mut options = EnvOpenOptions::new(); options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir, |meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { - let new_meta = meta.meta().to_string() + " processed"; - let processed = meta.process(new_meta); - Ok(processed) - }).unwrap(); + let update_store = UpdateStore::open( + options, + dir, + |meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { + let new_meta = meta.meta().to_string() + " processed"; + let processed = meta.process(new_meta); + Ok(processed) + }, + ) + .unwrap(); let meta = String::from("kiki"); let update = update_store.register_update(meta, &[]).unwrap(); @@ -374,12 +405,17 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let mut options = EnvOpenOptions::new(); options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir, |meta: Processing, _content:&_| -> Result<_, Failed<_, ()>> { - thread::sleep(Duration::from_millis(400)); - let new_meta = meta.meta().to_string() + "processed"; - let processed = meta.process(new_meta); - Ok(processed) - }).unwrap(); + let update_store = UpdateStore::open( + options, + dir, + |meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { + thread::sleep(Duration::from_millis(400)); + let new_meta = meta.meta().to_string() + "processed"; + let processed = meta.process(new_meta); + Ok(processed) + }, + ) + .unwrap(); let before_register = Instant::now();