From 1237306ca8254d7d7b828a2e2bcde8a9eb12c674 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Sat, 30 May 2020 15:35:33 +0200 Subject: [PATCH] Introduce a thread that write to heed --- Cargo.lock | 448 ++++++++++------------------------ Cargo.toml | 9 +- src/bp_vec.rs | 201 --------------- src/codec/bitpacker_sorted.rs | 90 ------- src/codec/mod.rs | 3 - src/main.rs | 251 ++++++++++--------- 6 files changed, 269 insertions(+), 733 deletions(-) delete mode 100644 src/bp_vec.rs delete mode 100644 src/codec/bitpacker_sorted.rs delete mode 100644 src/codec/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 1660704b0..24008fc08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,40 +1,11 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -[[package]] -name = "aho-corasick" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" -dependencies = [ - "memchr", -] - -[[package]] -name = "ansi_term" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" -dependencies = [ - "winapi", -] - [[package]] name = "anyhow" version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.0.0" @@ -42,27 +13,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" [[package]] -name = "bindgen" -version = "0.53.3" +name = "bincode" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c72a978d268b1d70b0e963217e60fdabd9523a941457a6c42a7315d15c7e89e5" +checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" dependencies = [ - "bitflags", - "cexpr", - "cfg-if", - "clang-sys", - "clap", - "env_logger", - "lazy_static", - "lazycell", - "log", - "peeking_take_while", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex", - "which", + "byteorder 1.3.4", + "serde", ] [[package]] @@ -92,6 +49,12 @@ dependencies = [ "serde", ] +[[package]] +name = "byteorder" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" + [[package]] name = "byteorder" version = "1.3.4" @@ -103,18 +66,6 @@ name = "cc" version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" -dependencies = [ - "jobserver", -] - -[[package]] -name = "cexpr" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27" -dependencies = [ - "nom", -] [[package]] name = "cfg-if" @@ -122,30 +73,25 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" -[[package]] -name = "clang-sys" -version = "0.29.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a" -dependencies = [ - "glob", - "libc", - "libloading", -] - [[package]] name = "clap" version = "2.33.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" dependencies = [ - "ansi_term", - "atty", "bitflags", - "strsim", "textwrap", "unicode-width", - "vec_map", +] + +[[package]] +name = "crossbeam-channel" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" +dependencies = [ + "crossbeam-utils", + "maybe-uninit", ] [[package]] @@ -176,9 +122,9 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db" +checksum = "ab6bffe714b6bb07e42f201352c34f51fefd355ace793f9e638ebd52d23f98d2" dependencies = [ "cfg-if", "crossbeam-utils", @@ -229,19 +175,6 @@ version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" -[[package]] -name = "env_logger" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" -dependencies = [ - "atty", - "humantime", - "log", - "regex", - "termcolor", -] - [[package]] name = "fs_extra" version = "1.1.0" @@ -260,26 +193,9 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" dependencies = [ - "byteorder", + "byteorder 1.3.4", ] -[[package]] -name = "getrandom" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - -[[package]] -name = "glob" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" - [[package]] name = "heck" version = "0.3.1" @@ -289,6 +205,42 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heed" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd7882b766b4be1b90d8ce5ce4c7aca2539b43176a708dbc8e79576dbbdbba93" +dependencies = [ + "byteorder 1.3.4", + "heed-traits", + "heed-types", + "libc", + "lmdb-rkv-sys", + "once_cell", + "page_size", + "url", + "zerocopy", +] + +[[package]] +name = "heed-traits" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b328f6260a7e51bdb0ca6b68e6ea27ee3d11fba5dee930896ee7ff6ad5fc072c" + +[[package]] +name = "heed-types" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e100387815256b00dbb4f48db990f7fa03e9b88b4a89c2a1661b7d9d77b77c46" +dependencies = [ + "bincode", + "heed-traits", + "serde", + "serde_json", + "zerocopy", +] + [[package]] name = "hermit-abi" version = "0.1.13" @@ -299,12 +251,14 @@ dependencies = [ ] [[package]] -name = "humantime" -version = "1.3.0" +name = "idna" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +checksum = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" dependencies = [ - "quick-error", + "matches", + "unicode-bidi", + "unicode-normalization", ] [[package]] @@ -334,27 +288,12 @@ dependencies = [ "libc", ] -[[package]] -name = "jobserver" -version = "0.1.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" -dependencies = [ - "libc", -] - [[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "lazycell" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f" - [[package]] name = "libc" version = "0.2.70" @@ -362,35 +301,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" [[package]] -name = "libloading" -version = "0.5.2" +name = "lmdb-rkv-sys" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753" +checksum = "b27470ac25167b3afdfb6af8fcd3bc1be67de50ffbdaf4073378cfded6ae24a5" dependencies = [ "cc", - "winapi", -] - -[[package]] -name = "librocksdb-sys" -version = "6.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "883213ae3d09bfc3d104aefe94b25ebb183b6f4d3a515b23b14817e1f4854005" -dependencies = [ - "bindgen", - "cc", - "glob", "libc", + "pkg-config", ] [[package]] -name = "log" -version = "0.4.8" +name = "matches" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" -dependencies = [ - "cfg-if", -] +checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" [[package]] name = "maybe-uninit" @@ -404,19 +329,18 @@ version = "0.1.0" dependencies = [ "anyhow", "bitpacking", - "byteorder", + "byteorder 1.3.4", + "crossbeam-channel", "csv", "fst", "fxhash", + "heed", "jemallocator", - "quickcheck", "rayon", - "rocksdb", - "sdset", + "roaring", "slice-group-by", "smallstr", "structopt", - "zerocopy", ] [[package]] @@ -434,16 +358,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "nom" -version = "5.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b471253da97532da4b61552249c521e01e736071f71c1a4f7ebbfbf0a06aad6" -dependencies = [ - "memchr", - "version_check", -] - [[package]] name = "num_cpus" version = "1.13.0" @@ -455,16 +369,32 @@ dependencies = [ ] [[package]] -name = "peeking_take_while" -version = "0.1.2" +name = "once_cell" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" [[package]] -name = "ppv-lite86" -version = "0.2.8" +name = "page_size" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" +checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "pkg-config" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" [[package]] name = "proc-macro-error" @@ -501,24 +431,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - -[[package]] -name = "quickcheck" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44883e74aa97ad63db83c4bf8ca490f02b2fc02f92575e720c8551e843c945f" -dependencies = [ - "env_logger", - "log", - "rand", - "rand_core", -] - [[package]] name = "quote" version = "1.0.6" @@ -528,47 +440,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom", - "libc", - "rand_chacha", - "rand_core", - "rand_hc", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core", -] - [[package]] name = "rayon" version = "1.3.0" @@ -593,49 +464,24 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "regex" -version = "1.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6020f034922e3194c711b82a627453881bc4682166cabb07134a10c26ba7692" -dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", - "thread_local", -] - [[package]] name = "regex-automata" version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" dependencies = [ - "byteorder", + "byteorder 1.3.4", ] [[package]] -name = "regex-syntax" -version = "0.6.17" +name = "roaring" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" - -[[package]] -name = "rocksdb" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61aa17a99a2413cd71c1106691bf59dad7de0cd5099127f90e9d99c429c40d4a" +checksum = "4af20e5d3e44732a57489fa297768ca29361b54fbc3b20cdeb738fa6932cc22d" dependencies = [ - "libc", - "librocksdb-sys", + "byteorder 0.5.3", ] -[[package]] -name = "rustc-hash" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" - [[package]] name = "ryu" version = "1.0.4" @@ -648,12 +494,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "sdset" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbb21fe0588557792176c89bc7b943027b14f346d03c6be6a199c2860277d93a" - [[package]] name = "serde" version = "1.0.110" @@ -661,10 +501,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c" [[package]] -name = "shlex" -version = "0.1.1" +name = "serde_json" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" +checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" +dependencies = [ + "itoa", + "ryu", + "serde", +] [[package]] name = "slice-group-by" @@ -687,12 +532,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" -[[package]] -name = "strsim" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" - [[package]] name = "structopt" version = "0.3.14" @@ -751,15 +590,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "termcolor" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" -dependencies = [ - "winapi-util", -] - [[package]] name = "textwrap" version = "0.11.0" @@ -770,12 +600,21 @@ dependencies = [ ] [[package]] -name = "thread_local" -version = "1.0.1" +name = "unicode-bidi" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +checksum = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" dependencies = [ - "lazy_static", + "matches", +] + +[[package]] +name = "unicode-normalization" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" +dependencies = [ + "smallvec", ] [[package]] @@ -797,10 +636,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" [[package]] -name = "vec_map" -version = "0.8.2" +name = "url" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +checksum = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" +dependencies = [ + "idna", + "matches", + "percent-encoding", +] [[package]] name = "version_check" @@ -808,21 +652,6 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - -[[package]] -name = "which" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d011071ae14a2f6671d0b74080ae0cd8ebf3a6f8c9589a2cd45f23126fe29724" -dependencies = [ - "libc", -] - [[package]] name = "winapi" version = "0.3.8" @@ -839,15 +668,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -860,7 +680,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6580539ad917b7c026220c4b3f2c08d52ce54d6ce0dc491e66002e35388fab46" dependencies = [ - "byteorder", + "byteorder 1.3.4", "zerocopy-derive", ] diff --git a/Cargo.toml b/Cargo.toml index 5186d5964..51966ebf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,20 +8,17 @@ edition = "2018" anyhow = "1.0.28" bitpacking = "0.8.2" byteorder = "1.3.4" +roaring = "0.5.2" +crossbeam-channel = "0.4.2" csv = "1.1.3" fst = "0.4.3" fxhash = "0.2.1" +heed = { version = "0.8.0", default-features = false, features = ["lmdb"] } jemallocator = "0.3.2" rayon = "1.3.0" -rocksdb = "0.14.0" -sdset = "0.4.0" slice-group-by = "0.2.6" smallstr = "0.2.0" structopt = { version = "0.3.14", default-features = false } -zerocopy = "0.3.0" - -[dev-dependencies] -quickcheck = "0.9.2" [profile.release] debug = true diff --git a/src/bp_vec.rs b/src/bp_vec.rs deleted file mode 100644 index f91e6aa22..000000000 --- a/src/bp_vec.rs +++ /dev/null @@ -1,201 +0,0 @@ -use byteorder::{ByteOrder, NativeEndian}; -use bitpacking::{BitPacker, BitPacker4x}; - -/// An append only bitpacked u32 vector that ignore order of insertion. -#[derive(Default)] -pub struct BpVec { - compressed: Vec, - uncompressed: Vec, -} - -impl BpVec { - pub fn new() -> BpVec { - BpVec::default() - } - - pub fn push(&mut self, elem: u32) { - self.uncompressed.push(elem); - if self.uncompressed.len() == BitPacker4x::BLOCK_LEN { - encode(&mut self.uncompressed[..], &mut self.compressed); - self.uncompressed.clear(); - } - } - - pub fn extend_from_slice(&mut self, elems: &[u32]) { - self.uncompressed.extend_from_slice(elems); - let remaining = self.uncompressed.len() % BitPacker4x::BLOCK_LEN; - for chunk in self.uncompressed[remaining..].chunks_exact_mut(BitPacker4x::BLOCK_LEN) { - encode(chunk, &mut self.compressed); - } - self.uncompressed.truncate(remaining); - self.uncompressed.shrink_to_fit(); - } - - pub fn to_vec(self) -> Vec { - let BpVec { compressed, mut uncompressed } = self; - decode(&compressed, &mut uncompressed); - uncompressed - } - - pub fn compressed_capacity(&self) -> usize { - self.compressed.capacity() - } - - pub fn uncompressed_capacity(&self) -> usize { - self.uncompressed.capacity() - } -} - -fn encode(items: &mut [u32], encoded: &mut Vec) { - assert_eq!(items.len(), BitPacker4x::BLOCK_LEN); - - let bitpacker = BitPacker4x::new(); - - // We reserve enough space in the output buffer, filled with zeroes. - let len = encoded.len(); - // initial_value + num_bits + encoded numbers - let max_possible_length = 4 + 1 + 4 * BitPacker4x::BLOCK_LEN; - encoded.resize(len + max_possible_length, 0); - - // We sort the items to be able to efficiently bitpack them. - items.sort_unstable(); - // We save the initial value to us for this block, the lowest one. - let initial_value = items[0]; - // We compute the number of bits necessary to encode this block - let num_bits = bitpacker.num_bits_sorted(initial_value, items); - - // We write the initial value for this block. - let buffer = &mut encoded[len..]; - NativeEndian::write_u32(buffer, initial_value); - // We write the num_bits that will be read to decode this block - let buffer = &mut buffer[4..]; - buffer[0] = num_bits; - // We encode the block numbers into the buffer using the num_bits - let buffer = &mut buffer[1..]; - let compressed_len = bitpacker.compress_sorted(initial_value, items, buffer, num_bits); - - // We truncate the buffer to the avoid leaking padding zeroes - encoded.truncate(len + 4 + 1 + compressed_len); -} - -fn decode(mut encoded: &[u8], decoded: &mut Vec) { - let bitpacker = BitPacker4x::new(); - - // initial_value + num_bits - while let Some(header) = encoded.get(0..4 + 1) { - // We extract the header informations - let initial_value = NativeEndian::read_u32(header); - let num_bits = header[4]; - let bytes = &encoded[4 + 1..]; - - // If the num_bits is equal to zero it means that all encoded numbers were zeroes - if num_bits == 0 { - decoded.resize(decoded.len() + BitPacker4x::BLOCK_LEN, initial_value); - encoded = bytes; - continue; - } - - // We guess the block size based on the num_bits used for this block - let block_size = BitPacker4x::compressed_block_size(num_bits); - - // We pad the decoded vector with zeroes - let new_len = decoded.len() + BitPacker4x::BLOCK_LEN; - decoded.resize(new_len, 0); - - // Create a view into the decoded buffer and decode into it - let to_decompress = &mut decoded[new_len - BitPacker4x::BLOCK_LEN..new_len]; - bitpacker.decompress_sorted(initial_value, &bytes[..block_size], to_decompress, num_bits); - - // Advance the bytes offset to read the next block (+ num_bits) - encoded = &bytes[block_size..]; - } -} - -impl sdset::Collection for BpVec { - fn push(&mut self, elem: u32) { - BpVec::push(self, elem); - } - - fn extend_from_slice(&mut self, elems: &[u32]) { - BpVec::extend_from_slice(self, elems); - } - - fn extend(&mut self, elems: I) where I: IntoIterator { - elems.into_iter().for_each(|x| BpVec::push(self, x)); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - quickcheck! { - fn qc_push(xs: Vec) -> bool { - let mut xs: Vec<_> = xs.iter().cloned().cycle().take(1300).collect(); - - let mut bpvec = BpVec::new(); - xs.iter().for_each(|x| bpvec.push(*x)); - let mut result = bpvec.to_vec(); - - result.sort_unstable(); - xs.sort_unstable(); - - xs == result - } - } - - quickcheck! { - fn qc_extend_from_slice(xs: Vec) -> bool { - let mut xs: Vec<_> = xs.iter().cloned().cycle().take(1300).collect(); - - let mut bpvec = BpVec::new(); - bpvec.extend_from_slice(&xs); - let mut result = bpvec.to_vec(); - - result.sort_unstable(); - xs.sort_unstable(); - - xs == result - } - } - - #[test] - fn empty() { - let mut bpvec = BpVec::new(); - bpvec.extend_from_slice(&[]); - let result = bpvec.to_vec(); - - assert!(result.is_empty()); - } - - #[test] - fn one_zero() { - let mut bpvec = BpVec::new(); - bpvec.extend_from_slice(&[0]); - let result = bpvec.to_vec(); - - assert_eq!(&[0], &*result); - } - - #[test] - fn many_zeros() { - let xs: Vec<_> = std::iter::repeat(0).take(1300).collect(); - - let mut bpvec = BpVec::new(); - bpvec.extend_from_slice(&xs); - let result = bpvec.to_vec(); - - assert_eq!(xs, result); - } - - #[test] - fn many_ones() { - let xs: Vec<_> = std::iter::repeat(1).take(1300).collect(); - - let mut bpvec = BpVec::new(); - bpvec.extend_from_slice(&xs); - let result = bpvec.to_vec(); - - assert_eq!(xs, result); - } -} diff --git a/src/codec/bitpacker_sorted.rs b/src/codec/bitpacker_sorted.rs deleted file mode 100644 index c51b4d71c..000000000 --- a/src/codec/bitpacker_sorted.rs +++ /dev/null @@ -1,90 +0,0 @@ -use bitpacking::{BitPacker, BitPacker4x}; -use byteorder::{ReadBytesExt, NativeEndian}; -use zerocopy::AsBytes; - -pub struct CodecBitPacker4xSorted; - -impl CodecBitPacker4xSorted { - pub fn bytes_encode(item: &[u32]) -> Option> { - // This is a hotfix to the SIGSEGV - // https://github.com/tantivy-search/bitpacking/issues/23 - if item.is_empty() { - return Some(Vec::default()) - } - - let bitpacker = BitPacker4x::new(); - let mut compressed = Vec::new(); - let mut initial_value = 0; - - // The number of remaining numbers that don't fit in the block size. - compressed.push((item.len() % BitPacker4x::BLOCK_LEN) as u8); - - // we cannot use a mut slice here because of #68630, TooGeneric error. - // we can probably avoid this new allocation by directly using the compressed final Vec. - let mut buffer = vec![0u8; 4 * BitPacker4x::BLOCK_LEN]; - - for chunk in item.chunks(BitPacker4x::BLOCK_LEN) { - if chunk.len() == BitPacker4x::BLOCK_LEN { - // compute the number of bits necessary to encode this block - let num_bits = bitpacker.num_bits_sorted(initial_value, chunk); - // Encode the block numbers into the buffer using the num_bits - let compressed_len = bitpacker.compress_sorted(initial_value, chunk, &mut buffer, num_bits); - // Write the num_bits that will be read to decode this block - compressed.push(num_bits); - // Wrtie the bytes of the compressed block numbers - compressed.extend_from_slice(&buffer[..compressed_len]); - // Save the initial_value, which is the last value of the n-1 used for the n block - initial_value = *chunk.last().unwrap(); - } else { - // Save the remaining numbers which don't fit inside of a BLOCK_LEN - compressed.extend_from_slice(chunk.as_bytes()); - } - } - - Some(compressed) - } - - pub fn bytes_decode(bytes: &[u8]) -> Option> { - if bytes.is_empty() { - return Some(Vec::new()) - } - - let bitpacker = BitPacker4x::new(); - let (remaining, bytes) = bytes.split_first().unwrap(); - let remaining = *remaining as usize; - - let (mut bytes, mut remaining_bytes) = bytes.split_at(bytes.len() - remaining * 4); - let mut decompressed = Vec::new(); - let mut initial_value = 0; - - while let Some(num_bits) = bytes.get(0) { - if *num_bits == 0 { - decompressed.resize(decompressed.len() + BitPacker4x::BLOCK_LEN, initial_value); - bytes = &bytes[1..]; - continue; - } - - let block_size = BitPacker4x::compressed_block_size(*num_bits); - - let new_len = decompressed.len() + BitPacker4x::BLOCK_LEN; - decompressed.resize(new_len, 0); - - // Create a view into the decompressed buffer and decomress into it - let to_decompress = &mut decompressed[new_len - BitPacker4x::BLOCK_LEN..new_len]; - bitpacker.decompress_sorted(initial_value, &bytes[1..block_size + 1], to_decompress, *num_bits); - - // Set the new initial_value for the next block - initial_value = *decompressed.last().unwrap(); - // Advance the bytes offset to read the next block (+ num_bits) - bytes = &bytes[block_size + 1..]; - } - - // We add the remaining uncompressed numbers. - let new_len = decompressed.len() + remaining; - decompressed.resize(new_len, 0); - let to_decompress = &mut decompressed[new_len - remaining..new_len]; - remaining_bytes.read_u32_into::(to_decompress).ok()?; - - Some(decompressed) - } -} diff --git a/src/codec/mod.rs b/src/codec/mod.rs deleted file mode 100644 index 451839fea..000000000 --- a/src/codec/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod bitpacker_sorted; - -pub use self::bitpacker_sorted::CodecBitPacker4xSorted; diff --git a/src/main.rs b/src/main.rs index 2d3cda946..a2784cae3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,37 +1,32 @@ -#[cfg(test)] -#[macro_use] extern crate quickcheck; - -mod codec; -mod bp_vec; - use std::collections::{HashMap, BTreeSet}; use std::convert::TryFrom; use std::fs::File; use std::hash::BuildHasherDefault; use std::path::PathBuf; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; use anyhow::{ensure, Context}; +use roaring::RoaringBitmap; +use crossbeam_channel::{select, Sender, Receiver}; use fst::IntoStreamer; use fxhash::FxHasher32; +use heed::{EnvOpenOptions, Database}; +use heed::types::*; use rayon::prelude::*; -use sdset::{SetOperation, SetBuf}; use slice_group_by::StrGroupBy; use structopt::StructOpt; -use zerocopy::{LayoutVerified, AsBytes}; - -// use self::codec::CodecBitPacker4xSorted; -use self::bp_vec::BpVec; pub type FastMap4 = HashMap>; pub type SmallString32 = smallstr::SmallString<[u8; 32]>; +pub type BEU32 = heed::zerocopy::U32; +pub type DocumentId = u32; #[cfg(target_os = "linux")] #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0); +static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0); // AtomicU32 ? #[derive(Debug, StructOpt)] #[structopt(name = "mm-indexer", about = "The server side of the daugt project.")] @@ -45,73 +40,24 @@ struct Opt { files_to_index: Vec, } -fn bytes_to_u32s(bytes: &[u8]) -> Vec { - fn aligned_to(bytes: &[u8], align: usize) -> bool { - (bytes as *const _ as *const () as usize) % align == 0 - } - - match LayoutVerified::new_slice(bytes) { - Some(slice) => slice.to_vec(), - None => { - let len = bytes.len(); - - // ensure that it is the alignment that is wrong and the length is valid - assert!(len % 4 == 0, "length is {} and is not modulo 4", len); - assert!(!aligned_to(bytes, std::mem::align_of::()), "bytes are already aligned"); - - let elems = len / 4; - let mut vec = Vec::::with_capacity(elems); - - unsafe { - let dst = vec.as_mut_ptr() as *mut u8; - std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len); - vec.set_len(elems); - } - - vec +fn union_postings_ids(_key: &[u8], old_value: Option<&[u8]>, new_value: RoaringBitmap) -> Option> { + let result = match old_value { + Some(bytes) => { + let mut old_value = RoaringBitmap::deserialize_from(bytes).unwrap(); + old_value.union_with(&new_value); + old_value }, - } + None => new_value, + }; + + let mut vec = Vec::new(); + result.serialize_into(&mut vec).unwrap(); + Some(vec) } -fn union_postings_ids( - _key: &[u8], - old_value: Option<&[u8]>, - operands: &mut rocksdb::MergeOperands, -) -> Option> -{ - let mut sets_bufs = Vec::new(); - - if let Some(old_value) = old_value { - let old_value = bytes_to_u32s(old_value); - sets_bufs.push(SetBuf::new_unchecked(old_value.to_vec())); - } - - for operand in operands { - let new_value = bytes_to_u32s(operand); - sets_bufs.push(SetBuf::new_unchecked(new_value.to_vec())); - } - - let sets = sets_bufs.iter().map(|s| s.as_set()).collect(); - let result: SetBuf = sdset::multi::Union::new(sets).into_set_buf(); - - assert!(result.as_bytes().len() % 4 == 0); - - Some(result.as_bytes().to_vec()) -} - -fn union_words_fst( - key: &[u8], - old_value: Option<&[u8]>, - operands: &mut rocksdb::MergeOperands, -) -> Option> -{ +fn union_words_fst(key: &[u8], old_value: Option<&[u8]>, new_value: &fst::Set>) -> Option> { if key != b"words-fst" { unimplemented!() } - let mut fst_operands = Vec::new(); - for operand in operands { - fst_operands.push(fst::Set::new(operand).unwrap()); - } - // Do an union of the old and the new set of words. let mut builder = fst::set::OpBuilder::new(); @@ -121,9 +67,7 @@ fn union_words_fst( builder.push(old_words); } - for new_words in &fst_operands { - builder.push(new_words.into_stream()); - } + builder.push(new_value); let op = builder.r#union(); let mut build = fst::SetBuilder::memory(); @@ -137,19 +81,94 @@ fn alphanumeric_tokens(string: &str) -> impl Iterator { string.linear_group_by_key(|c| c.is_alphanumeric()).filter(is_alphanumeric) } -fn index_csv( - tid: usize, - db: Arc, - mut rdr: csv::Reader, -) -> anyhow::Result -{ +enum MainKey { + WordsFst(fst::Set>), + Headers(Vec), +} + +#[derive(Clone)] +struct DbSender { + main: Sender, + postings_ids: Sender<(SmallString32, RoaringBitmap)>, + documents: Sender<(DocumentId, Vec)>, +} + +struct DbReceiver { + main: Receiver, + postings_ids: Receiver<(SmallString32, RoaringBitmap)>, + documents: Receiver<(DocumentId, Vec)>, +} + +fn thread_channel() -> (DbSender, DbReceiver) { + let (sd_main, rc_main) = crossbeam_channel::bounded(4); + let (sd_postings, rc_postings) = crossbeam_channel::bounded(10); + let (sd_documents, rc_documents) = crossbeam_channel::bounded(10); + + let sender = DbSender { main: sd_main, postings_ids: sd_postings, documents: sd_documents }; + let receiver = DbReceiver { main: rc_main, postings_ids: rc_postings, documents: rc_documents }; + + (sender, receiver) +} + +fn writer_thread(env: heed::Env, receiver: DbReceiver) -> anyhow::Result<()> { + let main = env.create_poly_database(None)?; + let postings_ids: Database = env.create_database(Some("postings-ids"))?; + let documents: Database, ByteSlice> = env.create_database(Some("documents"))?; + + let mut wtxn = env.write_txn()?; + + loop { + select! { + recv(receiver.main) -> msg => { + let msg = match msg { + Err(_) => break, + Ok(msg) => msg, + }; + + match msg { + MainKey::WordsFst(new_fst) => { + let old_value = main.get::<_, Str, ByteSlice>(&wtxn, "words-fst")?; + let new_value = union_words_fst(b"words-fst", old_value, &new_fst) + .context("error while do a words-fst union")?; + main.put::<_, Str, ByteSlice>(&mut wtxn, "words-fst", &new_value)?; + }, + MainKey::Headers(headers) => { + if let Some(old_headers) = main.get::<_, Str, ByteSlice>(&wtxn, "headers")? { + ensure!(old_headers == &*headers, "headers differs from the previous ones"); + } + main.put::<_, Str, ByteSlice>(&mut wtxn, "headers", &headers)?; + }, + } + }, + recv(receiver.postings_ids) -> msg => { + let (word, postings) = match msg { + Err(_) => break, + Ok(msg) => msg, + }; + + let old_value = postings_ids.get(&wtxn, &word)?; + let new_value = union_postings_ids(word.as_bytes(), old_value, postings) + .context("error while do a words-fst union")?; + postings_ids.put(&mut wtxn, &word, &new_value)?; + }, + recv(receiver.documents) -> msg => { + let (id, content) = match msg { + Err(_) => break, + Ok(msg) => msg, + }; + documents.put(&mut wtxn, &BEU32::new(id), &content)?; + }, + } + } + + wtxn.commit()?; + Ok(()) +} + +fn index_csv(tid: usize, db_sender: DbSender, mut rdr: csv::Reader) -> anyhow::Result { const MAX_POSITION: usize = 1000; const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; - let main = db.cf_handle("main").context("cf \"main\" not found")?; - let postings_ids = db.cf_handle("postings-ids").context("cf \"postings-ids\" not found")?; - let documents = db.cf_handle("documents").context("cf \"documents\" not found")?; - let mut document = csv::StringRecord::new(); let mut new_postings_ids = FastMap4::default(); let mut new_words = BTreeSet::default(); @@ -160,19 +179,19 @@ fn index_csv( let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); writer.write_byte_record(headers.as_byte_record())?; let headers = writer.into_inner()?; - - if let Some(old_headers) = db.get_cf(&main, "headers")? { - ensure!(old_headers == headers, "headers differs from the previous ones"); - } - db.put_cf(&main, "headers", headers.as_slice())?; + db_sender.main.send(MainKey::Headers(headers))?; while rdr.read_record(&mut document)? { let document_id = ID_GENERATOR.fetch_add(1, Ordering::SeqCst); - let document_id = u32::try_from(document_id).context("Generated id is too big")?; + let document_id = DocumentId::try_from(document_id).context("Generated id is too big")?; for (_attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { for (_pos, word) in alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) { - new_postings_ids.entry(SmallString32::from(word)).or_insert_with(BpVec::new).push(document_id); + if !word.is_empty() && word.len() < 500 { // LMDB limits + new_postings_ids.entry(SmallString32::from(word)) + .or_insert_with(RoaringBitmap::new) + .insert(document_id); + } } } @@ -180,15 +199,11 @@ fn index_csv( let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); writer.write_byte_record(document.as_byte_record())?; let document = writer.into_inner()?; - db.put_cf(&documents, document_id.to_be_bytes(), document)?; + db_sender.documents.send((document_id, document))?; number_of_documents += 1; if number_of_documents % 100000 == 0 { - let postings_ids_size = new_postings_ids.iter().map(|(_, v)| { - v.compressed_capacity() + v.uncompressed_capacity() * 4 - }).sum::(); - eprintln!("{}, documents seen {}, postings size {}", - tid, number_of_documents, postings_ids_size); + eprintln!("{}, documents seen {}", tid, number_of_documents); } } @@ -196,8 +211,7 @@ fn index_csv( // We compute and store the postings list into the DB. for (word, new_ids) in new_postings_ids { - let new_ids = SetBuf::from_dirty(new_ids.to_vec()); - db.merge_cf(&postings_ids, word.as_bytes(), new_ids.as_bytes())?; + db_sender.postings_ids.send((word.clone(), new_ids))?; new_words.insert(word); } @@ -207,7 +221,7 @@ fn index_csv( let new_words_fst = fst::Set::from_iter(new_words.iter().map(|s| s.as_str()))?; drop(new_words); - db.merge_cf(&main, "words-fst", new_words_fst.as_fst().as_bytes())?; + db_sender.main.send(MainKey::WordsFst(new_words_fst))?; eprintln!("Finished merging the words-fst"); eprintln!("Total number of documents seen is {}", ID_GENERATOR.load(Ordering::Relaxed)); @@ -218,31 +232,30 @@ fn index_csv( fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - // Setup the merge operators - opts.set_merge_operator("main", union_words_fst, None); // Some(union_words_fst)); - opts.set_merge_operator("postings-ids", union_postings_ids, None); // Some(union_postings_ids)); + std::fs::create_dir_all(&opt.database)?; + let env = EnvOpenOptions::new() + .map_size(100 * 1024 * 1024 * 1024) // 100 GB + .max_readers(10) + .max_dbs(5) + .open(opt.database)?; - let mut db = rocksdb::DB::open(&opts, &opt.database)?; + let (sender, receiver) = thread_channel(); + let writing_child = thread::spawn(move || writer_thread(env, receiver)); - let cfs = &["main", "postings-ids", "documents"]; - for cf in cfs.into_iter() { - db.create_cf(cf, &opts).unwrap(); - } - - let db = Arc::new(db); let res = opt.files_to_index .into_par_iter() .enumerate() .map(|(tid, path)| { let rdr = csv::Reader::from_path(path)?; - index_csv(tid, db.clone(), rdr) + index_csv(tid, sender.clone(), rdr) }) .try_reduce(|| 0, |a, b| Ok(a + b)); - println!("{:?}", res); + + eprintln!("witing the writing thread..."); + writing_child.join().unwrap().unwrap(); + + println!("indexed {:?} documents", res); Ok(()) }