diff --git a/raptor/src/lib.rs b/raptor/src/lib.rs index 2786d0ca8..85ad4628e 100644 --- a/raptor/src/lib.rs +++ b/raptor/src/lib.rs @@ -5,12 +5,7 @@ pub mod metadata; pub mod vec_read_only; pub mod automaton; -pub use self::metadata::{ - Metadata, MetadataBuilder, - // Stream, StreamBuilder, - // Union, OpBuilder, - // IndexedValues, -}; +pub use self::metadata::{Metadata, MetadataBuilder}; pub use self::rank::RankedStream; pub type DocumentId = u64; @@ -23,7 +18,6 @@ pub type DocumentId = u64; #[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] #[repr(C)] pub struct DocIndex { - /// The document identifier where the word was found. pub document: DocumentId, @@ -50,7 +44,6 @@ pub struct DocIndex { // TODO do data oriented programming ? very arrays ? #[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] pub struct Match { - /// The word index in the query sentence. /// Same as the `attribute_index` but for the query words. /// diff --git a/raptor/src/metadata/difference.rs b/raptor/src/metadata/difference.rs new file mode 100644 index 000000000..50c648459 --- /dev/null +++ b/raptor/src/metadata/difference.rs @@ -0,0 +1,126 @@ +use fst::{Streamer, Automaton}; +use crate::metadata::ops::{self, IndexedDocIndexes}; +use crate::metadata::{stream_ops, Metadata}; + +fn union_with_automatons<'a, A>(metas: &'a [Metadata], autos: Vec) -> ops::Union +where A: 'a + Automaton + Clone, +{ + let mut op = ops::OpBuilder::with_automatons(autos); + for metadata in metas { + op.push(metadata); + } + op.union() +} + +pub struct Difference<'f> { + inner: stream_ops::Difference<'f>, +} + +impl<'f> Difference<'f> { + pub fn new(positives: &'f [Metadata], negatives: &'f [Metadata], automatons: Vec) -> Self + where A: 'f + Automaton + Clone + { + let positives = union_with_automatons(positives, automatons.clone()); + let negatives = union_with_automatons(negatives, automatons); + + let mut builder = stream_ops::OpBuilder::new(); + builder.push(positives); + 
builder.push(negatives); + + Difference { inner: builder.difference() } + } +} + +impl<'a, 'f> Streamer<'a> for Difference<'f> { + type Item = (&'a [u8], &'a [IndexedDocIndexes]); + + fn next(&'a mut self) -> Option { + self.inner.next() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use fst::automaton::AlwaysMatch; + use crate::metadata::{Metadata, MetadataBuilder}; + use crate::vec_read_only::VecReadOnly; + use crate::DocIndex; + + fn construct_metadata(documents: Vec<(String, DocIndex)>) -> Metadata { + let mapw = Vec::new(); + let indexesw = Vec::new(); + + let mut builder = MetadataBuilder::new(mapw, indexesw); + + for (string, doc_index) in documents { + builder.insert(string, doc_index); + } + + let (map, indexes) = builder.into_inner().unwrap(); + Metadata::from_bytes(map, indexes).unwrap() + } + + #[test] + fn empty() { + let positive_metas = construct_metadata(vec![ + ("chameau".into(), DocIndex{ document: 12, attribute: 1, attribute_index: 22 }), + ("chameau".into(), DocIndex{ document: 31, attribute: 0, attribute_index: 1 }), + ]); + + let negative_metas = construct_metadata(vec![ + ("chameau".into(), DocIndex{ document: 12, attribute: 1, attribute_index: 22 }), + ("chameau".into(), DocIndex{ document: 31, attribute: 0, attribute_index: 1 }), + ]); + + let positives = &[positive_metas]; + let negatives = &[negative_metas]; + let mut diff = Difference::new(positives, negatives, vec![AlwaysMatch]); + + assert_eq!(diff.next(), None); + } + + #[test] + fn one_positive() { + let di1 = DocIndex{ document: 12, attribute: 1, attribute_index: 22 }; + let di2 = DocIndex{ document: 31, attribute: 0, attribute_index: 1 }; + + let positive_metas = construct_metadata(vec![ + ("chameau".into(), di1), + ("chameau".into(), di2), + ]); + + let negative_metas = construct_metadata(vec![ + ("chameau".into(), di1), + ]); + + let positives = &[positive_metas]; + let negatives = &[negative_metas]; + let mut diff = Difference::new(positives, negatives, 
vec![AlwaysMatch]); + + let idi = IndexedDocIndexes{ index: 0, doc_indexes: VecReadOnly::new(vec![di2]) }; + assert_eq!(diff.next(), Some(("chameau".as_bytes(), &[idi][..]))); + assert_eq!(diff.next(), None); + } + + #[test] + fn more_negative_than_positive() { + let di1 = DocIndex{ document: 12, attribute: 1, attribute_index: 22 }; + let di2 = DocIndex{ document: 31, attribute: 0, attribute_index: 1 }; + + let positive_metas = construct_metadata(vec![ + ("chameau".into(), di1), + ]); + + let negative_metas = construct_metadata(vec![ + ("chameau".into(), di1), + ("chameau".into(), di2), + ]); + + let positives = &[positive_metas]; + let negatives = &[negative_metas]; + let mut diff = Difference::new(positives, negatives, vec![AlwaysMatch]); + + assert_eq!(diff.next(), None); + } +} diff --git a/raptor/src/metadata/mod.rs b/raptor/src/metadata/mod.rs index 7c63d2d53..9e594d49b 100644 --- a/raptor/src/metadata/mod.rs +++ b/raptor/src/metadata/mod.rs @@ -1,6 +1,8 @@ -mod ops_indexed_value; pub mod ops; +pub mod stream_ops; pub mod doc_indexes; +pub mod difference; +pub mod ops_indexed_value; use fst::{Map, MapBuilder}; use std::error::Error; diff --git a/raptor/src/metadata/stream_ops.rs b/raptor/src/metadata/stream_ops.rs new file mode 100644 index 000000000..230a54b07 --- /dev/null +++ b/raptor/src/metadata/stream_ops.rs @@ -0,0 +1,309 @@ +use std::rc::Rc; +use std::collections::{BinaryHeap, HashMap, BTreeMap}; +use std::cmp; +use fst::{IntoStreamer, Streamer}; +use sdset::multi::OpBuilder as SdOpBuilder; +use sdset::{SetOperation, Set}; +use crate::metadata::ops::IndexedDocIndexes; +use crate::vec_read_only::VecReadOnly; +use crate::DocIndex; + +type BoxedStream<'f> = Box Streamer<'a, Item=(&'a [u8], &'a [IndexedDocIndexes])> + 'f>; + +pub struct OpBuilder<'f> { + streams: Vec>, +} + +impl<'f> OpBuilder<'f> { + pub fn new() -> Self { + Self { streams: Vec::new() } + } + + /// Push a stream of `IndexedDocIndexes`. 
+ /// + /// # Warning + /// + /// You must ensure that the automatons are + /// the same, in the same order, for each stream you push. + pub fn push(&mut self, stream: I) + where + I: for<'a> IntoStreamer<'a, Into=S, Item=(&'a [u8], &'a [IndexedDocIndexes])>, + S: 'f + for<'a> Streamer<'a, Item=(&'a [u8], &'a [IndexedDocIndexes])>, + { + self.streams.push(Box::new(stream.into_stream())); + } + + pub fn union(self) -> Union<'f> { + Union { + heap: StreamHeap::new(self.streams), + outs: Vec::new(), + cur_slot: None, + } + } + + pub fn intersection(self) -> Intersection<'f> { + Intersection { + heap: StreamHeap::new(self.streams), + outs: Vec::new(), + cur_slot: None, + } + } + + pub fn difference(self) -> Difference<'f> { + Difference { + heap: StreamHeap::new(self.streams), + outs: Vec::new(), + cur_slot: None, + } + } + + pub fn symmetric_difference(self) -> SymmetricDifference<'f> { + SymmetricDifference { + heap: StreamHeap::new(self.streams), + outs: Vec::new(), + cur_slot: None, + } + } +} + +// FIXME reuse it from metadata::ops +struct SlotIndexedDocIndexes { + aut_index: usize, + start: usize, + len: usize, +} + +macro_rules! logical_operation { + (struct $name:ident, $operation:ident) => { + +pub struct $name<'f> { + heap: StreamHeap<'f>, + outs: Vec, + cur_slot: Option, +} + +impl<'a, 'f> Streamer<'a> for $name<'f> { + type Item = (&'a [u8], &'a [IndexedDocIndexes]); + + // The Metadata could be typed as "key-values present" and "key-values possibly not present", + // in other words, Metadata that "need" to have key-values and others that don't. + // + // We could probably allow the user to define in Metadata some Document + // that needs to be deleted and only declare the DocumentId, and not every DocIndex of each word. 
+ fn next(&'a mut self) -> Option { + if let Some(slot) = self.cur_slot.take() { + self.heap.refill(slot); + } + let slot = match self.heap.pop() { + None => return None, + Some(slot) => { + self.cur_slot = Some(slot); + self.cur_slot.as_mut().unwrap() + } + }; + + self.outs.clear(); + + // retrieve all the doc_indexes of all the streams, + // store them in a HashMap whose key is + // the aut_index (associated with the state that is ignored), + // the doc_indexes must be stored in another BTreeMap whose key + // is the rdr_index. + // + // This will permit us to do set operations on readers (using the rdr_index); + // the BTreeMap will give the rdr_index in order and the final result + // will be aggregated in a Vec of IndexedDocIndexes whose aut_index and state + // are the key of the first HashMap + + // TODO use the fnv Hasher! + + let mut builders = HashMap::new(); + let iv = slot.indexed_value(); + let builder = builders.entry(iv.index).or_insert_with(BTreeMap::new); + builder.insert(slot.rdr_index, iv.doc_indexes); + + while let Some(mut slot) = self.heap.pop_if_equal(slot.input()) { + let iv = slot.indexed_value(); + let builder = builders.entry(iv.index).or_insert_with(BTreeMap::new); + builder.insert(slot.rdr_index, iv.doc_indexes); + + self.heap.refill(slot); + } + + // now that we have accumulated all the doc_indexes like so: + // HashMap<(aut_index, state*), BTreeMap<rdr_index, doc_indexes>> + // we will be able to retrieve, for each aut_index, the doc_indexes + // that are needed to do the set operation + + let mut doc_indexes = Vec::new(); + let mut doc_indexes_slots = Vec::with_capacity(builders.len()); + for (aut_index, values) in builders { + + let sets = values.iter().map(|(_, v)| Set::new_unchecked(v.as_slice())).collect(); + let builder = SdOpBuilder::from_vec(sets); + + let start = doc_indexes.len(); + builder.$operation().extend_vec(&mut doc_indexes); + let len = doc_indexes.len() - start; + if len == 0 { continue } + + let slot = SlotIndexedDocIndexes { 
+ aut_index: aut_index, + start: start, + len: len, + }; + doc_indexes_slots.push(slot); + } + + let read_only = VecReadOnly::new(doc_indexes); + self.outs.reserve(doc_indexes_slots.len()); + for slot in doc_indexes_slots { + let indexes = IndexedDocIndexes { + index: slot.aut_index, + doc_indexes: read_only.range(slot.start, slot.len), + }; + self.outs.push(indexes); + } + + if self.outs.is_empty() { return None } + Some((slot.input(), &self.outs)) + } +} +}} + +logical_operation!(struct Union, union); +logical_operation!(struct Intersection, intersection); +logical_operation!(struct Difference, difference); +logical_operation!(struct SymmetricDifference, symmetric_difference); + +struct StreamHeap<'f> { + rdrs: Vec>, + heap: BinaryHeap, +} + +impl<'f> StreamHeap<'f> { + fn new(streams: Vec>) -> StreamHeap<'f> { + let mut heap = StreamHeap { + rdrs: streams, + heap: BinaryHeap::new(), + }; + for i in 0..heap.rdrs.len() { + heap.refill(Slot::new(i)); + } + heap + } + + fn pop(&mut self) -> Option { + self.heap.pop() + } + + fn peek_is_duplicate(&self, key: &[u8]) -> bool { + self.heap.peek().map(|s| s.input() == key).unwrap_or(false) + } + + fn pop_if_equal(&mut self, key: &[u8]) -> Option { + if self.peek_is_duplicate(key) { + self.pop() + } else { + None + } + } + + fn pop_if_le(&mut self, key: &[u8]) -> Option { + if self.heap.peek().map(|s| s.input() <= key).unwrap_or(false) { + self.pop() + } else { + None + } + } + + fn num_slots(&self) -> usize { + self.rdrs.len() + } + + fn refill(&mut self, mut slot: Slot) { + if let Some((input, outputs)) = self.rdrs[slot.rdr_index].next() { + slot.set_input(input); + for output in outputs { + slot.set_aut_index(output.index); + slot.set_output(output.doc_indexes.clone()); + self.heap.push(slot.clone()); + } + } + } +} + +#[derive(Debug, Clone)] +struct Slot { + rdr_index: usize, + aut_index: usize, + input: Rc>, + output: Option>, +} + +impl PartialEq for Slot { + fn eq(&self, other: &Self) -> bool { + (&self.input, 
self.rdr_index, self.aut_index) + .eq(&(&other.input, other.rdr_index, other.aut_index)) + } +} + +impl Eq for Slot { } + +impl PartialOrd for Slot { + fn partial_cmp(&self, other: &Self) -> Option { + (&self.input, self.rdr_index, self.aut_index) + .partial_cmp(&(&other.input, other.rdr_index, other.aut_index)) + .map(|ord| ord.reverse()) + } +} + +impl Ord for Slot { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.partial_cmp(other).unwrap() + } +} + +impl Slot { + fn new(rdr_index: usize) -> Self { + Slot { + rdr_index: rdr_index, + aut_index: 0, + input: Rc::new(Vec::with_capacity(64)), + output: None, + } + } + + fn indexed_value(&mut self) -> IndexedDocIndexes { + IndexedDocIndexes { + index: self.aut_index, + doc_indexes: self.output.take().unwrap(), + } + } + + fn input(&self) -> &[u8] { + &self.input + } + + fn set_input(&mut self, input: &[u8]) { + if *self.input != input { + let inner = Rc::make_mut(&mut self.input); + inner.clear(); + inner.extend(input); + } + } + + fn set_aut_index(&mut self, aut_index: usize) { + self.aut_index = aut_index; + } + + fn set_output(&mut self, output: VecReadOnly) { + self.output = Some(output); + } +} + +#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct IndexedValueWithState { + pub index: usize, + pub value: u64, +}