From 1f2abce7c398041c12302ac3c9e2108432a40d29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 9 May 2019 16:16:07 +0200 Subject: [PATCH] feat: Introduce the DocumentsDeletion type --- meilidb-core/src/query_builder.rs | 5 +- meilidb-data/Cargo.toml | 1 - meilidb-data/src/database.rs | 258 ++++++++++++++++++++++++------ meilidb-data/src/indexer.rs | 74 ++++++--- 4 files changed, 271 insertions(+), 67 deletions(-) diff --git a/meilidb-core/src/query_builder.rs b/meilidb-core/src/query_builder.rs index 73d0601a9..058c61ecc 100644 --- a/meilidb-core/src/query_builder.rs +++ b/meilidb-core/src/query_builder.rs @@ -107,7 +107,10 @@ where S: Store, let is_exact = distance == 0 && input.len() == automaton.query_len(); let doc_indexes = self.store.word_indexes(input)?; - let doc_indexes = doc_indexes.expect("word doc-indexes not found"); + let doc_indexes = match doc_indexes { + Some(doc_indexes) => doc_indexes, + None => continue, + }; for di in doc_indexes.as_slice() { if self.searchable_attrs.as_ref().map_or(true, |r| r.contains(&di.attribute)) { diff --git a/meilidb-data/Cargo.toml b/meilidb-data/Cargo.toml index c883196d4..eb0d92f55 100644 --- a/meilidb-data/Cargo.toml +++ b/meilidb-data/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" [dependencies] arc-swap = "0.3.11" bincode = "1.1.2" -byteorder = "1.3.1" deunicode = "1.0.0" hashbrown = { version = "0.2.2", features = ["serde"] } linked-hash-map = { version = "0.5.2", features = ["serde_impl"] } diff --git a/meilidb-data/src/database.rs b/meilidb-data/src/database.rs index 20773e00d..3336738c0 100644 --- a/meilidb-data/src/database.rs +++ b/meilidb-data/src/database.rs @@ -1,25 +1,22 @@ -use std::collections::{HashSet, HashMap}; +use std::collections::{BTreeSet, HashSet, HashMap}; use std::collections::hash_map::Entry; use std::convert::TryInto; -use std::io::{self, Cursor, BufRead}; -use std::iter::FromIterator; use std::path::Path; use std::sync::{Arc, RwLock}; use std::{error, fmt}; use arc_swap::{ArcSwap, Lease}; -use byteorder::{ReadBytesExt, BigEndian}; use meilidb_core::{criterion::Criteria, QueryBuilder, Store, DocumentId, DocIndex}; use rmp_serde::decode::{Error as RmpError}; -use sdset::{Set, SetBuf, SetOperation, duo::Union}; +use sdset::{Set, SetBuf, SetOperation, duo::{Union, DifferenceByKey}}; use serde::de; use sled::IVec; use zerocopy::{AsBytes, LayoutVerified}; -use fst::{SetBuilder, set::OpBuilder}; +use fst::{SetBuilder, set::OpBuilder, Streamer}; use crate::{Schema, SchemaAttr, RankedMap}; use crate::serde::{extract_document_id, Serializer, Deserializer, SerializerError}; -use crate::indexer::Indexer; +use crate::indexer::{Indexer, Indexed}; use crate::document_attr_key::DocumentAttrKey; #[derive(Debug)] @@ -88,6 +85,22 @@ impl Database { Ok(Database { cache, inner }) } + pub fn indexes(&self) -> Result>, Error> { + let bytes = match self.inner.get("indexes")? { + Some(bytes) => bytes, + None => return Ok(None), + }; + + let indexes = bincode::deserialize(&bytes)?; + Ok(Some(indexes)) + } + + pub fn set_indexes(&self, value: &HashSet) -> Result<(), Error> { + let bytes = bincode::serialize(value)?; + self.inner.set("indexes", bytes)?; + Ok(()) + } + pub fn open_index(&self, name: &str) -> Result>, Error> { { let cache = self.cache.read().unwrap(); @@ -102,14 +115,8 @@ impl Database { occupied.get().clone() }, Entry::Vacant(vacant) => { - let bytes = match self.inner.get("indexes")? 
{ - Some(bytes) => bytes, - None => return Ok(None), - }; - - let indexes: HashSet<&str> = bincode::deserialize(&bytes)?; - if indexes.get(name).is_none() { - return Ok(None); + if !self.indexes()?.map_or(false, |x| !x.contains(name)) { + return Ok(None) } let main = { @@ -123,13 +130,19 @@ impl Database { WordsIndex(tree) }; + let attrs_words = { + let tree_name = format!("{}-attrs-words", name); + let tree = self.inner.open_tree(tree_name)?; + AttrsWords(tree) + }; + let documents = { let tree_name = format!("{}-documents", name); let tree = self.inner.open_tree(tree_name)?; DocumentsIndex(tree) }; - let raw_index = RawIndex { main, words, documents }; + let raw_index = RawIndex { main, words, attrs_words, documents }; let index = Index::from_raw(raw_index)?; vacant.insert(Arc::new(index)).clone() @@ -147,16 +160,6 @@ impl Database { occupied.get().clone() }, Entry::Vacant(vacant) => { - let bytes = self.inner.get("indexes")?; - let bytes = bytes.as_ref(); - - let mut indexes: HashSet<&str> = match bytes { - Some(bytes) => bincode::deserialize(bytes)?, - None => HashSet::new(), - }; - - let new_insertion = indexes.insert(name); - let main = { let tree = self.inner.open_tree(name)?; MainIndex(tree) @@ -168,19 +171,31 @@ impl Database { } } + main.set_schema(&schema)?; + let words = { let tree_name = format!("{}-words", name); let tree = self.inner.open_tree(tree_name)?; WordsIndex(tree) }; + let attrs_words = { + let tree_name = format!("{}-attrs-words", name); + let tree = self.inner.open_tree(tree_name)?; + AttrsWords(tree) + }; + let documents = { let tree_name = format!("{}-documents", name); let tree = self.inner.open_tree(tree_name)?; DocumentsIndex(tree) }; - let raw_index = RawIndex { main, words, documents }; + let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new); + indexes.insert(name.to_string()); + self.set_indexes(&indexes)?; + + let raw_index = RawIndex { main, words, attrs_words, documents }; let index = Index::from_raw(raw_index)?; vacant.insert(Arc::new(index)).clone() @@ -195,6 +210,7 @@ impl Database { pub struct RawIndex { pub main: MainIndex, pub words: WordsIndex, + pub attrs_words: AttrsWords, pub documents: DocumentsIndex, } @@ -212,6 +228,13 @@ impl MainIndex { } } + pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> { + let mut bytes = Vec::new(); + schema.write_to_bin(&mut bytes)?; + self.0.set("schema", bytes)?; + Ok(()) + } + pub fn words_set(&self) -> Result, Error> { match self.0.get("words")? { Some(bytes) => { @@ -263,16 +286,87 @@ impl WordsIndex { } } - pub fn set_doc_indexes(&self, word: &[u8], set: Option<&Set>) -> sled::Result<()> { - match set { - Some(set) => self.0.set(word, set.as_bytes())?, - None => self.0.del(word)?, + pub fn set_doc_indexes(&self, word: &[u8], set: &Set) -> sled::Result<()> { + self.0.set(word, set.as_bytes())?; + Ok(()) + } + + pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> { + self.0.del(word)?; + Ok(()) + } +} + +#[derive(Clone)] +pub struct AttrsWords(Arc); + +impl AttrsWords { + pub fn attr_words(&self, id: DocumentId, attr: SchemaAttr) -> Result, Error> { + let key = DocumentAttrKey::new(id, attr).to_be_bytes(); + match self.0.get(key)? 
{ + Some(bytes) => { + let len = bytes.len(); + let value = bytes.into(); + let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?; + Ok(Some(fst::Set::from(fst))) + }, + None => Ok(None) + } + } + + pub fn attrs_words(&self, id: DocumentId) -> DocumentAttrsWordsIter { + let start = DocumentAttrKey::new(id, SchemaAttr::min()); + let start = start.to_be_bytes(); + + let end = DocumentAttrKey::new(id, SchemaAttr::max()); + let end = end.to_be_bytes(); + + DocumentAttrsWordsIter(self.0.range(start..=end)) + } + + pub fn set_attr_words( + &self, + id: DocumentId, + attr: SchemaAttr, + words: Option<&fst::Set>, + ) -> Result<(), Error> + { + let key = DocumentAttrKey::new(id, attr).to_be_bytes(); + + match words { + Some(words) => self.0.set(key, words.as_fst().as_bytes())?, + None => self.0.del(key)?, }; Ok(()) } } +pub struct DocumentAttrsWordsIter<'a>(sled::Iter<'a>); + +impl<'a> Iterator for DocumentAttrsWordsIter<'a> { + type Item = sled::Result<(SchemaAttr, fst::Set)>; + + fn next(&mut self) -> Option { + match self.0.next() { + Some(Ok((key, bytes))) => { + let slice: &[u8] = key.as_ref(); + let array = slice.try_into().unwrap(); + let key = DocumentAttrKey::from_be_bytes(array); + + let len = bytes.len(); + let value = bytes.into(); + let fst = fst::raw::Fst::from_shared_bytes(value, 0, len).unwrap(); + let set = fst::Set::from(fst); + + Some(Ok((key.attribute, set))) + }, + Some(Err(e)) => Some(Err(e.into())), + None => None, + } + } +} + #[derive(Clone)] pub struct DocumentsIndex(Arc); @@ -288,6 +382,12 @@ impl DocumentsIndex { Ok(()) } + pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> { + let key = DocumentAttrKey::new(id, attr).to_be_bytes(); + self.0.del(key)?; + Ok(()) + } + pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter { let start = DocumentAttrKey::new(id, SchemaAttr::min()); let start = start.to_be_bytes(); @@ -375,9 +475,7 @@ impl Index { } pub fn documents_deletion(&self) -> DocumentsDeletion { - // let index = self.0.clone(); - // DocumentsDeletion::from_raw(index) - unimplemented!() + DocumentsDeletion::new(self) } pub fn document( @@ -467,11 +565,12 @@ impl<'a> DocumentsAddition<'a> { let lease_inner = self.inner.lease_inner(); let main = &lease_inner.raw.main; let words = &lease_inner.raw.words; + let attrs_words = &lease_inner.raw.attrs_words; - let delta_index = self.indexer.build(); + let Indexed { words_doc_indexes, docs_attrs_words } = self.indexer.build(); let mut delta_words_builder = SetBuilder::memory(); - for (word, delta_set) in delta_index { + for (word, delta_set) in words_doc_indexes { delta_words_builder.insert(&word).unwrap(); let set = match words.doc_indexes(&word)? 
{ @@ -479,7 +578,11 @@ impl<'a> DocumentsAddition<'a> { None => delta_set, }; - words.set_doc_indexes(&word, Some(&set))?; + words.set_doc_indexes(&word, &set)?; + } + + for ((id, attr), words) in docs_attrs_words { + attrs_words.set_attr_words(id, attr, Some(&words))?; } let delta_words = delta_words_builder @@ -534,20 +637,83 @@ impl<'a> DocumentsDeletion<'a> { } pub fn finalize(mut self) -> Result<(), Error> { - self.documents.sort_unstable(); - self.documents.dedup(); + let lease_inner = self.inner.lease_inner(); + let main = &lease_inner.raw.main; + let attrs_words = &lease_inner.raw.attrs_words; + let words = &lease_inner.raw.words; + let documents = &lease_inner.raw.documents; - let idset = SetBuf::new_unchecked(self.documents); + let idset = { + self.documents.sort_unstable(); + self.documents.dedup(); + SetBuf::new_unchecked(self.documents) + }; - // let index = self.inner.word_index(); + let mut words_attrs = HashMap::new(); + for id in idset.into_vec() { + for result in attrs_words.attrs_words(id) { + let (attr, words) = result?; + let mut stream = words.stream(); + while let Some(word) = stream.next() { + let word = word.to_vec(); + words_attrs.entry(word).or_insert_with(Vec::new).push((id, attr)); + } + } + } - // let new_index = index.remove_documents(&idset)?; - // let new_index = Arc::from(new_index); + let mut removed_words = BTreeSet::new(); + for (word, mut attrs) in words_attrs { + attrs.sort_unstable(); + attrs.dedup(); + let attrs = SetBuf::new_unchecked(attrs); - // self.inner.update_word_index(new_index); + if let Some(doc_indexes) = words.doc_indexes(&word)? { + let op = DifferenceByKey::new(&doc_indexes, &attrs, |d| d.document_id, |(id, _)| *id); + let doc_indexes = op.into_set_buf(); - // Ok(()) + if !doc_indexes.is_empty() { + words.set_doc_indexes(&word, &doc_indexes)?; + } else { + words.del_doc_indexes(&word)?; + removed_words.insert(word); + } + } - unimplemented!("documents deletion finalize") + for (id, attr) in attrs.into_vec() { + documents.del_document_field(id, attr)?; + } + } + + let removed_words = fst::Set::from_iter(removed_words).unwrap(); + let words = match main.words_set()? { + Some(words_set) => { + let op = fst::set::OpBuilder::new() + .add(words_set.stream()) + .add(removed_words.stream()) + .difference(); + + let mut words_builder = SetBuilder::memory(); + words_builder.extend_stream(op).unwrap(); + words_builder + .into_inner() + .and_then(fst::Set::from_bytes) + .unwrap() + }, + None => fst::Set::default(), + }; + + main.set_words_set(&words)?; + + // TODO must update the ranked_map too! 
+ + // update the "consistent" view of the Index + let ranked_map = lease_inner.ranked_map.clone(); + let schema = lease_inner.schema.clone(); + let raw = lease_inner.raw.clone(); + + let inner = InnerIndex { words, schema, ranked_map, raw }; + self.inner.0.store(Arc::new(inner)); + + Ok(()) } } diff --git a/meilidb-data/src/indexer.rs b/meilidb-data/src/indexer.rs index a578f52c7..1aa3095c8 100644 --- a/meilidb-data/src/indexer.rs +++ b/meilidb-data/src/indexer.rs @@ -1,8 +1,8 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use deunicode::deunicode_with_tofu; -use meilidb_core::{DocumentId, DocIndex, Store}; +use meilidb_core::{DocumentId, DocIndex}; use meilidb_tokenizer::{is_cjk, Tokenizer, SeqTokenizer, Token}; use sdset::SetBuf; @@ -12,27 +12,39 @@ type Word = Vec; // TODO make it be a SmallVec pub struct Indexer { word_limit: usize, // the maximum number of indexed words - indexed: BTreeMap>, + words_doc_indexes: BTreeMap>, + docs_attrs_words: HashMap<(DocumentId, SchemaAttr), Vec>, +} + +pub struct Indexed { + pub words_doc_indexes: BTreeMap>, + pub docs_attrs_words: HashMap<(DocumentId, SchemaAttr), fst::Set>, } impl Indexer { pub fn new() -> Indexer { - Indexer { - word_limit: 1000, - indexed: BTreeMap::new(), - } + Indexer::with_word_limit(1000) } pub fn with_word_limit(limit: usize) -> Indexer { Indexer { word_limit: limit, - indexed: BTreeMap::new(), + words_doc_indexes: BTreeMap::new(), + docs_attrs_words: HashMap::new(), } } pub fn index_text(&mut self, id: DocumentId, attr: SchemaAttr, text: &str) { for token in Tokenizer::new(text) { - let must_continue = index_token(token, id, attr, self.word_limit, &mut self.indexed); + let must_continue = index_token( + token, + id, + attr, + self.word_limit, + &mut self.words_doc_indexes, + &mut self.docs_attrs_words, + ); + if !must_continue { break } } } @@ -42,17 +54,38 @@ impl Indexer { { let iter = iter.into_iter(); for token in SeqTokenizer::new(iter) { - let must_continue = index_token(token, id, attr, self.word_limit, &mut self.indexed); + let must_continue = index_token( + token, + id, + attr, + self.word_limit, + &mut self.words_doc_indexes, + &mut self.docs_attrs_words, + ); + if !must_continue { break } } } - pub fn build(self) -> BTreeMap> { - self.indexed.into_iter().map(|(word, mut indexes)| { - indexes.sort_unstable(); - indexes.dedup(); - (word, SetBuf::new_unchecked(indexes)) - }).collect() + pub fn build(self) -> Indexed { + let words_doc_indexes = self.words_doc_indexes + .into_iter() + .map(|(word, mut indexes)| { + indexes.sort_unstable(); + indexes.dedup(); + (word, SetBuf::new_unchecked(indexes)) + }).collect(); + + let docs_attrs_words = self.docs_attrs_words + .into_iter() + .map(|((id, attr), mut words)| { + words.sort_unstable(); + words.dedup(); + ((id, attr), fst::Set::from_iter(words).unwrap()) + }) + .collect(); + + Indexed { words_doc_indexes, docs_attrs_words } } } @@ -61,7 +94,8 @@ fn index_token( id: DocumentId, attr: SchemaAttr, word_limit: usize, - indexed: &mut BTreeMap>, + words_doc_indexes: &mut BTreeMap>, + docs_attrs_words: &mut HashMap<(DocumentId, SchemaAttr), Vec>, ) -> bool { if token.word_index >= word_limit { return false } @@ -71,7 +105,8 @@ fn index_token( match token_to_docindex(id, attr, token) { Some(docindex) => { let word = Vec::from(token.word); - indexed.entry(word).or_insert_with(Vec::new).push(docindex); + words_doc_indexes.entry(word.clone()).or_insert_with(Vec::new).push(docindex); + docs_attrs_words.entry((id, 
attr)).or_insert_with(Vec::new).push(word); }, None => return false, } @@ -83,7 +118,8 @@ fn index_token( match token_to_docindex(id, attr, token) { Some(docindex) => { let word = Vec::from(token.word); - indexed.entry(word).or_insert_with(Vec::new).push(docindex); + words_doc_indexes.entry(word.clone()).or_insert_with(Vec::new).push(docindex); + docs_attrs_words.entry((id, attr)).or_insert_with(Vec::new).push(word); }, None => return false, }
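
Note, not part of the patch: the core of DocumentsDeletion::finalize is the per-word set difference performed with sdset's DifferenceByKey, which drops every posting that belongs to a deleted document and removes the word entirely when nothing remains. Below is a minimal, self-contained sketch of that one step under simplified assumptions: the DocIndex type here is a stand-in for the meilidb-core struct (the real one also carries word and character positions), and the ids, attributes, and function name are placeholders chosen for illustration.

use sdset::{Set, SetBuf, SetOperation, duo::DifferenceByKey};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DocumentId(u64);

// Simplified stand-in for meilidb_core::DocIndex.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DocIndex { document_id: DocumentId, attribute: u16 }

// Keeps only the postings whose document is not listed in `deleted`,
// mirroring the DifferenceByKey call in DocumentsDeletion::finalize.
fn remove_deleted_documents(
    doc_indexes: &Set<DocIndex>,
    deleted: &Set<(DocumentId, u16)>, // (id, attr) pairs gathered from the attrs-words tree
) -> SetBuf<DocIndex> {
    let op = DifferenceByKey::new(doc_indexes, deleted, |d| d.document_id, |(id, _)| *id);
    op.into_set_buf()
}

fn main() {
    let postings = SetBuf::new(vec![
        DocIndex { document_id: DocumentId(1), attribute: 0 },
        DocIndex { document_id: DocumentId(2), attribute: 0 },
        DocIndex { document_id: DocumentId(3), attribute: 1 },
    ]).unwrap();
    let deleted = SetBuf::new(vec![(DocumentId(2), 0)]).unwrap();

    let remaining = remove_deleted_documents(&postings, &deleted);
    assert_eq!(remaining.len(), 2); // document 2 is gone; an empty result would delete the word
}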
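
Also not part of the patch: finalize can gather those (id, attr) pairs only because Indexer::build now returns, next to the usual word-to-DocIndex map, a per-(DocumentId, SchemaAttr) fst::Set of the words each field produced, which DocumentsAddition stores in the new "{name}-attrs-words" tree. A rough sketch of that build step follows, with plain byte strings standing in for tokenized words and a hypothetical helper name.

fn build_attr_words(mut words: Vec<Vec<u8>>) -> fst::Set {
    // fst::Set::from_iter requires lexicographically sorted, deduplicated keys,
    // which is what Indexer::build ensures before constructing the set.
    words.sort_unstable();
    words.dedup();
    fst::Set::from_iter(words).unwrap()
}

fn main() {
    let set = build_attr_words(vec![b"world".to_vec(), b"hello".to_vec(), b"hello".to_vec()]);
    assert_eq!(set.len(), 2); // stored under the DocumentAttrKey in the attrs-words tree
}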