diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index af45d27d5..107ee55eb 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -2,22 +2,25 @@
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, BTreeSet};
 use std::convert::{TryFrom, TryInto};
 use std::hash::{Hash, BuildHasher};
-use std::io;
-use std::path::PathBuf;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::{cmp, io};
+use std::iter::FromIterator;
+use std::path::{Path, PathBuf};
+use std::time::Instant;
 
-use anyhow::Context;
-use cow_utils::CowUtils;
-use fst::Streamer;
-use heed::EnvOpenOptions;
+use anyhow::{ensure, Context};
+use fst::{Streamer, set::OpBuilder};
 use heed::types::*;
+use heed::{Env, EnvOpenOptions};
+use rayon::prelude::*;
 use roaring::RoaringBitmap;
 use slice_group_by::StrGroupBy;
 use structopt::StructOpt;
+use tempfile::TempDir;
 
 use mega_mini_indexer::cache::ArcCache;
 use mega_mini_indexer::{BEU32, Index, DocumentId, FastMap4};
 
+const ONE_MILLION: u32 = 1_000_000;
 const MAX_POSITION: usize = 1000;
 const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
 
@@ -25,8 +28,6 @@ const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
 
-static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0); // AtomicU32 ?
-
 pub fn simple_alphanumeric_tokens(string: &str) -> impl Iterator<Item = &str> {
     let is_alphanumeric = |s: &&str| s.chars().next().map_or(false, char::is_alphanumeric);
     string.linear_group_by_key(|c| c.is_alphanumeric()).filter(is_alphanumeric)
@@ -95,12 +96,19 @@ fn index_csv(
     writer.write_byte_record(headers.as_byte_record())?;
     let headers = writer.into_inner()?;
 
+    let mut document_id = 0usize;
+    let mut before = Instant::now();
     let mut document = csv::StringRecord::new();
     while rdr.read_record(&mut document)? {
-        let document_id = ID_GENERATOR.fetch_add(1, Ordering::SeqCst);
+        document_id = document_id + 1;
         let document_id = DocumentId::try_from(document_id).context("Generated id is too big")?;
 
+        if thread_index == 0 && document_id % ONE_MILLION == 0 {
+            eprintln!("Document {}m just processed ({:.02?} elapsed).", document_id / ONE_MILLION, before.elapsed());
+            before = Instant::now();
+        }
+
         for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
             for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) {
                 if !word.is_empty() && word.len() < 500 { // LMDB limits
@@ -142,11 +150,13 @@ fn index_csv(
             }
         }
 
-        // We write the document in the database.
-        let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
-        writer.write_byte_record(document.as_byte_record())?;
-        let document = writer.into_inner()?;
-        index.documents.put(wtxn, &BEU32::new(document_id), &document)?;
+        if thread_index == 0 {
+            // We write the document in the database.
+            let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
+            writer.write_byte_record(document.as_byte_record())?;
+            let document = writer.into_inner()?;
+            index.documents.put(wtxn, &BEU32::new(document_id), &document)?;
+        }
     }
 
     put_evicted_into_heed(wtxn, index, words_cache)?;
@@ -207,37 +217,134 @@ fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> any
     Ok(())
 }
 
-fn main() -> anyhow::Result<()> {
-    let opt = Opt::from_args();
+fn merge_databases(
+    others: Vec<(usize, Option<TempDir>, Env, Index)>,
+    wtxn: &mut heed::RwTxn,
+    index: &Index,
+) -> anyhow::Result<()>
+{
+    eprintln!("Merging the temporary databases...");
 
-    std::fs::create_dir_all(&opt.database)?;
+    let mut fsts = Vec::new();
+    for (_i, _dir, env, oindex) in others {
+        let rtxn = env.read_txn()?;
+
+        // merge and check the headers are equal
+        let headers = oindex.headers(&rtxn)?.context("A database is missing the headers")?;
+        match index.headers(wtxn)? {
+            Some(h) => ensure!(h == headers, "headers are not equal"),
+            None => index.put_headers(wtxn, &headers)?,
+        };
+
+        // retrieve the FSTs to merge them together in one run.
+        let fst = oindex.fst(&rtxn)?.context("A database is missing its FST")?;
+        let fst = fst.map_data(|s| s.to_vec())?;
+        fsts.push(fst);
+
+        // merge the words positions
+        for result in oindex.word_positions.iter(&rtxn)? {
+            let (word, pos) = result?;
+            index.word_positions.put(wtxn, word, &pos)?;
+        }
+
+        // merge the documents ids by word and position
+        for result in oindex.word_position_docids.iter(&rtxn)? {
+            let (key, docids) = result?;
+            index.word_position_docids.put(wtxn, key, &docids)?;
+        }
+
+        // merge the documents ids by word and attribute
+        for result in oindex.word_attribute_docids.iter(&rtxn)? {
+            let (key, docids) = result?;
+            index.word_attribute_docids.put(wtxn, key, &docids)?;
+        }
+
+        for result in oindex.documents.iter(&rtxn)? {
+            let (id, content) = result?;
+            index.documents.put(wtxn, &id, &content)?;
+        }
+    }
+
+    // Merge all the FSTs to create a final one and write it in the final database.
+    if let Some(fst) = index.fst(wtxn)? {
+        let fst = fst.map_data(|s| s.to_vec())?;
+        fsts.push(fst);
+    }
+
+    let builder = OpBuilder::from_iter(&fsts);
+    let op = builder.r#union();
+    let mut builder = fst::set::SetBuilder::memory();
+    builder.extend_stream(op)?;
+    let fst = builder.into_set();
+
+    index.put_fst(wtxn, &fst)?;
+
+    Ok(())
+}
+
+fn open_env_index(path: impl AsRef<Path>) -> anyhow::Result<(Env, Index)> {
     let env = EnvOpenOptions::new()
         .map_size(100 * 1024 * 1024 * 1024) // 100 GB
         .max_readers(10)
         .max_dbs(10)
-        .open(opt.database)?;
+        .open(path)?;
 
     let index = Index::new(&env)?;
-    let mut wtxn = env.write_txn()?;
+    Ok((env, index))
+}
 
-    match opt.csv_file {
+fn main() -> anyhow::Result<()> {
+    let opt = Opt::from_args();
+    std::fs::create_dir_all(&opt.database)?;
+
+    match &opt.csv_file {
         Some(path) => {
-            let rdr = csv::Reader::from_path(path)?;
-            index_csv(&mut wtxn, rdr, &index, 1, 0)?;
+            let num_threads = rayon::current_num_threads();
+
+            let result: Result<Vec<_>, anyhow::Error> =
+                (0..num_threads).into_par_iter().map(|i| {
+                    let (dir, env, index) = if i == 0 {
+                        let (env, index) = open_env_index(&opt.database)?;
+                        (None, env, index)
+                    } else {
+                        let dir = tempfile::tempdir()?;
+                        let (env, index) = open_env_index(&dir)?;
+                        (Some(dir), env, index)
+                    };
+
+                    let mut wtxn = env.write_txn()?;
+                    let rdr = csv::Reader::from_path(path)?;
+                    index_csv(&mut wtxn, rdr, &index, num_threads, i)?;
+
+                    wtxn.commit()?;
+
+                    Ok((i, dir, env, index))
+                })
+                .collect();
+
+            let mut parts = result?;
+            parts.sort_unstable_by_key(|&(i, ..)| cmp::Reverse(i));
+
+            let (_, _, env, index) = parts.pop().context("missing base database")?;
+
+            // TODO we can merge databases that are ready to be merged
+            // into the final one, without having to wait for all of them.
+            // TODO we can reuse an already existing database instead of creating a new one
+            // it would be even better to use the first one as it contains the documents.
+            let mut wtxn = env.write_txn()?;
+            merge_databases(parts, &mut wtxn, &index)?;
+
+            compute_words_attributes_docids(&mut wtxn, &index)?;
+            let count = index.documents.len(&wtxn)?;
+
+            wtxn.commit()?;
+
+            eprintln!("Wrote {} documents into LMDB", count);
         },
-        None => {
-            let rdr = csv::Reader::from_reader(io::stdin());
-            index_csv(&mut wtxn, rdr, &index, 1, 0)?;
-        }
+        None => todo!("support for stdin CSV while indexing in parallel"),
     };
 
-    compute_words_attributes_docids(&mut wtxn, &index)?;
-    let count = index.documents.len(&wtxn)?;
-
-    wtxn.commit()?;
-
-    eprintln!("Wrote {} documents into LMDB", count);
     Ok(())
 }
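
Side note, not part of the commit: the merge_databases function added above relies on fst's streaming set union (OpBuilder, union, then SetBuilder::extend_stream) to combine the per-thread word FSTs into one. Below is a minimal, self-contained sketch of that pattern under the fst 0.4-style API; the two sets and their words are illustrative and do not come from the repository.

    use fst::{Set, set::OpBuilder};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Each indexing thread ends up with its own lexicographically
        // sorted word set (inputs to Set::from_iter must be sorted).
        let a = Set::from_iter(vec!["bird", "cat"])?;
        let b = Set::from_iter(vec!["cat", "dog"])?;

        // Build one streaming union over all the sets; every distinct
        // key is yielded exactly once, in sorted order.
        let op = OpBuilder::new().add(&a).add(&b).union();

        // Pipe the merged stream into a fresh in-memory set, as
        // merge_databases does before writing it to the final database.
        let mut builder = fst::set::SetBuilder::memory();
        builder.extend_stream(op)?;
        let merged = builder.into_set();

        assert_eq!(merged.stream().into_strs()?, vec!["bird", "cat", "dog"]);
        Ok(())
    }

Because both the union and the builder operate on streams, the merged FST is produced in one sorted pass without materializing the combined key set in memory first.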