From 37c709c9a9b4155d23e56c988fb95f8f28c59e02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 17 Oct 2018 13:35:34 +0200 Subject: [PATCH] feat: Introduce a way to distinct documents --- examples/csv-indexer.rs | 4 +- examples/json-lines-indexer.rs | 4 +- examples/serve-console.rs | 20 +++- examples/serve-http.rs | 13 ++- src/lib.rs | 2 +- src/rank/criterion/mod.rs | 9 +- src/rank/mod.rs | 2 +- src/rank/ranked_stream.rs | 197 ++++++++++++++++++++++----------- 8 files changed, 167 insertions(+), 84 deletions(-) diff --git a/examples/csv-indexer.rs b/examples/csv-indexer.rs index 923b8cbc1..940a3b69e 100644 --- a/examples/csv-indexer.rs +++ b/examples/csv-indexer.rs @@ -117,14 +117,14 @@ impl CsvIndexer { } } -fn insert_document_words<'a, I, A, B>(builder: &mut MetadataBuilder, doc_index: u64, attr: u8, words: I) +fn insert_document_words<'a, I, A, B>(builder: &mut MetadataBuilder, doc_id: u64, attr: u8, words: I) where A: io::Write, B: io::Write, I: IntoIterator, { for (index, word) in words { let doc_index = DocIndex { - document: doc_index, + document_id: doc_id, attribute: attr, attribute_index: index as u32, }; diff --git a/examples/json-lines-indexer.rs b/examples/json-lines-indexer.rs index 2f691f395..0a25f4e26 100644 --- a/examples/json-lines-indexer.rs +++ b/examples/json-lines-indexer.rs @@ -122,14 +122,14 @@ impl JsonLinesIndexer { } } -fn insert_document_words<'a, I, A, B>(builder: &mut MetadataBuilder, doc_index: u64, attr: u8, words: I) +fn insert_document_words<'a, I, A, B>(builder: &mut MetadataBuilder, doc_id: u64, attr: u8, words: I) where A: io::Write, B: io::Write, I: IntoIterator, { for (index, word) in words { let doc_index = DocIndex { - document: doc_index, + document_id: doc_id, attribute: attr, attribute_index: index as u32, }; diff --git a/examples/serve-console.rs b/examples/serve-console.rs index 59e2f4a8e..97919a3ee 100644 --- a/examples/serve-console.rs +++ b/examples/serve-console.rs @@ -5,8 +5,8 @@ use std::path::PathBuf; use elapsed::measure_time; use rocksdb::{DB, DBOptions, IngestExternalFileOptions}; +use raptor::rank::{criterion, Config, RankedStream, Document}; use raptor::{automaton, Metadata, CommonWords}; -use raptor::rank::{criterion, RankedStreamBuilder}; #[derive(Debug, StructOpt)] pub struct CommandConsole { @@ -62,6 +62,13 @@ impl ConsoleSearch { } } +// "Sony" "PlayStation 4 500GB" +fn starts_with_playstation(doc: &Document, database: &DB) -> Vec { + let title_key = format!("{}-title", doc.id); + let title = database.get(title_key.as_bytes()).unwrap().unwrap(); + title.get(0..4).map(|s| s.to_vec()).unwrap_or(Vec::new()) +} + fn search(metadata: &Metadata, database: &DB, common_words: &CommonWords, query: &str) { let mut automatons = Vec::new(); for query in query.split_whitespace().filter(|q| !common_words.contains(*q)) { @@ -69,10 +76,15 @@ fn search(metadata: &Metadata, database: &DB, common_words: &CommonWords, query: automatons.push(lev); } - let mut builder = RankedStreamBuilder::new(metadata, automatons); - builder.criteria(criterion::default()); + let config = Config { + metadata: metadata, + automatons: automatons, + criteria: criterion::default(), + distinct: ((), 1), + }; + let stream = RankedStream::new(config); - let mut stream = builder.build(); + // let documents = stream.retrieve_distinct_documents(|doc| starts_with_playstation(doc, database), 0..20); let documents = stream.retrieve_documents(0..20); for document in documents { diff --git a/examples/serve-http.rs b/examples/serve-http.rs index c5eafd3c4..4b882e0a8 100644 --- a/examples/serve-http.rs +++ b/examples/serve-http.rs @@ -7,11 +7,10 @@ use std::path::PathBuf; use std::error::Error; use std::sync::Arc; -use raptor::rank::{criterion, RankedStreamBuilder}; +use raptor::rank::{criterion, Config, RankedStream}; use raptor::{automaton, Metadata, CommonWords}; use rocksdb::{DB, DBOptions, IngestExternalFileOptions}; use warp::Filter; - use structopt::StructOpt; #[derive(Debug, StructOpt)] @@ -99,10 +98,14 @@ where M: AsRef, automatons.push(lev); } - let mut builder = RankedStreamBuilder::new(metadata.as_ref(), automatons); - builder.criteria(criterion::default()); + let config = Config { + metadata: metadata.as_ref(), + automatons: automatons, + criteria: criterion::default(), + distinct: ((), 1), + }; + let stream = RankedStream::new(config); - let mut stream = builder.build(); let documents = stream.retrieve_documents(0..20); let mut body = Vec::new(); diff --git a/src/lib.rs b/src/lib.rs index 9ee316a2e..e8fb9ab98 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,7 @@ pub type DocumentId = u64; #[repr(C)] pub struct DocIndex { /// The document identifier where the word was found. - pub document: DocumentId, + pub document_id: DocumentId, /// The attribute identifier in the document /// where the word was found. diff --git a/src/rank/criterion/mod.rs b/src/rank/criterion/mod.rs index 907b5cf48..97f1df09c 100644 --- a/src/rank/criterion/mod.rs +++ b/src/rank/criterion/mod.rs @@ -7,7 +7,6 @@ mod exact; use std::vec; use std::cmp::Ordering; -use std::ops::Deref; use crate::rank::Document; pub use self::{ @@ -31,21 +30,21 @@ pub trait Criterion { impl<'a, T: Criterion + ?Sized> Criterion for &'a T { fn evaluate(&self, lhs: &Document, rhs: &Document) -> Ordering { - self.deref().evaluate(lhs, rhs) + (**self).evaluate(lhs, rhs) } fn eq(&self, lhs: &Document, rhs: &Document) -> bool { - self.deref().eq(lhs, rhs) + (**self).eq(lhs, rhs) } } impl Criterion for Box { fn evaluate(&self, lhs: &Document, rhs: &Document) -> Ordering { - self.deref().evaluate(lhs, rhs) + (**self).evaluate(lhs, rhs) } fn eq(&self, lhs: &Document, rhs: &Document) -> bool { - self.deref().eq(lhs, rhs) + (**self).eq(lhs, rhs) } } diff --git a/src/rank/mod.rs b/src/rank/mod.rs index c846964d1..413c7566e 100644 --- a/src/rank/mod.rs +++ b/src/rank/mod.rs @@ -3,7 +3,7 @@ mod ranked_stream; use crate::{Match, DocumentId}; -pub use self::ranked_stream::{RankedStreamBuilder, RankedStream}; +pub use self::ranked_stream::{Config, RankedStream}; #[inline] fn match_query_index(a: &Match, b: &Match) -> bool { diff --git a/src/rank/ranked_stream.rs b/src/rank/ranked_stream.rs index c32861762..56f228cf9 100644 --- a/src/rank/ranked_stream.rs +++ b/src/rank/ranked_stream.rs @@ -1,6 +1,8 @@ +use std::collections::HashMap; +use std::hash::Hash; use std::ops::Range; use std::rc::Rc; -use std::{mem, vec, cmp}; +use std::{mem, vec}; use fnv::FnvHashMap; use fst::Streamer; @@ -9,52 +11,41 @@ use group_by::GroupByMut; use crate::automaton::{DfaExt, AutomatonExt}; use crate::metadata::Metadata; use crate::metadata::ops::OpBuilder; -use crate::rank::criterion::Criterion; +use crate::rank::criterion::{self, Criterion}; use crate::rank::Document; -use crate::Match; +use crate::{Match, DocumentId}; -#[derive(Clone)] -pub struct RankedStreamBuilder<'m, C> { - metadata: &'m Metadata, - automatons: Vec>, - criteria: Vec, +pub struct Config<'m, C, F> { + pub metadata: &'m Metadata, + pub automatons: Vec, + pub criteria: Vec, + pub distinct: (F, usize), } -impl<'m, C> RankedStreamBuilder<'m, C> { - pub fn new(metadata: &'m Metadata, automatons: Vec) -> Self { - RankedStreamBuilder { - metadata: metadata, - automatons: automatons.into_iter().map(Rc::new).collect(), - criteria: Vec::new(), // hummm... prefer the criterion::default() ones ! - } - } +pub struct RankedStream<'m, C, F> { + stream: crate::metadata::ops::Union<'m>, + automatons: Vec>, + criteria: Vec, + distinct: (F, usize), +} - pub fn criteria(&mut self, criteria: Vec) { - self.criteria = criteria; - } - - pub fn build(&self) -> RankedStream { - let mut builder = OpBuilder::with_automatons(self.automatons.clone()); - builder.push(self.metadata); +impl<'m, C, F> RankedStream<'m, C, F> { + pub fn new(config: Config<'m, C, F>) -> Self { + let automatons: Vec<_> = config.automatons.into_iter().map(Rc::new).collect(); + let mut builder = OpBuilder::with_automatons(automatons.clone()); + builder.push(config.metadata); RankedStream { stream: builder.union(), - automatons: &self.automatons, - criteria: &self.criteria, + automatons: automatons, + criteria: config.criteria, + distinct: config.distinct, } } } -pub struct RankedStream<'a, 'm, C> { - stream: crate::metadata::ops::Union<'m>, - automatons: &'a [Rc], - criteria: &'a [C], -} - -impl<'a, 'm, C> RankedStream<'a, 'm, C> { - pub fn retrieve_documents(&mut self, range: Range) -> Vec - where C: Criterion - { +impl<'m, C, F> RankedStream<'m, C, F> { + fn retrieve_all_documents(&mut self) -> Vec { let mut matches = FnvHashMap::default(); while let Some((string, indexed_values)) = self.stream.next() { @@ -63,55 +54,133 @@ impl<'a, 'm, C> RankedStream<'a, 'm, C> { let distance = automaton.eval(string).to_u8(); let is_exact = distance == 0 && string.len() == automaton.query_len(); - for di in iv.doc_indexes.as_slice() { + for doc_index in iv.doc_indexes.as_slice() { let match_ = Match { query_index: iv.index as u32, distance: distance, - attribute: di.attribute, - attribute_index: di.attribute_index, + attribute: doc_index.attribute, + attribute_index: doc_index.attribute_index, is_exact: is_exact, }; - matches.entry(di.document).or_insert_with(Vec::new).push(match_); + matches.entry(doc_index.document_id).or_insert_with(Vec::new).push(match_); } } } - // collect matches from an HashMap into a Vec - let mut documents: Vec<_> = matches.into_iter().map(|(id, mut matches)| { + matches.into_iter().map(|(id, mut matches)| { matches.sort_unstable(); unsafe { Document::from_sorted_matches(id, matches) } - }).collect(); + }).collect() + } +} +impl<'a, C, F> RankedStream<'a, C, F> +where C: Criterion +{ + pub fn retrieve_documents(mut self, range: Range) -> Vec { + let mut documents = self.retrieve_all_documents(); let mut groups = vec![documents.as_mut_slice()]; for criterion in self.criteria { let tmp_groups = mem::replace(&mut groups, Vec::new()); - let mut current_range = Range { start: 0, end: 0 }; - 'grp: for group in tmp_groups { - current_range.end += group.len(); - - // if a part of the current group is in the range returned - // we must sort it and emit the sub-groups - if current_range.contains(&range.start) { - group.sort_unstable_by(|a, b| criterion.evaluate(a, b)); - for group in GroupByMut::new(group, |a, b| criterion.eq(a, b)) { - groups.push(group); - if current_range.end >= range.end { break 'grp } - } - } else { - groups.push(group) + for group in tmp_groups { + group.sort_unstable_by(|a, b| criterion.evaluate(a, b)); + for group in GroupByMut::new(group, |a, b| criterion.eq(a, b)) { + groups.push(group); } - - current_range.start = current_range.end; } } - // TODO find a better algorithm, here we allocate for too many documents - // and we do a useless allocation, we should reuse the documents Vec - let start = cmp::min(range.start, documents.len()); - let mut documents = documents.split_off(start); - documents.truncate(range.len()); - documents + documents[range].to_vec() + } + + pub fn retrieve_distinct_documents(mut self, range: Range) -> Vec + where F: Fn(&DocumentId) -> K, + K: Hash + Eq, + { + let mut documents = self.retrieve_all_documents(); + let mut groups = vec![documents.as_mut_slice()]; + + for criterion in self.criteria { + let tmp_groups = mem::replace(&mut groups, Vec::new()); + + for group in tmp_groups { + group.sort_unstable_by(|a, b| criterion.evaluate(a, b)); + for group in GroupByMut::new(group, |a, b| criterion.eq(a, b)) { + groups.push(group); + } + } + } + + let mut out_documents = Vec::with_capacity(range.len()); + let (distinct, limit) = self.distinct; + let mut seen = DistinctMap::new(limit); + + for document in documents { + let key = distinct(&document.id); + let accepted = seen.digest(key); + + if accepted { + if seen.len() == range.end { break } + if seen.len() >= range.start { + out_documents.push(document); + } + } + } + + out_documents + } +} + +pub struct DistinctMap { + inner: HashMap, + limit: usize, + len: usize, +} + +impl DistinctMap { + pub fn new(limit: usize) -> Self { + DistinctMap { + inner: HashMap::new(), + limit: limit, + len: 0, + } + } + + pub fn digest(&mut self, key: K) -> bool { + let seen = self.inner.entry(key).or_insert(0); + if *seen < self.limit { *seen += 1; self.len += 1; true } else { false } + } + + pub fn len(&self) -> usize { + self.len + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn easy_distinct_map() { + let mut map = DistinctMap::new(2); + for x in &[1, 1, 1, 2, 3, 4, 5, 6, 6, 6, 6, 6] { + map.digest(x); + } + assert_eq!(map.len(), 8); + + let mut map = DistinctMap::new(2); + assert_eq!(map.digest(1), true); + assert_eq!(map.digest(1), true); + assert_eq!(map.digest(1), false); + assert_eq!(map.digest(1), false); + + assert_eq!(map.digest(2), true); + assert_eq!(map.digest(3), true); + assert_eq!(map.digest(2), true); + assert_eq!(map.digest(2), false); + + assert_eq!(map.len(), 5); } }