diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index c410e7e99..c88e014a7 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -48,6 +48,8 @@ struct MtblKvStore(Option); impl MtblKvStore { fn from_indexed(mut indexed: Indexed) -> anyhow::Result { + eprintln!("{:?}: Creating an MTBL store from an Indexed...", rayon::current_thread_index()); + let outfile = tempfile::tempfile()?; let mut out = Writer::new(outfile, None)?; @@ -73,10 +75,10 @@ impl MtblKvStore { // We must write the prefix postings ids key[0] = 2; let mut stream = indexed.fst.stream(); - while let Some(word) = stream.next() { + while let Some(prefix) = stream.next() { key.truncate(1); - key.extend_from_slice(word); - if let Some(ids) = indexed.prefix_postings_ids.remove(word) { + key.extend_from_slice(prefix); + if let Some(ids) = indexed.prefix_postings_ids.remove(prefix) { buffer.clear(); ids.serialize_into(&mut buffer)?; out.add(&key, &buffer).unwrap(); @@ -93,10 +95,14 @@ impl MtblKvStore { } let out = out.into_inner()?; + + eprintln!("{:?}: MTBL store created!", rayon::current_thread_index()); Ok(MtblKvStore(Some(out))) } fn merge_with(self, other: MtblKvStore) -> anyhow::Result { + eprintln!("{:?}: Merging two MTBL stores...", rayon::current_thread_index()); + let (left, right) = match (self.0, other.0) { (Some(left), Some(right)) => (left, right), (Some(left), None) => return Ok(MtblKvStore(Some(left))), @@ -159,11 +165,15 @@ impl MtblKvStore { } let out = out.into_inner()?; + + eprintln!("{:?}: MTBL stores merged!", rayon::current_thread_index()); Ok(MtblKvStore(Some(out))) } } fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { + eprintln!("{:?}: Indexing into an Indexed...", rayon::current_thread_index()); + const MAX_POSITION: usize = 1000; const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; @@ -189,8 +199,8 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { postings_ids.entry(SmallVec32::from(word.as_bytes())) .or_insert_with(RoaringBitmap::new) .insert(document_id); - if let Some(prefix) = word.as_bytes().get(0..word.len().min(4)) { - for i in 0..prefix.len() { + if let Some(prefix) = word.as_bytes().get(0..word.len().min(5)) { + for i in 0..=prefix.len() { prefix_postings_ids.entry(SmallVec32::from(&prefix[..i])) .or_insert_with(RoaringBitmap::new) .insert(document_id); @@ -216,6 +226,7 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallVec32::as_ref))?; let indexed = Indexed { fst: new_words_fst, headers, postings_ids, prefix_postings_ids, documents }; + eprintln!("{:?}: Indexed created!", rayon::current_thread_index()); MtblKvStore::from_indexed(indexed) } @@ -274,19 +285,17 @@ fn main() -> anyhow::Result<()> { .open(opt.database)?; let index = Index::new(&env)?; - let res = opt.files_to_index + let mtbl_store = opt.files_to_index .into_par_iter() .try_fold(MtblKvStore::default, |acc, path| { let rdr = csv::Reader::from_path(path)?; - let mtbl_store = index_csv(rdr)?; - acc.merge_with(mtbl_store) + let store = index_csv(rdr)?; + acc.merge_with(store) }) .inspect(|_| { eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed)) }) - .try_reduce(MtblKvStore::default, MtblKvStore::merge_with); - - let mtbl_store = res?; + .try_reduce(MtblKvStore::default, MtblKvStore::merge_with)?; eprintln!("We are writing into LMDB..."); let mut wtxn = env.write_txn()?; diff --git a/src/lib.rs b/src/lib.rs index f5de08980..c250d455f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,18 +79,21 @@ impl Index { let mut union_result = RoaringBitmap::default(); if word.len() <= 4 { - if let Some(ids) = self.prefix_postings_ids.get(rtxn, &word[..word.len().min(4)])? { + if let Some(ids) = self.prefix_postings_ids.get(rtxn, &word[..word.len().min(5)])? { union_result = RoaringBitmap::deserialize_from(ids)?; } } else { + let mut count = 0; let mut stream = fst.search(dfa).into_stream(); while let Some(word) = stream.next() { + count += 1; let word = std::str::from_utf8(word)?; if let Some(ids) = self.postings_ids.get(rtxn, word)? { let right = RoaringBitmap::deserialize_from(ids)?; union_result.union_with(&right); } } + eprint!("with {:?} words ", count); } eprintln!("union for {:?} took {:.02?}", word, before.elapsed()); @@ -99,14 +102,16 @@ impl Index { let before = Instant::now(); let left_len = left.len(); left.intersect_with(&union_result); - eprintln!("intersect between {:?} and {:?} took {:.02?}", - left_len, union_result.len(), before.elapsed()); + eprintln!("intersect between {:?} and {:?} gives {:?} took {:.02?}", + left_len, union_result.len(), left.len(), before.elapsed()); Some(left) }, None => Some(union_result), }; } + eprintln!("{} candidates", intersect_result.as_ref().map_or(0, |r| r.len())); + Ok(intersect_result.unwrap_or_default().iter().take(20).collect()) } }