From eca49e3a03541824b9305f42543d646ab33b62c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 18 Oct 2020 16:37:37 +0200 Subject: [PATCH] Introduce a notification channel for the UpdateStore --- Cargo.lock | 69 ++++++++++++++++++++++++++++++++++----------- Cargo.toml | 1 + src/update_store.rs | 49 +++++++++++++++++++++++++++++--- 3 files changed, 98 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 95c770b0a..3f0dd23fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -219,6 +219,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "chrono" version = "0.4.13" @@ -251,13 +257,19 @@ dependencies = [ "bitflags", ] +[[package]] +name = "const_fn" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce90df4c658c62f12d78f7508cf92f9173e5184a539c10bfe54a3107b3ffd0f2" + [[package]] name = "crc32fast" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", ] [[package]] @@ -296,6 +308,16 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils 0.8.0", +] + [[package]] name = "crossbeam-deque" version = "0.7.3" @@ -303,7 +325,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" dependencies = [ "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -314,8 +336,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg 1.0.0", - "cfg-if", - "crossbeam-utils", + "cfg-if 0.1.10", + "crossbeam-utils 0.7.2", "lazy_static", "maybe-uninit", "memoffset", @@ -328,8 +350,8 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab6bffe714b6bb07e42f201352c34f51fefd355ace793f9e638ebd52d23f98d2" dependencies = [ - "cfg-if", - "crossbeam-utils", + "cfg-if 0.1.10", + "crossbeam-utils 0.7.2", ] [[package]] @@ -339,7 +361,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" dependencies = [ "autocfg 1.0.0", - "cfg-if", + "cfg-if 0.1.10", + "lazy_static", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec91540d98355f690a86367e566ecad2e9e579f230230eb7c21398372be73ea5" +dependencies = [ + "autocfg 1.0.0", + "cfg-if 1.0.0", + "const_fn", "lazy_static", ] @@ -398,7 +432,7 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "766d0e77a2c1502169d4a93ff3b8c15a71fd946cd0126309752104e5f3c46d94" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "crc32fast", "libc", "miniz_oxide", @@ -563,7 +597,7 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "wasi", ] @@ -920,7 +954,7 @@ version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", ] [[package]] @@ -960,6 +994,7 @@ dependencies = [ "bstr", "byteorder", "criterion", + "crossbeam-channel", "csv", "flate2", "fst", @@ -1041,7 +1076,7 @@ version = "0.6.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "fuchsia-zircon", "fuchsia-zircon-sys", "iovec", @@ -1131,7 +1166,7 @@ version = "0.2.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ba7c918ac76704fb42afcbbb43891e72731f3dcca3bef2a19786297baf14af7" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "winapi 0.3.8", ] @@ -1144,7 +1179,7 @@ checksum = "85db2feff6bf70ebc3a4793191517d5f0331100a2f10f9bf93b5e5214f32b7b7" dependencies = [ "bitflags", "cc", - "cfg-if", + "cfg-if 0.1.10", "libc", ] @@ -1551,7 +1586,7 @@ checksum = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280" dependencies = [ "crossbeam-deque", "crossbeam-queue", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "lazy_static", "num_cpus", ] @@ -1805,7 +1840,7 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "redox_syscall", "winapi 0.3.8", @@ -1888,7 +1923,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "rand 0.7.3", "redox_syscall", @@ -2246,7 +2281,7 @@ version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "wasm-bindgen-macro", ] diff --git a/Cargo.toml b/Cargo.toml index ec5ccea40..e0492e129 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ default-run = "indexer" anyhow = "1.0.28" bstr = "0.2.13" byteorder = "1.3.4" +crossbeam-channel = "0.5.0" csv = "1.1.3" flate2 = "1.0.17" fst = "0.4.4" diff --git a/src/update_store.rs b/src/update_store.rs index b71cfab42..55fa480f6 100644 --- a/src/update_store.rs +++ b/src/update_store.rs @@ -1,7 +1,10 @@ use std::path::Path; +use std::sync::Arc; +use crossbeam_channel::{bounded, Sender, Receiver}; use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; use heed::{EnvOpenOptions, Env, Database}; +use once_cell::sync::OnceCell; use serde::{Serialize, Deserialize}; use crate::BEU64; @@ -12,15 +15,49 @@ pub struct UpdateStore { pending_meta: Database, SerdeJson>, pending: Database, ByteSlice>, processed_meta: Database, SerdeJson>, + notification_sender: Sender<()>, } -impl UpdateStore { - pub fn open>(options: EnvOpenOptions, path: P) -> heed::Result> { +impl UpdateStore { + pub fn open( + options: EnvOpenOptions, + path: P, + mut update_function: F, + ) -> heed::Result>> + where + P: AsRef, + F: FnMut(u64, M, &[u8]) -> heed::Result + Send + 'static, + M: for<'a> Deserialize<'a> + Serialize, + { let env = options.open(path)?; let pending_meta = env.create_database(Some("pending-meta"))?; let pending = env.create_database(Some("pending"))?; let processed_meta = env.create_database(Some("processed-meta"))?; - Ok(UpdateStore { env, pending, pending_meta, processed_meta }) + + let (notification_sender, notification_receiver) = bounded(1); + let update_store = Arc::new(UpdateStore { + env, + pending, + pending_meta, + processed_meta, + notification_sender, + }); + + let update_store_cloned = update_store.clone(); + std::thread::spawn(move || { + // Block and wait for something to process. + for () in notification_receiver { + loop { + match update_store_cloned.process_pending_update(&mut update_function) { + Ok(Some(_)) => (), + Ok(None) => break, + Err(e) => eprintln!("error while processing update: {}", e), + } + } + } + }); + + Ok(update_store) } /// Returns the new biggest id to use to store the new update. @@ -64,13 +101,17 @@ impl UpdateStore { wtxn.commit()?; + if let Err(e) = self.notification_sender.try_send(()) { + assert!(!e.is_disconnected(), "update notification channel is disconnected"); + } + Ok(update_id) } /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - pub fn process_pending_update(&self, mut f: F) -> heed::Result> + fn process_pending_update(&self, mut f: F) -> heed::Result> where F: FnMut(u64, M, &[u8]) -> heed::Result, M: for<'a> Deserialize<'a> + Serialize,