diff --git a/Cargo.lock b/Cargo.lock index da8d649dd..7f3b20e89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,9 +421,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.36" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68803225a7b13e47191bab76f2687382b60d259e8cf37f6e1893658b84bb9479" +checksum = "afddf7f520a80dbf76e6f50a35bca42a2331ef227a28b3b6dc5c2e2338d114b1" [[package]] name = "assert-json-diff" @@ -447,6 +447,27 @@ dependencies = [ "tokio 0.2.24", ] +[[package]] +name = "async-stream" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3670df70cbc01729f901f94c887814b3c68db038aad1329a418bae178bc5295c" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3548b8efc9f8e8a5a0a2808c5bd8451a9031b9e5b879a79590304ae928b0a70" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.42" @@ -609,9 +630,9 @@ dependencies = [ [[package]] name = "bstr" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "473fc6b38233f9af7baa94fb5852dca389e3d95b8e21c8e3719301462c5d9faf" +checksum = "a40b47ad93e1a5404e6c18dec46b628214fee441c70f4ab5d6942142cc268a3d" dependencies = [ "lazy_static", "memchr", @@ -641,10 +662,16 @@ dependencies = [ ] [[package]] -name = "byteorder" -version = "1.3.4" +name = "bytemuck" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" +checksum = "bed57e2090563b83ba8f83366628ce535a7584c9afa4c9fc0612a03925c6df58" + +[[package]] +name = "byteorder" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae44d1a3d5a19df61dd0c8beb138458ac2a53a7ac09eba97d55592540004306b" [[package]] name = "bytes" @@ -1022,15 +1049,15 @@ checksum = "0c122a393ea57648015bf06fbd3d372378992e86b9ff5a7a497b076a28c79efe" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall", + "redox_syscall 0.1.57", "winapi 0.3.9", ] [[package]] name = "flate2" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7411863d55df97a419aa64cb4d2f167103ea9d767e2c54a1868b7ac3f6b47129" +checksum = "cd3aec53de10fe96d7d8c565eb17f2c687bb5518a2ec453b5b1252964526abe0" dependencies = [ "cfg-if 1.0.0", "crc32fast", @@ -1665,9 +1692,9 @@ checksum = "b7282d924be3275cec7f6756ff4121987bc6481325397dde6ba3e7802b1a8b1c" [[package]] name = "linked-hash-map" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" +checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" [[package]] name = "lmdb-rkv-sys" @@ -1691,11 +1718,11 @@ dependencies = [ [[package]] name = "log" -version = "0.4.11" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", ] [[package]] @@ -1741,6 +1768,7 @@ dependencies = [ "anyhow", "assert-json-diff", "async-compression", + "async-stream", "async-trait", "byte-unit", "bytes 0.6.0", @@ -1853,7 +1881,7 @@ dependencies = [ "grenad", "heed", "human_format", - "itertools 0.9.0", + "itertools 0.10.0", "levenshtein_automata", "linked-hash-map", "log", @@ -2060,9 +2088,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "ordered-float" -version = "2.0.1" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dacdec97876ef3ede8c50efc429220641a0b11ba0048b4b0c357bccbc47c5204" +checksum = "766f840da25490628d8e63e529cd21c014f6600c6b8517add12a6fa6167a6218" dependencies = [ "num-traits", ] @@ -2097,7 +2125,7 @@ dependencies = [ "cfg-if 1.0.0", "instant", "libc", - "redox_syscall", + "redox_syscall 0.1.57", "smallvec", "winapi 0.3.9", ] @@ -2496,10 +2524,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] -name = "regex" -version = "1.4.2" +name = "redox_syscall" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" +checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9251239e129e16308e70d853559389de218ac275b515068abc96829d05b948a" dependencies = [ "aho-corasick", "memchr", @@ -2518,9 +2555,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.21" +version = "0.6.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" +checksum = "b5eb417147ba9860a96cfe72a0b93bf88fee1744b5636ec99ab20c1aa9376581" [[package]] name = "remove_dir_all" @@ -2578,6 +2615,12 @@ dependencies = [ "quick-error", ] +[[package]] +name = "retain_mut" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53552c6c49e1e13f1a203ef0080ab3bbef0beb570a528993e83df057a9d9bba1" + [[package]] name = "ring" version = "0.16.19" @@ -2595,11 +2638,13 @@ dependencies = [ [[package]] name = "roaring" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d60b41c8f25d07cecab125cb46ebbf234fc055effc61ca2392a3ef4f9422304" +checksum = "c6744a4a918e91359ad1d356a91e2e943a86d9fb9ae77f715d617032ea2af88f" dependencies = [ + "bytemuck", "byteorder", + "retain_mut", ] [[package]] @@ -2721,18 +2766,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.118" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06c64263859d87aa2eb554587e2d23183398d617427327cf2b3d0ed8c69e4800" +checksum = "92d5161132722baa40d802cc70b15262b98258453e85e5d1d365c757c73869ae" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.118" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df" +checksum = "9391c295d64fc0abb2c556bad848f33cb8296276b1ad2677d1ae1ace4f258f31" dependencies = [ "proc-macro2", "quote", @@ -2741,9 +2786,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.60" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1500e84d27fe482ed1dc791a56eddc2f230046a040fa908c08bda1d9fb615779" +checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79" dependencies = [ "indexmap", "itoa", @@ -2983,9 +3028,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.55" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a571a711dddd09019ccc628e1b17fe87c59b09d513c06c026877aa708334f37a" +checksum = "c700597eca8a5a762beb35753ef6b94df201c81cca676604f547495a0d7f0081" dependencies = [ "proc-macro2", "quote", @@ -3021,7 +3066,7 @@ checksum = "489997b7557e9a43e192c527face4feacc78bfbe6eed67fd55c4c9e381cba290" dependencies = [ "filetime", "libc", - "redox_syscall", + "redox_syscall 0.1.57", "xattr", ] @@ -3037,14 +3082,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.1.0" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" +checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "libc", - "rand 0.7.3", - "redox_syscall", + "rand 0.8.3", + "redox_syscall 0.2.5", "remove_dir_all", "winapi 0.3.9", ] diff --git a/Cargo.toml b/Cargo.toml index 126f859e7..aaf571ef3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ itertools = "0.10.0" either = "1.6.1" async-trait = "0.1.42" thiserror = "1.0.24" +async-stream = "0.3.0" [dependencies.sentry] default-features = false diff --git a/src/data/updates.rs b/src/data/updates.rs index a559d812a..7db04cd5d 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -23,10 +23,11 @@ impl Data { { let file = tempfile::tempfile_in(".")?; let mut file = File::from_std(file); - while let Some(Ok(bytes)) = stream.next().await { - file.write(bytes.as_ref()).await; + while let Some(item) = stream.next().await { + file.write_all(&item?).await?; } file.seek(std::io::SeekFrom::Start(0)).await?; + file.flush().await?; let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, file, primary_key).await?; Ok(update_status) } diff --git a/src/index_controller/actor_index_controller/index_actor.rs b/src/index_controller/actor_index_controller/index_actor.rs index 0cb057a9b..7100edef6 100644 --- a/src/index_controller/actor_index_controller/index_actor.rs +++ b/src/index_controller/actor_index_controller/index_actor.rs @@ -13,7 +13,7 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; use log::info; use crate::data::SearchQuery; -use futures::stream::{StreamExt, Stream}; +use futures::stream::StreamExt; use super::update_handler::UpdateHandler; use async_stream::stream; @@ -32,7 +32,7 @@ enum IndexMsg { } struct IndexActor { - inbox: mpsc::Receiver, + inbox: Option>, update_handler: Arc, store: S, } @@ -57,26 +57,31 @@ impl IndexActor { let options = IndexerOpts::default(); let update_handler = UpdateHandler::new(&options).unwrap(); let update_handler = Arc::new(update_handler); + let inbox = Some(inbox); Self { inbox, store, update_handler } } async fn run(mut self) { + let mut inbox = self.inbox.take().expect("Index Actor must have a inbox at this point."); + let stream = stream! { loop { - match self.inbox.recv().await { + match inbox.recv().await { Some(msg) => yield msg, None => break, } } }; - stream.for_each_concurent(Some(10), |msg| { + let fut = stream.for_each_concurrent(Some(10), |msg| async { match msg { - IndexMsg::CreateIndex { uuid, primary_key, ret } => self.handle_create_index(uuid, primary_key, ret), - IndexMsg::Update { ret, meta, data } => self.handle_update(meta, data, ret), - IndexMsg::Search { ret, query, uuid } => self.handle_search(uuid, query, ret), + IndexMsg::CreateIndex { uuid, primary_key, ret } => self.handle_create_index(uuid, primary_key, ret).await, + IndexMsg::Update { ret, meta, data } => self.handle_update(meta, data, ret).await, + IndexMsg::Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await, } - }) + }); + + fut.await; } async fn handle_search(&self, uuid: Uuid, query: SearchQuery, ret: oneshot::Sender>) { diff --git a/src/index_controller/actor_index_controller/update_actor.rs b/src/index_controller/actor_index_controller/update_actor.rs index d182ef1c8..3d783009f 100644 --- a/src/index_controller/actor_index_controller/update_actor.rs +++ b/src/index_controller/actor_index_controller/update_actor.rs @@ -41,7 +41,6 @@ impl UpdateActor { } async fn run(mut self) { - info!("started update actor."); loop { diff --git a/src/index_controller/actor_index_controller/update_store.rs b/src/index_controller/actor_index_controller/update_store.rs index 371ac7bd9..d14f47f05 100644 --- a/src/index_controller/actor_index_controller/update_store.rs +++ b/src/index_controller/actor_index_controller/update_store.rs @@ -1,6 +1,6 @@ use std::path::Path; use std::sync::{Arc, RwLock}; -use std::io::{Cursor, SeekFrom, Seek}; +use std::io::{Cursor, SeekFrom, Seek, Write}; use crossbeam_channel::Sender; use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; @@ -192,7 +192,9 @@ where .replace(processing.clone()); let mut cursor = Cursor::new(first_content); let mut file = tempfile::tempfile()?; - std::io::copy(&mut cursor, &mut file)?; + let n = std::io::copy(&mut cursor, &mut file)?; + println!("copied count: {}", n); + file.flush()?; file.seek(SeekFrom::Start(0))?; // Process the pending update using the provided user function. let result = handler.handle_update(processing, file);