From 23a89732a58234d37e038ad78fd19bb1419dc5d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Fri, 15 Nov 2019 17:33:06 +0100 Subject: [PATCH] Prefer using a global update callback common to all indexes --- meilidb-core/examples/from_file.rs | 6 +- meilidb-core/src/database.rs | 112 ++++++++++++++++------------- meilidb-http/src/data.rs | 14 ++-- meilidb-http/src/lib.rs | 2 +- meilidb-http/src/main.rs | 8 ++- meilidb-http/src/routes/index.rs | 9 --- 6 files changed, 77 insertions(+), 74 deletions(-) diff --git a/meilidb-core/examples/from_file.rs b/meilidb-core/examples/from_file.rs index dde8296cc..f7a135dd8 100644 --- a/meilidb-core/examples/from_file.rs +++ b/meilidb-core/examples/from_file.rs @@ -104,14 +104,14 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box index, None => database.create_index(&command.index_name).unwrap(), }; - let done = database.set_update_callback(&command.index_name, Box::new(update_fn)); - assert!(done, "could not set the index update function"); + database.set_update_callback(Box::new(update_fn)); let env = &database.env; diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index 2ff16a461..fc290a0f6 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -11,14 +11,15 @@ use log::debug; use crate::{store, update, Index, MResult}; -pub type BoxUpdateFn = Box; +pub type BoxUpdateFn = Box; type ArcSwapFn = arc_swap::ArcSwapOption; pub struct Database { pub env: heed::Env, common_store: heed::PolyDatabase, indexes_store: heed::Database, - indexes: RwLock, thread::JoinHandle<()>)>>, + indexes: RwLock)>>, + update_fn: Arc, } macro_rules! r#break_try { @@ -41,7 +42,13 @@ pub enum UpdateEvent { pub type UpdateEvents = Receiver; pub type UpdateEventsEmitter = Sender; -fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc, index: Index) { +fn update_awaiter( + receiver: UpdateEvents, + env: heed::Env, + index_name: &str, + update_fn: Arc, + index: Index, +) { let mut receiver = receiver.into_iter(); while let Some(UpdateEvent::NewUpdate) = receiver.next() { loop { @@ -84,7 +91,7 @@ fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc(Some("indexes"))?; + let update_fn = Arc::new(ArcSwapFn::empty()); // list all indexes that needs to be opened let mut must_open = Vec::new(); @@ -128,21 +136,27 @@ impl Database { continue; } }; - let update_fn = Arc::new(ArcSwapFn::empty()); let env_clone = env.clone(); let index_clone = index.clone(); + let name_clone = index_name.clone(); let update_fn_clone = update_fn.clone(); let handle = thread::spawn(move || { - update_awaiter(receiver, env_clone, update_fn_clone, index_clone) + update_awaiter( + receiver, + env_clone, + &name_clone, + update_fn_clone, + index_clone, + ) }); // send an update notification to make sure that // possible pre-boot updates are consumed sender.send(UpdateEvent::NewUpdate).unwrap(); - let result = indexes.insert(index_name, (index, update_fn, handle)); + let result = indexes.insert(index_name, (index, handle)); assert!( result.is_none(), "The index should not have been already open" @@ -154,6 +168,7 @@ impl Database { common_store, indexes_store, indexes: RwLock::new(indexes), + update_fn, }) } @@ -180,16 +195,21 @@ impl Database { let env_clone = self.env.clone(); let index_clone = index.clone(); - - let no_update_fn = Arc::new(ArcSwapFn::empty()); - let no_update_fn_clone = no_update_fn.clone(); + let name_clone = name.to_owned(); + let update_fn_clone = self.update_fn.clone(); let handle = thread::spawn(move || { - update_awaiter(receiver, env_clone, no_update_fn_clone, index_clone) + update_awaiter( + receiver, + env_clone, + &name_clone, + update_fn_clone, + index_clone, + ) }); writer.commit()?; - entry.insert((index.clone(), no_update_fn, handle)); + entry.insert((index.clone(), handle)); Ok(index) } @@ -201,7 +221,7 @@ impl Database { let mut indexes_lock = self.indexes.write().unwrap(); match indexes_lock.remove_entry(name) { - Some((name, (index, _fn, handle))) => { + Some((name, (index, handle))) => { // remove the index name from the list of indexes // and clear all the LMDB dbi let mut writer = self.env.write_txn()?; @@ -218,27 +238,13 @@ impl Database { } } - pub fn set_update_callback(&self, name: impl AsRef, update_fn: BoxUpdateFn) -> bool { - let indexes_lock = self.indexes.read().unwrap(); - match indexes_lock.get(name.as_ref()) { - Some((_, current_update_fn, _)) => { - let update_fn = Some(Arc::new(update_fn)); - current_update_fn.swap(update_fn); - true - } - None => false, - } + pub fn set_update_callback(&self, update_fn: BoxUpdateFn) { + let update_fn = Some(Arc::new(update_fn)); + self.update_fn.swap(update_fn); } - pub fn unset_update_callback(&self, name: impl AsRef) -> bool { - let indexes_lock = self.indexes.read().unwrap(); - match indexes_lock.get(name.as_ref()) { - Some((_, current_update_fn, _)) => { - current_update_fn.swap(None); - true - } - None => false, - } + pub fn unset_update_callback(&self) { + self.update_fn.swap(None); } pub fn copy_and_compact_to_path>(&self, path: P) -> ZResult { @@ -272,11 +278,12 @@ mod tests { let env = &database.env; let (sender, receiver) = mpsc::sync_channel(100); - let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let update_fn = move |_name: &str, update: ProcessedUpdateResult| { + sender.send(update.update_id).unwrap() + }; let index = database.create_index("test").unwrap(); - let done = database.set_update_callback("test", Box::new(update_fn)); - assert!(done, "could not set the index update function"); + database.set_update_callback(Box::new(update_fn)); let schema = { let data = r#" @@ -334,11 +341,12 @@ mod tests { let env = &database.env; let (sender, receiver) = mpsc::sync_channel(100); - let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let update_fn = move |_name: &str, update: ProcessedUpdateResult| { + sender.send(update.update_id).unwrap() + }; let index = database.create_index("test").unwrap(); - let done = database.set_update_callback("test", Box::new(update_fn)); - assert!(done, "could not set the index update function"); + database.set_update_callback(Box::new(update_fn)); let schema = { let data = r#" @@ -395,11 +403,12 @@ mod tests { let env = &database.env; let (sender, receiver) = mpsc::sync_channel(100); - let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let update_fn = move |_name: &str, update: ProcessedUpdateResult| { + sender.send(update.update_id).unwrap() + }; let index = database.create_index("test").unwrap(); - let done = database.set_update_callback("test", Box::new(update_fn)); - assert!(done, "could not set the index update function"); + database.set_update_callback(Box::new(update_fn)); let schema = { let data = r#" @@ -445,11 +454,12 @@ mod tests { let env = &database.env; let (sender, receiver) = mpsc::sync_channel(100); - let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let update_fn = move |_name: &str, update: ProcessedUpdateResult| { + sender.send(update.update_id).unwrap() + }; let index = database.create_index("test").unwrap(); - let done = database.set_update_callback("test", Box::new(update_fn)); - assert!(done, "could not set the index update function"); + database.set_update_callback(Box::new(update_fn)); let schema = { let data = r#" @@ -615,11 +625,12 @@ mod tests { let env = &database.env; let (sender, receiver) = mpsc::sync_channel(100); - let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let update_fn = move |_name: &str, update: ProcessedUpdateResult| { + sender.send(update.update_id).unwrap() + }; let index = database.create_index("test").unwrap(); - let done = database.set_update_callback("test", Box::new(update_fn)); - assert!(done, "could not set the index update function"); + database.set_update_callback(Box::new(update_fn)); let schema = { let data = r#" @@ -692,11 +703,12 @@ mod tests { let env = &database.env; let (sender, receiver) = mpsc::sync_channel(100); - let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let update_fn = move |_name: &str, update: ProcessedUpdateResult| { + sender.send(update.update_id).unwrap() + }; let index = database.create_index("test").unwrap(); - let done = database.set_update_callback("test", Box::new(update_fn)); - assert!(done, "could not set the index update function"); + database.set_update_callback(Box::new(update_fn)); let schema = { let data = r#" diff --git a/meilidb-http/src/data.rs b/meilidb-http/src/data.rs index 4becfa10f..f62cafdc4 100644 --- a/meilidb-http/src/data.rs +++ b/meilidb-http/src/data.rs @@ -174,16 +174,10 @@ impl Data { inner: Arc::new(inner_data), }; - for index_name in db.indexes_names().unwrap() { - let callback_context = data.clone(); - let callback_name = index_name.clone(); - db.set_update_callback( - index_name, - Box::new(move |status| { - index_update_callback(&callback_name, &callback_context, status); - }), - ); - } + let callback_context = data.clone(); + db.set_update_callback(Box::new(move |index_name, status| { + index_update_callback(&index_name, &callback_context, status); + })); data } diff --git a/meilidb-http/src/lib.rs b/meilidb-http/src/lib.rs index 94f395c43..118af8dc1 100644 --- a/meilidb-http/src/lib.rs +++ b/meilidb-http/src/lib.rs @@ -8,4 +8,4 @@ pub mod models; pub mod option; pub mod routes; -use self::data::Data; +pub use self::data::Data; diff --git a/meilidb-http/src/main.rs b/meilidb-http/src/main.rs index 269445f65..1ad429649 100644 --- a/meilidb-http/src/main.rs +++ b/meilidb-http/src/main.rs @@ -7,6 +7,7 @@ use tide_log::RequestLogger; use meilidb_http::data::Data; use meilidb_http::option::Opt; use meilidb_http::routes; +use meilidb_http::routes::index::index_update_callback; #[cfg(not(target_os = "macos"))] #[global_allocator] @@ -16,8 +17,13 @@ pub fn main() -> Result<(), MainError> { env_logger::init(); let opt = Opt::new(); - let data = Data::new(opt.clone()); + + let data_cloned = data.clone(); + data.db.set_update_callback(Box::new(move |name, status| { + index_update_callback(name, &data_cloned, status); + })); + let mut app = tide::App::with_state(data); app.middleware( diff --git a/meilidb-http/src/routes/index.rs b/meilidb-http/src/routes/index.rs index 8df1ced6f..0522d584f 100644 --- a/meilidb-http/src/routes/index.rs +++ b/meilidb-http/src/routes/index.rs @@ -69,15 +69,6 @@ pub async fn create_index(mut ctx: Context) -> SResult { Err(e) => return Err(ResponseError::create_index(e)), }; - let callback_context = ctx.state().clone(); - let callback_name = index_name.clone(); - db.set_update_callback( - &index_name, - Box::new(move |status| { - index_update_callback(&callback_name, &callback_context, status); - }), - ); - let env = &db.env; let mut writer = env.write_txn().map_err(ResponseError::internal)?;