Prefer using a global update callback common to all indexes

This commit is contained in:
Clément Renault 2019-11-15 17:33:06 +01:00
parent 3a1f41ebdb
commit 23a89732a5
6 changed files with 77 additions and 74 deletions

View File

@ -104,14 +104,14 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
let start = Instant::now(); let start = Instant::now();
let (sender, receiver) = mpsc::sync_channel(100); let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); let update_fn =
move |_name: &str, update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
let index = match database.open_index(&command.index_name) { let index = match database.open_index(&command.index_name) {
Some(index) => index, Some(index) => index,
None => database.create_index(&command.index_name).unwrap(), None => database.create_index(&command.index_name).unwrap(),
}; };
let done = database.set_update_callback(&command.index_name, Box::new(update_fn)); database.set_update_callback(Box::new(update_fn));
assert!(done, "could not set the index update function");
let env = &database.env; let env = &database.env;

View File

@ -11,14 +11,15 @@ use log::debug;
use crate::{store, update, Index, MResult}; use crate::{store, update, Index, MResult};
pub type BoxUpdateFn = Box<dyn Fn(update::ProcessedUpdateResult) + Send + Sync + 'static>; pub type BoxUpdateFn = Box<dyn Fn(&str, update::ProcessedUpdateResult) + Send + Sync + 'static>;
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>; type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
pub struct Database { pub struct Database {
pub env: heed::Env, pub env: heed::Env,
common_store: heed::PolyDatabase, common_store: heed::PolyDatabase,
indexes_store: heed::Database<Str, Unit>, indexes_store: heed::Database<Str, Unit>,
indexes: RwLock<HashMap<String, (Index, Arc<ArcSwapFn>, thread::JoinHandle<()>)>>, indexes: RwLock<HashMap<String, (Index, thread::JoinHandle<()>)>>,
update_fn: Arc<ArcSwapFn>,
} }
macro_rules! r#break_try { macro_rules! r#break_try {
@ -41,7 +42,13 @@ pub enum UpdateEvent {
pub type UpdateEvents = Receiver<UpdateEvent>; pub type UpdateEvents = Receiver<UpdateEvent>;
pub type UpdateEventsEmitter = Sender<UpdateEvent>; pub type UpdateEventsEmitter = Sender<UpdateEvent>;
fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc<ArcSwapFn>, index: Index) { fn update_awaiter(
receiver: UpdateEvents,
env: heed::Env,
index_name: &str,
update_fn: Arc<ArcSwapFn>,
index: Index,
) {
let mut receiver = receiver.into_iter(); let mut receiver = receiver.into_iter();
while let Some(UpdateEvent::NewUpdate) = receiver.next() { while let Some(UpdateEvent::NewUpdate) = receiver.next() {
loop { loop {
@ -84,7 +91,7 @@ fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc<ArcSwap
// call the user callback when the update and the result are written consistently // call the user callback when the update and the result are written consistently
if let Some(ref callback) = *update_fn.load() { if let Some(ref callback) = *update_fn.load() {
(callback)(status); (callback)(index_name, status);
} }
} }
} }
@ -103,6 +110,7 @@ impl Database {
let common_store = env.create_poly_database(Some("common"))?; let common_store = env.create_poly_database(Some("common"))?;
let indexes_store = env.create_database::<Str, Unit>(Some("indexes"))?; let indexes_store = env.create_database::<Str, Unit>(Some("indexes"))?;
let update_fn = Arc::new(ArcSwapFn::empty());
// list all indexes that needs to be opened // list all indexes that needs to be opened
let mut must_open = Vec::new(); let mut must_open = Vec::new();
@ -128,21 +136,27 @@ impl Database {
continue; continue;
} }
}; };
let update_fn = Arc::new(ArcSwapFn::empty());
let env_clone = env.clone(); let env_clone = env.clone();
let index_clone = index.clone(); let index_clone = index.clone();
let name_clone = index_name.clone();
let update_fn_clone = update_fn.clone(); let update_fn_clone = update_fn.clone();
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
update_awaiter(receiver, env_clone, update_fn_clone, index_clone) update_awaiter(
receiver,
env_clone,
&name_clone,
update_fn_clone,
index_clone,
)
}); });
// send an update notification to make sure that // send an update notification to make sure that
// possible pre-boot updates are consumed // possible pre-boot updates are consumed
sender.send(UpdateEvent::NewUpdate).unwrap(); sender.send(UpdateEvent::NewUpdate).unwrap();
let result = indexes.insert(index_name, (index, update_fn, handle)); let result = indexes.insert(index_name, (index, handle));
assert!( assert!(
result.is_none(), result.is_none(),
"The index should not have been already open" "The index should not have been already open"
@ -154,6 +168,7 @@ impl Database {
common_store, common_store,
indexes_store, indexes_store,
indexes: RwLock::new(indexes), indexes: RwLock::new(indexes),
update_fn,
}) })
} }
@ -180,16 +195,21 @@ impl Database {
let env_clone = self.env.clone(); let env_clone = self.env.clone();
let index_clone = index.clone(); let index_clone = index.clone();
let name_clone = name.to_owned();
let no_update_fn = Arc::new(ArcSwapFn::empty()); let update_fn_clone = self.update_fn.clone();
let no_update_fn_clone = no_update_fn.clone();
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
update_awaiter(receiver, env_clone, no_update_fn_clone, index_clone) update_awaiter(
receiver,
env_clone,
&name_clone,
update_fn_clone,
index_clone,
)
}); });
writer.commit()?; writer.commit()?;
entry.insert((index.clone(), no_update_fn, handle)); entry.insert((index.clone(), handle));
Ok(index) Ok(index)
} }
@ -201,7 +221,7 @@ impl Database {
let mut indexes_lock = self.indexes.write().unwrap(); let mut indexes_lock = self.indexes.write().unwrap();
match indexes_lock.remove_entry(name) { match indexes_lock.remove_entry(name) {
Some((name, (index, _fn, handle))) => { Some((name, (index, handle))) => {
// remove the index name from the list of indexes // remove the index name from the list of indexes
// and clear all the LMDB dbi // and clear all the LMDB dbi
let mut writer = self.env.write_txn()?; let mut writer = self.env.write_txn()?;
@ -218,27 +238,13 @@ impl Database {
} }
} }
pub fn set_update_callback(&self, name: impl AsRef<str>, update_fn: BoxUpdateFn) -> bool { pub fn set_update_callback(&self, update_fn: BoxUpdateFn) {
let indexes_lock = self.indexes.read().unwrap(); let update_fn = Some(Arc::new(update_fn));
match indexes_lock.get(name.as_ref()) { self.update_fn.swap(update_fn);
Some((_, current_update_fn, _)) => {
let update_fn = Some(Arc::new(update_fn));
current_update_fn.swap(update_fn);
true
}
None => false,
}
} }
pub fn unset_update_callback(&self, name: impl AsRef<str>) -> bool { pub fn unset_update_callback(&self) {
let indexes_lock = self.indexes.read().unwrap(); self.update_fn.swap(None);
match indexes_lock.get(name.as_ref()) {
Some((_, current_update_fn, _)) => {
current_update_fn.swap(None);
true
}
None => false,
}
} }
pub fn copy_and_compact_to_path<P: AsRef<Path>>(&self, path: P) -> ZResult<File> { pub fn copy_and_compact_to_path<P: AsRef<Path>>(&self, path: P) -> ZResult<File> {
@ -272,11 +278,12 @@ mod tests {
let env = &database.env; let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100); let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
sender.send(update.update_id).unwrap()
};
let index = database.create_index("test").unwrap(); let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn)); database.set_update_callback(Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = { let schema = {
let data = r#" let data = r#"
@ -334,11 +341,12 @@ mod tests {
let env = &database.env; let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100); let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
sender.send(update.update_id).unwrap()
};
let index = database.create_index("test").unwrap(); let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn)); database.set_update_callback(Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = { let schema = {
let data = r#" let data = r#"
@ -395,11 +403,12 @@ mod tests {
let env = &database.env; let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100); let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
sender.send(update.update_id).unwrap()
};
let index = database.create_index("test").unwrap(); let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn)); database.set_update_callback(Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = { let schema = {
let data = r#" let data = r#"
@ -445,11 +454,12 @@ mod tests {
let env = &database.env; let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100); let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
sender.send(update.update_id).unwrap()
};
let index = database.create_index("test").unwrap(); let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn)); database.set_update_callback(Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = { let schema = {
let data = r#" let data = r#"
@ -615,11 +625,12 @@ mod tests {
let env = &database.env; let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100); let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
sender.send(update.update_id).unwrap()
};
let index = database.create_index("test").unwrap(); let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn)); database.set_update_callback(Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = { let schema = {
let data = r#" let data = r#"
@ -692,11 +703,12 @@ mod tests {
let env = &database.env; let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100); let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
sender.send(update.update_id).unwrap()
};
let index = database.create_index("test").unwrap(); let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn)); database.set_update_callback(Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = { let schema = {
let data = r#" let data = r#"

View File

@ -174,16 +174,10 @@ impl Data {
inner: Arc::new(inner_data), inner: Arc::new(inner_data),
}; };
for index_name in db.indexes_names().unwrap() { let callback_context = data.clone();
let callback_context = data.clone(); db.set_update_callback(Box::new(move |index_name, status| {
let callback_name = index_name.clone(); index_update_callback(&index_name, &callback_context, status);
db.set_update_callback( }));
index_name,
Box::new(move |status| {
index_update_callback(&callback_name, &callback_context, status);
}),
);
}
data data
} }

View File

@ -8,4 +8,4 @@ pub mod models;
pub mod option; pub mod option;
pub mod routes; pub mod routes;
use self::data::Data; pub use self::data::Data;

View File

@ -7,6 +7,7 @@ use tide_log::RequestLogger;
use meilidb_http::data::Data; use meilidb_http::data::Data;
use meilidb_http::option::Opt; use meilidb_http::option::Opt;
use meilidb_http::routes; use meilidb_http::routes;
use meilidb_http::routes::index::index_update_callback;
#[cfg(not(target_os = "macos"))] #[cfg(not(target_os = "macos"))]
#[global_allocator] #[global_allocator]
@ -16,8 +17,13 @@ pub fn main() -> Result<(), MainError> {
env_logger::init(); env_logger::init();
let opt = Opt::new(); let opt = Opt::new();
let data = Data::new(opt.clone()); let data = Data::new(opt.clone());
let data_cloned = data.clone();
data.db.set_update_callback(Box::new(move |name, status| {
index_update_callback(name, &data_cloned, status);
}));
let mut app = tide::App::with_state(data); let mut app = tide::App::with_state(data);
app.middleware( app.middleware(

View File

@ -69,15 +69,6 @@ pub async fn create_index(mut ctx: Context<Data>) -> SResult<Response> {
Err(e) => return Err(ResponseError::create_index(e)), Err(e) => return Err(ResponseError::create_index(e)),
}; };
let callback_context = ctx.state().clone();
let callback_name = index_name.clone();
db.set_update_callback(
&index_name,
Box::new(move |status| {
index_update_callback(&callback_name, &callback_context, status);
}),
);
let env = &db.env; let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?; let mut writer = env.write_txn().map_err(ResponseError::internal)?;