diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index a7d2c01f1..17689823c 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -7,15 +7,15 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use anyhow::Context; use cow_utils::CowUtils; use fst::{Streamer, IntoStreamer}; +use heed::EnvOpenOptions; use heed::types::*; -use heed::{EnvOpenOptions, PolyDatabase, Database}; use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions}; use rayon::prelude::*; use roaring::RoaringBitmap; use structopt::StructOpt; use mega_mini_indexer::alphanumeric_tokens; -use mega_mini_indexer::{FastMap4, SmallVec32, BEU32, DocumentId}; +use mega_mini_indexer::{FastMap4, SmallVec32, BEU32, Index, DocumentId}; #[cfg(target_os = "linux")] #[global_allocator] @@ -38,6 +38,7 @@ struct Opt { struct Indexed { fst: fst::Set>, postings_ids: FastMap4, + prefix_postings_ids: FastMap4, headers: Vec, documents: Vec<(DocumentId, Vec)>, } @@ -69,8 +70,21 @@ impl MtblKvStore { } } - // postings ids keys are all prefixed by a '2' + // We must write the prefix postings ids key[0] = 2; + let mut stream = indexed.fst.stream(); + while let Some(word) = stream.next() { + key.truncate(1); + key.extend_from_slice(word); + if let Some(ids) = indexed.prefix_postings_ids.remove(word) { + buffer.clear(); + ids.serialize_into(&mut buffer)?; + out.add(&key, &buffer).unwrap(); + } + } + + // postings ids keys are all prefixed by a '2' + key[0] = 3; indexed.documents.sort_unstable(); for (id, content) in indexed.documents { key.truncate(1); @@ -115,7 +129,7 @@ impl MtblKvStore { assert_eq!(left, right); Some(left.to_vec()) } - else if key.starts_with(&[1]) { + else if key.starts_with(&[1]) || key.starts_with(&[2]) { let mut left = RoaringBitmap::deserialize_from(left).unwrap(); let right = RoaringBitmap::deserialize_from(right).unwrap(); left.union_with(&right); @@ -123,7 +137,7 @@ impl MtblKvStore { left.serialize_into(&mut vec).unwrap(); Some(vec) } - else if key.starts_with(&[2]) { + else if key.starts_with(&[3]) { assert_eq!(left, right); Some(left.to_vec()) } @@ -155,6 +169,7 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { let mut document = csv::StringRecord::new(); let mut postings_ids = FastMap4::default(); + let mut prefix_postings_ids = FastMap4::default(); let mut documents = Vec::new(); // Write the headers into a Vec of bytes. @@ -174,6 +189,11 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { postings_ids.entry(SmallVec32::from(word.as_bytes())) .or_insert_with(RoaringBitmap::new) .insert(document_id); + if let Some(prefix) = word.as_bytes().get(0..word.len().min(4)) { + prefix_postings_ids.entry(SmallVec32::from(prefix)) + .or_insert_with(RoaringBitmap::new) + .insert(document_id); + } } } } @@ -185,7 +205,7 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { documents.push((document_id, document)); } - // We compute and store the postings list into the DB. + // We store the words from the postings. let mut new_words = BTreeSet::default(); for (word, _new_ids) in &postings_ids { new_words.insert(word.clone()); @@ -193,20 +213,13 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallVec32::as_ref))?; - let indexed = Indexed { fst: new_words_fst, headers, postings_ids, documents }; + let indexed = Indexed { fst: new_words_fst, headers, postings_ids, prefix_postings_ids, documents }; MtblKvStore::from_indexed(indexed) } // TODO merge with the previous values -fn writer( - wtxn: &mut heed::RwTxn, - main: PolyDatabase, - postings_ids: Database, - documents: Database, ByteSlice>, - mtbl_store: MtblKvStore, -) -> anyhow::Result -{ +fn writer(wtxn: &mut heed::RwTxn, index: Index, mtbl_store: MtblKvStore) -> anyhow::Result { let mtbl_store = match mtbl_store.0 { Some(store) => unsafe { memmap::Mmap::map(&store)? }, None => return Ok(0), @@ -216,25 +229,32 @@ fn writer( // Write the words fst let fst = mtbl_store.get(b"\0words-fst").unwrap(); let fst = fst::Set::new(fst)?; - main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &fst.as_fst().as_bytes())?; + index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &fst.as_fst().as_bytes())?; // Write and merge the headers let headers = mtbl_store.get(b"\0headers").unwrap(); - main.put::<_, Str, ByteSlice>(wtxn, "headers", headers.as_ref())?; + index.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers.as_ref())?; // Write and merge the postings lists let mut iter = mtbl_store.iter_prefix(&[1]).unwrap(); while let Some((word, postings)) = iter.next() { let word = std::str::from_utf8(&word[1..]).unwrap(); - postings_ids.put(wtxn, &word, &postings)?; + index.postings_ids.put(wtxn, &word, &postings)?; + } + + // Write and merge the prefix postings lists + let mut iter = mtbl_store.iter_prefix(&[2]).unwrap(); + while let Some((word, postings)) = iter.next() { + let word = std::str::from_utf8(&word[1..]).unwrap(); + index.prefix_postings_ids.put(wtxn, &word, &postings)?; } // Write the documents let mut count = 0; - let mut iter = mtbl_store.iter_prefix(&[2]).unwrap(); + let mut iter = mtbl_store.iter_prefix(&[3]).unwrap(); while let Some((id_bytes, content)) = iter.next() { let id = id_bytes[1..].try_into().map(u32::from_be_bytes).unwrap(); - documents.put(wtxn, &BEU32::new(id), &content)?; + index.documents.put(wtxn, &BEU32::new(id), &content)?; count += 1; } @@ -251,10 +271,7 @@ fn main() -> anyhow::Result<()> { .max_dbs(5) .open(opt.database)?; - let main = env.create_poly_database(None)?; - let postings_ids: Database = env.create_database(Some("postings-ids"))?; - let documents: Database, ByteSlice> = env.create_database(Some("documents"))?; - + let index = Index::new(&env)?; let res = opt.files_to_index .into_par_iter() .try_fold(MtblKvStore::default, |acc, path| { @@ -271,7 +288,7 @@ fn main() -> anyhow::Result<()> { eprintln!("We are writing into LMDB..."); let mut wtxn = env.write_txn()?; - let count = writer(&mut wtxn, main, postings_ids, documents, mtbl_store)?; + let count = writer(&mut wtxn, index, mtbl_store)?; wtxn.commit()?; eprintln!("Wrote {} documents into LMDB", count); diff --git a/src/lib.rs b/src/lib.rs index 01bc92df9..248e360cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ pub fn alphanumeric_tokens(string: &str) -> impl Iterator { pub struct Index { pub main: PolyDatabase, pub postings_ids: Database, + pub prefix_postings_ids: Database, pub documents: Database, ByteSlice>, } @@ -33,11 +34,13 @@ impl Index { pub fn new(env: &heed::Env) -> heed::Result { let main = env.create_poly_database(None)?; let postings_ids = env.create_database(Some("postings-ids"))?; + let prefix_postings_ids = env.create_database(Some("prefix-postings-ids"))?; let documents = env.create_database(Some("documents"))?; Ok(Index { main, postings_ids, + prefix_postings_ids, documents, }) } @@ -73,16 +76,23 @@ impl Index { let mut intersect_result: Option = None; for (word, dfa) in dfas { let before = Instant::now(); + let mut union_result = RoaringBitmap::default(); - let mut stream = fst.search(dfa).into_stream(); - while let Some(word) = stream.next() { - let word = std::str::from_utf8(word)?; - if let Some(ids) = self.postings_ids.get(rtxn, word)? { - let right = RoaringBitmap::deserialize_from(ids)?; - union_result.union_with(&right); + if word.len() <= 4 { + if let Some(ids) = self.prefix_postings_ids.get(rtxn, &word[..word.len().min(4)])? { + union_result = RoaringBitmap::deserialize_from(ids)?; } + } else { + let mut stream = fst.search(dfa).into_stream(); + while let Some(word) = stream.next() { + let word = std::str::from_utf8(word)?; + if let Some(ids) = self.postings_ids.get(rtxn, word)? { + let right = RoaringBitmap::deserialize_from(ids)?; + union_result.union_with(&right); + } + } + eprintln!("union for {:?} took {:.02?}", word, before.elapsed()); } - eprintln!("union for {:?} took {:.02?}", word, before.elapsed()); intersect_result = match intersect_result.take() { Some(mut left) => {