Implement a memory dumper

It moves the in memory HashMaps used when indexing to a disk based MTBL file
This commit is contained in:
Kerollmops 2020-07-07 16:48:49 +02:00
parent b12bfcb03b
commit 11c7fef80a
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 81 additions and 12 deletions

2
Cargo.lock generated
View File

@ -1483,7 +1483,7 @@ dependencies = [
[[package]] [[package]]
name = "roaring" name = "roaring"
version = "0.6.0" version = "0.6.0"
source = "git+https://github.com/Kerollmops/roaring-rs.git?branch=deserialize-from-slice#24420bb9f980749476cec860ea8dd3c1683c0cd1" source = "git+https://github.com/Kerollmops/roaring-rs.git?branch=mem-usage#a71692552902019751ef5b0e57336f030045a76a"
dependencies = [ dependencies = [
"byteorder", "byteorder",
] ]

View File

@ -20,7 +20,7 @@ memmap = "0.7.0"
once_cell = "1.4.0" once_cell = "1.4.0"
oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "9451be8" } oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "9451be8" }
rayon = "1.3.1" rayon = "1.3.1"
roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "deserialize-from-slice" } roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "mem-usage" }
slice-group-by = "0.2.6" slice-group-by = "0.2.6"
smallstr = "0.2.0" smallstr = "0.2.0"
smallvec = "1.4.0" smallvec = "1.4.0"

View File

