From 8ed8abb9df0388ddaaac38430cd649d611d1e929 Mon Sep 17 00:00:00 2001
From: Clément Renault
Date: Tue, 20 Oct 2020 15:00:58 +0200
Subject: [PATCH] Introduce an append-only indexing system

---
 src/indexing/mod.rs       | 91 ++++++++++++++++++++++++++++++++-------
 src/indexing/store.rs     |  3 +-
 src/subcommand/indexer.rs |  2 +-
 src/subcommand/serve.rs   | 77 ++++++++++++++++++++++++---------
 4 files changed, 136 insertions(+), 37 deletions(-)

diff --git a/src/indexing/mod.rs b/src/indexing/mod.rs
index fd5a8e607..0fa4a7374 100644
--- a/src/indexing/mod.rs
+++ b/src/indexing/mod.rs
@@ -8,7 +8,7 @@ use bstr::ByteSlice as _;
 use flate2::read::GzDecoder;
 use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
 use heed::types::ByteSlice;
-use log::{debug, info};
+use log::{debug, info, error};
 use rayon::prelude::*;
 use structopt::StructOpt;
 use tempfile::tempfile;
@@ -23,7 +23,7 @@ use self::merge_function::{
 mod store;
 mod merge_function;

-#[derive(Debug, StructOpt)]
+#[derive(Debug, Clone, StructOpt)]
 pub struct IndexerOpt {
     /// The amount of documents to skip before printing
     /// a log regarding the indexing advancement.
@@ -75,6 +75,12 @@ pub struct IndexerOpt {
     indexing_jobs: Option<usize>,
 }

+#[derive(Debug, Copy, Clone)]
+enum WriteMethod {
+    Append,
+    GetMergePut,
+}
+
 type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;

 fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
@@ -134,6 +140,7 @@ fn merge_into_lmdb_database(
     database: heed::PolyDatabase,
     sources: Vec<Reader<FileFuse>>,
     merge: MergeFn,
+    method: WriteMethod,
 ) -> anyhow::Result<()>
 {
     debug!("Merging {} MTBL stores...", sources.len());
     let before = Instant::now();
@@ -141,9 +148,26 @@ fn merge_into_lmdb_database(
     let merger = merge_readers(sources, merge);
     let mut in_iter = merger.into_merge_iter()?;

-    let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
-    while let Some((k, v)) = in_iter.next()? {
-        out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
+    match method {
+        WriteMethod::Append => {
+            let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
+            while let Some((k, v)) = in_iter.next()? {
+                out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
+            }
+        },
+        WriteMethod::GetMergePut => {
+            while let Some((k, v)) = in_iter.next()? {
+                match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? {
+                    Some(old_val) => {
+                        // TODO improve the function signature and avoid allocating here!
+                        let vals = vec![old_val.to_vec(), v.to_vec()];
+                        let val = merge(k, &vals).expect("merge failed");
+                        database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)?
+                    },
+                    None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?,
+                }
+            }
+        },
     }

     debug!("MTBL stores merged in {:.02?}!", before.elapsed());
@@ -154,13 +178,32 @@ fn write_into_lmdb_database(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
     mut reader: Reader<FileFuse>,
+    merge: MergeFn,
+    method: WriteMethod,
 ) -> anyhow::Result<()>
 {
     debug!("Writing MTBL stores...");
     let before = Instant::now();

-    let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
-    while let Some((k, v)) = reader.next()? {
-        out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
+    match method {
+        WriteMethod::Append => {
+            let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
+            while let Some((k, v)) = reader.next()? {
+                out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
+            }
+        },
+        WriteMethod::GetMergePut => {
+            while let Some((k, v)) = reader.next()? {
+                match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? {
+                    Some(old_val) => {
+                        // TODO improve the function signature and avoid allocating here!
+                        let vals = vec![old_val.to_vec(), v.to_vec()];
+                        let val = merge(k, &vals).expect("merge failed");
+                        database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)?
+                    },
+                    None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?,
+                }
+            }
+        }
     }

     debug!("MTBL stores merged in {:.02?}!", before.elapsed());
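
These two write paths are the heart of the change: when the target database is empty, every key coming out of the merged stream can simply be appended in sorted order, and only when documents already exist does each key need a read-merge-write round trip. Below is a minimal, self-contained sketch of that idea, with a BTreeMap standing in for the LMDB database, a plain Vec standing in for the merged grenad stream, and an illustrative `concat_merge` function that is not part of the patch:

    use std::collections::BTreeMap;

    type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;

    // Illustrative merge function: concatenate the conflicting values.
    fn concat_merge(_key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
        Ok(values.concat())
    }

    fn write(
        db: &mut BTreeMap<Vec<u8>, Vec<u8>>,
        stream: Vec<(Vec<u8>, Vec<u8>)>,
        merge: MergeFn,
        database_is_empty: bool,
    ) {
        if database_is_empty {
            // Append: the stream is sorted and nothing can conflict,
            // so every entry is written directly (the fast path).
            for (k, v) in stream {
                db.insert(k, v);
            }
        } else {
            // Get-merge-put: a key may already exist, so the old and
            // new values are merged before being written back.
            for (k, v) in stream {
                let merged = match db.get(&k) {
                    Some(old) => merge(&k, &[old.clone(), v]).expect("merge failed"),
                    None => v,
                };
                db.insert(k, merged);
            }
        }
    }

    fn main() {
        let mut db = BTreeMap::new();
        db.insert(b"hello".to_vec(), b"0".to_vec());
        let stream = vec![
            (b"hello".to_vec(), b"1".to_vec()),
            (b"world".to_vec(), b"2".to_vec()),
        ];
        write(&mut db, stream, concat_merge, false);
        assert_eq!(db[b"hello".as_slice()], b"01".to_vec());
    }
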
@@ -191,7 +234,7 @@ fn csv_bytes_readers<'a>(
 pub fn run<'a>(
     env: &heed::Env,
     index: &Index,
-    opt: IndexerOpt,
+    opt: &IndexerOpt,
     content: &'a [u8],
     gzipped: bool,
 ) -> anyhow::Result<()>
@@ -204,7 +247,7 @@ pub fn run<'a>(
 fn run_intern<'a>(
     env: &heed::Env,
     index: &Index,
-    opt: IndexerOpt,
+    opt: &IndexerOpt,
     content: &'a [u8],
     gzipped: bool,
 ) -> anyhow::Result<()>
@@ -224,6 +267,10 @@ fn run_intern<'a>(
         None
     };

+    let rtxn = env.read_txn()?;
+    let number_of_documents = index.number_of_documents(&rtxn)?;
+    drop(rtxn);
+
     let readers = csv_bytes_readers(content, gzipped, num_threads)
         .into_par_iter()
         .enumerate()
@@ -236,7 +283,8 @@
                 chunk_compression_level,
                 chunk_fusing_shrink_size,
             )?;
-            store.index_csv(rdr, i, num_threads, log_every_n)
+            let base_document_id = number_of_documents;
+            store.index_csv(rdr, base_document_id, i, num_threads, log_every_n)
         })
         .collect::<Result<Vec<_>, _>>()?;

@@ -283,18 +331,24 @@
            .into_par_iter()
            .for_each(|(dbtype, readers, merge)| {
                let result = merge_readers(readers, merge);
-               sender.send((dbtype, result)).unwrap();
+               if let Err(e) = sender.send((dbtype, result)) {
+                   error!("sender error: {}", e);
+               }
            });
    });

     let mut wtxn = env.write_txn()?;

+    let contains_documents = number_of_documents != 0;
+    let write_method = if contains_documents { WriteMethod::GetMergePut } else { WriteMethod::Append };
+
     debug!("Writing the docid word positions into LMDB on disk...");
     merge_into_lmdb_database(
         &mut wtxn,
         *index.docid_word_positions.as_polymorph(),
         docid_word_positions_readers,
         docid_word_positions_merge,
+        write_method
     )?;

     debug!("Writing the documents into LMDB on disk...");
@@ -303,6 +357,7 @@
         *index.documents.as_polymorph(),
         documents_readers,
         documents_merge,
+        write_method
     )?;

     for (db_type, result) in receiver {
@@ -310,17 +365,23 @@
         let content = result?;
         match db_type {
             DatabaseType::Main => {
                 debug!("Writing the main elements into LMDB on disk...");
-                write_into_lmdb_database(&mut wtxn, index.main, content)?;
+                write_into_lmdb_database(&mut wtxn, index.main, content, main_merge, write_method)?;
             },
             DatabaseType::WordDocids => {
                 debug!("Writing the words docids into LMDB on disk...");
                 let db = *index.word_docids.as_polymorph();
-                write_into_lmdb_database(&mut wtxn, db, content)?;
+                write_into_lmdb_database(&mut wtxn, db, content, word_docids_merge, write_method)?;
             },
             DatabaseType::WordsPairsProximitiesDocids => {
                 debug!("Writing the words pairs proximities docids into LMDB on disk...");
                 let db = *index.word_pair_proximity_docids.as_polymorph();
-                write_into_lmdb_database(&mut wtxn, db, content)?;
+                write_into_lmdb_database(
+                    &mut wtxn,
+                    db,
+                    content,
+                    words_pairs_proximities_docids_merge,
+                    write_method,
+                )?;
             },
         }
     }
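
The run_intern changes above read the current number of documents before indexing and use it for two things: choosing the write method (append only while the index is still empty) and offsetting the ids of the new batch, which the store change below receives as base_document_id. A tiny sketch of the id-offset intent, assuming ids are simply handed out in sequence as records are read (the helper below is hypothetical and not part of the patch):

    // Hypothetical helper: ids of a new batch start right after the ids
    // already present in the index.
    fn assign_ids(base_document_id: usize, batch_len: usize) -> std::ops::Range<usize> {
        base_document_id..base_document_id + batch_len
    }

    fn main() {
        // First run: the index is empty, ids start at 0.
        assert_eq!(assign_ids(0, 3), 0..3);
        // Later run: 3 documents already indexed, the new batch starts at 3.
        assert_eq!(assign_ids(3, 2), 3..5);
    }
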
diff --git a/src/indexing/store.rs b/src/indexing/store.rs
index 8f9fc15df..904286ae2 100644
--- a/src/indexing/store.rs
+++ b/src/indexing/store.rs
@@ -304,6 +304,7 @@ impl Store {
     pub fn index_csv<'a>(
         mut self,
         mut rdr: csv::Reader<Box<dyn Read + Send + 'a>>,
+        base_document_id: usize,
         thread_index: usize,
         num_threads: usize,
         log_every_n: usize,
@@ -316,7 +317,7 @@ impl Store {
         self.write_headers(&headers)?;

         let mut before = Instant::now();
-        let mut document_id: usize = 0;
+        let mut document_id: usize = base_document_id;
         let mut document = csv::StringRecord::new();
         let mut words_positions = HashMap::new();

diff --git a/src/subcommand/indexer.rs b/src/subcommand/indexer.rs
index 7310de11c..bfa9b2bc7 100644
--- a/src/subcommand/indexer.rs
+++ b/src/subcommand/indexer.rs
@@ -63,5 +63,5 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
     let file = File::open(file_path)?;
     let content = unsafe { memmap::Mmap::map(&file)? };

-    indexing::run(&env, &index, opt.indexer, &content, gzipped)
+    indexing::run(&env, &index, &opt.indexer, &content, gzipped)
 }
diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs
index e19d1d954..7e57aa49a 100644
--- a/src/subcommand/serve.rs
+++ b/src/subcommand/serve.rs
@@ -4,7 +4,6 @@ use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::Duration;
 use std::time::Instant;

 use askama_warp::Template;
@@ -19,7 +18,7 @@ use tokio::sync::broadcast;
 use warp::filters::ws::Message;
 use warp::{Filter, http::Response};

-use crate::indexing::IndexerOpt;
+use crate::indexing::{self, IndexerOpt};
 use crate::tokenizer::{simple_tokenizer, TokenType};
 use crate::{Index, UpdateStore, SearchResult};

@@ -111,6 +110,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
         .timestamp(stderrlog::Timestamp::Off)
         .init()?;

+    create_dir_all(&opt.database)?;
     let env = EnvOpenOptions::new()
         .map_size(opt.database_size)
         .max_dbs(10)
@@ -128,48 +128,73 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
     let (update_status_sender, _) = broadcast::channel(100);
     let update_status_sender_cloned = update_status_sender.clone();
+    let env_cloned = env.clone();
+    let index_cloned = index.clone();
+    let indexer_opt_cloned = opt.indexer.clone();
     let update_store = UpdateStore::open(
         update_store_options,
         update_store_path,
-        move |update_id, meta: String, _content| {
+        move |update_id, meta: String, content| {
             let processing = UpdateStatus::Processing { update_id, meta: meta.clone() };
             let _ = update_status_sender_cloned.send(processing);

-            std::thread::sleep(Duration::from_secs(3));
+            let _progress = UpdateStatus::Progressing { update_id, meta: meta.clone() };
+            // let _ = update_status_sender_cloned.send(progress);

-            let progress = UpdateStatus::Progressing { update_id, meta: meta.clone() };
-            let _ = update_status_sender_cloned.send(progress);
+            let gzipped = false;
+            let result = indexing::run(
+                &env_cloned,
+                &index_cloned,
+                &indexer_opt_cloned,
+                content,
+                gzipped,
+            );

-            std::thread::sleep(Duration::from_secs(3));
-
-            let progress = UpdateStatus::Progressing { update_id, meta: meta.clone() };
-            let _ = update_status_sender_cloned.send(progress);
-
-            std::thread::sleep(Duration::from_secs(3));
+            let meta = match result {
+                Ok(()) => format!("valid update content"),
+                Err(e) => {
+                    format!("error while processing update content: {}", e)
+                }
+            };

             let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
             let _ = update_status_sender_cloned.send(processed);
+            Ok(meta)
         })?;
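
With this change the update callback stops simulating work with sleeps and actually runs the indexer on the update content, turning the outcome into the meta string that is stored and broadcast. A standalone sketch of that contract with simplified stand-in types (UpdateStatus and handle_update below are illustrative, not the crate's real definitions):

    #[derive(Debug)]
    enum UpdateStatus {
        Processing { update_id: u64 },
        Processed { update_id: u64, meta: String },
    }

    // Process one update: report Processing, run the indexing function on the
    // content, and report Processed with a human-readable outcome.
    fn handle_update(
        update_id: u64,
        content: &[u8],
        index_content: impl Fn(&[u8]) -> Result<(), String>,
        notify: impl Fn(UpdateStatus),
    ) -> String {
        notify(UpdateStatus::Processing { update_id });

        let meta = match index_content(content) {
            Ok(()) => "valid update content".to_string(),
            Err(e) => format!("error while processing update content: {}", e),
        };

        notify(UpdateStatus::Processed { update_id, meta: meta.clone() });
        meta
    }

    fn main() {
        let meta = handle_update(1, b"id,name\n0,hello\n", |_csv| Ok(()), |status| {
            println!("{:?}", status);
        });
        assert_eq!(meta, "valid update content");
    }
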
-    // Retrieve the database the file stem (w/o the extension),
-    // the disk file size and the number of documents in the database.
+    // The database name will not change.
     let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
-    let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize;
-
-    let rtxn = env.read_txn()?;
-    let docs_count = index.number_of_documents(&rtxn)? as usize;
-    drop(rtxn);
+    let lmdb_path = opt.database.join("data.mdb");

     // We run and wait on the HTTP server

     // Expose an HTML page to debug the search in a browser
     let db_name_cloned = db_name.clone();
+    let lmdb_path_cloned = lmdb_path.clone();
+    let env_cloned = env.clone();
+    let index_cloned = index.clone();
     let dash_html_route = warp::filters::method::get()
         .and(warp::filters::path::end())
-        .map(move || IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count });
+        .map(move || {
+            // We retrieve the database size.
+            let db_size = File::open(lmdb_path_cloned.clone())
+                .unwrap()
+                .metadata()
+                .unwrap()
+                .len() as usize;
+
+            // And the number of documents in the database.
+            let rtxn = env_cloned.clone().read_txn().unwrap();
+            let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
+
+            IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count }
+        });

     let update_store_cloned = update_store.clone();
+    let lmdb_path_cloned = lmdb_path.clone();
+    let env_cloned = env.clone();
+    let index_cloned = index.clone();
     let updates_list_or_html_route = warp::filters::method::get()
         .and(warp::header("Accept"))
         .and(warp::path!("updates"))
@@ -190,6 +215,18 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {

         if header.contains("text/html") {
             updates.reverse();
+
+            // We retrieve the database size.
+            let db_size = File::open(lmdb_path_cloned.clone())
+                .unwrap()
+                .metadata()
+                .unwrap()
+                .len() as usize;
+
+            // And the number of documents in the database.
+            let rtxn = env_cloned.clone().read_txn().unwrap();
+            let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
+
             let template = UpdatesTemplate {
                 db_name: db_name.clone(),
                 db_size,