diff --git a/Cargo.lock b/Cargo.lock index 18931ea68..1660704b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,18 +9,62 @@ dependencies = [ "memchr", ] +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + [[package]] name = "anyhow" version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +[[package]] +name = "bindgen" +version = "0.53.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c72a978d268b1d70b0e963217e60fdabd9523a941457a6c42a7315d15c7e89e5" +dependencies = [ + "bitflags", + "cexpr", + "cfg-if", + "clang-sys", + "clap", + "env_logger", + "lazy_static", + "lazycell", + "log", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "which", +] + [[package]] name = "bitflags" version = "1.2.1" @@ -59,6 +103,18 @@ name = "cc" version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" +dependencies = [ + "jobserver", +] + +[[package]] +name = "cexpr" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27" +dependencies = [ + "nom", +] [[package]] name = "cfg-if" @@ -66,33 +122,30 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "clang-sys" +version = "0.29.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "2.33.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" dependencies = [ + "ansi_term", + "atty", "bitflags", + "strsim", "textwrap", "unicode-width", -] - -[[package]] -name = "cloudabi" -version = "0.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" -dependencies = [ - "bitflags", -] - -[[package]] -name = "crc32fast" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" -dependencies = [ - "cfg-if", + "vec_map", ] [[package]] @@ -182,18 +235,11 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" dependencies = [ + "atty", + "humantime", "log", "regex", -] - -[[package]] -name = "fs2" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" -dependencies = [ - "libc", - "winapi", + "termcolor", ] [[package]] @@ -228,6 +274,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + [[package]] name = "heck" version = "0.3.1" @@ -246,6 +298,15 @@ dependencies = [ "libc", ] +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + [[package]] name = "itoa" version = "0.4.5" @@ -273,12 +334,27 @@ dependencies = [ "libc", ] +[[package]] +name = "jobserver" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" +dependencies = [ + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f" + [[package]] name = "libc" version = "0.2.70" @@ -286,12 +362,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" [[package]] -name = "lock_api" -version = "0.3.4" +name = "libloading" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753" dependencies = [ - "scopeguard", + "cc", + "winapi", +] + +[[package]] +name = "librocksdb-sys" +version = "6.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "883213ae3d09bfc3d104aefe94b25ebb183b6f4d3a515b23b14817e1f4854005" +dependencies = [ + "bindgen", + "cc", + "glob", + "libc", ] [[package]] @@ -322,8 +411,8 @@ dependencies = [ "jemallocator", "quickcheck", "rayon", + "rocksdb", "sdset", - "sled", "slice-group-by", "smallstr", "structopt", @@ -345,6 +434,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "nom" +version = "5.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b471253da97532da4b61552249c521e01e736071f71c1a4f7ebbfbf0a06aad6" +dependencies = [ + "memchr", + "version_check", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -356,28 +455,10 @@ dependencies = [ ] [[package]] -name = "parking_lot" -version = "0.10.2" +name = "peeking_take_while" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" -dependencies = [ - "cfg-if", - "cloudabi", - "libc", - "redox_syscall", - "smallvec", - "winapi", -] +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" [[package]] name = "ppv-lite86" @@ -420,6 +501,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quickcheck" version = "0.9.2" @@ -506,12 +593,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "redox_syscall" -version = "0.1.56" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" - [[package]] name = "regex" version = "1.3.7" @@ -539,6 +620,22 @@ version = "0.6.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" +[[package]] +name = "rocksdb" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61aa17a99a2413cd71c1106691bf59dad7de0cd5099127f90e9d99c429c40d4a" +dependencies = [ + "libc", + "librocksdb-sys", +] + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "ryu" version = "1.0.4" @@ -564,19 +661,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c" [[package]] -name = "sled" -version = "0.31.0" -source = "git+https://github.com/spacejam/sled.git?rev=2fe05c9#2fe05c933a4a68d4dbbc06a16a3058236fcc6350" -dependencies = [ - "crc32fast", - "crossbeam-epoch", - "crossbeam-utils", - "fs2", - "fxhash", - "libc", - "log", - "parking_lot", -] +name = "shlex" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" [[package]] name = "slice-group-by" @@ -599,6 +687,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "structopt" version = "0.3.14" @@ -657,6 +751,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "termcolor" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" +dependencies = [ + "winapi-util", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -693,6 +796,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.2" @@ -705,6 +814,15 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +[[package]] +name = "which" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d011071ae14a2f6671d0b74080ae0cd8ebf3a6f8c9589a2cd45f23126fe29724" +dependencies = [ + "libc", +] + [[package]] name = "winapi" version = "0.3.8" @@ -721,6 +839,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index cfa000799..5186d5964 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,8 @@ fst = "0.4.3" fxhash = "0.2.1" jemallocator = "0.3.2" rayon = "1.3.0" +rocksdb = "0.14.0" sdset = "0.4.0" -sled = { git = "https://github.com/spacejam/sled.git", rev = "2fe05c9"} slice-group-by = "0.2.6" smallstr = "0.2.0" structopt = { version = "0.3.14", default-features = false } diff --git a/src/bp_vec.rs b/src/bp_vec.rs index d567ac0a5..f91e6aa22 100644 --- a/src/bp_vec.rs +++ b/src/bp_vec.rs @@ -37,8 +37,12 @@ impl BpVec { uncompressed } - pub fn capacity(&self) -> usize { - self.compressed.capacity() + self.uncompressed.capacity() + pub fn compressed_capacity(&self) -> usize { + self.compressed.capacity() + } + + pub fn uncompressed_capacity(&self) -> usize { + self.uncompressed.capacity() } } diff --git a/src/codec/bitpacker_sorted.rs b/src/codec/bitpacker_sorted.rs index 274e2c2bb..c51b4d71c 100644 --- a/src/codec/bitpacker_sorted.rs +++ b/src/codec/bitpacker_sorted.rs @@ -58,6 +58,12 @@ impl CodecBitPacker4xSorted { let mut initial_value = 0; while let Some(num_bits) = bytes.get(0) { + if *num_bits == 0 { + decompressed.resize(decompressed.len() + BitPacker4x::BLOCK_LEN, initial_value); + bytes = &bytes[1..]; + continue; + } + let block_size = BitPacker4x::compressed_block_size(*num_bits); let new_len = decompressed.len() + BitPacker4x::BLOCK_LEN; diff --git a/src/main.rs b/src/main.rs index c8d345baf..e4ce5aeca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,8 @@ use std::convert::TryFrom; use std::fs::File; use std::hash::BuildHasherDefault; use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use anyhow::{ensure, Context}; use fst::IntoStreamer; @@ -28,6 +30,8 @@ pub type SmallString32 = smallstr::SmallString<[u8; 32]>; #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; +static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0); + #[derive(Debug, StructOpt)] #[structopt(name = "mm-indexer", about = "The server side of the daugt project.")] struct Opt { @@ -40,40 +44,58 @@ struct Opt { files_to_index: Vec, } -fn union_bitpacked_postings_ids(_key: &[u8], old_value: Option<&[u8]>, new_value: &[u8]) -> Option> { - if old_value.is_none() { - return Some(new_value.to_vec()) +fn union_bitpacked_postings_ids( + _key: &[u8], + old_value: Option<&[u8]>, + operands: &mut rocksdb::MergeOperands, +) -> Option> +{ + let mut sets_bufs = Vec::new(); + + if let Some(old_value) = old_value { + let old_value = CodecBitPacker4xSorted::bytes_decode(old_value).unwrap(); + sets_bufs.push(SetBuf::new(old_value).unwrap()); } - let old_value = old_value.unwrap_or_default(); - let old_value = CodecBitPacker4xSorted::bytes_decode(&old_value).unwrap(); - let new_value = CodecBitPacker4xSorted::bytes_decode(&new_value).unwrap(); + for operand in operands { + let new_value = CodecBitPacker4xSorted::bytes_decode(operand).unwrap(); + sets_bufs.push(SetBuf::new(new_value).unwrap()); + } - let old_set = SetBuf::new(old_value).unwrap(); - let new_set = SetBuf::new(new_value).unwrap(); - - let result = sdset::duo::Union::new(&old_set, &new_set).into_set_buf(); + let sets = sets_bufs.iter().map(|s| s.as_set()).collect(); + let result = sdset::multi::Union::new(sets).into_set_buf(); let compressed = CodecBitPacker4xSorted::bytes_encode(&result).unwrap(); Some(compressed) } -fn union_words_fst(key: &[u8], old_value: Option<&[u8]>, new_value: &[u8]) -> Option> { +fn union_words_fst( + key: &[u8], + old_value: Option<&[u8]>, + operands: &mut rocksdb::MergeOperands, +) -> Option> +{ if key != b"words-fst" { unimplemented!() } - let old_value = match old_value { - Some(old_value) => old_value, - None => return Some(new_value.to_vec()), - }; - - eprintln!("old_words size: {}", old_value.len()); - eprintln!("new_words size: {}", new_value.len()); - - let old_words = fst::Set::new(old_value).unwrap(); - let new_words = fst::Set::new(new_value).unwrap(); + let mut fst_operands = Vec::new(); + for operand in operands { + fst_operands.push(fst::Set::new(operand).unwrap()); + } // Do an union of the old and the new set of words. - let op = old_words.op().add(new_words.into_stream()).r#union(); + let mut builder = fst::set::OpBuilder::new(); + + let old_words = old_value.map(|v| fst::Set::new(v).unwrap()); + let old_words = old_words.as_ref().map(|v| v.into_stream()); + if let Some(old_words) = old_words { + builder.push(old_words); + } + + for new_words in &fst_operands { + builder.push(new_words.into_stream()); + } + + let op = builder.r#union(); let mut build = fst::SetBuilder::memory(); build.extend_stream(op.into_stream()).unwrap(); @@ -85,13 +107,18 @@ fn alphanumeric_tokens(string: &str) -> impl Iterator { string.linear_group_by_key(|c| c.is_alphanumeric()).filter(is_alphanumeric) } -fn index_csv(tid: usize, db: sled::Db, mut rdr: csv::Reader) -> anyhow::Result { +fn index_csv( + tid: usize, + db: Arc, + mut rdr: csv::Reader, +) -> anyhow::Result +{ const MAX_POSITION: usize = 1000; const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; - let main = &*db; - let postings_ids = db.open_tree("postings-ids")?; - let documents = db.open_tree("documents")?; + let main = db.cf_handle("main").context("cf \"main\" not found")?; + let postings_ids = db.cf_handle("postings-ids").context("cf \"postings-ids\" not found")?; + let documents = db.cf_handle("documents").context("cf \"documents\" not found")?; let mut document = csv::StringRecord::new(); let mut new_postings_ids = FastMap4::default(); @@ -104,12 +131,13 @@ fn index_csv(tid: usize, db: sled::Db, mut rdr: csv::Reader) -> anyhow::Re writer.write_byte_record(headers.as_byte_record())?; let headers = writer.into_inner()?; - if let Some(old_headers) = main.insert("headers", headers.as_slice())? { + if let Some(old_headers) = db.get_cf(&main, "headers")? { ensure!(old_headers == headers, "headers differs from the previous ones"); } + db.put_cf(&main, "headers", headers.as_slice())?; while rdr.read_record(&mut document)? { - let document_id = db.generate_id()?; + let document_id = ID_GENERATOR.fetch_add(1, Ordering::SeqCst); let document_id = u32::try_from(document_id).context("Generated id is too big")?; for (_attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { @@ -122,11 +150,13 @@ fn index_csv(tid: usize, db: sled::Db, mut rdr: csv::Reader) -> anyhow::Re let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); writer.write_byte_record(document.as_byte_record())?; let document = writer.into_inner()?; - documents.insert(document_id.to_be_bytes(), document)?; + db.put_cf(&documents, document_id.to_be_bytes(), document)?; number_of_documents += 1; if number_of_documents % 100000 == 0 { - let postings_ids_size = new_postings_ids.iter().map(|(_, v)| v.capacity() * 4).sum::(); + let postings_ids_size = new_postings_ids.iter().map(|(_, v)| { + v.compressed_capacity() + v.uncompressed_capacity() * 4 + }).sum::(); eprintln!("{}, documents seen {}, postings size {}", tid, number_of_documents, postings_ids_size); } @@ -140,7 +170,7 @@ fn index_csv(tid: usize, db: sled::Db, mut rdr: csv::Reader) -> anyhow::Re let compressed = CodecBitPacker4xSorted::bytes_encode(&new_ids) .context("error while compressing using CodecBitPacker4xSorted")?; - postings_ids.merge(word.as_bytes(), compressed)?; + db.merge_cf(&postings_ids, word.as_bytes(), compressed)?; new_words.insert(word); } @@ -151,7 +181,7 @@ fn index_csv(tid: usize, db: sled::Db, mut rdr: csv::Reader) -> anyhow::Re let new_words_fst = fst::Set::from_iter(new_words.iter().map(|s| s.as_str()))?; drop(new_words); - main.merge("words-fst", new_words_fst.as_fst().as_bytes())?; + db.merge_cf(&main, "words-fst", new_words_fst.as_fst().as_bytes())?; eprintln!("Finished merging the words-fst"); @@ -161,16 +191,21 @@ fn index_csv(tid: usize, db: sled::Db, mut rdr: csv::Reader) -> anyhow::Re fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); - let db = sled::open(opt.database)?; - let main = &*db; - + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); // Setup the merge operators - main.set_merge_operator(union_words_fst); - let postings_ids = db.open_tree("postings-ids")?; - postings_ids.set_merge_operator(union_bitpacked_postings_ids); - // ... - let _documents = db.open_tree("documents")?; + opts.set_merge_operator("main", union_words_fst, Some(union_words_fst)); + opts.set_merge_operator("postings-ids", union_bitpacked_postings_ids, Some(union_bitpacked_postings_ids)); + let mut db = rocksdb::DB::open(&opts, &opt.database)?; + + let cfs = &["main", "postings-ids", "documents"]; + for cf in cfs.into_iter() { + db.create_cf(cf, &opts).unwrap(); + } + + let db = Arc::new(db); let res = opt.files_to_index .into_par_iter() .enumerate()