@ -2,6 +2,7 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, BTreeSet}; use std::collections::{HashMap, BTreeSet};
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::fs::File; use std::fs::File;
use std::mem;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Instant; use std::time::Instant;
@ -45,6 +46,14 @@ struct Opt {
#[structopt(short, long)] #[structopt(short, long)]
jobs: Option<usize>, jobs: Option<usize>,
/// Maximum number of bytes to allocate, will be divided by the number of
/// cores used. It is recommended to set a maximum of half of the available memory
/// as the current measurement method is really bad.
///
/// The minumum amount of memory used will be 50MB anyway.
#[structopt(long, default_value = "4294967296")]
max_memory_usage: usize,
/// CSV file to index, if unspecified the CSV is read from standard input. /// CSV file to index, if unspecified the CSV is read from standard input.
csv_file: Option<PathBuf>, csv_file: Option<PathBuf>,
} }
@ -57,6 +66,21 @@ struct Indexed {
documents: Vec<(DocumentId, Vec<u8>)>, documents: Vec<(DocumentId, Vec<u8>)>,
} }
impl Indexed {
fn new(
word_positions: FastMap4<SmallVec32<u8>, RoaringBitmap>,
word_position_docids: FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>,
headers: Vec<u8>,
documents: Vec<(DocumentId, Vec<u8>)>,
) -> anyhow::Result<Indexed>
{
// We store the words from the postings.
let new_words: BTreeSet<_> = word_position_docids.iter().map(|((w, _), _)| w).collect();
let fst = fst::Set::from_iter(new_words)?;
Ok(Indexed { fst, headers, word_positions, word_position_docids, documents })
}
}
#[derive(Default)] #[derive(Default)]
struct MtblKvStore(Option<File>); struct MtblKvStore(Option<File>);
@ -175,6 +199,7 @@ impl MtblKvStore {
where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()> where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()>
{ {
eprintln!("Merging {} MTBL stores...", stores.len()); eprintln!("Merging {} MTBL stores...", stores.len());
let before = Instant::now();
let mmaps: Vec<_> = stores.iter().flat_map(|m| { let mmaps: Vec<_> = stores.iter().flat_map(|m| {
m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() }) m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() })
@ -192,20 +217,49 @@ impl MtblKvStore {
(f)(k, v)?; (f)(k, v)?;
} }
eprintln!("MTBL stores merged!"); eprintln!("MTBL stores merged in {:.02?}!", before.elapsed());
Ok(()) Ok(())
} }
} }
fn mem_usage(
word_positions: &FastMap4<SmallVec32<u8>, RoaringBitmap>,
word_position_docids: &FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>,
documents: &Vec<(u32, Vec<u8>)>,
) -> usize
{
use std::mem::size_of;
let documents =
documents.iter().map(|(_, d)| d.capacity()).sum::<usize>()
+ documents.capacity() * size_of::<(Position, Vec<u8>)>();
let word_positions =
word_positions.iter().map(|(k, r)| {
(if k.spilled() { k.capacity() } else { 0 }) + r.mem_usage()
}).sum::<usize>()
+ word_positions.capacity() * size_of::<(SmallVec32<u8>, RoaringBitmap)>();
let word_position_docids =
word_position_docids.iter().map(|((k, _), r)| {
(if k.spilled() { k.capacity() } else { 0 }) + r.mem_usage()
}).sum::<usize>()
+ word_position_docids.capacity() * size_of::<((SmallVec32<u8>, Position), RoaringBitmap)>();
documents + word_positions + word_position_docids
}
fn index_csv( fn index_csv(
mut rdr: csv::Reader<File>, mut rdr: csv::Reader<File>,
thread_index: usize, thread_index: usize,
num_threads: usize, num_threads: usize,
max_mem_usage: usize,
) -> anyhow::Result<Vec<MtblKvStore>> ) -> anyhow::Result<Vec<MtblKvStore>>
{ {
eprintln!("{:?}: Indexing into an Indexed...", thread_index); eprintln!("{:?}: Indexing into an Indexed...", thread_index);
let mut document = csv::StringRecord::new(); let mut stores = Vec::new();
let mut word_positions = FastMap4::default(); let mut word_positions = FastMap4::default();
let mut word_position_docids = FastMap4::default(); let mut word_position_docids = FastMap4::default();
let mut documents = Vec::new(); let mut documents = Vec::new();
@ -217,6 +271,7 @@ fn index_csv(
let headers = writer.into_inner()?; let headers = writer.into_inner()?;
let mut document_id: usize = 0; let mut document_id: usize = 0;
let mut document = csv::StringRecord::new();
while rdr.read_record(&mut document)? { while rdr.read_record(&mut document)? {
document_id = document_id + 1; document_id = document_id + 1;
@ -251,16 +306,28 @@ fn index_csv(
writer.write_byte_record(document.as_byte_record())?; writer.write_byte_record(document.as_byte_record())?;
let document = writer.into_inner()?; let document = writer.into_inner()?;
documents.push((document_id, document)); documents.push((document_id, document));
if documents.len() % 100_000 == 0 {
let usage = mem_usage(&word_positions, &word_position_docids, &documents);
if usage > max_mem_usage {
eprintln!("Whoops too much memory used ({}B).", usage);
let word_positions = mem::take(&mut word_positions);
let word_position_docids = mem::take(&mut word_position_docids);
let documents = mem::take(&mut documents);
let indexed = Indexed::new(word_positions, word_position_docids, headers.clone(), documents)?;
eprintln!("{:?}: Indexed created!", thread_index);
stores.push(MtblKvStore::from_indexed(indexed)?);
}
}
} }
// We store the words from the postings. let indexed = Indexed::new(word_positions, word_position_docids, headers, documents)?;
let new_words: BTreeSet<_> = word_position_docids.iter().map(|((w, _), _)| w).collect();
let fst = fst::Set::from_iter(new_words)?;
let indexed = Indexed { fst, headers, word_positions, word_position_docids, documents };
eprintln!("{:?}: Indexed created!", thread_index); eprintln!("{:?}: Indexed created!", thread_index);
stores.push(MtblKvStore::from_indexed(indexed)?);
MtblKvStore::from_indexed(indexed).map(|x| vec![x]) Ok(stores)
} }
// TODO merge with the previous values // TODO merge with the previous values
@ -362,15 +429,17 @@ fn main() -> anyhow::Result<()> {
let index = Index::new(&env)?; let index = Index::new(&env)?;
// We duplicate the file # CPU times
let num_threads = rayon::current_num_threads(); let num_threads = rayon::current_num_threads();
let max_memory_usage = (opt.max_memory_usage / num_threads).max(50 * 1024 * 1024); // 50MB
// We duplicate the file # jobs times.
let file = opt.csv_file.unwrap(); let file = opt.csv_file.unwrap();
let csv_readers: Vec<_> = (0..num_threads).map(|_| csv::Reader::from_path(&file)).collect::<Result<_, _>>()?; let csv_readers: Vec<_> = (0..num_threads).map(|_| csv::Reader::from_path(&file)).collect::<Result<_, _>>()?;
let stores: Vec<_> = csv_readers let stores: Vec<_> = csv_readers
.into_par_iter() .into_par_iter()
.enumerate() .enumerate()
.map(|(i, rdr)| index_csv(rdr, i, num_threads)) .map(|(i, rdr)| index_csv(rdr, i, num_threads, max_memory_usage))
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
let stores: Vec<_> = stores.into_iter().flatten().collect(); let stores: Vec<_> = stores.into_iter().flatten().collect();