2019-10-10 19:38:58 +08:00
|
|
|
use std::collections::hash_map::{HashMap, Entry};
|
2019-10-04 22:49:17 +08:00
|
|
|
use std::path::Path;
|
|
|
|
use std::sync::{Arc, RwLock};
|
2019-10-07 22:16:04 +08:00
|
|
|
use std::{fs, thread};
|
|
|
|
|
|
|
|
use crossbeam_channel::Receiver;
|
2019-10-08 20:53:35 +08:00
|
|
|
use log::{debug, error};
|
2019-10-07 22:16:04 +08:00
|
|
|
|
|
|
|
use crate::{store, update, Index, MResult};
|
2019-10-04 22:49:17 +08:00
|
|
|
|
2019-10-09 17:45:19 +08:00
|
|
|
/// Boxed callback that receives an `update::UpdateResult`.
///
/// The closure must be `Send + Sync + 'static`. This is the type accepted by
/// `Database::set_update_callback` and invoked by `update_awaiter`.
pub type BoxUpdateFn = Box<dyn Fn(update::UpdateResult) + Send + Sync + 'static>;
|
|
|
|
/// Swappable slot holding an optional `BoxUpdateFn`.
///
/// It starts out empty (`ArcSwapFn::empty()`), is replaced through `.swap(..)` and is read
/// through `.load()` by the update thread.
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
|
|
|
|
|
2019-10-04 22:49:17 +08:00
|
|
|
pub struct Database {
|
|
|
|
pub rkv: Arc<RwLock<rkv::Rkv>>,
|
2019-10-10 21:14:32 +08:00
|
|
|
common_store: rkv::SingleStore,
|
2019-10-07 22:16:04 +08:00
|
|
|
indexes_store: rkv::SingleStore,
|
2019-10-09 17:45:19 +08:00
|
|
|
indexes: RwLock<HashMap<String, (Index, Arc<ArcSwapFn>, thread::JoinHandle<()>)>>,
|
2019-10-07 22:16:04 +08:00
|
|
|
}
|
|
|
|
|
2019-10-09 17:45:19 +08:00
|
|
|
fn update_awaiter(
|
|
|
|
receiver: Receiver<()>,
|
|
|
|
rkv: Arc<RwLock<rkv::Rkv>>,
|
|
|
|
update_fn: Arc<ArcSwapFn>,
|
|
|
|
index: Index,
|
|
|
|
)
|
|
|
|
{
|
2019-10-07 22:16:04 +08:00
|
|
|
for () in receiver {
|
|
|
|
// consume all updates in order (oldest first)
|
|
|
|
loop {
|
|
|
|
let rkv = match rkv.read() {
|
|
|
|
Ok(rkv) => rkv,
|
|
|
|
Err(e) => { error!("rkv RwLock read failed: {}", e); break }
|
|
|
|
};
|
2019-10-08 20:53:35 +08:00
|
|
|
|
2019-10-07 22:16:04 +08:00
|
|
|
let mut writer = match rkv.write() {
|
|
|
|
Ok(writer) => writer,
|
|
|
|
Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break }
|
|
|
|
};
|
|
|
|
|
2019-10-09 23:23:48 +08:00
|
|
|
match update::update_task(&mut writer, index.clone()) {
|
|
|
|
Ok(Some(status)) => {
|
|
|
|
if let Err(e) = writer.commit() { error!("update transaction failed: {}", e) }
|
|
|
|
|
|
|
|
if let Some(ref callback) = *update_fn.load() {
|
|
|
|
(callback)(status);
|
|
|
|
}
|
|
|
|
},
|
2019-10-07 22:16:04 +08:00
|
|
|
// no more updates to handle for now
|
2019-10-09 23:23:48 +08:00
|
|
|
Ok(None) => { debug!("no more updates"); writer.abort(); break },
|
2019-10-07 22:16:04 +08:00
|
|
|
Err(e) => { error!("update task failed: {}", e); writer.abort() },
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-10-04 22:49:17 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Database {
|
|
|
|
pub fn open_or_create(path: impl AsRef<Path>) -> MResult<Database> {
|
|
|
|
let manager = rkv::Manager::singleton();
|
|
|
|
let mut rkv_write = manager.write().unwrap();
|
|
|
|
|
2019-10-07 22:16:04 +08:00
|
|
|
fs::create_dir_all(path.as_ref())?;
|
|
|
|
|
2019-10-04 22:49:17 +08:00
|
|
|
let rkv = rkv_write
|
|
|
|
.get_or_create(path.as_ref(), |path| {
|
|
|
|
let mut builder = rkv::Rkv::environment_builder();
|
|
|
|
builder.set_max_dbs(3000).set_map_size(10 * 1024 * 1024 * 1024); // 10GB
|
|
|
|
rkv::Rkv::from_env(path, builder)
|
|
|
|
})?;
|
|
|
|
|
|
|
|
drop(rkv_write);
|
|
|
|
|
|
|
|
let rkv_read = rkv.read().unwrap();
|
|
|
|
let create_options = rkv::store::Options::create();
|
2019-10-10 21:14:32 +08:00
|
|
|
let common_store = rkv_read.open_single("common", create_options)?;
|
2019-10-07 22:16:04 +08:00
|
|
|
let indexes_store = rkv_read.open_single("indexes", create_options)?;
|
2019-10-04 22:49:17 +08:00
|
|
|
|
|
|
|
// list all indexes that needs to be opened
|
|
|
|
let mut must_open = Vec::new();
|
|
|
|
let reader = rkv_read.read()?;
|
2019-10-07 22:16:04 +08:00
|
|
|
for result in indexes_store.iter_start(&reader)? {
|
2019-10-04 22:49:17 +08:00
|
|
|
let (key, _) = result?;
|
|
|
|
if let Ok(index_name) = std::str::from_utf8(key) {
|
|
|
|
must_open.push(index_name.to_owned());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
drop(reader);
|
|
|
|
|
|
|
|
// open the previously aggregated indexes
|
|
|
|
let mut indexes = HashMap::new();
|
|
|
|
for index_name in must_open {
|
2019-10-07 22:16:04 +08:00
|
|
|
|
|
|
|
let (sender, receiver) = crossbeam_channel::bounded(100);
|
|
|
|
let index = store::open(&rkv_read, &index_name, sender.clone())?;
|
2019-10-09 17:45:19 +08:00
|
|
|
let update_fn = Arc::new(ArcSwapFn::empty());
|
|
|
|
|
2019-10-07 22:16:04 +08:00
|
|
|
let rkv_clone = rkv.clone();
|
|
|
|
let index_clone = index.clone();
|
2019-10-09 17:45:19 +08:00
|
|
|
let update_fn_clone = update_fn.clone();
|
|
|
|
|
|
|
|
let handle = thread::spawn(move || {
|
|
|
|
update_awaiter(receiver, rkv_clone, update_fn_clone, index_clone)
|
|
|
|
});
|
2019-10-07 22:16:04 +08:00
|
|
|
|
|
|
|
// send an update notification to make sure that
|
|
|
|
// possible previous boot updates are consumed
|
|
|
|
sender.send(()).unwrap();
|
|
|
|
|
2019-10-09 17:45:19 +08:00
|
|
|
let result = indexes.insert(index_name, (index, update_fn, handle));
|
2019-10-07 22:16:04 +08:00
|
|
|
assert!(result.is_none(), "The index should not have been already open");
|
2019-10-04 22:49:17 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
drop(rkv_read);
|
|
|
|
|
2019-10-10 21:14:32 +08:00
|
|
|
Ok(Database { rkv, common_store, indexes_store, indexes: RwLock::new(indexes) })
|
2019-10-04 22:49:17 +08:00
|
|
|
}
|
|
|
|
|
2019-10-10 19:38:58 +08:00
|
|
|
pub fn open_index(&self, name: impl AsRef<str>) -> Option<Index> {
|
2019-10-04 22:49:17 +08:00
|
|
|
let indexes_lock = self.indexes.read().unwrap();
|
2019-10-10 19:38:58 +08:00
|
|
|
match indexes_lock.get(name.as_ref()) {
|
|
|
|
Some((index, ..)) => Some(index.clone()),
|
|
|
|
None => None,
|
|
|
|
}
|
|
|
|
}
|
2019-10-04 22:49:17 +08:00
|
|
|
|
2019-10-10 19:38:58 +08:00
|
|
|
pub fn create_index(&self, name: impl AsRef<str>) -> MResult<Index> {
|
|
|
|
let name = name.as_ref();
|
|
|
|
let mut indexes_lock = self.indexes.write().unwrap();
|
2019-10-04 22:49:17 +08:00
|
|
|
|
2019-10-10 19:38:58 +08:00
|
|
|
match indexes_lock.entry(name.to_owned()) {
|
|
|
|
Entry::Occupied(_) => Err(crate::Error::IndexAlreadyExists),
|
|
|
|
Entry::Vacant(entry) => {
|
2019-10-04 22:49:17 +08:00
|
|
|
let rkv_lock = self.rkv.read().unwrap();
|
2019-10-07 22:16:04 +08:00
|
|
|
let (sender, receiver) = crossbeam_channel::bounded(100);
|
2019-10-10 19:38:58 +08:00
|
|
|
let index = store::create(&rkv_lock, name, sender)?;
|
2019-10-04 22:49:17 +08:00
|
|
|
|
|
|
|
let mut writer = rkv_lock.write()?;
|
|
|
|
let value = rkv::Value::Blob(&[]);
|
2019-10-10 19:38:58 +08:00
|
|
|
self.indexes_store.put(&mut writer, name, &value)?;
|
2019-10-09 17:45:19 +08:00
|
|
|
|
2019-10-10 19:38:58 +08:00
|
|
|
let rkv_clone = self.rkv.clone();
|
|
|
|
let index_clone = index.clone();
|
2019-10-09 17:45:19 +08:00
|
|
|
|
2019-10-10 19:38:58 +08:00
|
|
|
let no_update_fn = Arc::new(ArcSwapFn::empty());
|
|
|
|
let no_update_fn_clone = no_update_fn.clone();
|
2019-10-09 17:45:19 +08:00
|
|
|
|
2019-10-10 19:38:58 +08:00
|
|
|
let handle = thread::spawn(move || {
|
|
|
|
update_awaiter(receiver, rkv_clone, no_update_fn_clone, index_clone)
|
|
|
|
});
|
2019-10-04 22:49:17 +08:00
|
|
|
|
|
|
|
writer.commit()?;
|
2019-10-10 19:38:58 +08:00
|
|
|
entry.insert((index.clone(), no_update_fn, handle));
|
2019-10-04 22:49:17 +08:00
|
|
|
|
|
|
|
Ok(index)
|
2019-10-10 19:38:58 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set_update_callback(&self, name: impl AsRef<str>, update_fn: BoxUpdateFn) -> bool {
|
|
|
|
let indexes_lock = self.indexes.read().unwrap();
|
|
|
|
match indexes_lock.get(name.as_ref()) {
|
|
|
|
Some((_, current_update_fn, _)) => {
|
|
|
|
let update_fn = Some(Arc::new(update_fn));
|
|
|
|
current_update_fn.swap(update_fn);
|
|
|
|
true
|
2019-10-04 22:49:17 +08:00
|
|
|
},
|
2019-10-10 19:38:58 +08:00
|
|
|
None => false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn unset_update_callback(&self, name: impl AsRef<str>) -> bool {
|
|
|
|
let indexes_lock = self.indexes.read().unwrap();
|
|
|
|
match indexes_lock.get(name.as_ref()) {
|
|
|
|
Some((_, current_update_fn, _)) => { current_update_fn.swap(None); true },
|
|
|
|
None => false,
|
2019-10-04 22:49:17 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn indexes_names(&self) -> MResult<Vec<String>> {
|
|
|
|
let indexes = self.indexes.read().unwrap();
|
|
|
|
Ok(indexes.keys().cloned().collect())
|
|
|
|
}
|
2019-10-07 22:16:04 +08:00
|
|
|
|
2019-10-10 21:14:32 +08:00
|
|
|
pub fn common_store(&self) -> rkv::SingleStore {
|
|
|
|
self.common_store
|
2019-10-07 22:16:04 +08:00
|
|
|
}
|
2019-10-04 22:49:17 +08:00
|
|
|
}
|