Simplify the indexer main loop

This commit is contained in:
Clément Renault 2020-09-21 14:59:48 +02:00
parent 3ded98e5fa
commit 944df52e2a
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -4,7 +4,7 @@ use std::fs::File;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::iter::FromIterator; use std::iter::FromIterator;
use std::path::PathBuf; use std::path::PathBuf;
use std::thread; use std::{iter, thread};
use std::time::Instant; use std::time::Instant;
use anyhow::Context; use anyhow::Context;
@ -129,7 +129,7 @@ struct Store {
} }
impl Store { impl Store {
fn new( pub fn new(
arc_cache_size: usize, arc_cache_size: usize,
max_nb_chunks: Option<usize>, max_nb_chunks: Option<usize>,
max_memory: Option<usize>, max_memory: Option<usize>,
@ -164,7 +164,7 @@ impl Store {
} }
// Save the documents ids under the position and word we have seen it. // Save the documents ids under the position and word we have seen it.
pub fn insert_word_docid(&mut self, word: &str, id: DocumentId) -> anyhow::Result<()> { fn insert_word_docid(&mut self, word: &str, id: DocumentId) -> anyhow::Result<()> {
let word_vec = SmallVec32::from(word.as_bytes()); let word_vec = SmallVec32::from(word.as_bytes());
let ids = RoaringBitmap::from_iter(Some(id)); let ids = RoaringBitmap::from_iter(Some(id));
let (_, lrus) = self.word_docids.insert(word_vec, ids, |old, new| old.union_with(&new)); let (_, lrus) = self.word_docids.insert(word_vec, ids, |old, new| old.union_with(&new));
@ -172,13 +172,13 @@ impl Store {
Ok(()) Ok(())
} }
pub fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> { fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> {
let headers = CsvStringRecordCodec::bytes_encode(headers) let headers = CsvStringRecordCodec::bytes_encode(headers)
.with_context(|| format!("could not encode csv record"))?; .with_context(|| format!("could not encode csv record"))?;
Ok(self.sorter.insert(HEADERS_KEY, headers)?) Ok(self.sorter.insert(HEADERS_KEY, headers)?)
} }
pub fn write_document( fn write_document(
&mut self, &mut self,
id: DocumentId, id: DocumentId,
iter: impl IntoIterator<Item=(String, RoaringBitmap)>, iter: impl IntoIterator<Item=(String, RoaringBitmap)>,
@ -248,7 +248,57 @@ impl Store {
Ok(()) Ok(())
} }
pub fn finish(mut self) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)> { pub fn index_csv(
mut self,
mut rdr: csv::Reader<Box<dyn Read + Send>>,
thread_index: usize,
num_threads: usize,
) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)>
{
debug!("{:?}: Indexing in a Store...", thread_index);
// Write the headers into the store.
let headers = rdr.headers()?;
self.write_headers(&headers)?;
let mut before = Instant::now();
let mut document_id: usize = 0;
let mut document = csv::StringRecord::new();
let mut word_positions = HashMap::new();
while rdr.read_record(&mut document)? {
// We skip documents that must not be indexed by this thread.
if document_id % num_threads == thread_index {
if document_id % ONE_MILLION == 0 {
let count = document_id / ONE_MILLION;
info!("We have seen {}m documents so far ({:.02?}).", count, before.elapsed());
before = Instant::now();
}
let document_id = DocumentId::try_from(document_id).context("generated id is too big")?;
for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
for (pos, (_, token)) in simple_tokenizer(&content).filter(only_words).enumerate().take(MAX_POSITION) {
let word = token.to_lowercase();
let position = (attr * MAX_POSITION + pos) as u32;
self.insert_word_docid(&word, document_id)?;
word_positions.entry(word).or_insert_with(RoaringBitmap::new).insert(position);
}
}
// We write the document in the documents store.
self.write_document(document_id, word_positions.drain(), &document)?;
}
// Compute the document id of the next document.
document_id = document_id + 1;
}
let (reader, docs_reader) = self.finish()?;
debug!("{:?}: Store created!", thread_index);
Ok((reader, docs_reader))
}
fn finish(mut self) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)> {
Self::write_word_docids(&mut self.sorter, self.word_docids)?; Self::write_word_docids(&mut self.sorter, self.word_docids)?;
Self::write_documents_ids(&mut self.sorter, self.documents_ids)?; Self::write_documents_ids(&mut self.sorter, self.documents_ids)?;
@ -375,66 +425,57 @@ where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()>
Ok(()) Ok(())
} }
fn index_csv( /// Returns the list of CSV sources that the indexer must read.
mut rdr: csv::Reader<Box<dyn Read + Send>>, ///
thread_index: usize, /// There is `num_threads` sources. If the file is not specified, the standard input is used.
fn csv_readers(
csv_file_path: Option<PathBuf>,
num_threads: usize, num_threads: usize,
arc_cache_size: usize, ) -> anyhow::Result<Vec<csv::Reader<Box<dyn Read + Send>>>>
max_nb_chunks: Option<usize>,
max_memory: Option<usize>,
chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>,
) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)>
{ {
debug!("{:?}: Indexing into a Store...", thread_index); match csv_file_path {
Some(file_path) => {
let mut store = Store::new( // We open the file # jobs times.
arc_cache_size, iter::repeat_with(|| {
max_nb_chunks, let file = File::open(&file_path)
max_memory, .with_context(|| format!("Failed to read CSV file {}", file_path.display()))?;
chunk_compression_type, // if the file extension is "gz" or "gzip" we can decode and read it.
chunk_compression_level, let r = if file_path.extension().map_or(false, |e| e == "gz" || e == "gzip") {
); Box::new(GzDecoder::new(file)) as Box<dyn Read + Send>
} else {
// Write the headers into a Vec of bytes and then into the store. Box::new(file) as Box<dyn Read + Send>
let headers = rdr.headers()?; };
store.write_headers(&headers)?; Ok(csv::Reader::from_reader(r)) as anyhow::Result<_>
})
let mut before = Instant::now(); .take(num_threads)
let mut document_id: usize = 0; .collect()
let mut document = csv::StringRecord::new(); },
let mut word_positions = HashMap::new(); None => {
while rdr.read_record(&mut document)? { let mut csv_readers = Vec::new();
let mut writers = Vec::new();
// We skip documents that must not be indexed by this thread. for (r, w) in iter::repeat_with(ringtail::io::pipe).take(num_threads) {
if document_id % num_threads == thread_index { let r = Box::new(r) as Box<dyn Read + Send>;
if document_id % ONE_MILLION == 0 { csv_readers.push(csv::Reader::from_reader(r));
info!("We have seen {}m documents so far ({:.02?}).", writers.push(w);
document_id / ONE_MILLION, before.elapsed());
before = Instant::now();
} }
let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; thread::spawn(move || {
for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { let stdin = std::io::stdin();
for (pos, (_, token)) in simple_tokenizer(&content).filter(only_words).enumerate().take(MAX_POSITION) { let mut stdin = stdin.lock();
let word = token.to_lowercase(); let mut buffer = [0u8; 4096];
let position = (attr * MAX_POSITION + pos) as u32; loop {
store.insert_word_docid(&word, document_id)?; match stdin.read(&mut buffer)? {
word_positions.entry(word).or_insert_with(RoaringBitmap::new).insert(position); 0 => return Ok(()) as io::Result<()>,
size => for w in &mut writers {
w.write_all(&buffer[..size])?;
}
}
} }
} });
// We write the document in the database. Ok(csv_readers)
store.write_document(document_id, word_positions.drain(), &document)?; },
}
// Compute the document id of the the next document.
document_id = document_id + 1;
} }
let (reader, docs_reader) = store.finish()?;
debug!("{:?}: Store created!", thread_index);
Ok((reader, docs_reader))
} }
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
@ -466,69 +507,22 @@ fn main() -> anyhow::Result<()> {
let chunk_compression_type = compression_type_from_str(&opt.indexer.chunk_compression_type); let chunk_compression_type = compression_type_from_str(&opt.indexer.chunk_compression_type);
let chunk_compression_level = opt.indexer.chunk_compression_level; let chunk_compression_level = opt.indexer.chunk_compression_level;
let csv_readers = match opt.csv_file { let readers = csv_readers(opt.csv_file, num_threads)?
Some(file_path) => {
// We open the file # jobs times.
(0..num_threads)
.map(|_| {
let file = File::open(&file_path)?;
// if the file extension is "gz" or "gzip" we can decode and read it.
let r = if file_path.extension().map_or(false, |ext| ext == "gz" || ext == "gzip") {
Box::new(GzDecoder::new(file)) as Box<dyn Read + Send>
} else {
Box::new(file) as Box<dyn Read + Send>
};
Ok(csv::Reader::from_reader(r)) as io::Result<_>
})
.collect::<Result<Vec<_>, _>>()?
},
None => {
let mut csv_readers = Vec::new();
let mut writers = Vec::new();
for (r, w) in (0..num_threads).map(|_| ringtail::io::pipe()) {
let r = Box::new(r) as Box<dyn Read + Send>;
csv_readers.push(csv::Reader::from_reader(r));
writers.push(w);
}
thread::spawn(move || {
let stdin = std::io::stdin();
let mut stdin = stdin.lock();
let mut buffer = [0u8; 4096];
loop {
match stdin.read(&mut buffer)? {
0 => return Ok(()) as io::Result<()>,
size => for w in &mut writers {
w.write_all(&buffer[..size])?;
}
}
}
});
csv_readers
},
};
let readers = csv_readers
.into_par_iter() .into_par_iter()
.enumerate() .enumerate()
.map(|(i, rdr)| { .map(|(i, rdr)| {
index_csv( Store::new(
rdr,
i,
num_threads,
arc_cache_size, arc_cache_size,
max_nb_chunks, max_nb_chunks,
max_memory, max_memory,
chunk_compression_type, chunk_compression_type,
chunk_compression_level, chunk_compression_level,
) ).index_csv(rdr, i, num_threads)
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
let mut stores = Vec::with_capacity(readers.len()); let mut stores = Vec::with_capacity(readers.len());
let mut docs_stores = Vec::with_capacity(readers.len()); let mut docs_stores = Vec::with_capacity(readers.len());
readers.into_iter().for_each(|(s, d)| { readers.into_iter().for_each(|(s, d)| {
stores.push(s); stores.push(s);
docs_stores.push(d); docs_stores.push(d);