From c9a236b0af59486621e63a76ef847ccf6399c567 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 19 Jan 2022 11:21:19 +0100 Subject: [PATCH] feat(lib): auto-batching --- Cargo.lock | 141 ++-- meilisearch-http/src/lib.rs | 17 +- meilisearch-http/src/option.rs | 11 +- meilisearch-http/src/task.rs | 16 + meilisearch-http/tests/common/server.rs | 1 + meilisearch-lib/Cargo.toml | 5 +- .../index_resolver/mod.txt | 1 - meilisearch-lib/src/index/dump.rs | 23 +- meilisearch-lib/src/index/error.rs | 6 +- meilisearch-lib/src/index/index.rs | 9 +- meilisearch-lib/src/index/mod.rs | 20 +- meilisearch-lib/src/index/search.rs | 16 +- meilisearch-lib/src/index/update_handler.rs | 49 -- meilisearch-lib/src/index/updates.rs | 50 +- .../src/index_controller/dump_actor/actor.rs | 12 +- .../src/index_controller/dump_actor/mod.rs | 126 +-- meilisearch-lib/src/index_controller/mod.rs | 90 ++- .../src/index_resolver/index_store.rs | 13 +- meilisearch-lib/src/index_resolver/mod.rs | 166 ++-- meilisearch-lib/src/options.rs | 51 +- meilisearch-lib/src/snapshot.rs | 8 +- meilisearch-lib/src/tasks/batch.rs | 6 +- meilisearch-lib/src/tasks/mod.rs | 31 +- meilisearch-lib/src/tasks/scheduler.rs | 715 ++++++++++++------ meilisearch-lib/src/tasks/task.rs | 2 +- meilisearch-lib/src/tasks/task_store/mod.rs | 201 ++--- meilisearch-lib/src/tasks/task_store/store.rs | 65 +- meilisearch-lib/src/tasks/update_loop.rs | 107 +++ 28 files changed, 1181 insertions(+), 777 deletions(-) delete mode 100644 meilisearch-lib/src/index/update_handler.rs create mode 100644 meilisearch-lib/src/tasks/update_loop.rs diff --git a/Cargo.lock b/Cargo.lock index 16c621df7..5af898cc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,7 +79,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "465a6172cf69b960917811022d8f29bc0b7fa1398bc4f78b3c466673db1213b6" dependencies = [ - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -202,7 +202,7 @@ dependencies = [ "serde_urlencoded", "smallvec", "socket2", - "time 0.3.6", + "time 0.3.7", "url", ] @@ -214,7 +214,7 @@ checksum = "98a793e4a7bd059e06e1bc1bd9943b57a47f806de3599d2437441682292c333e" dependencies = [ "actix-router", "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -287,9 +287,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.52" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84450d0b4a8bd1ba4144ce8ce718fbc5d071358b1e5384bace6536b3d1f2d5b3" +checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" dependencies = [ "backtrace", ] @@ -339,7 +339,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -350,10 +350,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] +[[package]] +name = "atomic_refcell" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b5e5f48b927f04e952dedc932f31995a65a0bf65ec971c74436e51bf6e970d" + [[package]] name = "atty" version = "0.2.14" @@ -400,9 +406,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "bimap" -version = "0.6.1" +version = "0.6.2" source 
= "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07" +checksum = "bc0455254eb5c6964c4545d8bac815e1a1be4f3afe0ae695ea539c12d728d44b" dependencies = [ "serde", ] @@ -526,7 +532,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e215f8c2f9f79cb53c8335e687ffd07d5bfcb6fe5fc80723762d0be46e7cc54" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -648,9 +654,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.0.10" +version = "3.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a30c3bf9ff12dfe5dae53f0a96e0febcd18420d1c0e7fad77796d9d5c4b5375" +checksum = "08799f92c961c7a1cf0cc398a9073da99e21ce388b46372c37f3191f2f3eed3e" dependencies = [ "atty", "bitflags", @@ -665,14 +671,14 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.0.6" +version = "3.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "517358c28fcef6607bf6f76108e02afad7e82297d132a6b846dcc1fc3efcd153" +checksum = "0fd2078197a22f338bd4fbf7d6387eb6f0d6a3c69e6cbc09f5c93e97321fd92a" dependencies = [ "heck", "proc-macro-error", "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -683,7 +689,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1df715824eb382e34b7afb7463b0247bf41538aeba731fba05241ecdb5dc3747" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -700,7 +706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05" dependencies = [ "percent-encoding", - "time 0.3.6", + "time 0.3.7", "version_check", ] @@ -835,7 +841,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -847,7 +853,7 @@ checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" dependencies = [ "convert_case", "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "rustc_version", "syn 1.0.86", ] @@ -942,7 +948,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c134c37760b27a871ba422106eedbb8247da973a09e82558bf26d619c882b159" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -961,9 +967,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "779d043b6a0b90cc4c0ed7ee380a6504394cee7efd7db050e3774eee387324b2" +checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" dependencies = [ "instant", ] @@ -983,7 +989,7 @@ dependencies = [ [[package]] name = "filter-parser" version = "0.1.0" -source = "git+https://github.com/meilisearch/milli.git?tag=v0.21.1#7f50ca9a20090fc4fe2abae0394c1e6fdd351ebd" +source = "git+https://github.com/meilisearch/milli.git?tag=v0.22.0#9f2ff71581ec1e0a54a3c9be030537705c27ec2d" dependencies = [ "nom", "nom_locate", @@ -1105,7 +1111,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -1201,7 +1207,7 @@ checksum = 
"e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9" dependencies = [ "proc-macro-error", "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -1243,9 +1249,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c9de88456263e249e241fcd211d3954e2c9b0ef7ccfc235a444eb367cae3689" +checksum = "d9f1f717ddc7b2ba36df7e871fd88db79326551d3d6f1fc406fbfd28b582ff8e" dependencies = [ "bytes", "fnv", @@ -1564,9 +1570,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eef78b64d87775463c549fbd80e19249ef436ea3bf1de2a1eb7e717ec7fab1e9" +checksum = "b0005d08a8f7b65fb8073cb697aa0b12b631ed251ce73d862ce50eeb52ce3b50" [[package]] name = "libgit2-sys" @@ -1662,7 +1668,7 @@ checksum = "10a9062912d7952c5588cc474795e0b9ee008e7e6781127945b85413d4b99d81" dependencies = [ "log", "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -1787,6 +1793,7 @@ dependencies = [ "anyhow", "async-stream", "async-trait", + "atomic_refcell", "byte-unit", "bytes", "chrono", @@ -1841,8 +1848,8 @@ dependencies = [ [[package]] name = "meilisearch-tokenizer" -version = "0.2.6" -source = "git+https://github.com/meilisearch/tokenizer.git?tag=v0.2.6#a69bb0cf442ea6357464d71bdf5d28273e5153ba" +version = "0.2.7" +source = "git+https://github.com/meilisearch/tokenizer.git?tag=v0.2.7#e14f64f2482d8f57e9aae8dc37dcb1469099f6f3" dependencies = [ "character_converter", "cow-utils", @@ -1881,8 +1888,8 @@ dependencies = [ [[package]] name = "milli" -version = "0.21.1" -source = "git+https://github.com/meilisearch/milli.git?tag=v0.21.1#7f50ca9a20090fc4fe2abae0394c1e6fdd351ebd" +version = "0.22.0" +source = "git+https://github.com/meilisearch/milli.git?tag=v0.22.0#9f2ff71581ec1e0a54a3c9be030537705c27ec2d" dependencies = [ "bimap", "bincode", @@ -2013,14 +2020,14 @@ checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033" dependencies = [ "cfg-if 1.0.0", "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] [[package]] name = "nelson" version = "0.1.0" -source = "git+https://github.com/MarinPostma/nelson.git?rev=e5f4ff046c21e7e986c7cb31550d1c9e7f0b693b#e5f4ff046c21e7e986c7cb31550d1c9e7f0b693b" +source = "git+https://github.com/MarinPostma/nelson.git?rev=675f13885548fb415ead8fbb447e9e6d9314000a#675f13885548fb415ead8fbb447e9e6d9314000a" [[package]] name = "nom" @@ -2101,9 +2108,9 @@ dependencies = [ [[package]] name = "num_threads" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71a1eb3a36534514077c1e079ada2fb170ef30c47d203aa6916138cf882ecd52" +checksum = "97ba99ba6393e2c3734791401b66902d981cb03bf190af674ca69949b6d5fb15" dependencies = [ "libc", ] @@ -2286,7 +2293,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -2360,7 +2367,7 @@ checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ "proc-macro-error-attr", "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", "version_check", ] @@ -2372,7 +2379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "version_check", ] @@ -2448,9 +2455,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47aa80447ce4daf1717500037052af176af5d38cc3e571d9ec1c7353fc10c87d" +checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145" dependencies = [ "proc-macro2 1.0.36", ] @@ -2793,29 +2800,29 @@ checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" [[package]] name = "serde" -version = "1.0.134" +version = "1.0.136" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96b3c34c1690edf8174f5b289a336ab03f568a4460d8c6df75f2f3a692b3bc6a" +checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.134" +version = "1.0.136" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784ed1fbfa13fe191077537b0d70ec8ad1e903cfe04831da608aa36457cb653d" +checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] [[package]] name = "serde_json" -version = "1.0.75" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79" +checksum = "d23c1ba4cf0efd44be32017709280b32d1cea5c3f1275c3b6d9e8bc54f758085" dependencies = [ "indexmap", "itoa 1.0.1", @@ -2938,9 +2945,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "socket2" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f82496b90c36d70af5fcd482edaa2e0bd16fade569de1330405fecbbdac736b" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" dependencies = [ "libc", "winapi", @@ -2993,7 +3000,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "unicode-xid 0.2.2", ] @@ -3013,7 +3020,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", "unicode-xid 0.2.2", ] @@ -3095,7 +3102,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -3133,9 +3140,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8d54b9298e05179c335de2b9645d061255bcd5155f843b3e328d2cfe0a5b413" +checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d" dependencies = [ "itoa 1.0.1", "libc", @@ -3190,7 +3197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", ] @@ -3443,7 +3450,7 @@ 
dependencies = [ "lazy_static", "log", "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", "wasm-bindgen-shared", ] @@ -3466,7 +3473,7 @@ version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" dependencies = [ - "quote 1.0.14", + "quote 1.0.15", "wasm-bindgen-macro-support", ] @@ -3477,7 +3484,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" dependencies = [ "proc-macro2 1.0.36", - "quote 1.0.14", + "quote 1.0.15", "syn 1.0.86", "wasm-bindgen-backend", "wasm-bindgen-shared", @@ -3633,18 +3640,18 @@ dependencies = [ [[package]] name = "zstd" -version = "0.9.3+zstd.1.5.2" +version = "0.9.2+zstd.1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "377f9c6801b6cbb254c3055266be42698b5bc6563c56b37e5fcca637a68eba95" +checksum = "2390ea1bf6c038c39674f22d95f0564725fc06034a47129179810b2fc58caa54" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "4.1.4+zstd.1.5.2" +version = "4.1.3+zstd.1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee" +checksum = "e99d81b99fb3c2c2c794e3fe56c305c63d5173a16a46b5850b07c935ffc7db79" dependencies = [ "libc", "zstd-sys", @@ -3652,9 +3659,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.6.3+zstd.1.5.2" +version = "1.6.2+zstd.1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8" +checksum = "2daf2f248d9ea44454bfcb2516534e8b8ad2fc91bf818a1885495fc42bc8ac9f" dependencies = [ "cc", "libc", diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs index a8abdae49..907a4423e 100644 --- a/meilisearch-http/src/lib.rs +++ b/meilisearch-http/src/lib.rs @@ -9,7 +9,7 @@ pub mod helpers; pub mod option; pub mod routes; -use std::sync::Arc; +use std::sync::{atomic::AtomicBool, Arc}; use std::time::Duration; use crate::error::MeilisearchHttpError; @@ -25,8 +25,17 @@ use extractors::payload::PayloadConfig; use meilisearch_auth::AuthController; use meilisearch_lib::MeiliSearch; +pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false); + pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result { let mut meilisearch = MeiliSearch::builder(); + + // enable autobatching? 
+    let _ = AUTOBATCHING_ENABLED.store(
+        opt.scheduler_options.enable_autobatching,
+        std::sync::atomic::Ordering::Relaxed,
+    );
+
     meilisearch
         .set_max_index_size(opt.max_index_size.get_bytes() as usize)
         .set_max_task_store_size(opt.max_task_db_size.get_bytes() as usize)
@@ -52,7 +61,11 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result {
         meilisearch.set_schedule_snapshot();
     }

-    meilisearch.build(opt.db_path.clone(), opt.indexer_options.clone())
+    meilisearch.build(
+        opt.db_path.clone(),
+        opt.indexer_options.clone(),
+        opt.scheduler_options.clone(),
+    )
 }

 pub fn configure_data(
diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs
index d59fd48d6..b6cc3db22 100644
--- a/meilisearch-http/src/option.rs
+++ b/meilisearch-http/src/option.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;

 use byte_unit::Byte;
 use clap::Parser;
-use meilisearch_lib::options::IndexerOpts;
+use meilisearch_lib::options::{IndexerOpts, SchedulerConfig};
 use rustls::{
     server::{
         AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient,
@@ -147,9 +147,18 @@ pub struct Opt {
     #[serde(skip)]
     #[clap(skip)]
     pub indexer_options: IndexerOpts,
+
+    #[clap(flatten)]
+    pub scheduler_options: SchedulerConfig,
 }

 impl Opt {
+    /// Whether analytics should be enabled or not.
+    #[cfg(all(not(debug_assertions), feature = "analytics"))]
+    pub fn analytics(&self) -> bool {
+        !self.no_analytics
+    }
+
     pub fn get_ssl_config(&self) -> anyhow::Result> {
         if let (Some(cert_path), Some(key_path)) = (&self.ssl_cert_path, &self.ssl_key_path) {
             let config = rustls::ServerConfig::builder().with_safe_defaults();
diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs
index b81ecd7af..9881854e3 100644
--- a/meilisearch-http/src/task.rs
+++ b/meilisearch-http/src/task.rs
@@ -2,11 +2,14 @@ use chrono::{DateTime, Duration, Utc};
 use meilisearch_error::ResponseError;
 use meilisearch_lib::index::{Settings, Unchecked};
 use meilisearch_lib::milli::update::IndexDocumentsMethod;
+use meilisearch_lib::tasks::batch::BatchId;
 use meilisearch_lib::tasks::task::{
     DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult,
 };
 use serde::{Serialize, Serializer};

+use crate::AUTOBATCHING_ENABLED;
+
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 enum TaskType {
@@ -106,6 +109,8 @@ pub struct TaskView {
     enqueued_at: DateTime,
     started_at: Option>,
     finished_at: Option>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    batch_uid: Option>,
 }

 impl From for TaskView {
@@ -252,6 +257,16 @@ impl From for TaskView {

         let duration = finished_at.zip(started_at).map(|(tf, ts)| (tf - ts));

+        let batch_uid = if AUTOBATCHING_ENABLED.load(std::sync::atomic::Ordering::Relaxed) {
+            let id = events.iter().find_map(|e| match e {
+                TaskEvent::Batched { batch_id, ..
} => Some(*batch_id), + _ => None, + }); + Some(id) + } else { + None + }; + Self { uid: id, index_uid: index_uid.into_inner(), @@ -263,6 +278,7 @@ impl From for TaskView { enqueued_at, started_at, finished_at, + batch_uid, } } } diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs index bdaf75ac1..dcb4b6266 100644 --- a/meilisearch-http/tests/common/server.rs +++ b/meilisearch-http/tests/common/server.rs @@ -156,5 +156,6 @@ pub fn default_settings(dir: impl AsRef) -> Opt { ..Default::default() }, log_level: "off".into(), + scheduler_options: meilisearch_lib::options::SchedulerConfig::default(), } } diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index 154dbfc68..bd8d4715f 100644 --- a/meilisearch-lib/Cargo.toml +++ b/meilisearch-lib/Cargo.toml @@ -28,7 +28,7 @@ lazy_static = "1.4.0" log = "0.4.14" meilisearch-error = { path = "../meilisearch-error" } meilisearch-auth = { path = "../meilisearch-auth" } -milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.21.1" } +milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.22.0" } mime = "0.3.16" num_cpus = "1.13.0" once_cell = "1.8.0" @@ -55,12 +55,13 @@ reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-featu sysinfo = "0.20.2" derivative = "2.2.0" fs_extra = "1.2.0" +atomic_refcell = "0.1.8" [dev-dependencies] actix-rt = "2.2.0" mockall = "0.10.2" paste = "1.0.5" -nelson = { git = "https://github.com/MarinPostma/nelson.git", rev = "e5f4ff046c21e7e986c7cb31550d1c9e7f0b693b"} +nelson = { git = "https://github.com/MarinPostma/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"} meilisearch-error = { path = "../meilisearch-error", features = ["test-traits"] } proptest = "1.0.0" proptest-derive = "0.3.0" diff --git a/meilisearch-lib/proptest-regressions/index_resolver/mod.txt b/meilisearch-lib/proptest-regressions/index_resolver/mod.txt index 583db4918..553b8f1d5 100644 --- a/meilisearch-lib/proptest-regressions/index_resolver/mod.txt +++ b/meilisearch-lib/proptest-regressions/index_resolver/mod.txt @@ -17,4 +17,3 @@ cc 3a01c78db082434b8a4f8914abf0d1059d39f4426d16df20d72e1bd7ebb94a6a # shrinks to cc c450806df3921d1e6fe9b6af93d999e8196d0175b69b64f1810802582421e94a # shrinks to task = Task { id: 0, index_uid: IndexUid("a"), content: CreateIndex { primary_key: Some("") }, events: [] }, index_exists = false, index_op_fails = false, any_int = 0 cc fb6b98947cbdbdee05ed3c0bf2923aad2c311edc276253642eb43a0c0ec4888a # shrinks to task = Task { id: 0, index_uid: IndexUid("A"), content: CreateIndex { primary_key: Some("") }, events: [] }, index_exists = false, index_op_fails = true, any_int = 0 cc 1aa59d8e22484e9915efbb5818e1e1ab684aa61b166dc82130d6221663ba00bf # shrinks to task = Task { id: 0, index_uid: IndexUid("a"), content: DocumentDeletion(Clear), events: [] }, index_exists = true, index_op_fails = false, any_int = 0 -cc 2e8644e6397b5f76e0b79f961fa125e2f45f42f26e03c453c9a174dfb427500d # shrinks to task = Task { id: 0, index_uid: IndexUid("0"), content: SettingsUpdate { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, synonyms: NotSet, distinct_attribute: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: false }, events: [] }, index_exists = false, index_op_fails = false, any_int = 0 diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs index 
661aefa79..153b724b9 100644 --- a/meilisearch-lib/src/index/dump.rs +++ b/meilisearch-lib/src/index/dump.rs @@ -6,10 +6,10 @@ use anyhow::Context; use heed::{EnvOpenOptions, RoTxn}; use indexmap::IndexMap; use milli::documents::DocumentBatchReader; +use milli::update::{IndexDocumentsConfig, IndexerConfig}; use serde::{Deserialize, Serialize}; use crate::document_formats::read_ndjson; -use crate::index::update_handler::UpdateHandler; use crate::index::updates::apply_settings_to_builder; use super::error::Result; @@ -85,7 +85,7 @@ impl Index { src: impl AsRef, dst: impl AsRef, size: usize, - update_handler: &UpdateHandler, + indexer_config: &IndexerConfig, ) -> anyhow::Result<()> { let dir_name = src .as_ref() @@ -110,8 +110,7 @@ impl Index { let mut txn = index.write_txn()?; // Apply settings first - let builder = update_handler.update_builder(); - let mut builder = builder.settings(&mut txn, &index); + let mut builder = milli::update::Settings::new(&mut txn, &index, indexer_config); if let Some(primary_key) = primary_key { builder.set_primary_key(primary_key); @@ -140,12 +139,16 @@ impl Index { //If the document file is empty, we don't perform the document addition, to prevent //a primary key error to be thrown. - if !documents_reader.is_empty() { - let builder = update_handler - .update_builder() - .index_documents(&mut txn, &index); - builder.execute(documents_reader, |_| ())?; - } + let config = IndexDocumentsConfig::default(); + let mut builder = milli::update::IndexDocuments::new( + &mut txn, + &index, + indexer_config, + config, + |_| (), + ); + builder.add_documents(documents_reader)?; + builder.execute()?; } txn.commit()?; diff --git a/meilisearch-lib/src/index/error.rs b/meilisearch-lib/src/index/error.rs index 23a252100..f8dcc0dc8 100644 --- a/meilisearch-lib/src/index/error.rs +++ b/meilisearch-lib/src/index/error.rs @@ -3,7 +3,7 @@ use std::error::Error; use meilisearch_error::{internal_error, Code, ErrorCode}; use serde_json::Value; -use crate::error::MilliError; +use crate::{error::MilliError, update_file_store}; pub type Result = std::result::Result; @@ -23,7 +23,9 @@ internal_error!( IndexError: std::io::Error, heed::Error, fst::Error, - serde_json::Error + serde_json::Error, + update_file_store::UpdateFileStoreError, + milli::documents::Error ); impl ErrorCode for IndexError { diff --git a/meilisearch-lib/src/index/index.rs b/meilisearch-lib/src/index/index.rs index ca82b0d95..41803b6d5 100644 --- a/meilisearch-lib/src/index/index.rs +++ b/meilisearch-lib/src/index/index.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use heed::{EnvOpenOptions, RoTxn}; -use milli::update::Setting; +use milli::update::{IndexerConfig, Setting}; use milli::{obkv_to_json, FieldDistribution, FieldId}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; @@ -17,7 +17,6 @@ use crate::EnvSizer; use super::error::IndexError; use super::error::Result; -use super::update_handler::UpdateHandler; use super::{Checked, Settings}; pub type Document = Map; @@ -68,7 +67,7 @@ pub struct Index { #[derivative(Debug = "ignore")] pub inner: Arc, #[derivative(Debug = "ignore")] - pub update_handler: Arc, + pub indexer_config: Arc, } impl Deref for Index { @@ -84,7 +83,7 @@ impl Index { path: impl AsRef, size: usize, uuid: Uuid, - update_handler: Arc, + update_handler: Arc, ) -> Result { log::debug!("opening index in {}", path.as_ref().display()); create_dir_all(&path)?; @@ -94,7 +93,7 @@ impl Index { Ok(Index { inner, uuid, - update_handler, + indexer_config: update_handler, }) } 
diff --git a/meilisearch-lib/src/index/mod.rs b/meilisearch-lib/src/index/mod.rs index 7f9470b24..3e6be739b 100644 --- a/meilisearch-lib/src/index/mod.rs +++ b/meilisearch-lib/src/index/mod.rs @@ -4,7 +4,6 @@ pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecke mod dump; pub mod error; mod search; -pub mod update_handler; pub mod updates; #[allow(clippy::module_inception)] @@ -26,6 +25,7 @@ pub mod test { use std::path::PathBuf; use std::sync::Arc; + use milli::update::IndexerConfig; use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod}; use nelson::Mocker; use serde_json::{Map, Value}; @@ -33,7 +33,6 @@ pub mod test { use super::error::Result; use super::index::Index; - use super::update_handler::UpdateHandler; use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings}; use crate::update_file_store::UpdateFileStore; @@ -52,7 +51,7 @@ pub mod test { path: impl AsRef, size: usize, uuid: Uuid, - update_handler: Arc, + update_handler: Arc, ) -> Result { let index = Index::open(path, size, uuid, update_handler)?; Ok(Self::Real(index)) @@ -62,7 +61,7 @@ pub mod test { src: impl AsRef, dst: impl AsRef, size: usize, - update_handler: &UpdateHandler, + update_handler: &IndexerConfig, ) -> anyhow::Result<()> { Index::load_dump(src, dst, size, update_handler) } @@ -157,21 +156,18 @@ pub mod test { pub fn update_documents( &self, method: IndexDocumentsMethod, - content_uuid: Uuid, primary_key: Option, file_store: UpdateFileStore, + contents: impl Iterator, ) -> Result { match self { MockIndex::Real(index) => { - index.update_documents(method, content_uuid, primary_key, file_store) + index.update_documents(method, primary_key, file_store, contents) } MockIndex::Mock(mocker) => unsafe { - mocker.get("update_documents").call(( - method, - content_uuid, - primary_key, - file_store, - )) + mocker + .get("update_documents") + .call((method, primary_key, file_store, contents)) }, } } diff --git a/meilisearch-lib/src/index/search.rs b/meilisearch-lib/src/index/search.rs index 99c4e8bf3..644b75468 100644 --- a/meilisearch-lib/src/index/search.rs +++ b/meilisearch-lib/src/index/search.rs @@ -295,7 +295,7 @@ fn compute_value_matches<'a, A: AsRef<[u8]>>( let mut start = 0; for (word, token) in analyzed.reconstruct() { if token.is_word() { - if let Some(length) = matcher.matches(token.text()) { + if let Some(length) = matcher.matches(&token) { infos.push(MatchInfo { start, length }); } } @@ -486,18 +486,18 @@ fn format_fields>( /// trait to allow unit testing of `format_fields` trait Matcher { - fn matches(&self, w: &str) -> Option; + fn matches(&self, w: &Token) -> Option; } #[cfg(test)] impl Matcher for BTreeMap<&str, Option> { - fn matches(&self, w: &str) -> Option { - self.get(w).cloned().flatten() + fn matches(&self, w: &Token) -> Option { + self.get(w.text()).cloned().flatten() } } impl Matcher for MatchingWords { - fn matches(&self, w: &str) -> Option { + fn matches(&self, w: &Token) -> Option { self.matching_bytes(w) } } @@ -579,7 +579,7 @@ impl<'a, A: AsRef<[u8]>> Formatter<'a, A> { let mut tokens = analyzed.reconstruct().peekable(); while let Some((word, token)) = - tokens.next_if(|(_, token)| matcher.matches(token.text()).is_none()) + tokens.next_if(|(_, token)| matcher.matches(token).is_none()) { buffer.push((word, token)); } @@ -623,7 +623,7 @@ impl<'a, A: AsRef<[u8]>> Formatter<'a, A> { // Check if we need to do highlighting or computed matches before calling // Matcher::match since the call is expensive. 
if format_options.highlight && token.is_word() { - if let Some(length) = matcher.matches(token.text()) { + if let Some(length) = matcher.matches(&token) { match word.get(..length).zip(word.get(length..)) { Some((head, tail)) => { out.push_str(&self.marks.0); @@ -653,7 +653,7 @@ fn parse_filter(facets: &Value) -> Result> { match facets { Value::String(expr) => { let condition = Filter::from_str(expr)?; - Ok(Some(condition)) + Ok(condition) } Value::Array(arr) => parse_filter_array(arr), v => Err(FacetError::InvalidExpression(&["Array"], v.clone()).into()), diff --git a/meilisearch-lib/src/index/update_handler.rs b/meilisearch-lib/src/index/update_handler.rs deleted file mode 100644 index 4b311dbfb..000000000 --- a/meilisearch-lib/src/index/update_handler.rs +++ /dev/null @@ -1,49 +0,0 @@ -use milli::update::UpdateBuilder; -use milli::CompressionType; -use rayon::ThreadPool; - -use crate::options::IndexerOpts; - -pub struct UpdateHandler { - max_nb_chunks: Option, - chunk_compression_level: Option, - thread_pool: ThreadPool, - log_frequency: usize, - max_memory: Option, - chunk_compression_type: CompressionType, -} - -impl UpdateHandler { - pub fn new(opt: &IndexerOpts) -> anyhow::Result { - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(opt.indexing_jobs.unwrap_or(num_cpus::get() / 2)) - .build()?; - - Ok(Self { - max_nb_chunks: opt.max_nb_chunks, - chunk_compression_level: opt.chunk_compression_level, - thread_pool, - log_frequency: opt.log_every_n, - max_memory: opt.max_memory.map(|m| m.get_bytes() as usize), - chunk_compression_type: opt.chunk_compression_type, - }) - } - - pub fn update_builder(&self) -> UpdateBuilder { - // We prepare the update by using the update builder. - let mut update_builder = UpdateBuilder::new(); - if let Some(max_nb_chunks) = self.max_nb_chunks { - update_builder.max_nb_chunks(max_nb_chunks); - } - if let Some(chunk_compression_level) = self.chunk_compression_level { - update_builder.chunk_compression_level(chunk_compression_level); - } - update_builder.thread_pool(&self.thread_pool); - update_builder.log_every_n(self.log_frequency); - if let Some(max_memory) = self.max_memory { - update_builder.max_memory(max_memory); - } - update_builder.chunk_compression_type(self.chunk_compression_type); - update_builder - } -} diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs index c1fc9a5c0..07bb0da0e 100644 --- a/meilisearch-lib/src/index/updates.rs +++ b/meilisearch-lib/src/index/updates.rs @@ -5,7 +5,8 @@ use std::num::NonZeroUsize; use log::{debug, info, trace}; use milli::documents::DocumentBatchReader; use milli::update::{ - DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, Setting, + DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, + Setting, }; use serde::{Deserialize, Serialize, Serializer}; use uuid::Uuid; @@ -178,7 +179,7 @@ impl Index { txn: &mut heed::RwTxn<'a, 'b>, primary_key: String, ) -> Result { - let mut builder = self.update_handler.update_builder().settings(txn, self); + let mut builder = milli::update::Settings::new(txn, self, self.indexer_config.as_ref()); builder.set_primary_key(primary_key); builder.execute(|_| ())?; let meta = IndexMeta::new_txn(self, txn)?; @@ -197,10 +198,7 @@ impl Index { /// Deletes `ids` from the index, and returns how many documents were deleted. 
pub fn delete_documents(&self, ids: &[String]) -> Result { let mut txn = self.write_txn()?; - let mut builder = self - .update_handler - .update_builder() - .delete_documents(&mut txn, self)?; + let mut builder = milli::update::DeleteDocuments::new(&mut txn, self)?; // We ignore unexisting document ids ids.iter().for_each(|id| { @@ -216,11 +214,7 @@ impl Index { pub fn clear_documents(&self) -> Result<()> { let mut txn = self.write_txn()?; - self.update_handler - .update_builder() - .clear_documents(&mut txn, self) - .execute()?; - + milli::update::ClearDocuments::new(&mut txn, self).execute()?; txn.commit()?; Ok(()) @@ -229,9 +223,9 @@ impl Index { pub fn update_documents( &self, method: IndexDocumentsMethod, - content_uuid: Uuid, primary_key: Option, file_store: UpdateFileStore, + contents: impl IntoIterator, ) -> Result { trace!("performing document addition"); let mut txn = self.write_txn()?; @@ -242,17 +236,27 @@ impl Index { } } + let config = IndexDocumentsConfig { + update_method: method, + ..Default::default() + }; + let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step); + let mut builder = milli::update::IndexDocuments::new( + &mut txn, + self, + self.indexer_config.as_ref(), + config, + indexing_callback, + ); - let content_file = file_store.get_update(content_uuid).unwrap(); - let reader = DocumentBatchReader::from_reader(content_file).unwrap(); + for content_uuid in contents.into_iter() { + let content_file = file_store.get_update(content_uuid)?; + let reader = DocumentBatchReader::from_reader(content_file)?; + builder.add_documents(reader)?; + } - let mut builder = self - .update_handler - .update_builder() - .index_documents(&mut txn, self); - builder.index_documents_method(method); - let addition = builder.execute(reader, indexing_callback)?; + let addition = builder.execute()?; txn.commit()?; @@ -264,10 +268,8 @@ impl Index { pub fn update_settings(&self, settings: &Settings) -> Result<()> { // We must use the write transaction of the update here. 
let mut txn = self.write_txn()?; - let mut builder = self - .update_handler - .update_builder() - .settings(&mut txn, self); + let mut builder = + milli::update::Settings::new(&mut txn, self, self.indexer_config.as_ref()); apply_settings_to_builder(settings, &mut builder); diff --git a/meilisearch-lib/src/index_controller/dump_actor/actor.rs b/meilisearch-lib/src/index_controller/dump_actor/actor.rs index aaf977df3..c9b871c0e 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/actor.rs @@ -10,7 +10,7 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use super::error::{DumpActorError, Result}; use super::{DumpInfo, DumpJob, DumpMsg, DumpStatus}; -use crate::tasks::TaskStore; +use crate::tasks::Scheduler; use crate::update_file_store::UpdateFileStore; pub const CONCURRENT_DUMP_MSG: usize = 10; @@ -18,7 +18,7 @@ pub const CONCURRENT_DUMP_MSG: usize = 10; pub struct DumpActor { inbox: Option>, update_file_store: UpdateFileStore, - task_store: TaskStore, + scheduler: Arc>, dump_path: PathBuf, analytics_path: PathBuf, lock: Arc>, @@ -36,7 +36,7 @@ impl DumpActor { pub fn new( inbox: mpsc::Receiver, update_file_store: UpdateFileStore, - task_store: TaskStore, + scheduler: Arc>, dump_path: impl AsRef, analytics_path: impl AsRef, index_db_size: usize, @@ -46,7 +46,7 @@ impl DumpActor { let lock = Arc::new(Mutex::new(())); Self { inbox: Some(inbox), - task_store, + scheduler, update_file_store, dump_path: dump_path.as_ref().into(), analytics_path: analytics_path.as_ref().into(), @@ -118,13 +118,13 @@ impl DumpActor { dump_path: self.dump_path.clone(), db_path: self.analytics_path.clone(), update_file_store: self.update_file_store.clone(), - task_store: self.task_store.clone(), + scheduler: self.scheduler.clone(), uid: uid.clone(), update_db_size: self.update_db_size, index_db_size: self.index_db_size, }; - let task_result = tokio::task::spawn(task.run()).await; + let task_result = tokio::task::spawn_local(task.run()).await; let mut dump_infos = self.dump_infos.write().await; let dump_infos = dump_infos diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index 2fcc34077..2c0f464d2 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -1,5 +1,6 @@ use std::fs::File; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::bail; use chrono::{DateTime, Utc}; @@ -12,7 +13,7 @@ use meilisearch_auth::AuthController; pub use message::DumpMsg; use tempfile::TempDir; use tokio::fs::create_dir_all; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, RwLock}; use crate::analytics; use crate::compression::{from_tar_gz, to_tar_gz}; @@ -20,7 +21,7 @@ use crate::index_controller::dump_actor::error::DumpActorError; use crate::index_controller::dump_actor::loaders::{v2, v3, v4}; use crate::options::IndexerOpts; use crate::tasks::task::Job; -use crate::tasks::TaskStore; +use crate::tasks::Scheduler; use crate::update_file_store::UpdateFileStore; use error::Result; @@ -319,7 +320,7 @@ struct DumpJob { dump_path: PathBuf, db_path: PathBuf, update_file_store: UpdateFileStore, - task_store: TaskStore, + scheduler: Arc>, uid: String, update_db_size: usize, index_db_size: usize, @@ -344,21 +345,28 @@ impl DumpJob { let (sender, receiver) = oneshot::channel(); - self.task_store - .register_job(Job::Dump { + self.scheduler + .write() + .await + .schedule_job(Job::Dump { ret: sender, path: 
temp_dump_path.clone(), }) .await; - receiver.await??; - self.task_store - .dump(&temp_dump_path, self.update_file_store.clone()) - .await?; + + // wait until the job has started performing before finishing the dump process + let sender = receiver.await??; AuthController::dump(&self.db_path, &temp_dump_path)?; + //TODO(marin): this is not right, the scheduler should dump itself, not do it here... + self.scheduler + .read() + .await + .dump(&temp_dump_path, self.update_file_store.clone()) + .await?; + let dump_path = tokio::task::spawn_blocking(move || -> Result { - let _ = &self; // for now we simply copy the updates/updates_files // FIXME: We may copy more files than necessary, if new files are added while we are // performing the dump. We need a way to filter them out. @@ -374,6 +382,9 @@ impl DumpJob { }) .await??; + // notify the update loop that we are finished performing the dump. + let _ = sender.send(()); + info!("Created dump in {:?}.", dump_path); Ok(()) @@ -382,19 +393,15 @@ impl DumpJob { #[cfg(test)] mod test { - use std::collections::HashSet; - - use futures::future::{err, ok}; use nelson::Mocker; use once_cell::sync::Lazy; - use uuid::Uuid; use super::*; - use crate::index::error::Result as IndexResult; - use crate::index::Index; use crate::index_resolver::error::IndexResolverError; - use crate::index_resolver::index_store::MockIndexStore; - use crate::index_resolver::meta_store::MockIndexMetaStore; + use crate::options::SchedulerConfig; + use crate::tasks::error::Result as TaskResult; + use crate::tasks::task::{Task, TaskId}; + use crate::tasks::{MockTaskPerformer, TaskFilter, TaskStore}; use crate::update_file_store::UpdateFileStore; fn setup() { @@ -411,86 +418,91 @@ mod test { } #[actix_rt::test] - #[ignore] async fn test_dump_normal() { setup(); let tmp = tempfile::tempdir().unwrap(); - let uuids = std::iter::repeat_with(Uuid::new_v4) - .take(4) - .collect::>(); - let mut uuid_store = MockIndexMetaStore::new(); - uuid_store - .expect_dump() - .once() - .returning(move |_| Box::pin(ok(()))); - - let mut index_store = MockIndexStore::new(); - index_store.expect_get().times(4).returning(move |uuid| { - let mocker = Mocker::default(); - let uuids_clone = uuids.clone(); - mocker.when::<(), Uuid>("uuid").once().then(move |_| { - assert!(uuids_clone.contains(&uuid)); - uuid - }); - mocker - .when::<&Path, IndexResult<()>>("dump") - .once() - .then(move |_| Ok(())); - Box::pin(ok(Some(Index::mock(mocker)))) - }); - let mocker = Mocker::default(); let update_file_store = UpdateFileStore::mock(mocker); - //let update_sender = - // create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap(); - - //TODO: fix dump tests + let mut performer = MockTaskPerformer::new(); + performer + .expect_process_job() + .once() + .returning(|j| match j { + Job::Dump { ret, .. 
} => { + let (sender, _receiver) = oneshot::channel(); + ret.send(Ok(sender)).unwrap(); + } + _ => unreachable!(), + }); + let performer = Arc::new(performer); let mocker = Mocker::default(); - let task_store = TaskStore::mock(mocker); + mocker + .when::<(&Path, UpdateFileStore), TaskResult<()>>("dump") + .then(|_| Ok(())); + mocker + .when::<(Option, Option, Option), TaskResult>>( + "list_tasks", + ) + .then(|_| Ok(Vec::new())); + let store = TaskStore::mock(mocker); + let config = SchedulerConfig::default(); + + let scheduler = Scheduler::new(store, performer, config).unwrap(); let task = DumpJob { dump_path: tmp.path().into(), // this should do nothing update_file_store, db_path: tmp.path().into(), - task_store, uid: String::from("test"), update_db_size: 4096 * 10, index_db_size: 4096 * 10, + scheduler, }; task.run().await.unwrap(); } #[actix_rt::test] - #[ignore] async fn error_performing_dump() { let tmp = tempfile::tempdir().unwrap(); - let mut uuid_store = MockIndexMetaStore::new(); - uuid_store - .expect_dump() - .once() - .returning(move |_| Box::pin(err(IndexResolverError::ExistingPrimaryKey))); - let mocker = Mocker::default(); let file_store = UpdateFileStore::mock(mocker); let mocker = Mocker::default(); + mocker + .when::<(Option, Option, Option), TaskResult>>( + "list_tasks", + ) + .then(|_| Ok(Vec::new())); let task_store = TaskStore::mock(mocker); + let mut performer = MockTaskPerformer::new(); + performer + .expect_process_job() + .once() + .returning(|job| match job { + Job::Dump { ret, .. } => drop(ret.send(Err(IndexResolverError::BadlyFormatted( + "blabla".to_string(), + )))), + _ => unreachable!(), + }); + let performer = Arc::new(performer); + + let scheduler = Scheduler::new(task_store, performer, SchedulerConfig::default()).unwrap(); let task = DumpJob { dump_path: tmp.path().into(), // this should do nothing db_path: tmp.path().into(), update_file_store: file_store, - task_store, uid: String::from("test"), update_db_size: 4096 * 10, index_db_size: 4096 * 10, + scheduler, }; assert!(task.run().await.is_err()); diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 5a8106fa8..5dfaae848 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -13,7 +13,7 @@ use futures::Stream; use futures::StreamExt; use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, RwLock}; use tokio::task::spawn_blocking; use tokio::time::sleep; use uuid::Uuid; @@ -23,12 +23,11 @@ use crate::index::{ Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, }; use crate::index_controller::dump_actor::{load_dump, DumpActor, DumpActorHandleImpl}; -use crate::options::IndexerOpts; +use crate::options::{IndexerOpts, SchedulerConfig}; use crate::snapshot::{load_snapshot, SnapshotService}; -use crate::tasks::create_task_store; use crate::tasks::error::TaskError; use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId}; -use crate::tasks::{TaskFilter, TaskStore}; +use crate::tasks::{Scheduler, TaskFilter, TaskStore}; use error::Result; use self::dump_actor::{DumpActorHandle, DumpInfo}; @@ -68,6 +67,7 @@ pub struct IndexSettings { pub struct IndexController { index_resolver: Arc>, + scheduler: Arc>, task_store: TaskStore, dump_handle: dump_actor::DumpActorHandleImpl, update_file_store: UpdateFileStore, @@ -78,9 +78,10 @@ impl Clone for IndexController { fn clone(&self) -> Self { 
Self { index_resolver: self.index_resolver.clone(), - task_store: self.task_store.clone(), + scheduler: self.scheduler.clone(), dump_handle: self.dump_handle.clone(), update_file_store: self.update_file_store.clone(), + task_store: self.task_store.clone(), } } } @@ -160,6 +161,7 @@ impl IndexControllerBuilder { self, db_path: impl AsRef, indexer_options: IndexerOpts, + scheduler_config: SchedulerConfig, ) -> anyhow::Result { let index_size = self .max_index_size @@ -217,8 +219,9 @@ impl IndexControllerBuilder { update_file_store.clone(), )?); - let task_store = - create_task_store(meta_env, index_resolver.clone()).map_err(|e| anyhow::anyhow!(e))?; + let task_store = TaskStore::new(meta_env)?; + let scheduler = + Scheduler::new(task_store.clone(), index_resolver.clone(), scheduler_config)?; let dump_path = self .dump_dst @@ -229,14 +232,14 @@ impl IndexControllerBuilder { let actor = DumpActor::new( receiver, update_file_store.clone(), - task_store.clone(), + scheduler.clone(), dump_path, analytics_path, index_size, task_store_size, ); - tokio::task::spawn(actor.run()); + tokio::task::spawn_local(actor.run()); DumpActorHandleImpl { sender } }; @@ -255,17 +258,18 @@ impl IndexControllerBuilder { snapshot_path, index_size, meta_env_size: task_store_size, - task_store: task_store.clone(), + scheduler: scheduler.clone(), }; - tokio::task::spawn(snapshot_service.run()); + tokio::task::spawn_local(snapshot_service.run()); } Ok(IndexController { index_resolver, - task_store, + scheduler, dump_handle, update_file_store, + task_store, }) } @@ -415,12 +419,13 @@ where }; let task = self.task_store.register(uid, content).await?; + self.scheduler.read().await.notify(); Ok(task) } pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { - let task = self.task_store.get_task(id, filter).await?; + let task = self.scheduler.read().await.get_task(id, filter).await?; Ok(task) } @@ -435,7 +440,12 @@ where let mut filter = TaskFilter::default(); filter.filter_index(index_uid); - let task = self.task_store.get_task(task_id, Some(filter)).await?; + let task = self + .scheduler + .read() + .await + .get_task(task_id, Some(filter)) + .await?; Ok(task) } @@ -446,7 +456,12 @@ where limit: Option, offset: Option, ) -> Result> { - let tasks = self.task_store.list_tasks(offset, filter, limit).await?; + let tasks = self + .scheduler + .read() + .await + .list_tasks(offset, filter, limit) + .await?; Ok(tasks) } @@ -466,7 +481,9 @@ where filter.filter_index(index_uid); let tasks = self - .task_store + .scheduler + .read() + .await .list_tasks( Some(offset.unwrap_or_default() + task_id), Some(filter), @@ -547,10 +564,11 @@ where } pub async fn get_index_stats(&self, uid: String) -> Result { - let last_task = self.task_store.get_processing_task().await?; + let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?; // Check if the currently indexing update is from our index. - let is_indexing = last_task - .map(|task| task.index_uid.into_inner() == uid) + let is_indexing = processing_tasks + .first() + .map(|task| task.index_uid.as_str() == uid) .unwrap_or_default(); let index = self.index_resolver.get_index(uid).await?; @@ -564,7 +582,7 @@ where let mut last_task: Option> = None; let mut indexes = BTreeMap::new(); let mut database_size = 0; - let processing_task = self.task_store.get_processing_task().await?; + let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?; for (index_uid, index) in self.index_resolver.list().await? 
{ if !search_rules.is_index_authorized(&index_uid) { @@ -584,8 +602,8 @@ where }); // Check if the currently indexing update is from our index. - stats.is_indexing = processing_task - .as_ref() + stats.is_indexing = processing_tasks + .first() .map(|p| p.index_uid.as_str() == index_uid) .or(Some(false)); @@ -637,16 +655,18 @@ mod test { impl IndexController { pub fn mock( - index_resolver: IndexResolver, + index_resolver: Arc>, task_store: TaskStore, update_file_store: UpdateFileStore, dump_handle: DumpActorHandleImpl, + scheduler: Arc>, ) -> Self { IndexController { - index_resolver: Arc::new(index_resolver), + index_resolver, task_store, dump_handle, update_file_store, + scheduler, } } } @@ -719,13 +739,27 @@ mod test { let task_store_mocker = nelson::Mocker::default(); let mocker = Mocker::default(); let update_file_store = UpdateFileStore::mock(mocker); - let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store.clone()); + let index_resolver = Arc::new(IndexResolver::new( + uuid_store, + index_store, + update_file_store.clone(), + )); let task_store = TaskStore::mock(task_store_mocker); - // let dump_actor = MockDumpActorHandle::new(); + let scheduler = Scheduler::new( + task_store.clone(), + index_resolver.clone(), + SchedulerConfig::default(), + ) + .unwrap(); let (sender, _) = mpsc::channel(1); let dump_handle = DumpActorHandleImpl { sender }; - let index_controller = - IndexController::mock(index_resolver, task_store, update_file_store, dump_handle); + let index_controller = IndexController::mock( + index_resolver, + task_store, + update_file_store, + dump_handle, + scheduler, + ); let r = index_controller .search(index_uid.to_owned(), query.clone()) diff --git a/meilisearch-lib/src/index_resolver/index_store.rs b/meilisearch-lib/src/index_resolver/index_store.rs index 91f520f1f..e4f58f130 100644 --- a/meilisearch-lib/src/index_resolver/index_store.rs +++ b/meilisearch-lib/src/index_resolver/index_store.rs @@ -1,14 +1,15 @@ use std::collections::HashMap; +use std::convert::TryFrom; use std::path::{Path, PathBuf}; use std::sync::Arc; +use milli::update::IndexerConfig; use tokio::fs; use tokio::sync::RwLock; use tokio::task::spawn_blocking; use uuid::Uuid; use super::error::{IndexResolverError, Result}; -use crate::index::update_handler::UpdateHandler; use crate::index::Index; use crate::options::IndexerOpts; @@ -26,7 +27,7 @@ pub struct MapIndexStore { index_store: AsyncMap, path: PathBuf, index_size: usize, - update_handler: Arc, + indexer_config: Arc, } impl MapIndexStore { @@ -35,14 +36,14 @@ impl MapIndexStore { index_size: usize, indexer_opts: &IndexerOpts, ) -> anyhow::Result { - let update_handler = Arc::new(UpdateHandler::new(indexer_opts)?); + let indexer_config = Arc::new(IndexerConfig::try_from(indexer_opts)?); let path = path.as_ref().join("indexes/"); let index_store = Arc::new(RwLock::new(HashMap::new())); Ok(Self { index_store, path, index_size, - update_handler, + indexer_config, }) } } @@ -63,7 +64,7 @@ impl IndexStore for MapIndexStore { } let index_size = self.index_size; - let update_handler = self.update_handler.clone(); + let update_handler = self.indexer_config.clone(); let index = spawn_blocking(move || -> Result { let index = Index::open(path, index_size, uuid, update_handler)?; Ok(index) @@ -88,7 +89,7 @@ impl IndexStore for MapIndexStore { } let index_size = self.index_size; - let update_handler = self.update_handler.clone(); + let update_handler = self.indexer_config.clone(); let index = spawn_blocking(move || Index::open(path, 
index_size, uuid, update_handler)) .await??; diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index c8b498d70..48201d39a 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -2,7 +2,7 @@ pub mod error; pub mod index_store; pub mod meta_store; -use std::convert::TryInto; +use std::convert::{TryFrom, TryInto}; use std::path::Path; use std::sync::Arc; @@ -12,16 +12,17 @@ use heed::Env; use index_store::{IndexStore, MapIndexStore}; use meilisearch_error::ResponseError; use meta_store::{HeedMetaStore, IndexMetaStore}; -use milli::update::DocumentDeletionResult; +use milli::update::{DocumentDeletionResult, IndexerConfig}; use serde::{Deserialize, Serialize}; +use tokio::sync::oneshot; use tokio::task::spawn_blocking; use uuid::Uuid; -use crate::index::{error::Result as IndexResult, update_handler::UpdateHandler, Index}; +use crate::index::{error::Result as IndexResult, Index}; use crate::options::IndexerOpts; use crate::tasks::batch::Batch; use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult}; -use crate::tasks::{Pending, TaskPerformer}; +use crate::tasks::TaskPerformer; use crate::update_file_store::UpdateFileStore; use self::meta_store::IndexMeta; @@ -96,14 +97,24 @@ where U: IndexMetaStore + Send + Sync + 'static, I: IndexStore + Send + Sync + 'static, { - type Error = ResponseError; + async fn process_batch(&self, mut batch: Batch) -> Batch { + // If a batch contains multiple tasks, then it must be a document addition batch + if let Some(Task { + content: TaskContent::DocumentAddition { .. }, + .. + }) = batch.tasks.first() + { + debug_assert!(batch.tasks.iter().all(|t| matches!( + t, + Task { + content: TaskContent::DocumentAddition { .. }, + .. + } + ))); - async fn process(&self, mut batch: Batch) -> Batch { - // Until batching is implemented, all batch should contain only one update. - debug_assert_eq!(batch.len(), 1); - - match batch.tasks.first_mut() { - Some(Pending::Task(task)) => { + self.process_document_addition_batch(batch).await + } else { + if let Some(task) = batch.tasks.first_mut() { task.events.push(TaskEvent::Processing(Utc::now())); match self.process_task(task).await { @@ -119,15 +130,12 @@ where }), } } - Some(Pending::Job(job)) => { - let job = std::mem::take(job); - self.process_job(job).await; - } - - None => (), + batch } + } - batch + async fn process_job(&self, job: Job) { + self.process_job(job).await; } async fn finish(&self, batch: &Batch) { @@ -158,9 +166,9 @@ impl IndexResolver { HeedMetaStore::load_dump(&src, env)?; let indexes_path = src.as_ref().join("indexes"); let indexes = indexes_path.read_dir()?; - let update_handler = UpdateHandler::new(indexer_opts)?; + let indexer_config = IndexerConfig::try_from(indexer_opts)?; for index in indexes { - Index::load_dump(&index?.path(), &dst, index_db_size, &update_handler)?; + Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?; } Ok(()) @@ -180,33 +188,100 @@ where } } - async fn process_task(&self, task: &Task) -> Result { - let index_uid = task.index_uid.clone(); - match &task.content { - TaskContent::DocumentAddition { - content_uuid, - merge_strategy, - primary_key, - allow_index_creation, + async fn process_document_addition_batch(&self, mut batch: Batch) -> Batch { + fn get_content_uuid(task: &Task) -> Uuid { + match task { + Task { + content: TaskContent::DocumentAddition { content_uuid, .. }, + .. 
+                } => *content_uuid,
+                _ => panic!("unexpected task in the document addition batch"),
+            }
+        }
+
+        let content_uuids = batch.tasks.iter().map(get_content_uuid).collect::>();
+
+        match batch.tasks.first() {
+            Some(Task {
+                index_uid,
+                id,
+                content:
+                    TaskContent::DocumentAddition {
+                        merge_strategy,
+                        primary_key,
+                        allow_index_creation,
+                        ..
+                    },
                 ..
-            } => {
+            }) => {
                 let primary_key = primary_key.clone();
-                let content_uuid = *content_uuid;
                 let method = *merge_strategy;
                 let index = if *allow_index_creation {
-                    self.get_or_create_index(index_uid, task.id).await?
+                    self.get_or_create_index(index_uid.clone(), *id).await
                 } else {
-                    self.get_index(index_uid.into_inner()).await?
+                    self.get_index(index_uid.as_str().to_string()).await
                 };
+
+                // If the index doesn't exist and we are not allowed to create it with the first
+                // task, we must fail the whole batch.
+                let now = Utc::now();
+                let index = match index {
+                    Ok(index) => index,
+                    Err(e) => {
+                        let error = ResponseError::from(e);
+                        for task in batch.tasks.iter_mut() {
+                            task.events.push(TaskEvent::Failed {
+                                error: error.clone(),
+                                timestamp: now,
+                            });
+                        }
+                        return batch;
+                    }
+                };
+
                 let file_store = self.file_store.clone();
                 let result = spawn_blocking(move || {
-                    index.update_documents(method, content_uuid, primary_key, file_store)
+                    index.update_documents(
+                        method,
+                        primary_key,
+                        file_store,
+                        content_uuids.into_iter(),
+                    )
                 })
-                .await??;
+                .await;

-                Ok(result.into())
+                let event = match result {
+                    Ok(Ok(result)) => TaskEvent::Succeded {
+                        timestamp: Utc::now(),
+                        result: TaskResult::DocumentAddition {
+                            indexed_documents: result.indexed_documents,
+                        },
+                    },
+                    Ok(Err(e)) => TaskEvent::Failed {
+                        timestamp: Utc::now(),
+                        error: e.into(),
+                    },
+                    Err(e) => TaskEvent::Failed {
+                        timestamp: Utc::now(),
+                        error: IndexResolverError::from(e).into(),
+                    },
+                };
+
+                for task in batch.tasks.iter_mut() {
+                    task.events.push(event.clone());
+                }
+
+                batch
             }
+            _ => panic!("invalid batch!"),
+        }
+    }
+
+    async fn process_task(&self, task: &Task) -> Result {
+        let index_uid = task.index_uid.clone();
+        match &task.content {
+            TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"),
             TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => {
                 let ids = ids.clone();
                 let index = self.get_index(index_uid.into_inner()).await?;
@@ -282,9 +357,13 @@ where
             Job::Dump { ret, path } => {
                 log::trace!("The Dump task is getting executed");
-                if ret.send(self.dump(path).await).is_err() {
+                let (sender, receiver) = oneshot::channel();
+                if ret.send(self.dump(path).await.map(|_| sender)).is_err() {
                     log::error!("The dump actor died.");
                 }
+
+                // wait until the dump has finished performing.
+ let _ = receiver.await; } Job::Empty => log::error!("Tried to process an empty task."), Job::Snapshot(job) => { @@ -404,7 +483,7 @@ where #[cfg(test)] mod test { - use std::collections::BTreeMap; + use std::{collections::BTreeMap, vec::IntoIter}; use super::*; @@ -447,7 +526,7 @@ mod test { mocker.when::>("update_primary_key") .then(move |_| Ok(IndexMeta{ created_at: Utc::now(), updated_at: Utc::now(), primary_key: None })); } - mocker.when::<(IndexDocumentsMethod, Uuid, Option, UpdateFileStore), IndexResult>("update_documents") + mocker.when::<(IndexDocumentsMethod, Option, UpdateFileStore, IntoIter), IndexResult>("update_documents") .then(move |(_, _, _, _)| result()); } TaskContent::SettingsUpdate{..} => { @@ -462,13 +541,13 @@ mod test { } TaskContent::DocumentDeletion(DocumentDeletion::Ids(_ids)) => { let result = move || if !index_op_fails { - Ok(any_int as u64) + Ok(DocumentDeletionResult { deleted_documents: any_int as u64, remaining_documents: any_int as u64 }) } else { // return this error because it's easy to generate... Err(IndexError::DocumentNotFound("a doc".into())) }; - mocker.when::<&[String], IndexResult>("delete_documents") + mocker.when::<&[String], IndexResult>("delete_documents") .then(move |_| result()); }, TaskContent::DocumentDeletion(DocumentDeletion::Clear) => { @@ -561,7 +640,8 @@ mod test { let update_file_store = UpdateFileStore::mock(mocker); let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store); - let result = index_resolver.process_task(&task).await; + let batch = Batch { id: 1, created_at: Utc::now(), tasks: vec![task.clone()] }; + let result = index_resolver.process_batch(batch).await; // Test for some expected output scenarios: // Index creation and deletion cannot fail because of a failed index op, since they @@ -575,9 +655,9 @@ mod test { | TaskContent::DocumentAddition { allow_index_creation: false, ..} | TaskContent::IndexUpdate { .. } )) { - assert!(result.is_err(), "{:?}", result); + assert!(matches!(result.tasks[0].events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result); } else { - assert!(result.is_ok(), "{:?}", result); + assert!(matches!(result.tasks[0].events.last().unwrap(), TaskEvent::Succeded { .. }), "{:?}", result); } }); } diff --git a/meilisearch-lib/src/options.rs b/meilisearch-lib/src/options.rs index 0c2dea56a..d6657cae6 100644 --- a/meilisearch-lib/src/options.rs +++ b/meilisearch-lib/src/options.rs @@ -1,9 +1,10 @@ use core::fmt; -use std::{ops::Deref, str::FromStr}; +use std::{convert::TryFrom, ops::Deref, str::FromStr}; use byte_unit::{Byte, ByteError}; use clap::Parser; -use milli::CompressionType; +use milli::{update::IndexerConfig, CompressionType}; +use serde::Serialize; use sysinfo::{RefreshKind, System, SystemExt}; #[derive(Debug, Clone, Parser)] @@ -43,6 +44,52 @@ pub struct IndexerOpts { pub indexing_jobs: Option, } +#[derive(Debug, Clone, Parser, Default, Serialize)] +pub struct SchedulerConfig { + /// enable the autobatching experimental feature + #[clap(long, hide = true)] + pub enable_autobatching: bool, + + // The maximum number of updates of the same type that can be batched together. + // If unspecified, this is unlimited. A value of 0 is interpreted as 1. + #[clap(long, requires = "enable-autobatching", hide = true)] + pub max_batch_size: Option, + + // The maximum number of documents in a document batch. 
Since batches must contain at least one + // update for the scheduler to make progress, the number of documents in a batch will be at + // least the number of documents of its first update. + #[clap(long, requires = "enable-autobatching", hide = true)] + pub max_documents_per_batch: Option, + + /// Debounce duration in seconds + /// + /// When a new task is enqueued, the scheduler waits for `debounce_duration_sec` seconds for new updates before + /// starting to process a batch of updates. + #[clap(long, requires = "enable-autobatching", hide = true)] + pub debounce_duration_sec: Option, +} + +impl TryFrom<&IndexerOpts> for IndexerConfig { + type Error = anyhow::Error; + + fn try_from(other: &IndexerOpts) -> Result { + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(other.indexing_jobs.unwrap_or(num_cpus::get() / 2)) + .build()?; + + Ok(Self { + log_every_n: Some(other.log_every_n), + max_nb_chunks: other.max_nb_chunks, + max_memory: (*other.max_memory).map(|b| b.get_bytes() as usize), + chunk_compression_type: other.chunk_compression_type, + chunk_compression_level: other.chunk_compression_level, + thread_pool: Some(thread_pool), + max_positions_per_attributes: None, + ..Default::default() + }) + } +} + impl Default for IndexerOpts { fn default() -> Self { Self { diff --git a/meilisearch-lib/src/snapshot.rs b/meilisearch-lib/src/snapshot.rs index 2e34d5427..528c0f64a 100644 --- a/meilisearch-lib/src/snapshot.rs +++ b/meilisearch-lib/src/snapshot.rs @@ -1,17 +1,19 @@ use std::fs; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::time::Duration; use anyhow::bail; use fs_extra::dir::{self, CopyOptions}; use log::{info, trace}; +use tokio::sync::RwLock; use tokio::time::sleep; use walkdir::WalkDir; use crate::compression::from_tar_gz; use crate::index_controller::versioning::VERSION_FILE_NAME; use crate::tasks::task::Job; -use crate::tasks::TaskStore; +use crate::tasks::Scheduler; pub struct SnapshotService { pub(crate) db_path: PathBuf, @@ -19,7 +21,7 @@ pub struct SnapshotService { pub(crate) snapshot_path: PathBuf, pub(crate) index_size: usize, pub(crate) meta_env_size: usize, - pub(crate) task_store: TaskStore, + pub(crate) scheduler: Arc>, } impl SnapshotService { @@ -36,7 +38,7 @@ impl SnapshotService { index_size: self.index_size, }; let job = Job::Snapshot(snapshot_job); - self.task_store.register_job(job).await; + self.scheduler.write().await.schedule_job(job).await; sleep(self.snapshot_period).await; } diff --git a/meilisearch-lib/src/tasks/batch.rs b/meilisearch-lib/src/tasks/batch.rs index 92a1b2374..eff81acc5 100644 --- a/meilisearch-lib/src/tasks/batch.rs +++ b/meilisearch-lib/src/tasks/batch.rs @@ -1,14 +1,14 @@ use chrono::{DateTime, Utc}; -use super::{task::Task, task_store::Pending}; +use super::task::Task; -pub type BatchId = u32; +pub type BatchId = u64; #[derive(Debug)] pub struct Batch { pub id: BatchId, pub created_at: DateTime, - pub tasks: Vec>, + pub tasks: Vec, } impl Batch { diff --git a/meilisearch-lib/src/tasks/mod.rs b/meilisearch-lib/src/tasks/mod.rs index 9d6de324a..b56dfaf9d 100644 --- a/meilisearch-lib/src/tasks/mod.rs +++ b/meilisearch-lib/src/tasks/mod.rs @@ -1,47 +1,38 @@ -use std::sync::Arc; -use std::time::Duration; - use async_trait::async_trait; -use serde::{Deserialize, Serialize}; + +pub use scheduler::Scheduler; +pub use task_store::TaskFilter; #[cfg(test)] pub use task_store::test::MockTaskStore as TaskStore; #[cfg(not(test))] pub use task_store::TaskStore; -pub use task_store::{Pending, TaskFilter}; - use 
batch::Batch; use error::Result; -use scheduler::Scheduler; + +use self::task::Job; pub mod batch; pub mod error; -pub mod scheduler; +mod scheduler; pub mod task; mod task_store; +pub mod update_loop; #[cfg_attr(test, mockall::automock(type Error=test::DebugError;))] #[async_trait] pub trait TaskPerformer: Sync + Send + 'static { - type Error: Serialize + for<'de> Deserialize<'de> + std::error::Error + Sync + Send + 'static; /// Processes the `Task` batch returning the batch with the `Task` updated. - async fn process(&self, batch: Batch) -> Batch; + async fn process_batch(&self, batch: Batch) -> Batch; + + async fn process_job(&self, job: Job); + /// `finish` is called when the result of `process` has been commited to the task store. This /// method can be used to perform cleanup after the update has been completed for example. async fn finish(&self, batch: &Batch); } -pub fn create_task_store
<P>(env: Arc<Env>, performer: Arc<P>
) -> Result -where - P: TaskPerformer, -{ - let task_store = TaskStore::new(env)?; - let scheduler = Scheduler::new(task_store.clone(), performer, Duration::from_millis(1)); - tokio::task::spawn_local(scheduler.run()); - Ok(task_store) -} - #[cfg(test)] mod test { use serde::{Deserialize, Serialize}; diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index 96ae56e6d..bbb9cb2e2 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -1,253 +1,526 @@ +use std::cmp::Ordering; +use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque}; +use std::ops::{Deref, DerefMut}; +use std::path::Path; use std::sync::Arc; use std::time::Duration; +use atomic_refcell::AtomicRefCell; use chrono::Utc; -use serde::{Deserialize, Serialize}; +use milli::update::IndexDocumentsMethod; +use tokio::sync::{watch, RwLock}; + +use crate::options::SchedulerConfig; +use crate::update_file_store::UpdateFileStore; use super::batch::Batch; use super::error::Result; -#[cfg(test)] -use super::task_store::test::MockTaskStore as TaskStore; -use super::task_store::Pending; -#[cfg(not(test))] -use super::task_store::TaskStore; -use super::TaskPerformer; -use crate::tasks::task::TaskEvent; +use super::task::{Job, Task, TaskContent, TaskEvent, TaskId}; +use super::update_loop::UpdateLoop; +use super::{TaskFilter, TaskPerformer, TaskStore}; -/// The scheduler roles is to perform batches of tasks one at a time. It will monitor the TaskStore -/// for new tasks, put them in a batch, and process the batch as soon as possible. -/// -/// When a batch is currently processing, the scheduler is just waiting. -pub struct Scheduler { - store: TaskStore, - performer: Arc
<P>
, - - /// The interval at which the the `TaskStore` should be checked for new updates - task_store_check_interval: Duration, +#[derive(Eq, Debug, Clone, Copy)] +enum TaskType { + DocumentAddition { number: usize }, + DocumentUpdate { number: usize }, + Other, } -impl
<P> Scheduler<P>
-where - P: TaskPerformer + Send + Sync + 'static, - P::Error: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, -{ - pub fn new(store: TaskStore, performer: Arc
<P>
, task_store_check_interval: Duration) -> Self { +/// Two tasks are equal if they have the same type. +impl PartialEq for TaskType { + fn eq(&self, other: &Self) -> bool { + matches!( + (self, other), + (Self::DocumentAddition { .. }, Self::DocumentAddition { .. }) + | (Self::DocumentUpdate { .. }, Self::DocumentUpdate { .. }) + ) + } +} + +#[derive(Eq, Debug, Clone, Copy)] +struct PendingTask { + kind: TaskType, + id: TaskId, +} + +impl PartialEq for PendingTask { + fn eq(&self, other: &Self) -> bool { + self.id.eq(&other.id) + } +} + +impl PartialOrd for PendingTask { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PendingTask { + fn cmp(&self, other: &Self) -> Ordering { + self.id.cmp(&other.id).reverse() + } +} + +#[derive(Debug)] +struct TaskList { + index: String, + tasks: BinaryHeap, +} + +impl Deref for TaskList { + type Target = BinaryHeap; + + fn deref(&self) -> &Self::Target { + &self.tasks + } +} + +impl DerefMut for TaskList { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.tasks + } +} + +impl TaskList { + fn new(index: String) -> Self { Self { - store, - performer, - task_store_check_interval, + index, + tasks: Default::default(), } } +} - pub async fn run(self) { - loop { - if let Err(e) = self.process_next_batch().await { - log::error!("an error occured while processing an update batch: {}", e); +impl PartialEq for TaskList { + fn eq(&self, other: &Self) -> bool { + self.index == other.index + } +} + +impl Eq for TaskList {} + +impl Ord for TaskList { + fn cmp(&self, other: &Self) -> Ordering { + match (self.peek(), other.peek()) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => lhs.cmp(rhs), + } + } +} + +impl PartialOrd for TaskList { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[derive(Default)] +struct TaskQueue { + /// Maps index uids to their TaskList, for quick access + index_tasks: HashMap>>, + /// A queue that orders TaskList by the priority of their fist update + queue: BinaryHeap>>, +} + +impl TaskQueue { + fn insert(&mut self, task: Task) { + let uid = task.index_uid.into_inner(); + let id = task.id; + let kind = match task.content { + TaskContent::DocumentAddition { + documents_count, + merge_strategy: IndexDocumentsMethod::ReplaceDocuments, + .. + } => TaskType::DocumentAddition { + number: documents_count, + }, + TaskContent::DocumentAddition { + documents_count, + merge_strategy: IndexDocumentsMethod::UpdateDocuments, + .. + } => TaskType::DocumentUpdate { + number: documents_count, + }, + _ => TaskType::Other, + }; + let task = PendingTask { kind, id }; + + match self.index_tasks.entry(uid) { + Entry::Occupied(entry) => { + // A task list already exists for this index, all we have to to is to push the new + // update to the end of the list. This won't change the order since ids are + // monotically increasing. + let mut list = entry.get().borrow_mut(); + + // We only need the first element to be lower than the one we want to + // insert to preserve the order in the queue. 
+ assert!(list.peek().map(|old_id| id >= old_id.id).unwrap_or(true)); + + list.push(task); + } + Entry::Vacant(entry) => { + let mut task_list = TaskList::new(entry.key().to_owned()); + task_list.push(task); + let task_list = Arc::new(AtomicRefCell::new(task_list)); + entry.insert(task_list.clone()); + self.queue.push(task_list); } } } - async fn process_next_batch(&self) -> Result<()> { - match self.prepare_batch().await? { - Some(mut batch) => { - for task in &mut batch.tasks { - match task { - Pending::Task(task) => task.events.push(TaskEvent::Processing(Utc::now())), - Pending::Job(_) => (), + /// Passes a context with a view to the task list of the next index to schedule. It is + /// guaranteed that the first id from task list will be the lowest pending task id. + fn head_mut(&mut self, mut f: impl FnMut(&mut TaskList) -> R) -> Option { + let head = self.queue.pop()?; + let result = { + let mut ref_head = head.borrow_mut(); + f(&mut *ref_head) + }; + if !head.borrow().tasks.is_empty() { + // After being mutated, the head is reinserted to the correct position. + self.queue.push(head); + } else { + self.index_tasks.remove(&head.borrow().index); + } + + Some(result) + } + + pub fn is_empty(&self) -> bool { + self.queue.is_empty() && self.index_tasks.is_empty() + } +} + +pub struct Scheduler { + jobs: VecDeque, + tasks: TaskQueue, + + store: TaskStore, + processing: Vec, + next_fetched_task_id: TaskId, + config: SchedulerConfig, + /// Notifies the update loop that a new task was received + notifier: watch::Sender<()>, +} + +impl Scheduler { + pub fn new
<P>( + store: TaskStore, + performer: Arc<P>
, + mut config: SchedulerConfig, + ) -> Result>> + where + P: TaskPerformer, + { + let (notifier, rcv) = watch::channel(()); + + let debounce_time = config.debounce_duration_sec; + + // Disable autobatching + if !config.enable_autobatching { + config.max_batch_size = Some(1); + } + + let this = Self { + jobs: VecDeque::new(), + tasks: TaskQueue::default(), + + store, + processing: Vec::new(), + next_fetched_task_id: 0, + config, + notifier, + }; + + // Notify update loop to start processing pending updates immediately after startup. + this.notify(); + + let this = Arc::new(RwLock::new(this)); + + let update_loop = UpdateLoop::new( + this.clone(), + performer, + debounce_time.filter(|&v| v > 0).map(Duration::from_secs), + rcv, + ); + + tokio::task::spawn_local(update_loop.run()); + + Ok(this) + } + + pub async fn dump(&self, path: &Path, file_store: UpdateFileStore) -> Result<()> { + self.store.dump(path, file_store).await + } + + fn register_task(&mut self, task: Task) { + assert!(!task.is_finished()); + self.tasks.insert(task); + } + + /// Clears the processing list, this method should be called when the processing of a batch is finished. + pub fn finish(&mut self) { + self.processing.clear(); + } + + pub fn notify(&self) { + let _ = self.notifier.send(()); + } + + fn notify_if_not_empty(&self) { + if !self.jobs.is_empty() || !self.tasks.is_empty() { + self.notify(); + } + } + + pub async fn update_tasks(&self, tasks: Vec) -> Result> { + self.store.update_tasks(tasks).await + } + + pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { + self.store.get_task(id, filter).await + } + + pub async fn list_tasks( + &self, + offset: Option, + filter: Option, + limit: Option, + ) -> Result> { + self.store.list_tasks(offset, filter, limit).await + } + + pub async fn get_processing_tasks(&self) -> Result> { + let mut tasks = Vec::new(); + + for id in self.processing.iter() { + let task = self.store.get_task(*id, None).await?; + tasks.push(task); + } + + Ok(tasks) + } + + pub async fn schedule_job(&mut self, job: Job) { + self.jobs.push_back(job); + self.notify(); + } + + async fn fetch_pending_tasks(&mut self) -> Result<()> { + // We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file. + // + // TODO(marin): This may create some latency when the first batch lazy loads the pending updates. + let mut filter = TaskFilter::default(); + filter.filter_fn(|task| !task.is_finished()); + + self.store + .list_tasks(Some(self.next_fetched_task_id), Some(filter), None) + .await? + .into_iter() + // The tasks arrive in reverse order, and we need to insert them in order. + .rev() + .for_each(|t| { + self.next_fetched_task_id = t.id + 1; + self.register_task(t); + }); + + Ok(()) + } + + /// Prepare the next batch, and set `processing` to the ids in that batch. + pub async fn prepare(&mut self) -> Result { + // If there is a job to process, do it first. + if let Some(job) = self.jobs.pop_front() { + // There is more work to do, notify the update loop + self.notify_if_not_empty(); + return Ok(Pending::Job(job)); + } + // Try to fill the queue with pending tasks. 
+ self.fetch_pending_tasks().await?; + + make_batch(&mut self.tasks, &mut self.processing, &self.config); + + log::debug!("prepared batch with {} tasks", self.processing.len()); + + if !self.processing.is_empty() { + let ids = std::mem::take(&mut self.processing); + + let (ids, mut tasks) = self.store.get_pending_tasks(ids).await?; + + // The batch id is the id of the first update it contains + let id = match tasks.first() { + Some(Task { id, .. }) => *id, + _ => panic!("invalid batch"), + }; + + tasks.iter_mut().for_each(|t| { + t.events.push(TaskEvent::Batched { + batch_id: id, + timestamp: Utc::now(), + }) + }); + + self.processing = ids; + + let batch = Batch { + id, + created_at: Utc::now(), + tasks, + }; + + // There is more work to do, notify the update loop + self.notify_if_not_empty(); + + Ok(Pending::Batch(batch)) + } else { + Ok(Pending::Nothing) + } + } +} + +#[derive(Debug)] +pub enum Pending { + Batch(Batch), + Job(Job), + Nothing, +} + +fn make_batch(tasks: &mut TaskQueue, processing: &mut Vec, config: &SchedulerConfig) { + processing.clear(); + + let mut doc_count = 0; + tasks.head_mut(|list| match list.peek().copied() { + Some(PendingTask { + kind: TaskType::Other, + id, + }) => { + processing.push(id); + list.pop(); + } + Some(PendingTask { kind, .. }) => loop { + match list.peek() { + Some(pending) if pending.kind == kind => { + // We always need to process at least one task for the scheduler to make progress. + if processing.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) { + break; + } + let pending = list.pop().unwrap(); + processing.push(pending.id); + + // We add the number of documents to the count if we are scheduling document additions and + // stop adding if we already have enough. + // + // We check that bound only after adding the current task to the batch, so that a batch contains at least one task. + match pending.kind { + TaskType::DocumentUpdate { number } + | TaskType::DocumentAddition { number } => { + doc_count += number; + + if doc_count >= config.max_documents_per_batch.unwrap_or(usize::MAX) { + break; + } + } + _ => (), } } - - // the jobs are ignored - batch.tasks = self.store.update_tasks(batch.tasks).await?; - - let performer = self.performer.clone(); - let batch_result = performer.process(batch).await; - self.handle_batch_result(batch_result).await?; + _ => break, } - None => { - // No update found to create a batch we wait a bit before we retry. - tokio::time::sleep(self.task_store_check_interval).await; - } - } - - Ok(()) - } - - /// Checks for pending tasks and groups them in a batch. If there are no pending update, - /// return Ok(None) - /// - /// Until batching is properly implemented, the batches contain only one task. - async fn prepare_batch(&self) -> Result> { - match self.store.peek_pending_task().await { - Some(Pending::Task(next_task_id)) => { - let mut task = self.store.get_task(next_task_id, None).await?; - - task.events.push(TaskEvent::Batched { - timestamp: Utc::now(), - batch_id: 0, - }); - - let batch = Batch { - id: 0, - // index_uid: task.index_uid.clone(), - created_at: Utc::now(), - tasks: vec![Pending::Task(task)], - }; - Ok(Some(batch)) - } - Some(Pending::Job(job)) => Ok(Some(Batch { - id: 0, - created_at: Utc::now(), - tasks: vec![Pending::Job(job)], - })), - None => Ok(None), - } - } - - /// Handles the result from a batch processing. - /// - /// When a task is processed, the result of the processing is pushed to its event list. 
The - /// handle batch result make sure that the new state is save into its store. - /// The tasks are then removed from the processing queue. - async fn handle_batch_result(&self, mut batch: Batch) -> Result<()> { - let tasks = self.store.update_tasks(batch.tasks).await?; - batch.tasks = tasks; - self.store.delete_pending(&batch.tasks[0]).await; - self.performer.finish(&batch).await; - Ok(()) - } + }, + None => (), + }); } #[cfg(test)] mod test { - use nelson::Mocker; + use milli::update::IndexDocumentsMethod; + use uuid::Uuid; - use crate::index_resolver::IndexUid; - use crate::tasks::task::Task; - use crate::tasks::task_store::TaskFilter; + use crate::{index_resolver::IndexUid, tasks::task::TaskContent}; - use super::super::task::{TaskContent, TaskEvent, TaskId, TaskResult}; - use super::super::MockTaskPerformer; use super::*; - #[tokio::test] - async fn test_prepare_batch_full() { - let mocker = Mocker::default(); - - mocker - .when::<(TaskId, Option), Result>>("get_task") - .once() - .then(|(id, _filter)| { - let task = Task { - id, - index_uid: IndexUid::new("Test".to_string()).unwrap(), - content: TaskContent::IndexDeletion, - events: vec![TaskEvent::Created(Utc::now())], - }; - Ok(Some(task)) - }); - - mocker - .when::<(), Option>>("peek_pending_task") - .then(|()| Some(Pending::Task(1))); - - let store = TaskStore::mock(mocker); - let performer = Arc::new(MockTaskPerformer::new()); - - let scheduler = Scheduler { - store, - performer, - task_store_check_interval: Duration::from_millis(1), - }; - - let batch = scheduler.prepare_batch().await.unwrap().unwrap(); - - assert_eq!(batch.tasks.len(), 1); - assert!( - matches!(batch.tasks[0], Pending::Task(Task { id: 1, .. })), - "{:?}", - batch.tasks[0] - ); - } - - #[tokio::test] - async fn test_prepare_batch_empty() { - let mocker = Mocker::default(); - mocker - .when::<(), Option>>("peek_pending_task") - .then(|()| None); - - let store = TaskStore::mock(mocker); - let performer = Arc::new(MockTaskPerformer::new()); - - let scheduler = Scheduler { - store, - performer, - task_store_check_interval: Duration::from_millis(1), - }; - - assert!(scheduler.prepare_batch().await.unwrap().is_none()); - } - - #[tokio::test] - async fn test_loop_run_normal() { - let mocker = Mocker::default(); - let mut id = Some(1); - mocker - .when::<(), Option>>("peek_pending_task") - .then(move |()| id.take().map(Pending::Task)); - mocker - .when::<(TaskId, Option), Result>("get_task") - .once() - .then(|(id, _)| { - let task = Task { - id, - index_uid: IndexUid::new("Test".to_string()).unwrap(), - content: TaskContent::IndexDeletion, - events: vec![TaskEvent::Created(Utc::now())], - }; - Ok(task) - }); - - mocker - .when::>, Result>>>("update_tasks") - .times(2) - .then(|tasks| { - assert_eq!(tasks.len(), 1); - Ok(tasks) - }); - - mocker.when::<(), ()>("delete_pending").once().then(|_| ()); - - let store = TaskStore::mock(mocker); - - let mut performer = MockTaskPerformer::new(); - performer.expect_process().once().returning(|mut batch| { - batch.tasks.iter_mut().for_each(|t| match t { - Pending::Task(Task { ref mut events, .. 
}) => events.push(TaskEvent::Succeded { - result: TaskResult::Other, - timestamp: Utc::now(), - }), - _ => panic!("expected a task, found a job"), - }); - - batch - }); - - performer.expect_finish().once().returning(|_| ()); - - let performer = Arc::new(performer); - - let scheduler = Scheduler { - store, - performer, - task_store_check_interval: Duration::from_millis(1), - }; - - let handle = tokio::spawn(scheduler.run()); - - if let Ok(r) = tokio::time::timeout(Duration::from_millis(100), handle).await { - r.unwrap(); + fn gen_task(id: TaskId, index_uid: &str, content: TaskContent) -> Task { + Task { + id, + index_uid: IndexUid::new_unchecked(index_uid.to_owned()), + content, + events: vec![], } } + + #[test] + fn register_updates_multiples_indexes() { + let mut queue = TaskQueue::default(); + queue.insert(gen_task(0, "test1", TaskContent::IndexDeletion)); + queue.insert(gen_task(1, "test2", TaskContent::IndexDeletion)); + queue.insert(gen_task(2, "test2", TaskContent::IndexDeletion)); + queue.insert(gen_task(3, "test2", TaskContent::IndexDeletion)); + queue.insert(gen_task(4, "test1", TaskContent::IndexDeletion)); + queue.insert(gen_task(5, "test1", TaskContent::IndexDeletion)); + queue.insert(gen_task(6, "test2", TaskContent::IndexDeletion)); + + let test1_tasks = queue + .head_mut(|tasks| tasks.drain().map(|t| t.id).collect::>()) + .unwrap(); + + assert_eq!(test1_tasks, &[0, 4, 5]); + + let test2_tasks = queue + .head_mut(|tasks| tasks.drain().map(|t| t.id).collect::>()) + .unwrap(); + + assert_eq!(test2_tasks, &[1, 2, 3, 6]); + + assert!(queue.index_tasks.is_empty()); + assert!(queue.queue.is_empty()); + } + + #[test] + fn test_make_batch() { + let mut queue = TaskQueue::default(); + let content = TaskContent::DocumentAddition { + content_uuid: Uuid::new_v4(), + merge_strategy: IndexDocumentsMethod::ReplaceDocuments, + primary_key: Some("test".to_string()), + documents_count: 0, + allow_index_creation: true, + }; + queue.insert(gen_task(0, "test1", content.clone())); + queue.insert(gen_task(1, "test2", content.clone())); + queue.insert(gen_task(2, "test2", TaskContent::IndexDeletion)); + queue.insert(gen_task(3, "test2", content.clone())); + queue.insert(gen_task(4, "test1", content.clone())); + queue.insert(gen_task(5, "test1", TaskContent::IndexDeletion)); + queue.insert(gen_task(6, "test2", content.clone())); + queue.insert(gen_task(7, "test1", content)); + + let mut batch = Vec::new(); + + let config = SchedulerConfig::default(); + make_batch(&mut queue, &mut batch, &config); + assert_eq!(batch, &[0, 4]); + + batch.clear(); + make_batch(&mut queue, &mut batch, &config); + assert_eq!(batch, &[1]); + + batch.clear(); + make_batch(&mut queue, &mut batch, &config); + assert_eq!(batch, &[2]); + + batch.clear(); + make_batch(&mut queue, &mut batch, &config); + assert_eq!(batch, &[3, 6]); + + batch.clear(); + make_batch(&mut queue, &mut batch, &config); + assert_eq!(batch, &[5]); + + batch.clear(); + make_batch(&mut queue, &mut batch, &config); + assert_eq!(batch, &[7]); + + assert!(queue.is_empty()); + } } diff --git a/meilisearch-lib/src/tasks/task.rs b/meilisearch-lib/src/tasks/task.rs index f5dd8d9be..f5d6687cd 100644 --- a/meilisearch-lib/src/tasks/task.rs +++ b/meilisearch-lib/src/tasks/task.rs @@ -97,7 +97,7 @@ impl Task { pub enum Job { Dump { #[derivative(PartialEq = "ignore")] - ret: oneshot::Sender>, + ret: oneshot::Sender, IndexResolverError>>, path: PathBuf, }, Snapshot(#[derivative(PartialEq = "ignore")] SnapshotJob), diff --git 
a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index d8e286ff3..88f12ddd1 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -1,7 +1,6 @@ mod store; -use std::cmp::Ordering; -use std::collections::{BinaryHeap, HashSet}; +use std::collections::HashSet; use std::io::{BufWriter, Write}; use std::path::Path; use std::sync::Arc; @@ -9,11 +8,9 @@ use std::sync::Arc; use chrono::Utc; use heed::{Env, RwTxn}; use log::debug; -use tokio::sync::RwLock; -use uuid::Uuid; use super::error::TaskError; -use super::task::{Job, Task, TaskContent, TaskId}; +use super::task::{Task, TaskContent, TaskId}; use super::Result; use crate::index_resolver::IndexUid; use crate::tasks::task::TaskEvent; @@ -25,9 +22,10 @@ pub use store::test::MockStore as Store; pub use store::Store; /// Defines constraints to be applied when querying for Tasks from the store. -#[derive(Default, Debug)] +#[derive(Default)] pub struct TaskFilter { indexes: Option>, + filter_fn: Option bool + Sync + Send + 'static>>, } impl TaskFilter { @@ -44,85 +42,28 @@ impl TaskFilter { .get_or_insert_with(Default::default) .insert(index); } -} -/// You can't clone a job because of its volatile nature. -/// If you need to take the `Job` with you though. You can call the method -/// `Pending::take`. It'll return the `Pending` as-is but `Empty` the original. -#[derive(Debug, PartialEq)] -pub enum Pending { - /// A task stored on disk that must be processed. - Task(T), - /// Job always have a higher priority over normal tasks and are not stored on disk. - /// It can be refered as `Volatile job`. - Job(Job), -} - -impl Pending { - /// Makes a copy of the task or take the content of the volatile job. - pub(crate) fn take(&mut self) -> Self { - match self { - Self::Task(id) => Self::Task(*id), - Self::Job(job) => Self::Job(job.take()), - } - } -} - -impl Eq for Pending {} - -impl PartialOrd for Pending { - fn partial_cmp(&self, other: &Self) -> Option { - match (self, other) { - // in case of two tasks we want to return the lowest taskId first. - (Pending::Task(lhs), Pending::Task(rhs)) => Some(lhs.cmp(rhs).reverse()), - // A job is always better than a task. - (Pending::Task(_), Pending::Job(_)) => Some(Ordering::Less), - (Pending::Job(_), Pending::Task(_)) => Some(Ordering::Greater), - // When there is two jobs we consider them equals. 
- (Pending::Job(_), Pending::Job(_)) => Some(Ordering::Equal), - } - } -} - -impl Pending { - pub fn get_content_uuid(&self) -> Option { - match self { - Pending::Task(task) => task.get_content_uuid(), - _ => None, - } - } -} - -impl Ord for Pending { - fn cmp(&self, other: &Self) -> Ordering { - self.partial_cmp(other).unwrap() + pub fn filter_fn(&mut self, f: impl Fn(&Task) -> bool + Sync + Send + 'static) { + self.filter_fn.replace(Box::new(f)); } } pub struct TaskStore { store: Arc, - pending_queue: Arc>>>, } impl Clone for TaskStore { fn clone(&self) -> Self { Self { store: self.store.clone(), - pending_queue: self.pending_queue.clone(), } } } impl TaskStore { pub fn new(env: Arc) -> Result { - let mut store = Store::new(env)?; - let unfinished_tasks = store.reset_and_return_unfinished_tasks()?; - let store = Arc::new(store); - - Ok(Self { - store, - pending_queue: Arc::new(RwLock::new(unfinished_tasks)), - }) + let store = Arc::new(Store::new(env)?); + Ok(Self { store }) } pub async fn register(&self, index_uid: IndexUid, content: TaskContent) -> Result { @@ -146,11 +87,6 @@ impl TaskStore { }) .await??; - self.pending_queue - .write() - .await - .push(Pending::Task(task.id)); - Ok(task) } @@ -159,35 +95,6 @@ impl TaskStore { Ok(()) } - /// Register an update that applies on multiple indexes. - /// Currently the update is considered as a priority. - pub async fn register_job(&self, content: Job) { - debug!("registering a job: {:?}", content); - self.pending_queue.write().await.push(Pending::Job(content)); - } - - /// Returns the next task to process. - pub async fn peek_pending_task(&self) -> Option> { - let mut pending_queue = self.pending_queue.write().await; - loop { - match pending_queue.peek()? { - Pending::Job(Job::Empty) => drop(pending_queue.pop()), - _ => return Some(pending_queue.peek_mut()?.take()), - } - } - } - - /// Returns the next task to process if there is one. - pub async fn get_processing_task(&self) -> Result> { - match self.peek_pending_task().await { - Some(Pending::Task(tid)) => { - let task = self.get_task(tid, None).await?; - Ok(matches!(task.events.last(), Some(TaskEvent::Processing(_))).then(|| task)) - } - _ => Ok(None), - } - } - pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { let store = self.store.clone(); let task = tokio::task::spawn_blocking(move || -> Result<_> { @@ -207,17 +114,33 @@ impl TaskStore { } } - pub async fn update_tasks(&self, tasks: Vec>) -> Result>> { + pub async fn get_pending_tasks(&self, ids: Vec) -> Result<(Vec, Vec)> { + let store = self.store.clone(); + let tasks = tokio::task::spawn_blocking(move || -> Result<_> { + let mut tasks = Vec::new(); + let txn = store.rtxn()?; + + for id in ids.iter() { + let task = store + .get(&txn, *id)? + .ok_or(TaskError::UnexistingTask(*id))?; + tasks.push(task); + } + Ok((ids, tasks)) + }) + .await??; + + Ok(tasks) + } + + pub async fn update_tasks(&self, tasks: Vec) -> Result> { let store = self.store.clone(); let tasks = tokio::task::spawn_blocking(move || -> Result<_> { let mut txn = store.wtxn()?; for task in &tasks { - match task { - Pending::Task(task) => store.put(&mut txn, task)?, - Pending::Job(_) => (), - } + store.put(&mut txn, task)?; } txn.commit()?; @@ -229,21 +152,6 @@ impl TaskStore { Ok(tasks) } - /// Delete one task from the queue and remove all `Empty` job. - pub async fn delete_pending(&self, to_delete: &Pending) { - if let Pending::Task(Task { id: pending_id, .. 
}) = to_delete { - let mut pending_queue = self.pending_queue.write().await; - *pending_queue = std::mem::take(&mut *pending_queue) - .into_iter() - .filter(|pending| match pending { - Pending::Job(Job::Empty) => false, - Pending::Task(id) => pending_id != id, - _ => true, - }) - .collect::>>(); - } - } - pub async fn list_tasks( &self, offset: Option, @@ -348,23 +256,15 @@ pub mod test { Self::Mock(Arc::new(mocker)) } - pub async fn update_tasks(&self, tasks: Vec>) -> Result>> { + pub async fn update_tasks(&self, tasks: Vec) -> Result> { match self { Self::Real(s) => s.update_tasks(tasks).await, Self::Mock(m) => unsafe { - m.get::<_, Result>>>("update_tasks") - .call(tasks) + m.get::<_, Result>>("update_tasks").call(tasks) }, } } - pub async fn delete_pending(&self, to_delete: &Pending) { - match self { - Self::Real(s) => s.delete_pending(to_delete).await, - Self::Mock(m) => unsafe { m.get("delete_pending").call(to_delete) }, - } - } - pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { match self { Self::Real(s) => s.get_task(id, filter).await, @@ -372,23 +272,13 @@ pub mod test { } } - pub async fn get_processing_task(&self) -> Result> { + pub async fn get_pending_tasks( + &self, + tasks: Vec, + ) -> Result<(Vec, Vec)> { match self { - Self::Real(s) => s.get_processing_task().await, - Self::Mock(m) => unsafe { - m.get::<_, Result>>("get_pending_task") - .call(()) - }, - } - } - - pub async fn peek_pending_task(&self) -> Option> { - match self { - Self::Real(s) => s.peek_pending_task().await, - Self::Mock(m) => unsafe { - m.get::<_, Option>>("peek_pending_task") - .call(()) - }, + Self::Real(s) => s.get_pending_tasks(tasks).await, + Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) }, } } @@ -400,14 +290,18 @@ pub mod test { ) -> Result> { match self { Self::Real(s) => s.list_tasks(from, filter, limit).await, - Self::Mock(_m) => todo!(), + Self::Mock(m) => unsafe { m.get("list_tasks").call((from, filter, limit)) }, } } - pub async fn dump(&self, path: &Path, update_file_store: UpdateFileStore) -> Result<()> { + pub async fn dump( + &self, + path: impl AsRef, + update_file_store: UpdateFileStore, + ) -> Result<()> { match self { Self::Real(s) => s.dump(path, update_file_store).await, - Self::Mock(_m) => todo!(), + Self::Mock(m) => unsafe { m.get("dump").call((path, update_file_store)) }, } } @@ -425,13 +319,6 @@ pub mod test { } } - pub async fn register_job(&self, content: Job) { - match self { - Self::Real(s) => s.register_job(content).await, - Self::Mock(_m) => todo!(), - } - } - pub fn load_dump(path: impl AsRef, env: Arc) -> anyhow::Result<()> { TaskStore::load_dump(path, env) } diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs index 49413f167..6032eec2c 100644 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ b/meilisearch-lib/src/tasks/task_store/store.rs @@ -19,7 +19,7 @@ use crate::tasks::task::{Task, TaskId}; use super::super::Result; -use super::{Pending, TaskFilter}; +use super::TaskFilter; enum IndexUidTaskIdCodec {} @@ -84,41 +84,6 @@ impl Store { }) } - /// This function should be called *right after* creating the store. - /// It put back all unfinished update in the `Created` state. This - /// allow us to re-enqueue an update that didn't had the time to finish - /// when Meilisearch closed. 
- pub fn reset_and_return_unfinished_tasks(&mut self) -> Result>> { - let mut unfinished_tasks: BinaryHeap> = BinaryHeap::new(); - - let mut wtxn = self.wtxn()?; - let mut iter = self.tasks.rev_iter_mut(&mut wtxn)?; - - while let Some(entry) = iter.next() { - let entry = entry?; - let (id, mut task): (BEU64, Task) = entry; - - // Since all tasks are ordered, we can stop iterating when we encounter our first non-finished task. - if task.is_finished() { - break; - } - - // we only keep the first state. It’s supposed to be a `Created` state. - task.events.drain(1..); - unfinished_tasks.push(Pending::Task(id.get())); - - // Since we own the id and the task this is a safe operation. - unsafe { - iter.put_current(&id, &task)?; - } - } - - drop(iter); - wtxn.commit()?; - - Ok(unfinished_tasks) - } - pub fn wtxn(&self) -> Result { Ok(self.env.write_txn()?) } @@ -166,7 +131,11 @@ impl Store { .map(|limit| (limit as u64).saturating_add(from)) .unwrap_or(u64::MAX); let iter: Box>> = match filter { - Some(filter) => { + Some( + ref filter @ TaskFilter { + indexes: Some(_), .. + }, + ) => { let iter = self .compute_candidates(txn, filter, range)? .into_iter() @@ -174,15 +143,24 @@ impl Store { Box::new(iter) } - None => Box::new( + _ => Box::new( self.tasks .rev_range(txn, &(BEU64::new(range.start)..BEU64::new(range.end)))? .map(|r| r.map(|(_, t)| t)), ), }; + let apply_fitler = |task: &StdResult<_, heed::Error>| match task { + Ok(ref t) => filter + .as_ref() + .and_then(|filter| filter.filter_fn.as_ref()) + .map(|f| f(t)) + .unwrap_or(true), + Err(_) => true, + }; // Collect 'limit' task if it exists or all of them. let tasks = iter + .filter(apply_fitler) .take(limit.unwrap_or(usize::MAX)) .try_fold::<_, _, StdResult<_, heed::Error>>(Vec::new(), |mut v, task| { v.push(task?); @@ -195,11 +173,11 @@ impl Store { fn compute_candidates( &self, txn: &heed::RoTxn, - filter: TaskFilter, + filter: &TaskFilter, range: Range, ) -> Result> { let mut candidates = BinaryHeap::new(); - if let Some(indexes) = filter.indexes { + if let Some(ref indexes) = filter.indexes { for index in indexes { // We need to prefix search the null terminated string to make sure that we only // get exact matches for the index, and not other uids that would share the same @@ -290,13 +268,6 @@ pub mod test { Ok(Self::Real(Store::new(env)?)) } - pub fn reset_and_return_unfinished_tasks(&mut self) -> Result>> { - match self { - MockStore::Real(index) => index.reset_and_return_unfinished_tasks(), - MockStore::Fake(_) => todo!(), - } - } - pub fn wtxn(&self) -> Result { match self { MockStore::Real(index) => index.wtxn(), diff --git a/meilisearch-lib/src/tasks/update_loop.rs b/meilisearch-lib/src/tasks/update_loop.rs new file mode 100644 index 000000000..5cdbf1b46 --- /dev/null +++ b/meilisearch-lib/src/tasks/update_loop.rs @@ -0,0 +1,107 @@ +use std::sync::Arc; +use std::time::Duration; + +use chrono::Utc; +use tokio::sync::{watch, RwLock}; +use tokio::time::interval_at; + +use super::batch::Batch; +use super::error::Result; +use super::scheduler::Pending; +use super::{Scheduler, TaskPerformer}; +use crate::tasks::task::TaskEvent; + +/// The update loop sequentially performs batches of updates by asking the scheduler for a batch, +/// and handing it to the `TaskPerformer`. +pub struct UpdateLoop { + scheduler: Arc>, + performer: Arc
<P>, + + notifier: Option<watch::Receiver<()>>, + debounce_duration: Option<Duration>, +} + +impl<P> UpdateLoop<P>
+where + P: TaskPerformer + Send + Sync + 'static, +{ + pub fn new( + scheduler: Arc<RwLock<Scheduler>>, + performer: Arc<P>
, + debuf_duration: Option, + notifier: watch::Receiver<()>, + ) -> Self { + Self { + scheduler, + performer, + debounce_duration: debuf_duration, + notifier: Some(notifier), + } + } + + pub async fn run(mut self) { + let mut notifier = self.notifier.take().unwrap(); + + loop { + if notifier.changed().await.is_err() { + break; + } + + if let Some(t) = self.debounce_duration { + let mut interval = interval_at(tokio::time::Instant::now() + t, t); + interval.tick().await; + }; + + if let Err(e) = self.process_next_batch().await { + log::error!("an error occured while processing an update batch: {}", e); + } + } + } + + async fn process_next_batch(&self) -> Result<()> { + let pending = { self.scheduler.write().await.prepare().await? }; + match pending { + Pending::Batch(mut batch) => { + for task in &mut batch.tasks { + task.events.push(TaskEvent::Processing(Utc::now())); + } + + batch.tasks = { + self.scheduler + .read() + .await + .update_tasks(batch.tasks) + .await? + }; + + let performer = self.performer.clone(); + + let batch = performer.process_batch(batch).await; + + self.handle_batch_result(batch).await?; + } + Pending::Job(job) => { + let performer = self.performer.clone(); + performer.process_job(job).await; + } + Pending::Nothing => (), + } + + Ok(()) + } + + /// Handles the result from a processed batch. + /// + /// When a task is processed, the result of the process is pushed to its event list. The + /// `handle_batch_result` make sure that the new state is saved to the store. + /// The tasks are then removed from the processing queue. + async fn handle_batch_result(&self, mut batch: Batch) -> Result<()> { + let mut scheduler = self.scheduler.write().await; + let tasks = scheduler.update_tasks(batch.tasks).await?; + scheduler.finish(); + drop(scheduler); + batch.tasks = tasks; + self.performer.finish(&batch).await; + Ok(()) + } +}
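
As a rough companion to the scheduler tests in this patch, the standalone sketch below illustrates the batching policy that `TaskQueue::head_mut` and `make_batch` implement, reduced to a single index's pending list: tasks of the same kind are grouped until `max_batch_size` tasks or `max_documents_per_batch` documents are reached, other task types are always scheduled alone, and at least one task is taken per batch so the scheduler keeps making progress. The names `Kind`, `Config`, `same_kind` and `next_batch` are illustrative stand-ins, not items from this patch.

// Minimal sketch of the auto-batching policy (single index, simplified types).
#[derive(Clone, Copy, Debug)]
enum Kind {
    DocumentAddition { number: usize },
    Other,
}

// Two pending tasks may share a batch only when they are of the same kind.
fn same_kind(a: Kind, b: Kind) -> bool {
    matches!(
        (a, b),
        (Kind::DocumentAddition { .. }, Kind::DocumentAddition { .. }) | (Kind::Other, Kind::Other)
    )
}

struct Config {
    max_batch_size: Option<usize>,
    max_documents_per_batch: Option<usize>,
}

/// Pops task ids (ordered by ascending id) from `pending` into a single batch.
fn next_batch(pending: &mut Vec<(u64, Kind)>, config: &Config) -> Vec<u64> {
    let mut batch = Vec::new();
    let (first_id, first_kind) = match pending.first() {
        Some(&(id, kind)) => (id, kind),
        None => return batch,
    };

    // Deletions, settings updates, etc. are never batched with other tasks.
    if matches!(first_kind, Kind::Other) {
        pending.remove(0);
        batch.push(first_id);
        return batch;
    }

    let mut doc_count = 0;
    while let Some(&(id, kind)) = pending.first() {
        if !same_kind(kind, first_kind) {
            break;
        }
        // At least one task is always taken so the scheduler makes progress.
        if batch.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) {
            break;
        }
        pending.remove(0);
        batch.push(id);
        // The document cap is checked after pushing, so a batch is never empty.
        if let Kind::DocumentAddition { number } = kind {
            doc_count += number;
            if doc_count >= config.max_documents_per_batch.unwrap_or(usize::MAX) {
                break;
            }
        }
    }
    batch
}

fn main() {
    let mut pending = vec![
        (0, Kind::DocumentAddition { number: 10 }),
        (1, Kind::DocumentAddition { number: 10 }),
        (2, Kind::Other),
        (3, Kind::DocumentAddition { number: 10 }),
    ];
    let config = Config {
        max_batch_size: None,
        max_documents_per_batch: Some(15),
    };
    assert_eq!(next_batch(&mut pending, &config), vec![0, 1]); // document cap reached after task 1
    assert_eq!(next_batch(&mut pending, &config), vec![2]); // non-addition tasks go alone
    assert_eq!(next_batch(&mut pending, &config), vec![3]);
    assert!(pending.is_empty());
}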