diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml
index f0e6dc6e0..16bc204d4 100644
--- a/meilidb-core/Cargo.toml
+++ b/meilidb-core/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2018"
 
 [dependencies]
 byteorder = "1.3.1"
-hashbrown = "0.1.8"
+hashbrown = "0.2.2"
 lazy_static = "1.2.0"
 log = "0.4.6"
 meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
diff --git a/meilidb-data/Cargo.toml b/meilidb-data/Cargo.toml
index c3dc2dc05..e6fca8c66 100644
--- a/meilidb-data/Cargo.toml
+++ b/meilidb-data/Cargo.toml
@@ -8,13 +8,17 @@ edition = "2018"
 arc-swap = "0.3.11"
 bincode = "1.1.2"
 byteorder = "1.3.1"
-hashbrown = { version = "0.1.8", features = ["serde"] }
+hashbrown = { version = "0.2.2", features = ["serde"] }
 linked-hash-map = { version = "0.5.2", features = ["serde_impl"] }
 meilidb-core = { path = "../meilidb-core", version = "0.1.0" }
 meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
 ordered-float = { version = "1.0.2", features = ["serde"] }
 sdset = "0.3.1"
-serde = { version = "1.0.88", features = ["derive"] }
+serde = { version = "1.0.90", features = ["derive"] }
 serde_json = { version = "1.0.39", features = ["preserve_order"] }
 sled = "0.22.1"
 toml = { version = "0.5.0", features = ["preserve_order"] }
+
+[dependencies.rmp-serde]
+git = "https://github.com/3Hren/msgpack-rust.git"
+rev = "40b3d48"
diff --git a/meilidb-data/src/database.rs b/meilidb-data/src/database.rs
index baacddc35..6aac48b48 100644
--- a/meilidb-data/src/database.rs
+++ b/meilidb-data/src/database.rs
@@ -1,3 +1,6 @@
+use std::collections::HashSet;
+use std::io::{self, Cursor, BufRead};
+use std::iter::FromIterator;
 use std::path::Path;
 use std::sync::Arc;
 
@@ -6,7 +9,11 @@ use hashbrown::HashMap;
 use meilidb_core::shared_data_cursor::{FromSharedDataCursor, SharedDataCursor};
 use meilidb_core::write_to_bytes::WriteToBytes;
 use meilidb_core::{DocumentId, Index as WordIndex};
+use rmp_serde::decode::{Deserializer as RmpDeserializer, ReadReader};
+use rmp_serde::decode::{Error as RmpError};
+use serde::{de, forward_to_deserialize_any};
 use sled::IVec;
+use byteorder::{ReadBytesExt, BigEndian};
 
 use crate::{Schema, SchemaAttr, RankedMap};
 
@@ -46,6 +53,37 @@ fn document_key(id: DocumentId, attr: SchemaAttr) -> Vec<u8> {
     bytes
 }
 
+trait CursorExt {
+    fn consume_if_eq(&mut self, needle: &[u8]) -> bool;
+}
+
+impl<T: AsRef<[u8]>> CursorExt for Cursor<T> {
+    fn consume_if_eq(&mut self, needle: &[u8]) -> bool {
+        let position = self.position() as usize;
+        let slice = self.get_ref().as_ref();
+
+        if slice[position..].starts_with(needle) {
+            self.consume(needle.len());
+            true
+        } else {
+            false
+        }
+    }
+}
+
+fn extract_document_key(key: Vec<u8>) -> io::Result<(DocumentId, SchemaAttr)> {
+    let mut key = Cursor::new(key);
+
+    if !key.consume_if_eq(b"document-") {
+        return Err(io::Error::from(io::ErrorKind::InvalidData))
+    }
+
+    let document_id = key.read_u64::<BigEndian>().map(DocumentId)?;
+    let schema_attr = key.read_u16::<BigEndian>().map(SchemaAttr)?;
+
+    Ok((document_id, schema_attr))
+}
+
 fn ivec_into_arc(ivec: IVec) -> Arc<[u8]> {
     match ivec {
         IVec::Inline(len, bytes) => Arc::from(&bytes[..len as usize]),
@@ -55,7 +93,7 @@ fn ivec_into_arc(ivec: IVec) -> Arc<[u8]> {
 
 #[derive(Clone)]
 pub struct Database {
-    opened: Arc<ArcSwap<HashMap<String, Index>>>,
+    opened: Arc<ArcSwap<HashMap<String, RawIndex>>>,
     inner: sled::Db,
 }
 
@@ -68,22 +106,22 @@ impl Database {
     pub fn open_index(&self, name: &str) -> Result<Option<Index>, Error> {
         // check if the index was already opened
-        if let Some(index) = self.opened.lease().get(name) {
-            return Ok(Some(index.clone()))
+        if let Some(raw_index) = self.opened.lease().get(name) {
+            return Ok(Some(Index(raw_index.clone())))
         }
 
         let raw_name = index_name(name);
         if self.inner.tree_names().into_iter().any(|tn| tn == raw_name) {
             let tree = self.inner.open_tree(raw_name)?;
-            let index = Index::from_raw(tree)?;
+            let raw_index = RawIndex::from_raw(tree)?;
 
             self.opened.rcu(|opened| {
                 let mut opened = HashMap::clone(opened);
-                opened.insert(name.to_string(), index.clone());
+                opened.insert(name.to_string(), raw_index.clone());
                 opened
             });
 
-            return Ok(Some(index))
+            return Ok(Some(Index(raw_index)))
         }
 
         Ok(None)
     }
 
@@ -92,7 +130,7 @@
     pub fn create_index(&self, name: String, schema: Schema) -> Result<Index, Error> {
         match self.open_index(&name)? {
             Some(index) => {
-                if index.schema != schema {
+                if index.schema() != &schema {
                     return Err(Error::SchemaDiffer);
                 }
 
@@ -101,30 +139,30 @@
             None => {
                 let raw_name = index_name(&name);
                 let tree = self.inner.open_tree(raw_name)?;
-                let index = Index::new_from_raw(tree, schema)?;
+                let raw_index = RawIndex::new_from_raw(tree, schema)?;
 
                 self.opened.rcu(|opened| {
                     let mut opened = HashMap::clone(opened);
-                    opened.insert(name.clone(), index.clone());
+                    opened.insert(name.clone(), raw_index.clone());
                     opened
                 });
 
-                Ok(index)
+                Ok(Index(raw_index))
             },
         }
     }
 }
 
 #[derive(Clone)]
-pub struct Index {
+pub struct RawIndex {
     schema: Schema,
     word_index: Arc<ArcSwap<WordIndex>>,
     ranked_map: Arc<ArcSwap<RankedMap>>,
     inner: Arc<sled::Tree>,
 }
 
-impl Index {
-    fn from_raw(inner: Arc<sled::Tree>) -> Result<Index, Error> {
+impl RawIndex {
+    fn from_raw(inner: Arc<sled::Tree>) -> Result<RawIndex, Error> {
         let schema = {
             let bytes = inner.get("schema")?;
             let bytes = bytes.ok_or(Error::SchemaMissing)?;
@@ -153,10 +191,10 @@
             Arc::new(ArcSwap::new(Arc::new(map)))
         };
 
-        Ok(Index { schema, word_index, ranked_map, inner })
+        Ok(RawIndex { schema, word_index, ranked_map, inner })
     }
 
-    fn new_from_raw(inner: Arc<sled::Tree>, schema: Schema) -> Result<Index, Error> {
+    fn new_from_raw(inner: Arc<sled::Tree>, schema: Schema) -> Result<RawIndex, Error> {
         let mut schema_bytes = Vec::new();
         schema.write_to_bin(&mut schema_bytes)?;
         inner.set("schema", schema_bytes)?;
@@ -167,7 +205,7 @@
 
         let ranked_map = Arc::new(ArcSwap::new(Arc::new(RankedMap::default())));
 
-        Ok(Index { schema, word_index, ranked_map, inner })
+        Ok(RawIndex { schema, word_index, ranked_map, inner })
     }
 
     pub fn schema(&self) -> &Schema {
@@ -182,11 +220,11 @@
         self.ranked_map.lease()
     }
 
-    fn update_word_index(&self, word_index: Arc<WordIndex>) {
+    pub fn update_word_index(&self, word_index: Arc<WordIndex>) {
        self.word_index.store(word_index)
     }
 
-    fn update_ranked_map(&self, ranked_map: Arc<RankedMap>) {
+    pub fn update_ranked_map(&self, ranked_map: Arc<RankedMap>) {
         self.ranked_map.store(ranked_map)
     }
 
@@ -212,6 +250,12 @@
         Ok(self.inner.get(key)?)
     }
 
+    pub fn get_document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
+        let start = document_key(id, SchemaAttr::min());
+        let end = document_key(id, SchemaAttr::max());
+        DocumentFieldsIter(self.inner.range(start..=end))
+    }
+
     pub fn del_document_attribute(
         &self,
         id: DocumentId,
@@ -222,3 +266,150 @@ impl Index {
         let key = document_key(id, attr);
         Ok(self.inner.del(key)?)
     }
 }
+
+pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
+
+impl<'a> Iterator for DocumentFieldsIter<'a> {
+    type Item = Result<(DocumentId, SchemaAttr, IVec), Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.0.next() {
+            Some(Ok((key, value))) => {
+                let (id, attr) = extract_document_key(key).unwrap();
+                Some(Ok((id, attr, value)))
+            },
+            Some(Err(e)) => Some(Err(Error::SledError(e))),
+            None => None,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct Index(RawIndex);
+
+impl Index {
+    pub fn schema(&self) -> &Schema {
+        self.0.schema()
+    }
+
+    pub fn word_index(&self) -> Lease<Arc<WordIndex>> {
+        self.0.word_index()
+    }
+
+    pub fn ranked_map(&self) -> Lease<Arc<RankedMap>> {
+        self.0.ranked_map()
+    }
+
+    pub fn document<T>(
+        &self,
+        fields: Option<&HashSet<&str>>,
+        id: DocumentId,
+    ) -> Result<Option<T>, RmpError>
+    where T: de::DeserializeOwned,
+    {
+        let fields = match fields {
+            Some(fields) => {
+                let iter = fields.iter().filter_map(|n| self.0.schema().attribute(n));
+                Some(HashSet::from_iter(iter))
+            },
+            None => None,
+        };
+
+        let mut deserializer = Deserializer {
+            document_id: id,
+            raw_index: &self.0,
+            fields: fields.as_ref(),
+        };
+
+        // TODO: currently we return an error if all document fields are missing,
+        //       returning None would have been better
+        T::deserialize(&mut deserializer).map(Some)
+    }
+}
+
+struct Deserializer<'a> {
+    document_id: DocumentId,
+    raw_index: &'a RawIndex,
+    fields: Option<&'a HashSet<SchemaAttr>>,
+}
+
+impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
+{
+    type Error = RmpError;
+
+    fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
+    where V: de::Visitor<'de>
+    {
+        self.deserialize_map(visitor)
+    }
+
+    forward_to_deserialize_any! {
+        bool u8 u16 u32 u64 i8 i16 i32 i64 f32 f64 char str string unit seq
+        bytes byte_buf unit_struct tuple_struct
+        identifier tuple ignored_any option newtype_struct enum struct
+    }
+
+    fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error>
+    where V: de::Visitor<'de>
+    {
+        let document_attributes = self.raw_index.get_document_fields(self.document_id);
+        let document_attributes = document_attributes.filter_map(|result| {
+            match result {
+                Ok(value) => Some(value),
+                Err(e) => {
+                    // TODO: must log the error
+                    // error!("sled iter error; {}", e);
+                    None
+                },
+            }
+        });
+        let iter = document_attributes.filter_map(|(_, attr, value)| {
+            if self.fields.map_or(true, |f| f.contains(&attr)) {
+                let attribute_name = self.raw_index.schema.attribute_name(attr);
+                Some((attribute_name, Value::new(value)))
+            } else {
+                None
+            }
+        });
+
+        let map_deserializer = de::value::MapDeserializer::new(iter);
+        visitor.visit_map(map_deserializer)
+    }
+}
+
+struct Value<A>(RmpDeserializer<ReadReader<Cursor<A>>>) where A: AsRef<[u8]>;
+
+impl<A> Value<A> where A: AsRef<[u8]>
+{
+    fn new(value: A) -> Value<A> {
+        Value(RmpDeserializer::new(Cursor::new(value)))
+    }
+}
+
+impl<'de, A> de::IntoDeserializer<'de, RmpError> for Value<A>
+where A: AsRef<[u8]>,
+{
+    type Deserializer = Self;
+
+    fn into_deserializer(self) -> Self::Deserializer {
+        self
+    }
+}
+
+impl<'de, A> de::Deserializer<'de> for Value<A>
+where A: AsRef<[u8]>,
+{
+    type Error = RmpError;
+
+    fn deserialize_any<V>(mut self, visitor: V) -> Result<V::Value, Self::Error>
+    where V: de::Visitor<'de>
+    {
+        self.0.deserialize_any(visitor)
+    }
+
+    forward_to_deserialize_any! {
+        bool i8 i16 i32 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string
+        bytes byte_buf option unit unit_struct newtype_struct seq tuple
+        tuple_struct map struct enum identifier ignored_any
+    }
+}
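
Reviewer note, not part of the diff: `extract_document_key` is the parsing counterpart of the pre-existing `document_key` function, of which only the signature is visible as hunk context. A minimal sketch of the key layout it implies — the helper name below is hypothetical, and the exact body of the real `document_key` is assumed, not shown in this patch:

    use byteorder::{BigEndian, WriteBytesExt};

    // Layout parsed by extract_document_key(): the literal prefix
    // "document-", then the document id as a big-endian u64, then the
    // attribute as a big-endian u16. Big-endian encoding keeps sled's
    // lexicographic key order aligned with (id, attr) order, which is
    // what lets get_document_fields() range-scan one document's fields
    // between SchemaAttr::min() and SchemaAttr::max().
    fn sketch_document_key(id: u64, attr: u16) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(9 + 8 + 2);
        bytes.extend_from_slice(b"document-");
        bytes.write_u64::<BigEndian>(id).unwrap();
        bytes.write_u16::<BigEndian>(attr).unwrap();
        bytes
    }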
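Reviewer note, not part of the diff: the new `Index::document` method deserializes a stored document straight into any `serde::de::DeserializeOwned` type, optionally restricted to a set of attribute names. A usage sketch under assumptions — the "people" index, the `Person` type, the document id, and the already opened `database` handle are all hypothetical:

    use meilidb_core::DocumentId;
    use serde::Deserialize;

    // Hypothetical target type; field names must match schema attributes.
    #[derive(Deserialize)]
    struct Person {
        name: String,
        age: u8,
    }

    let index = database.open_index("people").unwrap().expect("index not found");

    // None means "all attributes". As the TODO in the diff notes, a fully
    // missing document currently surfaces as an error instead of Ok(None).
    let person: Option<Person> = index.document(None, DocumentId(0)).unwrap();

Passing `Some(&fields)` with a `HashSet<&str>` instead of `None` restricts the read to those attributes; names absent from the schema are silently dropped by the `filter_map` over `schema().attribute(n)`.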