Accept csv from stdin

This commit is contained in:
Clément Renault 2020-08-06 12:38:42 +02:00
parent d3b1096510
commit 405a71d3a4
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 63 additions and 7 deletions

20
Cargo.lock generated
View File

@ -323,6 +323,16 @@ dependencies = [
"itertools", "itertools",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.7.3" version = "0.7.3"
@ -987,6 +997,7 @@ dependencies = [
"memmap", "memmap",
"once_cell", "once_cell",
"oxidized-mtbl", "oxidized-mtbl",
"pipe",
"rayon", "rayon",
"roaring", "roaring",
"serde", "serde",
@ -1303,6 +1314,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipe"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcd11e042e056991b5df9c0c5ae6bd0cce219b74294c40f65b89f40f7030106c"
dependencies = [
"crossbeam-channel",
]
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.17" version = "0.3.17"

View File

@ -7,6 +7,7 @@ default-run = "indexer"
[dependencies] [dependencies]
anyhow = "1.0.28" anyhow = "1.0.28"
arc-cache = { git = "https://github.com/Kerollmops/rust-arc-cache.git", rev = "56530f2" }
bitpacking = "0.8.2" bitpacking = "0.8.2"
byteorder = "1.3.4" byteorder = "1.3.4"
cow-utils = "0.1.2" cow-utils = "0.1.2"
@ -19,7 +20,7 @@ levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] }
memmap = "0.7.0" memmap = "0.7.0"
once_cell = "1.4.0" once_cell = "1.4.0"
oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "6b8a3a8" } oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "6b8a3a8" }
arc-cache = { git = "https://github.com/Kerollmops/rust-arc-cache.git", rev = "56530f2" } pipe = "0.3.0"
rayon = "1.3.1" rayon = "1.3.1"
roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "mem-usage" } roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "mem-usage" }
slice-group-by = "0.2.6" slice-group-by = "0.2.6"

View File

@ -1,7 +1,9 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fs::File; use std::fs::File;
use std::io::{self, Read, Write};
use std::iter::FromIterator; use std::iter::FromIterator;
use std::path::PathBuf; use std::path::PathBuf;
use std::thread;
use std::time::Instant; use std::time::Instant;
use anyhow::Context; use anyhow::Context;
@ -71,6 +73,7 @@ struct Opt {
verbose: usize, verbose: usize,
/// CSV file to index, if unspecified the CSV is read from standard input. /// CSV file to index, if unspecified the CSV is read from standard input.
/// Note that it is much faster to index from a file.
csv_file: Option<PathBuf>, csv_file: Option<PathBuf>,
} }
@ -346,7 +349,7 @@ where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()>
} }
fn index_csv( fn index_csv(
mut rdr: csv::Reader<File>, mut rdr: csv::Reader<Box<dyn Read + Send>>,
thread_index: usize, thread_index: usize,
num_threads: usize, num_threads: usize,
arc_cache_size: Option<usize>, arc_cache_size: Option<usize>,
@ -429,11 +432,43 @@ fn main() -> anyhow::Result<()> {
let max_nb_chunks = opt.max_nb_chunks; let max_nb_chunks = opt.max_nb_chunks;
let max_memory = opt.max_memory; let max_memory = opt.max_memory;
// We duplicate the file # jobs times. let csv_readers = match opt.csv_file {
let file = opt.csv_file.unwrap(); Some(file_path) => {
let csv_readers = (0..num_threads) // We open the file # jobs times.
.map(|_| csv::Reader::from_path(&file)) (0..num_threads)
.collect::<Result<Vec<_>, _>>()?; .map(|_| {
let file = File::open(&file_path)?;
let r = Box::new(file) as Box<dyn Read + Send>;
Ok(csv::Reader::from_reader(r)) as io::Result<_>
})
.collect::<Result<Vec<_>, _>>()?
},
None => {
let mut csv_readers = Vec::new();
let mut writers = Vec::new();
for (r, w) in (0..num_threads).map(|_| pipe::pipe()) {
let r = Box::new(r) as Box<dyn Read + Send>;
csv_readers.push(csv::Reader::from_reader(r));
writers.push(w);
}
thread::spawn(move || {
let stdin = std::io::stdin();
let mut stdin = stdin.lock();
let mut buffer = [0u8; 4096];
loop {
match stdin.read(&mut buffer)? {
0 => return Ok(()) as io::Result<()>,
size => for w in &mut writers {
w.write_all(&buffer[..size])?;
}
}
}
});
csv_readers
},
};
let stores = csv_readers let stores = csv_readers
.into_par_iter() .into_par_iter()