From 4c973238a19dded9bc315db0e3e6b78be9c56520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 23 May 2019 14:47:10 +0200 Subject: [PATCH] feat: Introduce a basic RocksDB based version --- meilidb-data/Cargo.toml | 6 +- meilidb-data/src/database/custom_settings.rs | 6 +- meilidb-data/src/database/docs_words_index.rs | 13 +-- meilidb-data/src/database/documents_index.rs | 52 +++++++----- meilidb-data/src/database/error.rs | 10 +-- meilidb-data/src/database/main_index.rs | 22 +++-- meilidb-data/src/database/mod.rs | 82 +++++++++---------- meilidb-data/src/database/words_index.rs | 17 ++-- meilidb-data/src/lib.rs | 2 +- meilidb-data/src/serde/mod.rs | 10 +-- meilidb/examples/create-database.rs | 2 +- meilidb/examples/query-database.rs | 2 +- 12 files changed, 119 insertions(+), 105 deletions(-) diff --git a/meilidb-data/Cargo.toml b/meilidb-data/Cargo.toml index 03e6f0074..8c6fe5845 100644 --- a/meilidb-data/Cargo.toml +++ b/meilidb-data/Cargo.toml @@ -16,7 +16,7 @@ ordered-float = { version = "1.0.2", features = ["serde"] } sdset = "0.3.2" serde = { version = "1.0.91", features = ["derive"] } serde_json = { version = "1.0.39", features = ["preserve_order"] } -sled = "0.23.0" +rocksdb = "0.12.2" toml = { version = "0.5.0", features = ["preserve_order"] } zerocopy = "0.2.2" @@ -28,9 +28,5 @@ rev = "40b3d48" git = "https://github.com/Kerollmops/fst.git" branch = "arc-byte-slice" -[features] -default = [] -compression = ["sled/compression"] - [dev-dependencies] tempfile = "3.0.7" diff --git a/meilidb-data/src/database/custom_settings.rs b/meilidb-data/src/database/custom_settings.rs index 565151aaa..b9227d0bb 100644 --- a/meilidb-data/src/database/custom_settings.rs +++ b/meilidb-data/src/database/custom_settings.rs @@ -2,12 +2,12 @@ use std::sync::Arc; use std::ops::Deref; #[derive(Clone)] -pub struct CustomSettings(pub Arc); +pub struct CustomSettings(pub Arc, pub String); impl Deref for CustomSettings { - type Target = sled::Tree; + type Target = rocksdb::DB; - fn deref(&self) -> &sled::Tree { + fn deref(&self) -> &Self::Target { &self.0 } } diff --git a/meilidb-data/src/database/docs_words_index.rs b/meilidb-data/src/database/docs_words_index.rs index 6b7de15a2..38430a9d1 100644 --- a/meilidb-data/src/database/docs_words_index.rs +++ b/meilidb-data/src/database/docs_words_index.rs @@ -3,15 +3,16 @@ use meilidb_core::DocumentId; use super::Error; #[derive(Clone)] -pub struct DocsWordsIndex(pub Arc); +pub struct DocsWordsIndex(pub Arc, pub String); impl DocsWordsIndex { pub fn doc_words(&self, id: DocumentId) -> Result, Error> { let key = id.0.to_be_bytes(); - match self.0.get(key)? { + let cf = self.0.cf_handle(&self.1).unwrap(); + match self.0.get_pinned_cf(cf, key)? { Some(bytes) => { let len = bytes.len(); - let value = bytes.into(); + let value = Arc::from(bytes.as_ref()); let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?; Ok(Some(fst::Set::from(fst))) }, @@ -21,13 +22,15 @@ impl DocsWordsIndex { pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> { let key = id.0.to_be_bytes(); - self.0.set(key, words.as_fst().as_bytes())?; + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.put_cf(cf, key, words.as_fst().as_bytes())?; Ok(()) } pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> { let key = id.0.to_be_bytes(); - self.0.del(key)?; + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.delete_cf(cf, key)?; Ok(()) } } diff --git a/meilidb-data/src/database/documents_index.rs b/meilidb-data/src/database/documents_index.rs index 5fd276bb3..1326a480d 100644 --- a/meilidb-data/src/database/documents_index.rs +++ b/meilidb-data/src/database/documents_index.rs @@ -2,69 +2,77 @@ use std::sync::Arc; use std::convert::TryInto; use meilidb_core::DocumentId; -use sled::IVec; +use rocksdb::DBVector; use crate::document_attr_key::DocumentAttrKey; use crate::schema::SchemaAttr; #[derive(Clone)] -pub struct DocumentsIndex(pub Arc); +pub struct DocumentsIndex(pub Arc, pub String); impl DocumentsIndex { - pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result> { + pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result, rocksdb::Error> { let key = DocumentAttrKey::new(id, attr).to_be_bytes(); - self.0.get(key) + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.get_cf(cf, key) } - pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec) -> sled::Result<()> { + pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec) -> Result<(), rocksdb::Error> { let key = DocumentAttrKey::new(id, attr).to_be_bytes(); - self.0.set(key, value)?; + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.put_cf(cf, key, value)?; Ok(()) } - pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> { + pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<(), rocksdb::Error> { let key = DocumentAttrKey::new(id, attr).to_be_bytes(); - self.0.del(key)?; + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.delete_cf(cf, key)?; Ok(()) } - pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> { + pub fn del_all_document_fields(&self, id: DocumentId) -> Result<(), rocksdb::Error> { let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes(); let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes(); - let document_attrs = self.0.range(start..=end).keys(); - for key in document_attrs { - self.0.del(key?)?; - } + let cf = self.0.cf_handle(&self.1).unwrap(); + let mut batch = rocksdb::WriteBatch::default(); + batch.delete_range_cf(cf, start, end)?; + self.0.write(batch)?; Ok(()) } pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter { - let start = DocumentAttrKey::new(id, SchemaAttr::min()); - let start = start.to_be_bytes(); + let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes(); + let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes(); - let end = DocumentAttrKey::new(id, SchemaAttr::max()); - let end = end.to_be_bytes(); + let cf = self.0.cf_handle(&self.1).unwrap(); + let from = rocksdb::IteratorMode::From(&start[..], rocksdb::Direction::Forward); + let iter = self.0.iterator_cf(cf, from).unwrap(); - DocumentFieldsIter(self.0.range(start..=end)) + DocumentFieldsIter(iter, end.to_vec()) } } -pub struct DocumentFieldsIter<'a>(sled::Iter<'a>); +pub struct DocumentFieldsIter<'a>(rocksdb::DBIterator<'a>, Vec); impl<'a> Iterator for DocumentFieldsIter<'a> { - type Item = sled::Result<(SchemaAttr, IVec)>; + type Item = Result<(SchemaAttr, Box<[u8]>), rocksdb::Error>; fn next(&mut self) -> Option { match self.0.next() { - Some(Ok((key, value))) => { + Some((key, value)) => { + + if key.as_ref() > self.1.as_ref() { + return None; + } + let slice: &[u8] = key.as_ref(); let array = slice.try_into().unwrap(); let key = DocumentAttrKey::from_be_bytes(array); Some(Ok((key.attribute, value))) }, - Some(Err(e)) => Some(Err(e)), None => None, } } diff --git a/meilidb-data/src/database/error.rs b/meilidb-data/src/database/error.rs index 3e1b48235..99b90e056 100644 --- a/meilidb-data/src/database/error.rs +++ b/meilidb-data/src/database/error.rs @@ -7,15 +7,15 @@ pub enum Error { SchemaMissing, WordIndexMissing, MissingDocumentId, - SledError(sled::Error), + RocksdbError(rocksdb::Error), FstError(fst::Error), BincodeError(bincode::Error), SerializerError(SerializerError), } -impl From for Error { - fn from(error: sled::Error) -> Error { - Error::SledError(error) +impl From for Error { + fn from(error: rocksdb::Error) -> Error { + Error::RocksdbError(error) } } @@ -45,7 +45,7 @@ impl fmt::Display for Error { SchemaMissing => write!(f, "this index does not have a schema"), WordIndexMissing => write!(f, "this index does not have a word index"), MissingDocumentId => write!(f, "document id is missing"), - SledError(e) => write!(f, "sled error; {}", e), + RocksdbError(e) => write!(f, "RocksDB error; {}", e), FstError(e) => write!(f, "fst error; {}", e), BincodeError(e) => write!(f, "bincode error; {}", e), SerializerError(e) => write!(f, "serializer error; {}", e), diff --git a/meilidb-data/src/database/main_index.rs b/meilidb-data/src/database/main_index.rs index b1d8edc81..251dd78e7 100644 --- a/meilidb-data/src/database/main_index.rs +++ b/meilidb-data/src/database/main_index.rs @@ -6,11 +6,12 @@ use crate::schema::Schema; use super::Error; #[derive(Clone)] -pub struct MainIndex(pub Arc); +pub struct MainIndex(pub Arc, pub String); impl MainIndex { pub fn schema(&self) -> Result, Error> { - match self.0.get("schema")? { + let cf = self.0.cf_handle(&self.1).unwrap(); + match self.0.get_cf(cf, "schema")? { Some(bytes) => { let schema = Schema::read_from_bin(bytes.as_ref())?; Ok(Some(schema)) @@ -22,15 +23,17 @@ impl MainIndex { pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> { let mut bytes = Vec::new(); schema.write_to_bin(&mut bytes)?; - self.0.set("schema", bytes)?; + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.put_cf(cf, "schema", bytes)?; Ok(()) } pub fn words_set(&self) -> Result, Error> { - match self.0.get("words")? { + let cf = self.0.cf_handle(&self.1).unwrap(); + match self.0.get_pinned_cf(cf, "words")? { Some(bytes) => { let len = bytes.len(); - let value = bytes.into(); + let value = Arc::from(bytes.as_ref()); let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?; Ok(Some(fst::Set::from(fst))) }, @@ -39,12 +42,14 @@ impl MainIndex { } pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> { - self.0.set("words", value.as_fst().as_bytes())?; + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.put_cf(cf, "words", value.as_fst().as_bytes())?; Ok(()) } pub fn ranked_map(&self) -> Result, Error> { - match self.0.get("ranked-map")? { + let cf = self.0.cf_handle(&self.1).unwrap(); + match self.0.get_cf(cf, "ranked-map")? { Some(bytes) => { let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?; Ok(Some(ranked_map)) @@ -56,7 +61,8 @@ impl MainIndex { pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> { let mut bytes = Vec::new(); value.write_to_bin(&mut bytes)?; - self.0.set("ranked_map", bytes)?; + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.put_cf(cf, "ranked_map", bytes)?; Ok(()) } } diff --git a/meilidb-data/src/database/mod.rs b/meilidb-data/src/database/mod.rs index 9e14f8168..0231a6d2f 100644 --- a/meilidb-data/src/database/mod.rs +++ b/meilidb-data/src/database/mod.rs @@ -31,26 +31,24 @@ use self::words_index::WordsIndex; pub struct Database { cache: RwLock>>, - inner: sled::Db, + inner: Arc, } impl Database { pub fn start_default>(path: P) -> Result { + let path = path.as_ref(); let cache = RwLock::new(HashMap::new()); - let config = sled::ConfigBuilder::new().path(path).print_profile_on_drop(true).build(); - let inner = sled::Db::start(config)?; - Ok(Database { cache, inner }) - } - pub fn start_with_compression>(path: P, factor: i32) -> Result { - let config = sled::ConfigBuilder::default() - .use_compression(true) - .compression_factor(factor) - .path(path) - .build(); + let inner = { + let options = { + let mut options = rocksdb::Options::default(); + options.create_if_missing(true); + options + }; + let cfs = rocksdb::DB::list_cf(&options, path).unwrap_or(Vec::new()); + Arc::new(rocksdb::DB::open_cf(&options, path, cfs)?) + }; - let cache = RwLock::new(HashMap::new()); - let inner = sled::Db::start(config)?; Ok(Database { cache, inner }) } @@ -66,7 +64,7 @@ impl Database { fn set_indexes(&self, value: &HashSet) -> Result<(), Error> { let bytes = bincode::serialize(value)?; - self.inner.set("indexes", bytes)?; + self.inner.put("indexes", bytes)?; Ok(()) } @@ -89,32 +87,32 @@ impl Database { } let main = { - let tree = self.inner.open_tree(name)?; - MainIndex(tree) + self.inner.cf_handle(name).expect("cf not found"); + MainIndex(self.inner.clone(), name.to_owned()) }; let words = { - let tree_name = format!("{}-words", name); - let tree = self.inner.open_tree(tree_name)?; - WordsIndex(tree) + let cf_name = format!("{}-words", name); + self.inner.cf_handle(&cf_name).expect("cf not found"); + WordsIndex(self.inner.clone(), cf_name) }; let docs_words = { - let tree_name = format!("{}-docs-words", name); - let tree = self.inner.open_tree(tree_name)?; - DocsWordsIndex(tree) + let cf_name = format!("{}-docs-words", name); + self.inner.cf_handle(&cf_name).expect("cf not found"); + DocsWordsIndex(self.inner.clone(), cf_name) }; let documents = { - let tree_name = format!("{}-documents", name); - let tree = self.inner.open_tree(tree_name)?; - DocumentsIndex(tree) + let cf_name = format!("{}-documents", name); + self.inner.cf_handle(&cf_name).expect("cf not found"); + DocumentsIndex(self.inner.clone(), cf_name) }; let custom = { - let tree_name = format!("{}-custom", name); - let tree = self.inner.open_tree(tree_name)?; - CustomSettings(tree) + let cf_name = format!("{}-custom", name); + self.inner.cf_handle(&cf_name).expect("cf not found"); + CustomSettings(self.inner.clone(), cf_name) }; let raw_index = RawIndex { main, words, docs_words, documents, custom }; @@ -136,8 +134,8 @@ impl Database { }, Entry::Vacant(vacant) => { let main = { - let tree = self.inner.open_tree(name)?; - MainIndex(tree) + self.inner.create_cf(name, &rocksdb::Options::default())?; + MainIndex(self.inner.clone(), name.to_owned()) }; if let Some(prev_schema) = main.schema()? { @@ -149,27 +147,27 @@ impl Database { main.set_schema(&schema)?; let words = { - let tree_name = format!("{}-words", name); - let tree = self.inner.open_tree(tree_name)?; - WordsIndex(tree) + let cf_name = format!("{}-words", name); + self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; + WordsIndex(self.inner.clone(), cf_name) }; let docs_words = { - let tree_name = format!("{}-docs-words", name); - let tree = self.inner.open_tree(tree_name)?; - DocsWordsIndex(tree) + let cf_name = format!("{}-docs-words", name); + self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; + DocsWordsIndex(self.inner.clone(), cf_name) }; let documents = { - let tree_name = format!("{}-documents", name); - let tree = self.inner.open_tree(tree_name)?; - DocumentsIndex(tree) + let cf_name = format!("{}-documents", name); + self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; + DocumentsIndex(self.inner.clone(), cf_name) }; let custom = { - let tree_name = format!("{}-custom", name); - let tree = self.inner.open_tree(tree_name)?; - CustomSettings(tree) + let cf_name = format!("{}-custom", name); + self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; + CustomSettings(self.inner.clone(), cf_name) }; let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new); diff --git a/meilidb-data/src/database/words_index.rs b/meilidb-data/src/database/words_index.rs index 3b2598186..862a918c4 100644 --- a/meilidb-data/src/database/words_index.rs +++ b/meilidb-data/src/database/words_index.rs @@ -5,11 +5,12 @@ use sdset::{Set, SetBuf}; use zerocopy::{LayoutVerified, AsBytes}; #[derive(Clone)] -pub struct WordsIndex(pub Arc); +pub struct WordsIndex(pub Arc, pub String); impl WordsIndex { - pub fn doc_indexes(&self, word: &[u8]) -> sled::Result>> { - match self.0.get(word)? { + pub fn doc_indexes(&self, word: &[u8]) -> Result>, rocksdb::Error> { + let cf = self.0.cf_handle(&self.1).unwrap(); + match self.0.get_cf(cf, word)? { Some(bytes) => { let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout"); let slice = layout.into_slice(); @@ -20,13 +21,15 @@ impl WordsIndex { } } - pub fn set_doc_indexes(&self, word: &[u8], set: &Set) -> sled::Result<()> { - self.0.set(word, set.as_bytes())?; + pub fn set_doc_indexes(&self, word: &[u8], set: &Set) -> Result<(), rocksdb::Error> { + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.put_cf(cf, word, set.as_bytes())?; Ok(()) } - pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> { - self.0.del(word)?; + pub fn del_doc_indexes(&self, word: &[u8]) -> Result<(), rocksdb::Error> { + let cf = self.0.cf_handle(&self.1).unwrap(); + self.0.delete_cf(cf, word)?; Ok(()) } } diff --git a/meilidb-data/src/lib.rs b/meilidb-data/src/lib.rs index 79cc3a3e9..520a3e6e2 100644 --- a/meilidb-data/src/lib.rs +++ b/meilidb-data/src/lib.rs @@ -6,7 +6,7 @@ mod ranked_map; mod serde; pub mod schema; -pub use sled; +pub use rocksdb; pub use self::database::{Database, Index, CustomSettings}; pub use self::number::Number; pub use self::ranked_map::RankedMap; diff --git a/meilidb-data/src/serde/mod.rs b/meilidb-data/src/serde/mod.rs index 1e2854c36..0fe5918cc 100644 --- a/meilidb-data/src/serde/mod.rs +++ b/meilidb-data/src/serde/mod.rs @@ -36,7 +36,7 @@ use crate::schema::SchemaAttr; pub enum SerializerError { DocumentIdNotFound, RmpError(RmpError), - SledError(sled::Error), + RocksdbError(rocksdb::Error), ParseNumberError(ParseNumberError), UnserializableType { type_name: &'static str }, UnindexableType { type_name: &'static str }, @@ -57,7 +57,7 @@ impl fmt::Display for SerializerError { write!(f, "serialized document does not have an id according to the schema") } SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e), - SerializerError::SledError(e) => write!(f, "sled related error: {}", e), + SerializerError::RocksdbError(e) => write!(f, "RocksDB related error: {}", e), SerializerError::ParseNumberError(e) => { write!(f, "error while trying to parse a number: {}", e) }, @@ -89,9 +89,9 @@ impl From for SerializerError { } } -impl From for SerializerError { - fn from(error: sled::Error) -> SerializerError { - SerializerError::SledError(error) +impl From for SerializerError { + fn from(error: rocksdb::Error) -> SerializerError { + SerializerError::RocksdbError(error) } } diff --git a/meilidb/examples/create-database.rs b/meilidb/examples/create-database.rs index f19c32a31..1518b4298 100644 --- a/meilidb/examples/create-database.rs +++ b/meilidb/examples/create-database.rs @@ -59,7 +59,7 @@ fn index( let mut system = sysinfo::System::new(); - let index = database.create_index("default", schema.clone())?; + let index = database.create_index("test", schema.clone())?; let mut rdr = csv::Reader::from_path(csv_data_path)?; let mut raw_record = csv::StringRecord::new(); diff --git a/meilidb/examples/query-database.rs b/meilidb/examples/query-database.rs index 1be27dce1..4fd529c27 100644 --- a/meilidb/examples/query-database.rs +++ b/meilidb/examples/query-database.rs @@ -143,7 +143,7 @@ fn main() -> Result<(), Box> { let mut buffer = String::new(); let input = io::stdin(); - let index = database.open_index("default")?.unwrap(); + let index = database.open_index("test")?.unwrap(); let schema = index.schema(); println!("database prepared for you in {:.2?}", start.elapsed());