diff --git a/meilidb-data/src/database.rs b/meilidb-data/src/database.rs
deleted file mode 100644
index badbc2bf8..000000000
--- a/meilidb-data/src/database.rs
+++ /dev/null
@@ -1,719 +0,0 @@
-use std::collections::hash_map::Entry;
-use std::collections::{BTreeSet, HashSet, HashMap};
-use std::convert::TryInto;
-use std::path::Path;
-use std::sync::{Arc, RwLock};
-use std::{error, fmt};
-
-use arc_swap::{ArcSwap, Lease};
-use meilidb_core::{criterion::Criteria, QueryBuilder, Store, DocumentId, DocIndex};
-use rmp_serde::decode::{Error as RmpError};
-use sdset::{Set, SetBuf, SetOperation, duo::{Union, DifferenceByKey}};
-use serde::de;
-use sled::IVec;
-use zerocopy::{AsBytes, LayoutVerified};
-use fst::{SetBuilder, set::OpBuilder, Streamer};
-
-use crate::document_attr_key::DocumentAttrKey;
-use crate::indexer::Indexer;
-use crate::serde::extract_document_id;
-use crate::serde::{Serializer, RamDocumentStore, Deserializer, SerializerError};
-use crate::{Schema, SchemaAttr, RankedMap};
-
-#[derive(Debug)]
-pub enum Error {
-    SchemaDiffer,
-    SchemaMissing,
-    WordIndexMissing,
-    MissingDocumentId,
-    SledError(sled::Error),
-    FstError(fst::Error),
-    BincodeError(bincode::Error),
-    SerializerError(SerializerError),
-}
-
-impl From<sled::Error> for Error {
-    fn from(error: sled::Error) -> Error {
-        Error::SledError(error)
-    }
-}
-
-impl From<fst::Error> for Error {
-    fn from(error: fst::Error) -> Error {
-        Error::FstError(error)
-    }
-}
-
-impl From<bincode::Error> for Error {
-    fn from(error: bincode::Error) -> Error {
-        Error::BincodeError(error)
-    }
-}
-
-impl From<SerializerError> for Error {
-    fn from(error: SerializerError) -> Error {
-        Error::SerializerError(error)
-    }
-}
-
-impl fmt::Display for Error {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        use self::Error::*;
-        match self {
-            SchemaDiffer => write!(f, "schemas differ"),
-            SchemaMissing => write!(f, "this index does not have a schema"),
-            WordIndexMissing => write!(f, "this index does not have a word index"),
-            MissingDocumentId => write!(f, "document id is missing"),
-            SledError(e) => write!(f, "sled error; {}", e),
-            FstError(e) => write!(f, "fst error; {}", e),
-            BincodeError(e) => write!(f, "bincode error; {}", e),
-            SerializerError(e) => write!(f, "serializer error; {}", e),
-        }
-    }
-}
-
-impl error::Error for Error { }
-
-pub struct Database {
-    cache: RwLock<HashMap<String, Arc<Index>>>,
-    inner: sled::Db,
-}
-
-impl Database {
-    pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
-        let cache = RwLock::new(HashMap::new());
-        let inner = sled::Db::start_default(path)?;
-        Ok(Database { cache, inner })
-    }
-
-    pub fn indexes(&self) -> Result<Option<HashSet<String>>, Error> {
-        let bytes = match self.inner.get("indexes")? {
-            Some(bytes) => bytes,
-            None => return Ok(None),
-        };
-
-        let indexes = bincode::deserialize(&bytes)?;
-        Ok(Some(indexes))
-    }
-
-    pub fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
-        let bytes = bincode::serialize(value)?;
-        self.inner.set("indexes", bytes)?;
-        Ok(())
-    }
-
-    pub fn open_index(&self, name: &str) -> Result<Option<Arc<Index>>, Error> {
-        {
-            let cache = self.cache.read().unwrap();
-            if let Some(index) = cache.get(name).cloned() {
-                return Ok(Some(index))
-            }
-        }
-
-        let mut cache = self.cache.write().unwrap();
-        let index = match cache.entry(name.to_string()) {
-            Entry::Occupied(occupied) => {
-                occupied.get().clone()
-            },
-            Entry::Vacant(vacant) => {
-                if !self.indexes()?.map_or(false, |x| x.contains(name)) {
-                    return Ok(None)
-                }
-
-                let main = {
-                    let tree = self.inner.open_tree(name)?;
-                    MainIndex(tree)
-                };
-
-                let words = {
-                    let tree_name = format!("{}-words", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    WordsIndex(tree)
-                };
-
-                let docs_words = {
-                    let tree_name = format!("{}-docs-words", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    DocsWords(tree)
-                };
-
-                let documents = {
-                    let tree_name = format!("{}-documents", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    DocumentsIndex(tree)
-                };
-
-                let raw_index = RawIndex { main, words, docs_words, documents };
-                let index = Index::from_raw(raw_index)?;
-
-                vacant.insert(Arc::new(index)).clone()
-            },
-        };
-
-        Ok(Some(index))
-    }
-
-    pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> {
-        let mut cache = self.cache.write().unwrap();
-
-        let index = match cache.entry(name.to_string()) {
-            Entry::Occupied(occupied) => {
-                occupied.get().clone()
-            },
-            Entry::Vacant(vacant) => {
-                let main = {
-                    let tree = self.inner.open_tree(name)?;
-                    MainIndex(tree)
-                };
-
-                if let Some(prev_schema) = main.schema()? {
-                    if prev_schema != schema {
-                        return Err(Error::SchemaDiffer)
-                    }
-                }
-
-                main.set_schema(&schema)?;
-
-                let words = {
-                    let tree_name = format!("{}-words", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    WordsIndex(tree)
-                };
-
-                let docs_words = {
-                    let tree_name = format!("{}-docs-words", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    DocsWords(tree)
-                };
-
-                let documents = {
-                    let tree_name = format!("{}-documents", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    DocumentsIndex(tree)
-                };
-
-                let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);
-                indexes.insert(name.to_string());
-                self.set_indexes(&indexes)?;
-
-                let raw_index = RawIndex { main, words, docs_words, documents };
-                let index = Index::from_raw(raw_index)?;
-
-                vacant.insert(Arc::new(index)).clone()
-            },
-        };
-
-        Ok(index)
-    }
-}
-
-#[derive(Clone)]
-pub struct RawIndex {
-    pub main: MainIndex,
-    pub words: WordsIndex,
-    pub docs_words: DocsWords,
-    pub documents: DocumentsIndex,
-}
-
-#[derive(Clone)]
-pub struct MainIndex(Arc<sled::Tree>);
-
-impl MainIndex {
-    pub fn schema(&self) -> Result<Option<Schema>, Error> {
-        match self.0.get("schema")? {
-            Some(bytes) => {
-                let schema = Schema::read_from_bin(bytes.as_ref())?;
-                Ok(Some(schema))
-            },
-            None => Ok(None),
-        }
-    }
-
-    pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> {
-        let mut bytes = Vec::new();
-        schema.write_to_bin(&mut bytes)?;
-        self.0.set("schema", bytes)?;
-        Ok(())
-    }
-
-    pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
-        match self.0.get("words")? {
-            Some(bytes) => {
-                let len = bytes.len();
-                let value = bytes.into();
-                let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
-                Ok(Some(fst::Set::from(fst)))
-            },
-            None => Ok(None),
-        }
-    }
-
-    pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
-        self.0.set("words", value.as_fst().as_bytes())?;
-        Ok(())
-    }
-
-    pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
-        match self.0.get("ranked-map")? {
-            Some(bytes) => {
-                let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
-                Ok(Some(ranked_map))
-            },
-            None => Ok(None),
-        }
-    }
-
-    pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
-        let mut bytes = Vec::new();
-        value.write_to_bin(&mut bytes)?;
-        self.0.set("ranked-map", bytes)?;
-        Ok(())
-    }
-}
-
-#[derive(Clone)]
-pub struct WordsIndex(Arc<sled::Tree>);
-
-impl WordsIndex {
-    pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
-        match self.0.get(word)? {
-            Some(bytes) => {
-                let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout");
-                let slice = layout.into_slice();
-                let setbuf = SetBuf::new_unchecked(slice.to_vec());
-                Ok(Some(setbuf))
-            },
-            None => Ok(None),
-        }
-    }
-
-    pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
-        self.0.set(word, set.as_bytes())?;
-        Ok(())
-    }
-
-    pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
-        self.0.del(word)?;
-        Ok(())
-    }
-}
-
-#[derive(Clone)]
-pub struct DocsWords(Arc<sled::Tree>);
-
-impl DocsWords {
-    pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
-        let key = id.0.to_be_bytes();
-        match self.0.get(key)? {
-            Some(bytes) => {
-                let len = bytes.len();
-                let value = bytes.into();
-                let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
-                Ok(Some(fst::Set::from(fst)))
-            },
-            None => Ok(None)
-        }
-    }
-
-    pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> {
-        let key = id.0.to_be_bytes();
-        self.0.set(key, words.as_fst().as_bytes())?;
-        Ok(())
-    }
-
-    pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
-        let key = id.0.to_be_bytes();
-        self.0.del(key)?;
-        Ok(())
-    }
-}
-
-#[derive(Clone)]
-pub struct DocumentsIndex(Arc<sled::Tree>);
-
-impl DocumentsIndex {
-    pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<IVec>> {
-        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
-        self.0.get(key)
-    }
-
-    pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
-        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
-        self.0.set(key, value)?;
-        Ok(())
-    }
-
-    pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
-        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
-        self.0.del(key)?;
-        Ok(())
-    }
-
-    pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
-        let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
-        let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
-        let document_attrs = self.0.range(start..=end).keys();
-
-        for key in document_attrs {
-            self.0.del(key?)?;
-        }
-
-        Ok(())
-    }
-
-    pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
-        let start = DocumentAttrKey::new(id, SchemaAttr::min());
-        let start = start.to_be_bytes();
-
-        let end = DocumentAttrKey::new(id, SchemaAttr::max());
-        let end = end.to_be_bytes();
-
-        DocumentFieldsIter(self.0.range(start..=end))
-    }
-}
-
-pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
-
-impl<'a> Iterator for DocumentFieldsIter<'a> {
-    type Item = sled::Result<(SchemaAttr, IVec)>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        match self.0.next() {
-            Some(Ok((key, value))) => {
-                let slice: &[u8] = key.as_ref();
-                let array = slice.try_into().unwrap();
-                let key = DocumentAttrKey::from_be_bytes(array);
-                Some(Ok((key.attribute, value)))
-            },
-            Some(Err(e)) => Some(Err(e)),
-            None => None,
-        }
-    }
-}
-
-#[derive(Clone)]
-pub struct Index(ArcSwap<InnerIndex>);
-
-pub struct InnerIndex {
-    pub words: fst::Set,
-    pub schema: Schema,
-    pub ranked_map: RankedMap,
-    pub raw: RawIndex, // TODO this will be a snapshot in the future
-}
-
-impl Index {
-    fn from_raw(raw: RawIndex) -> Result<Index, Error> {
-        let words = match raw.main.words_set()? {
-            Some(words) => words,
-            None => fst::Set::default(),
-        };
-
-        let schema = match raw.main.schema()? {
-            Some(schema) => schema,
-            None => return Err(Error::SchemaMissing),
-        };
-
-        let ranked_map = match raw.main.ranked_map()? {
-            Some(map) => map,
-            None => RankedMap::default(),
-        };
-
-        let inner = InnerIndex { words, schema, ranked_map, raw };
-        let index = Index(ArcSwap::new(Arc::new(inner)));
-
-        Ok(index)
-    }
-
-    pub fn query_builder(&self) -> QueryBuilder<IndexLease> {
-        let lease = IndexLease(self.0.lease());
-        QueryBuilder::new(lease)
-    }
-
-    pub fn query_builder_with_criteria<'c>(
-        &self,
-        criteria: Criteria<'c>,
-    ) -> QueryBuilder<'c, IndexLease>
-    {
-        let lease = IndexLease(self.0.lease());
-        QueryBuilder::with_criteria(lease, criteria)
-    }
-
-    pub fn lease_inner(&self) -> Lease<Arc<InnerIndex>> {
-        self.0.lease()
-    }
-
-    pub fn schema(&self) -> Schema {
-        self.0.lease().schema.clone()
-    }
-
-    pub fn documents_addition(&self) -> DocumentsAddition {
-        let ranked_map = self.0.lease().ranked_map.clone();
-        DocumentsAddition::new(self, ranked_map)
-    }
-
-    pub fn documents_deletion(&self) -> DocumentsDeletion {
-        DocumentsDeletion::new(self)
-    }
-
-    pub fn document<T>(
-        &self,
-        fields: Option<&HashSet<&str>>,
-        id: DocumentId,
-    ) -> Result<Option<T>, RmpError>
-    where T: de::DeserializeOwned,
-    {
-        let schema = &self.lease_inner().schema;
-        let fields = fields
-            .map(|fields| {
-                fields
-                    .into_iter()
-                    .filter_map(|name| schema.attribute(name))
-                    .collect()
-            });
-
-        let mut deserializer = Deserializer {
-            document_id: id,
-            index: &self,
-            fields: fields.as_ref(),
-        };
-
-        // TODO: currently we return an error if all document fields are missing,
-        //       returning None would have been better
-        T::deserialize(&mut deserializer).map(Some)
-    }
-}
-
-pub struct IndexLease(Lease<Arc<InnerIndex>>);
-
-impl Store for IndexLease {
-    type Error = Error;
-
-    fn words(&self) -> Result<&fst::Set, Self::Error> {
-        Ok(&self.0.words)
-    }
-
-    fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
-        Ok(self.0.raw.words.doc_indexes(word)?)
-    }
-}
-
-pub struct DocumentsAddition<'a> {
-    inner: &'a Index,
-    document_ids: HashSet<DocumentId>,
-    document_store: RamDocumentStore,
-    indexer: Indexer,
-    ranked_map: RankedMap,
-}
-
-impl<'a> DocumentsAddition<'a> {
-    fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsAddition<'a> {
-        DocumentsAddition {
-            inner,
-            document_ids: HashSet::new(),
-            document_store: RamDocumentStore::new(),
-            indexer: Indexer::new(),
-            ranked_map,
-        }
-    }
-
-    pub fn update_document<D>(&mut self, document: D) -> Result<(), Error>
-    where D: serde::Serialize,
-    {
-        let schema = &self.inner.lease_inner().schema;
-        let identifier = schema.identifier_name();
-
-        let document_id = match extract_document_id(identifier, &document)? {
-            Some(id) => id,
-            None => return Err(Error::MissingDocumentId),
-        };
-
-        // 1. store the document id for future deletion
-        self.document_ids.insert(document_id);
-
-        // 2. index the document fields in ram stores
-        let serializer = Serializer {
-            schema,
-            document_store: &mut self.document_store,
-            indexer: &mut self.indexer,
-            ranked_map: &mut self.ranked_map,
-            document_id,
-        };
-
-        document.serialize(serializer)?;
-
-        Ok(())
-    }
-
-    pub fn finalize(self) -> Result<(), Error> {
-        let lease_inner = self.inner.lease_inner();
-        let main = &lease_inner.raw.main;
-        let words = &lease_inner.raw.words;
-        let docs_words = &lease_inner.raw.docs_words;
-        let documents = &lease_inner.raw.documents;
-
-        // 1. remove the previous documents match indexes
-        let mut documents_deletion = DocumentsDeletion::new(self.inner);
-        documents_deletion.extend(self.document_ids);
-        documents_deletion.finalize()?;
-
-        // 2. insert new document attributes in the database
-        for ((id, attr), value) in self.document_store.into_inner() {
-            documents.set_document_field(id, attr, value)?;
-        }
-
-        let indexed = self.indexer.build();
-        let mut delta_words_builder = SetBuilder::memory();
-
-        for (word, delta_set) in indexed.words_doc_indexes {
-            delta_words_builder.insert(&word).unwrap();
-
-            let set = match words.doc_indexes(&word)? {
-                Some(set) => Union::new(&set, &delta_set).into_set_buf(),
-                None => delta_set,
-            };
-
-            words.set_doc_indexes(&word, &set)?;
-        }
-
-        for (id, words) in indexed.docs_words {
-            docs_words.set_doc_words(id, &words)?;
-        }
-
-        let delta_words = delta_words_builder
-            .into_inner()
-            .and_then(fst::Set::from_bytes)
-            .unwrap();
-
-        let words = match main.words_set()? {
-            Some(words) => {
-                let op = OpBuilder::new()
-                    .add(words.stream())
-                    .add(delta_words.stream())
-                    .r#union();
-
-                let mut words_builder = SetBuilder::memory();
-                words_builder.extend_stream(op).unwrap();
-                words_builder
-                    .into_inner()
-                    .and_then(fst::Set::from_bytes)
-                    .unwrap()
-            },
-            None => delta_words,
-        };
-
-        main.set_words_set(&words)?;
-        main.set_ranked_map(&self.ranked_map)?;
-
-        // update the "consistent" view of the Index
-        let ranked_map = self.ranked_map;
-        let schema = lease_inner.schema.clone();
-        let raw = lease_inner.raw.clone();
-
-        let inner = InnerIndex { words, schema, ranked_map, raw };
-        self.inner.0.store(Arc::new(inner));
-
-        Ok(())
-    }
-}
-
-pub struct DocumentsDeletion<'a> {
-    inner: &'a Index,
-    documents: Vec<DocumentId>,
-}
-
-impl<'a> DocumentsDeletion<'a> {
-    fn new(inner: &'a Index) -> DocumentsDeletion {
-        DocumentsDeletion { inner, documents: Vec::new() }
-    }
-
-    pub fn delete_document(&mut self, id: DocumentId) {
-        self.documents.push(id);
-    }
-
-    pub fn finalize(mut self) -> Result<(), Error> {
-        let lease_inner = self.inner.lease_inner();
-        let main = &lease_inner.raw.main;
-        let docs_words = &lease_inner.raw.docs_words;
-        let words = &lease_inner.raw.words;
-        let documents = &lease_inner.raw.documents;
-
-        let idset = {
-            self.documents.sort_unstable();
-            self.documents.dedup();
-            SetBuf::new_unchecked(self.documents)
-        };
-
-        let mut words_document_ids = HashMap::new();
-        for id in idset.into_vec() {
-            if let Some(words) = docs_words.doc_words(id)? {
-                let mut stream = words.stream();
-                while let Some(word) = stream.next() {
-                    let word = word.to_vec();
-                    words_document_ids.entry(word).or_insert_with(Vec::new).push(id);
-                }
-            }
-        }
-
-        let mut removed_words = BTreeSet::new();
-        for (word, mut document_ids) in words_document_ids {
-            document_ids.sort_unstable();
-            document_ids.dedup();
-            let document_ids = SetBuf::new_unchecked(document_ids);
-
-            if let Some(doc_indexes) = words.doc_indexes(&word)? {
-                let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id);
-                let doc_indexes = op.into_set_buf();
-
-                if !doc_indexes.is_empty() {
-                    words.set_doc_indexes(&word, &doc_indexes)?;
-                } else {
-                    words.del_doc_indexes(&word)?;
-                    removed_words.insert(word);
-                }
-            }
-
-            for id in document_ids.into_vec() {
-                documents.del_all_document_fields(id)?;
-                docs_words.del_doc_words(id)?;
-            }
-        }
-
-        let removed_words = fst::Set::from_iter(removed_words).unwrap();
-        let words = match main.words_set()? {
-            Some(words_set) => {
-                let op = fst::set::OpBuilder::new()
-                    .add(words_set.stream())
-                    .add(removed_words.stream())
-                    .difference();
-
-                let mut words_builder = SetBuilder::memory();
-                words_builder.extend_stream(op).unwrap();
-                words_builder
-                    .into_inner()
-                    .and_then(fst::Set::from_bytes)
-                    .unwrap()
-            },
-            None => fst::Set::default(),
-        };
-
-        main.set_words_set(&words)?;
-
-        // TODO must update the ranked_map too!
-
-        // update the "consistent" view of the Index
-        let ranked_map = lease_inner.ranked_map.clone();
-        let schema = lease_inner.schema.clone();
-        let raw = lease_inner.raw.clone();
-
-        let inner = InnerIndex { words, schema, ranked_map, raw };
-        self.inner.0.store(Arc::new(inner));
-
-        Ok(())
-    }
-}
-
-impl<'a> Extend<DocumentId> for DocumentsDeletion<'a> {
-    fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) {
-        self.documents.extend(iter)
-    }
-}
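The whole of `database.rs` above moves into the `database/` module introduced by the files below; the public surface (`Database`, `Index`, and the addition/deletion builders) is unchanged. A minimal sketch of that surface, assuming a prebuilt `Schema`, a serde-serializable document whose identifier attribute is set, and that the crate re-exports these paths (the index name and the on-disk path are illustrative only):

```rust
use meilidb_data::{Database, Schema};
use meilidb_data::database::Error;

// hypothetical helper: open a database, create an index, add one document
fn add_one<D: serde::Serialize>(schema: Schema, doc: D) -> Result<(), Error> {
    let database = Database::start_default("/tmp/example.mdb")?;
    let index = database.create_index("example", schema)?;

    let mut addition = index.documents_addition();
    addition.update_document(doc)?;
    addition.finalize() // publishes a fresh InnerIndex via the ArcSwap
}
```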
diff --git a/meilidb-data/src/database/docs_words_index.rs b/meilidb-data/src/database/docs_words_index.rs
new file mode 100644
index 000000000..6b7de15a2
--- /dev/null
+++ b/meilidb-data/src/database/docs_words_index.rs
@@ -0,0 +1,33 @@
+use std::sync::Arc;
+use meilidb_core::DocumentId;
+use super::Error;
+
+#[derive(Clone)]
+pub struct DocsWordsIndex(pub Arc<sled::Tree>);
+
+impl DocsWordsIndex {
+    pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
+        let key = id.0.to_be_bytes();
+        match self.0.get(key)? {
+            Some(bytes) => {
+                let len = bytes.len();
+                let value = bytes.into();
+                let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
+                Ok(Some(fst::Set::from(fst)))
+            },
+            None => Ok(None)
+        }
+    }
+
+    pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> {
+        let key = id.0.to_be_bytes();
+        self.0.set(key, words.as_fst().as_bytes())?;
+        Ok(())
+    }
+
+    pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
+        let key = id.0.to_be_bytes();
+        self.0.del(key)?;
+        Ok(())
+    }
+}
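`DocsWordsIndex` keys its tree with `id.0.to_be_bytes()`. Big-endian is deliberate: sled compares keys byte-wise, so big-endian encoding makes key order agree with numeric document-id order. A two-line illustration:

```rust
// big-endian keys sort like the integers they encode; little-endian keys do not
assert!(1u64.to_be_bytes() < 256u64.to_be_bytes());
assert!(1u64.to_le_bytes() > 256u64.to_le_bytes());
```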
diff --git a/meilidb-data/src/database/documents_addition.rs b/meilidb-data/src/database/documents_addition.rs
new file mode 100644
index 000000000..b22c06da6
--- /dev/null
+++ b/meilidb-data/src/database/documents_addition.rs
@@ -0,0 +1,131 @@
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use meilidb_core::DocumentId;
+use fst::{SetBuilder, set::OpBuilder};
+use sdset::{SetOperation, duo::Union};
+
+use crate::indexer::Indexer;
+use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
+use crate::RankedMap;
+
+use super::{Error, Index, InnerIndex, DocumentsDeletion};
+
+pub struct DocumentsAddition<'a> {
+    inner: &'a Index,
+    document_ids: HashSet<DocumentId>,
+    document_store: RamDocumentStore,
+    indexer: Indexer,
+    ranked_map: RankedMap,
+}
+
+impl<'a> DocumentsAddition<'a> {
+    pub(crate) fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsAddition<'a> {
+        DocumentsAddition {
+            inner,
+            document_ids: HashSet::new(),
+            document_store: RamDocumentStore::new(),
+            indexer: Indexer::new(),
+            ranked_map,
+        }
+    }
+
+    pub fn update_document<D>(&mut self, document: D) -> Result<(), Error>
+    where D: serde::Serialize,
+    {
+        let schema = &self.inner.lease_inner().schema;
+        let identifier = schema.identifier_name();
+
+        let document_id = match extract_document_id(identifier, &document)? {
+            Some(id) => id,
+            None => return Err(Error::MissingDocumentId),
+        };
+
+        // 1. store the document id for future deletion
+        self.document_ids.insert(document_id);
+
+        // 2. index the document fields in ram stores
+        let serializer = Serializer {
+            schema,
+            document_store: &mut self.document_store,
+            indexer: &mut self.indexer,
+            ranked_map: &mut self.ranked_map,
+            document_id,
+        };
+
+        document.serialize(serializer)?;
+
+        Ok(())
+    }
+
+    pub fn finalize(self) -> Result<(), Error> {
+        let lease_inner = self.inner.lease_inner();
+        let main = &lease_inner.raw.main;
+        let words = &lease_inner.raw.words;
+        let docs_words = &lease_inner.raw.docs_words;
+        let documents = &lease_inner.raw.documents;
+
+        // 1. remove the previous documents match indexes
+        let mut documents_deletion = DocumentsDeletion::new(self.inner);
+        documents_deletion.extend(self.document_ids);
+        documents_deletion.finalize()?;
+
+        // 2. insert new document attributes in the database
+        for ((id, attr), value) in self.document_store.into_inner() {
+            documents.set_document_field(id, attr, value)?;
+        }
+
+        let indexed = self.indexer.build();
+        let mut delta_words_builder = SetBuilder::memory();
+
+        for (word, delta_set) in indexed.words_doc_indexes {
+            delta_words_builder.insert(&word).unwrap();
+
+            let set = match words.doc_indexes(&word)? {
+                Some(set) => Union::new(&set, &delta_set).into_set_buf(),
+                None => delta_set,
+            };
+
+            words.set_doc_indexes(&word, &set)?;
+        }
+
+        for (id, words) in indexed.docs_words {
+            docs_words.set_doc_words(id, &words)?;
+        }
+
+        let delta_words = delta_words_builder
+            .into_inner()
+            .and_then(fst::Set::from_bytes)
+            .unwrap();
+
+        let words = match main.words_set()? {
+            Some(words) => {
+                let op = OpBuilder::new()
+                    .add(words.stream())
+                    .add(delta_words.stream())
+                    .r#union();
+
+                let mut words_builder = SetBuilder::memory();
+                words_builder.extend_stream(op).unwrap();
+                words_builder
+                    .into_inner()
+                    .and_then(fst::Set::from_bytes)
+                    .unwrap()
+            },
+            None => delta_words,
+        };
+
+        main.set_words_set(&words)?;
+        main.set_ranked_map(&self.ranked_map)?;
+
+        // update the "consistent" view of the Index
+        let ranked_map = self.ranked_map;
+        let schema = lease_inner.schema.clone();
+        let raw = lease_inner.raw.clone();
+
+        let inner = InnerIndex { words, schema, ranked_map, raw };
+        self.inner.0.store(Arc::new(inner));
+
+        Ok(())
+    }
+}
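`finalize` merges each word's freshly indexed `DocIndex` set into the stored one with sdset's `Union`, which operates on sorted, deduplicated sets. A tiny model of that merge, with `u32` standing in for `DocIndex` (a sketch against the sdset API as used in this diff, not the crate's own tests):

```rust
use sdset::{Set, SetOperation, duo::Union};

let existing = Set::new(&[1u32, 2, 4]).unwrap(); // postings already on disk
let delta = Set::new(&[2u32, 3]).unwrap();       // postings from the new documents
let merged = Union::new(existing, delta).into_set_buf();
assert_eq!(&merged[..], &[1, 2, 3, 4]);
```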
diff --git a/meilidb-data/src/database/documents_deletion.rs b/meilidb-data/src/database/documents_deletion.rs
new file mode 100644
index 000000000..5729e3461
--- /dev/null
+++ b/meilidb-data/src/database/documents_deletion.rs
@@ -0,0 +1,110 @@
+use std::collections::{HashMap, BTreeSet};
+use std::sync::Arc;
+
+use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
+use fst::{SetBuilder, Streamer};
+use meilidb_core::DocumentId;
+
+use super::{Index, Error, InnerIndex};
+
+pub struct DocumentsDeletion<'a> {
+    inner: &'a Index,
+    documents: Vec<DocumentId>,
+}
+
+impl<'a> DocumentsDeletion<'a> {
+    pub(crate) fn new(inner: &'a Index) -> DocumentsDeletion {
+        DocumentsDeletion { inner, documents: Vec::new() }
+    }
+
+    pub fn delete_document(&mut self, id: DocumentId) {
+        self.documents.push(id);
+    }
+
+    pub fn finalize(mut self) -> Result<(), Error> {
+        let lease_inner = self.inner.lease_inner();
+        let main = &lease_inner.raw.main;
+        let docs_words = &lease_inner.raw.docs_words;
+        let words = &lease_inner.raw.words;
+        let documents = &lease_inner.raw.documents;
+
+        let idset = {
+            self.documents.sort_unstable();
+            self.documents.dedup();
+            SetBuf::new_unchecked(self.documents)
+        };
+
+        let mut words_document_ids = HashMap::new();
+        for id in idset.into_vec() {
+            if let Some(words) = docs_words.doc_words(id)? {
+                let mut stream = words.stream();
+                while let Some(word) = stream.next() {
+                    let word = word.to_vec();
+                    words_document_ids.entry(word).or_insert_with(Vec::new).push(id);
+                }
+            }
+        }
+
+        let mut removed_words = BTreeSet::new();
+        for (word, mut document_ids) in words_document_ids {
+            document_ids.sort_unstable();
+            document_ids.dedup();
+            let document_ids = SetBuf::new_unchecked(document_ids);
+
+            if let Some(doc_indexes) = words.doc_indexes(&word)? {
+                let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id);
+                let doc_indexes = op.into_set_buf();
+
+                if !doc_indexes.is_empty() {
+                    words.set_doc_indexes(&word, &doc_indexes)?;
+                } else {
+                    words.del_doc_indexes(&word)?;
+                    removed_words.insert(word);
+                }
+            }
+
+            for id in document_ids.into_vec() {
+                documents.del_all_document_fields(id)?;
+                docs_words.del_doc_words(id)?;
+            }
+        }
+
+        let removed_words = fst::Set::from_iter(removed_words).unwrap();
+        let words = match main.words_set()? {
+            Some(words_set) => {
+                let op = fst::set::OpBuilder::new()
+                    .add(words_set.stream())
+                    .add(removed_words.stream())
+                    .difference();
+
+                let mut words_builder = SetBuilder::memory();
+                words_builder.extend_stream(op).unwrap();
+                words_builder
+                    .into_inner()
+                    .and_then(fst::Set::from_bytes)
+                    .unwrap()
+            },
+            None => fst::Set::default(),
+        };
+
+        main.set_words_set(&words)?;
+
+        // TODO must update the ranked_map too!
+
+        // update the "consistent" view of the Index
+        let ranked_map = lease_inner.ranked_map.clone();
+        let schema = lease_inner.schema.clone();
+        let raw = lease_inner.raw.clone();
+
+        let inner = InnerIndex { words, schema, ranked_map, raw };
+        self.inner.0.store(Arc::new(inner));
+
+        Ok(())
+    }
+}
+
+impl<'a> Extend<DocumentId> for DocumentsDeletion<'a> {
+    fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) {
+        self.documents.extend(iter)
+    }
+}
diff --git a/meilidb-data/src/database/documents_index.rs b/meilidb-data/src/database/documents_index.rs
new file mode 100644
index 000000000..36866a638
--- /dev/null
+++ b/meilidb-data/src/database/documents_index.rs
@@ -0,0 +1,71 @@
+use std::sync::Arc;
+use std::convert::TryInto;
+
+use meilidb_core::DocumentId;
+use sled::IVec;
+
+use crate::document_attr_key::DocumentAttrKey;
+use crate::schema::SchemaAttr;
+
+#[derive(Clone)]
+pub struct DocumentsIndex(pub(crate) Arc<sled::Tree>);
+
+impl DocumentsIndex {
+    pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<IVec>> {
+        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
+        self.0.get(key)
+    }
+
+    pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
+        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
+        self.0.set(key, value)?;
+        Ok(())
+    }
+
+    pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
+        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
+        self.0.del(key)?;
+        Ok(())
+    }
+
+    pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
+        let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
+        let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
+        let document_attrs = self.0.range(start..=end).keys();
+
+        for key in document_attrs {
+            self.0.del(key?)?;
+        }
+
+        Ok(())
+    }
+
+    pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
+        let start = DocumentAttrKey::new(id, SchemaAttr::min());
+        let start = start.to_be_bytes();
+
+        let end = DocumentAttrKey::new(id, SchemaAttr::max());
+        let end = end.to_be_bytes();
+
+        DocumentFieldsIter(self.0.range(start..=end))
+    }
+}
+
+pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
+
+impl<'a> Iterator for DocumentFieldsIter<'a> {
+    type Item = sled::Result<(SchemaAttr, IVec)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.0.next() {
+            Some(Ok((key, value))) => {
+                let slice: &[u8] = key.as_ref();
+                let array = slice.try_into().unwrap();
+                let key = DocumentAttrKey::from_be_bytes(array);
+                Some(Ok((key.attribute, value)))
+            },
+            Some(Err(e)) => Some(Err(e)),
+            None => None,
+        }
+    }
+}
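`del_all_document_fields` and `document_fields` both rely on `DocumentAttrKey` placing the document id in the most significant key bytes, so every attribute of one document falls inside the contiguous range `(id, SchemaAttr::min())..=(id, SchemaAttr::max())`. A hypothetical sketch of such a packing (the real layout lives in `crate::document_attr_key` and may differ):

```rust
// hypothetical packing: 8 id bytes then 2 attribute bytes, both big-endian,
// so sled's byte-wise key order groups all attributes of a document together
fn pack_key(id: u64, attr: u16) -> [u8; 10] {
    let mut key = [0u8; 10];
    key[..8].copy_from_slice(&id.to_be_bytes());
    key[8..].copy_from_slice(&attr.to_be_bytes());
    key
}
```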
diff --git a/meilidb-data/src/database/error.rs b/meilidb-data/src/database/error.rs
new file mode 100644
index 000000000..3e1b48235
--- /dev/null
+++ b/meilidb-data/src/database/error.rs
@@ -0,0 +1,57 @@
+use std::{error, fmt};
+use crate::serde::SerializerError;
+
+#[derive(Debug)]
+pub enum Error {
+    SchemaDiffer,
+    SchemaMissing,
+    WordIndexMissing,
+    MissingDocumentId,
+    SledError(sled::Error),
+    FstError(fst::Error),
+    BincodeError(bincode::Error),
+    SerializerError(SerializerError),
+}
+
+impl From<sled::Error> for Error {
+    fn from(error: sled::Error) -> Error {
+        Error::SledError(error)
+    }
+}
+
+impl From<fst::Error> for Error {
+    fn from(error: fst::Error) -> Error {
+        Error::FstError(error)
+    }
+}
+
+impl From<bincode::Error> for Error {
+    fn from(error: bincode::Error) -> Error {
+        Error::BincodeError(error)
+    }
+}
+
+impl From<SerializerError> for Error {
+    fn from(error: SerializerError) -> Error {
+        Error::SerializerError(error)
+    }
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use self::Error::*;
+        match self {
+            SchemaDiffer => write!(f, "schemas differ"),
+            SchemaMissing => write!(f, "this index does not have a schema"),
+            WordIndexMissing => write!(f, "this index does not have a word index"),
+            MissingDocumentId => write!(f, "document id is missing"),
+            SledError(e) => write!(f, "sled error; {}", e),
+            FstError(e) => write!(f, "fst error; {}", e),
+            BincodeError(e) => write!(f, "bincode error; {}", e),
+            SerializerError(e) => write!(f, "serializer error; {}", e),
+        }
+    }
+}
+
+impl error::Error for Error { }
+
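The four `From` impls are what let the rest of the module use `?` on `sled`, `fst`, `bincode`, and serializer results without explicit error mapping. For instance, a reader mixing two error sources (a sketch mirroring `Database::indexes`; the key name is illustrative):

```rust
use super::Error;

fn read_names(tree: &sled::Tree) -> Result<Vec<String>, Error> {
    match tree.get("names")? {                            // sled::Error -> Error
        Some(bytes) => Ok(bincode::deserialize(&bytes)?), // bincode::Error -> Error
        None => Ok(Vec::new()),
    }
}
```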
diff --git a/meilidb-data/src/database/index.rs b/meilidb-data/src/database/index.rs
new file mode 100644
index 000000000..d4ed26636
--- /dev/null
+++ b/meilidb-data/src/database/index.rs
@@ -0,0 +1,121 @@
+use sdset::SetBuf;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use arc_swap::{ArcSwap, Lease};
+use meilidb_core::criterion::Criteria;
+use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
+use rmp_serde::decode::Error as RmpError;
+use serde::de;
+
+use crate::ranked_map::RankedMap;
+use crate::schema::Schema;
+use crate::serde::Deserializer;
+
+use super::{Error, RawIndex, DocumentsAddition, DocumentsDeletion};
+
+#[derive(Clone)]
+pub struct Index(pub(crate) ArcSwap<InnerIndex>);
+
+pub struct InnerIndex {
+    pub words: fst::Set,
+    pub schema: Schema,
+    pub ranked_map: RankedMap,
+    pub raw: RawIndex, // TODO this will be a snapshot in the future
+}
+
+impl Index {
+    pub(crate) fn from_raw(raw: RawIndex) -> Result<Index, Error> {
+        let words = match raw.main.words_set()? {
+            Some(words) => words,
+            None => fst::Set::default(),
+        };
+
+        let schema = match raw.main.schema()? {
+            Some(schema) => schema,
+            None => return Err(Error::SchemaMissing),
+        };
+
+        let ranked_map = match raw.main.ranked_map()? {
+            Some(map) => map,
+            None => RankedMap::default(),
+        };
+
+        let inner = InnerIndex { words, schema, ranked_map, raw };
+        let index = Index(ArcSwap::new(Arc::new(inner)));
+
+        Ok(index)
+    }
+
+    pub fn query_builder(&self) -> QueryBuilder<IndexLease> {
+        let lease = IndexLease(self.0.lease());
+        QueryBuilder::new(lease)
+    }
+
+    pub fn query_builder_with_criteria<'c>(
+        &self,
+        criteria: Criteria<'c>,
+    ) -> QueryBuilder<'c, IndexLease>
+    {
+        let lease = IndexLease(self.0.lease());
+        QueryBuilder::with_criteria(lease, criteria)
+    }
+
+    pub fn lease_inner(&self) -> Lease<Arc<InnerIndex>> {
+        self.0.lease()
+    }
+
+    pub fn schema(&self) -> Schema {
+        self.0.lease().schema.clone()
+    }
+
+    pub fn documents_addition(&self) -> DocumentsAddition {
+        let ranked_map = self.0.lease().ranked_map.clone();
+        DocumentsAddition::new(self, ranked_map)
+    }
+
+    pub fn documents_deletion(&self) -> DocumentsDeletion {
+        DocumentsDeletion::new(self)
+    }
+
+    pub fn document<T>(
+        &self,
+        fields: Option<&HashSet<&str>>,
+        id: DocumentId,
+    ) -> Result<Option<T>, RmpError>
+    where T: de::DeserializeOwned,
+    {
+        let schema = &self.lease_inner().schema;
+        let fields = fields
+            .map(|fields| {
+                fields
+                    .into_iter()
+                    .filter_map(|name| schema.attribute(name))
+                    .collect()
+            });
+
+        let mut deserializer = Deserializer {
+            document_id: id,
+            index: &self,
+            fields: fields.as_ref(),
+        };
+
+        // TODO: currently we return an error if all document fields are missing,
+        //       returning None would have been better
+        T::deserialize(&mut deserializer).map(Some)
+    }
+}
+
+pub struct IndexLease(Lease<Arc<InnerIndex>>);
+
+impl Store for IndexLease {
+    type Error = Error;
+
+    fn words(&self) -> Result<&fst::Set, Self::Error> {
+        Ok(&self.0.words)
+    }
+
+    fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
+        Ok(self.0.raw.words.doc_indexes(word)?)
+    }
+}
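`Index` wraps its state in an `ArcSwap` so searches read a consistent snapshot while updates publish a whole new `InnerIndex` with `store`. A sketch of that reader/writer pattern using the `lease` API this version of arc_swap exposes (an `i32` stands in for `InnerIndex`):

```rust
use std::sync::Arc;
use arc_swap::ArcSwap;

let shared = ArcSwap::new(Arc::new(1));

let lease = shared.lease();     // cheap read-side handle
shared.store(Arc::new(2));      // writer swaps in a new snapshot

assert_eq!(*lease, 1);          // an in-flight reader keeps the old view
assert_eq!(*shared.lease(), 2); // new readers see the new value
```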
diff --git a/meilidb-data/src/database/main_index.rs b/meilidb-data/src/database/main_index.rs
new file mode 100644
index 000000000..f8d389774
--- /dev/null
+++ b/meilidb-data/src/database/main_index.rs
@@ -0,0 +1,62 @@
+use std::sync::Arc;
+
+use crate::ranked_map::RankedMap;
+use crate::schema::Schema;
+
+use super::Error;
+
+#[derive(Clone)]
+pub struct MainIndex(pub(crate) Arc<sled::Tree>);
+
+impl MainIndex {
+    pub fn schema(&self) -> Result<Option<Schema>, Error> {
+        match self.0.get("schema")? {
+            Some(bytes) => {
+                let schema = Schema::read_from_bin(bytes.as_ref())?;
+                Ok(Some(schema))
+            },
+            None => Ok(None),
+        }
+    }
+
+    pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> {
+        let mut bytes = Vec::new();
+        schema.write_to_bin(&mut bytes)?;
+        self.0.set("schema", bytes)?;
+        Ok(())
+    }
+
+    pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
+        match self.0.get("words")? {
+            Some(bytes) => {
+                let len = bytes.len();
+                let value = bytes.into();
+                let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
+                Ok(Some(fst::Set::from(fst)))
+            },
+            None => Ok(None),
+        }
+    }
+
+    pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
+        self.0.set("words", value.as_fst().as_bytes())?;
+        Ok(())
+    }
+
+    pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
+        match self.0.get("ranked-map")? {
+            Some(bytes) => {
+                let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
+                Ok(Some(ranked_map))
+            },
+            None => Ok(None),
+        }
+    }
+
+    pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
+        let mut bytes = Vec::new();
+        value.write_to_bin(&mut bytes)?;
+        self.0.set("ranked-map", bytes)?;
+        Ok(())
+    }
+}
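`words_set`/`set_words_set` persist the word dictionary as the raw bytes of an `fst::Set`, so a round-trip is just bytes out, bytes in. A sketch with the fst 0.3-era API used throughout this diff:

```rust
use fst::SetBuilder;

let mut builder = SetBuilder::memory();
builder.insert("apple").unwrap();  // keys must be inserted in sorted order
builder.insert("banana").unwrap();
let bytes = builder.into_inner().unwrap();

// these bytes are what set_words_set stores under the "words" key
let set = fst::Set::from_bytes(bytes).unwrap();
assert!(set.contains("apple"));
```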
diff --git a/meilidb-data/src/database/mod.rs b/meilidb-data/src/database/mod.rs
new file mode 100644
index 000000000..5f415fb1e
--- /dev/null
+++ b/meilidb-data/src/database/mod.rs
@@ -0,0 +1,161 @@
+use std::collections::hash_map::Entry;
+use std::collections::{HashSet, HashMap};
+use std::path::Path;
+use std::sync::{Arc, RwLock};
+
+use crate::Schema;
+
+mod docs_words_index;
+mod documents_addition;
+mod documents_deletion;
+mod documents_index;
+mod error;
+mod index;
+mod main_index;
+mod raw_index;
+mod words_index;
+
+pub use self::error::Error;
+pub use self::index::Index;
+
+use self::docs_words_index::DocsWordsIndex;
+use self::documents_addition::DocumentsAddition;
+use self::documents_deletion::DocumentsDeletion;
+use self::documents_index::DocumentsIndex;
+use self::index::InnerIndex;
+use self::main_index::MainIndex;
+use self::raw_index::RawIndex;
+use self::words_index::WordsIndex;
+
+pub struct Database {
+    cache: RwLock<HashMap<String, Arc<Index>>>,
+    inner: sled::Db,
+}
+
+impl Database {
+    pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
+        let cache = RwLock::new(HashMap::new());
+        let inner = sled::Db::start_default(path)?;
+        Ok(Database { cache, inner })
+    }
+
+    pub fn indexes(&self) -> Result<Option<HashSet<String>>, Error> {
+        let bytes = match self.inner.get("indexes")? {
+            Some(bytes) => bytes,
+            None => return Ok(None),
+        };
+
+        let indexes = bincode::deserialize(&bytes)?;
+        Ok(Some(indexes))
+    }
+
+    fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
+        let bytes = bincode::serialize(value)?;
+        self.inner.set("indexes", bytes)?;
+        Ok(())
+    }
+
+    pub fn open_index(&self, name: &str) -> Result<Option<Arc<Index>>, Error> {
+        {
+            let cache = self.cache.read().unwrap();
+            if let Some(index) = cache.get(name).cloned() {
+                return Ok(Some(index))
+            }
+        }
+
+        let mut cache = self.cache.write().unwrap();
+        let index = match cache.entry(name.to_string()) {
+            Entry::Occupied(occupied) => {
+                occupied.get().clone()
+            },
+            Entry::Vacant(vacant) => {
+                if !self.indexes()?.map_or(false, |x| x.contains(name)) {
+                    return Ok(None)
+                }
+
+                let main = {
+                    let tree = self.inner.open_tree(name)?;
+                    MainIndex(tree)
+                };
+
+                let words = {
+                    let tree_name = format!("{}-words", name);
+                    let tree = self.inner.open_tree(tree_name)?;
+                    WordsIndex(tree)
+                };
+
+                let docs_words = {
+                    let tree_name = format!("{}-docs-words", name);
+                    let tree = self.inner.open_tree(tree_name)?;
+                    DocsWordsIndex(tree)
+                };
+
+                let documents = {
+                    let tree_name = format!("{}-documents", name);
+                    let tree = self.inner.open_tree(tree_name)?;
+                    DocumentsIndex(tree)
+                };
+
+                let raw_index = RawIndex { main, words, docs_words, documents };
+                let index = Index::from_raw(raw_index)?;
+
+                vacant.insert(Arc::new(index)).clone()
+            },
+        };
+
+        Ok(Some(index))
+    }
+
+    pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> {
+        let mut cache = self.cache.write().unwrap();
+
+        let index = match cache.entry(name.to_string()) {
+            Entry::Occupied(occupied) => {
+                occupied.get().clone()
+            },
+            Entry::Vacant(vacant) => {
+                let main = {
+                    let tree = self.inner.open_tree(name)?;
+                    MainIndex(tree)
+                };
+
+                if let Some(prev_schema) = main.schema()? {
+                    if prev_schema != schema {
+                        return Err(Error::SchemaDiffer)
+                    }
+                }
+
+                main.set_schema(&schema)?;
+
+                let words = {
+                    let tree_name = format!("{}-words", name);
+                    let tree = self.inner.open_tree(tree_name)?;
+                    WordsIndex(tree)
+                };
+
+                let docs_words = {
+                    let tree_name = format!("{}-docs-words", name);
+                    let tree = self.inner.open_tree(tree_name)?;
+                    DocsWordsIndex(tree)
+                };
+
+                let documents = {
+                    let tree_name = format!("{}-documents", name);
+                    let tree = self.inner.open_tree(tree_name)?;
+                    DocumentsIndex(tree)
+                };
+
+                let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);
+                indexes.insert(name.to_string());
+                self.set_indexes(&indexes)?;
+
+                let raw_index = RawIndex { main, words, docs_words, documents };
+                let index = Index::from_raw(raw_index)?;
+
+                vacant.insert(Arc::new(index)).clone()
+            },
+        };
+
+        Ok(index)
+    }
+}
diff --git a/meilidb-data/src/database/raw_index.rs b/meilidb-data/src/database/raw_index.rs
new file mode 100644
index 000000000..0b2a56dbd
--- /dev/null
+++ b/meilidb-data/src/database/raw_index.rs
@@ -0,0 +1,9 @@
+use super::{MainIndex, WordsIndex, DocsWordsIndex, DocumentsIndex};
+
+#[derive(Clone)]
+pub struct RawIndex {
+    pub main: MainIndex,
+    pub words: WordsIndex,
+    pub docs_words: DocsWordsIndex,
+    pub documents: DocumentsIndex,
+}
diff --git a/meilidb-data/src/database/words_index.rs b/meilidb-data/src/database/words_index.rs
new file mode 100644
index 000000000..398e73900
--- /dev/null
+++ b/meilidb-data/src/database/words_index.rs
@@ -0,0 +1,32 @@
+use std::sync::Arc;
+
+use meilidb_core::DocIndex;
+use sdset::{Set, SetBuf};
+use zerocopy::{LayoutVerified, AsBytes};
+
+#[derive(Clone)]
+pub struct WordsIndex(pub(crate) Arc<sled::Tree>);
+
+impl WordsIndex {
+    pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
+        match self.0.get(word)? {
+            Some(bytes) => {
+                let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout");
+                let slice = layout.into_slice();
+                let setbuf = SetBuf::new_unchecked(slice.to_vec());
+                Ok(Some(setbuf))
+            },
+            None => Ok(None),
+        }
+    }
+
+    pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
+        self.0.set(word, set.as_bytes())?;
+        Ok(())
+    }
+
+    pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
+        self.0.del(word)?;
+        Ok(())
+    }
+}
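`WordsIndex` stores a `Set<DocIndex>` as its raw bytes (`AsBytes`) and revalidates them on read with `LayoutVerified`, avoiding any per-element decoding. A sketch of that round-trip with a stand-in `#[repr(C)]` type, assuming the derive feature of the zerocopy version this crate pins (in meilidb-core, `DocIndex` itself provides these impls):

```rust
use zerocopy::{AsBytes, FromBytes, LayoutVerified};

#[derive(AsBytes, FromBytes, Debug, PartialEq)]
#[repr(C)]
struct Pair { a: u32, b: u32 }

let pairs = [Pair { a: 1, b: 2 }, Pair { a: 3, b: 4 }];
let bytes: &[u8] = pairs[..].as_bytes(); // what set_doc_indexes hands to sled

// reading back: verify size and alignment, then reinterpret without copying
let layout = LayoutVerified::<_, [Pair]>::new_slice(bytes).expect("invalid layout");
assert_eq!(layout.into_slice(), &pairs[..]);
```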