diff --git a/src/update_store.rs b/src/update_store.rs index 5bc90d1c4..09920018f 100644 --- a/src/update_store.rs +++ b/src/update_store.rs @@ -1,7 +1,7 @@ use std::path::Path; use std::sync::Arc; -use crossbeam_channel::{bounded, Sender, Receiver}; +use crossbeam_channel::{Sender, Receiver}; use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; use heed::{EnvOpenOptions, Env, Database}; use once_cell::sync::OnceCell; @@ -20,7 +20,7 @@ pub struct UpdateStore { impl UpdateStore { pub fn open( - options: EnvOpenOptions, + mut options: EnvOpenOptions, path: P, mut update_function: F, ) -> heed::Result>> @@ -29,12 +29,13 @@ impl UpdateStore { F: FnMut(u64, M, &[u8]) -> heed::Result + Send + 'static, M: for<'a> Deserialize<'a> + Serialize, { + options.max_dbs(3); let env = options.open(path)?; let pending_meta = env.create_database(Some("pending-meta"))?; let pending = env.create_database(Some("pending"))?; let processed_meta = env.create_database(Some("processed-meta"))?; - let (notification_sender, notification_receiver) = bounded(1); + let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); let update_store = Arc::new(UpdateStore { env, pending, @@ -188,3 +189,62 @@ impl UpdateStore { } } } + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + use std::time::{Duration, Instant}; + + #[test] + fn simple() { + let dir = tempfile::tempdir().unwrap(); + let options = EnvOpenOptions::new(); + let update_store = UpdateStore::open(options, dir, |id, meta: String, content| { + Ok(meta + " processed") + }).unwrap(); + + let meta = String::from("kiki"); + let update_id = update_store.register_update(&meta, &[]).unwrap(); + + thread::sleep(Duration::from_millis(100)); + + let meta = update_store.update_meta(update_id).unwrap().unwrap(); + assert_eq!(meta, "kiki processed"); + } + + #[test] + fn long_running_update() { + let dir = tempfile::tempdir().unwrap(); + let options = EnvOpenOptions::new(); + let update_store = UpdateStore::open(options, dir, |id, meta: String, content| { + thread::sleep(Duration::from_millis(400)); + Ok(meta + " processed") + }).unwrap(); + + let before_register = Instant::now(); + + let meta = String::from("kiki"); + let update_id_kiki = update_store.register_update(&meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + let meta = String::from("coco"); + let update_id_coco = update_store.register_update(&meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + let meta = String::from("cucu"); + let update_id_cucu = update_store.register_update(&meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + thread::sleep(Duration::from_millis(400 * 3 + 100)); + + let meta = update_store.update_meta(update_id_kiki).unwrap().unwrap(); + assert_eq!(meta, "kiki processed"); + + let meta = update_store.update_meta(update_id_coco).unwrap().unwrap(); + assert_eq!(meta, "coco processed"); + + let meta = update_store.update_meta(update_id_cucu).unwrap().unwrap(); + assert_eq!(meta, "cucu processed"); + } +}