feat: Introduce a basic RocksDB based version

This commit is contained in:
Clément Renault 2019-05-23 14:47:10 +02:00
parent 3a8da82792
commit 4c973238a1
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
12 changed files with 119 additions and 105 deletions

View File

@ -16,7 +16,7 @@ ordered-float = { version = "1.0.2", features = ["serde"] }
sdset = "0.3.2"
serde = { version = "1.0.91", features = ["derive"] }
serde_json = { version = "1.0.39", features = ["preserve_order"] }
sled = "0.23.0"
rocksdb = "0.12.2"
toml = { version = "0.5.0", features = ["preserve_order"] }
zerocopy = "0.2.2"
@ -28,9 +28,5 @@ rev = "40b3d48"
git = "https://github.com/Kerollmops/fst.git"
branch = "arc-byte-slice"
[features]
default = []
compression = ["sled/compression"]
[dev-dependencies]
tempfile = "3.0.7"

View File

@ -2,12 +2,12 @@ use std::sync::Arc;
use std::ops::Deref;
#[derive(Clone)]
pub struct CustomSettings(pub Arc<sled::Tree>);
pub struct CustomSettings(pub Arc<rocksdb::DB>, pub String);
impl Deref for CustomSettings {
type Target = sled::Tree;
type Target = rocksdb::DB;
fn deref(&self) -> &sled::Tree {
fn deref(&self) -> &Self::Target {
&self.0
}
}

View File

@ -3,15 +3,16 @@ use meilidb_core::DocumentId;
use super::Error;
#[derive(Clone)]
pub struct DocsWordsIndex(pub Arc<sled::Tree>);
pub struct DocsWordsIndex(pub Arc<rocksdb::DB>, pub String);
impl DocsWordsIndex {
pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
let key = id.0.to_be_bytes();
match self.0.get(key)? {
let cf = self.0.cf_handle(&self.1).unwrap();
match self.0.get_pinned_cf(cf, key)? {
Some(bytes) => {
let len = bytes.len();
let value = bytes.into();
let value = Arc::from(bytes.as_ref());
let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
Ok(Some(fst::Set::from(fst)))
},
@ -21,13 +22,15 @@ impl DocsWordsIndex {
pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> {
let key = id.0.to_be_bytes();
self.0.set(key, words.as_fst().as_bytes())?;
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.put_cf(cf, key, words.as_fst().as_bytes())?;
Ok(())
}
pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
let key = id.0.to_be_bytes();
self.0.del(key)?;
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.delete_cf(cf, key)?;
Ok(())
}
}

View File

@ -2,69 +2,77 @@ use std::sync::Arc;
use std::convert::TryInto;
use meilidb_core::DocumentId;
use sled::IVec;
use rocksdb::DBVector;
use crate::document_attr_key::DocumentAttrKey;
use crate::schema::SchemaAttr;
#[derive(Clone)]
pub struct DocumentsIndex(pub Arc<sled::Tree>);
pub struct DocumentsIndex(pub Arc<rocksdb::DB>, pub String);
impl DocumentsIndex {
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<IVec>> {
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<Option<DBVector>, rocksdb::Error> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.get(key)
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.get_cf(cf, key)
}
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> Result<(), rocksdb::Error> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.set(key, value)?;
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.put_cf(cf, key, value)?;
Ok(())
}
pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<(), rocksdb::Error> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.del(key)?;
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.delete_cf(cf, key)?;
Ok(())
}
pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
pub fn del_all_document_fields(&self, id: DocumentId) -> Result<(), rocksdb::Error> {
let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
let document_attrs = self.0.range(start..=end).keys();
for key in document_attrs {
self.0.del(key?)?;
}
let cf = self.0.cf_handle(&self.1).unwrap();
let mut batch = rocksdb::WriteBatch::default();
batch.delete_range_cf(cf, start, end)?;
self.0.write(batch)?;
Ok(())
}
pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
let start = DocumentAttrKey::new(id, SchemaAttr::min());
let start = start.to_be_bytes();
let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
let end = DocumentAttrKey::new(id, SchemaAttr::max());
let end = end.to_be_bytes();
let cf = self.0.cf_handle(&self.1).unwrap();
let from = rocksdb::IteratorMode::From(&start[..], rocksdb::Direction::Forward);
let iter = self.0.iterator_cf(cf, from).unwrap();
DocumentFieldsIter(self.0.range(start..=end))
DocumentFieldsIter(iter, end.to_vec())
}
}
pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
pub struct DocumentFieldsIter<'a>(rocksdb::DBIterator<'a>, Vec<u8>);
impl<'a> Iterator for DocumentFieldsIter<'a> {
type Item = sled::Result<(SchemaAttr, IVec)>;
type Item = Result<(SchemaAttr, Box<[u8]>), rocksdb::Error>;
fn next(&mut self) -> Option<Self::Item> {
match self.0.next() {
Some(Ok((key, value))) => {
Some((key, value)) => {
if key.as_ref() > self.1.as_ref() {
return None;
}
let slice: &[u8] = key.as_ref();
let array = slice.try_into().unwrap();
let key = DocumentAttrKey::from_be_bytes(array);
Some(Ok((key.attribute, value)))
},
Some(Err(e)) => Some(Err(e)),
None => None,
}
}

View File

@ -7,15 +7,15 @@ pub enum Error {
SchemaMissing,
WordIndexMissing,
MissingDocumentId,
SledError(sled::Error),
RocksdbError(rocksdb::Error),
FstError(fst::Error),
BincodeError(bincode::Error),
SerializerError(SerializerError),
}
impl From<sled::Error> for Error {
fn from(error: sled::Error) -> Error {
Error::SledError(error)
impl From<rocksdb::Error> for Error {
fn from(error: rocksdb::Error) -> Error {
Error::RocksdbError(error)
}
}
@ -45,7 +45,7 @@ impl fmt::Display for Error {
SchemaMissing => write!(f, "this index does not have a schema"),
WordIndexMissing => write!(f, "this index does not have a word index"),
MissingDocumentId => write!(f, "document id is missing"),
SledError(e) => write!(f, "sled error; {}", e),
RocksdbError(e) => write!(f, "RocksDB error; {}", e),
FstError(e) => write!(f, "fst error; {}", e),
BincodeError(e) => write!(f, "bincode error; {}", e),
SerializerError(e) => write!(f, "serializer error; {}", e),

View File

@ -6,11 +6,12 @@ use crate::schema::Schema;
use super::Error;
#[derive(Clone)]
pub struct MainIndex(pub Arc<sled::Tree>);
pub struct MainIndex(pub Arc<rocksdb::DB>, pub String);
impl MainIndex {
pub fn schema(&self) -> Result<Option<Schema>, Error> {
match self.0.get("schema")? {
let cf = self.0.cf_handle(&self.1).unwrap();
match self.0.get_cf(cf, "schema")? {
Some(bytes) => {
let schema = Schema::read_from_bin(bytes.as_ref())?;
Ok(Some(schema))
@ -22,15 +23,17 @@ impl MainIndex {
pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> {
let mut bytes = Vec::new();
schema.write_to_bin(&mut bytes)?;
self.0.set("schema", bytes)?;
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.put_cf(cf, "schema", bytes)?;
Ok(())
}
pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
match self.0.get("words")? {
let cf = self.0.cf_handle(&self.1).unwrap();
match self.0.get_pinned_cf(cf, "words")? {
Some(bytes) => {
let len = bytes.len();
let value = bytes.into();
let value = Arc::from(bytes.as_ref());
let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
Ok(Some(fst::Set::from(fst)))
},
@ -39,12 +42,14 @@ impl MainIndex {
}
pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
self.0.set("words", value.as_fst().as_bytes())?;
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.put_cf(cf, "words", value.as_fst().as_bytes())?;
Ok(())
}
pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
match self.0.get("ranked-map")? {
let cf = self.0.cf_handle(&self.1).unwrap();
match self.0.get_cf(cf, "ranked-map")? {
Some(bytes) => {
let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
Ok(Some(ranked_map))
@ -56,7 +61,8 @@ impl MainIndex {
pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
let mut bytes = Vec::new();
value.write_to_bin(&mut bytes)?;
self.0.set("ranked_map", bytes)?;
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.put_cf(cf, "ranked_map", bytes)?;
Ok(())
}
}

View File

@ -31,26 +31,24 @@ use self::words_index::WordsIndex;
pub struct Database {
cache: RwLock<HashMap<String, Arc<Index>>>,
inner: sled::Db,
inner: Arc<rocksdb::DB>,
}
impl Database {
pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
let path = path.as_ref();
let cache = RwLock::new(HashMap::new());
let config = sled::ConfigBuilder::new().path(path).print_profile_on_drop(true).build();
let inner = sled::Db::start(config)?;
Ok(Database { cache, inner })
}
pub fn start_with_compression<P: AsRef<Path>>(path: P, factor: i32) -> Result<Database, Error> {
let config = sled::ConfigBuilder::default()
.use_compression(true)
.compression_factor(factor)
.path(path)
.build();
let inner = {
let options = {
let mut options = rocksdb::Options::default();
options.create_if_missing(true);
options
};
let cfs = rocksdb::DB::list_cf(&options, path).unwrap_or(Vec::new());
Arc::new(rocksdb::DB::open_cf(&options, path, cfs)?)
};
let cache = RwLock::new(HashMap::new());
let inner = sled::Db::start(config)?;
Ok(Database { cache, inner })
}
@ -66,7 +64,7 @@ impl Database {
fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
let bytes = bincode::serialize(value)?;
self.inner.set("indexes", bytes)?;
self.inner.put("indexes", bytes)?;
Ok(())
}
@ -89,32 +87,32 @@ impl Database {
}
let main = {
let tree = self.inner.open_tree(name)?;
MainIndex(tree)
self.inner.cf_handle(name).expect("cf not found");
MainIndex(self.inner.clone(), name.to_owned())
};
let words = {
let tree_name = format!("{}-words", name);
let tree = self.inner.open_tree(tree_name)?;
WordsIndex(tree)
let cf_name = format!("{}-words", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
WordsIndex(self.inner.clone(), cf_name)
};
let docs_words = {
let tree_name = format!("{}-docs-words", name);
let tree = self.inner.open_tree(tree_name)?;
DocsWordsIndex(tree)
let cf_name = format!("{}-docs-words", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
DocsWordsIndex(self.inner.clone(), cf_name)
};
let documents = {
let tree_name = format!("{}-documents", name);
let tree = self.inner.open_tree(tree_name)?;
DocumentsIndex(tree)
let cf_name = format!("{}-documents", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
DocumentsIndex(self.inner.clone(), cf_name)
};
let custom = {
let tree_name = format!("{}-custom", name);
let tree = self.inner.open_tree(tree_name)?;
CustomSettings(tree)
let cf_name = format!("{}-custom", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
CustomSettings(self.inner.clone(), cf_name)
};
let raw_index = RawIndex { main, words, docs_words, documents, custom };
@ -136,8 +134,8 @@ impl Database {
},
Entry::Vacant(vacant) => {
let main = {
let tree = self.inner.open_tree(name)?;
MainIndex(tree)
self.inner.create_cf(name, &rocksdb::Options::default())?;
MainIndex(self.inner.clone(), name.to_owned())
};
if let Some(prev_schema) = main.schema()? {
@ -149,27 +147,27 @@ impl Database {
main.set_schema(&schema)?;
let words = {
let tree_name = format!("{}-words", name);
let tree = self.inner.open_tree(tree_name)?;
WordsIndex(tree)
let cf_name = format!("{}-words", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
WordsIndex(self.inner.clone(), cf_name)
};
let docs_words = {
let tree_name = format!("{}-docs-words", name);
let tree = self.inner.open_tree(tree_name)?;
DocsWordsIndex(tree)
let cf_name = format!("{}-docs-words", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
DocsWordsIndex(self.inner.clone(), cf_name)
};
let documents = {
let tree_name = format!("{}-documents", name);
let tree = self.inner.open_tree(tree_name)?;
DocumentsIndex(tree)
let cf_name = format!("{}-documents", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
DocumentsIndex(self.inner.clone(), cf_name)
};
let custom = {
let tree_name = format!("{}-custom", name);
let tree = self.inner.open_tree(tree_name)?;
CustomSettings(tree)
let cf_name = format!("{}-custom", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
CustomSettings(self.inner.clone(), cf_name)
};
let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);

View File

@ -5,11 +5,12 @@ use sdset::{Set, SetBuf};
use zerocopy::{LayoutVerified, AsBytes};
#[derive(Clone)]
pub struct WordsIndex(pub Arc<sled::Tree>);
pub struct WordsIndex(pub Arc<rocksdb::DB>, pub String);
impl WordsIndex {
pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
match self.0.get(word)? {
pub fn doc_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, rocksdb::Error> {
let cf = self.0.cf_handle(&self.1).unwrap();
match self.0.get_cf(cf, word)? {
Some(bytes) => {
let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout");
let slice = layout.into_slice();
@ -20,13 +21,15 @@ impl WordsIndex {
}
}
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
self.0.set(word, set.as_bytes())?;
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> Result<(), rocksdb::Error> {
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.put_cf(cf, word, set.as_bytes())?;
Ok(())
}
pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
self.0.del(word)?;
pub fn del_doc_indexes(&self, word: &[u8]) -> Result<(), rocksdb::Error> {
let cf = self.0.cf_handle(&self.1).unwrap();
self.0.delete_cf(cf, word)?;
Ok(())
}
}

View File

@ -6,7 +6,7 @@ mod ranked_map;
mod serde;
pub mod schema;
pub use sled;
pub use rocksdb;
pub use self::database::{Database, Index, CustomSettings};
pub use self::number::Number;
pub use self::ranked_map::RankedMap;

View File

@ -36,7 +36,7 @@ use crate::schema::SchemaAttr;
pub enum SerializerError {
DocumentIdNotFound,
RmpError(RmpError),
SledError(sled::Error),
RocksdbError(rocksdb::Error),
ParseNumberError(ParseNumberError),
UnserializableType { type_name: &'static str },
UnindexableType { type_name: &'static str },
@ -57,7 +57,7 @@ impl fmt::Display for SerializerError {
write!(f, "serialized document does not have an id according to the schema")
}
SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
SerializerError::SledError(e) => write!(f, "sled related error: {}", e),
SerializerError::RocksdbError(e) => write!(f, "RocksDB related error: {}", e),
SerializerError::ParseNumberError(e) => {
write!(f, "error while trying to parse a number: {}", e)
},
@ -89,9 +89,9 @@ impl From<RmpError> for SerializerError {
}
}
impl From<sled::Error> for SerializerError {
fn from(error: sled::Error) -> SerializerError {
SerializerError::SledError(error)
impl From<rocksdb::Error> for SerializerError {
fn from(error: rocksdb::Error) -> SerializerError {
SerializerError::RocksdbError(error)
}
}

View File

@ -59,7 +59,7 @@ fn index(
let mut system = sysinfo::System::new();
let index = database.create_index("default", schema.clone())?;
let index = database.create_index("test", schema.clone())?;
let mut rdr = csv::Reader::from_path(csv_data_path)?;
let mut raw_record = csv::StringRecord::new();

View File

@ -143,7 +143,7 @@ fn main() -> Result<(), Box<Error>> {
let mut buffer = String::new();
let input = io::stdin();
let index = database.open_index("default")?.unwrap();
let index = database.open_index("test")?.unwrap();
let schema = index.schema();
println!("database prepared for you in {:.2?}", start.elapsed());