feat: Introduce an Index system based on RocksDB

This commit is contained in:
Clément Renault 2018-11-15 17:55:20 +01:00
parent cc52d5dda5
commit b3249d515d
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
14 changed files with 569 additions and 97 deletions

View File

@ -6,10 +6,11 @@ authors = ["Kerollmops <renault.cle@gmail.com>"]
[dependencies]
byteorder = "1.2"
fnv = "1.0"
fs2 = "0.4"
lazy_static = "1.1"
sdset = "0.2"
fs2 = "0.4"
fnv = "1.0"
unidecode = "0.3"
[dependencies.fst]
git = "https://github.com/Kerollmops/fst.git"
@ -27,12 +28,11 @@ git = "https://github.com/pingcap/rust-rocksdb.git"
git = "https://github.com/Kerollmops/group-by.git"
[dev-dependencies]
csv = "1.0"
elapsed = "0.1"
moby-name-gen = "0.1"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
structopt = "0.2"
unidecode = "0.3"
elapsed = "0.1"
serde = "1.0"
warp = "0.1"
csv = "1.0"

View File

@ -79,8 +79,7 @@ impl CsvIndexer {
}
{
let title = Tokenizer::new(&product.title);
let title = title.iter().filter(|&(_, w)| !self.common_words.contains(w));
let title = Tokenizer::new(&product.title).filter(|&(_, w)| !self.common_words.contains(w));
insert_document_words(&mut builder, product.id, 1, title);
let key = format!("{}-title", product.id);
@ -89,8 +88,7 @@ impl CsvIndexer {
}
{
let description = Tokenizer::new(&product.description);
let description = description.iter().filter(|&(_, w)| !self.common_words.contains(w));
let description = Tokenizer::new(&product.description).filter(|&(_, w)| !self.common_words.contains(w));
insert_document_words(&mut builder, product.id, 2, description);
let key = format!("{}-description", product.id);

View File

@ -84,8 +84,7 @@ impl JsonLinesIndexer {
}
{
let title = Tokenizer::new(&product.title);
let title = title.iter().filter(|&(_, w)| !self.common_words.contains(w));
let title = Tokenizer::new(&product.title).filter(|&(_, w)| !self.common_words.contains(w));
insert_document_words(&mut builder, product.id, 1, title);
let key = format!("{}-title", product.id);
@ -94,8 +93,7 @@ impl JsonLinesIndexer {
}
{
let description = Tokenizer::new(&product.description);
let description = description.iter().filter(|&(_, w)| !self.common_words.contains(w));
let description = Tokenizer::new(&product.description).filter(|&(_, w)| !self.common_words.contains(w));
insert_document_words(&mut builder, product.id, 2, description);
let key = format!("{}-description", product.id);

View File

@ -40,7 +40,7 @@ impl<W: Write> NegativeBlobBuilder<W> {
Self { doc_ids: DocIdsBuilder::new(wrt) }
}
pub fn insert(&mut self, doc: DocumentId) {
pub fn insert(&mut self, doc: DocumentId) -> bool {
self.doc_ids.insert(doc)
}

View File

@ -47,7 +47,7 @@ impl DocIds {
}
pub struct DocIdsBuilder<W> {
doc_ids: BTreeSet<DocumentId>,
doc_ids: BTreeSet<DocumentId>, // TODO: prefer a linked-list
wrt: W,
}
@ -59,8 +59,8 @@ impl<W: io::Write> DocIdsBuilder<W> {
}
}
pub fn insert(&mut self, doc: DocumentId) {
self.doc_ids.insert(doc);
pub fn insert(&mut self, doc: DocumentId) -> bool {
self.doc_ids.insert(doc)
}
pub fn into_inner(mut self) -> io::Result<W> {

View File

@ -1,40 +0,0 @@
use std::path::{Path, PathBuf};
use std::error::Error;
use std::fs::{self, File};
use fs2::FileExt;
use crate::rank::Document;
use crate::blob::Blob;
pub struct Index {
path: PathBuf,
lock_file: File,
blobs: Vec<Blob>,
}
impl Index {
pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self, Box<Error>> {
let path = path.into();
let lock_file = File::create(path.join(".lock"))?;
lock_file.try_lock_exclusive()?;
let blobs = Vec::new();
Ok(Self { path, lock_file, blobs })
}
pub fn create<P: Into<PathBuf>>(path: P) -> Result<Self, Box<Error>> {
let path = path.into();
fs::create_dir_all(&path)?;
File::create(path.join(".lock"))?;
Self::open(path)
}
pub fn blobs(&self) -> &[Blob] {
&self.blobs
}
}

16
src/index/blob_name.rs Normal file
View File

@ -0,0 +1,16 @@
use std::fmt;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BlobName;
impl BlobName {
pub fn new() -> BlobName {
unimplemented!()
}
}
impl fmt::Display for BlobName {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
unimplemented!()
}
}

175
src/index/mod.rs Normal file
View File

@ -0,0 +1,175 @@
pub mod blob_name;
pub mod schema;
pub mod search;
pub mod update;
use std::io;
use std::rc::Rc;
use std::error::Error;
use std::fs::{self, File};
use std::fmt::{self, Write};
use std::ops::{Deref, BitOr};
use std::path::{Path, PathBuf};
use std::collections::{BTreeSet, BTreeMap};
use fs2::FileExt;
use ::rocksdb::{rocksdb, rocksdb_options};
use ::rocksdb::merge_operator::MergeOperands;
use crate::rank::Document;
use crate::data::DocIdsBuilder;
use crate::{DocIndex, DocumentId};
use crate::index::{update::Update, search::Search};
use crate::blob::{PositiveBlobBuilder, Blob, Sign};
use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer};
fn simple_vec_append(key: &[u8], value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut output = Vec::new();
for bytes in operands.chain(value) {
output.extend_from_slice(bytes);
}
output
}
pub struct Index {
database: rocksdb::DB,
}
impl Index {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Index, Box<Error>> {
let path = path.as_ref().to_string_lossy();
let mut opts = rocksdb_options::DBOptions::new();
opts.create_if_missing(true);
let mut cf_opts = rocksdb_options::ColumnFamilyOptions::new();
cf_opts.add_merge_operator("blobs order operator", simple_vec_append);
let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
// check if index is a valid RocksDB and
// contains the right key-values (i.e. "blobs-order")
Ok(Self { database })
}
pub fn ingest_update(&self, update: Update) -> Result<(), Box<Error>> {
let path = update.into_path_buf();
let path = path.to_string_lossy();
let mut options = rocksdb_options::IngestExternalFileOptions::new();
// options.move_files(true);
let cf_handle = self.database.cf_handle("default").unwrap();
self.database.ingest_external_file_optimized(&cf_handle, &options, &[&path])?;
Ok(())
}
pub fn snapshot(&self) -> Snapshot<&rocksdb::DB> {
Snapshot::new(&self.database)
}
}
impl Search for Index {
fn search(&self, text: &str) -> Vec<Document> {
unimplemented!()
}
}
pub struct Snapshot<D>
where D: Deref<Target=rocksdb::DB>,
{
inner: rocksdb::Snapshot<D>,
}
impl<D> Snapshot<D>
where D: Deref<Target=rocksdb::DB>,
{
pub fn new(inner: D) -> Snapshot<D> {
Self { inner: rocksdb::Snapshot::new(inner) }
}
}
impl<D> Search for Snapshot<D>
where D: Deref<Target=rocksdb::DB>,
{
fn search(&self, text: &str) -> Vec<Document> {
unimplemented!()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::index::schema::Schema;
use crate::index::update::{PositiveUpdateBuilder, NegativeUpdateBuilder};
#[test]
fn generate_negative_update() -> Result<(), Box<Error>> {
let schema = Schema::open("/meili/default.sch")?;
let mut builder = NegativeUpdateBuilder::new("update-delete-0001.sst");
// you can insert documents in any order, it is sorted internally
builder.remove(1);
builder.remove(5);
builder.remove(2);
let update = builder.build()?;
assert_eq!(update.info().sign, Sign::Negative);
Ok(())
}
#[test]
fn generate_positive_update() -> Result<(), Box<Error>> {
let schema = Schema::open("/meili/default.sch")?;
let tokenizer_builder = DefaultBuilder::new();
let mut builder = PositiveUpdateBuilder::new("update-positive-0001.sst", schema.clone(), tokenizer_builder);
// you can insert documents in any order, it is sorted internally
let title_field = schema.field("title").unwrap();
builder.update_field(1, title_field, "hallo!".to_owned());
builder.update_field(5, title_field, "hello!".to_owned());
builder.update_field(2, title_field, "hi!".to_owned());
let name_field = schema.field("name").unwrap();
builder.remove_field(4, name_field);
let update = builder.build()?;
assert_eq!(update.info().sign, Sign::Positive);
Ok(())
}
#[test]
fn execution() -> Result<(), Box<Error>> {
let index = Index::open("/meili/data")?;
let update = Update::open("update-0001.sst")?;
index.ingest_update(update)?;
// directly apply changes to the database and see new results
let results = index.search("helo");
//////////////
let index = Index::open("/meili/data")?;
let update = Update::open("update-0001.sst")?;
// if you create a snapshot before an update
let snapshot = index.snapshot();
index.ingest_update(update)?;
// the snapshot does not see the updates
let results = snapshot.search("helo");
// the raw index itself see new results
let results = index.search("helo");
Ok(())
}
}

82
src/index/schema.rs Normal file
View File

@ -0,0 +1,82 @@
use std::error::Error;
use std::path::Path;
use std::ops::BitOr;
use std::fmt;
pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false };
pub const INDEXED: SchemaProps = SchemaProps { stored: false, indexed: true };
#[derive(Copy, Clone)]
pub struct SchemaProps {
stored: bool,
indexed: bool,
}
impl SchemaProps {
pub fn is_stored(&self) -> bool {
self.stored
}
pub fn is_indexed(&self) -> bool {
self.indexed
}
}
impl BitOr for SchemaProps {
type Output = Self;
fn bitor(self, other: Self) -> Self::Output {
SchemaProps {
stored: self.stored | other.stored,
indexed: self.indexed | other.indexed,
}
}
}
pub struct SchemaBuilder;
impl SchemaBuilder {
pub fn new() -> SchemaBuilder {
unimplemented!()
}
pub fn field(&mut self, name: &str, props: SchemaProps) -> SchemaField {
unimplemented!()
}
pub fn build(self) -> Schema {
unimplemented!()
}
}
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub struct SchemaField(u32);
impl SchemaField {
pub fn as_u32(&self) -> u32 {
self.0
}
}
impl fmt::Display for SchemaField {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Clone)]
pub struct Schema;
impl Schema {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Schema, Box<Error>> {
unimplemented!()
}
pub fn props(&self, field: SchemaField) -> SchemaProps {
unimplemented!()
}
pub fn field(&self, name: &str) -> Option<SchemaField> {
unimplemented!()
}
}

5
src/index/search.rs Normal file
View File

@ -0,0 +1,5 @@
use crate::rank::Document;
pub trait Search {
fn search(&self, text: &str) -> Vec<Document>;
}

55
src/index/update/mod.rs Normal file
View File

@ -0,0 +1,55 @@
use std::path::PathBuf;
use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::index::blob_name::BlobName;
use crate::blob::Sign;
mod negative_update;
mod positive_update;
pub use self::negative_update::{NegativeUpdateBuilder};
pub use self::positive_update::{PositiveUpdateBuilder, NewState};
// These prefixes are here to make sure the documents fields
// and the internal data doesn't collide and the internal data are
// at the top of the sst file.
const FIELD_BLOBS_ORDER: &str = "00-blobs-order";
pub struct Update {
path: PathBuf,
}
impl Update {
pub fn open<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> {
let path = path.into();
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&path.to_string_lossy())?;
let infos = file_writer.finish()?;
if infos.smallest_key() != FIELD_BLOBS_ORDER.as_bytes() {
// FIXME return a nice error
panic!("Invalid update file: the blobs-order field is not the smallest key")
}
Ok(Update { path })
}
pub fn into_path_buf(self) -> PathBuf {
self.path
}
pub fn info(&self) -> UpdateInfo {
unimplemented!()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UpdateInfo {
pub sign: Sign,
pub id: BlobName,
}

View File

@ -0,0 +1,59 @@
use std::path::PathBuf;
use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::index::update::{FIELD_BLOBS_ORDER, Update};
use crate::index::blob_name::BlobName;
use crate::data::DocIdsBuilder;
use crate::DocumentId;
pub struct NegativeUpdateBuilder {
path: PathBuf,
doc_ids: DocIdsBuilder<Vec<u8>>,
}
impl NegativeUpdateBuilder {
pub fn new<P: Into<PathBuf>>(path: P) -> NegativeUpdateBuilder {
NegativeUpdateBuilder {
path: path.into(),
doc_ids: DocIdsBuilder::new(Vec::new()),
}
}
pub fn remove(&mut self, id: DocumentId) -> bool {
self.doc_ids.insert(id)
}
pub fn build(self) -> Result<Update, Box<Error>> {
let blob_name = BlobName::new();
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?;
// TODO the blob-name must be written in bytes (16 bytes)
// along with the sign
unimplemented!("write the blob sign and name");
// write the blob name to be merged
let blob_name = blob_name.to_string();
file_writer.merge(FIELD_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?;
// write the doc ids
let blob_key = format!("0b-{}-doc-ids", blob_name);
let blob_doc_ids = self.doc_ids.into_inner()?;
file_writer.put(blob_key.as_bytes(), &blob_doc_ids)?;
for id in blob_doc_ids {
let start = format!("5d-{}", id);
let end = format!("5d-{}", id + 1);
file_writer.delete_range(start.as_bytes(), end.as_bytes())?;
}
file_writer.finish()?;
Update::open(self.path)
}
}

View File

@ -0,0 +1,124 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::error::Error;
use std::fmt::Write;
use ::rocksdb::rocksdb_options;
use crate::index::schema::{SchemaProps, Schema, SchemaField};
use crate::index::update::{FIELD_BLOBS_ORDER, Update};
use crate::tokenizer::TokenizerBuilder;
use crate::index::blob_name::BlobName;
use crate::blob::PositiveBlobBuilder;
use crate::{DocIndex, DocumentId};
pub enum NewState {
Updated {
value: String,
props: SchemaProps,
},
Removed,
}
pub struct PositiveUpdateBuilder<B> {
path: PathBuf,
schema: Schema,
tokenizer_builder: B,
new_states: BTreeMap<(DocumentId, SchemaField), NewState>,
}
impl<B> PositiveUpdateBuilder<B> {
pub fn new<P: Into<PathBuf>>(path: P, schema: Schema, tokenizer_builder: B) -> PositiveUpdateBuilder<B> {
PositiveUpdateBuilder {
path: path.into(),
schema: schema,
tokenizer_builder: tokenizer_builder,
new_states: BTreeMap::new(),
}
}
// TODO value must be a field that can be indexed
pub fn update_field(&mut self, id: DocumentId, field: SchemaField, value: String) {
let state = NewState::Updated { value, props: self.schema.props(field) };
self.new_states.insert((id, field), state);
}
pub fn remove_field(&mut self, id: DocumentId, field: SchemaField) {
self.new_states.insert((id, field), NewState::Removed);
}
}
impl<B> PositiveUpdateBuilder<B>
where B: TokenizerBuilder
{
pub fn build(self) -> Result<Update, Box<Error>> {
let blob_name = BlobName::new();
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?;
// TODO the blob-name must be written in bytes (16 bytes)
// along with the sign
unimplemented!("write the blob sign and name");
// write the blob name to be merged
let blob_name = blob_name.to_string();
file_writer.put(FIELD_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?;
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
for ((document_id, field), state) in &self.new_states {
let value = match state {
NewState::Updated { value, props } if props.is_indexed() => value,
_ => continue,
};
for (index, word) in self.tokenizer_builder.build(value) {
let doc_index = DocIndex {
document_id: *document_id,
attribute: field.as_u32() as u8,
attribute_index: index as u32,
};
// insert the exact representation
let word_lower = word.to_lowercase();
// and the unidecoded lowercased version
let word_unidecoded = unidecode::unidecode(word).to_lowercase();
if word_lower != word_unidecoded {
builder.insert(word_unidecoded, doc_index);
}
builder.insert(word_lower, doc_index);
}
}
let (blob_fst_map, blob_doc_idx) = builder.into_inner()?;
// write the fst
let blob_key = format!("0b-{}-fst", blob_name);
file_writer.put(blob_key.as_bytes(), &blob_fst_map)?;
// write the doc-idx
let blob_key = format!("0b-{}-doc-idx", blob_name);
file_writer.put(blob_key.as_bytes(), &blob_doc_idx)?;
// write all the documents fields updates
let mut key = String::from("5d-");
let prefix_len = key.len();
for ((id, field), state) in self.new_states {
key.truncate(prefix_len);
write!(&mut key, "{}-{}", id, field)?;
match state {
NewState::Updated { value, props } => if props.is_stored() {
file_writer.put(key.as_bytes(), value.as_bytes())?
},
NewState::Removed => file_writer.delete(key.as_bytes())?,
}
}
file_writer.finish()?;
Update::open(self.path)
}
}

View File

@ -1,28 +1,32 @@
use std::mem;
use self::Separator::*;
pub trait TokenizerBuilder {
fn build<'a>(&self, text: &'a str) -> Box<Iterator<Item=(usize, &'a str)> + 'a>;
}
pub struct DefaultBuilder;
impl DefaultBuilder {
pub fn new() -> DefaultBuilder {
DefaultBuilder
}
}
impl TokenizerBuilder for DefaultBuilder {
fn build<'a>(&self, text: &'a str) -> Box<Iterator<Item=(usize, &'a str)> + 'a> {
Box::new(Tokenizer::new(text))
}
}
pub struct Tokenizer<'a> {
index: usize,
inner: &'a str,
}
impl<'a> Tokenizer<'a> {
pub fn new(string: &str) -> Tokenizer {
Tokenizer { inner: string }
}
pub fn iter(&self) -> Tokens {
Tokens::new(self.inner)
}
}
pub struct Tokens<'a> {
index: usize,
inner: &'a str,
}
impl<'a> Tokens<'a> {
fn new(string: &str) -> Tokens {
Tokens {
Tokenizer {
index: 0,
inner: string.trim_matches(&[' ', '.', ';', ',', '!', '?', '-', '\'', '"'][..]),
}
@ -52,7 +56,7 @@ impl Separator {
}
}
impl<'a> Iterator for Tokens<'a> {
impl<'a> Iterator for Tokenizer<'a> {
type Item = (usize, &'a str);
fn next(&mut self) -> Option<Self::Item> {
@ -101,37 +105,33 @@ mod tests {
#[test]
fn easy() {
let tokenizer = Tokenizer::new("salut");
let mut tokens = tokenizer.iter();
let mut tokenizer = Tokenizer::new("salut");
assert_eq!(tokens.next(), Some((0, "salut")));
assert_eq!(tokens.next(), None);
assert_eq!(tokenizer.next(), Some((0, "salut")));
assert_eq!(tokenizer.next(), None);
let tokenizer = Tokenizer::new("yo ");
let mut tokens = tokenizer.iter();
let mut tokenizer = Tokenizer::new("yo ");
assert_eq!(tokens.next(), Some((0, "yo")));
assert_eq!(tokens.next(), None);
assert_eq!(tokenizer.next(), Some((0, "yo")));
assert_eq!(tokenizer.next(), None);
}
#[test]
fn hard() {
let tokenizer = Tokenizer::new(" .? yo lolo. aïe");
let mut tokens = tokenizer.iter();
let mut tokenizer = Tokenizer::new(" .? yo lolo. aïe");
assert_eq!(tokens.next(), Some((0, "yo")));
assert_eq!(tokens.next(), Some((1, "lolo")));
assert_eq!(tokens.next(), Some((9, "aïe")));
assert_eq!(tokens.next(), None);
assert_eq!(tokenizer.next(), Some((0, "yo")));
assert_eq!(tokenizer.next(), Some((1, "lolo")));
assert_eq!(tokenizer.next(), Some((9, "aïe")));
assert_eq!(tokenizer.next(), None);
let tokenizer = Tokenizer::new("yo ! lolo ? wtf - lol . aïe ,");
let mut tokens = tokenizer.iter();
let mut tokenizer = Tokenizer::new("yo ! lolo ? wtf - lol . aïe ,");
assert_eq!(tokens.next(), Some((0, "yo")));
assert_eq!(tokens.next(), Some((8, "lolo")));
assert_eq!(tokens.next(), Some((16, "wtf")));
assert_eq!(tokens.next(), Some((24, "lol")));
assert_eq!(tokens.next(), Some((32, "aïe")));
assert_eq!(tokens.next(), None);
assert_eq!(tokenizer.next(), Some((0, "yo")));
assert_eq!(tokenizer.next(), Some((8, "lolo")));
assert_eq!(tokenizer.next(), Some((16, "wtf")));
assert_eq!(tokenizer.next(), Some((24, "lol")));
assert_eq!(tokenizer.next(), Some((32, "aïe")));
assert_eq!(tokenizer.next(), None);
}
}