From 871222aebd4349d5fc989c66642bff41e373ad81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 19 Oct 2020 16:03:17 +0200 Subject: [PATCH] Introduce some new routes to handle live indexing --- Cargo.lock | 213 ++++++++++++++++++---------------------- Cargo.toml | 6 +- src/lib.rs | 1 + src/subcommand/serve.rs | 114 ++++++++++++++++++++- src/update_store.rs | 6 +- 5 files changed, 215 insertions(+), 125 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 02aec6df6..daeb32d1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,6 +77,17 @@ dependencies = [ "warp", ] +[[package]] +name = "async-channel" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "atty" version = "0.2.11" @@ -191,9 +202,15 @@ checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" [[package]] name = "bytes" -version = "0.5.4" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" [[package]] name = "cast" @@ -258,12 +275,12 @@ dependencies = [ ] [[package]] -name = "cloudabi" -version = "0.1.0" +name = "concurrent-queue" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" dependencies = [ - "bitflags", + "cache-padded", ] [[package]] @@ -429,6 +446,12 @@ version = 
"1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + [[package]] name = "fake-simd" version = "0.1.2" @@ -489,9 +512,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" [[package]] name = "futures" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +checksum = "5d8e3078b7b2a8a671cb7a3d17b4760e4181ea243227776ba83fd043b4ca034e" dependencies = [ "futures-channel", "futures-core", @@ -504,9 +527,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +checksum = "a7a4d35f7401e948629c9c3d6638fb9bf94e0b2121e96c3b428cc4e631f3eb74" dependencies = [ "futures-core", "futures-sink", @@ -514,15 +537,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +checksum = "d674eaa0056896d5ada519900dbf97ead2e46a7b6621e8160d79e2f2e1e2784b" [[package]] name = "futures-executor" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +checksum = "cc709ca1da6f66143b8c9bec8e6260181869893714e9b5a490b169b0414144ab" dependencies = [ "futures-core", "futures-task", @@ -531,15 +554,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.5" 
+version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" +checksum = "5fc94b64bb39543b4e432f1790b6bf18e3ee3b74653c5449f63310e9a74b123c" [[package]] name = "futures-macro" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +checksum = "f57ed14da4603b2554682e9f2ff3c65d7567b53188db96cb71538217fc64581b" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -549,24 +572,24 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" +checksum = "0d8764258ed64ebc5d9ed185cf86a95db5cac810269c5d20ececb32e0088abbd" [[package]] name = "futures-task" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +checksum = "4dd26820a9f3637f1302da8bceba3ff33adbe53464b54ca24d4e2d4f1db30f94" dependencies = [ "once_cell", ] [[package]] name = "futures-util" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +checksum = "8a894a0acddba51a2d49a6f4263b1e64b8c579ece8af50fa86503d52cd1eea34" dependencies = [ "futures-channel", "futures-core", @@ -646,7 +669,7 @@ dependencies = [ "indexmap", "log 0.4.11", "slab", - "tokio 0.2.21", + "tokio", "tokio-util", ] @@ -802,7 +825,7 @@ dependencies = [ "pin-project", "socket2", "time", - "tokio 0.2.21", + "tokio", "tower-service", "want", ] @@ -837,15 +860,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "instant" -version = "0.1.7" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "63312a18f7ea8760cdd0a7c5aac1a619752a246b833545e3e36d1f81f7cd9e66" -dependencies = [ - "cfg-if 0.1.10", -] - [[package]] name = "iovec" version = "0.1.4" @@ -957,15 +971,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "lock_api" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28247cc5a5be2f05fbcd76dd0cf2c7d3b5400cb978a28042abcd4fa0b3f8261c" -dependencies = [ - "scopeguard", -] - [[package]] name = "log" version = "0.3.9" @@ -1002,6 +1007,16 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +dependencies = [ + "libc", + "winapi 0.3.8", +] + [[package]] name = "memoffset" version = "0.5.4" @@ -1018,13 +1033,16 @@ dependencies = [ "anyhow", "askama", "askama_warp", + "async-channel", "bstr", "byteorder", + "bytes", "criterion", "crossbeam-channel", "csv", "flate2", "fst", + "futures", "fxhash", "grenad", "heed", @@ -1034,6 +1052,7 @@ dependencies = [ "levenshtein_automata", "linked-hash-map", "log 0.4.11", + "memmap", "near-proximity", "once_cell", "rayon", @@ -1047,7 +1066,7 @@ dependencies = [ "stderrlog", "structopt", "tempfile", - "tokio 0.3.0", + "tokio", "warp", ] @@ -1117,18 +1136,28 @@ dependencies = [ ] [[package]] -name = "mio" -version = "0.7.3" +name = "mio-named-pipes" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e53a6ea5f38c0a48ca42159868c6d8e1bd56c0451238856cc08d58563643bdc3" +checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" dependencies = [ - "libc", "log 0.4.11", + "mio", "miow 0.3.4", - "ntapi", "winapi 0.3.8", ] +[[package]] +name = 
"mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" +dependencies = [ + "iovec", + "libc", + "mio", +] + [[package]] name = "miow" version = "0.2.1" @@ -1210,15 +1239,6 @@ dependencies = [ "version_check 0.9.2", ] -[[package]] -name = "ntapi" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a31937dea023539c72ddae0e3571deadc1414b300483fa7aaec176168cfa9d2" -dependencies = [ - "winapi 0.3.8", -] - [[package]] name = "num-integer" version = "0.1.43" @@ -1282,32 +1302,6 @@ dependencies = [ "winapi 0.3.8", ] -[[package]] -name = "parking_lot" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4893845fa2ca272e647da5d0e46660a314ead9c2fdd9a883aabc32e481a8733" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b" -dependencies = [ - "cfg-if 0.1.10", - "cloudabi 0.1.0", - "instant", - "libc", - "redox_syscall", - "smallvec", - "winapi 0.3.8", -] - [[package]] name = "percent-encoding" version = "2.1.0" @@ -1355,18 +1349,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.17" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc93aeee735e60ecb40cf740eb319ff23eab1c5748abfdb5c180e4ce49f7791" +checksum = "2ffbc8e94b38ea3d2d8ba92aea2983b503cd75d0888d75b86bb37970b5698e15" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.17" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58db2081ba5b4c93bd6be09c40fd36cb9193a8336c384f3b40012e531aa7e40" +checksum = 
"65ad2ae56b6abe3a1ee25f15ee605bacadb9a764edaba9c2bf4103800d4a1895" dependencies = [ "proc-macro2", "quote", @@ -1449,9 +1443,9 @@ checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" [[package]] name = "proc-macro2" -version = "1.0.17" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1502d12e458c49a4c9cbff560d0fe0060c252bc29799ed94ca2ed4bb665a0101" +checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" dependencies = [ "unicode-xid", ] @@ -1591,7 +1585,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" dependencies = [ - "cloudabi 0.0.3", + "cloudabi", "fuchsia-cprng", "libc", "rand_core 0.4.2", @@ -1937,9 +1931,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.31" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5304cfdf27365b7585c25d4af91b35016ed21ef88f17ced89c7093b43dba8b6" +checksum = "ea9c5432ff16d6152371f808fb5a871cd67368171b09bb21b43df8e4a47a3556" dependencies = [ "proc-macro2", "quote", @@ -2079,27 +2073,12 @@ dependencies = [ "futures-core", "iovec", "lazy_static", - "memchr", - "mio 0.6.22", - "pin-project-lite", - "slab", -] - -[[package]] -name = "tokio" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7137dbb0abee577362ccdc7df21605cfcbb949243aeab47dac9ea6ef7d830e21" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "lazy_static", "libc", "memchr", - "mio 0.7.3", + "mio", + "mio-named-pipes", + "mio-uds", "num_cpus", - "parking_lot", "pin-project-lite", "signal-hook-registry", "slab", @@ -2109,9 +2088,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "0.3.0" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"d48caa7b66c7a6ec943edf78d21a594fbeb24e536c781da67d5c32edec54103f" +checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" dependencies = [ "proc-macro2", "quote", @@ -2127,7 +2106,7 @@ dependencies = [ "futures", "log 0.4.11", "pin-project", - "tokio 0.2.21", + "tokio", "tungstenite", ] @@ -2142,7 +2121,7 @@ dependencies = [ "futures-sink", "log 0.4.11", "pin-project-lite", - "tokio 0.2.21", + "tokio", ] [[package]] @@ -2330,7 +2309,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "tokio 0.2.21", + "tokio", "tokio-tungstenite", "tower-service", "urlencoding", diff --git a/Cargo.toml b/Cargo.toml index 6cf5e0a51..d1e3d8343 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] anyhow = "1.0.28" +async-channel = "1.5.1" bstr = "0.2.13" byteorder = "1.3.4" crossbeam-channel = "0.5.0" @@ -19,6 +20,7 @@ human_format = "1.0.3" jemallocator = "0.3.2" levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] } linked-hash-map = "0.5.3" +memmap = "0.7.0" near-proximity = { git = "https://github.com/Kerollmops/plane-sweep-proximity", rev = "6608205" } once_cell = "1.4.0" rayon = "1.3.1" @@ -41,8 +43,10 @@ stderrlog = "0.5.0" # http server askama = "0.10.1" askama_warp = "0.10.0" +bytes = "0.5.6" +futures = "0.3.6" serde = { version = "1.0", features = ["derive"] } -tokio = { version = "0.3.0", features = ["full"] } +tokio = { version = "0.2", features = ["full"] } warp = "0.2.2" [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 2be4a3b82..25a4f45a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ use heed::types::*; use heed::{PolyDatabase, Database}; use roaring::RoaringBitmap; +pub use self::update_store::UpdateStore; pub use self::search::{Search, SearchResult}; pub use self::criterion::{Criterion, default_criteria}; pub use self::heed_codec::{ diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 2f6940237..7dbfbd8c2 100644 --- 
a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -1,18 +1,25 @@ use std::collections::HashSet; -use std::fs::File; +use std::fs::{File, create_dir_all}; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; use std::time::Instant; use askama_warp::Template; +use futures::FutureExt; +use futures::StreamExt; use heed::EnvOpenOptions; use serde::Deserialize; use structopt::StructOpt; +use tokio::fs::File as TFile; +use tokio::io::AsyncWriteExt; +use warp::filters::ws::Message; use warp::{Filter, http::Response}; use crate::tokenizer::{simple_tokenizer, TokenType}; -use crate::{Index, SearchResult}; +use crate::{Index, UpdateStore, SearchResult}; #[derive(Debug, StructOpt)] /// The HTTP main server of the milli project. @@ -27,6 +34,11 @@ pub struct Opt { #[structopt(long = "db-size", default_value = "107374182400")] // 100 GB database_size: usize, + /// The maximum size the database that stores the updates can take on disk. It is recommended + /// to specify the whole disk space (value must be a multiple of a page size). + #[structopt(long = "udb-size", default_value = "10737418240")] // 10 GB + update_database_size: usize, + /// Disable document highlighting on the dashboard. #[structopt(long)] disable_highlighting: bool, @@ -84,6 +96,25 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { // Open the LMDB database. let index = Index::new(&env)?; + // Setup the LMDB based update database. 
+ let mut update_store_options = EnvOpenOptions::new(); + update_store_options.map_size(opt.update_database_size); + + let update_store_path = opt.database.join("updates.mdb"); + create_dir_all(&update_store_path)?; + + let (update_status_sender, update_status_receiver) = async_channel::unbounded(); + let update_status_sender_cloned = update_status_sender.clone(); + let update_store = UpdateStore::open( + update_store_options, + update_store_path, + move |_uid, meta: String, _content| { + let _ = update_status_sender_cloned.try_send("processing update"); + std::thread::sleep(Duration::from_secs(3)); + let _ = update_status_sender_cloned.try_send("update processed"); + Ok(meta) + })?; + // Retrieve the database the file stem (w/o the extension), // the disk file size and the number of documents in the database. let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string(); @@ -212,6 +243,77 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { .body(String::from_utf8(body).unwrap()) }); + async fn buf_stream( + update_store: Arc>, + update_status_sender: async_channel::Sender<&'static str>, + mut stream: impl futures::Stream> + Unpin, + ) -> Result + { + let file = tokio::task::block_in_place(tempfile::tempfile).unwrap(); + let mut file = TFile::from_std(file); + + while let Some(result) = stream.next().await { + let bytes = result.unwrap().to_bytes(); + file.write_all(&bytes[..]).await.unwrap(); + } + + let file = file.into_std().await; + let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; + + let meta = String::from("I am the metadata"); + let uid = update_store.register_update(&meta, &mmap[..]).unwrap(); + update_status_sender.try_send("registering update").unwrap(); + eprintln!("Registering update {}", uid); + + Ok(warp::reply()) + } + + let update_store_cloned = update_store.clone(); + let indexing_route = warp::filters::method::post() + .and(warp::path!("documents")) + .and(warp::body::stream()) + .and_then(move |stream| { + 
buf_stream(update_store_cloned.clone(), update_status_sender.clone(), stream) + }); + + let update_ws_route = warp::ws() + .and(warp::path!("updates" / "ws")) + .map(move |ws: warp::ws::Ws| { + // And then our closure will be called when it completes... + let update_status_receiver_cloned = update_status_receiver.clone(); + ws.on_upgrade(|websocket| { + // Just echo all updates messages... + update_status_receiver_cloned + .map(|msg| Ok(Message::text(msg))) + .forward(websocket) + .map(|result| { + if let Err(e) = result { + eprintln!("websocket error: {:?}", e); + } + }) + }) + }); + + let update_store_cloned = update_store.clone(); + let list_updates_route = warp::filters::method::get() + .and(warp::path!("updates")) + .map(move || { + let update_store = update_store_cloned.clone(); + let updates = update_store.iter_metas(|processed, pending| { + let mut updates = Vec::new(); + for result in processed { + let (id, _) = result?; + updates.push(format!("update {} processed", id.get())); + } + for result in pending { + let (id, _) = result?; + updates.push(format!("update {} pending", id.get())); + } + Ok(updates) + }).unwrap(); + Ok(warp::reply::json(&updates)) + }); + let routes = dash_html_route .or(dash_bulma_route) .or(dash_bulma_dark_route) @@ -222,10 +324,14 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { .or(dash_script_route) .or(dash_logo_white_route) .or(dash_logo_black_route) - .or(query_route); + .or(query_route) + .or(indexing_route) + .or(update_ws_route) + .or(list_updates_route); let addr = SocketAddr::from_str(&opt.http_listen_addr)?; - tokio::runtime::Builder::new_multi_thread() + tokio::runtime::Builder::new() + .threaded_scheduler() .enable_all() .build()? 
.block_on(async { diff --git a/src/update_store.rs b/src/update_store.rs index 14566ed9f..564190af7 100644 --- a/src/update_store.rs +++ b/src/update_store.rs @@ -148,7 +148,7 @@ impl UpdateStore { } /// Execute the user defined function with both meta-store iterators, the first - /// iterator is the *pending* meta one and the secind is the *processed* meta one. + /// iterator is the *processed* meta one and the second is the *pending* meta one. pub fn iter_metas(&self, mut f: F) -> heed::Result where M: for<'a> Deserialize<'a>, @@ -160,11 +160,11 @@ impl UpdateStore { let rtxn = self.env.read_txn()?; // We get both the pending and processed meta iterators. - let pending_iter = self.pending_meta.iter(&rtxn)?; let processed_iter = self.processed_meta.iter(&rtxn)?; + let pending_iter = self.pending_meta.iter(&rtxn)?; // We execute the user defined function with both iterators. - (f)(pending_iter, processed_iter) + (f)(processed_iter, pending_iter) } /// Returns the update associated meta or `None` if the update deosn't exist.