diff --git a/Cargo.lock b/Cargo.lock index ec92fe58e..b20d19ae2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,7 +9,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "617a8268e3537fe1d8c9ead925fca49ef6400927ee7bc26750e90ecee14ce4b8" dependencies = [ "bitflags 1.3.2", - "bytes", + "bytes 1.4.0", "futures-core", "futures-sink", "memchr", @@ -49,7 +49,7 @@ dependencies = [ "base64 0.21.2", "bitflags 1.3.2", "brotli", - "bytes", + "bytes 1.4.0", "bytestring", "derive_more", "encoding_rs", @@ -119,7 +119,7 @@ dependencies = [ "actix-utils", "futures-core", "futures-util", - "mio", + "mio 0.8.7", "num_cpus", "socket2", "tokio", @@ -182,9 +182,9 @@ dependencies = [ "actix-utils", "actix-web-codegen", "ahash 0.7.6", - "bytes", + "bytes 1.4.0", "bytestring", - "cfg-if", + "cfg-if 1.0.0", "cookie", "derive_more", "encoding_rs", @@ -251,7 +251,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "433cfd6710c9986c576a25ca913c39d66a6474107b406f34f91d4a8923395241" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cipher", "cpufeatures", ] @@ -273,7 +273,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "getrandom", "once_cell", "version_check", @@ -303,12 +303,6 @@ dependencies = [ "alloc-no-stdlib", ] -[[package]] -name = "allocator-api2" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" - [[package]] name = "anes" version = "0.1.6" @@ -448,7 +442,7 @@ checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" dependencies = [ "addr2line", "cc", - "cfg-if", + "cfg-if 1.0.0", "libc", "miniz_oxide 0.6.2", "object", @@ -478,7 +472,7 @@ name = "benchmarks" version = "1.3.0" dependencies = [ "anyhow", - "bytes", + "bytes 1.4.0", "convert_case 0.6.0", "criterion", "csv", @@ -618,6 +612,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + [[package]] name = "bytes" version = "1.4.0" @@ -630,7 +630,7 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "238e4886760d98c4f899360c834fa93e62cf7f721ac3c2da375cbdf4b8679aae" dependencies = [ - "bytes", + "bytes 1.4.0", ] [[package]] @@ -670,15 +670,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" -[[package]] -name = "castaway" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a17ed5635fc8536268e5d4de1e22e81ac34419e5f052d4d51f4e01dcc263fcc" -dependencies = [ - "rustversion", -] - [[package]] name = "cc" version = "1.0.79" @@ -697,6 +688,12 @@ dependencies = [ "smallvec", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -806,7 +803,7 @@ version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "191d9573962933b4027f932c600cd252ce27a8ad5979418fe78e43c07996f27b" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "syn 2.0.28", @@ -824,17 +821,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "compact_str" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33b5c3ee2b4ffa00ac2b00d1645cd9229ade668139bccf95f15fadcf374127b" -dependencies = [ - "castaway", - "itoa", - "ryu", -] - [[package]] name = "concat-arrays" version = "0.1.2" @@ -858,26 +844,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "const_format" -version = "0.2.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c990efc7a285731f9a4378d81aff2f0e85a2c8781a05ef0f8baa8dac54d0ff48" -dependencies = [ - "const_format_proc_macros", -] - -[[package]] -name = "const_format_proc_macros" -version = "0.2.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e026b6ce194a874cb9cf32cd5772d1ef9767cc8fcb5765948d74f37a9d8b2bf6" -dependencies = [ - "proc-macro2", - "quote", - "unicode-xid", -] - [[package]] name = "constant_time_eq" version = "0.1.5" @@ -943,7 +909,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -994,7 +960,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-channel", "crossbeam-deque", "crossbeam-epoch", @@ -1008,7 +974,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -1018,7 +984,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-epoch", "crossbeam-utils", ] @@ -1030,7 +996,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695" dependencies = [ "autocfg", - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", "memoffset", "scopeguard", @@ -1042,7 +1008,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -1052,7 +1018,7 @@ version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -1228,7 +1194,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf36e65a80337bea855cd4ef9b8401ffce06a7baedf2e85ec467b1ac3f6e82b6" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "dirs-sys-next", ] @@ -1240,7 +1206,7 @@ checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" dependencies = [ "libc", "redox_users", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1353,7 +1319,7 @@ version = "0.8.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -1473,7 +1439,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cbc844cecaee9d4443931972e1289c8ff485cb4cc2767cb03ca139ed6885153" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "redox_syscall 0.2.16", "windows-sys 0.48.0", @@ -1527,6 +1493,22 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a" +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags 1.3.2", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + [[package]] name = "futures" version = "0.3.28" @@ -1660,7 +1642,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi", ] @@ -1719,7 +1701,7 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d357c7ae988e7d2182f7d7871d0b963962420b0678b0997ce7de72001aeab782" dependencies = [ - "bytes", + "bytes 1.4.0", "fnv", "futures-core", "futures-sink", @@ -1761,19 +1743,6 @@ name = "hashbrown" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" -dependencies = [ - "ahash 0.8.3", - "allocator-api2", -] - -[[package]] -name = "hashlink" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "312f66718a2d7789ffef4f4b7b213138ed9f1eb3aa1d0d82fc99f88fb3ffd26f" -dependencies = [ - "hashbrown 0.14.0", -] [[package]] name = "heapless" @@ -1788,15 +1757,6 @@ dependencies = [ "stable_deref_trait", ] -[[package]] -name = "heck" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" -dependencies = [ - "unicode-segmentation", -] - [[package]] name = "heck" version = "0.4.1" @@ -1873,7 +1833,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ - "bytes", + "bytes 1.4.0", "fnv", "itoa", ] @@ -1884,7 +1844,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes", + "bytes 1.4.0", "http", "pin-project-lite", ] @@ -1913,7 +1873,7 @@ version = "0.14.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4" dependencies = [ - "bytes", + "bytes 1.4.0", "futures-channel", "futures-core", "futures-util", @@ -1960,12 +1920,6 @@ dependencies = [ "unicode-normalization", ] -[[package]] -name = "ignore-result" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "665ff4dce8edd10d490641ccb78949832f1ddbff02c584fb1f85ab888fe0e50c" - [[package]] name = "index-scheduler" version = "1.3.0" @@ -1996,7 +1950,7 @@ dependencies = [ "time", "tokio", "uuid 1.4.1", - "zookeeper-client", + "zookeeper", ] [[package]] @@ -2051,7 +2005,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -2080,6 +2034,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "ipnet" version = "2.7.2" @@ -2178,6 +2141,16 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -2190,6 +2163,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "levenshtein_automata" version = "0.2.1" @@ -2600,7 +2579,7 @@ dependencies = [ "brotli", "bstr", "byte-unit", - "bytes", + "bytes 1.4.0", "cargo_toml", "clap", "crossbeam-channel", @@ -2672,7 +2651,7 @@ dependencies = [ "walkdir", "yaup", "zip", - "zookeeper-client", + "zookeeper", ] [[package]] @@ -2692,9 +2671,8 @@ dependencies = [ "sha2", "thiserror", "time", - "tokio", "uuid 1.4.1", - "zookeeper-client", + "zookeeper", ] [[package]] @@ -2856,6 +2834,25 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.6.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" +dependencies = [ + "cfg-if 0.1.10", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow", + "net2", + "slab", + "winapi 0.2.8", +] + [[package]] name = "mio" version = "0.8.7" @@ -2868,11 +2865,46 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio-extras" +version = "2.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" +dependencies = [ + "lazycell", + "log", + "mio 0.6.23", + "slab", +] + +[[package]] +name = "miow" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", +] + [[package]] name = "nelson" version = "0.1.0" source = "git+https://github.com/meilisearch/nelson.git?rev=675f13885548fb415ead8fbb447e9e6d9314000a#675f13885548fb415ead8fbb447e9e6d9314000a" +[[package]] +name = "net2" +version = "0.2.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi 0.3.9", +] + [[package]] name = "nom" version = "7.1.3" @@ -2900,7 +2932,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2944,27 +2976,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_enum" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" -dependencies = [ - "num_enum_derive", -] - -[[package]] -name = "num_enum_derive" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "object" version = "0.30.3" @@ -3008,7 +3019,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -3018,7 +3029,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b7663cbd190cfd818d08efa8497f6cd383076688c49a391ef7c0d03cd12b561" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -3037,7 +3048,7 @@ version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "redox_syscall 0.2.16", "smallvec", @@ -3260,16 +3271,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "proc-macro-crate" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" -dependencies = [ - "once_cell", - "toml_edit", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3322,7 +3323,7 @@ version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "fnv", "lazy_static", "libc", @@ -3348,7 +3349,7 @@ dependencies = [ "anyhow", "bincode", "byteorder", - "cfg-if", + "cfg-if 1.0.0", "instant", "lz4_flex", "once_cell", @@ -3498,7 +3499,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" dependencies = [ "base64 0.21.2", - "bytes", + "bytes 1.4.0", "encoding_rs", "futures-core", "futures-util", @@ -3548,7 +3549,7 @@ dependencies = [ "spin 0.5.2", "untrusted", "web-sys", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -3808,7 +3809,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", ] @@ -3819,7 +3820,7 @@ version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", ] @@ -3830,7 +3831,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", ] @@ -3910,6 +3911,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "snowflake" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27207bb65232eda1f588cf46db2fee75c0808d557f6b3cf19a75f5d6d7c94df1" + [[package]] name = "socket2" version = "0.4.9" @@ -3917,7 +3924,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -3964,28 +3971,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" -[[package]] -name = "strum" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" -dependencies = [ - "strum_macros", -] - -[[package]] -name = "strum_macros" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "rustversion", - "syn 1.0.109", -] - [[package]] name = "subtle" version = "2.5.0" @@ -4041,13 +4026,13 @@ version = "0.28.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c2f3ca6693feb29a89724516f016488e9aafc7f37264f898593ee4b942f31b" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "core-foundation-sys", "libc", "ntapi", "once_cell", "rayon", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -4076,7 +4061,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "fastrand", "redox_syscall 0.3.5", "rustix 0.37.19", @@ -4171,9 +4156,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" dependencies = [ "autocfg", - "bytes", + "bytes 1.4.0", "libc", - "mio", + "mio 0.8.7", "num_cpus", "parking_lot", "pin-project-lite", @@ -4232,7 +4217,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ - "bytes", + "bytes 1.4.0", "futures-core", "futures-sink", "pin-project-lite", @@ -4286,7 +4271,7 @@ version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "log", "pin-project-lite", "tracing-core", @@ -4443,7 +4428,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f21b881cd6636ece9735721cf03c1fe1e774fe258683d084bb2812ab67435749" dependencies = [ "anyhow", - "cfg-if", + "cfg-if 1.0.0", "enum-iterator", "getset", "git2", @@ -4501,7 +4486,7 @@ version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -4526,7 +4511,7 @@ version = "0.4.36" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d1985d03709c53167ce907ff394f5316aa22cb4e12761295c5dc57dacb6297e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "wasm-bindgen", "web-sys", @@ -4609,6 +4594,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -4619,6 +4610,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -4631,7 +4628,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -4787,7 +4784,17 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" dependencies = [ - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", ] [[package]] @@ -4866,25 +4873,29 @@ dependencies = [ ] [[package]] -name = "zookeeper-client" -version = "0.5.0" +name = "zookeeper" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69dd5ba1592db2f385d3ff8fee13ed3a50024d2cf65bf4b49ca8ba7cfb9070" +checksum = "2312b424380193701a7341cec0551b80d2e3afd827ea0d3440af67899156ce10" dependencies = [ - "bytes", - "compact_str", - "const_format", - "either", - "hashbrown 0.12.3", - "hashlink", - "ignore-result", + "byteorder", + "bytes 0.5.6", + "lazy_static", "log", - "num_enum", - "static_assertions", - "strum", - "thiserror", - "tokio", - "uuid 1.4.1", + "mio 0.6.23", + "mio-extras", + "snowflake", + "zookeeper_derive", +] + +[[package]] +name = "zookeeper_derive" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42307291e3c8b2e4082e5647572da863f0470511d0ecb1618a4cd0a361549723" +dependencies = [ + "quote", + "syn 1.0.109", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2099483ee..4f6546f03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,24 +36,3 @@ opt-level = 3 opt-level = 3 [profile.dev.package.roaring] opt-level = 3 - -[profile.dev.package.lindera-ipadic-builder] -opt-level = 3 -[profile.dev.package.encoding] -opt-level = 3 -[profile.dev.package.yada] -opt-level = 3 - -[profile.release.package.lindera-ipadic-builder] -opt-level = 3 -[profile.release.package.encoding] -opt-level = 3 -[profile.release.package.yada] -opt-level = 3 - -[profile.bench.package.lindera-ipadic-builder] -opt-level = 3 -[profile.bench.package.encoding] -opt-level = 3 -[profile.bench.package.yada] -opt-level = 3 diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index b97545b20..6beea99eb 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -32,7 +32,7 @@ thiserror = "1.0.40" time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] } uuid = { version = "1.3.1", features = ["serde", "v4"] } tokio = { version = "1.27.0", features = ["full"] } -zookeeper-client = "0.5.0" +zookeeper = "0.8.0" [dev-dependencies] big_s = "1.0.2" diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 56cf11d92..b0fe0b432 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -58,7 +58,10 @@ use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; use uuid::Uuid; -use zookeeper_client as zk; +use zookeeper::recipes::leader::LeaderLatch; +use zookeeper::{ + Acl, AddWatchMode, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZooKeeper, +}; use crate::index_mapper::IndexMapper; use crate::utils::{check_index_swap_validity, clamp_to_page_size}; @@ -234,7 +237,6 @@ pub enum Breakpoint { InsideProcessBatch, } -#[derive(Debug)] pub struct IndexSchedulerOptions { /// The path to the version file of Meilisearch. pub version_file_path: PathBuf, @@ -271,7 +273,7 @@ pub struct IndexSchedulerOptions { /// The experimental features enabled for this instance. pub instance_features: InstanceTogglableFeatures, /// zookeeper client - pub zk: Option, + pub zookeeper: Option>, } /// Structure which holds meilisearch's indexes and schedules the tasks @@ -341,7 +343,7 @@ pub struct IndexScheduler { pub(crate) version_file_path: PathBuf, /// The URL to the ZooKeeper cluster - pub(crate) zk: Option, + pub(crate) zookeeper: Option>, // ================= test // The next entry is dedicated to the tests. @@ -384,7 +386,7 @@ impl IndexScheduler { snapshots_path: self.snapshots_path.clone(), dumps_path: self.dumps_path.clone(), auth_path: self.auth_path.clone(), - zk: self.zk.clone(), + zookeeper: self.zookeeper.clone(), version_file_path: self.version_file_path.clone(), #[cfg(test)] test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), @@ -399,7 +401,7 @@ impl IndexScheduler { impl IndexScheduler { /// Create an index scheduler and start its run loop. - pub async fn new( + pub fn new( options: IndexSchedulerOptions, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, @@ -481,7 +483,7 @@ impl IndexScheduler { snapshots_path: options.snapshots_path, auth_path: options.auth_path, version_file_path: options.version_file_path, - zk: options.zk, + zookeeper: options.zookeeper, #[cfg(test)] test_breakpoint_sdr, #[cfg(test)] @@ -492,19 +494,30 @@ impl IndexScheduler { }; // initialize the directories we need to process batches. - if let Some(ref zk) = this.zk { - let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all()); - match zk.create("/election", &[], &options).await { - Ok(_) | Err(zk::Error::NodeExists) => (), + if let Some(zookeeper) = &this.zookeeper { + match zookeeper.create( + "/election", + vec![], + Acl::open_unsafe().clone(), + CreateMode::Persistent, + ) { + Ok(_) | Err(ZkError::NodeExists) => (), Err(e) => panic!("{e}"), } - match zk.create("/snapshots", &[], &options).await { - Ok(_) | Err(zk::Error::NodeExists) => (), + match zookeeper.create( + "/snapshots", + vec![], + Acl::open_unsafe().clone(), + CreateMode::Persistent, + ) { + Ok(_) | Err(ZkError::NodeExists) => (), Err(e) => panic!("{e}"), } } - this.run().await; + + this.run(); + Ok(this) } @@ -592,315 +605,119 @@ impl IndexScheduler { /// /// This function will execute in a different thread and must be called /// only once per index scheduler. - async fn run(&self) { - let run = self.private_clone(); - let zk = self.zk.clone(); - let mut self_node_id = zk::CreateSequence(0); - tokio::task::spawn(async move { - #[cfg(test)] - run.breakpoint(Breakpoint::Init); + fn run(&self) { + #[cfg(test)] + run.breakpoint(Breakpoint::Init); + + if let Some(zookeeper) = self.zookeeper.clone() { + let id = Uuid::new_v4().to_string(); + let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string()); + latch.start().unwrap(); // Join the potential leaders list. // The lowest in the list is the leader. And if we're not the leader // we watch the node right before us to be notified if he dies. // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection - let mut watchers = if let Some(ref zk) = zk { - let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all()); - let (_stat, id) = zk.create("/election/node-", &[], &options).await.unwrap(); - self_node_id = id; - let previous_path = { - let mut list = zk.list_children("/election").await.unwrap(); - list.sort(); + let latchc = latch.clone(); + let this = self.private_clone(); + zookeeper + .add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| { + if !latchc.has_leadership() { + let WatchedEvent { event_type, path, keeper_state: _ } = event; + match event_type { + WatchedEventType::NodeCreated => { + let path = path.unwrap(); + log::info!("The snapshot {} is in preparation", path); + } + WatchedEventType::NodeDataChanged => { + let path = path.unwrap(); + log::info!("Importing snapshot {}", path); + let snapshot_id = + path.strip_prefix("/snapshots/snapshot-").unwrap(); + let snapshot_dir = PathBuf::from(format!( + "{}/zk-snapshots/{}", + env!("HOME"), + snapshot_id + )); - let self_node_path = format!("node-{}", self_node_id); - let previous_path = - list.into_iter().take_while(|path| path < &self_node_path).last(); - previous_path.map(|path| format!("/election/{}", path)) - }; + // 1. TODO: Ensure the snapshot version file is the same as our version. - if let Some(previous_path) = previous_path { - log::warn!("I am the follower {}", self_node_id); - Some(( - zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(), - zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive) - .await - .unwrap(), - )) - } else { - // if there was no node before ourselves, then we're the leader. - log::warn!("I'm the leader"); - None - } - } else { - log::warn!("I don't have any ZK cluster"); - None - }; + // 2. Download all the databases + let tasks_file = + tempfile::NamedTempFile::new_in(this.env.path()).unwrap(); - loop { - match watchers.as_mut() { - Some((leader_watcher, snapshot_watcher)) => { - // We wait for a new batch processed by the leader OR a disconnection from the leader. - tokio::select! { - zk::WatchedEvent { event_type, session_state, .. } = leader_watcher.changed() => match event_type { - zk::EventType::Session => panic!("Session error {:?}", session_state), - zk::EventType::NodeDeleted => { - // The node behind us has been disconnected, - // am I the leader or is there someone before me. - let zk = zk.as_ref().unwrap(); - let previous_path = { - let mut list = zk.list_children("/election").await.unwrap(); - list.sort(); + log::info!("Downloading the index scheduler database."); + let tasks_snapshot = snapshot_dir.join("tasks.mdb"); + std::fs::copy(tasks_snapshot, tasks_file).unwrap(); - let self_node_path = format!("node-{}", self_node_id); - let previous_path = - list.into_iter().take_while(|path| path < &self_node_path).last(); - previous_path.map(|path| format!("/election/{}", path)) - }; + log::info!("Downloading the indexes databases"); + let indexes_files = + tempfile::TempDir::new_in(&this.index_mapper.base_path) + .unwrap(); + let mut indexes = Vec::new(); - let (leader_watcher, snapshot_watcher) = watchers.take().unwrap(); - leader_watcher.remove().await.unwrap(); - watchers = if let Some(previous_path) = previous_path { - log::warn!("I stay a follower {}", self_node_id); - Some(( - zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(), - snapshot_watcher, - )) - } else { - log::warn!("I'm the new leader"); - snapshot_watcher.remove().await.unwrap(); - None - } + let dst = snapshot_dir.join("indexes"); + for result in std::fs::read_dir(&dst).unwrap() { + let entry = result.unwrap(); + let uuid = + entry.file_name().as_os_str().to_str().unwrap().to_string(); + log::info!("\tDownloading the index {}", uuid.to_string()); + std::fs::copy( + dst.join(&uuid), + indexes_files.path().join(&uuid), + ) + .unwrap(); + indexes.push(uuid); } - _ => (), - }, - zk::WatchedEvent { event_type, session_state, path } = snapshot_watcher.changed() => match event_type { - zk::EventType::Session => panic!("Session error {:?}", session_state), - zk::EventType::NodeCreated => { - log::info!("The snapshot {} is in preparation", path); - } - zk::EventType::NodeDataChanged => { - log::info!("Importing snapshot {}", path); - let snapshot_id = path.strip_prefix("/snapshots/snapshot-").unwrap(); - let snapshot_dir = - PathBuf::from(format!("{}/zk-snapshots/{}", env!("HOME"), snapshot_id)); + // 3. Lock the index-mapper and close all the env + // TODO: continue here - // 1. TODO: Ensure the snapshot version file is the same as our version. + // run.env.close(); - // 2. Download all the databases - let tasks_file = tempfile::NamedTempFile::new_in(run.env.path()).unwrap(); + // 4. Move all the databases - log::info!("Downloading the index scheduler database."); - let tasks_snapshot = - snapshot_dir.join("tasks.mdb"); - std::fs::copy(tasks_snapshot, tasks_file).unwrap(); + // 5. Unlock the index-mapper + // 2. Download and import the index-scheduler database - log::info!("Downloading the indexes databases"); - let indexes_files = tempfile::TempDir::new_in(&run.index_mapper.base_path).unwrap(); - let mut indexes = Vec::new(); - - let dst = snapshot_dir.join("indexes"); - let mut indexes_snapshot = tokio::fs::read_dir(&dst).await.unwrap(); - while let Some(file) = indexes_snapshot.next_entry().await.unwrap() { - let uuid = file.file_name().as_os_str().to_str().unwrap().to_string(); - log::info!("\tDownloading the index {}", uuid.to_string()); - std::fs::copy(dst.join(&uuid), indexes_files.path().join(&uuid)).unwrap(); - indexes.push(uuid); - } - - // 3. Lock the index-mapper and close all the env - // TODO: continue here - - - - // run.env.close(); - - // 4. Move all the databases - - // 5. Unlock the index-mapper - - // 2. Download and import the index-scheduler database - - // 3. Snapshot every indexes - } - _ => (), - }, - else => break, + // 3. Snapshot every indexes + } + otherwise => panic!("{otherwise:?}"), } } - None => { - // we're either a leader or not running in a cluster, - // either way we should wait until we receive a task. - let wake_up = run.wake_up.clone(); - let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await; + }) + .unwrap(); - match run.tick().await { - Ok(TickOutcome::TickAgain(n)) => { - // We must tick again. - run.wake_up.signal(); - - // if we're in a cluster that means we're the leader - // and should share a snapshot of what we've done. - if let Some(ref zk) = run.zk { - // if nothing was processed we have nothing to do. - if n == 0 { - continue; - } - - let options = zk::CreateMode::EphemeralSequential - .with_acls(zk::Acls::anyone_all()); - let (_stat, snapshot_id) = zk - .create("/snapshots/snapshot-", &[], &options) - .await - .unwrap(); - - let zk_snapshots = format!("{}/zk-snapshots", env!("HOME")); - tokio::fs::create_dir_all(&zk_snapshots).await.unwrap(); - let snapshot_dir = - PathBuf::from(format!("{zk_snapshots}/{snapshot_id}")); - tokio::fs::create_dir(&snapshot_dir).await.unwrap(); - - // 1. Snapshot the version file. - let dst = - snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME); - tokio::fs::copy(&run.version_file_path, dst).await.unwrap(); - - // 2. Snapshot the index-scheduler LMDB env - let dst = snapshot_dir.join("tasks"); - tokio::fs::create_dir_all(&dst).await.unwrap(); - - log::info!("Snapshotting the tasks"); - let env = run.env.clone(); - tokio::task::spawn_blocking(move || { - env.copy_to_path( - dst.join("tasks.mdb"), - heed::CompactionOption::Enabled, - ) - .unwrap(); - }) - .await - .unwrap(); - - // 3. Snapshot every indexes - log::info!("Snapshotting the indexes"); - let dst = snapshot_dir.join("indexes"); - tokio::fs::create_dir_all(&dst).await.unwrap(); - - let this = run.private_clone(); - let indexes = tokio::task::spawn_blocking(move || { - let rtxn = this.env.read_txn().unwrap(); - this.index_mapper - .index_mapping - .iter(&rtxn) - .unwrap() - .map(|ret| ret.unwrap()) - .map(|(name, uuid)| (name.to_string(), uuid)) - .collect::>() - }) - .await - .unwrap(); - for (name, uuid) in indexes { - log::info!(" Snapshotting index {name}"); - let this = run.private_clone(); - let dst = dst.clone(); - tokio::task::spawn_blocking(move || { - let rtxn = this.env.read_txn().unwrap(); - let index = - this.index_mapper.index(&rtxn, &name).unwrap(); - index - .copy_to_path( - dst.join(format!("{uuid}.mdb")), - heed::CompactionOption::Enabled, - ) - .unwrap(); - }) - .await - .unwrap(); - } - - // we must notify everyone that we dropped a new snapshot on the s3 - let _stat = zk - .set_data( - &format!("/snapshots/snapshot-{}", snapshot_id), - &[], - None, - ) - .await - .unwrap(); - log::info!( - "Notified everyone about the new snapshot {snapshot_id}" - ); - - // We can now delete all the tasks that has been processed - let processed = run - .processing_tasks - .read() - .unwrap() - .processed_previously() - .clone(); // we don't want to hold the mutex - log::info!("Deleting {} processed tasks", processed.len()); - for task in processed { - let _ = zk // we don't want to crash if we can't delete an update file. - .delete( - &format!( - "/tasks/task-{}", - zk::CreateSequence(task as i32) - ), - None, - ) - .await; - // TODO: Delete the update files associated with the deleted tasks - } - } - } - Ok(TickOutcome::WaitForSignal) => (), - Err(e) => { - log::error!("{}", e); - // Wait one second when an irrecoverable error occurs. - if !e.is_recoverable() { - std::thread::sleep(Duration::from_secs(1)); - } - } - } - } - } - } - }); - - if let Some(ref zk) = &self.zk { - let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all()); - match zk.create("/tasks", &[], &options).await { + match zookeeper.create( + "/tasks", + vec![], + Acl::open_unsafe().clone(), + CreateMode::Persistent, + ) { Ok(_) => (), - Err(zk::Error::NodeExists) => { + Err(ZkError::NodeExists) => { log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk."); - let children = zk - .list_children("/tasks") - .await - .expect("Internal, the /tasks directory was deleted during execution."); + let children = zookeeper + .get_children("/tasks", false) + .expect("Internal, the /tasks directory was deleted during execution."); // TODO change me log::info!("Importing {} tasks", children.len()); for path in children { log::info!(" Importing {}", path); - match zk.get_data(&format!("/tasks/{}", &path)).await { + match zookeeper.get_data(&format!("/tasks/{}", &path), false) { Ok((task, _stat)) => { if task.is_empty() { log::info!(" Task {} was empty, skipping.", path); - continue; - } - let task = serde_json::from_slice(&task).unwrap(); - - let this = self.private_clone(); - tokio::task::spawn_blocking(move || { - let mut wtxn = this.env.write_txn().unwrap(); - this.register_raw_task(&mut wtxn, &task).unwrap(); + } else { + let task = serde_json::from_slice(&task).unwrap(); + let mut wtxn = self.env.write_txn().unwrap(); + self.register_raw_task(&mut wtxn, &task).unwrap(); wtxn.commit().unwrap(); // we received a new tasks, we must wake up - this.wake_up.signal(); - }) - .await - .unwrap(); + self.wake_up.signal(); + } } Err(e) => panic!("{e}"), } @@ -913,38 +730,154 @@ impl IndexScheduler { } // TODO: fix unwrap by returning a clear error. - let mut watcher = - zk.watch("/tasks", zk::AddWatchMode::PersistentRecursive).await.unwrap(); let this = self.private_clone(); - tokio::spawn(async move { - loop { - let zk::WatchedEvent { event_type, session_state, path } = - watcher.changed().await; + zookeeper + .add_watch("/tasks", AddWatchMode::PersistentRecursive, move |event| { + let WatchedEvent { event_type, path, keeper_state: _ } = event; match event_type { - zk::EventType::Session => panic!("Session error {:?}", session_state), - // A task as been added - zk::EventType::NodeDataChanged => { + WatchedEventType::NodeDataChanged => { + let path = path.unwrap(); // Add raw task content in local DB log::info!("Received a new task from the cluster at {}", path); let (data, _stat) = - this.zk.as_ref().unwrap().get_data(&path).await.unwrap(); - let task = serde_json::from_slice(&data).unwrap(); + this.zookeeper.as_ref().unwrap().get_data(&path, false).unwrap(); + let task = serde_json::from_slice(data.as_slice()).unwrap(); + let mut wtxn = this.env.write_txn().unwrap(); + this.register_raw_task(&mut wtxn, &task).unwrap(); + wtxn.commit().unwrap(); + } + otherwise => panic!("{otherwise:?}"), + } + + this.wake_up.signal(); + }) + .unwrap(); + } + + let this = self.private_clone(); + std::thread::spawn(move || { + loop { + // we're either a leader or not running in a cluster, + // either way we should wait until we receive a task. + let wake_up = this.wake_up.clone(); + let _ = wake_up.wait(); + + match this.tick() { + Ok(TickOutcome::TickAgain(n)) => { + // We must tick again. + this.wake_up.signal(); + + // if we're in a cluster that means we're the leader + // and should share a snapshot of what we've done. + if let Some(ref zookeeper) = this.zookeeper { + // if nothing was processed we have nothing to do. + if n == 0 { + continue; + } + + let snapshot_id = zookeeper + .create( + "/snapshots/snapshot-", + vec![], + Acl::open_unsafe().clone(), + CreateMode::PersistentSequential, + ) + .unwrap(); + + dbg!(&snapshot_id); + let zk_snapshots = format!("{}/zk-snapshots", env!("HOME")); + std::fs::create_dir_all(&zk_snapshots).unwrap(); + let snapshot_dir = + PathBuf::from(format!("{zk_snapshots}/{snapshot_id}")); + std::fs::create_dir(&snapshot_dir).unwrap(); + + // 1. Snapshot the version file. + let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME); + std::fs::copy(&this.version_file_path, dst).unwrap(); + + // 2. Snapshot the index-scheduler LMDB env + let dst = snapshot_dir.join("tasks"); + std::fs::create_dir_all(&dst).unwrap(); + + log::info!("Snapshotting the tasks"); + let env = this.env.clone(); + env.copy_to_path( + dst.join("tasks.mdb"), + heed::CompactionOption::Enabled, + ) + .unwrap(); + + // 3. Snapshot every indexes + log::info!("Snapshotting the indexes"); + let dst = snapshot_dir.join("indexes"); + std::fs::create_dir_all(&dst).unwrap(); let this = this.private_clone(); - tokio::task::spawn_blocking(move || { - let mut wtxn = this.env.write_txn().unwrap(); - this.register_raw_task(&mut wtxn, &task).unwrap(); - wtxn.commit().unwrap(); - }) - .await - .unwrap(); + let rtxn = this.env.read_txn().unwrap(); + let indexes = this + .index_mapper + .index_mapping + .iter(&rtxn) + .unwrap() + .map(|ret| ret.unwrap()) + .map(|(name, uuid)| (name.to_string(), uuid)) + .collect::>(); + + for (name, uuid) in indexes { + log::info!(" Snapshotting index {name}"); + let this = this.private_clone(); + let dst = dst.clone(); + let rtxn = this.env.read_txn().unwrap(); + let index = this.index_mapper.index(&rtxn, &name).unwrap(); + index + .copy_to_path( + dst.join(format!("{uuid}.mdb")), + heed::CompactionOption::Enabled, + ) + .unwrap(); + } + + // we must notify everyone that we dropped a new snapshot on the s3 + let _stat = zookeeper.set_data( + &format!("/snapshots/snapshot-{}", snapshot_id), + vec![], + None, + ); + log::info!("Notified everyone about the new snapshot {snapshot_id}"); + + // We can now delete all the tasks that has been processed + let processed = this + .processing_tasks + .read() + .unwrap() + .processed_previously() + .clone(); // we don't want to hold the mutex + log::info!("Deleting {} processed tasks", processed.len()); + for task in processed { + let _ = zookeeper // we don't want to crash if we can't delete an update file. + .delete( + &format!( + "/tasks/task-{:0>10}", + task as i32 + ), + None, + ) + .unwrap(); + // TODO: Delete the update files associated with the deleted tasks + } + } + } + Ok(TickOutcome::WaitForSignal) => (), + Err(e) => { + log::error!("{}", e); + // Wait one second when an irrecoverable error occurs. + if !e.is_recoverable() { + std::thread::sleep(Duration::from_secs(1)); } - _ => (), } - this.wake_up.signal(); } - }); - } + } + }); } pub fn indexer_config(&self) -> &IndexerConfig { @@ -1279,14 +1212,17 @@ impl IndexScheduler { /// Register a new task in the scheduler. /// /// If it fails and data was associated with the task, it tries to delete the associated data. - pub async fn register(&self, kind: KindWithContent) -> Result { - let id = match self.zk { - Some(ref zk) => { - // reserve uniq ID on zookeeper. And give it to the spawn blocking. - let options = - zk::CreateMode::PersistentSequential.with_acls(zk::Acls::anyone_all()); - match zk.create("/tasks/task-", &[], &options).await { - Ok((_stats, id)) => Some(id), + pub fn register(&self, kind: KindWithContent) -> Result { + let id = match &self.zookeeper { + Some(zookeeper) => { + // Reserve uniq ID on zookeeper. And give it to the spawn blocking. + match zookeeper.create( + "/tasks/task-", + vec![], + Acl::open_unsafe().clone(), + CreateMode::PersistentSequential, + ) { + Ok(path) => path.rsplit_once('-').map(|(_, id)| id.parse::().unwrap()), Err(e) => panic!("{e}"), } } @@ -1294,80 +1230,69 @@ impl IndexScheduler { }; let this = self.private_clone(); - let task = tokio::task::spawn_blocking(move || { - let mut wtxn = this.env.write_txn()?; + let mut wtxn = this.env.write_txn()?; - // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task - if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) - && (this.env.non_free_pages_size()? * 100) / this.env.map_size()? as u64 > 50 + // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task + if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) + && (this.env.non_free_pages_size()? * 100) / this.env.map_size()? as u64 > 50 + { + return Err(Error::NoSpaceLeftInTaskQueue); + } + + // Retrieve the id generated by zookeeper or generate a local id. + let id = match id { + Some(id) => id as u32, + None => this.next_task_id(&wtxn)?, + }; + + let mut task = Task { + uid: id, + enqueued_at: OffsetDateTime::now_utc(), + started_at: None, + finished_at: None, + error: None, + canceled_by: None, + details: kind.default_details(), + status: Status::Enqueued, + kind: kind.clone(), + }; + // For deletion and cancelation tasks, we want to make extra sure that they + // don't attempt to delete/cancel tasks that are newer than themselves. + filter_out_references_to_newer_tasks(&mut task); + // If the register task is an index swap task, verify that it is well-formed + // (that it does not contain duplicate indexes). + check_index_swap_validity(&task)?; + + this.register_raw_task(&mut wtxn, &task)?; + + if let Err(e) = wtxn.commit() { + this.delete_persisted_task_data(&task)?; + return Err(e.into()); + } + + // If the registered task is a task cancelation + // we inform the processing tasks to stop (if necessary). + if let KindWithContent::TaskCancelation { tasks, .. } = kind { + let tasks_to_cancel = RoaringBitmap::from_iter(tasks); + if this.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks_to_cancel) { - return Err(Error::NoSpaceLeftInTaskQueue); + this.must_stop_processing.must_stop(); } + } - // get id generated by zookeeper or generate a local id. - let id = match id { - Some(id) => id.0 as u32, - None => this.next_task_id(&wtxn)?, - }; - - let mut task = Task { - uid: id, - enqueued_at: OffsetDateTime::now_utc(), - started_at: None, - finished_at: None, - error: None, - canceled_by: None, - details: kind.default_details(), - status: Status::Enqueued, - kind: kind.clone(), - }; - // For deletion and cancelation tasks, we want to make extra sure that they - // don't attempt to delete/cancel tasks that are newer than themselves. - filter_out_references_to_newer_tasks(&mut task); - // If the register task is an index swap task, verify that it is well-formed - // (that it does not contain duplicate indexes). - check_index_swap_validity(&task)?; - - this.register_raw_task(&mut wtxn, &task)?; - - if let Err(e) = wtxn.commit() { - this.delete_persisted_task_data(&task)?; - return Err(e.into()); - } - - // If the registered task is a task cancelation - // we inform the processing tasks to stop (if necessary). - if let KindWithContent::TaskCancelation { tasks, .. } = kind { - let tasks_to_cancel = RoaringBitmap::from_iter(tasks); - if this - .processing_tasks - .read() - .unwrap() - .must_cancel_processing_tasks(&tasks_to_cancel) - { - this.must_stop_processing.must_stop(); - } - } - - // notify the scheduler loop to execute a new tick - this.wake_up.signal(); - - Ok(task) - }) - .await - .unwrap()?; + // notify the scheduler loop to execute a new tick + this.wake_up.signal(); // TODO: send task to ZK in raw json. - if let Some(ref zk) = self.zk { - let id = id.unwrap(); + if let Some(zookeeper) = &self.zookeeper { // TODO: ugly unwrap - zk.set_data( - &format!("/tasks/task-{}", id), - &serde_json::to_vec_pretty(&task).unwrap(), - None, - ) - .await - .unwrap(); + zookeeper + .set_data( + &format!("/tasks/task-{}", id), + serde_json::to_vec_pretty(&task).unwrap(), + None, + ) + .unwrap(); } Ok(task) @@ -1449,7 +1374,7 @@ impl IndexScheduler { /// 6. Reset the in-memory list of processed tasks. /// /// Returns the number of processed tasks. - async fn tick(&self) -> Result { + fn tick(&self) -> Result { #[cfg(test)] { *self.run_loop_iteration.write().unwrap() += 1; @@ -1458,7 +1383,7 @@ impl IndexScheduler { puffin::GlobalProfiler::lock().new_frame(); - self.cleanup_task_queue().await?; + self.cleanup_task_queue()?; let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let batch = @@ -1597,7 +1522,7 @@ impl IndexScheduler { } /// Register a task to cleanup the task queue if needed - async fn cleanup_task_queue(&self) -> Result<()> { + fn cleanup_task_queue(&self) -> Result<()> { let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let nb_tasks = self.all_task_ids(&rtxn)?.len(); @@ -1640,8 +1565,7 @@ impl IndexScheduler { delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, ), tasks: to_delete, - }) - .await?; + })?; Ok(()) } diff --git a/meilisearch-auth/Cargo.toml b/meilisearch-auth/Cargo.toml index 15b3a6031..06071c8bd 100644 --- a/meilisearch-auth/Cargo.toml +++ b/meilisearch-auth/Cargo.toml @@ -24,6 +24,5 @@ serde_json = { version = "1.0.95", features = ["preserve_order"] } sha2 = "0.10.6" thiserror = "1.0.40" time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] } -tokio = { version = "1.27.0", features = ["full"] } uuid = { version = "1.3.1", features = ["serde", "v4"] } -zookeeper-client = "0.5.0" +zookeeper = "0.8.0" diff --git a/meilisearch-auth/src/error.rs b/meilisearch-auth/src/error.rs index eb4cd9b48..d5b91cae5 100644 --- a/meilisearch-auth/src/error.rs +++ b/meilisearch-auth/src/error.rs @@ -2,7 +2,6 @@ use std::error::Error; use meilisearch_types::error::{Code, ErrorCode}; use meilisearch_types::internal_error; -use zookeeper_client as zk; pub type Result = std::result::Result; @@ -20,8 +19,7 @@ internal_error!( AuthControllerError: meilisearch_types::milli::heed::Error, std::io::Error, serde_json::Error, - tokio::task::JoinError, - zk::Error, + zookeeper::ZkError, std::str::Utf8Error ); diff --git a/meilisearch-auth/src/lib.rs b/meilisearch-auth/src/lib.rs index d88ce24f8..077af26b9 100644 --- a/meilisearch-auth/src/lib.rs +++ b/meilisearch-auth/src/lib.rs @@ -16,57 +16,59 @@ pub use store::open_auth_store_env; use store::{generate_key_as_hexa, HeedAuthStore}; use time::OffsetDateTime; use uuid::Uuid; -use zookeeper_client as zk; +use zookeeper::{ + Acl, AddWatchMode, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZooKeeper, +}; #[derive(Clone)] pub struct AuthController { store: Arc, master_key: Option, - zk: Option, + zookeeper: Option>, } impl AuthController { - pub async fn new( + pub fn new( db_path: impl AsRef, master_key: &Option, - zk: Option, + zookeeper: Option>, ) -> Result { let store = HeedAuthStore::new(db_path)?; - let controller = Self { store: Arc::new(store), master_key: master_key.clone(), zk }; + let controller = Self { store: Arc::new(store), master_key: master_key.clone(), zookeeper }; - match controller.zk { + match controller.zookeeper { // setup the auth zk environment, the `auth` node - Some(ref zk) => { - let options = - zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all()); + Some(ref zookeeper) => { // TODO: we should catch the potential unexpected errors here https://docs.rs/zookeeper-client/latest/zookeeper_client/struct.Client.html#method.create // for the moment we consider that `create` only returns Error::NodeExists. - match zk.create("/auth", &[], &options).await { + match zookeeper.create( + "/auth", + vec![], + Acl::open_unsafe().clone(), + CreateMode::Persistent, + ) { // If the store is empty, we must generate and push the default api-keys. - Ok(_) => generate_default_keys(&controller).await?, + Ok(_) => generate_default_keys(&controller)?, // If the node exist we should clear our DB and download all the existing api-keys - Err(zk::Error::NodeExists) => { + Err(ZkError::NodeExists) => { log::warn!("Auth directory already exists, we need to clear our keys + import the one in zookeeper"); let store = controller.store.clone(); - tokio::task::spawn_blocking(move || store.delete_all_keys()).await??; - let children = zk - .list_children("/auth") - .await + store.delete_all_keys()?; + let children = zookeeper + .get_children("/auth", false) .expect("Internal, the auth directory was deleted during execution."); log::info!("Importing {} api-keys", children.len()); for path in children { log::info!(" Importing {}", path); - match zk.get_data(&format!("/auth/{}", &path)).await { + match zookeeper.get_data(&format!("/auth/{}", &path), false) { Ok((key, _stat)) => { - let key = serde_json::from_slice(&key).unwrap(); - let store = controller.store.clone(); - tokio::task::spawn_blocking(move || store.put_api_key(key)) - .await??; - - }, - Err(e) => panic!("{e}") + let key = serde_json::from_slice(&key).unwrap(); + let store = controller.store.clone(); + store.put_api_key(key)?; + } + Err(e) => panic!("{e}"), } // else the file was deleted while we were inserting the key. We ignore it. // TODO: What happens if someone updates the files before we have the time @@ -74,13 +76,9 @@ impl AuthController { } } e @ Err( - zk::Error::NoNode - | zk::Error::NoChildrenForEphemerals - | zk::Error::InvalidAcl, + ZkError::NoNode | ZkError::NoChildrenForEphemerals | ZkError::InvalidACL, ) => unreachable!("{e:?}"), - Err(e) => { - panic!("{e}") - } + Err(e) => panic!("{e}"), } // TODO: Race condition above: // What happens if two node join exactly at the same moment: @@ -91,39 +89,34 @@ impl AuthController { // Zookeeper Event listener loop let controller_clone = controller.clone(); - let mut watcher = zk.watch("/auth", zk::AddWatchMode::PersistentRecursive).await?; - let czk = zk.clone(); - tokio::spawn(async move { - let zk = czk; - loop { - let zk::WatchedEvent { event_type, session_state, path } = - dbg!(watcher.changed().await); + let zkk = zookeeper.clone(); + zookeeper.add_watch("/auth", AddWatchMode::PersistentRecursive, move |event| { + let WatchedEvent { event_type, path, keeper_state: _ } = dbg!(event); - match event_type { - zk::EventType::Session => panic!("Session error {:?}", session_state), - // a key is deleted from zk - zk::EventType::NodeDeleted => { - // TODO: ugly unwraps - let uuid = path.strip_prefix("/auth/").unwrap(); - let uuid = Uuid::parse_str(&uuid).unwrap(); - log::info!("The key {} has been deleted", uuid); - dbg!(controller_clone.store.delete_api_key(uuid).unwrap()); - } - zk::EventType::NodeCreated | zk::EventType::NodeDataChanged => { - let (key, _stat) = zk.get_data(&path).await.unwrap(); - let key: Key = serde_json::from_slice(&key).unwrap(); - log::info!("The key {} has been deleted", key.uid); - - dbg!(controller_clone.store.put_api_key(key).unwrap()); - } - zk::EventType::NodeChildrenChanged => panic!("Got the unexpected NodeChildrenChanged event, what is it used for?"), + match event_type { + WatchedEventType::NodeDeleted => { + // TODO: ugly unwraps + let path = path.unwrap(); + let uuid = path.strip_prefix("/auth/").unwrap(); + let uuid = Uuid::parse_str(&uuid).unwrap(); + log::info!("The key {} has been deleted", uuid); + dbg!(controller_clone.store.delete_api_key(uuid).unwrap()); } + WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => { + let path = path.unwrap(); + let (key, _stat) = zkk.get_data(&path, false).unwrap(); + let key: Key = serde_json::from_slice(&key).unwrap(); + log::info!("The key {} has been deleted", key.uid); + + dbg!(controller_clone.store.put_api_key(key).unwrap()); + } + otherwise => panic!("Got the unexpected `{otherwise:?}` event!"), } - }); + })?; } None => { if controller.store.is_empty()? { - generate_default_keys(&controller).await?; + generate_default_keys(&controller)?; } } } @@ -147,27 +140,29 @@ impl AuthController { self.store.used_size() } - pub async fn create_key(&self, create_key: CreateApiKey) -> Result { + pub fn create_key(&self, create_key: CreateApiKey) -> Result { match self.store.get_api_key(create_key.uid)? { Some(_) => Err(AuthControllerError::ApiKeyAlreadyExists(create_key.uid.to_string())), - None => self.put_key(create_key.to_key()).await, + None => self.put_key(create_key.to_key()), } } - pub async fn put_key(&self, key: Key) -> Result { + pub fn put_key(&self, key: Key) -> Result { let store = self.store.clone(); // TODO: we may commit only after zk persisted the keys - let key = tokio::task::spawn_blocking(move || store.put_api_key(key)).await??; - if let Some(ref zk) = self.zk { - let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all()); - - zk.create(&format!("/auth/{}", key.uid), &serde_json::to_vec_pretty(&key)?, &options) - .await?; + let key = store.put_api_key(key)?; + if let Some(zookeeper) = &self.zookeeper { + zookeeper.create( + &format!("/auth/{}", key.uid), + serde_json::to_vec_pretty(&key)?, + Acl::open_unsafe().clone(), + CreateMode::Persistent, + )?; } Ok(key) } - pub async fn update_key(&self, uid: Uuid, patch: PatchApiKey) -> Result { + pub fn update_key(&self, uid: Uuid, patch: PatchApiKey) -> Result { let mut key = self.get_key(uid)?; match patch.description { Setting::NotSet => (), @@ -180,10 +175,13 @@ impl AuthController { key.updated_at = OffsetDateTime::now_utc(); let store = self.store.clone(); // TODO: we may commit only after zk persisted the keys - let key = tokio::task::spawn_blocking(move || store.put_api_key(key)).await??; - if let Some(ref zk) = self.zk { - zk.set_data(&format!("/auth/{}", key.uid), &serde_json::to_vec_pretty(&key)?, None) - .await?; + let key = store.put_api_key(key)?; + if let Some(zookeeper) = &self.zookeeper { + zookeeper.set_data( + &format!("/auth/{}", key.uid), + serde_json::to_vec_pretty(&key)?, + None, + )?; } Ok(key) } @@ -226,12 +224,12 @@ impl AuthController { self.store.list_api_keys() } - pub async fn delete_key(&self, uid: Uuid) -> Result<()> { + pub fn delete_key(&self, uid: Uuid) -> Result<()> { let store = self.store.clone(); - let deleted = tokio::task::spawn_blocking(move || store.delete_api_key(uid)).await??; + let deleted = store.delete_api_key(uid)?; if deleted { - if let Some(ref zk) = self.zk { - zk.delete(&format!("/auth/{}", uid), None).await?; + if let Some(zookeeper) = &self.zookeeper { + zookeeper.delete(&format!("/auth/{}", uid), None)?; } Ok(()) } else { @@ -426,10 +424,9 @@ pub struct IndexSearchRules { pub filter: Option, } -async fn generate_default_keys(controller: &AuthController) -> Result<()> { - controller.put_key(Key::default_admin()).await?; - controller.put_key(Key::default_search()).await?; - +fn generate_default_keys(controller: &AuthController) -> Result<()> { + controller.put_key(Key::default_admin())?; + controller.put_key(Key::default_search())?; Ok(()) } diff --git a/meilisearch/Cargo.toml b/meilisearch/Cargo.toml index bf5501d63..9ac09934e 100644 --- a/meilisearch/Cargo.toml +++ b/meilisearch/Cargo.toml @@ -105,7 +105,7 @@ walkdir = "2.3.3" yaup = "0.2.1" serde_urlencoded = "0.7.1" termcolor = "1.2.0" -zookeeper-client = "0.5.0" +zookeeper = "0.8.0" [dev-dependencies] actix-rt = "2.8.0" diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 1b277fe90..fb9b0bb66 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -39,7 +39,7 @@ use meilisearch_types::versioning::{check_version_file, create_version_file}; use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; pub use option::Opt; use option::ScheduleSnapshot; -use zookeeper_client as zk; +use zookeeper::ZooKeeper; use crate::error::MeilisearchHttpError; @@ -139,7 +139,7 @@ enum OnFailure { pub async fn setup_meilisearch( opt: &Opt, - zk: Option, + zookeeper: Option>, ) -> anyhow::Result<(Arc, Arc)> { let empty_db = is_empty_db(&opt.db_path); let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot { @@ -147,7 +147,7 @@ pub async fn setup_meilisearch( // the db is empty and the snapshot exists, import it if empty_db && snapshot_path_exists { match compression::from_tar_gz(snapshot_path, &opt.db_path) { - Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb, zk).await?, + Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb, zookeeper)?, Err(e) => { std::fs::remove_dir_all(&opt.db_path)?; return Err(e); @@ -164,14 +164,14 @@ pub async fn setup_meilisearch( bail!("snapshot doesn't exist at {}", snapshot_path.display()) // the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag } else { - open_or_create_database(opt, empty_db, zk).await? + open_or_create_database(opt, empty_db, zookeeper)? } } else if let Some(ref path) = opt.import_dump { let src_path_exists = path.exists(); // the db is empty and the dump exists, import it if empty_db && src_path_exists { let (mut index_scheduler, mut auth_controller) = - open_or_create_database_unchecked(opt, OnFailure::RemoveDb, zk).await?; + open_or_create_database_unchecked(opt, OnFailure::RemoveDb, zookeeper)?; match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) { Ok(()) => (index_scheduler, auth_controller), Err(e) => { @@ -191,10 +191,10 @@ pub async fn setup_meilisearch( // the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag // or, the dump is missing but we can ignore that because of the ignore_missing_dump flag } else { - open_or_create_database(opt, empty_db, zk).await? + open_or_create_database(opt, empty_db, zookeeper)? } } else { - open_or_create_database(opt, empty_db, zk).await? + open_or_create_database(opt, empty_db, zookeeper)? }; // We create a loop in a thread that registers snapshotCreation tasks @@ -203,30 +203,26 @@ pub async fn setup_meilisearch( if let ScheduleSnapshot::Enabled(snapshot_delay) = opt.schedule_snapshot { let snapshot_delay = Duration::from_secs(snapshot_delay); let index_scheduler = index_scheduler.clone(); - tokio::task::spawn(async move { - loop { - thread::sleep(snapshot_delay); - if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation).await { - error!("Error while registering snapshot: {}", e); - } + thread::spawn(move || loop { + thread::sleep(snapshot_delay); + if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) { + error!("Error while registering snapshot: {}", e); } - }) - .await - .unwrap(); + }); } Ok((index_scheduler, auth_controller)) } /// Try to start the IndexScheduler and AuthController without checking the VERSION file or anything. -async fn open_or_create_database_unchecked( +fn open_or_create_database_unchecked( opt: &Opt, on_failure: OnFailure, - zk: Option, + zookeeper: Option>, ) -> anyhow::Result<(IndexScheduler, AuthController)> { // we don't want to create anything in the data.ms yet, thus we // wrap our two builders in a closure that'll be executed later. - let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zk.clone()); + let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zookeeper.clone()); let instance_features = opt.to_instance_features(); let index_scheduler = IndexScheduler::new(IndexSchedulerOptions { version_file_path: opt.db_path.join(VERSION_FILE_NAME), @@ -245,14 +241,13 @@ async fn open_or_create_database_unchecked( index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_count: DEFAULT_INDEX_COUNT, instance_features, - zk: zk.clone(), + zookeeper: zookeeper.clone(), }) - .await .map_err(anyhow::Error::from); match ( index_scheduler, - auth_controller.await.map_err(anyhow::Error::from), + auth_controller.map_err(anyhow::Error::from), create_version_file(&opt.db_path).map_err(anyhow::Error::from), ) { (Ok(i), Ok(a), Ok(())) => Ok((i, a)), @@ -266,16 +261,16 @@ async fn open_or_create_database_unchecked( } /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you. -async fn open_or_create_database( +fn open_or_create_database( opt: &Opt, empty_db: bool, - zk: Option, + zookeeper: Option>, ) -> anyhow::Result<(IndexScheduler, AuthController)> { if !empty_db { check_version_file(&opt.db_path)?; } - open_or_create_database_unchecked(opt, OnFailure::KeepDb, zk).await + open_or_create_database_unchecked(opt, OnFailure::KeepDb, zookeeper) } fn import_dump( diff --git a/meilisearch/src/main.rs b/meilisearch/src/main.rs index f436a6bf1..ad64384df 100644 --- a/meilisearch/src/main.rs +++ b/meilisearch/src/main.rs @@ -2,6 +2,7 @@ use std::env; use std::io::{stderr, Write}; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use actix_web::http::KeepAlive; use actix_web::web::Data; @@ -12,7 +13,7 @@ use meilisearch::analytics::Analytics; use meilisearch::{analytics, create_app, prototype_name, setup_meilisearch, Opt}; use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE}; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; -use zookeeper_client as zk; +use zookeeper::ZooKeeper; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; @@ -64,11 +65,10 @@ async fn main() -> anyhow::Result<()> { _ => (), } - let zk = match opt.zk_url { - Some(ref url) => Some(zk::Client::connect(url).await.unwrap()), - None => None, - }; - let (index_scheduler, auth_controller) = setup_meilisearch(&opt, zk).await?; + let timeout = Duration::from_millis(2500); + let zookeeper = + opt.zk_url.as_ref().map(|url| Arc::new(ZooKeeper::connect(url, timeout, drop).unwrap())); + let (index_scheduler, auth_controller) = setup_meilisearch(&opt, zookeeper).await?; #[cfg(all(not(debug_assertions), feature = "analytics"))] let analytics = if !opt.no_analytics { diff --git a/meilisearch/src/routes/api_key.rs b/meilisearch/src/routes/api_key.rs index 0caa4e0f0..6f2593612 100644 --- a/meilisearch/src/routes/api_key.rs +++ b/meilisearch/src/routes/api_key.rs @@ -41,7 +41,7 @@ pub async fn create_api_key( _req: HttpRequest, ) -> Result { let v = body.into_inner(); - let key = auth_controller.create_key(v).await?; + let key = auth_controller.create_key(v)?; let key = KeyView::from_key(key, &auth_controller); Ok(HttpResponse::Created().json(key)) @@ -107,7 +107,7 @@ pub async fn patch_api_key( let key = path.into_inner().key; let patch_api_key = body.into_inner(); let uid = Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?; - let key = auth_controller.update_key(uid, patch_api_key).await?; + let key = auth_controller.update_key(uid, patch_api_key)?; let key = KeyView::from_key(key, &auth_controller); Ok(HttpResponse::Ok().json(key)) @@ -119,7 +119,7 @@ pub async fn delete_api_key( ) -> Result { let key = path.into_inner().key; let uid = Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?; - auth_controller.delete_key(uid).await?; + auth_controller.delete_key(uid)?; Ok(HttpResponse::NoContent().finish()) } diff --git a/meilisearch/src/routes/dump.rs b/meilisearch/src/routes/dump.rs index e61b40ff1..57feb6634 100644 --- a/meilisearch/src/routes/dump.rs +++ b/meilisearch/src/routes/dump.rs @@ -29,7 +29,7 @@ pub async fn create_dump( keys: auth_controller.list_keys()?, instance_uid: analytics.instance_uid().cloned(), }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index a99f57911..249aeca4e 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -129,7 +129,7 @@ pub async fn delete_document( index_uid: index_uid.to_string(), documents_ids: vec![document_id], }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) } @@ -444,7 +444,7 @@ async fn document_addition( }; let scheduler = index_scheduler.clone(); - let task = match scheduler.register(task).await { + let task = match scheduler.register(task) { Ok(task) => task, Err(e) => { index_scheduler.delete_update_file(uuid)?; @@ -475,7 +475,7 @@ pub async fn delete_documents_batch( let task = KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) @@ -510,7 +510,7 @@ pub async fn delete_documents_by_filter( .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?; let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) @@ -526,7 +526,7 @@ pub async fn clear_all_documents( analytics.delete_documents(DocumentDeletionKind::ClearAll, &req); let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/indexes/mod.rs b/meilisearch/src/routes/indexes/mod.rs index fee7310e0..05bf3f1be 100644 --- a/meilisearch/src/routes/indexes/mod.rs +++ b/meilisearch/src/routes/indexes/mod.rs @@ -135,7 +135,7 @@ pub async fn create_index( ); let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); Ok(HttpResponse::Accepted().json(task)) } else { @@ -202,7 +202,7 @@ pub async fn update_index( primary_key: body.primary_key, }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) @@ -214,7 +214,7 @@ pub async fn delete_index( ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); Ok(HttpResponse::Accepted().json(task)) } diff --git a/meilisearch/src/routes/indexes/settings.rs b/meilisearch/src/routes/indexes/settings.rs index df2748cf1..7c6efc77d 100644 --- a/meilisearch/src/routes/indexes/settings.rs +++ b/meilisearch/src/routes/indexes/settings.rs @@ -55,7 +55,7 @@ macro_rules! make_setting_route { is_deletion: true, allow_index_creation, }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) @@ -94,7 +94,7 @@ macro_rules! make_setting_route { is_deletion: false, allow_index_creation, }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) @@ -580,7 +580,7 @@ pub async fn update_all( is_deletion: false, allow_index_creation, }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) @@ -615,7 +615,7 @@ pub async fn delete_all( is_deletion: true, allow_index_creation, }; - let task: SummarizedTaskView = index_scheduler.register(task).await?.into(); + let task: SummarizedTaskView = index_scheduler.register(task)?.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/swap_indexes.rs b/meilisearch/src/routes/swap_indexes.rs index acaffea33..c4e204c09 100644 --- a/meilisearch/src/routes/swap_indexes.rs +++ b/meilisearch/src/routes/swap_indexes.rs @@ -61,7 +61,7 @@ pub async fn swap_indexes( let task = KindWithContent::IndexSwap { swaps }; - let task = index_scheduler.register(task).await?; + let task = index_scheduler.register(task)?; let task: SummarizedTaskView = task.into(); Ok(HttpResponse::Accepted().json(task)) } diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs index 5a5a2215a..1c2da6cef 100644 --- a/meilisearch/src/routes/tasks.rs +++ b/meilisearch/src/routes/tasks.rs @@ -332,7 +332,7 @@ async fn cancel_tasks( let task_cancelation = KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks }; - let task = index_scheduler.register(task_cancelation).await?; + let task = index_scheduler.register(task_cancelation)?; let task: SummarizedTaskView = task.into(); Ok(HttpResponse::Ok().json(task)) @@ -377,7 +377,7 @@ async fn delete_tasks( let task_deletion = KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks }; - let task = index_scheduler.register(task_deletion).await?; + let task = index_scheduler.register(task_deletion)?; let task: SummarizedTaskView = task.into(); Ok(HttpResponse::Ok().json(task))