diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index f4fd57533..2e5eb561d 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -1,5 +1,5 @@ use std::collections::hash_map::Entry; -use std::collections::{HashMap, BTreeSet, BTreeMap}; +use std::collections::{HashMap, BTreeSet}; use std::convert::{TryFrom, TryInto}; use std::fs::File; use std::path::PathBuf; @@ -11,11 +11,12 @@ use fst::{Streamer, IntoStreamer}; use heed::EnvOpenOptions; use heed::types::*; use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions}; +use rayon::prelude::*; use roaring::RoaringBitmap; use slice_group_by::StrGroupBy; use structopt::StructOpt; -use mega_mini_indexer::{FastMap4, SmallVec32, Index, DocumentId, AttributeId}; +use mega_mini_indexer::{FastMap4, SmallVec32, Index, DocumentId, Position}; const LMDB_MAX_KEY_LENGTH: usize = 512; const ONE_MILLION: usize = 1_000_000; @@ -50,8 +51,8 @@ struct Opt { struct Indexed { fst: fst::Set>, - postings_attrs: FastMap4, RoaringBitmap>, - postings_ids: FastMap4, FastMap4>, + word_positions: FastMap4, RoaringBitmap>, + word_position_docids: FastMap4<(SmallVec32, Position), RoaringBitmap>, headers: Vec, documents: Vec<(DocumentId, Vec)>, } @@ -79,12 +80,12 @@ impl MtblKvStore { // we iterate over the fst to read the words in order let mut stream = indexed.fst.stream(); while let Some(word) = stream.next() { - if let Some(attrs) = indexed.postings_attrs.remove(word) { + if let Some(positions) = indexed.word_positions.get(word) { key.truncate(1); key.extend_from_slice(word); - // We serialize the attrs ids into a buffer + // We serialize the positions into a buffer buffer.clear(); - attrs.serialize_into(&mut buffer)?; + positions.serialize_into(&mut buffer)?; // that we write under the generated key into MTBL out.add(&key, &buffer).unwrap(); } @@ -98,26 +99,27 @@ impl MtblKvStore { while let Some(word) = stream.next() { key.truncate(1); key.extend_from_slice(word); - if let Some(attrs) = indexed.postings_ids.remove(word) { - let attrs: BTreeMap<_, _> = attrs.into_iter().collect(); + if let Some(positions) = indexed.word_positions.remove(word) { // We iterate over all the attributes containing the documents ids - for (attr, ids) in attrs { - // we postfix the word by the attribute id - key.extend_from_slice(&attr.to_be_bytes()); + for pos in positions { + let ids = indexed.word_position_docids.remove(&(SmallVec32::from(word), pos)).unwrap(); + // we postfix the word by the positions it appears in + let position_bytes = pos.to_be_bytes(); + key.extend_from_slice(&position_bytes); // We serialize the document ids into a buffer buffer.clear(); ids.serialize_into(&mut buffer)?; // that we write under the generated key into MTBL out.add(&key, &buffer).unwrap(); - // And cleanup the attribute id afterward (u32 = 4 * u8) - key.truncate(key.len() - 4); + // And cleanup the position afterward + key.truncate(key.len() - position_bytes.len()); } } } - // postings ids keys are all prefixed by a '4' + // postings ids keys are all prefixed key[0] = 5; - indexed.documents.sort_unstable(); + indexed.documents.sort_unstable_by_key(|(id, _)| *id); for (id, content) in indexed.documents { key.truncate(1); key.extend_from_slice(&id.to_be_bytes()); @@ -204,8 +206,8 @@ fn index_csv( eprintln!("{:?}: Indexing into an Indexed...", thread_index); let mut document = csv::StringRecord::new(); - let mut postings_attrs = FastMap4::default(); - let mut postings_ids = FastMap4::default(); + let mut word_positions = FastMap4::default(); + let mut word_position_docids = FastMap4::default(); let mut documents = Vec::new(); // Write the headers into a Vec of bytes. @@ -234,12 +236,11 @@ fn index_csv( let position = (attr * MAX_POSITION + pos) as u32; // We save the positions where this word has been seen. - postings_attrs.entry(SmallVec32::from(word.as_bytes())) + word_positions.entry(SmallVec32::from(word.as_bytes())) .or_insert_with(RoaringBitmap::new).insert(position); // We save the documents ids under the position and word we have seen it. - postings_ids.entry(SmallVec32::from(word.as_bytes())) - .or_insert_with(FastMap4::default).entry(position) // positions + word_position_docids.entry((SmallVec32::from(word.as_bytes()), position)) // word + position .or_insert_with(RoaringBitmap::new).insert(document_id); // document ids } } @@ -253,14 +254,10 @@ fn index_csv( } // We store the words from the postings. - let mut new_words = BTreeSet::default(); - for (word, _new_ids) in &postings_ids { - new_words.insert(word.clone()); - } + let new_words: BTreeSet<_> = word_position_docids.iter().map(|((w, _), _)| w).collect(); + let fst = fst::Set::from_iter(new_words)?; - let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallVec32::as_ref))?; - - let indexed = Indexed { fst: new_words_fst, headers, postings_attrs, postings_ids, documents }; + let indexed = Indexed { fst, headers, word_positions, word_position_docids, documents }; eprintln!("{:?}: Indexed created!", thread_index); MtblKvStore::from_indexed(indexed).map(|x| vec![x]) @@ -371,7 +368,7 @@ fn main() -> anyhow::Result<()> { let csv_readers: Vec<_> = (0..num_threads).map(|_| csv::Reader::from_path(&file)).collect::>()?; let stores: Vec<_> = csv_readers - .into_iter() + .into_par_iter() .enumerate() .map(|(i, rdr)| index_csv(rdr, i, num_threads)) .collect::>()?; diff --git a/src/lib.rs b/src/lib.rs index 43451e3b7..42f63de51 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ pub type SmallVec16 = smallvec::SmallVec<[T; 16]>; pub type BEU32 = heed::zerocopy::U32; pub type DocumentId = u32; pub type AttributeId = u32; +pub type Position = u32; #[derive(Clone)] pub struct Index {