diff --git a/src/database/mod.rs b/src/database/mod.rs index 8b2b89544..99fc5228e 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,12 +1,11 @@ +use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use std::error::Error; use std::path::Path; use std::ops::Deref; -use std::sync::Arc; -use std::fmt; use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions}; -use rocksdb::{DB, DBVector, MergeOperands, SeekKey}; use rocksdb::rocksdb::{Writable, Snapshot}; +use rocksdb::{DB, DBVector, MergeOperands}; pub use self::document_key::{DocumentKey, DocumentKeyAttr}; pub use self::database_view::{DatabaseView, DocumentIter}; @@ -43,8 +42,15 @@ where D: Deref } } -#[derive(Clone)] -pub struct Database(Arc); +pub struct Database { + // DB is under a Mutex to sync update ingestions and separate DB update locking + // and DatabaseView acquiring locking in other words: + // "Block readers the minimum possible amount of time" + db: Mutex>, + + // This view is updated each time the DB ingests an update + view: RwLock>>, +} impl Database { pub fn create>(path: P, schema: Schema) -> Result> { @@ -68,7 +74,11 @@ impl Database { schema.write_to(&mut schema_bytes)?; db.put(DATA_SCHEMA, &schema_bytes)?; - Ok(Database(Arc::new(db))) + let db = Arc::new(db); + let snapshot = Snapshot::new(db.clone()); + let view = RwLock::new(DatabaseView::new(snapshot)?); + + Ok(Database { db: Mutex::new(db), view }) } pub fn open>(path: P) -> Result> { @@ -88,58 +98,65 @@ impl Database { None => return Err(String::from("Database does not contain a schema").into()), }; - Ok(Database(Arc::new(db))) + let db = Arc::new(db); + let snapshot = Snapshot::new(db.clone()); + let view = RwLock::new(DatabaseView::new(snapshot)?); + + Ok(Database { db: Mutex::new(db), view }) } pub fn ingest_update_file(&self, update: Update) -> Result<(), Box> { - let move_update = update.can_be_moved(); - let path = update.into_path_buf(); - let path = path.to_string_lossy(); + let snapshot = { + // We must have a mutex here to ensure that update ingestions and compactions + // are done atomatically and in the right order. + // This way update ingestions will block other update ingestions without blocking view + // creations while doing the "data-index" compaction + let db = match self.db.lock() { + Ok(db) => db, + Err(e) => return Err(e.to_string().into()), + }; - let mut options = IngestExternalFileOptions::new(); - options.move_files(move_update); + let move_update = update.can_be_moved(); + let path = update.into_path_buf(); + let path = path.to_string_lossy(); - let cf_handle = self.0.cf_handle("default").unwrap(); - self.0.ingest_external_file_optimized(&cf_handle, &options, &[&path])?; + let mut options = IngestExternalFileOptions::new(); + options.move_files(move_update); - // compacting to avoid calling the merge operator - self.0.compact_range(Some(DATA_INDEX), Some(DATA_INDEX)); + let cf_handle = db.cf_handle("default").expect("\"default\" column family not found"); + db.ingest_external_file_optimized(&cf_handle, &options, &[&path])?; + + // Compacting to trigger the merge operator only one time + // while ingesting the update and not each time searching + db.compact_range(Some(DATA_INDEX), Some(DATA_INDEX)); + + Snapshot::new(db.clone()) + }; + + // Here we will block the view creation for the minimum amount of time: + // updating the DatabaseView itself with the new database snapshot + let view = DatabaseView::new(snapshot)?; + match self.view.write() { + Ok(mut lock) => *lock = view, + Err(e) => return Err(e.to_string().into()), + } Ok(()) } pub fn get(&self, key: &[u8]) -> Result, Box> { - Ok(self.0.get(key)?) + self.view().get(key) } pub fn flush(&self) -> Result<(), Box> { - Ok(self.0.flush(true)?) - } - - pub fn view(&self) -> Result, Box> { - let snapshot = self.0.snapshot(); - DatabaseView::new(snapshot) - } - - pub fn view_arc(&self) -> Result>, Box> { - let snapshot = Snapshot::new(self.0.clone()); - DatabaseView::new(snapshot) - } -} - -impl fmt::Debug for Database { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Database([")?; - let mut iter = self.0.iter(); - iter.seek(SeekKey::Start); - let mut first = true; - for (key, _value) in &mut iter { - if !first { write!(f, ", ")?; } - first = false; - let key = String::from_utf8_lossy(&key); - write!(f, "{:?}", key)?; + match self.db.lock() { + Ok(db) => Ok(db.flush(true)?), + Err(e) => Err(e.to_string().into()), } - write!(f, "])") + } + + pub fn view(&self) -> RwLockReadGuard>> { + self.view.read().unwrap() } } @@ -229,7 +246,7 @@ mod tests { update.set_move(true); database.ingest_update_file(update)?; - let view = database.view()?; + let view = database.view(); let de_doc0: SimpleDoc = view.retrieve_document(0)?; let de_doc1: SimpleDoc = view.retrieve_document(1)?;