diff --git a/Cargo.lock b/Cargo.lock index abc44ba56..1fa4b92e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,11 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "adler32" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" + [[package]] name = "anyhow" version = "1.0.31" @@ -66,6 +72,9 @@ name = "cc" version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -84,6 +93,21 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "crc32c" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ba37ef26c12988c1cee882d522d65e1d5d2ad8c3864665b88ee92767ed84c5" + +[[package]] +name = "crc32fast" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-deque" version = "0.7.3" @@ -165,6 +189,18 @@ version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" +[[package]] +name = "flate2" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cfff41391129e0a856d6d822600b8d71179d46879e310417eb9c762eb178b42" +dependencies = [ + "cfg-if", + "crc32fast", + "libc", + "miniz_oxide", +] + [[package]] name = "fs_extra" version = "1.1.0" @@ -186,6 +222,23 @@ dependencies = [ "byteorder 1.3.4", ] +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + [[package]] name = "heck" version = "0.3.1" @@ -251,6 +304,15 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "itertools" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.5" @@ -278,6 +340,15 @@ dependencies = [ "libc", ] +[[package]] +name = "jobserver" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" +dependencies = [ + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -325,11 +396,15 @@ dependencies = [ "fxhash", "heed", "jemallocator", + "memmap", + "oxidized-mtbl", "rayon", "roaring", "slice-group-by", "smallstr", + "smallvec", "structopt", + "tempfile", ] [[package]] @@ -338,6 +413,16 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "memoffset" version = "0.5.4" @@ -347,6 +432,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "miniz_oxide" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" +dependencies = [ + "adler32", +] + 
[[package]] name = "num_cpus" version = "1.13.0" @@ -363,6 +457,18 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +[[package]] +name = "oxidized-mtbl" +version = "0.1.0" +source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=8918476#8918476f61f4430890d067db7b4a6cfb2d549c43" +dependencies = [ + "byteorder 1.3.4", + "crc32c", + "flate2", + "snap", + "zstd", +] + [[package]] name = "page_size" version = "0.4.2" @@ -385,6 +491,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" +[[package]] +name = "ppv-lite86" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" + [[package]] name = "proc-macro-error" version = "1.0.2" @@ -429,6 +541,47 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom", + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core", +] + [[package]] name = "rayon" version = "1.3.0" @@ -453,6 +606,12 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "redox_syscall" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" + [[package]] name = "regex-automata" version = "0.1.9" @@ -462,6 +621,15 @@ dependencies = [ "byteorder 1.3.4", ] +[[package]] +name = "remove_dir_all" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" +dependencies = [ + "winapi", +] + [[package]] name = "roaring" version = "0.5.2" @@ -521,6 +689,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" +[[package]] +name = "snap" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fb9b0bb877b35a1cc1474a3b43d9c226a2625311760cdda2cbccbc0c7a8376" + [[package]] name = "structopt" version = "0.3.14" @@ -579,6 +753,20 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tempfile" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" +dependencies = [ + "cfg-if", + "libc", + "rand", + "redox_syscall", + "remove_dir_all", + "winapi", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -641,6 +829,12 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "winapi" version = "0.3.8" @@ -683,3 +877,34 @@ dependencies = [ "syn", "synstructure", ] + +[[package]] +name = "zstd" +version = "0.5.2+zstd.1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644352b10ce7f333d6e0af85bd4f5322dc449416dc1211c6308e95bca8923db4" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "2.0.4+zstd.1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7113c0c9aed2c55181f2d9f5b0a36e7d2c0183b11c058ab40b35987479efe4d7" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.4.16+zstd.1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c442965efc45353be5a9b9969c9b0872fff6828c7e06d118dda2cb2d0bb11d5a" +dependencies = [ + "cc", + "glob", + "itertools", + "libc", +] diff --git a/Cargo.toml b/Cargo.toml index bf5f76152..abe475a1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,16 +8,20 @@ edition = "2018" anyhow = "1.0.28" bitpacking = "0.8.2" byteorder = "1.3.4" -roaring = "0.5.2" csv = "1.1.3" fst = "0.4.3" fxhash = "0.2.1" heed = { version = "0.8.0", default-features = false, features = ["lmdb"] } jemallocator = "0.3.2" +memmap = "0.7.0" +oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "8918476" } rayon = "1.3.0" +roaring = "0.5.2" slice-group-by = "0.2.6" smallstr = "0.2.0" +smallvec = "1.4.0" structopt = { version = "0.3.14", default-features = false } +tempfile = "3.1.0" [profile.release] debug = true diff --git a/src/main.rs b/src/main.rs index 1dd6551ef..7a22d16d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,25 @@ -use std::collections::hash_map::Entry; use std::collections::{HashMap, BTreeSet}; use std::convert::TryFrom; +use std::convert::TryInto; use std::fs::File; use std::hash::BuildHasherDefault; use std::path::PathBuf; use 
std::sync::atomic::{AtomicUsize, Ordering}; -use anyhow::{ensure, Context}; -use roaring::RoaringBitmap; -use fst::IntoStreamer; +use anyhow::Context; +use fst::{Streamer, IntoStreamer}; use fxhash::FxHasher32; -use heed::{EnvOpenOptions, PolyDatabase, Database}; use heed::types::*; +use heed::{EnvOpenOptions, PolyDatabase, Database}; +use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions}; use rayon::prelude::*; +use roaring::RoaringBitmap; use slice_group_by::StrGroupBy; use structopt::StructOpt; pub type FastMap4 = HashMap>; pub type SmallString32 = smallstr::SmallString<[u8; 32]>; +pub type SmallVec32 = smallvec::SmallVec<[u8; 32]>; pub type BEU32 = heed::zerocopy::U32; pub type DocumentId = u32; @@ -39,100 +41,126 @@ struct Opt { files_to_index: Vec, } -fn union_postings_ids(_key: &[u8], old_value: Option<&[u8]>, new_value: RoaringBitmap) -> Option> { - let result = match old_value { - Some(bytes) => { - let mut old_value = RoaringBitmap::deserialize_from(bytes).unwrap(); - old_value.union_with(&new_value); - old_value - }, - None => new_value, - }; - - let mut vec = Vec::new(); - result.serialize_into(&mut vec).unwrap(); - Some(vec) -} - -fn union_words_fst(key: &[u8], old_value: Option<&[u8]>, new_value: &fst::Set>) -> Option> { - if key != b"words-fst" { unimplemented!() } - - // Do an union of the old and the new set of words. 
- let mut builder = fst::set::OpBuilder::new(); - - let old_words = old_value.map(|v| fst::Set::new(v).unwrap()); - let old_words = old_words.as_ref().map(|v| v.into_stream()); - if let Some(old_words) = old_words { - builder.push(old_words); - } - - builder.push(new_value); - - let op = builder.r#union(); - let mut build = fst::SetBuilder::memory(); - build.extend_stream(op.into_stream()).unwrap(); - - Some(build.into_inner().unwrap()) -} - fn alphanumeric_tokens(string: &str) -> impl Iterator { let is_alphanumeric = |s: &&str| s.chars().next().map_or(false, char::is_alphanumeric); string.linear_group_by_key(|c| c.is_alphanumeric()).filter(is_alphanumeric) } -#[derive(Default)] struct Indexed { fst: fst::Set>, - postings_ids: FastMap4, + postings_ids: FastMap4, headers: Vec, documents: Vec<(DocumentId, Vec)>, } -impl Indexed { - fn merge_with(mut self, mut other: Indexed) -> Indexed { +#[derive(Default)] +struct MtblKvStore(Option); - // Union of the two FSTs - let op = fst::set::OpBuilder::new() - .add(self.fst.into_stream()) - .add(other.fst.into_stream()) - .r#union(); +impl MtblKvStore { + fn from_indexed(mut indexed: Indexed) -> anyhow::Result { + let outfile = tempfile::tempfile()?; + let mut out = Writer::new(outfile, None)?; - let mut build = fst::SetBuilder::memory(); - build.extend_stream(op.into_stream()).unwrap(); - let fst = build.into_set(); + out.add(b"\0headers", indexed.headers)?; + out.add(b"\0words-fst", indexed.fst.as_fst().as_bytes())?; - // Merge the postings by unions - for (word, mut postings) in other.postings_ids { - match self.postings_ids.entry(word) { - Entry::Occupied(mut entry) => { - let old = entry.get(); - postings.union_with(&old); - entry.insert(postings); - }, - Entry::Vacant(entry) => { - entry.insert(postings); - }, + // postings ids keys are all prefixed by a '1' + let mut key = vec![1]; + let mut buffer = Vec::new(); + // We must write the postings ids in order for mtbl therefore + // we iterate over the fst to read the 
words in order + let mut stream = indexed.fst.stream(); + while let Some(word) = stream.next() { + key.truncate(1); + key.extend_from_slice(word); + if let Some(ids) = indexed.postings_ids.remove(word) { + buffer.clear(); + ids.serialize_into(&mut buffer)?; + out.add(&key, &buffer).unwrap(); } } - // assert headers are valid - if !self.headers.is_empty() { - assert_eq!(self.headers, other.headers); + // documents keys are all prefixed by a '2' + key[0] = 2; + indexed.documents.sort_unstable(); + for (id, content) in indexed.documents { + key.truncate(1); + key.extend_from_slice(&id.to_be_bytes()); + out.add(&key, content).unwrap(); } - // extend the documents - self.documents.append(&mut other.documents); + let out = out.into_inner()?; + Ok(MtblKvStore(Some(out))) + } - Indexed { - fst, - postings_ids: self.postings_ids, - headers: self.headers, - documents: self.documents, + fn merge_with(self, other: MtblKvStore) -> anyhow::Result { + let (left, right) = match (self.0, other.0) { + (Some(left), Some(right)) => (left, right), + (Some(left), None) => return Ok(MtblKvStore(Some(left))), + (None, Some(right)) => return Ok(MtblKvStore(Some(right))), + (None, None) => return Ok(MtblKvStore(None)), + }; + + let left = unsafe { memmap::Mmap::map(&left)? }; + let right = unsafe { memmap::Mmap::map(&right)? 
}; + + let left = Reader::new(&left, ReaderOptions::default()).unwrap(); + let right = Reader::new(&right, ReaderOptions::default()).unwrap(); + + fn merge(key: &[u8], left: &[u8], right: &[u8]) -> Option> { + if key == b"\0words-fst" { + let left_fst = fst::Set::new(left).unwrap(); + let right_fst = fst::Set::new(right).unwrap(); + + // Union of the two FSTs + let op = fst::set::OpBuilder::new() + .add(left_fst.into_stream()) + .add(right_fst.into_stream()) + .r#union(); + + let mut build = fst::SetBuilder::memory(); + build.extend_stream(op.into_stream()).unwrap(); + Some(build.into_inner().unwrap()) + } + else if key == b"\0headers" { + assert_eq!(left, right); + Some(left.to_vec()) + } + else if key.starts_with(&[1]) { + let mut left = RoaringBitmap::deserialize_from(left).unwrap(); + let right = RoaringBitmap::deserialize_from(right).unwrap(); + left.union_with(&right); + let mut vec = Vec::new(); + left.serialize_into(&mut vec).unwrap(); + Some(vec) + } + else if key.starts_with(&[2]) { + assert_eq!(left, right); + Some(left.to_vec()) + } + else { + panic!("wut? 
{:?}", key) + } } + + let outfile = tempfile::tempfile()?; + let mut out = Writer::new(outfile, None)?; + + let sources = vec![left, right]; + let opt = MergerOptions { merge }; + let mut merger = Merger::new(sources, opt); + + let mut iter = merger.iter(); + while let Some((k, v)) = iter.next() { + out.add(k, v).unwrap(); + } + + let out = out.into_inner()?; + Ok(MtblKvStore(Some(out))) } } -fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { +fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { const MAX_POSITION: usize = 1000; const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; @@ -153,7 +181,7 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { for (_attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { for (_pos, word) in alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) { if !word.is_empty() && word.len() < 500 { // LMDB limits - postings_ids.entry(SmallString32::from(word)) + postings_ids.entry(SmallVec32::from(word.as_bytes())) .or_insert_with(RoaringBitmap::new) .insert(document_id); } @@ -173,44 +201,51 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { new_words.insert(word.clone()); } - let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallString32::as_str))?; + let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallVec32::as_ref))?; - Ok(Indexed { fst: new_words_fst, headers, postings_ids, documents }) + let indexed = Indexed { fst: new_words_fst, headers, postings_ids, documents }; + + MtblKvStore::from_indexed(indexed) } +// TODO merge with the previous values fn writer( wtxn: &mut heed::RwTxn, main: PolyDatabase, postings_ids: Database, documents: Database, ByteSlice>, - indexed: Indexed, + mtbl_store: MtblKvStore, ) -> anyhow::Result { - // Write and merge the words fst - let old_value = main.get::<_, Str, ByteSlice>(wtxn, "words-fst")?; - let new_value = union_words_fst(b"words-fst", old_value, &indexed.fst) - .context("error while do a words-fst union")?; - 
main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &new_value)?; + let mtbl_store = match mtbl_store.0 { + Some(store) => unsafe { memmap::Mmap::map(&store)? }, + None => return Ok(0), + }; + let mtbl_store = Reader::new(&mtbl_store, ReaderOptions::default()).unwrap(); + + // Write the words fst + let fst = mtbl_store.get(b"\0words-fst").unwrap(); + let fst = fst::Set::new(fst)?; + main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &fst.as_fst().as_bytes())?; // Write and merge the headers - if let Some(old_headers) = main.get::<_, Str, ByteSlice>(wtxn, "headers")? { - ensure!(old_headers == &*indexed.headers, "headers differs from the previous ones"); - } - main.put::<_, Str, ByteSlice>(wtxn, "headers", &indexed.headers)?; + let headers = mtbl_store.get(b"\0headers").unwrap(); + main.put::<_, Str, ByteSlice>(wtxn, "headers", headers.as_ref())?; // Write and merge the postings lists - for (word, postings) in indexed.postings_ids { - let old_value = postings_ids.get(wtxn, word.as_str())?; - let new_value = union_postings_ids(word.as_bytes(), old_value, postings) - .context("error while do a words-fst union")?; - postings_ids.put(wtxn, &word, &new_value)?; + let mut iter = mtbl_store.iter_prefix(&[1]).unwrap(); + while let Some((word, postings)) = iter.next() { + let word = std::str::from_utf8(&word[1..]).unwrap(); + postings_ids.put(wtxn, &word, &postings)?; } - let count = indexed.documents.len(); - // Write the documents - for (id, content) in indexed.documents { + let mut count = 0; + let mut iter = mtbl_store.iter_prefix(&[2]).unwrap(); + while let Some((id_bytes, content)) = iter.next() { + let id = id_bytes[1..].try_into().map(u32::from_be_bytes).unwrap(); documents.put(wtxn, &BEU32::new(id), &content)?; + count += 1; } Ok(count) @@ -232,29 +267,23 @@ fn main() -> anyhow::Result<()> { let res = opt.files_to_index .into_par_iter() - .try_fold(|| Indexed::default(), |acc, path| { + .try_fold(MtblKvStore::default, |acc, path| { let rdr = 
csv::Reader::from_path(path)?; - let indexed = index_csv(rdr)?; - Ok(acc.merge_with(indexed)) as anyhow::Result - }) - .map(|indexed| match indexed { - Ok(indexed) => { - let tid = rayon::current_thread_index(); - eprintln!("{:?}: A new step to write into LMDB", tid); - let mut wtxn = env.write_txn()?; - let count = writer(&mut wtxn, main, postings_ids, documents, indexed)?; - wtxn.commit()?; - eprintln!("{:?}: Wrote {} documents into LMDB", tid, count); - Ok(count) - }, - Err(e) => Err(e), + let mtbl_store = index_csv(rdr)?; + acc.merge_with(mtbl_store) }) .inspect(|_| { eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed)) }) - .try_reduce(|| 0, |a, b| Ok(a + b)); + .try_reduce(MtblKvStore::default, MtblKvStore::merge_with); - println!("indexed {:?} documents", res); + let mtbl_store = res?; + + eprintln!("We are writing into LMDB..."); + let mut wtxn = env.write_txn()?; + let count = writer(&mut wtxn, main, postings_ids, documents, mtbl_store)?; + wtxn.commit()?; + eprintln!("Wrote {} documents into LMDB", count); Ok(()) }