feat: Introduce the DocumentsAddition type

This commit is contained in:
Clément Renault 2019-05-09 14:23:39 +02:00
parent 42e39f6eb5
commit e67ada8823
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
5 changed files with 289 additions and 232 deletions

View File

@@ -87,8 +87,7 @@ where S: Store,
{
fn query_all(&self, query: &str) -> Result<Vec<RawDocument>, S::Error> {
let automatons = generate_automatons(query);
let words = self.store.words()?;
let words = words.as_fst();
let words = self.store.words()?.as_fst();
let mut stream = {
let mut op_builder = fst::raw::OpBuilder::new();

View File

@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::collections::{HashSet, HashMap};
use std::collections::hash_map::Entry;
use std::convert::TryInto;
use std::io::{self, Cursor, BufRead};
use std::iter::FromIterator;
@ -8,21 +9,19 @@ use std::{error, fmt};
use arc_swap::{ArcSwap, Lease};
use byteorder::{ReadBytesExt, BigEndian};
use hashbrown::HashMap;
use meilidb_core::{criterion::Criteria, QueryBuilder, DocumentId, DocIndex};
use meilidb_core::{criterion::Criteria, QueryBuilder, Store, DocumentId, DocIndex};
use rmp_serde::decode::{Error as RmpError};
use sdset::SetBuf;
use sdset::{Set, SetBuf, SetOperation, duo::Union};
use serde::de;
use sled::IVec;
use zerocopy::{AsBytes, LayoutVerified};
use fst::{SetBuilder, set::OpBuilder};
use crate::{Schema, SchemaAttr, RankedMap};
use crate::serde::{extract_document_id, Serializer, Deserializer, SerializerError};
use crate::indexer::{Indexer, WordIndexTree};
use crate::indexer::Indexer;
use crate::document_attr_key::DocumentAttrKey;
pub type WordIndex = meilidb_core::Index<WordIndexTree>;
#[derive(Debug)]
pub enum Error {
SchemaDiffer,
@ -97,11 +96,18 @@ impl Database {
}
}
let indexes: HashSet<&str> = match self.inner.get("indexes")? {
Some(bytes) => bincode::deserialize(&bytes)?,
let mut cache = self.cache.write().unwrap();
let index = match cache.entry(name.to_string()) {
Entry::Occupied(occupied) => {
occupied.get().clone()
},
Entry::Vacant(vacant) => {
let bytes = match self.inner.get("indexes")? {
Some(bytes) => bytes,
None => return Ok(None),
};
let indexes: HashSet<&str> = bincode::deserialize(&bytes)?;
if indexes.get(name).is_none() {
return Ok(None);
}
@ -124,27 +130,28 @@ impl Database {
};
let raw_index = RawIndex { main, words, documents };
let index = Arc::new(Index(raw_index));
let index = Index::from_raw(raw_index)?;
{
let cache = self.cache.write().unwrap();
cache.insert(name.to_string(), index.clone());
}
vacant.insert(Arc::new(index)).clone()
},
};
Ok(Some(index))
}
pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> {
{
let cache = self.cache.read().unwrap();
if let Some(index) = cache.get(name).cloned() {
// TODO check if schemas are the same
return Ok(index)
}
}
let mut cache = self.cache.write().unwrap();
let mut indexes: HashSet<&str> = match self.inner.get("indexes")? {
Some(bytes) => bincode::deserialize(&bytes)?,
let index = match cache.entry(name.to_string()) {
Entry::Occupied(occupied) => {
occupied.get().clone()
},
Entry::Vacant(vacant) => {
let bytes = self.inner.get("indexes")?;
let bytes = bytes.as_ref();
let mut indexes: HashSet<&str> = match bytes {
Some(bytes) => bincode::deserialize(bytes)?,
None => HashSet::new(),
};
@ -174,29 +181,28 @@ impl Database {
};
let raw_index = RawIndex { main, words, documents };
let index = Arc::new(Index(raw_index));
let index = Index::from_raw(raw_index)?;
{
let cache = self.cache.write().unwrap();
cache.insert(name.to_string(), index.clone());
}
vacant.insert(Arc::new(index)).clone()
},
};
Ok(index)
}
}
#[derive(Clone)]
struct RawIndex {
main: MainIndex,
words: WordsIndex,
documents: DocumentsIndex,
pub struct RawIndex {
pub main: MainIndex,
pub words: WordsIndex,
pub documents: DocumentsIndex,
}
#[derive(Clone)]
struct MainIndex(Arc<sled::Tree>);
pub struct MainIndex(Arc<sled::Tree>);
impl MainIndex {
fn schema(&self) -> Result<Option<Schema>, Error> {
pub fn schema(&self) -> Result<Option<Schema>, Error> {
match self.0.get("schema")? {
Some(bytes) => {
let schema = Schema::read_from_bin(bytes.as_ref())?;
@ -206,7 +212,7 @@ impl MainIndex {
}
}
fn words_set(&self) -> Result<Option<fst::Set>, Error> {
pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
match self.0.get("words")? {
Some(bytes) => {
let len = bytes.len();
@ -218,7 +224,12 @@ impl MainIndex {
}
}
fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
/// Persists the words fst set under the `"words"` key of the main tree.
///
/// Counterpart of `words_set`; stores the raw fst bytes directly.
pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
    let bytes = value.as_fst().as_bytes();
    self.0.set("words", bytes)?;
    Ok(())
}
pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
match self.0.get("ranked-map")? {
Some(bytes) => {
let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
@ -227,13 +238,20 @@ impl MainIndex {
None => Ok(None),
}
}
/// Persists the ranked map under the `"ranked-map"` key of the main tree.
///
/// BUGFIX: previously wrote to `"ranked_map"` (underscore) while the
/// getter `ranked_map()` reads from `"ranked-map"` (hyphen), so the
/// stored value could never be loaded back. The key now matches the getter.
pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
    let mut bytes = Vec::new();
    value.write_to_bin(&mut bytes)?;
    self.0.set("ranked-map", bytes)?;
    Ok(())
}
}
#[derive(Clone)]
struct WordsIndex(Arc<sled::Tree>);
pub struct WordsIndex(Arc<sled::Tree>);
impl WordsIndex {
fn doc_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Error> {
pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
match self.0.get(word)? {
Some(bytes) => {
let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout");
@ -244,18 +262,33 @@ impl WordsIndex {
None => Ok(None),
}
}
/// Stores the documents indexes set associated with `word`.
///
/// Passing `None` removes the entry for `word` entirely rather than
/// storing an empty set.
pub fn set_doc_indexes(&self, word: &[u8], set: Option<&Set<DocIndex>>) -> sled::Result<()> {
    if let Some(indexes) = set {
        // the Set slice layout is plain-old-data, store its raw bytes
        self.0.set(word, indexes.as_bytes())?;
    } else {
        self.0.del(word)?;
    }
    Ok(())
}
}
#[derive(Clone)]
struct DocumentsIndex(Arc<sled::Tree>);
pub struct DocumentsIndex(Arc<sled::Tree>);
impl DocumentsIndex {
fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<Option<IVec>, Error> {
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<IVec>> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.get(key).map_err(Into::into)
self.0.get(key)
}
fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
/// Stores the serialized `value` of one attribute of one document.
///
/// The tree key is the big-endian encoded `(document id, attribute)` pair,
/// so all fields of a document are stored contiguously and can be iterated
/// with a range scan (see `document_fields`).
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
    let key = DocumentAttrKey::new(id, attr).to_be_bytes();
    self.0.set(key, value)?;
    Ok(())
}
pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
let start = DocumentAttrKey::new(id, SchemaAttr::min());
let start = start.to_be_bytes();
@ -269,7 +302,7 @@ impl DocumentsIndex {
pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
impl<'a> Iterator for DocumentFieldsIter<'a> {
type Item = Result<(SchemaAttr, IVec), Error>;
type Item = sled::Result<(SchemaAttr, IVec)>;
fn next(&mut self) -> Option<Self::Item> {
match self.0.next() {
@ -279,51 +312,72 @@ impl<'a> Iterator for DocumentFieldsIter<'a> {
let key = DocumentAttrKey::from_be_bytes(array);
Some(Ok((key.attribute, value)))
},
Some(Err(e)) => Some(Err(Error::SledError(e))),
Some(Err(e)) => Some(Err(e)),
None => None,
}
}
}
#[derive(Clone)]
pub struct Index(RawIndex);
pub struct Index(ArcSwap<InnerIndex>);
pub struct InnerIndex {
pub words: fst::Set,
pub schema: Schema,
pub ranked_map: RankedMap,
pub raw: RawIndex, // TODO this will be a snapshot in the future
}
impl Index {
pub fn query_builder(&self) -> QueryBuilder<Lease<Arc<WordIndex>>> {
let word_index = self.word_index();
QueryBuilder::new(word_index)
fn from_raw(raw: RawIndex) -> Result<Index, Error> {
let words = match raw.main.words_set()? {
Some(words) => words,
None => fst::Set::default(),
};
let schema = match raw.main.schema()? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
let ranked_map = match raw.main.ranked_map()? {
Some(map) => map,
None => RankedMap::default(),
};
let inner = InnerIndex { words, schema, ranked_map, raw };
let index = Index(ArcSwap::new(Arc::new(inner)));
Ok(index)
}
/// Returns a `QueryBuilder` backed by a lease on the current consistent
/// snapshot of this index (the lease pins the `InnerIndex` for the query).
pub fn query_builder(&self) -> QueryBuilder<IndexLease> {
    QueryBuilder::new(IndexLease(self.0.lease()))
}
pub fn query_builder_with_criteria<'c>(
&self,
criteria: Criteria<'c>,
) -> QueryBuilder<'c, Lease<Arc<WordIndex>>>
) -> QueryBuilder<'c, IndexLease>
{
let word_index = self.word_index();
QueryBuilder::with_criteria(word_index, criteria)
let lease = IndexLease(self.0.lease());
QueryBuilder::with_criteria(lease, criteria)
}
pub fn schema(&self) -> &Schema {
self.0.schema()
}
pub fn word_index(&self) -> Lease<Arc<WordIndex>> {
self.0.word_index()
}
pub fn ranked_map(&self) -> Lease<Arc<RankedMap>> {
self.0.ranked_map()
pub fn lease_inner(&self) -> Lease<Arc<InnerIndex>> {
self.0.lease()
}
pub fn documents_addition(&self) -> DocumentsAddition {
let index = self.0.clone();
let ranked_map = self.0.ranked_map().clone();
DocumentsAddition::from_raw(index, ranked_map)
let ranked_map = self.0.lease().ranked_map.clone();
DocumentsAddition::new(self, ranked_map)
}
pub fn documents_deletion(&self) -> DocumentsDeletion {
let index = self.0.clone();
DocumentsDeletion::from_raw(index)
// let index = self.0.clone();
// DocumentsDeletion::from_raw(index)
unimplemented!()
}
pub fn document<T>(
@ -333,17 +387,18 @@ impl Index {
) -> Result<Option<T>, RmpError>
where T: de::DeserializeOwned,
{
let fields = match fields {
Some(fields) => {
let iter = fields.iter().filter_map(|n| self.0.schema().attribute(n));
Some(HashSet::from_iter(iter))
},
None => None,
};
let schema = &self.lease_inner().schema;
let fields = fields
.map(|fields| {
fields
.into_iter()
.filter_map(|name| schema.attribute(name))
.collect()
});
let mut deserializer = Deserializer {
document_id: id,
raw_index: &self.0,
index: &self,
fields: fields.as_ref(),
};
@ -353,21 +408,35 @@ impl Index {
}
}
pub struct DocumentsAddition {
inner: RawIndex,
pub struct IndexLease(Lease<Arc<InnerIndex>>);
// Adapts a leased index snapshot to the meilidb-core `Store` trait so the
// query engine can read from a consistent view while updates proceed.
impl Store for IndexLease {
    type Error = Error;

    // The words fst is part of the in-memory snapshot: no I/O, no failure path.
    fn words(&self) -> Result<&fst::Set, Self::Error> {
        Ok(&self.0.words)
    }

    // Doc indexes are read through the underlying sled tree; sled errors are
    // converted into this crate's `Error` by the `?` operator.
    fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
        Ok(self.0.raw.words.doc_indexes(word)?)
    }
}
/// Accumulates document insertions/updates against one `Index`.
///
/// Fields are fed through `update_document` and only persisted and made
/// visible when `finalize` is called.
pub struct DocumentsAddition<'a> {
    inner: &'a Index,        // index the batch will be applied to
    indexer: Indexer,        // accumulates word -> doc-indexes for the batch
    ranked_map: RankedMap,   // snapshot of the ranked map, updated during serialization
}
impl DocumentsAddition {
pub fn from_raw(inner: RawIndex, ranked_map: RankedMap) -> DocumentsAddition {
impl<'a> DocumentsAddition<'a> {
fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsAddition<'a> {
DocumentsAddition { inner, indexer: Indexer::new(), ranked_map }
}
pub fn update_document<D>(&mut self, document: D) -> Result<(), Error>
where D: serde::Serialize,
{
let schema = self.inner.schema();
let schema = &self.inner.lease_inner().schema;
let identifier = schema.identifier_name();
let document_id = match extract_document_id(identifier, &document)? {
@ -375,6 +444,12 @@ impl DocumentsAddition {
None => return Err(Error::MissingDocumentId),
};
// 1. remove the previous document match indexes
let mut documents_deletion = DocumentsDeletion::new(self.inner);
documents_deletion.delete_document(document_id);
documents_deletion.finalize()?;
// 2. index the document fields
let serializer = Serializer {
schema,
index: &self.inner,
@ -388,30 +463,70 @@ impl DocumentsAddition {
Ok(())
}
pub fn finalize(self) -> sled::Result<()> {
pub fn finalize(self) -> Result<(), Error> {
let lease_inner = self.inner.lease_inner();
let main = &lease_inner.raw.main;
let words = &lease_inner.raw.words;
let delta_index = self.indexer.build();
let mut delta_words_builder = SetBuilder::memory();
let index = self.inner.word_index();
let new_index = index.insert_indexes(delta_index)?;
for (word, delta_set) in delta_index {
delta_words_builder.insert(&word).unwrap();
let new_index = Arc::from(new_index);
self.inner.update_word_index(new_index);
let set = match words.doc_indexes(&word)? {
Some(set) => Union::new(&set, &delta_set).into_set_buf(),
None => delta_set,
};
words.set_doc_indexes(&word, Some(&set))?;
}
let delta_words = delta_words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap();
let words = match main.words_set()? {
Some(words) => {
let op = OpBuilder::new()
.add(words.stream())
.add(delta_words.stream())
.r#union();
let mut words_builder = SetBuilder::memory();
words_builder.extend_stream(op).unwrap();
words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap()
},
None => delta_words,
};
main.set_words_set(&words)?;
main.set_ranked_map(&self.ranked_map)?;
// update the "consistent" view of the Index
let ranked_map = self.ranked_map;
let schema = lease_inner.schema.clone();
let raw = lease_inner.raw.clone();
let inner = InnerIndex { words, schema, ranked_map, raw };
self.inner.0.store(Arc::new(inner));
Ok(())
}
}
pub struct DocumentsDeletion {
inner: RawIndex,
/// Accumulates document deletions against one `Index`.
///
/// NOTE(review): `finalize` for deletions is currently `unimplemented!` in
/// this commit — ids are only queued here.
pub struct DocumentsDeletion<'a> {
    inner: &'a Index,             // index the deletions target
    documents: Vec<DocumentId>,   // queued ids; sorted/deduped at finalize time
}
impl DocumentsDeletion {
pub fn from_raw(inner: RawIndex) -> DocumentsDeletion {
DocumentsDeletion {
inner,
documents: Vec::new(),
}
impl<'a> DocumentsDeletion<'a> {
fn new(inner: &'a Index) -> DocumentsDeletion {
DocumentsDeletion { inner, documents: Vec::new() }
}
pub fn delete_document(&mut self, id: DocumentId) {
@ -423,13 +538,16 @@ impl DocumentsDeletion {
self.documents.dedup();
let idset = SetBuf::new_unchecked(self.documents);
let index = self.inner.word_index();
let new_index = index.remove_documents(&idset)?;
let new_index = Arc::from(new_index);
// let index = self.inner.word_index();
self.inner.update_word_index(new_index);
// let new_index = index.remove_documents(&idset)?;
// let new_index = Arc::from(new_index);
Ok(())
// self.inner.update_word_index(new_index);
// Ok(())
unimplemented!("documents deletion finalize")
}
}

View File

@ -1,78 +1,13 @@
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::sync::Arc;
use deunicode::deunicode_with_tofu;
use meilidb_core::{DocumentId, DocIndex, Store};
use meilidb_tokenizer::{is_cjk, Tokenizer, SeqTokenizer, Token};
use sdset::{Set, SetBuf};
use sled::Tree;
use zerocopy::{AsBytes, LayoutVerified};
use sdset::SetBuf;
use crate::SchemaAttr;
#[derive(Clone)]
pub struct WordIndexTree(pub Arc<Tree>);
impl Store for WordIndexTree {
type Error = sled::Error;
fn get_fst(&self) -> Result<fst::Set, Self::Error> {
match self.0.get("fst")? {
Some(bytes) => {
let bytes: Arc<[u8]> = bytes.into();
let len = bytes.len();
let raw = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
Ok(fst::Set::from(raw))
},
None => Ok(fst::Set::default()),
}
}
fn set_fst(&self, set: &fst::Set) -> Result<(), Self::Error> {
let bytes = set.as_fst().to_vec();
self.0.set("fst", bytes)?;
Ok(())
}
fn get_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
let mut word_bytes = Vec::from("word-");
word_bytes.extend_from_slice(word);
match self.0.get(word_bytes)? {
Some(bytes) => {
let layout = LayoutVerified::new_slice(bytes.as_ref()).unwrap();
let slice = layout.into_slice();
let setbuf = SetBuf::new_unchecked(slice.to_vec());
Ok(Some(setbuf))
},
None => Ok(None),
}
}
fn set_indexes(&self, word: &[u8], indexes: &Set<DocIndex>) -> Result<(), Self::Error> {
let mut word_bytes = Vec::from("word-");
word_bytes.extend_from_slice(word);
let slice = indexes.as_slice();
let bytes = slice.as_bytes();
self.0.set(word_bytes, bytes)?;
Ok(())
}
fn del_indexes(&self, word: &[u8]) -> Result<(), Self::Error> {
let mut word_bytes = Vec::from("word-");
word_bytes.extend_from_slice(word);
self.0.del(word_bytes)?;
Ok(())
}
}
type Word = Vec<u8>; // TODO make it be a SmallVec
pub struct Indexer {
@ -115,6 +50,7 @@ impl Indexer {
pub fn build(self) -> BTreeMap<Word, SetBuf<DocIndex>> {
self.indexed.into_iter().map(|(word, mut indexes)| {
indexes.sort_unstable();
indexes.dedup();
(word, SetBuf::new_unchecked(indexes))
}).collect()
}

View File

@ -6,12 +6,12 @@ use rmp_serde::decode::{Deserializer as RmpDeserializer, ReadReader};
use rmp_serde::decode::{Error as RmpError};
use serde::{de, forward_to_deserialize_any};
use crate::database::RawIndex;
use crate::database::Index;
use crate::SchemaAttr;
pub struct Deserializer<'a> {
pub document_id: DocumentId,
pub raw_index: &'a RawIndex,
pub index: &'a Index,
pub fields: Option<&'a HashSet<SchemaAttr>>,
}
@ -26,15 +26,18 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
}
forward_to_deserialize_any! {
bool u8 u16 u32 u64 i8 i16 i32 i64 f32 f64 char str string unit seq
bytes byte_buf unit_struct tuple_struct
identifier tuple ignored_any option newtype_struct enum struct
bool i8 i16 i32 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string
bytes byte_buf option unit unit_struct newtype_struct seq tuple
tuple_struct struct enum identifier ignored_any
}
fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: de::Visitor<'de>
{
let document_attributes = self.raw_index.get_document_fields(self.document_id);
let schema = &self.index.lease_inner().schema;
let documents = &self.index.lease_inner().raw.documents;
let document_attributes = documents.document_fields(self.document_id);
let document_attributes = document_attributes.filter_map(|result| {
match result {
Ok(value) => Some(value),
@ -45,9 +48,10 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
},
}
});
let iter = document_attributes.filter_map(|(attr, value)| {
if self.fields.map_or(true, |f| f.contains(&attr)) {
let attribute_name = self.raw_index.schema().attribute_name(attr);
let attribute_name = schema.attribute_name(attr);
Some((attribute_name, Value::new(value)))
} else {
None

View File

@ -1,7 +1,7 @@
use meilidb_core::DocumentId;
use serde::ser;
use crate::database::RawIndex;
use crate::database::Index;
use crate::ranked_map::RankedMap;
use crate::indexer::Indexer as RawIndexer;
use crate::schema::Schema;
@ -9,7 +9,7 @@ use super::{SerializerError, ConvertToString, ConvertToNumber, Indexer};
pub struct Serializer<'a> {
pub schema: &'a Schema,
pub index: &'a RawIndex,
pub index: &'a Index,
pub indexer: &'a mut RawIndexer,
pub ranked_map: &'a mut RankedMap,
pub document_id: DocumentId,
@ -171,7 +171,7 @@ impl<'a> ser::Serializer for Serializer<'a> {
pub struct MapSerializer<'a> {
schema: &'a Schema,
document_id: DocumentId,
index: &'a RawIndex,
index: &'a Index,
indexer: &'a mut RawIndexer,
ranked_map: &'a mut RankedMap,
current_key_name: Option<String>,
@ -224,7 +224,7 @@ impl<'a> ser::SerializeMap for MapSerializer<'a> {
pub struct StructSerializer<'a> {
schema: &'a Schema,
document_id: DocumentId,
index: &'a RawIndex,
index: &'a Index,
indexer: &'a mut RawIndexer,
ranked_map: &'a mut RankedMap,
}
@ -259,7 +259,7 @@ impl<'a> ser::SerializeStruct for StructSerializer<'a> {
fn serialize_value<T: ?Sized>(
schema: &Schema,
document_id: DocumentId,
index: &RawIndex,
index: &Index,
indexer: &mut RawIndexer,
ranked_map: &mut RankedMap,
key: &str,
@ -272,7 +272,7 @@ where T: ser::Serialize,
if props.is_stored() {
let value = rmp_serde::to_vec_named(value)?;
index.set_document_attribute(document_id, attr, value)?;
index.lease_inner().raw.documents.set_document_field(document_id, attr, value)?;
}
if props.is_indexed() {