From e14640e530daca3fe9aeaf52255b5f6f4f0a25b9 Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 14 Sep 2021 18:39:02 +0200 Subject: [PATCH] refactor meilisearch --- Cargo.lock | 104 ++++-- meilisearch-http/Cargo.toml | 5 +- meilisearch-http/src/data/mod.rs | 26 +- meilisearch-http/src/data/updates.rs | 56 +-- meilisearch-http/src/error.rs | 1 - meilisearch-http/src/index/dump.rs | 154 ++++----- meilisearch-http/src/index/mod.rs | 14 +- meilisearch-http/src/index/search.rs | 2 +- meilisearch-http/src/index/update_handler.rs | 27 +- meilisearch-http/src/index/updates.rs | 230 ++++++------- .../dump_actor/handle_impl.rs | 3 +- .../index_controller/dump_actor/loaders/v1.rs | 104 +++--- .../src/index_controller/index_actor/actor.rs | 10 +- .../index_actor/handle_impl.rs | 4 +- .../index_controller/index_actor/message.rs | 1 - .../src/index_controller/index_actor/mod.rs | 2 - .../src/index_controller/index_actor/store.rs | 10 +- meilisearch-http/src/index_controller/mod.rs | 299 +++++++++------- .../src/index_controller/snapshot.rs | 153 ++++----- .../index_controller/update_actor/actor.rs | 180 +++++----- .../update_actor/handle_impl.rs | 30 +- .../index_controller/update_actor/message.rs | 9 +- .../src/index_controller/update_actor/mod.rs | 23 +- .../update_actor/store/dump.rs | 128 +++---- .../update_actor/store/mod.rs | 149 ++++---- .../src/index_controller/update_file_store.rs | 63 ++++ .../src/index_controller/updates.rs | 24 +- meilisearch-http/src/main.rs | 37 +- meilisearch-http/src/option.rs | 28 +- .../src/routes/indexes/documents.rs | 132 +++---- meilisearch-http/src/routes/indexes/mod.rs | 36 +- .../src/routes/indexes/settings.rs | 325 +++++++++--------- meilisearch-http/src/routes/mod.rs | 19 +- 33 files changed, 1222 insertions(+), 1166 deletions(-) create mode 100644 meilisearch-http/src/index_controller/update_file_store.rs diff --git a/Cargo.lock b/Cargo.lock index 1f216ffc7..809535e1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -235,6 +235,15 @@ dependencies = [ "path-slash", ] +[[package]] +name = "addr2line" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e61f2b7f93d2c7d2b08263acaa4a363b3e276806c68af6134c44f523bf1aacd" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -281,6 +290,9 @@ name = "anyhow" version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" +dependencies = [ + "backtrace", +] [[package]] name = "arc-swap" @@ -346,6 +358,21 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backtrace" +version = "0.3.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01" +dependencies = [ + "addr2line", + "cc", + "cfg-if 1.0.0", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base-x" version = "0.2.8" @@ -358,6 +385,15 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "bimap" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07" +dependencies = [ + "serde", +] + [[package]] name = "bincode" 
version = "1.3.3" @@ -432,7 +468,6 @@ dependencies = [ "lazy_static", "memchr", "regex-automata", - "serde", ] [[package]] @@ -734,28 +769,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "csv" -version = "1.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" -dependencies = [ - "bstr", - "csv-core", - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "csv-core" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" -dependencies = [ - "memchr", -] - [[package]] name = "derive_more" version = "0.99.16" @@ -1089,6 +1102,12 @@ dependencies = [ "syn 1.0.76", ] +[[package]] +name = "gimli" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" + [[package]] name = "git2" version = "0.13.22" @@ -1618,6 +1637,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "urlencoding", "uuid", "vergen", @@ -1670,14 +1690,15 @@ dependencies = [ [[package]] name = "milli" version = "0.13.1" -source = "git+https://github.com/meilisearch/milli.git?tag=v0.13.1#90d64d257fa944ab2ee1572193e501bb231627c7" +source = "git+https://github.com/meilisearch/milli.git?rev=6de1b41#6de1b41f791e7d117634e63783d78b29b5228a99" dependencies = [ + "bimap", + "bincode", "bstr", "byteorder", "chrono", "concat-arrays", "crossbeam-channel", - "csv", "either", "flate2", "fst", @@ -1706,6 +1727,7 @@ dependencies = [ "smallvec", "tempfile", "uuid", + "vec-utils", ] [[package]] @@ -1827,6 +1849,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39f37e50073ccad23b6d09bcb5b263f4e76d3bb6038e4a3c08e52162ffa8abc2" +dependencies = [ + "memchr", +] + [[package]] name = "obkv" version = "0.2.0" @@ -2367,6 +2398,12 @@ dependencies = [ "retain_mut", ] +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "rustc_version" version = "0.2.3" @@ -2959,6 +2996,17 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-stream" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.8" @@ -3126,6 +3174,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec-utils" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dac984aa016c26ef4ed7b2c30d6a1bd570fd40a078caccaf6415a2ac5d96161" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index 7d5b92a87..02e72668b 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -25,7 +25,7 @@ zip = { version = "0.5.13", optional = true } actix-cors = { git = "https://github.com/MarinPostma/actix-extras.git", rev = "963ac94d" } actix-web = { version = "4.0.0-beta.9", features = ["rustls"] } 
actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "39d8006", optional = true } -anyhow = "1.0.43" +anyhow = { version = "1.0.43", features = ["backtrace"] } async-stream = "0.3.2" async-trait = "0.1.51" arc-swap = "1.3.2" @@ -48,7 +48,7 @@ main_error = "0.1.1" meilisearch-error = { path = "../meilisearch-error" } meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" } memmap = "0.7.0" -milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.13.1" } +milli = { git = "https://github.com/meilisearch/milli.git", rev = "6de1b41" } mime = "0.3.16" num_cpus = "1.13.0" once_cell = "1.8.0" @@ -75,6 +75,7 @@ whoami = { version = "1.1.3", optional = true } reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-features = false, optional = true } serdeval = "0.1.0" sysinfo = "0.20.2" +tokio-stream = "0.1.7" [dev-dependencies] actix-rt = "2.2.0" diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index 48dfcfa06..c0e83155c 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -5,7 +5,7 @@ use sha2::Digest; use crate::index::{Checked, Settings}; use crate::index_controller::{ - error::Result, DumpInfo, IndexController, IndexMetadata, IndexSettings, IndexStats, Stats, + error::Result, DumpInfo, IndexController, IndexMetadata, IndexStats, Stats, }; use crate::option::Opt; @@ -91,19 +91,19 @@ impl Data { self.index_controller.get_index(uid).await } - pub async fn create_index( - &self, - uid: String, - primary_key: Option, - ) -> Result { - let settings = IndexSettings { - uid: Some(uid), - primary_key, - }; + //pub async fn create_index( + //&self, + //uid: String, + //primary_key: Option, + //) -> Result { + //let settings = IndexSettings { + //uid: Some(uid), + //primary_key, + //}; - let meta = self.index_controller.create_index(settings).await?; - Ok(meta) - } + //let meta = self.index_controller.create_index(settings).await?; + //Ok(meta) + //} pub async fn get_index_stats(&self, uid: String) -> Result { Ok(self.index_controller.get_index_stats(uid).await?) 
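For reference, the single update entry point introduced in the next file (`Data::register_update`, forwarding to `IndexController::register_update`) is meant to be driven roughly as follows. This is a minimal sketch, not part of the patch: the handler name, the index uid, and the fully buffered body are illustrative assumptions; only `Data`, `Payload`, `Update`, `DocumentAdditionFormat`, and `register_update` come from this refactor.

use actix_web::error::PayloadError;
use futures::stream;
use milli::update::IndexDocumentsMethod;

use crate::index_controller::{DocumentAdditionFormat, Payload, Update};
use crate::Data;

async fn add_documents_sketch(data: &Data, body: bytes::Bytes) -> anyhow::Result<()> {
    // Wrap an already-buffered body in the boxed `Payload` stream the update actor
    // consumes; a real route handler would forward the actix payload stream instead.
    let chunks: Vec<Result<bytes::Bytes, PayloadError>> = vec![Ok(body)];
    let payload: Payload = Box::new(stream::iter(chunks));
    let update = Update::DocumentAddition {
        payload,
        primary_key: None,
        method: IndexDocumentsMethod::ReplaceDocuments,
        format: DocumentAdditionFormat::Json,
    };
    // Registering only enqueues the update; indexing happens asynchronously and the
    // returned status can later be polled through the update status routes.
    let _status = data.register_update("movies", update).await?;
    Ok(())
}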
diff --git a/meilisearch-http/src/data/updates.rs b/meilisearch-http/src/data/updates.rs index 4e38294e9..8228cd2b2 100644 --- a/meilisearch-http/src/data/updates.rs +++ b/meilisearch-http/src/data/updates.rs @@ -1,59 +1,11 @@ -use milli::update::{IndexDocumentsMethod, UpdateFormat}; - -use crate::extractors::payload::Payload; -use crate::index::{Checked, Settings}; +use crate::index_controller::Update; use crate::index_controller::{error::Result, IndexMetadata, IndexSettings, UpdateStatus}; use crate::Data; impl Data { - pub async fn add_documents( - &self, - index: String, - method: IndexDocumentsMethod, - format: UpdateFormat, - stream: Payload, - primary_key: Option, - ) -> Result { - let update_status = self - .index_controller - .add_documents(index, method, format, stream, primary_key) - .await?; - Ok(update_status) - } - - pub async fn update_settings( - &self, - index: String, - settings: Settings, - create: bool, - ) -> Result { - let update = self - .index_controller - .update_settings(index, settings, create) - .await?; - Ok(update) - } - - pub async fn clear_documents(&self, index: String) -> Result { - let update = self.index_controller.clear_documents(index).await?; - Ok(update) - } - - pub async fn delete_documents( - &self, - index: String, - document_ids: Vec, - ) -> Result { - let update = self - .index_controller - .delete_documents(index, document_ids) - .await?; - Ok(update) - } - - pub async fn delete_index(&self, index: String) -> Result<()> { - self.index_controller.delete_index(index).await?; - Ok(()) + pub async fn register_update(&self, index_uid: &str, update: Update) -> Result { + let status = self.index_controller.register_update(index_uid, update).await?; + Ok(status) } pub async fn get_update_status(&self, index: String, uid: u64) -> Result { diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index 2ec556de2..61b8dbcd9 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -86,7 +86,6 @@ impl ErrorCode for MilliError<'_> { milli::Error::UserError(ref error) => { match error { // TODO: wait for spec for new error codes. - UserError::Csv(_) | UserError::SerdeJson(_) | UserError::MaxDatabaseSizeReached | UserError::InvalidCriterionName { .. 
} diff --git a/meilisearch-http/src/index/dump.rs b/meilisearch-http/src/index/dump.rs index 7df704339..9c8acf960 100644 --- a/meilisearch-http/src/index/dump.rs +++ b/meilisearch-http/src/index/dump.rs @@ -1,20 +1,15 @@ -use std::fs::{create_dir_all, File}; -use std::io::{BufRead, BufReader, Write}; +use std::fs::File; +use std::io::Write; use std::path::Path; -use std::sync::Arc; -use anyhow::{bail, Context}; use heed::RoTxn; use indexmap::IndexMap; -use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream}; use serde::{Deserialize, Serialize}; -use serde_json::Value; -use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; use crate::option::IndexerOpts; use super::error::Result; -use super::{update_handler::UpdateHandler, Index, Settings, Unchecked}; +use super::{Index, Settings, Unchecked}; #[derive(Serialize, Deserialize)] struct DumpMeta { @@ -80,91 +75,92 @@ impl Index { } pub fn load_dump( - src: impl AsRef, - dst: impl AsRef, - size: usize, - indexing_options: &IndexerOpts, + _src: impl AsRef, + _dst: impl AsRef, + _size: usize, + _indexing_options: &IndexerOpts, ) -> anyhow::Result<()> { - let dir_name = src - .as_ref() - .file_name() - .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; + //let dir_name = src + //.as_ref() + //.file_name() + //.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; - let dst_dir_path = dst.as_ref().join("indexes").join(dir_name); - create_dir_all(&dst_dir_path)?; + //let dst_dir_path = dst.as_ref().join("indexes").join(dir_name); + //create_dir_all(&dst_dir_path)?; - let meta_path = src.as_ref().join(META_FILE_NAME); - let mut meta_file = File::open(meta_path)?; + //let meta_path = src.as_ref().join(META_FILE_NAME); + //let mut meta_file = File::open(meta_path)?; - // We first deserialize the dump meta into a serde_json::Value and change - // the custom ranking rules settings from the old format to the new format. - let mut meta: Value = serde_json::from_reader(&mut meta_file)?; - if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") { - convert_custom_ranking_rules(ranking_rules); - } + //// We first deserialize the dump meta into a serde_json::Value and change + //// the custom ranking rules settings from the old format to the new format. + //let mut meta: Value = serde_json::from_reader(&mut meta_file)?; + //if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") { + //convert_custom_ranking_rules(ranking_rules); + //} - // Then we serialize it back into a vec to deserialize it - // into a `DumpMeta` struct with the newly patched `rankingRules` format. - let patched_meta = serde_json::to_vec(&meta)?; + //// Then we serialize it back into a vec to deserialize it + //// into a `DumpMeta` struct with the newly patched `rankingRules` format. 
+ //let patched_meta = serde_json::to_vec(&meta)?; - let DumpMeta { - settings, - primary_key, - } = serde_json::from_slice(&patched_meta)?; - let settings = settings.check(); - let index = Self::open(&dst_dir_path, size)?; - let mut txn = index.write_txn()?; + //let DumpMeta { + //settings, + //primary_key, + //} = serde_json::from_slice(&patched_meta)?; + //let settings = settings.check(); + //let index = Self::open(&dst_dir_path, size)?; + //let mut txn = index.write_txn()?; - let handler = UpdateHandler::new(indexing_options)?; + //let handler = UpdateHandler::new(indexing_options)?; - index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?; + //index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?; - let document_file_path = src.as_ref().join(DATA_FILE_NAME); - let reader = File::open(&document_file_path)?; - let mut reader = BufReader::new(reader); - reader.fill_buf()?; + //let document_file_path = src.as_ref().join(DATA_FILE_NAME); + //let reader = File::open(&document_file_path)?; + //let mut reader = BufReader::new(reader); + //reader.fill_buf()?; // If the document file is empty, we don't perform the document addition, to prevent // a primary-key error from being thrown. - if !reader.buffer().is_empty() { - index.update_documents_txn( - &mut txn, - JsonStream, - IndexDocumentsMethod::UpdateDocuments, - Some(reader), - handler.update_builder(0), - primary_key.as_deref(), - )?; - } - txn.commit()?; + todo!("fix obkv document dumps") + //if !reader.buffer().is_empty() { + //index.update_documents_txn( + //&mut txn, + //IndexDocumentsMethod::UpdateDocuments, + //Some(reader), + //handler.update_builder(0), + //primary_key.as_deref(), + //)?; + //} - match Arc::try_unwrap(index.0) { - Ok(inner) => inner.prepare_for_closing().wait(), - Err(_) => bail!("Could not close index properly."), - } + //txn.commit()?; - Ok(()) + //match Arc::try_unwrap(index.0) { + //Ok(inner) => inner.prepare_for_closing().wait(), + //Err(_) => bail!("Could not close index properly."), + //} + + //Ok(()) } } -/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`. -/// -/// This is done for compatibility reasons, and to avoid a new dump version, -/// since the new syntax was introduced soon after the new dump version. -fn convert_custom_ranking_rules(ranking_rules: &mut Value) { - *ranking_rules = match ranking_rules.take() { - Value::Array(values) => values - .into_iter() - .filter_map(|value| match value { - Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s) - .map(|f| format!("{}:asc", f)) - .map(Value::String), - Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s) - .map(|f| format!("{}:desc", f)) - .map(Value::String), - otherwise => Some(otherwise), - }) - .collect(), - otherwise => otherwise, - } -} +// /// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`. +// /// +// /// This is done for compatibility reasons, and to avoid a new dump version, +// /// since the new syntax was introduced soon after the new dump version.
+//fn convert_custom_ranking_rules(ranking_rules: &mut Value) { + //*ranking_rules = match ranking_rules.take() { + //Value::Array(values) => values + //.into_iter() + //.filter_map(|value| match value { + //Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s) + //.map(|f| format!("{}:asc", f)) + //.map(Value::String), + //Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s) + //.map(|f| format!("{}:desc", f)) + //.map(Value::String), + //otherwise => Some(otherwise), + //}) + //.collect(), + //otherwise => otherwise, + //} +//} diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs index e4243aadc..1ea481ec9 100644 --- a/meilisearch-http/src/index/mod.rs +++ b/meilisearch-http/src/index/mod.rs @@ -15,6 +15,7 @@ pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_ pub use updates::{Checked, Facets, Settings, Unchecked}; use crate::helpers::EnvSizer; +use crate::index_controller::update_file_store::UpdateFileStore; use self::error::IndexError; @@ -28,23 +29,26 @@ mod updates; pub type Document = Map; #[derive(Clone)] -pub struct Index(pub Arc); +pub struct Index { + pub inner: Arc, + update_file_store: Arc, +} impl Deref for Index { type Target = milli::Index; fn deref(&self) -> &Self::Target { - self.0.as_ref() + self.inner.as_ref() } } impl Index { - pub fn open(path: impl AsRef, size: usize) -> Result { + pub fn open(path: impl AsRef, size: usize, update_file_store: Arc) -> Result { create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); options.map_size(size); - let index = milli::Index::new(options, &path)?; - Ok(Index(Arc::new(index))) + let inner = Arc::new(milli::Index::new(options, &path)?); + Ok(Index { inner, update_file_store }) } pub fn settings(&self) -> Result> { diff --git a/meilisearch-http/src/index/search.rs b/meilisearch-http/src/index/search.rs index 26eb816a0..c7949fea6 100644 --- a/meilisearch-http/src/index/search.rs +++ b/meilisearch-http/src/index/search.rs @@ -662,7 +662,7 @@ fn parse_filter_array( } } - Ok(FilterCondition::from_array(txn, &index.0, ands)?) + Ok(FilterCondition::from_array(txn, &index, ands)?) 
} #[cfg(test)] diff --git a/meilisearch-http/src/index/update_handler.rs b/meilisearch-http/src/index/update_handler.rs index f3977a00d..0ad71d313 100644 --- a/meilisearch-http/src/index/update_handler.rs +++ b/meilisearch-http/src/index/update_handler.rs @@ -1,11 +1,9 @@ -use std::fs::File; - use crate::index::Index; use milli::update::UpdateBuilder; use milli::CompressionType; use rayon::ThreadPool; -use crate::index_controller::UpdateMeta; +use crate::index_controller::update_actor::RegisterUpdate; use crate::index_controller::{Failed, Processed, Processing}; use crate::option::IndexerOpts; @@ -54,31 +52,16 @@ impl UpdateHandler { pub fn handle_update( &self, - meta: Processing, - content: Option, index: Index, + meta: Processing, ) -> Result { - use UpdateMeta::*; - let update_id = meta.id(); - let update_builder = self.update_builder(update_id); let result = match meta.meta() { - DocumentsAddition { - method, - format, - primary_key, - } => index.update_documents( - *format, - *method, - content, - update_builder, - primary_key.as_deref(), - ), - ClearDocuments => index.clear_documents(update_builder), - DeleteDocuments { ids } => index.delete_documents(ids, update_builder), - Settings(settings) => index.update_settings(&settings.clone().check(), update_builder), + RegisterUpdate::DocumentAddition { primary_key, content_uuid, method } => { + index.update_documents(*method, *content_uuid, update_builder, primary_key.as_deref()) + } }; match result { diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index 924e6b1ef..6c7ae1416 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -1,17 +1,17 @@ use std::collections::{BTreeMap, BTreeSet}; -use std::io; use std::marker::PhantomData; use std::num::NonZeroUsize; -use flate2::read::GzDecoder; use log::{debug, info, trace}; -use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder, UpdateFormat}; +use milli::documents::DocumentBatchReader; +use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder}; use serde::{Deserialize, Serialize, Serializer}; +use uuid::Uuid; use crate::index_controller::UpdateResult; -use super::error::Result; use super::Index; +use super::error::Result; fn serialize_with_wildcard( field: &Setting>, @@ -162,31 +162,23 @@ pub struct Facets { impl Index { pub fn update_documents( &self, - format: UpdateFormat, method: IndexDocumentsMethod, - content: Option, + content_uuid: Uuid, update_builder: UpdateBuilder, primary_key: Option<&str>, ) -> Result { let mut txn = self.write_txn()?; - let result = self.update_documents_txn( - &mut txn, - format, - method, - content, - update_builder, - primary_key, - )?; + let result = self.update_documents_txn(&mut txn, method, content_uuid, update_builder, primary_key)?; txn.commit()?; + Ok(result) } pub fn update_documents_txn<'a, 'b>( &'a self, txn: &mut heed::RwTxn<'a, 'b>, - format: UpdateFormat, method: IndexDocumentsMethod, - content: Option, + content_uuid: Uuid, update_builder: UpdateBuilder, primary_key: Option<&str>, ) -> Result { @@ -199,138 +191,132 @@ impl Index { builder.execute(|_, _| ())?; } - let mut builder = update_builder.index_documents(txn, self); - builder.update_format(format); - builder.index_documents_method(method); - let indexing_callback = |indexing_step, update_id| debug!("update {}: {:?}", update_id, indexing_step); - let gzipped = false; - let addition = match content { - Some(content) if gzipped => { - builder.execute(GzDecoder::new(content), 
indexing_callback)? - } - Some(content) => builder.execute(content, indexing_callback)?, - None => builder.execute(std::io::empty(), indexing_callback)?, - }; + let content_file = self.update_file_store.get_update(content_uuid).unwrap(); + let reader = DocumentBatchReader::from_reader(content_file).unwrap(); + + let mut builder = update_builder.index_documents(txn, self); + builder.index_documents_method(method); + let addition = builder.execute(reader, indexing_callback)?; info!("document addition done: {:?}", addition); Ok(UpdateResult::DocumentsAddition(addition)) } - pub fn clear_documents(&self, update_builder: UpdateBuilder) -> Result { - // We must use the write transaction of the update here. - let mut wtxn = self.write_txn()?; - let builder = update_builder.clear_documents(&mut wtxn, self); + //pub fn clear_documents(&self, update_builder: UpdateBuilder) -> Result { + //// We must use the write transaction of the update here. + //let mut wtxn = self.write_txn()?; + //let builder = update_builder.clear_documents(&mut wtxn, self); - let _count = builder.execute()?; + //let _count = builder.execute()?; - wtxn.commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into) - } + //wtxn.commit() + //.and(Ok(UpdateResult::Other)) + //.map_err(Into::into) + //} - pub fn update_settings_txn<'a, 'b>( - &'a self, - txn: &mut heed::RwTxn<'a, 'b>, - settings: &Settings, - update_builder: UpdateBuilder, - ) -> Result { - // We must use the write transaction of the update here. - let mut builder = update_builder.settings(txn, self); + //pub fn update_settings_txn<'a, 'b>( + //&'a self, + //txn: &mut heed::RwTxn<'a, 'b>, + //settings: &Settings, + //update_builder: UpdateBuilder, + //) -> Result { + //// We must use the write transaction of the update here. 
+ //let mut builder = update_builder.settings(txn, self); - match settings.searchable_attributes { - Setting::Set(ref names) => builder.set_searchable_fields(names.clone()), - Setting::Reset => builder.reset_searchable_fields(), - Setting::NotSet => (), - } + //match settings.searchable_attributes { + //Setting::Set(ref names) => builder.set_searchable_fields(names.clone()), + //Setting::Reset => builder.reset_searchable_fields(), + //Setting::NotSet => (), + //} - match settings.displayed_attributes { - Setting::Set(ref names) => builder.set_displayed_fields(names.clone()), - Setting::Reset => builder.reset_displayed_fields(), - Setting::NotSet => (), - } + //match settings.displayed_attributes { + //Setting::Set(ref names) => builder.set_displayed_fields(names.clone()), + //Setting::Reset => builder.reset_displayed_fields(), + //Setting::NotSet => (), + //} - match settings.filterable_attributes { - Setting::Set(ref facets) => { - builder.set_filterable_fields(facets.clone().into_iter().collect()) - } - Setting::Reset => builder.reset_filterable_fields(), - Setting::NotSet => (), - } + //match settings.filterable_attributes { + //Setting::Set(ref facets) => { + //builder.set_filterable_fields(facets.clone().into_iter().collect()) + //} + //Setting::Reset => builder.reset_filterable_fields(), + //Setting::NotSet => (), + //} - match settings.sortable_attributes { - Setting::Set(ref fields) => { - builder.set_sortable_fields(fields.iter().cloned().collect()) - } - Setting::Reset => builder.reset_sortable_fields(), - Setting::NotSet => (), - } + //match settings.sortable_attributes { + //Setting::Set(ref fields) => { + //builder.set_sortable_fields(fields.iter().cloned().collect()) + //} + //Setting::Reset => builder.reset_sortable_fields(), + //Setting::NotSet => (), + //} - match settings.ranking_rules { - Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()), - Setting::Reset => builder.reset_criteria(), - Setting::NotSet => (), - } + //match settings.ranking_rules { + //Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()), + //Setting::Reset => builder.reset_criteria(), + //Setting::NotSet => (), + //} - match settings.stop_words { - Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()), - Setting::Reset => builder.reset_stop_words(), - Setting::NotSet => (), - } + //match settings.stop_words { + //Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()), + //Setting::Reset => builder.reset_stop_words(), + //Setting::NotSet => (), + //} - match settings.synonyms { - Setting::Set(ref synonyms) => { - builder.set_synonyms(synonyms.clone().into_iter().collect()) - } - Setting::Reset => builder.reset_synonyms(), - Setting::NotSet => (), - } + //match settings.synonyms { + //Setting::Set(ref synonyms) => { + //builder.set_synonyms(synonyms.clone().into_iter().collect()) + //} + //Setting::Reset => builder.reset_synonyms(), + //Setting::NotSet => (), + //} - match settings.distinct_attribute { - Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()), - Setting::Reset => builder.reset_distinct_field(), - Setting::NotSet => (), - } + //match settings.distinct_attribute { + //Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()), + //Setting::Reset => builder.reset_distinct_field(), + //Setting::NotSet => (), + //} - builder.execute(|indexing_step, update_id| { - debug!("update {}: {:?}", update_id, indexing_step) - })?; + //builder.execute(|indexing_step, update_id| { + //debug!("update {}: {:?}", 
update_id, indexing_step) + //})?; - Ok(UpdateResult::Other) - } + //Ok(UpdateResult::Other) + //} - pub fn update_settings( - &self, - settings: &Settings, - update_builder: UpdateBuilder, - ) -> Result { - let mut txn = self.write_txn()?; - let result = self.update_settings_txn(&mut txn, settings, update_builder)?; - txn.commit()?; - Ok(result) - } + //pub fn update_settings( + //&self, + //settings: &Settings, + //update_builder: UpdateBuilder, + //) -> Result { + //let mut txn = self.write_txn()?; + //let result = self.update_settings_txn(&mut txn, settings, update_builder)?; + //txn.commit()?; + //Ok(result) + //} - pub fn delete_documents( - &self, - document_ids: &[String], - update_builder: UpdateBuilder, - ) -> Result { - let mut txn = self.write_txn()?; - let mut builder = update_builder.delete_documents(&mut txn, self)?; + //pub fn delete_documents( + //&self, + //document_ids: &[String], + //update_builder: UpdateBuilder, + //) -> Result { + //let mut txn = self.write_txn()?; + //let mut builder = update_builder.delete_documents(&mut txn, self)?; - // We ignore unexisting document ids - document_ids.iter().for_each(|id| { - builder.delete_external_id(id); - }); + //// We ignore unexisting document ids + //document_ids.iter().for_each(|id| { + //builder.delete_external_id(id); + //}); - let deleted = builder.execute()?; - txn.commit() - .and(Ok(UpdateResult::DocumentDeletion { deleted })) - .map_err(Into::into) - } + //let deleted = builder.execute()?; + //txn.commit() + //.and(Ok(UpdateResult::DocumentDeletion { deleted })) + //.map_err(Into::into) + //} } #[cfg(test)] diff --git a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs index db11fb8fc..649d82405 100644 --- a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs @@ -1,6 +1,5 @@ use std::path::Path; -use actix_web::web::Bytes; use tokio::sync::{mpsc, oneshot}; use super::error::Result; @@ -32,7 +31,7 @@ impl DumpActorHandleImpl { pub fn new( path: impl AsRef, uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl, - update: crate::index_controller::update_actor::UpdateActorHandleImpl, + update: crate::index_controller::update_actor::UpdateActorHandleImpl, index_db_size: usize, update_db_size: usize, ) -> anyhow::Result { diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs index 997fd2801..21893eb49 100644 --- a/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs +++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs @@ -1,20 +1,16 @@ use std::collections::{BTreeMap, BTreeSet}; -use std::fs::{create_dir_all, File}; -use std::io::BufRead; use std::marker::PhantomData; use std::path::Path; -use std::sync::Arc; -use heed::EnvOpenOptions; use log::{error, info, warn}; -use milli::update::{IndexDocumentsMethod, Setting, UpdateFormat}; +use milli::update::Setting; use serde::{Deserialize, Deserializer, Serialize}; use uuid::Uuid; use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata}; use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; use crate::{ - index::{update_handler::UpdateHandler, Index, Unchecked}, + index::Unchecked, option::IndexerOpts, }; @@ -86,57 +82,57 @@ struct Settings { } fn load_index( - src: impl AsRef, - dst: impl AsRef, - uuid: Uuid, - primary_key: Option<&str>, - size: 
usize, - indexer_options: &IndexerOpts, + _src: impl AsRef, + _dst: impl AsRef, + _uuid: Uuid, + _primary_key: Option<&str>, + _size: usize, + _indexer_options: &IndexerOpts, ) -> anyhow::Result<()> { - let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid)); + todo!("fix dump obkv documents") + //let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid)); - create_dir_all(&index_path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, index_path)?; - let index = Index(Arc::new(index)); + //create_dir_all(&index_path)?; + //let mut options = EnvOpenOptions::new(); + //options.map_size(size); + //let index = milli::Index::new(options, index_path)?; + //let index = Index(Arc::new(index)); - // extract `settings.json` file and import content - let settings = import_settings(&src)?; - let settings: index_controller::Settings = settings.into(); + //// extract `settings.json` file and import content + //let settings = import_settings(&src)?; + //let settings: index_controller::Settings = settings.into(); - let mut txn = index.write_txn()?; + //let mut txn = index.write_txn()?; - let handler = UpdateHandler::new(indexer_options)?; + //let handler = UpdateHandler::new(indexer_options)?; - index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?; + //index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?; - let file = File::open(&src.as_ref().join("documents.jsonl"))?; - let mut reader = std::io::BufReader::new(file); - reader.fill_buf()?; - if !reader.buffer().is_empty() { - index.update_documents_txn( - &mut txn, - UpdateFormat::JsonStream, - IndexDocumentsMethod::ReplaceDocuments, - Some(reader), - handler.update_builder(0), - primary_key, - )?; - } + //let file = File::open(&src.as_ref().join("documents.jsonl"))?; + //let mut reader = std::io::BufReader::new(file); + //reader.fill_buf()?; + //if !reader.buffer().is_empty() { + //index.update_documents_txn( + //&mut txn, + //IndexDocumentsMethod::ReplaceDocuments, + //Some(reader), + //handler.update_builder(0), + //primary_key, + //)?; + //} - txn.commit()?; + //txn.commit()?; - // Finaly, we extract the original milli::Index and close it - Arc::try_unwrap(index.0) - .map_err(|_e| "Couldn't close the index properly") - .unwrap() - .prepare_for_closing() - .wait(); + //// Finally, we extract the original milli::Index and close it + //Arc::try_unwrap(index.0) + //.map_err(|_e| "Couldn't close the index properly") + //.unwrap() + //.prepare_for_closing() + //.wait(); - // Updates are ignored in dumps V1. + //// Updates are ignored in dumps V1.
- Ok(()) + //Ok(()) } /// we need to **always** be able to convert the old settings to the settings currently being used @@ -203,15 +199,15 @@ impl From for index_controller::Settings { } } -/// Extract Settings from `settings.json` file present at provided `dir_path` -fn import_settings(dir_path: impl AsRef) -> anyhow::Result { - let path = dir_path.as_ref().join("settings.json"); - let file = File::open(path)?; - let reader = std::io::BufReader::new(file); - let metadata = serde_json::from_reader(reader)?; +// /// Extract Settings from `settings.json` file present at provided `dir_path` +//fn import_settings(dir_path: impl AsRef) -> anyhow::Result { + //let path = dir_path.as_ref().join("settings.json"); + //let file = File::open(path)?; + //let reader = std::io::BufReader::new(file); + //let metadata = serde_json::from_reader(reader)?; - Ok(metadata) -} + //Ok(metadata) +//} #[cfg(test)] mod test { diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index fc40a5090..abc08788e 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::path::PathBuf; use std::sync::Arc; @@ -39,6 +38,7 @@ impl IndexActor { let update_handler = UpdateHandler::new(options)?; let update_handler = Arc::new(update_handler); let receiver = Some(receiver); + Ok(Self { receiver, update_handler, @@ -82,10 +82,9 @@ impl IndexActor { Update { ret, meta, - data, uuid, } => { - let _ = ret.send(self.handle_update(uuid, meta, data).await); + let _ = ret.send(self.handle_update(uuid, meta).await); } Search { ret, query, uuid } => { let _ = ret.send(self.handle_search(uuid, query).await); @@ -165,7 +164,6 @@ impl IndexActor { &self, uuid: Uuid, meta: Processing, - data: Option, ) -> Result> { debug!("Processing update {}", meta.id()); let update_handler = self.update_handler.clone(); @@ -174,7 +172,7 @@ impl IndexActor { None => self.store.create(uuid, None).await?, }; - Ok(spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?) + Ok(spawn_blocking(move || update_handler.handle_update(index, meta)).await?) 
} async fn handle_settings(&self, uuid: Uuid) -> Result> { @@ -230,7 +228,7 @@ impl IndexActor { if let Some(index) = index { tokio::task::spawn(async move { - let index = index.0; + let index = index.inner; let store = get_arc_ownership_blocking(index).await; spawn_blocking(move || { store.prepare_for_closing().wait(); diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index ceb2a8226..efc104c54 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -38,13 +38,11 @@ impl IndexActorHandle for IndexActorHandleImpl { &self, uuid: Uuid, meta: Processing, - data: Option, ) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Update { ret, meta, - data, uuid, }; let _ = self.sender.send(msg).await; @@ -156,7 +154,7 @@ impl IndexActorHandleImpl { ) -> anyhow::Result { let (sender, receiver) = mpsc::channel(100); - let store = MapIndexStore::new(path, index_size); + let store = MapIndexStore::new(&path, index_size); let actor = IndexActor::new(receiver, store, options)?; tokio::task::spawn(actor.run()); Ok(Self { sender }) diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index 415b90e4b..1b93ec34f 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -19,7 +19,6 @@ pub enum IndexMsg { Update { uuid: Uuid, meta: Processing, - data: Option, ret: oneshot::Sender>>, }, Search { diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index faad75e01..bf5833222 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::path::PathBuf; use chrono::{DateTime, Utc}; @@ -59,7 +58,6 @@ pub trait IndexActorHandle { &self, uuid: Uuid, meta: Processing, - data: Option, ) -> Result>; async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result; async fn settings(&self, uuid: Uuid) -> Result>; diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 2cfda61b5..252271d51 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -10,6 +10,7 @@ use uuid::Uuid; use super::error::{IndexActorError, Result}; use crate::index::Index; +use crate::index_controller::update_file_store::UpdateFileStore; type AsyncMap = Arc>>; @@ -24,16 +25,19 @@ pub struct MapIndexStore { index_store: AsyncMap, path: PathBuf, index_size: usize, + update_file_store: Arc, } impl MapIndexStore { pub fn new(path: impl AsRef, index_size: usize) -> Self { + let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap()); let path = path.as_ref().join("indexes/"); let index_store = Arc::new(RwLock::new(HashMap::new())); Self { index_store, path, index_size, + update_file_store, } } } @@ -54,8 +58,9 @@ impl IndexStore for MapIndexStore { } let index_size = self.index_size; + let file_store = self.update_file_store.clone(); let index = spawn_blocking(move || -> Result { - let index = Index::open(path, index_size)?; + let index = Index::open(path, index_size, file_store)?; if let Some(primary_key) = 
primary_key { let mut txn = index.write_txn()?; @@ -87,7 +92,8 @@ impl IndexStore for MapIndexStore { } let index_size = self.index_size; - let index = spawn_blocking(move || Index::open(path, index_size)).await??; + let file_store = self.update_file_store.clone(); + let index = spawn_blocking(move || Index::open(path, index_size, file_store)).await??; self.index_store.write().await.insert(uuid, index.clone()); Ok(Some(index)) } diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 4565a1dd0..88a219530 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -1,42 +1,43 @@ use std::collections::BTreeMap; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use actix_web::web::Bytes; +use actix_web::error::PayloadError; +use bytes::Bytes; use chrono::{DateTime, Utc}; -use futures::stream::StreamExt; -use log::error; +use futures::Stream; use log::info; use milli::FieldDistribution; +use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; use tokio::time::sleep; use uuid::Uuid; use dump_actor::DumpActorHandle; pub use dump_actor::{DumpInfo, DumpStatus}; use index_actor::IndexActorHandle; -use snapshot::{load_snapshot, SnapshotService}; +use snapshot::load_snapshot; use update_actor::UpdateActorHandle; pub use updates::*; use uuid_resolver::{error::UuidResolverError, UuidResolverHandle}; -use crate::extractors::payload::Payload; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::option::Opt; use error::Result; use self::dump_actor::load_dump; -use self::error::IndexControllerError; mod dump_actor; pub mod error; pub mod index_actor; mod snapshot; -mod update_actor; +pub mod update_actor; mod updates; mod uuid_resolver; +pub mod update_file_store; + +pub type Payload = Box> + Send + Sync + 'static + Unpin>; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ -72,10 +73,15 @@ pub struct IndexStats { pub struct IndexController { uuid_resolver: uuid_resolver::UuidResolverHandleImpl, index_handle: index_actor::IndexActorHandleImpl, - update_handle: update_actor::UpdateActorHandleImpl, + update_handle: update_actor::UpdateActorHandleImpl, dump_handle: dump_actor::DumpActorHandleImpl, } +pub enum DocumentAdditionFormat { + Json, +} + + #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct Stats { @@ -84,6 +90,15 @@ pub struct Stats { pub indexes: BTreeMap, } +pub enum Update { + DocumentAddition { + payload: Payload, + primary_key: Option, + method: IndexDocumentsMethod, + format: DocumentAdditionFormat, + } +} + impl IndexController { pub fn new(path: impl AsRef, options: &Opt) -> anyhow::Result { let index_size = options.max_index_size.get_bytes() as usize; @@ -125,21 +140,21 @@ impl IndexController { options.max_udb_size.get_bytes() as usize, )?; - if options.schedule_snapshot { - let snapshot_service = SnapshotService::new( - uuid_resolver.clone(), - update_handle.clone(), - Duration::from_secs(options.snapshot_interval_sec), - options.snapshot_dir.clone(), - options - .db_path - .file_name() - .map(|n| n.to_owned().into_string().expect("invalid path")) - .unwrap_or_else(|| String::from("data.ms")), - ); + //if options.schedule_snapshot { + //let snapshot_service = SnapshotService::new( + //uuid_resolver.clone(), + //update_handle.clone(), + //Duration::from_secs(options.snapshot_interval_sec), + 
//options.snapshot_dir.clone(), //options //.db_path //.file_name() //.map(|n| n.to_owned().into_string().expect("invalid path")) //.unwrap_or_else(|| String::from("data.ms")), //); - tokio::task::spawn(snapshot_service.run()); - } + //tokio::task::spawn(snapshot_service.run()); + //} Ok(Self { uuid_resolver, @@ -149,132 +164,148 @@ impl IndexController { }) } - pub async fn add_documents( - &self, - uid: String, - method: milli::update::IndexDocumentsMethod, - format: milli::update::UpdateFormat, - payload: Payload, - primary_key: Option, - ) -> Result { - let perform_update = |uuid| async move { - let meta = UpdateMeta::DocumentsAddition { - method, - format, - primary_key, - }; - let (sender, receiver) = mpsc::channel(10); - - // It is necessary to spawn a local task to send the payload to the update handle to - // prevent dead_locking between the update_handle::update that waits for the update to be - // registered and the update_actor that waits for the the payload to be sent to it. - tokio::task::spawn_local(async move { - payload - .for_each(|r| async { - let _ = sender.send(r).await; - }) - .await - }); - - // This must be done *AFTER* spawning the task. - self.update_handle.update(meta, receiver, uuid).await - }; - - match self.uuid_resolver.get(uid).await { - Ok(uuid) => Ok(perform_update(uuid).await?), + pub async fn register_update(&self, uid: &str, update: Update) -> Result { + match self.uuid_resolver.get(uid.to_string()).await { + Ok(uuid) => { + let update_result = self.update_handle.update(uuid, update).await?; + Ok(update_result) + }, Err(UuidResolverError::UnexistingIndex(name)) => { let uuid = Uuid::new_v4(); - let status = perform_update(uuid).await?; + let update_result = self.update_handle.update(uuid, update).await?; // ignore if index creation fails now, since it may already have been created let _ = self.index_handle.create_index(uuid, None).await; self.uuid_resolver.insert(name, uuid).await?; - Ok(status) + Ok(update_result) } Err(e) => Err(e.into()), } } - pub async fn clear_documents(&self, uid: String) -> Result { - let uuid = self.uuid_resolver.get(uid).await?; - let meta = UpdateMeta::ClearDocuments; - let (_, receiver) = mpsc::channel(1); - let status = self.update_handle.update(meta, receiver, uuid).await?; - Ok(status) - } + //pub async fn add_documents( + //&self, + //uid: String, + //method: milli::update::IndexDocumentsMethod, + //payload: Payload, + //primary_key: Option, + //) -> Result { + //let perform_update = |uuid| async move { + //let meta = UpdateMeta::DocumentsAddition { + //method, + //primary_key, + //}; + //let (sender, receiver) = mpsc::channel(10); - pub async fn delete_documents( - &self, - uid: String, - documents: Vec, - ) -> Result { - let uuid = self.uuid_resolver.get(uid).await?; - let meta = UpdateMeta::DeleteDocuments { ids: documents }; - let (_, receiver) = mpsc::channel(1); - let status = self.update_handle.update(meta, receiver, uuid).await?; - Ok(status) - } + //// It is necessary to spawn a local task to send the payload to the update handle to + //// prevent deadlocking between the update_handle::update that waits for the update to be + //// registered and the update_actor that waits for the payload to be sent to it.
+ //tokio::task::spawn_local(async move { + //payload + //.for_each(|r| async { + //let _ = sender.send(r).await; + //}) + //.await + //}); - pub async fn update_settings( - &self, - uid: String, - settings: Settings, - create: bool, - ) -> Result { - let perform_udpate = |uuid| async move { - let meta = UpdateMeta::Settings(settings.into_unchecked()); - // Nothing so send, drop the sender right away, as not to block the update actor. - let (_, receiver) = mpsc::channel(1); - self.update_handle.update(meta, receiver, uuid).await - }; + //// This must be done *AFTER* spawning the task. + //self.update_handle.update(meta, receiver, uuid).await + //}; - match self.uuid_resolver.get(uid).await { - Ok(uuid) => Ok(perform_udpate(uuid).await?), - Err(UuidResolverError::UnexistingIndex(name)) if create => { - let uuid = Uuid::new_v4(); - let status = perform_udpate(uuid).await?; - // ignore if index creation fails now, since it may already have been created - let _ = self.index_handle.create_index(uuid, None).await; - self.uuid_resolver.insert(name, uuid).await?; - Ok(status) - } - Err(e) => Err(e.into()), - } - } + //match self.uuid_resolver.get(uid).await { + //Ok(uuid) => Ok(perform_update(uuid).await?), + //Err(UuidResolverError::UnexistingIndex(name)) => { + //let uuid = Uuid::new_v4(); + //let status = perform_update(uuid).await?; + //// ignore if index creation fails now, since it may already have been created + //let _ = self.index_handle.create_index(uuid, None).await; + //self.uuid_resolver.insert(name, uuid).await?; + //Ok(status) + //} + //Err(e) => Err(e.into()), + //} + //} - pub async fn create_index(&self, index_settings: IndexSettings) -> Result { - let IndexSettings { uid, primary_key } = index_settings; - let uid = uid.ok_or(IndexControllerError::MissingUid)?; - let uuid = Uuid::new_v4(); - let meta = self.index_handle.create_index(uuid, primary_key).await?; - self.uuid_resolver.insert(uid.clone(), uuid).await?; - let meta = IndexMetadata { - uuid, - name: uid.clone(), - uid, - meta, - }; + //pub async fn clear_documents(&self, uid: String) -> Result { + //let uuid = self.uuid_resolver.get(uid).await?; + //let meta = UpdateMeta::ClearDocuments; + //let (_, receiver) = mpsc::channel(1); + //let status = self.update_handle.update(meta, receiver, uuid).await?; + //Ok(status) + //} - Ok(meta) - } + //pub async fn delete_documents( + //&self, + //uid: String, + //documents: Vec, + //) -> Result { + //let uuid = self.uuid_resolver.get(uid).await?; + //let meta = UpdateMeta::DeleteDocuments { ids: documents }; + //let (_, receiver) = mpsc::channel(1); + //let status = self.update_handle.update(meta, receiver, uuid).await?; + //Ok(status) + //} - pub async fn delete_index(&self, uid: String) -> Result<()> { - let uuid = self.uuid_resolver.delete(uid).await?; + //pub async fn update_settings( + //&self, + //uid: String, + //settings: Settings, + //create: bool, + //) -> Result { + //let perform_udpate = |uuid| async move { + //let meta = UpdateMeta::Settings(settings.into_unchecked()); + //// Nothing to send, drop the sender right away, so as not to block the update actor. + //let (_, receiver) = mpsc::channel(1); + //self.update_handle.update(meta, receiver, uuid).await + //}; - // We remove the index from the resolver synchronously, and effectively perform the index - // deletion as a background task.
- let update_handle = self.update_handle.clone(); - let index_handle = self.index_handle.clone(); - tokio::spawn(async move { - if let Err(e) = update_handle.delete(uuid).await { - error!("Error while deleting index: {}", e); - } - if let Err(e) = index_handle.delete(uuid).await { - error!("Error while deleting index: {}", e); - } - }); + //match self.uuid_resolver.get(uid).await { + //Ok(uuid) => Ok(perform_udpate(uuid).await?), + //Err(UuidResolverError::UnexistingIndex(name)) if create => { + //let uuid = Uuid::new_v4(); + //let status = perform_udpate(uuid).await?; + //// ignore if index creation fails now, since it may already have been created + //let _ = self.index_handle.create_index(uuid, None).await; + //self.uuid_resolver.insert(name, uuid).await?; + //Ok(status) + //} + //Err(e) => Err(e.into()), + //} + //} - Ok(()) - } + //pub async fn create_index(&self, index_settings: IndexSettings) -> Result { + //let IndexSettings { uid, primary_key } = index_settings; + //let uid = uid.ok_or(IndexControllerError::MissingUid)?; + //let uuid = Uuid::new_v4(); + //let meta = self.index_handle.create_index(uuid, primary_key).await?; + //self.uuid_resolver.insert(uid.clone(), uuid).await?; + //let meta = IndexMetadata { + //uuid, + //name: uid.clone(), + //uid, + //meta, + //}; + + //Ok(meta) + //} + + //pub async fn delete_index(&self, uid: String) -> Result<()> { + //let uuid = self.uuid_resolver.delete(uid).await?; + + //// We remove the index from the resolver synchronously, and effectively perform the index + //// deletion as a background task. + //let update_handle = self.update_handle.clone(); + //let index_handle = self.index_handle.clone(); + //tokio::spawn(async move { + //if let Err(e) = update_handle.delete(uuid).await { + //error!("Error while deleting index: {}", e); + //} + //if let Err(e) = index_handle.delete(uuid).await { + //error!("Error while deleting index: {}", e); + //} + //}); + + //Ok(()) + //} pub async fn update_status(&self, uid: String, id: u64) -> Result { let uuid = self.uuid_resolver.get(uid).await?; @@ -454,3 +485,7 @@ pub fn desc_ranking_rule(text: &str) -> Option<&str> { .and_then(|(_, tail)| tail.rsplit_once(")")) .map(|(field, _)| field) } + +fn update_files_path(path: impl AsRef) -> PathBuf { + path.as_ref().join("updates/updates_files") +} diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index 4c731efd8..6c5171d62 100644 --- a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -1,97 +1,90 @@ -use std::path::{Path, PathBuf}; -use std::time::Duration; +use std::path::Path; use anyhow::bail; -use log::{error, info, trace}; -use tokio::fs; -use tokio::task::spawn_blocking; -use tokio::time::sleep; -use super::update_actor::UpdateActorHandle; -use super::uuid_resolver::UuidResolverHandle; use crate::helpers::compression; -pub struct SnapshotService { - uuid_resolver_handle: R, - update_handle: U, - snapshot_period: Duration, - snapshot_path: PathBuf, - db_name: String, -} +//pub struct SnapshotService { + //uuid_resolver_handle: R, + //update_handle: U, + //snapshot_period: Duration, + //snapshot_path: PathBuf, + //db_name: String, +//} -impl SnapshotService -where - U: UpdateActorHandle, - R: UuidResolverHandle, -{ - pub fn new( - uuid_resolver_handle: R, - update_handle: U, - snapshot_period: Duration, - snapshot_path: PathBuf, - db_name: String, - ) -> Self { - Self { - uuid_resolver_handle, - update_handle, - snapshot_period, - 
snapshot_path, - db_name, - } - } +//impl SnapshotService +//where + //U: UpdateActorHandle, + //R: UuidResolverHandle, +//{ + //pub fn new( + //uuid_resolver_handle: R, + //update_handle: U, + //snapshot_period: Duration, + //snapshot_path: PathBuf, + //db_name: String, + //) -> Self { + //Self { + //uuid_resolver_handle, + //update_handle, + //snapshot_period, + //snapshot_path, + //db_name, + //} + //} - pub async fn run(self) { - info!( - "Snapshot scheduled every {}s.", - self.snapshot_period.as_secs() - ); - loop { - if let Err(e) = self.perform_snapshot().await { - error!("Error while performing snapshot: {}", e); - } - sleep(self.snapshot_period).await; - } - } + //pub async fn run(self) { + //info!( + //"Snapshot scheduled every {}s.", + //self.snapshot_period.as_secs() + //); + //loop { + //if let Err(e) = self.perform_snapshot().await { + //error!("Error while performing snapshot: {}", e); + //} + //sleep(self.snapshot_period).await; + //} + //} - async fn perform_snapshot(&self) -> anyhow::Result<()> { - trace!("Performing snapshot."); + //async fn perform_snapshot(&self) -> anyhow::Result<()> { + //trace!("Performing snapshot."); - let snapshot_dir = self.snapshot_path.clone(); - fs::create_dir_all(&snapshot_dir).await?; - let temp_snapshot_dir = - spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??; - let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); + //let snapshot_dir = self.snapshot_path.clone(); + //fs::create_dir_all(&snapshot_dir).await?; + //let temp_snapshot_dir = + //spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??; + //let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); - let uuids = self - .uuid_resolver_handle - .snapshot(temp_snapshot_path.clone()) - .await?; + //let uuids = self + //.uuid_resolver_handle + //.snapshot(temp_snapshot_path.clone()) + //.await?; - if uuids.is_empty() { - return Ok(()); - } + //if uuids.is_empty() { + //return Ok(()); + //} - self.update_handle - .snapshot(uuids, temp_snapshot_path.clone()) - .await?; - let snapshot_dir = self.snapshot_path.clone(); - let snapshot_path = self - .snapshot_path - .join(format!("{}.snapshot", self.db_name)); - let snapshot_path = spawn_blocking(move || -> anyhow::Result { - let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; - let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); - compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; - temp_snapshot_file.persist(&snapshot_path)?; - Ok(snapshot_path) - }) - .await??; + //self.update_handle + //.snapshot(uuids, temp_snapshot_path.clone()) + //.await?; + //let snapshot_dir = self.snapshot_path.clone(); + //let snapshot_path = self + //.snapshot_path + //.join(format!("{}.snapshot", self.db_name)); + //let snapshot_path = spawn_blocking(move || -> anyhow::Result { + //let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; + //let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); + //compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; + //temp_snapshot_file.persist(&snapshot_path)?; + //Ok(snapshot_path) + //}) + //.await??; - trace!("Created snapshot in {:?}.", snapshot_path); + //trace!("Created snapshot in {:?}.", snapshot_path); - Ok(()) - } -} + //Ok(()) + //} +//} pub fn load_snapshot( db_path: impl AsRef, diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 59a22910f..01e34e000 100644 --- 
a/meilisearch-http/src/index_controller/update_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/update_actor/actor.rs
@@ -1,44 +1,82 @@
 use std::collections::HashSet;
-use std::io::SeekFrom;
+use std::io;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;

+use actix_web::error::PayloadError;
 use async_stream::stream;
-use futures::StreamExt;
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
 use log::trace;
-use serdeval::*;
-use tokio::fs;
-use tokio::io::AsyncWriteExt;
+use milli::documents::DocumentBatchBuilder;
+use serde_json::{Map, Value};
 use tokio::sync::mpsc;
 use uuid::Uuid;

 use super::error::{Result, UpdateActorError};
-use super::{PayloadData, UpdateMsg, UpdateStore, UpdateStoreInfo};
+use super::RegisterUpdate;
+use super::{UpdateMsg, UpdateStore, UpdateStoreInfo, Update};
 use crate::index_controller::index_actor::IndexActorHandle;
-use crate::index_controller::{UpdateMeta, UpdateStatus};
+use crate::index_controller::update_file_store::UpdateFileStore;
+use crate::index_controller::{DocumentAdditionFormat, Payload, UpdateStatus};

-pub struct UpdateActor<D, I> {
-    path: PathBuf,
+pub struct UpdateActor<I> {
     store: Arc<UpdateStore>,
-    inbox: Option<mpsc::Receiver<UpdateMsg<D>>>,
+    inbox: Option<mpsc::Receiver<UpdateMsg>>,
+    update_file_store: UpdateFileStore,
     index_handle: I,
     must_exit: Arc<AtomicBool>,
 }

-impl<D, I> UpdateActor<D, I>
+struct StreamReader<S> {
+    stream: S,
+    current: Option<Bytes>,
+}
+
+impl<S> StreamReader<S> {
+    fn new(stream: S) -> Self {
+        Self { stream, current: None }
+    }
+
+}
+
+impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Read for StreamReader<S> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        match self.current.take() {
+            Some(mut bytes) => {
+                // Copy at most one chunk's worth of data; clamping to the smaller of
+                // the two lengths keeps `split_to`/`copy_from_slice` from panicking
+                // whenever a chunk is shorter than the caller's buffer.
+                let n = buf.len().min(bytes.len());
+                let copied = bytes.split_to(n);
+                buf[..n].copy_from_slice(&copied);
+                if !bytes.is_empty() {
+                    self.current.replace(bytes);
+                }
+                Ok(n)
+            }
+            None => {
+                match tokio::runtime::Handle::current().block_on(self.stream.next()) {
+                    Some(Ok(bytes)) => {
+                        self.current.replace(bytes);
+                        self.read(buf)
+                    },
+                    Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)),
+                    None => Ok(0),
+                }
+            }
+        }
+    }
+}
+
+impl<I> UpdateActor<I>
 where
-    D: AsRef<[u8]> + Sized + 'static,
-    I: IndexActorHandle + Clone + Send + Sync + 'static,
+    I: IndexActorHandle + Clone + Sync + Send + 'static,
 {
     pub fn new(
         update_db_size: usize,
-        inbox: mpsc::Receiver<UpdateMsg<D>>,
+        inbox: mpsc::Receiver<UpdateMsg>,
         path: impl AsRef<Path>,
         index_handle: I,
     ) -> anyhow::Result<Self> {
-        let path = path.as_ref().join("updates");
-
+        let path = path.as_ref().to_owned();
         std::fs::create_dir_all(&path)?;

         let mut options = heed::EnvOpenOptions::new();
@@ -47,14 +85,17 @@ where
         let must_exit = Arc::new(AtomicBool::new(false));

         let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;
-        std::fs::create_dir_all(path.join("update_files"))?;
+        let inbox = Some(inbox);
+
+        let update_file_store = UpdateFileStore::new(&path).unwrap();
+
         Ok(Self {
-            path,
             store,
             inbox,
             index_handle,
             must_exit,
+            update_file_store
         })
     }

@@ -89,11 +130,10 @@ where
                     match msg {
                         Update {
                             uuid,
-                            meta,
-                            data,
+                            update,
                             ret,
                         } => {
-                            let _ = ret.send(self.handle_update(uuid, meta, data).await);
+                            let _ = ret.send(self.handle_update(uuid, update).await);
                         }
                         ListUpdates { uuid, ret } => {
                             let _ = ret.send(self.handle_list_updates(uuid).await);
@@ -120,90 +160,39 @@ where

     async fn handle_update(
         &self,
-        uuid: Uuid,
-        meta: UpdateMeta,
-        payload: mpsc::Receiver<PayloadData<D>>,
+        index_uuid: Uuid,
+        update: Update,
     ) -> Result<UpdateStatus> {
-        let file_path = match meta {
-            UpdateMeta::DocumentsAddition { .. } => {
-                let update_file_id = uuid::Uuid::new_v4();
-                let path = self
-                    .path
-                    .join(format!("update_files/update_{}", update_file_id));
-                let mut file = fs::OpenOptions::new()
-                    .read(true)
-                    .write(true)
-                    .create(true)
-                    .open(&path)
-                    .await?;
+        let registration = match update {
+            Update::DocumentAddition { payload, primary_key, method, format } => {
+                let content_uuid = match format {
+                    DocumentAdditionFormat::Json => self.documents_from_json(payload).await?,
+                };

-                async fn write_to_file<D>(
-                    file: &mut fs::File,
-                    mut payload: mpsc::Receiver<PayloadData<D>>,
-                ) -> Result<usize>
-                where
-                    D: AsRef<[u8]> + Sized + 'static,
-                {
-                    let mut file_len = 0;
-
-                    while let Some(bytes) = payload.recv().await {
-                        let bytes = bytes?;
-                        file_len += bytes.as_ref().len();
-                        file.write_all(bytes.as_ref()).await?;
-                    }
-
-                    file.flush().await?;
-
-                    Ok(file_len)
-                }
-
-                let file_len = write_to_file(&mut file, payload).await;
-
-                match file_len {
-                    Ok(len) if len > 0 => {
-                        let file = file.into_std().await;
-                        Some((file, update_file_id))
-                    }
-                    Err(e) => {
-                        fs::remove_file(&path).await?;
-                        return Err(e);
-                    }
-                    _ => {
-                        fs::remove_file(&path).await?;
-                        None
-                    }
-                }
+                RegisterUpdate::DocumentAddition { primary_key, method, content_uuid }
             }
-            _ => None,
         };

-        let update_store = self.store.clone();
+        let store = self.store.clone();
+        let status = tokio::task::spawn_blocking(move || store.register_update(index_uuid, registration)).await??;
+
+        Ok(status.into())
+    }
+
+    async fn documents_from_json(&self, payload: Payload) -> Result<Uuid> {
+        let file_store = self.update_file_store.clone();
         tokio::task::spawn_blocking(move || {
-            use std::io::{BufReader, Seek};
+            let (uuid, mut file) = file_store.new_update().unwrap();
+            let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap();

-            // If the payload is empty, ignore the check.
-            let update_uuid = if let Some((mut file, uuid)) = file_path {
-                // set the file back to the beginning
-                file.seek(SeekFrom::Start(0))?;
-                // Check that the json payload is valid:
-                let reader = BufReader::new(&mut file);
-                // Validate that the payload is in the correct format.
-                let _: Seq<Map<Str, Any>> = serde_json::from_reader(reader)
-                    .map_err(|e| UpdateActorError::InvalidPayload(Box::new(e)))?;
+            let documents: Vec<Map<String, Value>> = serde_json::from_reader(StreamReader::new(payload))?;
+            builder.add_documents(documents).unwrap();
+            builder.finish().unwrap();

-                Some(uuid)
-            } else {
-                None
-            };
+            file.persist();

-            // The payload is valid, we can register it to the update store.
-            let status = update_store
-                .register_update(meta, update_uuid, uuid)
-                .map(UpdateStatus::Enqueued)?;
-            Ok(status)
-        })
-        .await?
+            Ok(uuid)
+        }).await?
     }
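A note on the `StreamReader` used by `documents_from_json` above: it bridges an async stream of `Bytes` chunks into a blocking `io::Read`, which is only safe because the whole closure runs inside `spawn_blocking` (calling `Handle::block_on` on a runtime worker thread would panic). The sketch below is a self-contained, std-only stand-in for the same idea, with the chunk queue playing the role of the async payload; the size clamp in the middle is the detail that keeps short chunks from panicking `copy_from_slice`.

    use std::collections::VecDeque;
    use std::io::{self, Read};

    // Stand-in chunk source; the real reader pulls `Bytes` from an async payload.
    struct ChunkReader {
        chunks: VecDeque<Vec<u8>>,
        current: Option<Vec<u8>>,
    }

    impl Read for ChunkReader {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            loop {
                if let Some(mut bytes) = self.current.take() {
                    // Copy at most one chunk's worth per call, whichever of the
                    // caller's buffer or the chunk is smaller.
                    let n = buf.len().min(bytes.len());
                    buf[..n].copy_from_slice(&bytes[..n]);
                    if n < bytes.len() {
                        // Keep the unread remainder for the next call.
                        self.current = Some(bytes.split_off(n));
                    }
                    return Ok(n);
                }
                match self.chunks.pop_front() {
                    Some(bytes) => self.current = Some(bytes),
                    None => return Ok(0), // stream exhausted
                }
            }
        }
    }

    fn main() -> io::Result<()> {
        let mut reader = ChunkReader {
            chunks: vec![b"hello ".to_vec(), b"world".to_vec()].into(),
            current: None,
        };
        let mut out = String::new();
        reader.read_to_string(&mut out)?;
        assert_eq!(out, "hello world");
        Ok(())
    }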

     async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
@@ -267,4 +256,5 @@ where

         Ok(info)
     }
+
 }
diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs
index 125c63401..5175f2eb5 100644
--- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs
+++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs
@@ -4,45 +4,37 @@ use std::path::{Path, PathBuf};
 use tokio::sync::{mpsc, oneshot};
 use uuid::Uuid;

-use crate::index_controller::{IndexActorHandle, UpdateStatus};
+use crate::index_controller::{IndexActorHandle, Update, UpdateStatus};

 use super::error::Result;
-use super::{PayloadData, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo};
+use super::{UpdateActor, UpdateActorHandle, UpdateMsg, UpdateStoreInfo};

 #[derive(Clone)]
-pub struct UpdateActorHandleImpl<D> {
-    sender: mpsc::Sender<UpdateMsg<D>>,
+pub struct UpdateActorHandleImpl {
+    sender: mpsc::Sender<UpdateMsg>,
 }

-impl<D> UpdateActorHandleImpl<D>
-where
-    D: AsRef<[u8]> + Sized + 'static + Sync + Send,
-{
+impl UpdateActorHandleImpl {
     pub fn new<I>(
         index_handle: I,
         path: impl AsRef<Path>,
         update_store_size: usize,
     ) -> anyhow::Result<Self>
     where
-        I: IndexActorHandle + Clone + Send + Sync + 'static,
+        I: IndexActorHandle + Clone + Sync + Send + 'static,
     {
         let path = path.as_ref().to_owned();
         let (sender, receiver) = mpsc::channel(100);
         let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?;

-        tokio::task::spawn(actor.run());
+        tokio::task::spawn_local(actor.run());

         Ok(Self { sender })
     }
 }

 #[async_trait::async_trait]
-impl<D> UpdateActorHandle for UpdateActorHandleImpl<D>
-where
-    D: AsRef<[u8]> + Sized + 'static + Sync + Send,
-{
-    type Data = D;
-
+impl UpdateActorHandle for UpdateActorHandleImpl {
     async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
         let (ret, receiver) = oneshot::channel();
         let msg = UpdateMsg::ListUpdates { uuid, ret };
@@ -86,15 +78,13 @@ where

     async fn update(
         &self,
-        meta: UpdateMeta,
-        data: mpsc::Receiver<PayloadData<Self::Data>>,
         uuid: Uuid,
+        update: Update,
     ) -> Result<UpdateStatus> {
         let (ret, receiver) = oneshot::channel();
         let msg = UpdateMsg::Update {
             uuid,
-            data,
-            meta,
+            update,
             ret,
         };
         self.sender.send(msg).await?;
diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs
index 6b8a0f73f..40cc3360c 100644
--- a/meilisearch-http/src/index_controller/update_actor/message.rs
+++ b/meilisearch-http/src/index_controller/update_actor/message.rs
@@ -1,17 +1,16 @@
 use std::collections::HashSet;
 use std::path::PathBuf;

-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::oneshot;
 use uuid::Uuid;

 use super::error::Result;
-use super::{PayloadData, UpdateMeta, UpdateStatus, UpdateStoreInfo};
+use super::{UpdateStatus, UpdateStoreInfo, Update};

-pub enum UpdateMsg<D> {
+pub enum UpdateMsg {
     Update {
         uuid: Uuid,
-        meta: UpdateMeta,
-        data: mpsc::Receiver<PayloadData<D>>,
+        update: Update,
         ret: oneshot::Sender<Result<UpdateStatus>>,
     },
     ListUpdates {
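The message type above is the whole contract between handle and actor: the handle owns an `mpsc::Sender`, and every request smuggles a `oneshot::Sender` through which the actor replies to exactly one caller. A minimal sketch of that plumbing, assuming only tokio (names are illustrative, not from this codebase):

    use tokio::sync::{mpsc, oneshot};

    // Each request carries its own reply channel, like `UpdateMsg::Update { ret, .. }`.
    enum Msg {
        Ping { ret: oneshot::Sender<&'static str> },
    }

    #[tokio::main]
    async fn main() {
        let (sender, mut inbox) = mpsc::channel::<Msg>(100);

        // Actor side: sole owner of the state, draining the inbox in order.
        tokio::spawn(async move {
            while let Some(msg) = inbox.recv().await {
                match msg {
                    Msg::Ping { ret } => {
                        // A dropped receiver is not an error for the actor.
                        let _ = ret.send("pong");
                    }
                }
            }
        });

        // Handle side, mirroring `UpdateActorHandle::update`.
        let (ret, receiver) = oneshot::channel();
        sender.send(Msg::Ping { ret }).await.unwrap();
        assert_eq!(receiver.await.unwrap(), "pong");
    }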
diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs
index ee388d2fa..b83cf491c 100644
--- a/meilisearch-http/src/index_controller/update_actor/mod.rs
+++ b/meilisearch-http/src/index_controller/update_actor/mod.rs
@@ -1,10 +1,11 @@
 use std::{collections::HashSet, path::PathBuf};

-use actix_web::error::PayloadError;
-use tokio::sync::mpsc;
+use milli::update::IndexDocumentsMethod;
 use uuid::Uuid;
+use serde::{Serialize, Deserialize};

-use crate::index_controller::{UpdateMeta, UpdateStatus};
+use crate::index_controller::UpdateStatus;
+use super::Update;

 use actor::UpdateActor;
 use error::Result;
@@ -19,16 +20,21 @@ mod handle_impl;
 mod message;
 pub mod store;

-type PayloadData<D> = std::result::Result<D, PayloadError>;
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum RegisterUpdate {
+    DocumentAddition {
+        primary_key: Option<String>,
+        method: IndexDocumentsMethod,
+        content_uuid: Uuid,
+    }
+}
+
 #[cfg(test)]
 use mockall::automock;

 #[async_trait::async_trait]
-#[cfg_attr(test, automock(type Data=Vec<u8>;))]
 pub trait UpdateActorHandle {
-    type Data: AsRef<[u8]> + Sized + 'static + Sync + Send;
-
     async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>>;
     async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
     async fn delete(&self, uuid: Uuid) -> Result<()>;
@@ -37,8 +43,7 @@ pub trait UpdateActorHandle {
     async fn get_info(&self) -> Result<UpdateStoreInfo>;
     async fn update(
         &self,
-        meta: UpdateMeta,
-        data: mpsc::Receiver<PayloadData<Self::Data>>,
         uuid: Uuid,
+        update: Update,
     ) -> Result<UpdateStatus>;
 }
diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs
index 79a3cca05..5f3605999 100644
--- a/meilisearch-http/src/index_controller/update_actor/store/dump.rs
+++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs
@@ -1,17 +1,17 @@
 use std::{
     collections::HashSet,
     fs::{create_dir_all, File},
-    io::{BufRead, BufReader, Write},
+    io::Write,
     path::{Path, PathBuf},
 };

-use heed::{EnvOpenOptions, RoTxn};
+use heed::RoTxn;
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;

 use super::{Result, State, UpdateStore};
 use crate::index_controller::{
-    index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued,
+    index_actor::IndexActorHandle,
     UpdateStatus,
 };

@@ -67,35 +67,36 @@ impl UpdateStore {

     fn dump_pending(
         &self,
-        txn: &RoTxn,
-        uuids: &HashSet<Uuid>,
-        mut file: &mut File,
-        dst_path: impl AsRef<Path>,
+        _txn: &RoTxn,
+        _uuids: &HashSet<Uuid>,
+        _file: &mut File,
+        _dst_path: impl AsRef<Path>,
     ) -> Result<()> {
-        let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
+        todo!()
+        //let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();

-        for pending in pendings {
-            let ((_, uuid, _), data) = pending?;
-            if uuids.contains(&uuid) {
-                let update = data.decode()?;
+        //for pending in pendings {
+        //let ((_, uuid, _), data) = pending?;
+        //if uuids.contains(&uuid) {
+        //let update = data.decode()?;

-                if let Some(ref update_uuid) = update.content {
-                    let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
-                    let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
-                    std::fs::copy(src, dst)?;
-                }
+        //if let Some(ref update_uuid) = update.content {
+        //let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
+        //let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
+        //std::fs::copy(src, dst)?;
+        //}

-                let update_json = UpdateEntry {
-                    uuid,
-                    update: update.into(),
-                };
+        //let update_json = UpdateEntry {
+        //uuid,
+        //update: update.into(),
+        //};

-                serde_json::to_writer(&mut file, &update_json)?;
-                file.write_all(b"\n")?;
-            }
-        }
+        //serde_json::to_writer(&mut file, &update_json)?;
+        //file.write_all(b"\n")?;
+        //}
+        //}

-        Ok(())
+        //Ok(())
     }

     fn dump_completed(
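The dump format that `dump_pending` (currently stubbed with `todo!()`) and `load_dump` below share is JSON Lines: one serialized `UpdateEntry` per line of `data.jsonl`, so a dump can be written and replayed entry by entry. A self-contained sketch of that round trip, with a plain string standing in for the real `UpdateStatus` and assuming the `uuid` crate with its `serde` feature (both already dependencies here):

    use std::io::{BufRead, BufReader, Write};

    use serde::{Deserialize, Serialize};
    use uuid::Uuid;

    // Same on-disk shape as `UpdateEntry`, minus the real payload type.
    #[derive(Serialize, Deserialize)]
    struct Entry {
        uuid: Uuid,
        update: String,
    }

    fn main() -> anyhow::Result<()> {
        // Writing: serialize each entry, then terminate the line by hand.
        let mut buffer = Vec::new();
        for update in ["first", "second"] {
            let entry = Entry { uuid: Uuid::new_v4(), update: update.into() };
            serde_json::to_writer(&mut buffer, &entry)?;
            buffer.write_all(b"\n")?;
        }

        // Reading: one `from_str` per line until EOF, as in `load_dump`.
        for line in BufReader::new(buffer.as_slice()).lines() {
            let entry: Entry = serde_json::from_str(&line?)?;
            println!("{} -> {}", entry.uuid, entry.update);
        }
        Ok(())
    }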
@@ -122,52 +123,53 @@ impl UpdateStore {
     }

     pub fn load_dump(
-        src: impl AsRef<Path>,
-        dst: impl AsRef<Path>,
-        db_size: usize,
+        _src: impl AsRef<Path>,
+        _dst: impl AsRef<Path>,
+        _db_size: usize,
     ) -> anyhow::Result<()> {
-        let dst_update_path = dst.as_ref().join("updates/");
-        create_dir_all(&dst_update_path)?;
+        todo!()
+        //let dst_update_path = dst.as_ref().join("updates/");
+        //create_dir_all(&dst_update_path)?;

-        let mut options = EnvOpenOptions::new();
-        options.map_size(db_size as usize);
-        let (store, _) = UpdateStore::new(options, &dst_update_path)?;
+        //let mut options = EnvOpenOptions::new();
+        //options.map_size(db_size as usize);
+        //let (store, _) = UpdateStore::new(options, &dst_update_path)?;

-        let src_update_path = src.as_ref().join("updates");
-        let update_data = File::open(&src_update_path.join("data.jsonl"))?;
-        let mut update_data = BufReader::new(update_data);
+        //let src_update_path = src.as_ref().join("updates");
+        //let update_data = File::open(&src_update_path.join("data.jsonl"))?;
+        //let mut update_data = BufReader::new(update_data);

-        std::fs::create_dir_all(dst_update_path.join("update_files/"))?;
+        //std::fs::create_dir_all(dst_update_path.join("update_files/"))?;

-        let mut wtxn = store.env.write_txn()?;
-        let mut line = String::new();
-        loop {
-            match update_data.read_line(&mut line) {
-                Ok(0) => break,
-                Ok(_) => {
-                    let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
-                    store.register_raw_updates(&mut wtxn, &update, uuid)?;
+        //let mut wtxn = store.env.write_txn()?;
+        //let mut line = String::new();
+        //loop {
+        //match update_data.read_line(&mut line) {
+        //Ok(0) => break,
+        //Ok(_) => {
+        //let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
+        //store.register_raw_updates(&mut wtxn, &update, uuid)?;

-                    // Copy ascociated update path if it exists
-                    if let UpdateStatus::Enqueued(Enqueued {
-                        content: Some(uuid),
-                        ..
+        //// Copy associated update path if it exists
+        //if let UpdateStatus::Enqueued(Enqueued {
+        //content: Some(uuid),
+        //..
+ //}) = update + //{ + //let src = update_uuid_to_file_path(&src_update_path, uuid); + //let dst = update_uuid_to_file_path(&dst_update_path, uuid); + //std::fs::copy(src, dst)?; + //} + //} + //_ => break, + //} - line.clear(); - } + //line.clear(); + //} - wtxn.commit()?; + //wtxn.commit()?; - Ok(()) + //Ok(()) } } diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index e23e05b52..2dd758b82 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -1,7 +1,7 @@ mod codec; pub mod dump; -use std::fs::{copy, create_dir_all, remove_file, File}; +use std::fs::{create_dir_all, remove_file}; use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -26,9 +26,10 @@ use uuid::Uuid; use codec::*; +use super::RegisterUpdate; use super::error::Result; -use super::UpdateMeta; use crate::helpers::EnvSizer; +use crate::index_controller::update_files_path; use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; #[allow(clippy::upper_case_acronyms)] @@ -116,7 +117,9 @@ impl UpdateStore { ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { options.max_dbs(5); - let env = options.open(&path)?; + let update_path = path.as_ref().join("updates"); + std::fs::create_dir_all(&update_path)?; + let env = options.open(update_path)?; let pending_queue = env.create_database(Some("pending-queue"))?; let next_update_id = env.create_database(Some("next-update-id"))?; let updates = env.create_database(Some("updates"))?; @@ -157,7 +160,7 @@ impl UpdateStore { // want to close the index. let duration = Duration::from_secs(10 * 60); // 10 minutes let update_store_weak = Arc::downgrade(&update_store); - tokio::task::spawn(async move { + tokio::task::spawn_local(async move { // Block and wait for something to process with a timeout. The timeout // function returns a Result and we must just unlock the loop on Result. 'outer: while timeout(duration, notification_receiver.recv()) @@ -233,14 +236,12 @@ impl UpdateStore { /// into the pending-meta store. Returns the new unique update id. pub fn register_update( &self, - meta: UpdateMeta, - content: Option, index_uuid: Uuid, + update: RegisterUpdate, ) -> heed::Result { let mut txn = self.env.write_txn()?; - let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?; - let meta = Enqueued::new(meta, update_id, content); + let meta = Enqueued::new(update, update_id); self.pending_queue .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?; @@ -254,30 +255,30 @@ impl UpdateStore { Ok(meta) } - /// Push already processed update in the UpdateStore without triggering the notification - /// process. This is useful for the dumps. - pub fn register_raw_updates( - &self, - wtxn: &mut heed::RwTxn, - update: &UpdateStatus, - index_uuid: Uuid, - ) -> heed::Result<()> { - match update { - UpdateStatus::Enqueued(enqueued) => { - let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?; - self.pending_queue.remap_key_type::().put( - wtxn, - &(global_id, index_uuid, enqueued.id()), - enqueued, - )?; - } - _ => { - let _update_id = self.next_update_id_raw(wtxn, index_uuid)?; - self.updates.put(wtxn, &(index_uuid, update.id()), update)?; - } - } - Ok(()) - } + // /// Push already processed update in the UpdateStore without triggering the notification + // /// process. This is useful for the dumps. 
+ //pub fn register_raw_updates( + //&self, + //wtxn: &mut heed::RwTxn, + //update: &UpdateStatus, + //index_uuid: Uuid, + //) -> heed::Result<()> { + //match update { + //UpdateStatus::Enqueued(enqueued) => { + //let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?; + //self.pending_queue.remap_key_type::().put( + //wtxn, + //&(global_id, index_uuid, enqueued.id()), + //enqueued, + //)?; + //} + //_ => { + //let _update_id = self.next_update_id_raw(wtxn, index_uuid)?; + //self.updates.put(wtxn, &(index_uuid, update.id()), update)?; + //} + //} + //Ok(()) + //} /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and @@ -291,8 +292,7 @@ impl UpdateStore { // If there is a pending update we process and only keep // a reader while processing it, not a writer. match first_meta { - Some(((global_id, index_uuid, _), mut pending)) => { - let content = pending.content.take(); + Some(((global_id, index_uuid, _), pending)) => { let processing = pending.processing(); // Acquire the state lock and set the current state to processing. // txn must *always* be acquired after state lock, or it will dead lock. @@ -300,7 +300,7 @@ impl UpdateStore { state.swap(State::Processing(index_uuid, processing.clone())); let result = - self.perform_update(content, processing, index_handle, index_uuid, global_id); + self.perform_update(processing, index_handle, index_uuid, global_id); state.swap(State::Idle); @@ -312,27 +312,16 @@ impl UpdateStore { fn perform_update( &self, - content: Option, processing: Processing, index_handle: impl IndexActorHandle, index_uuid: Uuid, global_id: u64, ) -> Result> { - let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid)); - let update_id = processing.id(); - - let file = match content_path { - Some(ref path) => { - let file = File::open(path)?; - Some(file) - } - None => None, - }; - // Process the pending update using the provided user function. let handle = Handle::current(); + let update_id = processing.id(); let result = - match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) { + match handle.block_on(index_handle.update(index_uuid, processing.clone())) { Ok(result) => result, Err(e) => Err(processing.fail(e.into())), }; @@ -354,10 +343,6 @@ impl UpdateStore { wtxn.commit()?; - if let Some(ref path) = content_path { - remove_file(&path)?; - } - Ok(Some(())) } @@ -435,16 +420,16 @@ impl UpdateStore { pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> { let mut txn = self.env.write_txn()?; // Contains all the content file paths that we need to be removed if the deletion was successful. - let mut uuids_to_remove = Vec::new(); + let uuids_to_remove = Vec::new(); let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data(); while let Some(Ok(((_, uuid, _), pending))) = pendings.next() { if uuid == index_uuid { - let mut pending = pending.decode()?; - if let Some(update_uuid) = pending.content.take() { - uuids_to_remove.push(update_uuid); - } + let mut _pending = pending.decode()?; + //if let Some(update_uuid) = pending.content.take() { + //uuids_to_remove.push(update_uuid); + //} // Invariant check: we can only delete the current entry when we don't hold // references to it anymore. This must be done after we have retrieved its content. @@ -486,7 +471,7 @@ impl UpdateStore { // them. 
         uuids_to_remove
             .iter()
-            .map(|uuid| update_uuid_to_file_path(&self.path, *uuid))
+            .map(|uuid: &Uuid| update_files_path(&self.path).join(uuid.to_string()))
             .for_each(|path| {
                 let _ = remove_file(path);
             });
@@ -521,17 +506,17 @@
         let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();

         for entry in pendings {
-            let ((_, uuid, _), pending) = entry?;
-            if uuids.contains(&uuid) {
-                if let Enqueued {
-                    content: Some(uuid),
-                    ..
-                } = pending.decode()?
-                {
-                    let path = update_uuid_to_file_path(&self.path, uuid);
-                    copy(path, &update_files_path)?;
-                }
-            }
+            let ((_, _uuid, _), _pending) = entry?;
+            //if uuids.contains(&uuid) {
+            //if let Enqueued {
+            //content: Some(uuid),
+            //..
+            //} = pending.decode()?
+            //{
+            //let path = update_uuid_to_file_path(&self.path, uuid);
+            //copy(path, &update_files_path)?;
+            //}
+            //}
         }

         let path = &path.as_ref().to_path_buf();
@@ -553,18 +538,18 @@
     }

     pub fn get_info(&self) -> Result<UpdateStoreInfo> {
-        let mut size = self.env.size();
+        let size = self.env.size();
         let txn = self.env.read_txn()?;
         for entry in self.pending_queue.iter(&txn)? {
-            let (_, pending) = entry?;
-            if let Enqueued {
-                content: Some(uuid),
-                ..
-            } = pending
-            {
-                let path = update_uuid_to_file_path(&self.path, uuid);
-                size += File::open(path)?.metadata()?.len();
-            }
+            let (_, _pending) = entry?;
+            //if let Enqueued {
+            //content: Some(uuid),
+            //..
+            //} = pending
+            //{
+            //let path = update_uuid_to_file_path(&self.path, uuid);
+            //size += File::open(path)?.metadata()?.len();
+            //}
         }
         let processing = match *self.state.read() {
             State::Processing(uuid, _) => Some(uuid),
@@ -575,12 +560,6 @@
     }
 }

-fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
-    root.as_ref()
-        .join(UPDATE_DIR)
-        .join(format!("update_{}", uuid))
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/meilisearch-http/src/index_controller/update_file_store.rs b/meilisearch-http/src/index_controller/update_file_store.rs
new file mode 100644
index 000000000..1c60bcec9
--- /dev/null
+++ b/meilisearch-http/src/index_controller/update_file_store.rs
@@ -0,0 +1,63 @@
+use std::fs::File;
+use std::path::{Path, PathBuf};
+use std::ops::{Deref, DerefMut};
+
+use tempfile::NamedTempFile;
+use uuid::Uuid;
+
+use super::error::Result;
+
+pub struct UpdateFile {
+    path: PathBuf,
+    file: NamedTempFile,
+}
+
+impl UpdateFile {
+    pub fn persist(self) {
+        println!("persisting in {}", self.path.display());
+        self.file.persist(&self.path).unwrap();
+    }
+}
+
+impl Deref for UpdateFile {
+    type Target = NamedTempFile;
+
+    fn deref(&self) -> &Self::Target {
+        &self.file
+    }
+}
+
+impl DerefMut for UpdateFile {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.file
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct UpdateFileStore {
+    path: PathBuf,
+}
+
+impl UpdateFileStore {
+    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
+        let path = path.as_ref().join("updates/updates_files");
+        std::fs::create_dir_all(&path).unwrap();
+        Ok(Self { path })
+    }
+
+    pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
+        let file = NamedTempFile::new().unwrap();
+        let uuid = Uuid::new_v4();
+        let path = self.path.join(uuid.to_string());
+        let update_file = UpdateFile { file, path };
+
+        Ok((uuid, update_file))
+    }
+
+    pub fn get_update(&self, uuid: Uuid) -> Result<File> {
+        let path = self.path.join(uuid.to_string());
+        println!("reading in {}", path.display());
+        let file = File::open(path).unwrap();
+        Ok(file)
+    }
+}
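The new `UpdateFileStore` implements write-then-rename: content goes into an anonymous `NamedTempFile` first and only appears under its uuid-named path once `persist` succeeds, so readers never observe a half-written update file. A condensed, self-contained replica of that lifecycle (it uses `NamedTempFile::new_in` to stay runnable on its own; the store itself uses `NamedTempFile::new` and relies on the `TMPDIR` setup in `main.rs` below to keep temp files on the same filesystem):

    use std::fs::File;
    use std::io::{Read, Write};
    use std::path::PathBuf;

    use tempfile::NamedTempFile;
    use uuid::Uuid;

    fn main() -> anyhow::Result<()> {
        let dir = tempfile::tempdir()?;

        // new_update(): a fresh uuid plus a temp file to write into.
        let uuid = Uuid::new_v4();
        let mut file = NamedTempFile::new_in(dir.path())?;
        file.write_all(br#"[{"id": 1}]"#)?;

        // persist(): atomically move the finished file to its final name.
        let path: PathBuf = dir.path().join(uuid.to_string());
        file.persist(&path)?;

        // get_update(): readers only ever see fully written files.
        let mut content = String::new();
        File::open(&path)?.read_to_string(&mut content)?;
        assert_eq!(content, r#"[{"id": 1}]"#);
        Ok(())
    }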
diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs
index d02438d3c..7065b0462 100644
--- a/meilisearch-http/src/index_controller/updates.rs
+++ b/meilisearch-http/src/index_controller/updates.rs
@@ -1,13 +1,14 @@
 use chrono::{DateTime, Utc};
-use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
+use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
 use serde::{Deserialize, Serialize};
-use uuid::Uuid;

 use crate::{
     error::ResponseError,
     index::{Settings, Unchecked},
 };

+use super::update_actor::RegisterUpdate;
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum UpdateResult {
     DocumentsAddition(DocumentAdditionResult),
@@ -21,7 +22,6 @@ pub enum UpdateResult {
 pub enum UpdateMeta {
     DocumentsAddition {
         method: IndexDocumentsMethod,
-        format: UpdateFormat,
         primary_key: Option<String>,
     },
     ClearDocuments,
@@ -35,18 +35,16 @@ pub enum UpdateMeta {
 #[serde(rename_all = "camelCase")]
 pub struct Enqueued {
     pub update_id: u64,
-    pub meta: UpdateMeta,
+    pub meta: RegisterUpdate,
     pub enqueued_at: DateTime<Utc>,
-    pub content: Option<Uuid>,
 }

 impl Enqueued {
-    pub fn new(meta: UpdateMeta, update_id: u64, content: Option<Uuid>) -> Self {
+    pub fn new(meta: RegisterUpdate, update_id: u64) -> Self {
         Self {
             enqueued_at: Utc::now(),
             meta,
             update_id,
-            content,
         }
     }

@@ -64,7 +62,7 @@ impl Enqueued {
         }
     }

-    pub fn meta(&self) -> &UpdateMeta {
+    pub fn meta(&self) -> &RegisterUpdate {
         &self.meta
     }

@@ -87,7 +85,7 @@ impl Processed {
         self.from.id()
     }

-    pub fn meta(&self) -> &UpdateMeta {
+    pub fn meta(&self) -> &RegisterUpdate {
         self.from.meta()
     }
 }

@@ -105,7 +103,7 @@ impl Processing {
         self.from.id()
     }

-    pub fn meta(&self) -> &UpdateMeta {
+    pub fn meta(&self) -> &RegisterUpdate {
         self.from.meta()
     }

@@ -139,7 +137,7 @@ impl Aborted {
         self.from.id()
     }

-    pub fn meta(&self) -> &UpdateMeta {
+    pub fn meta(&self) -> &RegisterUpdate {
         self.from.meta()
     }
 }

@@ -158,7 +156,7 @@ impl Failed {
         self.from.id()
     }

-    pub fn meta(&self) -> &UpdateMeta {
+    pub fn meta(&self) -> &RegisterUpdate {
         self.from.meta()
     }
 }

@@ -184,7 +182,7 @@ impl UpdateStatus {
         }
     }

-    pub fn meta(&self) -> &UpdateMeta {
+    pub fn meta(&self) -> &RegisterUpdate {
         match self {
             UpdateStatus::Processing(u) => u.meta(),
             UpdateStatus::Enqueued(u) => u.meta(),
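The `Enqueued`/`Processing`/`Processed`/`Aborted`/`Failed` types touched above form a typed state machine: each later stage wraps the previous one in a `from` field, so an update's full history travels with it and accessors like `meta()` simply delegate down the chain. A reduced model of the pattern, assuming only `chrono` (already a dependency):

    use chrono::{DateTime, Utc};

    struct Enqueued {
        update_id: u64,
        enqueued_at: DateTime<Utc>,
    }

    struct Processing {
        from: Enqueued,
        started_processing_at: DateTime<Utc>,
    }

    struct Processed {
        from: Processing,
        processed_at: DateTime<Utc>,
    }

    impl Enqueued {
        // Transitions consume the previous stage, so a stale state can't be reused.
        fn processing(self) -> Processing {
            Processing { from: self, started_processing_at: Utc::now() }
        }
    }

    impl Processing {
        fn process(self) -> Processed {
            Processed { from: self, processed_at: Utc::now() }
        }
    }

    impl Processed {
        // Delegation down the chain, like the `meta()` accessors above.
        fn id(&self) -> u64 {
            self.from.from.update_id
        }
    }

    fn main() {
        let enqueued = Enqueued { update_id: 7, enqueued_at: Utc::now() };
        let processed = enqueued.processing().process();
        assert_eq!(processed.id(), 7);
    }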
diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs
index 0875806ac..daa18f480 100644
--- a/meilisearch-http/src/main.rs
+++ b/meilisearch-http/src/main.rs
@@ -1,7 +1,6 @@
 use std::env;

 use actix_web::HttpServer;
-use main_error::MainError;
 use meilisearch_http::{create_app, Data, Opt};
 use structopt::StructOpt;

@@ -12,10 +11,7 @@ use meilisearch_http::analytics;
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

-#[actix_web::main]
-async fn main() -> Result<(), MainError> {
-    let opt = Opt::from_args();
-
+fn setup(opt: &Opt) -> anyhow::Result<()> {
     let mut log_builder = env_logger::Builder::new();
     log_builder.parse_filters(&opt.log_level);
     if opt.log_level == "info" {
@@ -25,13 +21,34 @@ async fn main() -> Result<(), MainError> {

     log_builder.init();

+    // Set the tempfile directory in the current db path, to avoid cross device references. Also
+    // remove the previous outstanding files found there
+    //
+    // TODO: if two processes open the same db, one might delete the other tmpdir. Need to make
+    // sure that no one is using it before deleting it.
+    let temp_path = opt.db_path.join("tmp");
+    // Ignore error if tempdir doesn't exist
+    let _ = std::fs::remove_dir_all(&temp_path);
+    std::fs::create_dir_all(&temp_path)?;
+    if cfg!(windows) {
+        std::env::set_var("TMP", temp_path);
+    } else {
+        std::env::set_var("TMPDIR", temp_path);
+    }
+
+    Ok(())
+}
+
+#[actix_web::main]
+async fn main() -> anyhow::Result<()> {
+    let opt = Opt::from_args();
+
+    setup(&opt)?;
+
     match opt.env.as_ref() {
         "production" => {
             if opt.master_key.is_none() {
-                return Err(
-                    "In production mode, the environment variable MEILI_MASTER_KEY is mandatory"
-                        .into(),
-                );
+                anyhow::bail!("In production mode, the environment variable MEILI_MASTER_KEY is mandatory")
             }
         }
         "development" => (),
@@ -54,7 +71,7 @@ async fn main() -> Result<(), MainError> {
     Ok(())
 }

-async fn run_http(data: Data, opt: Opt) -> Result<(), Box<dyn std::error::Error>> {
+async fn run_http(data: Data, opt: Opt) -> anyhow::Result<()> {
     let _enable_dashboard = &opt.env == "development";
     let http_server = HttpServer::new(move || create_app!(data, _enable_dashboard))
         // Disable signals allows the server to terminate immediately when a user enter CTRL-C
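Both `main.rs` above and `option.rs` below are migrating from string errors in `Box<dyn Error>`/`MainError` to `anyhow`: strings become real error values via `bail!`/`anyhow!`, and `main` can return `anyhow::Result<()>` while keeping `?` throughout. A small sketch of the idiom (the helper and its arguments are illustrative, not functions from this codebase):

    use anyhow::bail;

    fn check_master_key(env: &str, master_key: Option<&str>) -> anyhow::Result<()> {
        // `bail!` builds an anyhow::Error from the message and returns early,
        // replacing the old `return Err("...".into())` dance.
        if env == "production" && master_key.is_none() {
            bail!("In production mode, the environment variable MEILI_MASTER_KEY is mandatory");
        }
        Ok(())
    }

    fn main() -> anyhow::Result<()> {
        check_master_key("development", None)?;
        assert!(check_master_key("production", None).is_err());
        Ok(())
    }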
diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs
index 39966092e..3a0ab8acb 100644
--- a/meilisearch-http/src/option.rs
+++ b/meilisearch-http/src/option.rs
@@ -5,7 +5,7 @@ use std::ops::Deref;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::{error, fs};
+use std::fs;

 use byte_unit::Byte;
 use milli::CompressionType;
@@ -184,7 +184,7 @@ pub struct Opt {
 }

 impl Opt {
-    pub fn get_ssl_config(&self) -> Result<Option<rustls::ServerConfig>, Box<dyn error::Error>> {
+    pub fn get_ssl_config(&self) -> anyhow::Result<Option<rustls::ServerConfig>> {
         if let (Some(cert_path), Some(key_path)) = (&self.ssl_cert_path, &self.ssl_key_path) {
             let client_auth = match &self.ssl_auth_path {
                 Some(auth_path) => {
@@ -210,7 +210,7 @@ impl Opt {
             let ocsp = load_ocsp(&self.ssl_ocsp_path)?;
             config
                 .set_single_cert_with_ocsp_and_sct(certs, privkey, ocsp, vec![])
-                .map_err(|_| "bad certificates/private key")?;
+                .map_err(|_| anyhow::anyhow!("bad certificates/private key"))?;

             if self.ssl_resumption {
                 config.set_persistence(rustls::ServerSessionMemoryCache::new(256));
@@ -284,25 +284,25 @@ fn total_memory_bytes() -> Option<u64> {
     }
 }

-fn load_certs(filename: PathBuf) -> Result<Vec<rustls::Certificate>, Box<dyn error::Error>> {
-    let certfile = fs::File::open(filename).map_err(|_| "cannot open certificate file")?;
+fn load_certs(filename: PathBuf) -> anyhow::Result<Vec<rustls::Certificate>> {
+    let certfile = fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open certificate file"))?;
     let mut reader = BufReader::new(certfile);
-    Ok(certs(&mut reader).map_err(|_| "cannot read certificate file")?)
+    Ok(certs(&mut reader).map_err(|_| anyhow::anyhow!("cannot read certificate file"))?)
 }

-fn load_private_key(filename: PathBuf) -> Result<rustls::PrivateKey, Box<dyn error::Error>> {
+fn load_private_key(filename: PathBuf) -> anyhow::Result<rustls::PrivateKey> {
     let rsa_keys = {
         let keyfile =
-            fs::File::open(filename.clone()).map_err(|_| "cannot open private key file")?;
+            fs::File::open(filename.clone()).map_err(|_| anyhow::anyhow!("cannot open private key file"))?;
         let mut reader = BufReader::new(keyfile);
-        rsa_private_keys(&mut reader).map_err(|_| "file contains invalid rsa private key")?
+        rsa_private_keys(&mut reader).map_err(|_| anyhow::anyhow!("file contains invalid rsa private key"))?
     };

     let pkcs8_keys = {
-        let keyfile = fs::File::open(filename).map_err(|_| "cannot open private key file")?;
+        let keyfile = fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open private key file"))?;
         let mut reader = BufReader::new(keyfile);
         pkcs8_private_keys(&mut reader)
-            .map_err(|_| "file contains invalid pkcs8 private key (encrypted keys not supported)")?
+            .map_err(|_| anyhow::anyhow!("file contains invalid pkcs8 private key (encrypted keys not supported)"))?
     };

     // prefer to load pkcs8 keys
@@ -314,14 +314,14 @@ fn load_private_key(filename: PathBuf) -> Result<rustls::PrivateKey, Box<dyn error::Error>> {

-fn load_ocsp(filename: &Option<PathBuf>) -> Result<Vec<u8>, Box<dyn error::Error>> {
+fn load_ocsp(filename: &Option<PathBuf>) -> anyhow::Result<Vec<u8>> {
     let mut ret = Vec::new();

     if let Some(ref name) = filename {
         fs::File::open(name)
-            .map_err(|_| "cannot open ocsp file")?
+            .map_err(|_| anyhow::anyhow!("cannot open ocsp file"))?
             .read_to_end(&mut ret)
-            .map_err(|_| "cannot read oscp file")?;
+            .map_err(|_| anyhow::anyhow!("cannot read ocsp file"))?;
     }

     Ok(ret)
diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs
index a4bf465b5..be80a55a0 100644
--- a/meilisearch-http/src/routes/indexes/documents.rs
+++ b/meilisearch-http/src/routes/indexes/documents.rs
@@ -1,12 +1,17 @@
+use actix_web::error::PayloadError;
 use actix_web::{web, HttpResponse};
+use actix_web::web::Bytes;
+use futures::{Stream, StreamExt};
 use log::debug;
-use milli::update::{IndexDocumentsMethod, UpdateFormat};
+use milli::update::IndexDocumentsMethod;
 use serde::Deserialize;
-use serde_json::Value;
+//use serde_json::Value;
+use tokio::sync::mpsc;

 use crate::error::ResponseError;
 use crate::extractors::authentication::{policies::*, GuardedData};
 use crate::extractors::payload::Payload;
+use crate::index_controller::{DocumentAdditionFormat, Update};
 use crate::routes::IndexParam;
 use crate::Data;

@@ -32,6 +37,17 @@ macro_rules!
guard_content_type { guard_content_type!(guard_json, "application/json"); */ +/// This is required because Payload is not Sync nor Send +fn payload_to_stream(mut payload: Payload) -> impl Stream> { + let (snd, recv) = mpsc::channel(1); + tokio::task::spawn_local(async move { + while let Some(data) = payload.next().await { + let _ = snd.send(data).await; + } + }); + tokio_stream::wrappers::ReceiverStream::new(recv) +} + fn guard_json(head: &actix_web::dev::RequestHead) -> bool { if let Some(_content_type) = head.headers.get("Content-Type") { // CURRENTLY AND FOR THIS RELEASE ONLY WE DECIDED TO INTERPRET ALL CONTENT-TYPES AS JSON @@ -60,14 +76,14 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .route(web::get().to(get_all_documents)) .route(web::post().guard(guard_json).to(add_documents)) .route(web::put().guard(guard_json).to(update_documents)) - .route(web::delete().to(clear_all_documents)), + //.route(web::delete().to(clear_all_documents)), ) // this route needs to be before the /documents/{document_id} to match properly - .service(web::resource("/delete-batch").route(web::post().to(delete_documents))) + //.service(web::resource("/delete-batch").route(web::post().to(delete_documents))) .service( web::resource("/{document_id}") .route(web::get().to(get_document)) - .route(web::delete().to(delete_document)), + //.route(web::delete().to(delete_document)), ); } @@ -84,16 +100,16 @@ pub async fn get_document( Ok(HttpResponse::Ok().json(document)) } -pub async fn delete_document( - data: GuardedData, - path: web::Path, -) -> Result { - let update_status = data - .delete_documents(path.index_uid.clone(), vec![path.document_id.clone()]) - .await?; - debug!("returns: {:?}", update_status); - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) -} +//pub async fn delete_document( + //data: GuardedData, + //path: web::Path, +//) -> Result { + //let update_status = data + //.delete_documents(path.index_uid.clone(), vec![path.document_id.clone()]) + //.await?; + //debug!("returns: {:?}", update_status); + //Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) +//} #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] @@ -147,14 +163,14 @@ pub async fn add_documents( body: Payload, ) -> Result { debug!("called with params: {:?}", params); + let update = Update::DocumentAddition { + payload: Box::new(payload_to_stream(body)), + primary_key: params.primary_key.clone(), + method: IndexDocumentsMethod::ReplaceDocuments, + format: DocumentAdditionFormat::Json, + }; let update_status = data - .add_documents( - path.into_inner().index_uid, - IndexDocumentsMethod::ReplaceDocuments, - UpdateFormat::Json, - body, - params.primary_key.clone(), - ) + .register_update(path.index_uid.as_str(), update) .await?; debug!("returns: {:?}", update_status); @@ -170,45 +186,45 @@ pub async fn update_documents( body: Payload, ) -> Result { debug!("called with params: {:?}", params); - let update = data - .add_documents( - path.into_inner().index_uid, - IndexDocumentsMethod::UpdateDocuments, - UpdateFormat::Json, - body, - params.primary_key.clone(), - ) + let update = Update::DocumentAddition { + payload: Box::new(payload_to_stream(body)), + primary_key: params.primary_key.clone(), + method: IndexDocumentsMethod::UpdateDocuments, + format: DocumentAdditionFormat::Json, + }; + let update_status = data + .register_update(path.index_uid.as_str(), update) .await?; - debug!("returns: {:?}", update); - 
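`payload_to_stream` above exists because an actix `Payload` is neither `Send` nor `Sync`: the trick is to drive it on the local task set and forward its items through an mpsc channel, which *is* `Send`. Below is the same idea in generic form, assuming `tokio`, `tokio-stream`, and `futures`; it must run inside a `tokio::task::LocalSet` (actix provides one per worker):

    use futures::{Stream, StreamExt};
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    // Forward a possibly !Send stream through a channel to obtain a Send one.
    // Note the absence of a `Send` bound on `S`: `spawn_local` doesn't need it.
    fn forward<S, T>(mut source: S) -> impl Stream<Item = T> + Send
    where
        S: Stream<Item = T> + Unpin + 'static,
        T: Send + 'static,
    {
        let (snd, recv) = mpsc::channel(1);
        tokio::task::spawn_local(async move {
            while let Some(item) = source.next().await {
                // A closed receiver simply ends the forwarding task.
                if snd.send(item).await.is_err() {
                    break;
                }
            }
        });
        ReceiverStream::new(recv)
    }

    #[tokio::main]
    async fn main() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let items = futures::stream::iter(vec![1, 2, 3]);
                let forwarded: Vec<_> = forward(items).collect().await;
                assert_eq!(forwarded, vec![1, 2, 3]);
            })
            .await;
    }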
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update.id() }))) -} - -pub async fn delete_documents( - data: GuardedData, - path: web::Path, - body: web::Json>, -) -> Result { - debug!("called with params: {:?}", body); - let ids = body - .iter() - .map(|v| { - v.as_str() - .map(String::from) - .unwrap_or_else(|| v.to_string()) - }) - .collect(); - - let update_status = data.delete_documents(path.index_uid.clone(), ids).await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } -pub async fn clear_all_documents( - data: GuardedData, - path: web::Path, -) -> Result { - let update_status = data.clear_documents(path.index_uid.clone()).await?; - debug!("returns: {:?}", update_status); - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) -} +//pub async fn delete_documents( + //data: GuardedData, + //path: web::Path, + //body: web::Json>, +//) -> Result { + //debug!("called with params: {:?}", body); + //let ids = body + //.iter() + //.map(|v| { + //v.as_str() + //.map(String::from) + //.unwrap_or_else(|| v.to_string()) + //}) + //.collect(); + + //let update_status = data.delete_documents(path.index_uid.clone(), ids).await?; + //debug!("returns: {:?}", update_status); + //Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) +//} + +//pub async fn clear_all_documents( + //data: GuardedData, + //path: web::Path, +//) -> Result { + //let update_status = data.clear_documents(path.index_uid.clone()).await?; + //debug!("returns: {:?}", update_status); + //Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) +//} diff --git a/meilisearch-http/src/routes/indexes/mod.rs b/meilisearch-http/src/routes/indexes/mod.rs index 8314bf032..ef68215b4 100644 --- a/meilisearch-http/src/routes/indexes/mod.rs +++ b/meilisearch-http/src/routes/indexes/mod.rs @@ -17,7 +17,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( web::resource("") .route(web::get().to(list_indexes)) - .route(web::post().to(create_index)), + //.route(web::post().to(create_index)), ) .service( web::scope("/{index_uid}") @@ -25,13 +25,13 @@ pub fn configure(cfg: &mut web::ServiceConfig) { web::resource("") .route(web::get().to(get_index)) .route(web::put().to(update_index)) - .route(web::delete().to(delete_index)), + //.route(web::delete().to(delete_index)), ) .service(web::resource("/stats").route(web::get().to(get_index_stats))) .service(web::scope("/documents").configure(documents::configure)) .service(web::scope("/search").configure(search::configure)) .service(web::scope("/updates").configure(updates::configure)) - .service(web::scope("/settings").configure(settings::configure)), + //.service(web::scope("/settings").configure(settings::configure)), ); } @@ -48,14 +48,14 @@ pub struct IndexCreateRequest { primary_key: Option, } -pub async fn create_index( - data: GuardedData, - body: web::Json, -) -> Result { - let body = body.into_inner(); - let meta = data.create_index(body.uid, body.primary_key).await?; - Ok(HttpResponse::Created().json(meta)) -} +//pub async fn create_index( + //data: GuardedData, + //body: web::Json, +//) -> Result { + //let body = body.into_inner(); + //let meta = data.create_index(body.uid, body.primary_key).await?; + //Ok(HttpResponse::Created().json(meta)) +//} #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] @@ -97,13 +97,13 @@ pub async fn update_index( 
Ok(HttpResponse::Ok().json(meta)) } -pub async fn delete_index( - data: GuardedData, - path: web::Path, -) -> Result { - data.delete_index(path.index_uid.clone()).await?; - Ok(HttpResponse::NoContent().finish()) -} +//pub async fn delete_index( + //data: GuardedData, + //path: web::Path, +//) -> Result { + //data.delete_index(path.index_uid.clone()).await?; + //Ok(HttpResponse::NoContent().finish()) +//} pub async fn get_index_stats( data: GuardedData, diff --git a/meilisearch-http/src/routes/indexes/settings.rs b/meilisearch-http/src/routes/indexes/settings.rs index 05a4f308f..051483b20 100644 --- a/meilisearch-http/src/routes/indexes/settings.rs +++ b/meilisearch-http/src/routes/indexes/settings.rs @@ -1,185 +1,184 @@ -use actix_web::{web, HttpResponse}; -use log::debug; +//use log::debug; -use crate::extractors::authentication::{policies::*, GuardedData}; -use crate::index::Settings; -use crate::Data; -use crate::{error::ResponseError, index::Unchecked}; +//use crate::extractors::authentication::{policies::*, GuardedData}; +//use crate::index::Settings; +//use crate::Data; +//use crate::error::ResponseError; -#[macro_export] -macro_rules! make_setting_route { - ($route:literal, $type:ty, $attr:ident, $camelcase_attr:literal) => { - pub mod $attr { - use log::debug; - use actix_web::{web, HttpResponse, Resource}; +//#[macro_export] +//macro_rules! make_setting_route { + //($route:literal, $type:ty, $attr:ident, $camelcase_attr:literal) => { + //pub mod $attr { + //use log::debug; + //use actix_web::{web, HttpResponse, Resource}; - use milli::update::Setting; + //use milli::update::Setting; - use crate::data; - use crate::error::ResponseError; - use crate::index::Settings; - use crate::extractors::authentication::{GuardedData, policies::*}; + //use crate::data; + //use crate::error::ResponseError; + //use crate::index::Settings; + //use crate::extractors::authentication::{GuardedData, policies::*}; - pub async fn delete( - data: GuardedData, - index_uid: web::Path, - ) -> Result { - use crate::index::Settings; - let settings = Settings { - $attr: Setting::Reset, - ..Default::default() - }; - let update_status = data.update_settings(index_uid.into_inner(), settings, false).await?; - debug!("returns: {:?}", update_status); - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) - } + //pub async fn delete( + //data: GuardedData, + //index_uid: web::Path, + //) -> Result { + //use crate::index::Settings; + //let settings = Settings { + //$attr: Setting::Reset, + //..Default::default() + //}; + //let update_status = data.update_settings(index_uid.into_inner(), settings, false).await?; + //debug!("returns: {:?}", update_status); + //Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) + //} - pub async fn update( - data: GuardedData, - index_uid: actix_web::web::Path, - body: actix_web::web::Json>, - ) -> std::result::Result { - let settings = Settings { - $attr: match body.into_inner() { - Some(inner_body) => Setting::Set(inner_body), - None => Setting::Reset - }, - ..Default::default() - }; + //pub async fn update( + //data: GuardedData, + //index_uid: actix_web::web::Path, + //body: actix_web::web::Json>, + //) -> std::result::Result { + //let settings = Settings { + //$attr: match body.into_inner() { + //Some(inner_body) => Setting::Set(inner_body), + //None => Setting::Reset + //}, + //..Default::default() + //}; - let update_status = data.update_settings(index_uid.into_inner(), settings, true).await?; - 
debug!("returns: {:?}", update_status); - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) - } + //let update_status = data.update_settings(index_uid.into_inner(), settings, true).await?; + //debug!("returns: {:?}", update_status); + //Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) + //} - pub async fn get( - data: GuardedData, - index_uid: actix_web::web::Path, - ) -> std::result::Result { - let settings = data.settings(index_uid.into_inner()).await?; - debug!("returns: {:?}", settings); - let mut json = serde_json::json!(&settings); - let val = json[$camelcase_attr].take(); - Ok(HttpResponse::Ok().json(val)) - } + //pub async fn get( + //data: GuardedData, + //index_uid: actix_web::web::Path, + //) -> std::result::Result { + //let settings = data.settings(index_uid.into_inner()).await?; + //debug!("returns: {:?}", settings); + //let mut json = serde_json::json!(&settings); + //let val = json[$camelcase_attr].take(); + //Ok(HttpResponse::Ok().json(val)) + //} - pub fn resources() -> Resource { - Resource::new($route) - .route(web::get().to(get)) - .route(web::post().to(update)) - .route(web::delete().to(delete)) - } - } - }; -} + //pub fn resources() -> Resource { + //Resource::new($route) + //.route(web::get().to(get)) + //.route(web::post().to(update)) + //.route(web::delete().to(delete)) + //} + //} + //}; +//} -make_setting_route!( - "/filterable-attributes", - std::collections::BTreeSet, - filterable_attributes, - "filterableAttributes" -); +//make_setting_route!( + //"/filterable-attributes", + //std::collections::BTreeSet, + //filterable_attributes, + //"filterableAttributes" +//); -make_setting_route!( - "/sortable-attributes", - std::collections::BTreeSet, - sortable_attributes, - "sortableAttributes" -); +//make_setting_route!( + //"/sortable-attributes", + //std::collections::BTreeSet, + //sortable_attributes, + //"sortableAttributes" +//); -make_setting_route!( - "/displayed-attributes", - Vec, - displayed_attributes, - "displayedAttributes" -); +//make_setting_route!( + //"/displayed-attributes", + //Vec, + //displayed_attributes, + //"displayedAttributes" +//); -make_setting_route!( - "/searchable-attributes", - Vec, - searchable_attributes, - "searchableAttributes" -); +//make_setting_route!( + //"/searchable-attributes", + //Vec, + //searchable_attributes, + //"searchableAttributes" +//); -make_setting_route!( - "/stop-words", - std::collections::BTreeSet, - stop_words, - "stopWords" -); +//make_setting_route!( + //"/stop-words", + //std::collections::BTreeSet, + //stop_words, + //"stopWords" +//); -make_setting_route!( - "/synonyms", - std::collections::BTreeMap>, - synonyms, - "synonyms" -); +//make_setting_route!( + //"/synonyms", + //std::collections::BTreeMap>, + //synonyms, + //"synonyms" +//); -make_setting_route!( - "/distinct-attribute", - String, - distinct_attribute, - "distinctAttribute" -); +//make_setting_route!( + //"/distinct-attribute", + //String, + //distinct_attribute, + //"distinctAttribute" +//); -make_setting_route!("/ranking-rules", Vec, ranking_rules, "rankingRules"); +//make_setting_route!("/ranking-rules", Vec, ranking_rules, "rankingRules"); -macro_rules! generate_configure { - ($($mod:ident),*) => { - pub fn configure(cfg: &mut web::ServiceConfig) { - cfg.service( - web::resource("") - .route(web::post().to(update_all)) - .route(web::get().to(get_all)) - .route(web::delete().to(delete_all))) - $(.service($mod::resources()))*; - } - }; -} +//macro_rules! 
generate_configure { + //($($mod:ident),*) => { + //pub fn configure(cfg: &mut web::ServiceConfig) { + //cfg.service( + //web::resource("") + ////.route(web::post().to(update_all)) + //.route(web::get().to(get_all)) + ////.route(web::delete().to(delete_all))) + //$(.service($mod::resources()))*; + //} + //}; +//} -generate_configure!( - filterable_attributes, - sortable_attributes, - displayed_attributes, - searchable_attributes, - distinct_attribute, - stop_words, - synonyms, - ranking_rules -); +//generate_configure!( + //filterable_attributes, + //sortable_attributes, + //displayed_attributes, + //searchable_attributes, + //distinct_attribute, + //stop_words, + //synonyms, + //ranking_rules +//); -pub async fn update_all( - data: GuardedData, - index_uid: web::Path, - body: web::Json>, -) -> Result { - let settings = body.into_inner().check(); - let update_result = data - .update_settings(index_uid.into_inner(), settings, true) - .await?; - let json = serde_json::json!({ "updateId": update_result.id() }); - debug!("returns: {:?}", json); - Ok(HttpResponse::Accepted().json(json)) -} +//pub async fn update_all( + //data: GuardedData, + //index_uid: web::Path, + //body: web::Json>, +//) -> Result { + //let settings = body.into_inner().check(); + //let update_result = data + //.update_settings(index_uid.into_inner(), settings, true) + //.await?; + //let json = serde_json::json!({ "updateId": update_result.id() }); + //debug!("returns: {:?}", json); + //Ok(HttpResponse::Accepted().json(json)) +//} -pub async fn get_all( - data: GuardedData, - index_uid: web::Path, -) -> Result { - let settings = data.settings(index_uid.into_inner()).await?; - debug!("returns: {:?}", settings); - Ok(HttpResponse::Ok().json(settings)) -} +//pub async fn get_all( + //data: GuardedData, + //index_uid: web::Path, +//) -> Result { + //let settings = data.settings(index_uid.into_inner()).await?; + //debug!("returns: {:?}", settings); + //Ok(HttpResponse::Ok().json(settings)) +//} -pub async fn delete_all( - data: GuardedData, - index_uid: web::Path, -) -> Result { - let settings = Settings::cleared(); - let update_result = data - .update_settings(index_uid.into_inner(), settings, false) - .await?; - let json = serde_json::json!({ "updateId": update_result.id() }); - debug!("returns: {:?}", json); - Ok(HttpResponse::Accepted().json(json)) -} +//pub async fn delete_all( + //data: GuardedData, + //index_uid: web::Path, +//) -> Result { + //let settings = Settings::cleared(); + //let update_result = data + //.update_settings(index_uid.into_inner(), settings, false) + //.await?; + //let json = serde_json::json!({ "updateId": update_result.id() }); + //debug!("returns: {:?}", json); + //Ok(HttpResponse::Accepted().json(json)) +//} diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 2bacf9ed6..e6119ffe9 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -8,7 +8,8 @@ use serde::{Deserialize, Serialize}; use crate::error::ResponseError; use crate::extractors::authentication::{policies::*, GuardedData}; use crate::index::{Settings, Unchecked}; -use crate::index_controller::{UpdateMeta, UpdateResult, UpdateStatus}; +use crate::index_controller::update_actor::RegisterUpdate; +use crate::index_controller::{UpdateResult, UpdateStatus}; use crate::Data; mod dump; @@ -50,7 +51,7 @@ impl From<&UpdateStatus> for UpdateType { fn from(other: &UpdateStatus) -> Self { use milli::update::IndexDocumentsMethod::*; match other.meta() { - 
UpdateMeta::DocumentsAddition { method, .. } => {
+            RegisterUpdate::DocumentAddition { method, .. } => {
                 let number = match other {
                     UpdateStatus::Processed(processed) => match processed.success {
                         UpdateResult::DocumentsAddition(ref addition) => {
                             Some(addition.nb_documents)
                         }
                         _ => None,
                     },
                     _ => None,
                 };
                 UpdateType::DocumentsAddition { number }
             }
@@ -67,13 +68,13 @@ impl From<&UpdateStatus> for UpdateType {
                     _ => unreachable!(),
                 }
             }
-            UpdateMeta::ClearDocuments => UpdateType::ClearAll,
-            UpdateMeta::DeleteDocuments { ids } => UpdateType::DocumentsDeletion {
-                number: Some(ids.len()),
-            },
-            UpdateMeta::Settings(settings) => UpdateType::Settings {
-                settings: settings.clone(),
-            },
+            //UpdateMeta::ClearDocuments => UpdateType::ClearAll,
+            //UpdateMeta::DeleteDocuments { ids } => UpdateType::DocumentsDeletion {
+            //number: Some(ids.len()),
+            //},
+            //UpdateMeta::Settings(settings) => UpdateType::Settings {
+            //settings: settings.clone(),
+            //},
         }
     }
 }
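The `From<&UpdateStatus>` conversion in this last hunk is worth noting: the API-facing `UpdateType` is derived on demand from the internal status instead of being stored alongside it, so the two representations can never drift apart. A simplified model of the same pattern, with invented enums standing in for the real types:

    enum Status {
        Enqueued,
        Processed { indexed: usize },
    }

    enum ApiType {
        DocumentsAddition { number: Option<usize> },
    }

    impl From<&Status> for ApiType {
        fn from(other: &Status) -> Self {
            // Only a processed update knows how many documents actually landed,
            // mirroring the `UpdateStatus::Processed` arm above.
            let number = match other {
                Status::Processed { indexed } => Some(*indexed),
                Status::Enqueued => None,
            };
            ApiType::DocumentsAddition { number }
        }
    }

    fn main() {
        let status = Status::Processed { indexed: 42 };
        match ApiType::from(&status) {
            ApiType::DocumentsAddition { number } => assert_eq!(number, Some(42)),
        }
    }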