diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index 98c279d85..bed058dfc 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -1,5 +1,5 @@
 use std::collections::BTreeSet;
-use std::convert::{TryInto, TryFrom};
+use std::convert::TryFrom;
 use std::fs::File;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -15,7 +15,7 @@ use roaring::RoaringBitmap;
 use structopt::StructOpt;
 
 use mega_mini_indexer::alphanumeric_tokens;
-use mega_mini_indexer::{FastMap4, SmallVec32, BEU32, Index, DocumentId};
+use mega_mini_indexer::{FastMap4, SmallVec32, Index, DocumentId};
 
 #[cfg(target_os = "linux")]
 #[global_allocator]
@@ -136,7 +136,9 @@ impl MtblKvStore {
         }
     }
 
-    fn from_many(stores: Vec<MtblKvStore>) -> anyhow::Result<MtblKvStore> {
+    fn from_many<F>(stores: Vec<MtblKvStore>, mut f: F) -> anyhow::Result<()>
+    where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()>
+    {
         eprintln!("{:?}: Merging {} MTBL stores...", rayon::current_thread_index(), stores.len());
 
         let mmaps: Vec<_> = stores.iter().flat_map(|m| {
@@ -147,21 +149,16 @@ impl MtblKvStore {
             Reader::new(&mmap, ReaderOptions::default()).unwrap()
         }).collect();
 
-        let outfile = tempfile::tempfile()?;
-        let mut out = Writer::new(outfile, None)?;
-
         let opt = MergerOptions { merge: MtblKvStore::merge };
         let mut merger = Merger::new(sources, opt);
 
         let mut iter = merger.iter();
         while let Some((k, v)) = iter.next() {
-            out.add(k, v).unwrap();
+            (f)(k, v)?;
         }
 
-        let out = out.into_inner()?;
-
         eprintln!("{:?}: MTBL stores merged!", rayon::current_thread_index());
 
-        Ok(MtblKvStore(Some(out)))
+        Ok(())
     }
 }
@@ -226,46 +223,32 @@ fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<MtblKvStore> {
 }
 
 // TODO merge with the previous values
-fn writer(wtxn: &mut heed::RwTxn, index: Index, mtbl_store: MtblKvStore) -> anyhow::Result<usize> {
-    let mtbl_store = match mtbl_store.0 {
-        Some(store) => unsafe { memmap::Mmap::map(&store)? },
-        None => return Ok(0),
-    };
-    let mtbl_store = Reader::new(&mtbl_store, ReaderOptions::default()).unwrap();
-
-    // Write the words fst
-    let fst = mtbl_store.get(b"\0words-fst").unwrap();
-    let fst = fst::Set::new(fst)?;
-    index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &fst.as_fst().as_bytes())?;
-
-    // Write and merge the headers
-    let headers = mtbl_store.get(b"\0headers").unwrap();
-    index.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers.as_ref())?;
-
-    // Write and merge the postings lists
-    let mut iter = mtbl_store.iter_prefix(&[1]).unwrap();
-    while let Some((word, postings)) = iter.next() {
-        let word = std::str::from_utf8(&word[1..]).unwrap();
-        index.postings_ids.put(wtxn, &word, &postings)?;
+fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> {
+    if key == b"\0words-fst" {
+        // Write the words fst
+        index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", val)?;
+    }
+    else if key == b"\0headers" {
+        // Write the headers
+        index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
+    }
+    else if key.starts_with(&[1]) {
+        // Write the postings lists
+        index.postings_ids.as_polymorph()
+            .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
+    }
+    else if key.starts_with(&[2]) {
+        // Write the prefix postings lists
+        index.prefix_postings_ids.as_polymorph()
+            .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
+    }
+    else if key.starts_with(&[3]) {
+        // Write the documents
+        index.documents.as_polymorph()
+            .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
     }
 
-    // Write and merge the prefix postings lists
-    let mut iter = mtbl_store.iter_prefix(&[2]).unwrap();
-    while let Some((word, postings)) = iter.next() {
-        let word = std::str::from_utf8(&word[1..]).unwrap();
-        index.prefix_postings_ids.put(wtxn, &word, &postings)?;
-    }
-
-    // Write the documents
-    let mut count = 0;
-    let mut iter = mtbl_store.iter_prefix(&[3]).unwrap();
-    while let Some((id_bytes, content)) = iter.next() {
-        let id = id_bytes[1..].try_into().map(u32::from_be_bytes).unwrap();
-        index.documents.put(wtxn, &BEU32::new(id), &content)?;
-        count += 1;
-    }
-
-    Ok(count)
+    Ok(())
 }
 
 fn main() -> anyhow::Result<()> {
@@ -291,18 +274,24 @@ fn main() -> anyhow::Result<()> {
         })
        .collect::<Result<_, _>>()?;
 
-    while stores.len() > 1 {
+    while stores.len() > 3 {
+        let chunk_size = (stores.len() / rayon::current_num_threads()).max(2);
         let s = std::mem::take(&mut stores);
-        stores = s.into_par_iter().chunks(3)
-            .map(MtblKvStore::from_many)
+        stores = s.into_par_iter().chunks(chunk_size)
+            .map(|v| {
+                let outfile = tempfile::tempfile()?;
+                let mut out = Writer::new(outfile, None)?;
+                MtblKvStore::from_many(v, |k, v| Ok(out.add(k, v).unwrap()))?;
+                let out = out.into_inner()?;
+                Ok(MtblKvStore(Some(out))) as anyhow::Result<_>
+            })
            .collect::<Result<_, _>>()?;
     }
 
-    let mtbl_store = stores.pop().unwrap_or_default();
-
     eprintln!("We are writing into LMDB...");
     let mut wtxn = env.write_txn()?;
-    let count = writer(&mut wtxn, index, mtbl_store)?;
+    MtblKvStore::from_many(stores, |k, v| writer(&mut wtxn, &index, k, v))?;
+    let count = index.documents.len(&wtxn)?;
 
     wtxn.commit()?;
     eprintln!("Wrote {} documents into LMDB", count);