From 7c1a17520d59e470fe53fabbfd8c5b80fe3f4ebe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 20 Nov 2018 11:37:19 +0100 Subject: [PATCH] feat: Introduce the index module --- Cargo.toml | 1 + examples/create-index.rs | 41 +++++++++++++++++++ examples/index-search.rs | 40 +++++++++++++++++++ examples/serve-console.rs | 35 ++++++----------- src/blob/mod.rs | 6 ++- src/index/mod.rs | 83 +++++++++++++++++++++------------------ src/index/schema.rs | 61 ++++++++++++++++++---------- src/index/search.rs | 5 --- src/rank/ranked_stream.rs | 8 ++-- 9 files changed, 188 insertions(+), 92 deletions(-) create mode 100644 examples/create-index.rs create mode 100644 examples/index-search.rs delete mode 100644 src/index/search.rs diff --git a/Cargo.toml b/Cargo.toml index 775a0950b..4741cb040 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,4 +35,5 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" structopt = "0.2" +tempfile = "3.0" warp = "0.1" diff --git a/examples/create-index.rs b/examples/create-index.rs new file mode 100644 index 000000000..84a8b73b6 --- /dev/null +++ b/examples/create-index.rs @@ -0,0 +1,41 @@ +use std::path::Path; +use std::error::Error; +use std::path::PathBuf; +use std::io::{self, Write}; + +use elapsed::measure_time; +use moby_name_gen::random_name; +use structopt::StructOpt; + +use pentium::index::update::Update; +use pentium::index::Index; + +#[derive(Debug, StructOpt)] +pub struct Cmd { + /// csv file to index + #[structopt(parse(from_os_str))] + pub csv_file: PathBuf, +} + +fn generate_update_from_csv(path: &Path) -> Result> { + unimplemented!() +} + +fn main() -> Result<(), Box> { + let command = Cmd::from_args(); + + let path = random_name(); + + println!("generating the update..."); + let update = generate_update_from_csv(&command.csv_file)?; + + println!("creating the index"); + let index = Index::open(&path)?; + + println!("ingesting the changes in the index"); + index.ingest_update(update)?; + + println!("the index {:?} has been created!", path); + + Ok(()) +} diff --git a/examples/index-search.rs b/examples/index-search.rs new file mode 100644 index 000000000..87b4c4195 --- /dev/null +++ b/examples/index-search.rs @@ -0,0 +1,40 @@ +use std::error::Error; +use std::path::PathBuf; +use std::io::{self, Write}; + +use elapsed::measure_time; +use structopt::StructOpt; +use pentium::index::Index; + +#[derive(Debug, StructOpt)] +pub struct Cmd { + /// Index path (e.g. relaxed-colden). + #[structopt(parse(from_os_str))] + pub index_path: PathBuf, +} + +fn main() -> Result<(), Box> { + let command = Cmd::from_args(); + let index = Index::open(command.index_path)?; + + loop { + print!("Searching for: "); + io::stdout().flush()?; + + let mut query = String::new(); + io::stdin().read_line(&mut query)?; + + if query.is_empty() { break } + + let (elapsed, result) = measure_time(|| index.search(&query)); + match result { + Ok(documents) => { + // display documents here ! + println!("Finished in {}", elapsed) + }, + Err(e) => panic!("{}", e), + } + } + + Ok(()) +} diff --git a/examples/serve-console.rs b/examples/serve-console.rs index 1836442ec..e908e0ec8 100644 --- a/examples/serve-console.rs +++ b/examples/serve-console.rs @@ -1,3 +1,4 @@ +use std::error::Error; use std::str::from_utf8_unchecked; use std::io::{self, Write}; use structopt::StructOpt; @@ -5,37 +6,25 @@ use std::path::PathBuf; use elapsed::measure_time; use rocksdb::{DB, DBOptions, IngestExternalFileOptions}; +use pentium::index::Index; use pentium::rank::{criterion, Config, RankedStream}; -use pentium::{automaton, DocumentId, Metadata}; +use pentium::{automaton, DocumentId}; #[derive(Debug, StructOpt)] pub struct CommandConsole { /// Meta file name (e.g. relaxed-colden). #[structopt(parse(from_os_str))] - pub meta_name: PathBuf, + pub index_path: PathBuf, } pub struct ConsoleSearch { - metadata: Metadata, - db: DB, + index: Index, } impl ConsoleSearch { - pub fn from_command(command: CommandConsole) -> io::Result { - let map_file = command.meta_name.with_extension("map"); - let idx_file = command.meta_name.with_extension("idx"); - let sst_file = command.meta_name.with_extension("sst"); - - let metadata = unsafe { Metadata::from_paths(map_file, idx_file).unwrap() }; - - let rocksdb = "rocksdb/storage"; - let db = DB::open_default(rocksdb).unwrap(); - let sst_file = sst_file.to_str().unwrap(); - db.ingest_external_file(&IngestExternalFileOptions::new(), &[sst_file]).unwrap(); - drop(db); - let db = DB::open_for_read_only(DBOptions::default(), rocksdb, false).unwrap(); - - Ok(ConsoleSearch { metadata, db }) + pub fn from_command(command: CommandConsole) -> Result> { + let index = Index::open(command.index_path)?; + Ok(ConsoleSearch { index }) } pub fn serve(self) { @@ -48,13 +37,13 @@ impl ConsoleSearch { if query.is_empty() { break } - let (elapsed, _) = measure_time(|| search(&self.metadata, &self.db, &query)); + let (elapsed, _) = measure_time(|| search(&self.index, &query)); println!("Finished in {}", elapsed); } } } -fn search(metadata: &Metadata, database: &DB, query: &str) { +fn search(index: &Index, query: &str) { let mut automatons = Vec::new(); for query in query.split_whitespace().map(str::to_lowercase) { let lev = automaton::build_prefix_dfa(&query); @@ -75,9 +64,11 @@ fn search(metadata: &Metadata, database: &DB, query: &str) { } }; + let index: Index = unimplemented!(); + // "Sony" "PlayStation 4 500GB" let config = Config { - index: unimplemented!(), + blobs: &index.blobs().unwrap(), automatons: automatons, criteria: criterion::default(), distinct: (distinct_by_title_first_four_chars, 1), diff --git a/src/blob/mod.rs b/src/blob/mod.rs index 0139f48d1..b8cb5d9e0 100644 --- a/src/blob/mod.rs +++ b/src/blob/mod.rs @@ -8,8 +8,8 @@ pub use self::merge::Merge; pub use self::positive_blob::{PositiveBlob, PositiveBlobBuilder}; pub use self::negative_blob::{NegativeBlob, NegativeBlobBuilder}; +use std::error::Error; use fst::Map; - use crate::data::DocIndexes; pub enum Blob { @@ -40,3 +40,7 @@ impl Sign { } } } + +pub fn ordered_blobs_from_slice(slice: &[u8]) -> Result, Box> { + unimplemented!() +} diff --git a/src/index/mod.rs b/src/index/mod.rs index d7f642de4..f5875c2ca 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,6 +1,5 @@ pub mod blob_name; pub mod schema; -pub mod search; pub mod update; use std::io; @@ -19,9 +18,12 @@ use ::rocksdb::merge_operator::MergeOperands; use crate::rank::Document; use crate::data::DocIdsBuilder; use crate::{DocIndex, DocumentId}; -use crate::index::{update::Update, search::Search}; +use crate::index::update::Update; use crate::blob::{PositiveBlobBuilder, Blob, Sign}; +use crate::blob::ordered_blobs_from_slice; use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer}; +use crate::rank::{criterion, Config, RankedStream}; +use crate::automaton; fn simple_vec_append(key: &[u8], value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { let mut output = Vec::new(); @@ -36,6 +38,12 @@ pub struct Index { } impl Index { + pub fn create>(path: P) -> Result> { + unimplemented!("return a soft error: the database already exist at the given path") + // Self::open must not take a parameter for create_if_missing + // or we must create an OpenOptions with many parameters + // https://doc.rust-lang.org/std/fs/struct.OpenOptions.html + } pub fn open>(path: P) -> Result> { let path = path.as_ref().to_string_lossy(); @@ -66,50 +74,47 @@ impl Index { Ok(()) } - pub fn snapshot(&self) -> Snapshot<&rocksdb::DB> { - Snapshot::new(&self.database) + fn blobs(&self) -> Result, Box> { + match self.database.get(b"00-blobs-order")? { + Some(value) => Ok(ordered_blobs_from_slice(&value)?), + None => Ok(Vec::new()), + } } -} -impl Search for Index { - fn search(&self, text: &str) -> Vec { - unimplemented!() - } -} + pub fn search(&self, query: &str) -> Result, Box> { -pub struct Snapshot -where D: Deref, -{ - inner: rocksdb::Snapshot, -} + // FIXME create a SNAPSHOT for the search ! + let blobs = self.blobs()?; -impl Snapshot -where D: Deref, -{ - pub fn new(inner: D) -> Snapshot { - Self { inner: rocksdb::Snapshot::new(inner) } - } -} + let mut automatons = Vec::new(); + for query in query.split_whitespace().map(str::to_lowercase) { + let lev = automaton::build_prefix_dfa(&query); + automatons.push(lev); + } -impl Search for Snapshot -where D: Deref, -{ - fn search(&self, text: &str) -> Vec { - unimplemented!() + let config = Config { + blobs: &blobs, + automatons: automatons, + criteria: criterion::default(), + distinct: ((), 1), + }; + + Ok(RankedStream::new(config).retrieve_documents(0..20)) } } #[cfg(test)] mod tests { + use tempfile::NamedTempFile; + use super::*; use crate::index::schema::Schema; use crate::index::update::{PositiveUpdateBuilder, NegativeUpdateBuilder}; #[test] fn generate_negative_update() -> Result<(), Box> { - - let schema = Schema::open("/meili/default.sch")?; - let mut builder = NegativeUpdateBuilder::new("update-delete-0001.sst"); + let path = NamedTempFile::new()?.into_temp_path(); + let mut builder = NegativeUpdateBuilder::new(&path); // you can insert documents in any order, it is sorted internally builder.remove(1); @@ -157,18 +162,18 @@ mod tests { ////////////// - let index = Index::open("/meili/data")?; - let update = Update::open("update-0001.sst")?; + // let index = Index::open("/meili/data")?; + // let update = Update::open("update-0001.sst")?; - // if you create a snapshot before an update - let snapshot = index.snapshot(); - index.ingest_update(update)?; + // // if you create a snapshot before an update + // let snapshot = index.snapshot(); + // index.ingest_update(update)?; - // the snapshot does not see the updates - let results = snapshot.search("helo"); + // // the snapshot does not see the updates + // let results = snapshot.search("helo"); - // the raw index itself see new results - let results = index.search("helo"); + // // the raw index itself see new results + // let results = index.search("helo"); Ok(()) } diff --git a/src/index/schema.rs b/src/index/schema.rs index e87459e4d..f4ab19279 100644 --- a/src/index/schema.rs +++ b/src/index/schema.rs @@ -1,6 +1,8 @@ +use std::io::{Read, Write}; use std::error::Error; use std::path::Path; use std::ops::BitOr; +use std::fs::File; use std::fmt; pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false }; @@ -33,15 +35,23 @@ impl BitOr for SchemaProps { } } -pub struct SchemaBuilder; +pub struct SchemaBuilder { + fields: Vec<(String, SchemaProps)>, +} impl SchemaBuilder { pub fn new() -> SchemaBuilder { - unimplemented!() + SchemaBuilder { fields: Vec::new() } } - pub fn field(&mut self, name: &str, props: SchemaProps) -> SchemaField { - unimplemented!() + pub fn field(&mut self, name: N, props: SchemaProps) -> SchemaField + where N: Into, + { + let len = self.fields.len(); + let name = name.into(); + self.fields.push((name, props)); + + SchemaField(len as u32) } pub fn build(self) -> Schema { @@ -49,6 +59,32 @@ impl SchemaBuilder { } } +#[derive(Clone)] +pub struct Schema; + +impl Schema { + pub fn open>(path: P) -> Result> { + let file = File::open(path)?; + Schema::read_from(file) + } + + pub fn read_from(reader: R) -> Result> { + unimplemented!() + } + + pub fn write_to(writer: W) -> Result<(), Box> { + unimplemented!() + } + + pub fn props(&self, field: SchemaField) -> SchemaProps { + unimplemented!() + } + + pub fn field(&self, name: &str) -> Option { + unimplemented!() + } +} + #[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq)] pub struct SchemaField(u32); @@ -63,20 +99,3 @@ impl fmt::Display for SchemaField { write!(f, "{}", self.0) } } - -#[derive(Clone)] -pub struct Schema; - -impl Schema { - pub fn open>(path: P) -> Result> { - unimplemented!() - } - - pub fn props(&self, field: SchemaField) -> SchemaProps { - unimplemented!() - } - - pub fn field(&self, name: &str) -> Option { - unimplemented!() - } -} diff --git a/src/index/search.rs b/src/index/search.rs deleted file mode 100644 index 712dd1c74..000000000 --- a/src/index/search.rs +++ /dev/null @@ -1,5 +0,0 @@ -use crate::rank::Document; - -pub trait Search { - fn search(&self, text: &str) -> Vec; -} diff --git a/src/rank/ranked_stream.rs b/src/rank/ranked_stream.rs index 9f014b56e..9e8fdcd54 100644 --- a/src/rank/ranked_stream.rs +++ b/src/rank/ranked_stream.rs @@ -22,8 +22,8 @@ fn clamp_range(range: Range, big: Range) -> Range { } } -pub struct Config { - pub index: Index, +pub struct Config<'a, C, F> { + pub blobs: &'a [Blob], pub automatons: Vec, pub criteria: Vec, pub distinct: (F, usize), @@ -37,11 +37,11 @@ pub struct RankedStream<'m, C, F> { } impl<'m, C, F> RankedStream<'m, C, F> { - pub fn new(config: Config) -> Self { + pub fn new(config: Config<'m, C, F>) -> Self { let automatons: Vec<_> = config.automatons.into_iter().map(Rc::new).collect(); RankedStream { - stream: Merge::with_automatons(automatons.clone(), unimplemented!()), + stream: Merge::with_automatons(automatons.clone(), config.blobs), automatons: automatons, criteria: config.criteria, distinct: config.distinct,