diff --git a/Cargo.lock b/Cargo.lock index 576db9e62..f5a2fde2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -323,6 +323,16 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.7.3" @@ -987,6 +997,7 @@ dependencies = [ "memmap", "once_cell", "oxidized-mtbl", + "pipe", "rayon", "roaring", "serde", @@ -1303,6 +1314,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pipe" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcd11e042e056991b5df9c0c5ae6bd0cce219b74294c40f65b89f40f7030106c" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "pkg-config" version = "0.3.17" diff --git a/Cargo.toml b/Cargo.toml index d7961a46c..00a851df8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ default-run = "indexer" [dependencies] anyhow = "1.0.28" +arc-cache = { git = "https://github.com/Kerollmops/rust-arc-cache.git", rev = "56530f2" } bitpacking = "0.8.2" byteorder = "1.3.4" cow-utils = "0.1.2" @@ -19,7 +20,7 @@ levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] } memmap = "0.7.0" once_cell = "1.4.0" oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "6b8a3a8" } -arc-cache = { git = "https://github.com/Kerollmops/rust-arc-cache.git", rev = "56530f2" } +pipe = "0.3.0" rayon = "1.3.1" roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "mem-usage" } slice-group-by = "0.2.6" diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 8cd814a3c..40163df36 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -1,7 +1,9 @@ use std::convert::TryFrom; use std::fs::File; +use std::io::{self, Read, Write}; use std::iter::FromIterator; use std::path::PathBuf; +use std::thread; use std::time::Instant; use anyhow::Context; @@ -71,6 +73,7 @@ struct Opt { verbose: usize, /// CSV file to index, if unspecified the CSV is read from standard input. + /// Note that it is much faster to index from a file. csv_file: Option, } @@ -346,7 +349,7 @@ where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()> } fn index_csv( - mut rdr: csv::Reader, + mut rdr: csv::Reader>, thread_index: usize, num_threads: usize, arc_cache_size: Option, @@ -429,11 +432,43 @@ fn main() -> anyhow::Result<()> { let max_nb_chunks = opt.max_nb_chunks; let max_memory = opt.max_memory; - // We duplicate the file # jobs times. - let file = opt.csv_file.unwrap(); - let csv_readers = (0..num_threads) - .map(|_| csv::Reader::from_path(&file)) - .collect::, _>>()?; + let csv_readers = match opt.csv_file { + Some(file_path) => { + // We open the file # jobs times. + (0..num_threads) + .map(|_| { + let file = File::open(&file_path)?; + let r = Box::new(file) as Box; + Ok(csv::Reader::from_reader(r)) as io::Result<_> + }) + .collect::, _>>()? + }, + None => { + let mut csv_readers = Vec::new(); + let mut writers = Vec::new(); + for (r, w) in (0..num_threads).map(|_| pipe::pipe()) { + let r = Box::new(r) as Box; + csv_readers.push(csv::Reader::from_reader(r)); + writers.push(w); + } + + thread::spawn(move || { + let stdin = std::io::stdin(); + let mut stdin = stdin.lock(); + let mut buffer = [0u8; 4096]; + loop { + match stdin.read(&mut buffer)? { + 0 => return Ok(()) as io::Result<()>, + size => for w in &mut writers { + w.write_all(&buffer[..size])?; + } + } + } + }); + + csv_readers + }, + }; let stores = csv_readers .into_par_iter()