feat: Implement the create/open/ingest_file Database methods

This commit is contained in:
Clément Renault 2018-12-03 15:57:01 +01:00
parent 42b0cf68eb
commit 6f8dbbde9a
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
2 changed files with 96 additions and 11 deletions

View File

@ -1,13 +1,17 @@
use std::error::Error; use std::error::Error;
use std::path::Path; use std::path::Path;
use rocksdb::rocksdb::DB; use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions};
use rocksdb::{DB, MergeOperands};
use rocksdb::rocksdb::Writable;
pub use crate::database::database_view::DatabaseView;
use crate::index::update::Update; use crate::index::update::Update;
use crate::database::database_view::DatabaseView; use crate::index::schema::Schema;
use crate::blob::{self, Blob};
pub mod document_key; mod document_key;
pub mod database_view; mod database_view;
mod deserializer; mod deserializer;
const DATA_INDEX: &[u8] = b"data-index"; const DATA_INDEX: &[u8] = b"data-index";
@ -16,16 +20,64 @@ const DATA_SCHEMA: &[u8] = b"data-schema";
pub struct Database(DB); pub struct Database(DB);
impl Database { impl Database {
pub fn create(path: &Path) -> Result<Database, ()> { pub fn create<P: AsRef<Path>>(path: P, schema: Schema) -> Result<Database, Box<Error>> {
unimplemented!() let path = path.as_ref();
if path.exists() {
return Err(format!("File already exists at path: {}, cannot create database.",
path.display()).into())
}
let path = path.to_string_lossy();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
let mut schema_bytes = Vec::new();
schema.write_to(&mut schema_bytes)?;
db.put(DATA_SCHEMA, &schema_bytes)?;
Ok(Database(db))
} }
pub fn open(path: &Path) -> Result<Database, ()> { pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
unimplemented!() let path = path.as_ref().to_string_lossy();
let mut opts = DBOptions::new();
opts.create_if_missing(false);
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
// FIXME create a generic function to do that !
let _schema = match db.get(DATA_SCHEMA)? {
Some(value) => Schema::read_from(&*value)?,
None => return Err(String::from("Database does not contain a schema").into()),
};
Ok(Database(db))
} }
pub fn ingest_update_file(&self, update: Update) -> Result<(), ()> { pub fn ingest_update_file(&self, update: Update) -> Result<(), Box<Error>> {
unimplemented!() let move_update = update.can_be_moved();
let path = update.into_path_buf();
let path = path.to_string_lossy();
let mut options = IngestExternalFileOptions::new();
options.move_files(move_update);
let cf_handle = self.0.cf_handle("default").unwrap();
self.0.ingest_external_file_optimized(&cf_handle, &options, &[&path])?;
// compacting to avoid calling the merge operator
self.0.compact_range(Some(DATA_INDEX), Some(DATA_INDEX));
Ok(())
} }
pub fn view(&self) -> Result<DatabaseView, Box<Error>> { pub fn view(&self) -> Result<DatabaseView, Box<Error>> {
@ -33,3 +85,27 @@ impl Database {
DatabaseView::new(snapshot) DatabaseView::new(snapshot)
} }
} }
fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
if key != DATA_INDEX { panic!("The merge operator only supports \"data-index\" merging") }
let capacity = {
let remaining = operands.size_hint().0;
let already_exist = usize::from(existing_value.is_some());
remaining + already_exist
};
let mut op = blob::OpBuilder::with_capacity(capacity);
if let Some(existing_value) = existing_value {
let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index");
op.push(Blob::Positive(blob));
}
for bytes in operands {
let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blob");
op.push(blob);
}
let blob = op.merge().expect("BUG: could not merge blobs");
bincode::serialize(&blob).expect("BUG: could not serialize merged blob")
}

View File

@ -18,11 +18,20 @@ const DOC_KEY_ATTR_LEN: usize = DOC_KEY_LEN + 1 + std::mem::size_of::<u32>();
pub struct Update { pub struct Update {
path: PathBuf, path: PathBuf,
can_be_moved: bool,
} }
impl Update { impl Update {
pub fn open<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> { pub fn open<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> {
Ok(Update { path: path.into() }) Ok(Update { path: path.into(), can_be_moved: false })
}
pub fn open_and_move<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> {
Ok(Update { path: path.into(), can_be_moved: true })
}
pub fn can_be_moved(&self) -> bool {
self.can_be_moved
} }
pub fn into_path_buf(self) -> PathBuf { pub fn into_path_buf(self) -> PathBuf {