diff --git a/Cargo.lock b/Cargo.lock index 4ad582fa5..b6f0e8c27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1483,7 +1483,7 @@ dependencies = [ [[package]] name = "roaring" version = "0.6.0" -source = "git+https://github.com/Kerollmops/roaring-rs.git?branch=deserialize-from-slice#24420bb9f980749476cec860ea8dd3c1683c0cd1" +source = "git+https://github.com/Kerollmops/roaring-rs.git?branch=mem-usage#a71692552902019751ef5b0e57336f030045a76a" dependencies = [ "byteorder", ] diff --git a/Cargo.toml b/Cargo.toml index db126a2d1..dea1d6086 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ memmap = "0.7.0" once_cell = "1.4.0" oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "9451be8" } rayon = "1.3.1" -roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "deserialize-from-slice" } +roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "mem-usage" } slice-group-by = "0.2.6" smallstr = "0.2.0" smallvec = "1.4.0" diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 2e5eb561d..dffa75255 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -2,6 +2,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, BTreeSet}; use std::convert::{TryFrom, TryInto}; use std::fs::File; +use std::mem; use std::path::PathBuf; use std::time::Instant; @@ -45,6 +46,14 @@ struct Opt { #[structopt(short, long)] jobs: Option<usize>, + /// Maximum number of bytes to allocate, will be divided by the number of + /// cores used. It is recommended to set a maximum of half of the available memory + /// as the current measurement method is really bad. + /// + /// The minimum amount of memory used will be 50MB anyway. + #[structopt(long, default_value = "4294967296")] + max_memory_usage: usize, + /// CSV file to index, if unspecified the CSV is read from standard input.
csv_file: Option<PathBuf>, } @@ -57,6 +66,21 @@ struct Indexed { documents: Vec<(DocumentId, Vec<u8>)>, } +impl Indexed { + fn new( + word_positions: FastMap4<SmallVec32<u8>, RoaringBitmap>, + word_position_docids: FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>, + headers: Vec<u8>, + documents: Vec<(DocumentId, Vec<u8>)>, + ) -> anyhow::Result<Self> + { + // We store the words from the postings. + let new_words: BTreeSet<_> = word_position_docids.iter().map(|((w, _), _)| w).collect(); + let fst = fst::Set::from_iter(new_words)?; + Ok(Indexed { fst, headers, word_positions, word_position_docids, documents }) + } +} + #[derive(Default)] struct MtblKvStore(Option<File>); @@ -175,6 +199,7 @@ impl MtblKvStore { where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()> { eprintln!("Merging {} MTBL stores...", stores.len()); + let before = Instant::now(); let mmaps: Vec<_> = stores.iter().flat_map(|m| { m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() }) @@ -192,20 +217,49 @@ impl MtblKvStore { (f)(k, v)?; } - eprintln!("MTBL stores merged!"); + eprintln!("MTBL stores merged in {:.02?}!", before.elapsed()); Ok(()) } } +fn mem_usage( + word_positions: &FastMap4<SmallVec32<u8>, RoaringBitmap>, + word_position_docids: &FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>, + documents: &Vec<(u32, Vec<u8>)>, +) -> usize +{ + use std::mem::size_of; + + let documents = + documents.iter().map(|(_, d)| d.capacity()).sum::<usize>() + + documents.capacity() * size_of::<(Position, Vec<u8>)>(); + + let word_positions = + word_positions.iter().map(|(k, r)| { + (if k.spilled() { k.capacity() } else { 0 }) + r.mem_usage() + }).sum::<usize>() + + word_positions.capacity() * size_of::<(SmallVec32<u8>, RoaringBitmap)>(); + + let word_position_docids = + word_position_docids.iter().map(|((k, _), r)| { + (if k.spilled() { k.capacity() } else { 0 }) + r.mem_usage() + }).sum::<usize>() + + word_position_docids.capacity() * size_of::<((SmallVec32<u8>, Position), RoaringBitmap)>(); + + documents + word_positions + word_position_docids +} fn index_csv( mut rdr: csv::Reader<File>, thread_index:
usize, num_threads: usize, + max_mem_usage: usize, ) -> anyhow::Result<Vec<MtblKvStore>> { eprintln!("{:?}: Indexing into an Indexed...", thread_index); - let mut document = csv::StringRecord::new(); + let mut stores = Vec::new(); + let mut word_positions = FastMap4::default(); let mut word_position_docids = FastMap4::default(); let mut documents = Vec::new(); @@ -217,6 +271,7 @@ fn index_csv( let headers = writer.into_inner()?; let mut document_id: usize = 0; + let mut document = csv::StringRecord::new(); while rdr.read_record(&mut document)? { document_id = document_id + 1; @@ -251,16 +306,28 @@ fn index_csv( writer.write_byte_record(document.as_byte_record())?; let document = writer.into_inner()?; documents.push((document_id, document)); + + if documents.len() % 100_000 == 0 { + let usage = mem_usage(&word_positions, &word_position_docids, &documents); + if usage > max_mem_usage { + eprintln!("Whoops too much memory used ({}B).", usage); + + let word_positions = mem::take(&mut word_positions); + let word_position_docids = mem::take(&mut word_position_docids); + let documents = mem::take(&mut documents); + + let indexed = Indexed::new(word_positions, word_position_docids, headers.clone(), documents)?; + eprintln!("{:?}: Indexed created!", thread_index); + stores.push(MtblKvStore::from_indexed(indexed)?); + } + } } - // We store the words from the postings.
- let new_words: BTreeSet<_> = word_position_docids.iter().map(|((w, _), _)| w).collect(); - let fst = fst::Set::from_iter(new_words)?; - - let indexed = Indexed { fst, headers, word_positions, word_position_docids, documents }; + let indexed = Indexed::new(word_positions, word_position_docids, headers, documents)?; eprintln!("{:?}: Indexed created!", thread_index); + stores.push(MtblKvStore::from_indexed(indexed)?); - MtblKvStore::from_indexed(indexed).map(|x| vec![x]) + Ok(stores) } // TODO merge with the previous values @@ -362,15 +429,17 @@ fn main() -> anyhow::Result<()> { let index = Index::new(&env)?; - // We duplicate the file # CPU times let num_threads = rayon::current_num_threads(); + let max_memory_usage = (opt.max_memory_usage / num_threads).max(50 * 1024 * 1024); // 50MB + + // We duplicate the file # jobs times. let file = opt.csv_file.unwrap(); let csv_readers: Vec<_> = (0..num_threads).map(|_| csv::Reader::from_path(&file)).collect::<Result<_, _>>()?; let stores: Vec<_> = csv_readers .into_par_iter() .enumerate() - .map(|(i, rdr)| index_csv(rdr, i, num_threads)) + .map(|(i, rdr)| index_csv(rdr, i, num_threads, max_memory_usage)) .collect::<Result<_, _>>()?; let stores: Vec<_> = stores.into_iter().flatten().collect();