Directly write to LMDB without intermediate final MTBL

This commit is contained in:
Kerollmops 2020-06-01 21:09:32 +02:00
parent 2174042994
commit 1df1f88fe1
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -1,5 +1,5 @@
use std::collections::BTreeSet;
use std::convert::{TryInto, TryFrom};
use std::convert::TryFrom;
use std::fs::File;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
@ -15,7 +15,7 @@ use roaring::RoaringBitmap;
use structopt::StructOpt;
use mega_mini_indexer::alphanumeric_tokens;
use mega_mini_indexer::{FastMap4, SmallVec32, BEU32, Index, DocumentId};
use mega_mini_indexer::{FastMap4, SmallVec32, Index, DocumentId};
#[cfg(target_os = "linux")]
#[global_allocator]
@ -136,7 +136,9 @@ impl MtblKvStore {
}
}
fn from_many(stores: Vec<MtblKvStore>) -> anyhow::Result<MtblKvStore> {
fn from_many<F>(stores: Vec<MtblKvStore>, mut f: F) -> anyhow::Result<()>
where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()>
{
eprintln!("{:?}: Merging {} MTBL stores...", rayon::current_thread_index(), stores.len());
let mmaps: Vec<_> = stores.iter().flat_map(|m| {
@ -147,21 +149,16 @@ impl MtblKvStore {
Reader::new(&mmap, ReaderOptions::default()).unwrap()
}).collect();
let outfile = tempfile::tempfile()?;
let mut out = Writer::new(outfile, None)?;
let opt = MergerOptions { merge: MtblKvStore::merge };
let mut merger = Merger::new(sources, opt);
let mut iter = merger.iter();
while let Some((k, v)) = iter.next() {
out.add(k, v).unwrap();
(f)(k, v)?;
}
let out = out.into_inner()?;
eprintln!("{:?}: MTBL stores merged!", rayon::current_thread_index());
Ok(MtblKvStore(Some(out)))
Ok(())
}
}
@ -226,46 +223,32 @@ fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<MtblKvStore> {
}
// TODO merge with the previous values
fn writer(wtxn: &mut heed::RwTxn, index: Index, mtbl_store: MtblKvStore) -> anyhow::Result<usize> {
let mtbl_store = match mtbl_store.0 {
Some(store) => unsafe { memmap::Mmap::map(&store)? },
None => return Ok(0),
};
let mtbl_store = Reader::new(&mtbl_store, ReaderOptions::default()).unwrap();
fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> {
if key == b"\0words-fst" {
// Write the words fst
let fst = mtbl_store.get(b"\0words-fst").unwrap();
let fst = fst::Set::new(fst)?;
index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &fst.as_fst().as_bytes())?;
// Write and merge the headers
let headers = mtbl_store.get(b"\0headers").unwrap();
index.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers.as_ref())?;
// Write and merge the postings lists
let mut iter = mtbl_store.iter_prefix(&[1]).unwrap();
while let Some((word, postings)) = iter.next() {
let word = std::str::from_utf8(&word[1..]).unwrap();
index.postings_ids.put(wtxn, &word, &postings)?;
index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", val)?;
}
// Write and merge the prefix postings lists
let mut iter = mtbl_store.iter_prefix(&[2]).unwrap();
while let Some((word, postings)) = iter.next() {
let word = std::str::from_utf8(&word[1..]).unwrap();
index.prefix_postings_ids.put(wtxn, &word, &postings)?;
else if key == b"\0headers" {
// Write the headers
index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
}
else if key.starts_with(&[1]) {
// Write the postings lists
index.postings_ids.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[2]) {
// Write the prefix postings lists
index.prefix_postings_ids.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[3]) {
// Write the documents
let mut count = 0;
let mut iter = mtbl_store.iter_prefix(&[3]).unwrap();
while let Some((id_bytes, content)) = iter.next() {
let id = id_bytes[1..].try_into().map(u32::from_be_bytes).unwrap();
index.documents.put(wtxn, &BEU32::new(id), &content)?;
count += 1;
index.documents.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
Ok(count)
Ok(())
}
fn main() -> anyhow::Result<()> {
@ -291,18 +274,24 @@ fn main() -> anyhow::Result<()> {
})
.collect::<Result<_, _>>()?;
while stores.len() > 1 {
while stores.len() > 3 {
let chunk_size = (stores.len() / rayon::current_num_threads()).max(2);
let s = std::mem::take(&mut stores);
stores = s.into_par_iter().chunks(3)
.map(MtblKvStore::from_many)
stores = s.into_par_iter().chunks(chunk_size)
.map(|v| {
let outfile = tempfile::tempfile()?;
let mut out = Writer::new(outfile, None)?;
MtblKvStore::from_many(v, |k, v| Ok(out.add(k, v).unwrap()))?;
let out = out.into_inner()?;
Ok(MtblKvStore(Some(out))) as anyhow::Result<_>
})
.collect::<Result<_, _>>()?;
}
let mtbl_store = stores.pop().unwrap_or_default();
eprintln!("We are writing into LMDB...");
let mut wtxn = env.write_txn()?;
let count = writer(&mut wtxn, index, mtbl_store)?;
MtblKvStore::from_many(stores, |k, v| writer(&mut wtxn, &index, k, v))?;
let count = index.documents.len(&wtxn)?;
wtxn.commit()?;
eprintln!("Wrote {} documents into LMDB", count);