From 0a44ff86abaf3469e39de0b1cf60d93cec0c6a74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Fri, 28 Aug 2020 15:38:05 +0200 Subject: [PATCH] Put the documents MTBL back into LMDB We make sure to write the documents into a file before memory mapping it and putting it into LMDB, this way we avoid moving it to RAM --- public/script.js | 5 +-- src/bin/indexer.rs | 52 +++++++++++------------ src/bin/infos.rs | 7 ++- src/bin/search.rs | 4 +- src/bin/serve.rs | 20 +++------ src/heed_codec/mod.rs | 2 + src/heed_codec/mtbl_codec.rs | 20 +++++++++ src/lib.rs | 82 ++++++++++++++++-------------------- src/transitive_arc.rs | 16 ------- templates/index.html | 2 +- 10 files changed, 100 insertions(+), 110 deletions(-) create mode 100644 src/heed_codec/mtbl_codec.rs delete mode 100644 src/transitive_arc.rs diff --git a/public/script.js b/public/script.js index 6882bbb62..da0831ef3 100644 --- a/public/script.js +++ b/public/script.js @@ -63,8 +63,5 @@ $('#docs-count').text(function(index, text) { // Make the database a little bit easier to read $('#db-size').text(function(index, text) { - let arr = text.split("+"); - let database_size = filesize(parseInt(arr[0])); - let documents_size = filesize(parseInt(arr[1])); - return `${database_size} + ${documents_size}` + return filesize(parseInt(text)) }); diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 4bb582fcd..9306cc845 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -1,5 +1,5 @@ use std::convert::{TryFrom, TryInto}; -use std::fs::{File, OpenOptions}; +use std::fs::File; use std::io::{self, Read, Write}; use std::iter::FromIterator; use std::path::PathBuf; @@ -14,7 +14,7 @@ use flate2::read::GzDecoder; use fst::IntoStreamer; use heed::EnvOpenOptions; use heed::types::*; -use log::debug; +use log::{debug, info}; use memmap::Mmap; use oxidized_mtbl::{Reader, Writer, Merger, Sorter, CompressionType}; use rayon::prelude::*; @@ -486,9 +486,9 @@ fn main() -> anyhow::Result<()> { .max_dbs(10) 
.open(&opt.database)?; - let mut index = Index::new(&env, &opt.database)?; + let before_indexing = Instant::now(); + let index = Index::new(&env)?; - let documents_path = opt.database.join("documents.mtbl"); let num_threads = rayon::current_num_threads(); let arc_cache_size = opt.indexer.arc_cache_size; let max_nb_chunks = opt.indexer.max_nb_chunks; @@ -566,32 +566,28 @@ fn main() -> anyhow::Result<()> { docs_stores.push(d); }); - debug!("We are writing into LMDB and MTBL..."); + debug!("We are writing the documents into MTBL on disk..."); + // We also merge the documents into its own MTBL store. + let file = tempfile::tempfile()?; + let mut writer = Writer::builder() + .compression_type(documents_compression_type) + .compression_level(documents_compression_level) + .build(file); + let mut builder = Merger::builder(docs_merge); + builder.extend(docs_stores); + builder.build().write_into(&mut writer)?; + let file = writer.into_inner()?; + let documents_mmap = unsafe { memmap::Mmap::map(&file)? }; - // We run both merging steps in parallel. - let (lmdb, mtbl) = rayon::join(|| { - // We merge the postings lists into LMDB. - let mut wtxn = env.write_txn()?; - merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?; - Ok(wtxn.commit()?) as anyhow::Result<_> - }, || { - // We also merge the documents into its own MTBL store. - let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(documents_path)?; - let mut writer = Writer::builder() - .compression_type(documents_compression_type) - .compression_level(documents_compression_level) - .build(file); - let mut builder = Merger::builder(docs_merge); - builder.extend(docs_stores); - builder.build().write_into(&mut writer)?; - Ok(writer.finish()?) as anyhow::Result<_> - }); + debug!("We are writing the postings lists and documents into LMDB on disk..."); + // We merge the postings lists into LMDB. 
+ let mut wtxn = env.write_txn()?; + merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?; + index.put_documents(&mut wtxn, &documents_mmap)?; + let count = index.number_of_documents(&wtxn)?; + wtxn.commit()?; - lmdb.and(mtbl)?; - index.refresh_documents()?; - let count = index.number_of_documents(); - - debug!("Wrote {} documents into LMDB", count); + info!("Wrote {} documents in {:.02?}", count, before_indexing.elapsed()); Ok(()) } diff --git a/src/bin/infos.rs b/src/bin/infos.rs index e68f52357..56ea0d720 100644 --- a/src/bin/infos.rs +++ b/src/bin/infos.rs @@ -96,7 +96,7 @@ fn main() -> anyhow::Result<()> { .open(&opt.database)?; // Open the LMDB database. - let index = Index::new(&env, opt.database)?; + let index = Index::new(&env)?; let rtxn = env.read_txn()?; match opt.command { @@ -200,6 +200,11 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho if heap.len() > limit { heap.pop(); } } + if let Some(documents) = index.main.get::<_, ByteSlice, ByteSlice>(rtxn, b"documents")? { + heap.push(Reverse((documents.len(), format!("documents"), main_name))); + if heap.len() > limit { heap.pop(); } + } + for result in index.word_positions.as_polymorph().iter::<_, Str, ByteSlice>(rtxn)? { let (word, value) = result?; heap.push(Reverse((value.len(), word.to_string(), word_positions_name))); diff --git a/src/bin/search.rs b/src/bin/search.rs index 2618c376f..52d450f40 100644 --- a/src/bin/search.rs +++ b/src/bin/search.rs @@ -49,7 +49,7 @@ fn main() -> anyhow::Result<()> { .open(&opt.database)?; // Open the LMDB database. 
- let index = Index::new(&env, opt.database)?; + let index = Index::new(&env)?; let rtxn = env.read_txn()?; let stdin = io::stdin(); @@ -68,7 +68,7 @@ fn main() -> anyhow::Result<()> { Some(headers) => headers, None => return Ok(()), }; - let documents = index.documents(result.documents_ids.iter().cloned())?; + let documents = index.documents(&rtxn, result.documents_ids.iter().cloned())?; let mut stdout = io::stdout(); stdout.write_all(&headers)?; diff --git a/src/bin/serve.rs b/src/bin/serve.rs index 3b91aa2c2..67252f233 100644 --- a/src/bin/serve.rs +++ b/src/bin/serve.rs @@ -62,7 +62,6 @@ fn highlight_string(string: &str, words: &HashSet) -> String { struct IndexTemplate { db_name: String, db_size: usize, - docs_size: usize, docs_count: usize, } @@ -83,28 +82,23 @@ async fn main() -> anyhow::Result<()> { .open(&opt.database)?; // Open the LMDB database. - let index = Index::new(&env, &opt.database)?; + let index = Index::new(&env)?; // Retrieve the database the file stem (w/o the extension), // the disk file size and the number of documents in the database. let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string(); let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize; - let docs_size = File::open(opt.database.join("documents.mtbl"))?.metadata()?.len() as usize; - let docs_count = index.number_of_documents(); + + let rtxn = env.read_txn()?; + let docs_count = index.number_of_documents(&rtxn)? 
as usize; + drop(rtxn); // We run and wait on the HTTP server // Expose an HTML page to debug the search in a browser let dash_html_route = warp::filters::method::get() .and(warp::filters::path::end()) - .map(move || { - IndexTemplate { - db_name: db_name.clone(), - db_size, - docs_size, - docs_count: docs_count as usize, - } - }); + .map(move || IndexTemplate { db_name: db_name.clone(), db_size, docs_count }); let dash_bulma_route = warp::filters::method::get() .and(warp::path!("bulma.min.css")) @@ -192,7 +186,7 @@ async fn main() -> anyhow::Result<()> { if let Some(headers) = index.headers(&rtxn).unwrap() { // We write the headers body.extend_from_slice(headers); - let documents = index.documents(documents_ids).unwrap(); + let documents = index.documents(&rtxn, documents_ids).unwrap(); for (_id, content) in documents { let content = std::str::from_utf8(content.as_ref()).unwrap(); diff --git a/src/heed_codec/mod.rs b/src/heed_codec/mod.rs index bb75cdc15..7729065a9 100644 --- a/src/heed_codec/mod.rs +++ b/src/heed_codec/mod.rs @@ -1,5 +1,7 @@ +mod mtbl_codec; mod roaring_bitmap_codec; mod str_beu32_codec; +pub use self::mtbl_codec::MtblCodec; pub use self::roaring_bitmap_codec::RoaringBitmapCodec; pub use self::str_beu32_codec::StrBEU32Codec; diff --git a/src/heed_codec/mtbl_codec.rs b/src/heed_codec/mtbl_codec.rs new file mode 100644 index 000000000..c36960079 --- /dev/null +++ b/src/heed_codec/mtbl_codec.rs @@ -0,0 +1,20 @@ +use std::borrow::Cow; +use oxidized_mtbl::Reader; + +pub struct MtblCodec; + +impl<'a> heed::BytesDecode<'a> for MtblCodec { + type DItem = Reader<&'a [u8]>; + + fn bytes_decode(bytes: &'a [u8]) -> Option { + Reader::new(bytes).ok() + } +} + +impl heed::BytesEncode<'_> for MtblCodec { + type EItem = [u8]; + + fn bytes_encode(item: &Self::EItem) -> Option> { + Some(Cow::Borrowed(item)) + } +} diff --git a/src/lib.rs b/src/lib.rs index ffc0d289b..bcfc3d2bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,27 +2,20 @@ mod criterion; mod node; mod 
query_tokens; mod search; -mod transitive_arc; pub mod heed_codec; pub mod lexer; use std::collections::HashMap; -use std::fs::{File, OpenOptions}; use std::hash::BuildHasherDefault; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use anyhow::Context; +use anyhow::{bail, Context}; use fxhash::{FxHasher32, FxHasher64}; use heed::types::*; use heed::{PolyDatabase, Database}; -use memmap::Mmap; -use oxidized_mtbl as omtbl; pub use self::search::{Search, SearchResult}; pub use self::criterion::{Criterion, default_criteria}; -use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec}; -use self::transitive_arc::TransitiveArc; +use self::heed_codec::{MtblCodec, RoaringBitmapCodec, StrBEU32Codec}; pub type FastMap4 = HashMap>; pub type FastMap8 = HashMap>; @@ -34,10 +27,12 @@ pub type DocumentId = u32; pub type Attribute = u32; pub type Position = u32; +const WORDS_FST_KEY: &str = "words-fst"; +const HEADERS_KEY: &str = "headers"; +const DOCUMENTS_KEY: &str = "documents"; + #[derive(Clone)] pub struct Index { - // The database path, where the LMDB and MTBL files are. - path: PathBuf, /// Contains many different types (e.g. the documents CSV headers). pub main: PolyDatabase, /// A word and all the positions where it appears in the whole dataset. @@ -46,44 +41,24 @@ pub struct Index { pub word_position_docids: Database, /// Maps a word and an attribute (u32) to all the documents ids where the given word appears. pub word_attribute_docids: Database, - /// The MTBL store that contains the documents content. - documents: omtbl::Reader>, } impl Index { - pub fn new>(env: &heed::Env, path: P) -> anyhow::Result { - let documents_path = path.as_ref().join("documents.mtbl"); - let mut documents = OpenOptions::new().create(true).write(true).read(true).open(documents_path)?; - // If the file is empty we must initialize it like an empty MTBL database. 
- if documents.metadata()?.len() == 0 { - omtbl::Writer::new(&mut documents).finish()?; - } - let documents = unsafe { memmap::Mmap::map(&documents)? }; - + pub fn new(env: &heed::Env) -> anyhow::Result { Ok(Index { - path: path.as_ref().to_path_buf(), main: env.create_poly_database(None)?, word_positions: env.create_database(Some("word-positions"))?, word_position_docids: env.create_database(Some("word-position-docids"))?, word_attribute_docids: env.create_database(Some("word-attribute-docids"))?, - documents: omtbl::Reader::new(TransitiveArc(Arc::new(documents)))?, }) } - pub fn refresh_documents(&mut self) -> anyhow::Result<()> { - let documents_path = self.path.join("documents.mtbl"); - let documents = File::open(&documents_path)?; - let documents = unsafe { memmap::Mmap::map(&documents)? }; - self.documents = omtbl::Reader::new(TransitiveArc(Arc::new(documents)))?; - Ok(()) - } - pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &[u8]) -> anyhow::Result<()> { - Ok(self.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers)?) + Ok(self.main.put::<_, Str, ByteSlice>(wtxn, HEADERS_KEY, headers)?) } pub fn headers<'t>(&self, rtxn: &'t heed::RoTxn) -> heed::Result> { - self.main.get::<_, Str, ByteSlice>(rtxn, "headers") + self.main.get::<_, Str, ByteSlice>(rtxn, HEADERS_KEY) } pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result> { @@ -98,29 +73,46 @@ impl Index { } pub fn put_fst>(&self, wtxn: &mut heed::RwTxn, fst: &fst::Set) -> anyhow::Result<()> { - Ok(self.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", fst.as_fst().as_bytes())?) + Ok(self.main.put::<_, Str, ByteSlice>(wtxn, WORDS_FST_KEY, fst.as_fst().as_bytes())?) } pub fn fst<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result>> { - match self.main.get::<_, Str, ByteSlice>(rtxn, "words-fst")? { + match self.main.get::<_, Str, ByteSlice>(rtxn, WORDS_FST_KEY)? 
{ Some(bytes) => Ok(Some(fst::Set::new(bytes)?)), None => Ok(None), } } /// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing. - pub fn documents>(&self, iter: I) -> anyhow::Result)>> { - iter.into_iter().map(|id| { - let key = id.to_be_bytes(); - let content = self.documents.clone().get(&key)?.with_context(|| format!("Could not find document {}.", id))?; - Ok((id, content.as_ref().to_vec())) - }) - .collect() + pub fn documents<'t>( + &self, + rtxn: &'t heed::RoTxn, + iter: impl IntoIterator, + ) -> anyhow::Result)>> + { + match self.main.get::<_, Str, MtblCodec>(rtxn, DOCUMENTS_KEY)? { + Some(documents) => { + iter.into_iter().map(|id| { + let key = id.to_be_bytes(); + let content = documents.clone().get(&key)? + .with_context(|| format!("Could not find document {}", id))?; + Ok((id, content.as_ref().to_vec())) + }).collect() + }, + None => bail!("No documents database found"), + } + } + + pub fn put_documents(&self, wtxn: &mut heed::RwTxn, documents: &[u8]) -> anyhow::Result<()> { + Ok(self.main.put::<_, Str, MtblCodec>(wtxn, DOCUMENTS_KEY, documents)?) } /// Returns the number of documents indexed in the database. - pub fn number_of_documents(&self) -> usize { - self.documents.metadata().count_entries as usize + pub fn number_of_documents<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result { + match self.main.get::<_, Str, MtblCodec>(rtxn, DOCUMENTS_KEY)? { + Some(documents) => Ok(documents.metadata().count_entries as usize), + None => return Ok(0), + } } pub fn search<'a>(&'a self, rtxn: &'a heed::RoTxn) -> Search<'a> { diff --git a/src/transitive_arc.rs b/src/transitive_arc.rs deleted file mode 100644 index c25ee3e63..000000000 --- a/src/transitive_arc.rs +++ /dev/null @@ -1,16 +0,0 @@ -use std::sync::Arc; - -/// An `Arc<[u8]>` that is transitive over `AsRef<[u8]>`. 
-pub struct TransitiveArc(pub Arc); - -impl> AsRef<[u8]> for TransitiveArc { - fn as_ref(&self) -> &[u8] { - self.0.as_ref().as_ref() - } -} - -impl Clone for TransitiveArc { - fn clone(&self) -> TransitiveArc { - TransitiveArc(self.0.clone()) - } -} diff --git a/templates/index.html b/templates/index.html index cb4e1f458..0cb717c92 100644 --- a/templates/index.html +++ b/templates/index.html @@ -34,7 +34,7 @@

Database Size

-

{{ db_size }} + {{ docs_size }}

+

{{ db_size }}