diff --git a/Cargo.lock b/Cargo.lock index f27dc423a..7e98ffb68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -825,6 +825,12 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2c9736e15e7df1638a7f6eee92a6511615c738246a052af5ba86f039b65aede" +[[package]] +name = "difference" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" + [[package]] name = "digest" version = "0.8.1" @@ -849,6 +855,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "downcast" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" + [[package]] name = "either" version = "1.6.1" @@ -933,6 +945,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -949,6 +970,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2" + [[package]] name = "fs_extra" version = "1.2.0" @@ -1686,6 +1713,7 @@ dependencies = [ "meilisearch-tokenizer", "milli", "mime", + "mockall", "num_cpus", "obkv", "once_cell", @@ -1755,8 +1783,8 @@ dependencies = [ [[package]] name = "milli" -version = "0.17.0" -source = "git+https://github.com/meilisearch/milli.git?tag=v0.17.0#22551d0941bee1a9cdcf7d5bfc4ca46517dd25f3" +version = "0.17.2" +source = "git+https://github.com/meilisearch/milli.git?tag=v0.17.2#07fb6d64e579b17e6565e9aa7f444e1b03802f4a" dependencies = [ "bimap", "bincode", @@ -1845,6 +1873,39 @@ dependencies = [ "winapi", ] +[[package]] +name = "mockall" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ab571328afa78ae322493cacca3efac6a0f2e0a67305b4df31fd439ef129ac0" +dependencies = [ + "cfg-if 1.0.0", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033" +dependencies = [ + "cfg-if 1.0.0", + "proc-macro2 1.0.29", + "quote 1.0.9", + "syn 1.0.77", +] + +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "ntapi" version = "0.3.6" @@ -2117,6 +2178,35 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "predicates" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f49cfaf7fdaa3bfacc6fa3e7054e65148878354a5cfddcf661df4c851f8021df" +dependencies = [ + "difference", + "float-cmp", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451" + +[[package]] +name = "predicates-tree" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7dd0fd014130206c9352efbdc92be592751b2b9274dff685348341082c6ea3d" +dependencies = [ + "predicates-core", + "treeline", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3042,6 +3132,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "treeline" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41" + [[package]] name = "try-lock" version = "0.2.3" diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index cbe963615..1f2f4742d 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -11,21 +11,31 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, thiserror::Error)] pub enum MeilisearchHttpError { - #[error("A Content-Type header is missing. Accepted values for the Content-Type header are: \"application/json\", \"application/x-ndjson\", \"text/csv\"")] - MissingContentType, - #[error("The Content-Type \"{0}\" is invalid. Accepted values for the Content-Type header are: \"application/json\", \"application/x-ndjson\", \"text/csv\"")] - InvalidContentType(String), + #[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}", + .0.iter().map(|s| format!("\"{}\"", s)).collect::>().join(", "))] + MissingContentType(Vec), + #[error( + "The Content-Type \"{0}\" is invalid. Accepted values for the Content-Type header are: {}", + .1.iter().map(|s| format!("\"{}\"", s)).collect::>().join(", ") + )] + InvalidContentType(String, Vec), } impl ErrorCode for MeilisearchHttpError { fn error_code(&self) -> Code { match self { - MeilisearchHttpError::MissingContentType => Code::MissingContentType, - MeilisearchHttpError::InvalidContentType(_) => Code::InvalidContentType, + MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType, + MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType, } } } +impl From for aweb::Error { + fn from(other: MeilisearchHttpError) -> Self { + aweb::Error::from(ResponseError::from(other)) + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct ResponseError { @@ -121,9 +131,8 @@ impl From for PayloadError { } } -pub fn payload_error_handler(err: E) -> ResponseError -where - E: Into, -{ - err.into().into() +impl From for aweb::Error { + fn from(other: PayloadError) -> Self { + aweb::Error::from(ResponseError::from(other)) + } } diff --git a/meilisearch-http/src/extractors/authentication/mod.rs b/meilisearch-http/src/extractors/authentication/mod.rs index 4d7b21ae1..d90eb4277 100644 --- a/meilisearch-http/src/extractors/authentication/mod.rs +++ b/meilisearch-http/src/extractors/authentication/mod.rs @@ -168,7 +168,8 @@ impl FromRequest for GuardedData None => err(AuthenticationError::IrretrievableState.into()), } } else { - err(AuthenticationError::InvalidToken(String::from("hello")).into()) + let token = token.to_str().unwrap_or("unknown").to_string(); + err(AuthenticationError::InvalidToken(token).into()) } } None => err(AuthenticationError::MissingAuthorizationHeader.into()), diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs index 0b9dacbe0..cbb417ab1 100644 --- a/meilisearch-http/src/lib.rs +++ b/meilisearch-http/src/lib.rs @@ -11,10 +11,14 @@ pub mod routes; use std::path::Path; use std::time::Duration; +use crate::error::MeilisearchHttpError; use crate::extractors::authentication::AuthConfig; +use actix_web::error::JsonPayloadError; +use error::PayloadError; +use http::header::CONTENT_TYPE; pub use option::Opt; -use actix_web::web; +use actix_web::{web, HttpRequest}; use extractors::authentication::policies::*; use extractors::payload::PayloadConfig; @@ -98,14 +102,25 @@ pub fn configure_data(config: &mut web::ServiceConfig, data: MeiliSearch, opt: & .app_data(data) .app_data( web::JsonConfig::default() - .limit(http_payload_size_limit) - .content_type(|_mime| true) // Accept all mime types - .error_handler(|err, _req| error::payload_error_handler(err).into()), + .content_type(|mime| mime == mime::APPLICATION_JSON) + .error_handler(|err, req: &HttpRequest| match err { + JsonPayloadError::ContentType => match req.headers().get(CONTENT_TYPE) { + Some(content_type) => MeilisearchHttpError::InvalidContentType( + content_type.to_str().unwrap_or("unknown").to_string(), + vec![mime::APPLICATION_JSON.to_string()], + ) + .into(), + None => MeilisearchHttpError::MissingContentType(vec![ + mime::APPLICATION_JSON.to_string(), + ]) + .into(), + }, + err => PayloadError::from(err).into(), + }), ) .app_data(PayloadConfig::new(http_payload_size_limit)) .app_data( - web::QueryConfig::default() - .error_handler(|err, _req| error::payload_error_handler(err).into()), + web::QueryConfig::default().error_handler(|err, _req| PayloadError::from(err).into()), ); } @@ -180,6 +195,7 @@ macro_rules! create_app { use actix_web::middleware::TrailingSlash; use actix_web::App; use actix_web::{middleware, web}; + use meilisearch_http::error::{MeilisearchHttpError, ResponseError}; use meilisearch_http::routes; use meilisearch_http::{configure_auth, configure_data, dashboard}; diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 50ba31c97..e890bab24 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -6,6 +6,7 @@ use log::debug; use meilisearch_lib::index_controller::{DocumentAdditionFormat, Update}; use meilisearch_lib::milli::update::IndexDocumentsMethod; use meilisearch_lib::MeiliSearch; +use once_cell::sync::Lazy; use serde::Deserialize; use serde_json::Value; use tokio::sync::mpsc; @@ -176,14 +177,29 @@ async fn document_addition( body: Payload, method: IndexDocumentsMethod, ) -> Result { + static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { + vec![ + "application/json".to_string(), + "application/x-ndjson".to_string(), + "application/csv".to_string(), + ] + }); let format = match content_type { Some("application/json") => DocumentAdditionFormat::Json, Some("application/x-ndjson") => DocumentAdditionFormat::Ndjson, Some("text/csv") => DocumentAdditionFormat::Csv, Some(other) => { - return Err(MeilisearchHttpError::InvalidContentType(other.to_string()).into()) + return Err(MeilisearchHttpError::InvalidContentType( + other.to_string(), + ACCEPTED_CONTENT_TYPE.clone(), + ) + .into()) + } + None => { + return Err( + MeilisearchHttpError::MissingContentType(ACCEPTED_CONTENT_TYPE.clone()).into(), + ) } - None => return Err(MeilisearchHttpError::MissingContentType.into()), }; let update = Update::DocumentAddition { diff --git a/meilisearch-http/tests/content_type.rs b/meilisearch-http/tests/content_type.rs new file mode 100644 index 000000000..5402a7cd6 --- /dev/null +++ b/meilisearch-http/tests/content_type.rs @@ -0,0 +1,111 @@ +#![allow(dead_code)] + +mod common; + +use crate::common::Server; +use actix_web::test; +use meilisearch_http::create_app; +use serde_json::{json, Value}; + +#[actix_rt::test] +async fn strict_json_bad_content_type() { + let routes = [ + // all the POST routes except the dumps that can be created without any body or content-type + // and the search that is not a strict json + "/indexes", + "/indexes/doggo/documents/delete-batch", + "/indexes/doggo/search", + "/indexes/doggo/settings", + "/indexes/doggo/settings/displayed-attributes", + "/indexes/doggo/settings/distinct-attribute", + "/indexes/doggo/settings/filterable-attributes", + "/indexes/doggo/settings/ranking-rules", + "/indexes/doggo/settings/searchable-attributes", + "/indexes/doggo/settings/sortable-attributes", + "/indexes/doggo/settings/stop-words", + "/indexes/doggo/settings/synonyms", + ]; + let bad_content_types = [ + "application/csv", + "application/x-ndjson", + "application/x-www-form-urlencoded", + "text/plain", + "json", + "application", + "json/application", + ]; + + let document = "{}"; + let server = Server::new().await; + let app = test::init_service(create_app!( + &server.service.meilisearch, + true, + &server.service.options + )) + .await; + for route in routes { + // Good content-type, we probably have an error since we didn't send anything in the json + // so we only ensure we didn't get a bad media type error. + let req = test::TestRequest::post() + .uri(route) + .set_payload(document) + .insert_header(("content-type", "application/json")) + .to_request(); + let res = test::call_service(&app, req).await; + let status_code = res.status(); + assert_ne!(status_code, 415, + "calling the route `{}` with a content-type of json isn't supposed to throw a bad media type error", route); + + // No content-type. + let req = test::TestRequest::post() + .uri(route) + .set_payload(document) + .to_request(); + let res = test::call_service(&app, req).await; + let status_code = res.status(); + let body = test::read_body(res).await; + let response: Value = serde_json::from_slice(&body).unwrap_or_default(); + assert_eq!(status_code, 415, "calling the route `{}` without content-type is supposed to throw a bad media type error", route); + assert_eq!( + response, + json!({ + "message": r#"A Content-Type header is missing. Accepted values for the Content-Type header are: "application/json""#, + "errorCode": "missing_content_type", + "errorType": "invalid_request_error", + "errorLink": "https://docs.meilisearch.com/errors#missing_content_type", + }), + "when calling the route `{}` with no content-type", + route, + ); + + for bad_content_type in bad_content_types { + // Always bad content-type + let req = test::TestRequest::post() + .uri(route) + .set_payload(document.to_string()) + .insert_header(("content-type", bad_content_type)) + .to_request(); + let res = test::call_service(&app, req).await; + let status_code = res.status(); + let body = test::read_body(res).await; + let response: Value = serde_json::from_slice(&body).unwrap_or_default(); + assert_eq!(status_code, 415); + let expected_error_message = format!( + r#"The Content-Type "{}" is invalid. Accepted values for the Content-Type header are: "application/json""#, + bad_content_type + ); + assert_eq!( + response, + json!({ + "message": expected_error_message, + "errorCode": "invalid_content_type", + "errorType": "invalid_request_error", + "errorLink": "https://docs.meilisearch.com/errors#invalid_content_type", + }), + "when calling the route `{}` with a content-type of `{}`", + route, + bad_content_type, + ); + } + } +} diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs index 13265dcfd..a9189a30c 100644 --- a/meilisearch-http/tests/documents/add_documents.rs +++ b/meilisearch-http/tests/documents/add_documents.rs @@ -22,6 +22,7 @@ async fn add_documents_test_json_content_types() { &server.service.options )) .await; + // post let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) @@ -33,6 +34,19 @@ async fn add_documents_test_json_content_types() { let response: Value = serde_json::from_slice(&body).unwrap_or_default(); assert_eq!(status_code, 202); assert_eq!(response, json!({ "updateId": 0 })); + + // put + let req = test::TestRequest::put() + .uri("/indexes/dog/documents") + .set_payload(document.to_string()) + .insert_header(("content-type", "application/json")) + .to_request(); + let res = test::call_service(&app, req).await; + let status_code = res.status(); + let body = test::read_body(res).await; + let response: Value = serde_json::from_slice(&body).unwrap_or_default(); + assert_eq!(status_code, 202); + assert_eq!(response, json!({ "updateId": 1 })); } /// no content type is still supposed to be accepted as json @@ -52,6 +66,7 @@ async fn add_documents_test_no_content_types() { &server.service.options )) .await; + // post let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) @@ -63,11 +78,23 @@ async fn add_documents_test_no_content_types() { let response: Value = serde_json::from_slice(&body).unwrap_or_default(); assert_eq!(status_code, 202); assert_eq!(response, json!({ "updateId": 0 })); + + // put + let req = test::TestRequest::put() + .uri("/indexes/dog/documents") + .set_payload(document.to_string()) + .insert_header(("content-type", "application/json")) + .to_request(); + let res = test::call_service(&app, req).await; + let status_code = res.status(); + let body = test::read_body(res).await; + let response: Value = serde_json::from_slice(&body).unwrap_or_default(); + assert_eq!(status_code, 202); + assert_eq!(response, json!({ "updateId": 1 })); } /// any other content-type is must be refused #[actix_rt::test] -#[ignore] async fn add_documents_test_bad_content_types() { let document = json!([ { @@ -83,6 +110,7 @@ async fn add_documents_test_bad_content_types() { &server.service.options )) .await; + // post let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) @@ -91,8 +119,32 @@ async fn add_documents_test_bad_content_types() { let res = test::call_service(&app, req).await; let status_code = res.status(); let body = test::read_body(res).await; - assert_eq!(status_code, 405); - assert!(body.is_empty()); + let response: Value = serde_json::from_slice(&body).unwrap_or_default(); + assert_eq!(status_code, 415); + assert_eq!( + response["message"], + json!( + r#"The Content-Type "text/plain" is invalid. Accepted values for the Content-Type header are: "application/json", "application/x-ndjson", "application/csv""# + ) + ); + + // put + let req = test::TestRequest::put() + .uri("/indexes/dog/documents") + .set_payload(document.to_string()) + .insert_header(("content-type", "text/plain")) + .to_request(); + let res = test::call_service(&app, req).await; + let status_code = res.status(); + let body = test::read_body(res).await; + let response: Value = serde_json::from_slice(&body).unwrap_or_default(); + assert_eq!(status_code, 415); + assert_eq!( + response["message"], + json!( + r#"The Content-Type "text/plain" is invalid. Accepted values for the Content-Type header are: "application/json", "application/x-ndjson", "application/csv""# + ) + ); } #[actix_rt::test] diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index bc50e3bb9..098b091e9 100644 --- a/meilisearch-lib/Cargo.toml +++ b/meilisearch-lib/Cargo.toml @@ -30,7 +30,7 @@ lazy_static = "1.4.0" log = "0.4.14" meilisearch-error = { path = "../meilisearch-error" } meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" } -milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.17.0"} +milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.17.2" } mime = "0.3.16" num_cpus = "1.13.0" once_cell = "1.8.0" @@ -59,4 +59,5 @@ derivative = "2.2.0" [dev-dependencies] actix-rt = "2.2.0" +mockall = "0.10.2" paste = "1.0.5" diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs index a48d9b834..f37777206 100644 --- a/meilisearch-lib/src/index/dump.rs +++ b/meilisearch-lib/src/index/dump.rs @@ -13,7 +13,7 @@ use crate::index::update_handler::UpdateHandler; use crate::index::updates::apply_settings_to_builder; use super::error::Result; -use super::{Index, Settings, Unchecked}; +use super::{index::Index, Settings, Unchecked}; #[derive(Serialize, Deserialize)] struct DumpMeta { diff --git a/meilisearch-lib/src/index/index.rs b/meilisearch-lib/src/index/index.rs new file mode 100644 index 000000000..565c7c4b5 --- /dev/null +++ b/meilisearch-lib/src/index/index.rs @@ -0,0 +1,286 @@ +use std::collections::{BTreeSet, HashSet}; +use std::fs::create_dir_all; +use std::marker::PhantomData; +use std::ops::Deref; +use std::path::Path; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use heed::{EnvOpenOptions, RoTxn}; +use milli::update::Setting; +use milli::{obkv_to_json, FieldDistribution, FieldId}; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use uuid::Uuid; + +use crate::index_controller::update_file_store::UpdateFileStore; +use crate::EnvSizer; + +use super::error::IndexError; +use super::error::Result; +use super::update_handler::UpdateHandler; +use super::{Checked, Settings}; + +pub type Document = Map; + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct IndexMeta { + created_at: DateTime, + pub updated_at: DateTime, + pub primary_key: Option, +} + +impl IndexMeta { + pub fn new(index: &Index) -> Result { + let txn = index.read_txn()?; + Self::new_txn(index, &txn) + } + + pub fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { + let created_at = index.created_at(txn)?; + let updated_at = index.updated_at(txn)?; + let primary_key = index.primary_key(txn)?.map(String::from); + Ok(Self { + created_at, + updated_at, + primary_key, + }) + } +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct IndexStats { + #[serde(skip)] + pub size: u64, + pub number_of_documents: u64, + /// Whether the current index is performing an update. It is initially `None` when the + /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is + /// later set to either true or false, we we retrieve the information from the `UpdateStore` + pub is_indexing: Option, + pub field_distribution: FieldDistribution, +} + +#[derive(Clone, derivative::Derivative)] +#[derivative(Debug)] +pub struct Index { + pub uuid: Uuid, + #[derivative(Debug = "ignore")] + pub inner: Arc, + #[derivative(Debug = "ignore")] + pub update_file_store: Arc, + #[derivative(Debug = "ignore")] + pub update_handler: Arc, +} + +impl Deref for Index { + type Target = milli::Index; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } +} + +impl Index { + pub fn open( + path: impl AsRef, + size: usize, + update_file_store: Arc, + uuid: Uuid, + update_handler: Arc, + ) -> Result { + create_dir_all(&path)?; + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let inner = Arc::new(milli::Index::new(options, &path)?); + Ok(Index { + inner, + update_file_store, + uuid, + update_handler, + }) + } + + pub fn inner(&self) -> &milli::Index { + &self.inner + } + + pub fn stats(&self) -> Result { + let rtxn = self.read_txn()?; + + Ok(IndexStats { + size: self.size(), + number_of_documents: self.number_of_documents(&rtxn)?, + is_indexing: None, + field_distribution: self.field_distribution(&rtxn)?, + }) + } + + pub fn meta(&self) -> Result { + IndexMeta::new(self) + } + pub fn settings(&self) -> Result> { + let txn = self.read_txn()?; + self.settings_txn(&txn) + } + + pub fn uuid(&self) -> Uuid { + self.uuid + } + + pub fn settings_txn(&self, txn: &RoTxn) -> Result> { + let displayed_attributes = self + .displayed_fields(txn)? + .map(|fields| fields.into_iter().map(String::from).collect()); + + let searchable_attributes = self + .searchable_fields(txn)? + .map(|fields| fields.into_iter().map(String::from).collect()); + + let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect(); + + let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect(); + + let criteria = self + .criteria(txn)? + .into_iter() + .map(|c| c.to_string()) + .collect(); + + let stop_words = self + .stop_words(txn)? + .map(|stop_words| -> Result> { + Ok(stop_words.stream().into_strs()?.into_iter().collect()) + }) + .transpose()? + .unwrap_or_else(BTreeSet::new); + let distinct_field = self.distinct_field(txn)?.map(String::from); + + // in milli each word in the synonyms map were split on their separator. Since we lost + // this information we are going to put space between words. + let synonyms = self + .synonyms(txn)? + .iter() + .map(|(key, values)| { + ( + key.join(" "), + values.iter().map(|value| value.join(" ")).collect(), + ) + }) + .collect(); + + Ok(Settings { + displayed_attributes: match displayed_attributes { + Some(attrs) => Setting::Set(attrs), + None => Setting::Reset, + }, + searchable_attributes: match searchable_attributes { + Some(attrs) => Setting::Set(attrs), + None => Setting::Reset, + }, + filterable_attributes: Setting::Set(filterable_attributes), + sortable_attributes: Setting::Set(sortable_attributes), + ranking_rules: Setting::Set(criteria), + stop_words: Setting::Set(stop_words), + distinct_attribute: match distinct_field { + Some(field) => Setting::Set(field), + None => Setting::Reset, + }, + synonyms: Setting::Set(synonyms), + _kind: PhantomData, + }) + } + + pub fn retrieve_documents>( + &self, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result>> { + let txn = self.read_txn()?; + + let fields_ids_map = self.fields_ids_map(&txn)?; + let fields_to_display = + self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; + + let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit); + + let mut documents = Vec::new(); + + for entry in iter { + let (_id, obkv) = entry?; + let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?; + documents.push(object); + } + + Ok(documents) + } + + pub fn retrieve_document>( + &self, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result> { + let txn = self.read_txn()?; + + let fields_ids_map = self.fields_ids_map(&txn)?; + + let fields_to_display = + self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; + + let internal_id = self + .external_documents_ids(&txn)? + .get(doc_id.as_bytes()) + .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?; + + let document = self + .documents(&txn, std::iter::once(internal_id))? + .into_iter() + .next() + .map(|(_, d)| d) + .ok_or(IndexError::DocumentNotFound(doc_id))?; + + let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?; + + Ok(document) + } + + pub fn size(&self) -> u64 { + self.env.size() + } + + fn fields_to_display>( + &self, + txn: &heed::RoTxn, + attributes_to_retrieve: &Option>, + fields_ids_map: &milli::FieldsIdsMap, + ) -> Result> { + let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? { + Some(ids) => ids.into_iter().collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + let attributes_to_retrieve_ids = match attributes_to_retrieve { + Some(attrs) => attrs + .iter() + .filter_map(|f| fields_ids_map.id(f.as_ref())) + .collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid)); + Ok(displayed_fields_ids) + } + + pub fn snapshot(&self, path: impl AsRef) -> Result<()> { + let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid)); + create_dir_all(&dst)?; + dst.push("data.mdb"); + let _txn = self.write_txn()?; + self.inner + .env + .copy_to_path(dst, heed::CompactionOption::Enabled)?; + Ok(()) + } +} diff --git a/meilisearch-lib/src/index/mod.rs b/meilisearch-lib/src/index/mod.rs index 899c830a5..613c60f7d 100644 --- a/meilisearch-lib/src/index/mod.rs +++ b/meilisearch-lib/src/index/mod.rs @@ -1,287 +1,365 @@ -use std::collections::{BTreeSet, HashSet}; -use std::fs::create_dir_all; -use std::marker::PhantomData; -use std::ops::Deref; -use std::path::Path; -use std::sync::Arc; - -use chrono::{DateTime, Utc}; -use heed::{EnvOpenOptions, RoTxn}; -use milli::update::Setting; -use milli::{obkv_to_json, FieldDistribution, FieldId}; -use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; - -use error::Result; pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked}; -use uuid::Uuid; - -use crate::index_controller::update_file_store::UpdateFileStore; -use crate::EnvSizer; - -use self::error::IndexError; -use self::update_handler::UpdateHandler; - -pub mod error; -pub mod update_handler; mod dump; +pub mod error; mod search; +pub mod update_handler; mod updates; -pub type Document = Map; +#[allow(clippy::module_inception)] +mod index; -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexMeta { - created_at: DateTime, - pub updated_at: DateTime, - pub primary_key: Option, -} +pub use index::{Document, IndexMeta, IndexStats}; -#[derive(Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct IndexStats { - #[serde(skip)] - pub size: u64, - pub number_of_documents: u64, - /// Whether the current index is performing an update. It is initially `None` when the - /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is - /// later set to either true or false, we we retrieve the information from the `UpdateStore` - pub is_indexing: Option, - pub field_distribution: FieldDistribution, -} +#[cfg(not(test))] +pub use index::Index; -impl IndexMeta { - pub fn new(index: &Index) -> Result { - let txn = index.read_txn()?; - Self::new_txn(index, &txn) +#[cfg(test)] +pub use test::MockIndex as Index; + +/// The index::test module provides means of mocking an index instance. I can be used throughout the +/// code for unit testing, in places where an index would normally be used. +#[cfg(test)] +pub mod test { + use std::any::Any; + use std::collections::HashMap; + use std::panic::{RefUnwindSafe, UnwindSafe}; + use std::path::Path; + use std::path::PathBuf; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, Mutex}; + + use serde_json::{Map, Value}; + use uuid::Uuid; + + use crate::index_controller::update_file_store::UpdateFileStore; + use crate::index_controller::updates::status::{Failed, Processed, Processing}; + + use super::error::Result; + use super::index::Index; + use super::update_handler::UpdateHandler; + use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings}; + + pub struct Stub { + name: String, + times: Mutex>, + stub: Box R + Sync + Send>, + invalidated: AtomicBool, } - fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { - let created_at = index.created_at(txn)?; - let updated_at = index.updated_at(txn)?; - let primary_key = index.primary_key(txn)?.map(String::from); - Ok(Self { - created_at, - updated_at, - primary_key, - }) - } -} - -#[derive(Clone, derivative::Derivative)] -#[derivative(Debug)] -pub struct Index { - pub uuid: Uuid, - #[derivative(Debug = "ignore")] - pub inner: Arc, - #[derivative(Debug = "ignore")] - update_file_store: Arc, - #[derivative(Debug = "ignore")] - update_handler: Arc, -} - -impl Deref for Index { - type Target = milli::Index; - - fn deref(&self) -> &Self::Target { - self.inner.as_ref() - } -} - -impl Index { - pub fn open( - path: impl AsRef, - size: usize, - update_file_store: Arc, - uuid: Uuid, - update_handler: Arc, - ) -> Result { - create_dir_all(&path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let inner = Arc::new(milli::Index::new(options, &path)?); - Ok(Index { - inner, - update_file_store, - uuid, - update_handler, - }) + impl Drop for Stub { + fn drop(&mut self) { + if !self.invalidated.load(Ordering::Relaxed) { + let lock = self.times.lock().unwrap(); + if let Some(n) = *lock { + assert_eq!(n, 0, "{} not called enough times", self.name); + } + } + } } - pub fn stats(&self) -> Result { - let rtxn = self.read_txn()?; - - Ok(IndexStats { - size: self.size(), - number_of_documents: self.number_of_documents(&rtxn)?, - is_indexing: None, - field_distribution: self.field_distribution(&rtxn)?, - }) + impl Stub { + fn invalidate(&self) { + self.invalidated.store(true, Ordering::Relaxed); + } } - pub fn meta(&self) -> Result { - IndexMeta::new(self) - } - pub fn settings(&self) -> Result> { - let txn = self.read_txn()?; - self.settings_txn(&txn) + impl Stub { + fn call(&self, args: A) -> R { + let mut lock = self.times.lock().unwrap(); + match *lock { + Some(0) => panic!("{} called to many times", self.name), + Some(ref mut times) => { + *times -= 1; + } + None => (), + } + + // Since we add assertions in the drop implementation for Stub, a panic can occur in a + // panic, causing a hard abort of the program. To handle that, we catch the panic, and + // set the stub as invalidated so the assertions aren't run during the drop. + impl<'a, A, R> RefUnwindSafe for StubHolder<'a, A, R> {} + struct StubHolder<'a, A, R>(&'a (dyn Fn(A) -> R + Sync + Send)); + + let stub = StubHolder(self.stub.as_ref()); + + match std::panic::catch_unwind(|| (stub.0)(args)) { + Ok(r) => r, + Err(panic) => { + self.invalidate(); + std::panic::resume_unwind(panic); + } + } + } } - pub fn settings_txn(&self, txn: &RoTxn) -> Result> { - let displayed_attributes = self - .displayed_fields(txn)? - .map(|fields| fields.into_iter().map(String::from).collect()); - - let searchable_attributes = self - .searchable_fields(txn)? - .map(|fields| fields.into_iter().map(String::from).collect()); - - let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect(); - - let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect(); - - let criteria = self - .criteria(txn)? - .into_iter() - .map(|c| c.to_string()) - .collect(); - - let stop_words = self - .stop_words(txn)? - .map(|stop_words| -> Result> { - Ok(stop_words.stream().into_strs()?.into_iter().collect()) - }) - .transpose()? - .unwrap_or_else(BTreeSet::new); - let distinct_field = self.distinct_field(txn)?.map(String::from); - - // in milli each word in the synonyms map were split on their separator. Since we lost - // this information we are going to put space between words. - let synonyms = self - .synonyms(txn)? - .iter() - .map(|(key, values)| { - ( - key.join(" "), - values.iter().map(|value| value.join(" ")).collect(), - ) - }) - .collect(); - - Ok(Settings { - displayed_attributes: match displayed_attributes { - Some(attrs) => Setting::Set(attrs), - None => Setting::Reset, - }, - searchable_attributes: match searchable_attributes { - Some(attrs) => Setting::Set(attrs), - None => Setting::Reset, - }, - filterable_attributes: Setting::Set(filterable_attributes), - sortable_attributes: Setting::Set(sortable_attributes), - ranking_rules: Setting::Set(criteria), - stop_words: Setting::Set(stop_words), - distinct_attribute: match distinct_field { - Some(field) => Setting::Set(field), - None => Setting::Reset, - }, - synonyms: Setting::Set(synonyms), - _kind: PhantomData, - }) + #[derive(Debug, Default)] + struct StubStore { + inner: Arc>>>, } - pub fn retrieve_documents>( - &self, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result>> { - let txn = self.read_txn()?; - - let fields_ids_map = self.fields_ids_map(&txn)?; - let fields_to_display = - self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; - - let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit); - - let mut documents = Vec::new(); - - for entry in iter { - let (_id, obkv) = entry?; - let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?; - documents.push(object); + impl StubStore { + pub fn insert(&self, name: String, stub: Stub) { + let mut lock = self.inner.lock().unwrap(); + lock.insert(name, Box::new(stub)); } - Ok(documents) + pub fn get(&self, name: &str) -> Option<&Stub> { + let mut lock = self.inner.lock().unwrap(); + match lock.get_mut(name) { + Some(s) => { + let s = s.as_mut() as *mut dyn Any as *mut Stub; + Some(unsafe { &mut *s }) + } + None => None, + } + } } - pub fn retrieve_document>( - &self, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result> { - let txn = self.read_txn()?; - - let fields_ids_map = self.fields_ids_map(&txn)?; - - let fields_to_display = - self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; - - let internal_id = self - .external_documents_ids(&txn)? - .get(doc_id.as_bytes()) - .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?; - - let document = self - .documents(&txn, std::iter::once(internal_id))? - .into_iter() - .next() - .map(|(_, d)| d) - .ok_or(IndexError::DocumentNotFound(doc_id))?; - - let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?; - - Ok(document) + pub struct StubBuilder<'a, A, R> { + name: String, + store: &'a StubStore, + times: Option, + _f: std::marker::PhantomData R>, } - pub fn size(&self) -> u64 { - self.env.size() + impl<'a, A: 'static, R: 'static> StubBuilder<'a, A, R> { + /// Asserts the stub has been called exactly `times` times. + #[must_use] + pub fn times(mut self, times: usize) -> Self { + self.times = Some(times); + self + } + + /// Asserts the stub has been called exactly once. + #[must_use] + pub fn once(mut self) -> Self { + self.times = Some(1); + self + } + + /// The function that will be called when the stub is called. This needs to be called to + /// actually build the stub and register it to the stub store. + pub fn then(self, f: impl Fn(A) -> R + Sync + Send + 'static) { + let times = Mutex::new(self.times); + let stub = Stub { + stub: Box::new(f), + times, + name: self.name.clone(), + invalidated: AtomicBool::new(false), + }; + + self.store.insert(self.name, stub); + } } - fn fields_to_display>( - &self, - txn: &heed::RoTxn, - attributes_to_retrieve: &Option>, - fields_ids_map: &milli::FieldsIdsMap, - ) -> Result> { - let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? { - Some(ids) => ids.into_iter().collect::>(), - None => fields_ids_map.iter().map(|(id, _)| id).collect(), - }; - - let attributes_to_retrieve_ids = match attributes_to_retrieve { - Some(attrs) => attrs - .iter() - .filter_map(|f| fields_ids_map.id(f.as_ref())) - .collect::>(), - None => fields_ids_map.iter().map(|(id, _)| id).collect(), - }; - - displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid)); - Ok(displayed_fields_ids) + /// Mocker allows to stub metod call on any struct. you can register stubs by calling + /// `Mocker::when` and retrieve it in the proxy implementation when with `Mocker::get`. + #[derive(Debug, Default)] + pub struct Mocker { + store: StubStore, } - pub fn snapshot(&self, path: impl AsRef) -> Result<()> { - let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid)); - create_dir_all(&dst)?; - dst.push("data.mdb"); - let _txn = self.write_txn()?; - self.inner - .env - .copy_to_path(dst, heed::CompactionOption::Enabled)?; - Ok(()) + impl Mocker { + pub fn when(&self, name: &str) -> StubBuilder { + StubBuilder { + name: name.to_string(), + store: &self.store, + times: None, + _f: std::marker::PhantomData, + } + } + + pub fn get(&self, name: &str) -> &Stub { + match self.store.get(name) { + Some(stub) => stub, + None => { + // panic here causes the stubs to get dropped, and panic in turn. To prevent + // that, we forget them, and let them be cleaned by the os later. This is not + // optimal, but is still better than nested panicks. + let mut stubs = self.store.inner.lock().unwrap(); + let stubs = std::mem::take(&mut *stubs); + std::mem::forget(stubs); + panic!("unexpected call to {}", name) + } + } + } + } + + #[derive(Debug, Clone)] + pub enum MockIndex { + Vrai(Index), + Faux(Arc), + } + + impl MockIndex { + pub fn faux(faux: Mocker) -> Self { + Self::Faux(Arc::new(faux)) + } + + pub fn open( + path: impl AsRef, + size: usize, + update_file_store: Arc, + uuid: Uuid, + update_handler: Arc, + ) -> Result { + let index = Index::open(path, size, update_file_store, uuid, update_handler)?; + Ok(Self::Vrai(index)) + } + + pub fn load_dump( + src: impl AsRef, + dst: impl AsRef, + size: usize, + update_handler: &UpdateHandler, + ) -> anyhow::Result<()> { + Index::load_dump(src, dst, size, update_handler)?; + Ok(()) + } + + pub fn handle_update(&self, update: Processing) -> std::result::Result { + match self { + MockIndex::Vrai(index) => index.handle_update(update), + MockIndex::Faux(faux) => faux.get("handle_update").call(update), + } + } + + pub fn uuid(&self) -> Uuid { + match self { + MockIndex::Vrai(index) => index.uuid(), + MockIndex::Faux(faux) => faux.get("uuid").call(()), + } + } + + pub fn stats(&self) -> Result { + match self { + MockIndex::Vrai(index) => index.stats(), + MockIndex::Faux(_) => todo!(), + } + } + + pub fn meta(&self) -> Result { + match self { + MockIndex::Vrai(index) => index.meta(), + MockIndex::Faux(_) => todo!(), + } + } + pub fn settings(&self) -> Result> { + match self { + MockIndex::Vrai(index) => index.settings(), + MockIndex::Faux(_) => todo!(), + } + } + + pub fn retrieve_documents>( + &self, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result>> { + match self { + MockIndex::Vrai(index) => { + index.retrieve_documents(offset, limit, attributes_to_retrieve) + } + MockIndex::Faux(_) => todo!(), + } + } + + pub fn retrieve_document>( + &self, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result> { + match self { + MockIndex::Vrai(index) => index.retrieve_document(doc_id, attributes_to_retrieve), + MockIndex::Faux(_) => todo!(), + } + } + + pub fn size(&self) -> u64 { + match self { + MockIndex::Vrai(index) => index.size(), + MockIndex::Faux(_) => todo!(), + } + } + + pub fn snapshot(&self, path: impl AsRef) -> Result<()> { + match self { + MockIndex::Vrai(index) => index.snapshot(path), + MockIndex::Faux(faux) => faux.get("snapshot").call(path.as_ref()), + } + } + + pub fn inner(&self) -> &milli::Index { + match self { + MockIndex::Vrai(index) => index.inner(), + MockIndex::Faux(_) => todo!(), + } + } + + pub fn update_primary_key(&self, primary_key: Option) -> Result { + match self { + MockIndex::Vrai(index) => index.update_primary_key(primary_key), + MockIndex::Faux(_) => todo!(), + } + } + pub fn perform_search(&self, query: SearchQuery) -> Result { + match self { + MockIndex::Vrai(index) => index.perform_search(query), + MockIndex::Faux(faux) => faux.get("perform_search").call(query), + } + } + + pub fn dump(&self, path: impl AsRef) -> Result<()> { + match self { + MockIndex::Vrai(index) => index.dump(path), + MockIndex::Faux(faux) => faux.get("dump").call(path.as_ref()), + } + } + } + + #[test] + fn test_faux_index() { + let faux = Mocker::default(); + faux.when("snapshot") + .times(2) + .then(|_: &Path| -> Result<()> { Ok(()) }); + + let index = MockIndex::faux(faux); + + let path = PathBuf::from("hello"); + index.snapshot(&path).unwrap(); + index.snapshot(&path).unwrap(); + } + + #[test] + #[should_panic] + fn test_faux_unexisting_method_stub() { + let faux = Mocker::default(); + + let index = MockIndex::faux(faux); + + let path = PathBuf::from("hello"); + index.snapshot(&path).unwrap(); + index.snapshot(&path).unwrap(); + } + + #[test] + #[should_panic] + fn test_faux_panic() { + let faux = Mocker::default(); + faux.when("snapshot") + .times(2) + .then(|_: &Path| -> Result<()> { + panic!(); + }); + + let index = MockIndex::faux(faux); + + let path = PathBuf::from("hello"); + index.snapshot(&path).unwrap(); + index.snapshot(&path).unwrap(); } } diff --git a/meilisearch-lib/src/index/search.rs b/meilisearch-lib/src/index/search.rs index a0ea26127..4521d3ed0 100644 --- a/meilisearch-lib/src/index/search.rs +++ b/meilisearch-lib/src/index/search.rs @@ -12,15 +12,14 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use crate::index::error::FacetError; -use crate::index::IndexError; -use super::error::Result; -use super::Index; +use super::error::{IndexError, Result}; +use super::index::Index; pub type Document = IndexMap; type MatchesInfo = BTreeMap>; -#[derive(Serialize, Debug, Clone)] +#[derive(Serialize, Debug, Clone, PartialEq)] pub struct MatchInfo { start: usize, length: usize, @@ -36,7 +35,7 @@ pub const fn default_crop_length() -> usize { DEFAULT_CROP_LENGTH } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone, PartialEq)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct SearchQuery { pub q: Option, @@ -56,7 +55,7 @@ pub struct SearchQuery { pub facets_distribution: Option>, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, PartialEq)] pub struct SearchHit { #[serde(flatten)] pub document: Document, @@ -66,7 +65,7 @@ pub struct SearchHit { pub matches_info: Option, } -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, Clone, PartialEq)] #[serde(rename_all = "camelCase")] pub struct SearchResult { pub hits: Vec, diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs index 92d1bdcfe..772d27d76 100644 --- a/meilisearch-lib/src/index/updates.rs +++ b/meilisearch-lib/src/index/updates.rs @@ -12,7 +12,7 @@ use crate::index_controller::updates::status::{Failed, Processed, Processing, Up use crate::Update; use super::error::{IndexError, Result}; -use super::{Index, IndexMeta}; +use super::index::{Index, IndexMeta}; fn serialize_with_wildcard( field: &Setting>, diff --git a/meilisearch-lib/src/index_controller/dump_actor/actor.rs b/meilisearch-lib/src/index_controller/dump_actor/actor.rs index bfde3896c..9cdeacfaf 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/actor.rs @@ -10,14 +10,16 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use super::error::{DumpActorError, Result}; use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask}; -use crate::index_controller::index_resolver::HardStateIndexResolver; +use crate::index_controller::index_resolver::index_store::IndexStore; +use crate::index_controller::index_resolver::uuid_store::UuidStore; +use crate::index_controller::index_resolver::IndexResolver; use crate::index_controller::updates::UpdateSender; pub const CONCURRENT_DUMP_MSG: usize = 10; -pub struct DumpActor { +pub struct DumpActor { inbox: Option>, - index_resolver: Arc, + index_resolver: Arc>, update: UpdateSender, dump_path: PathBuf, lock: Arc>, @@ -31,10 +33,14 @@ fn generate_uid() -> String { Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() } -impl DumpActor { +impl DumpActor +where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, +{ pub fn new( inbox: mpsc::Receiver, - index_resolver: Arc, + index_resolver: Arc>, update: UpdateSender, dump_path: impl AsRef, index_db_size: usize, @@ -114,7 +120,7 @@ impl DumpActor { let task = DumpTask { path: self.dump_path.clone(), index_resolver: self.index_resolver.clone(), - update_handle: self.update.clone(), + update_sender: self.update.clone(), uid: uid.clone(), update_db_size: self.update_db_size, index_db_size: self.index_db_size, diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs index 0040d4cea..35640aaef 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs @@ -4,7 +4,6 @@ use std::path::{Path, PathBuf}; use serde_json::{Deserializer, Value}; use tempfile::NamedTempFile; -use uuid::Uuid; use crate::index_controller::dump_actor::loaders::compat::{asc_ranking_rule, desc_ranking_rule}; use crate::index_controller::dump_actor::Metadata; @@ -200,7 +199,7 @@ impl From for Enqueued { method, // Just ignore if the uuid is no present. If it is needed later, an error will // be thrown. - content_uuid: content.unwrap_or_else(Uuid::default), + content_uuid: content.unwrap_or_default(), } } compat::UpdateMeta::ClearDocuments => Update::ClearDocuments, diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index 0ebedaa09..70f0f8889 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -13,7 +13,9 @@ pub use actor::DumpActor; pub use handle_impl::*; pub use message::DumpMsg; -use super::index_resolver::HardStateIndexResolver; +use super::index_resolver::index_store::IndexStore; +use super::index_resolver::uuid_store::UuidStore; +use super::index_resolver::IndexResolver; use super::updates::UpdateSender; use crate::compression::{from_tar_gz, to_tar_gz}; use crate::index_controller::dump_actor::error::DumpActorError; @@ -51,6 +53,7 @@ impl Metadata { } #[async_trait::async_trait] +#[cfg_attr(test, mockall::automock)] pub trait DumpActorHandle { /// Start the creation of a dump /// Implementation: [handle_impl::DumpActorHandleImpl::create_dump] @@ -218,16 +221,20 @@ pub fn load_dump( Ok(()) } -struct DumpTask { +struct DumpTask { path: PathBuf, - index_resolver: Arc, - update_handle: UpdateSender, + index_resolver: Arc>, + update_sender: UpdateSender, uid: String, update_db_size: usize, index_db_size: usize, } -impl DumpTask { +impl DumpTask +where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, +{ async fn run(self) -> Result<()> { trace!("Performing dump."); @@ -243,7 +250,7 @@ impl DumpTask { let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?; - UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?; + UpdateMsg::dump(&self.update_sender, uuids, temp_dump_path.clone()).await?; let dump_path = tokio::task::spawn_blocking(move || -> Result { let temp_dump_file = tempfile::NamedTempFile::new()?; @@ -262,3 +269,110 @@ impl DumpTask { Ok(()) } } + +#[cfg(test)] +mod test { + use std::collections::HashSet; + + use futures::future::{err, ok}; + use once_cell::sync::Lazy; + use uuid::Uuid; + + use super::*; + use crate::index::error::Result as IndexResult; + use crate::index::test::Mocker; + use crate::index::Index; + use crate::index_controller::index_resolver::error::IndexResolverError; + use crate::index_controller::index_resolver::index_store::MockIndexStore; + use crate::index_controller::index_resolver::uuid_store::MockUuidStore; + use crate::index_controller::updates::create_update_handler; + + fn setup() { + static SETUP: Lazy<()> = Lazy::new(|| { + if cfg!(windows) { + std::env::set_var("TMP", "."); + } else { + std::env::set_var("TMPDIR", "."); + } + }); + + // just deref to make sure the env is setup + *SETUP + } + + #[actix_rt::test] + async fn test_dump_normal() { + setup(); + + let tmp = tempfile::tempdir().unwrap(); + + let uuids = std::iter::repeat_with(Uuid::new_v4) + .take(4) + .collect::>(); + let mut uuid_store = MockUuidStore::new(); + let uuids_cloned = uuids.clone(); + uuid_store + .expect_dump() + .once() + .returning(move |_| Box::pin(ok(uuids_cloned.clone()))); + + let mut index_store = MockIndexStore::new(); + index_store.expect_get().times(4).returning(move |uuid| { + let mocker = Mocker::default(); + let uuids_clone = uuids.clone(); + mocker.when::<(), Uuid>("uuid").once().then(move |_| { + assert!(uuids_clone.contains(&uuid)); + uuid + }); + mocker + .when::<&Path, IndexResult<()>>("dump") + .once() + .then(move |_| Ok(())); + Box::pin(ok(Some(Index::faux(mocker)))) + }); + + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); + + let update_sender = + create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap(); + + let task = DumpTask { + path: tmp.path().to_owned(), + index_resolver, + update_sender, + uid: String::from("test"), + update_db_size: 4096 * 10, + index_db_size: 4096 * 10, + }; + + task.run().await.unwrap(); + } + + #[actix_rt::test] + async fn error_performing_dump() { + let tmp = tempfile::tempdir().unwrap(); + + let mut uuid_store = MockUuidStore::new(); + uuid_store + .expect_dump() + .once() + .returning(move |_| Box::pin(err(IndexResolverError::ExistingPrimaryKey))); + + let index_store = MockIndexStore::new(); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); + + let update_sender = + create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap(); + + let task = DumpTask { + path: tmp.path().to_owned(), + index_resolver, + update_sender, + uid: String::from("test"), + update_db_size: 4096 * 10, + index_db_size: 4096 * 10, + }; + + assert!(task.run().await.is_err()); + } +} diff --git a/meilisearch-lib/src/index_controller/index_resolver/index_store.rs b/meilisearch-lib/src/index_controller/index_resolver/index_store.rs index 047711a96..dcc024121 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/index_store.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/index_store.rs @@ -17,6 +17,7 @@ use crate::options::IndexerOpts; type AsyncMap = Arc>>; #[async_trait::async_trait] +#[cfg_attr(test, mockall::automock)] pub trait IndexStore { async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; async fn get(&self, uuid: Uuid) -> Result>; @@ -72,9 +73,10 @@ impl IndexStore for MapIndexStore { let index = spawn_blocking(move || -> Result { let index = Index::open(path, index_size, file_store, uuid, update_handler)?; if let Some(primary_key) = primary_key { - let mut txn = index.write_txn()?; + let inner = index.inner(); + let mut txn = inner.write_txn()?; - let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); + let mut builder = UpdateBuilder::new(0).settings(&mut txn, index.inner()); builder.set_primary_key(primary_key); builder.execute(|_, _| ())?; diff --git a/meilisearch-lib/src/index_controller/index_resolver/mod.rs b/meilisearch-lib/src/index_controller/index_resolver/mod.rs index 008d0d219..979f6dc6c 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/mod.rs @@ -1,11 +1,12 @@ pub mod error; -mod index_store; +pub mod index_store; pub mod uuid_store; use std::path::Path; use error::{IndexResolverError, Result}; use index_store::{IndexStore, MapIndexStore}; +use log::error; use uuid::Uuid; use uuid_store::{HeedUuidStore, UuidStore}; @@ -98,8 +99,19 @@ where } let uuid = Uuid::new_v4(); let index = self.index_store.create(uuid, primary_key).await?; - self.index_uuid_store.insert(uid, uuid).await?; - Ok(index) + match self.index_uuid_store.insert(uid, uuid).await { + Err(e) => { + match self.index_store.delete(uuid).await { + Ok(Some(index)) => { + index.inner().clone().prepare_for_closing(); + } + Ok(None) => (), + Err(e) => error!("Error while deleting index: {:?}", e), + } + Err(e) + } + Ok(()) => Ok(index), + } } pub async fn list(&self) -> Result> { @@ -121,7 +133,13 @@ where pub async fn delete_index(&self, uid: String) -> Result { match self.index_uuid_store.delete(uid.clone()).await? { Some(uuid) => { - let _ = self.index_store.delete(uuid).await; + match self.index_store.delete(uuid).await { + Ok(Some(index)) => { + index.inner().clone().prepare_for_closing(); + } + Ok(None) => (), + Err(e) => error!("Error while deleting index: {:?}", e), + } Ok(uuid) } None => Err(IndexResolverError::UnexistingIndex(uid)), diff --git a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs index f8bde7270..f10bad757 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs @@ -22,6 +22,7 @@ struct DumpEntry { const UUIDS_DB_PATH: &str = "index_uuids"; #[async_trait::async_trait] +#[cfg_attr(test, mockall::automock)] pub trait UuidStore: Sized { // Create a new entry for `name`. Return an error if `err` and the entry already exists, return // the uuid otherwise. diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index f6fcda46c..7273a80db 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -30,7 +30,9 @@ use error::Result; use self::dump_actor::load_dump; use self::index_resolver::error::IndexResolverError; -use self::index_resolver::HardStateIndexResolver; +use self::index_resolver::index_store::{IndexStore, MapIndexStore}; +use self::index_resolver::uuid_store::{HeedUuidStore, UuidStore}; +use self::index_resolver::IndexResolver; use self::updates::status::UpdateStatus; use self::updates::UpdateMsg; @@ -41,6 +43,10 @@ mod snapshot; pub mod update_file_store; pub mod updates; +/// Concrete implementation of the IndexController, exposed by meilisearch-lib +pub type MeiliSearch = + IndexController; + pub type Payload = Box< dyn Stream> + Send + Sync + 'static + Unpin, >; @@ -62,13 +68,6 @@ pub struct IndexSettings { pub primary_key: Option, } -#[derive(Clone)] -pub struct IndexController { - index_resolver: Arc, - update_sender: updates::UpdateSender, - dump_handle: dump_actor::DumpActorHandleImpl, -} - #[derive(Debug)] pub enum DocumentAdditionFormat { Json, @@ -129,7 +128,7 @@ impl IndexControllerBuilder { self, db_path: impl AsRef, indexer_options: IndexerOpts, - ) -> anyhow::Result { + ) -> anyhow::Result { let index_size = self .max_index_size .ok_or_else(|| anyhow::anyhow!("Missing index size"))?; @@ -178,6 +177,8 @@ impl IndexControllerBuilder { update_store_size, )?; + let dump_handle = Arc::new(dump_handle); + if self.schedule_snapshot { let snapshot_service = SnapshotService::new( index_resolver.clone(), @@ -266,7 +267,22 @@ impl IndexControllerBuilder { } } -impl IndexController { +// We are using derivative here to derive Clone, because U, I and D do not necessarily implement +// Clone themselves. +#[derive(derivative::Derivative)] +#[derivative(Clone(bound = ""))] +pub struct IndexController { + index_resolver: Arc>, + update_sender: updates::UpdateSender, + dump_handle: Arc, +} + +impl IndexController +where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + D: DumpActorHandle + Send + Sync, +{ pub fn builder() -> IndexControllerBuilder { IndexControllerBuilder::default() } @@ -286,7 +302,7 @@ impl IndexController { if create_index { let index = self.index_resolver.create_index(name, None).await?; let update_result = - UpdateMsg::update(&self.update_sender, index.uuid, update).await?; + UpdateMsg::update(&self.update_sender, index.uuid(), update).await?; Ok(update_result) } else { Err(IndexResolverError::UnexistingIndex(name).into()) @@ -314,7 +330,7 @@ impl IndexController { for (uid, index) in indexes { let meta = index.meta()?; let meta = IndexMetadata { - uuid: index.uuid, + uuid: index.uuid(), name: uid.clone(), uid, meta, @@ -366,7 +382,7 @@ impl IndexController { index_settings.uid.take(); let index = self.index_resolver.get_index(uid.clone()).await?; - let uuid = index.uuid; + let uuid = index.uuid(); let meta = spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??; let meta = IndexMetadata { @@ -386,7 +402,7 @@ impl IndexController { pub async fn get_index(&self, uid: String) -> Result { let index = self.index_resolver.get_index(uid.clone()).await?; - let uuid = index.uuid; + let uuid = index.uuid(); let meta = spawn_blocking(move || index.meta()).await??; let meta = IndexMetadata { uuid, @@ -400,7 +416,7 @@ impl IndexController { pub async fn get_index_stats(&self, uid: String) -> Result { let update_infos = UpdateMsg::get_info(&self.update_sender).await?; let index = self.index_resolver.get_index(uid).await?; - let uuid = index.uuid; + let uuid = index.uuid(); let mut stats = spawn_blocking(move || index.stats()).await??; // Check if the currently indexing update is from our index. stats.is_indexing = Some(Some(uuid) == update_infos.processing); @@ -414,7 +430,7 @@ impl IndexController { let mut indexes = BTreeMap::new(); for (index_uid, index) in self.index_resolver.list().await? { - let uuid = index.uuid; + let uuid = index.uuid(); let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || { let stats = index.stats()?; let meta = index.meta()?; @@ -461,7 +477,7 @@ impl IndexController { let meta = spawn_blocking(move || -> IndexResult<_> { let meta = index.meta()?; let meta = IndexMetadata { - uuid: index.uuid, + uuid: index.uuid(), uid: uid.clone(), name: uid, meta, @@ -497,3 +513,103 @@ pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { } } } + +#[cfg(test)] +mod test { + use futures::future::ok; + use mockall::predicate::eq; + use tokio::sync::mpsc; + + use crate::index::error::Result as IndexResult; + use crate::index::test::Mocker; + use crate::index::Index; + use crate::index_controller::dump_actor::MockDumpActorHandle; + use crate::index_controller::index_resolver::index_store::MockIndexStore; + use crate::index_controller::index_resolver::uuid_store::MockUuidStore; + + use super::updates::UpdateSender; + use super::*; + + impl IndexController { + pub fn mock( + index_resolver: IndexResolver, + update_sender: UpdateSender, + dump_handle: D, + ) -> Self { + IndexController { + index_resolver: Arc::new(index_resolver), + update_sender, + dump_handle: Arc::new(dump_handle), + } + } + } + + #[actix_rt::test] + async fn test_search_simple() { + let index_uid = "test"; + let index_uuid = Uuid::new_v4(); + let query = SearchQuery { + q: Some(String::from("hello world")), + offset: Some(10), + limit: 0, + attributes_to_retrieve: Some(vec!["string".to_owned()].into_iter().collect()), + attributes_to_crop: None, + crop_length: 18, + attributes_to_highlight: None, + matches: true, + filter: None, + sort: None, + facets_distribution: None, + }; + + let result = SearchResult { + hits: vec![], + nb_hits: 29, + exhaustive_nb_hits: true, + query: "hello world".to_string(), + limit: 24, + offset: 0, + processing_time_ms: 50, + facets_distribution: None, + exhaustive_facets_count: Some(true), + }; + + let mut uuid_store = MockUuidStore::new(); + uuid_store + .expect_get_uuid() + .with(eq(index_uid.to_owned())) + .returning(move |s| Box::pin(ok((s, Some(index_uuid))))); + + let mut index_store = MockIndexStore::new(); + let result_clone = result.clone(); + let query_clone = query.clone(); + index_store + .expect_get() + .with(eq(index_uuid)) + .returning(move |_uuid| { + let result = result_clone.clone(); + let query = query_clone.clone(); + let mocker = Mocker::default(); + mocker + .when::>("perform_search") + .once() + .then(move |q| { + assert_eq!(&q, &query); + Ok(result.clone()) + }); + let index = Index::faux(mocker); + Box::pin(ok(Some(index))) + }); + + let index_resolver = IndexResolver::new(uuid_store, index_store); + let (update_sender, _) = mpsc::channel(1); + let dump_actor = MockDumpActorHandle::new(); + let index_controller = IndexController::mock(index_resolver, update_sender, dump_actor); + + let r = index_controller + .search(index_uid.to_owned(), query.clone()) + .await + .unwrap(); + assert_eq!(r, result); + } +} diff --git a/meilisearch-lib/src/index_controller/snapshot.rs b/meilisearch-lib/src/index_controller/snapshot.rs index 36e45547e..6a22a285c 100644 --- a/meilisearch-lib/src/index_controller/snapshot.rs +++ b/meilisearch-lib/src/index_controller/snapshot.rs @@ -11,20 +11,26 @@ use tokio::time::sleep; use crate::compression::from_tar_gz; use crate::index_controller::updates::UpdateMsg; -use super::index_resolver::HardStateIndexResolver; +use super::index_resolver::index_store::IndexStore; +use super::index_resolver::uuid_store::UuidStore; +use super::index_resolver::IndexResolver; use super::updates::UpdateSender; -pub struct SnapshotService { - index_resolver: Arc, +pub struct SnapshotService { + index_resolver: Arc>, update_sender: UpdateSender, snapshot_period: Duration, snapshot_path: PathBuf, db_name: String, } -impl SnapshotService { +impl SnapshotService +where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, +{ pub fn new( - index_resolver: Arc, + index_resolver: Arc>, update_sender: UpdateSender, snapshot_period: Duration, snapshot_path: PathBuf, @@ -125,133 +131,169 @@ pub fn load_snapshot( } } -//#[cfg(test)] -//mod test { -//use std::iter::FromIterator; -//use std::{collections::HashSet, sync::Arc}; +#[cfg(test)] +mod test { + use std::{collections::HashSet, sync::Arc}; -//use futures::future::{err, ok}; -//use rand::Rng; -//use tokio::time::timeout; -//use uuid::Uuid; + use futures::future::{err, ok}; + use once_cell::sync::Lazy; + use rand::Rng; + use uuid::Uuid; -//use super::*; + use crate::index::error::IndexError; + use crate::index::test::Mocker; + use crate::index::{error::Result as IndexResult, Index}; + use crate::index_controller::index_resolver::error::IndexResolverError; + use crate::index_controller::index_resolver::index_store::MockIndexStore; + use crate::index_controller::index_resolver::uuid_store::MockUuidStore; + use crate::index_controller::index_resolver::IndexResolver; + use crate::index_controller::updates::create_update_handler; -//#[actix_rt::test] -//async fn test_normal() { -//let mut rng = rand::thread_rng(); -//let uuids_num: usize = rng.gen_range(5..10); -//let uuids = (0..uuids_num) -//.map(|_| Uuid::new_v4()) -//.collect::>(); + use super::*; -//let mut uuid_resolver = MockUuidResolverHandle::new(); -//let uuids_clone = uuids.clone(); -//uuid_resolver -//.expect_snapshot() -//.times(1) -//.returning(move |_| Box::pin(ok(uuids_clone.clone()))); + fn setup() { + static SETUP: Lazy<()> = Lazy::new(|| { + if cfg!(windows) { + std::env::set_var("TMP", "."); + } else { + std::env::set_var("TMPDIR", "."); + } + }); -//let uuids_clone = uuids.clone(); -//let mut index_handle = MockIndexActorHandle::new(); -//index_handle -//.expect_snapshot() -//.withf(move |uuid, _path| uuids_clone.contains(uuid)) -//.times(uuids_num) -//.returning(move |_, _| Box::pin(ok(()))); + // just deref to make sure the env is setup + *SETUP + } -//let dir = tempfile::tempdir_in(".").unwrap(); -//let handle = Arc::new(index_handle); -//let update_handle = -//UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); + #[actix_rt::test] + async fn test_normal() { + setup(); -//let snapshot_path = tempfile::tempdir_in(".").unwrap(); -//let snapshot_service = SnapshotService::new( -//uuid_resolver, -//update_handle, -//Duration::from_millis(100), -//snapshot_path.path().to_owned(), -//"data.ms".to_string(), -//); + let mut rng = rand::thread_rng(); + let uuids_num: usize = rng.gen_range(5..10); + let uuids = (0..uuids_num) + .map(|_| Uuid::new_v4()) + .collect::>(); -//snapshot_service.perform_snapshot().await.unwrap(); -//} + let mut uuid_store = MockUuidStore::new(); + let uuids_clone = uuids.clone(); + uuid_store + .expect_snapshot() + .times(1) + .returning(move |_| Box::pin(ok(uuids_clone.clone()))); -//#[actix_rt::test] -//async fn error_performing_uuid_snapshot() { -//let mut uuid_resolver = MockUuidResolverHandle::new(); -//uuid_resolver -//.expect_snapshot() -//.times(1) -////abitrary error -//.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); + let mut indexes = uuids.clone().into_iter().map(|uuid| { + let mocker = Mocker::default(); + mocker + .when("snapshot") + .times(1) + .then(|_: &Path| -> IndexResult<()> { Ok(()) }); + mocker.when("uuid").then(move |_: ()| uuid); + Index::faux(mocker) + }); -//let update_handle = MockUpdateActorHandle::new(); + let uuids_clone = uuids.clone(); + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .withf(move |uuid| uuids_clone.contains(uuid)) + .times(uuids_num) + .returning(move |_| Box::pin(ok(Some(indexes.next().unwrap())))); -//let snapshot_path = tempfile::tempdir_in(".").unwrap(); -//let snapshot_service = SnapshotService::new( -//uuid_resolver, -//update_handle, -//Duration::from_millis(100), -//snapshot_path.path().to_owned(), -//"data.ms".to_string(), -//); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); -//assert!(snapshot_service.perform_snapshot().await.is_err()); -////Nothing was written to the file -//assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); -//} + let dir = tempfile::tempdir().unwrap(); + let update_sender = + create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap(); -//#[actix_rt::test] -//async fn error_performing_index_snapshot() { -//let uuid = Uuid::new_v4(); -//let mut uuid_resolver = MockUuidResolverHandle::new(); -//uuid_resolver -//.expect_snapshot() -//.times(1) -//.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); + let snapshot_path = tempfile::tempdir().unwrap(); + let snapshot_service = SnapshotService::new( + index_resolver, + update_sender, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); -//let mut update_handle = MockUpdateActorHandle::new(); -//update_handle -//.expect_snapshot() -////abitrary error -//.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); + snapshot_service.perform_snapshot().await.unwrap(); + } -//let snapshot_path = tempfile::tempdir_in(".").unwrap(); -//let snapshot_service = SnapshotService::new( -//uuid_resolver, -//update_handle, -//Duration::from_millis(100), -//snapshot_path.path().to_owned(), -//"data.ms".to_string(), -//); + #[actix_rt::test] + async fn error_performing_uuid_snapshot() { + setup(); -//assert!(snapshot_service.perform_snapshot().await.is_err()); -////Nothing was written to the file -//assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); -//} + let mut uuid_store = MockUuidStore::new(); + uuid_store + .expect_snapshot() + .once() + .returning(move |_| Box::pin(err(IndexResolverError::IndexAlreadyExists))); -//#[actix_rt::test] -//async fn test_loop() { -//let mut uuid_resolver = MockUuidResolverHandle::new(); -//uuid_resolver -//.expect_snapshot() -////we expect the funtion to be called between 2 and 3 time in the given interval. -//.times(2..4) -////abitrary error, to short-circuit the function -//.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); + let mut index_store = MockIndexStore::new(); + index_store.expect_get().never(); -//let update_handle = MockUpdateActorHandle::new(); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); -//let snapshot_path = tempfile::tempdir_in(".").unwrap(); -//let snapshot_service = SnapshotService::new( -//uuid_resolver, -//update_handle, -//Duration::from_millis(100), -//snapshot_path.path().to_owned(), -//"data.ms".to_string(), -//); + let dir = tempfile::tempdir().unwrap(); + let update_sender = + create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap(); -//let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; -//} -//} + let snapshot_path = tempfile::tempdir().unwrap(); + let snapshot_service = SnapshotService::new( + index_resolver, + update_sender, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); + + assert!(snapshot_service.perform_snapshot().await.is_err()); + } + + #[actix_rt::test] + async fn error_performing_index_snapshot() { + setup(); + + let uuids: HashSet = vec![Uuid::new_v4()].into_iter().collect(); + + let mut uuid_store = MockUuidStore::new(); + let uuids_clone = uuids.clone(); + uuid_store + .expect_snapshot() + .once() + .returning(move |_| Box::pin(ok(uuids_clone.clone()))); + + let mut indexes = uuids.clone().into_iter().map(|uuid| { + let mocker = Mocker::default(); + // index returns random error + mocker + .when("snapshot") + .then(|_: &Path| -> IndexResult<()> { Err(IndexError::ExistingPrimaryKey) }); + mocker.when("uuid").then(move |_: ()| uuid); + Index::faux(mocker) + }); + + let uuids_clone = uuids.clone(); + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .withf(move |uuid| uuids_clone.contains(uuid)) + .once() + .returning(move |_| Box::pin(ok(Some(indexes.next().unwrap())))); + + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); + + let dir = tempfile::tempdir().unwrap(); + let update_sender = + create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap(); + + let snapshot_path = tempfile::tempdir().unwrap(); + let snapshot_service = SnapshotService::new( + index_resolver, + update_sender, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); + + assert!(snapshot_service.perform_snapshot().await.is_err()); + } +} diff --git a/meilisearch-lib/src/index_controller/updates/error.rs b/meilisearch-lib/src/index_controller/updates/error.rs index eb539963e..39a73c7c4 100644 --- a/meilisearch-lib/src/index_controller/updates/error.rs +++ b/meilisearch-lib/src/index_controller/updates/error.rs @@ -5,6 +5,7 @@ use meilisearch_error::{Code, ErrorCode}; use crate::{ document_formats::DocumentFormatError, + index::error::IndexError, index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat}, }; @@ -28,6 +29,8 @@ pub enum UpdateLoopError { PayloadError(#[from] actix_web::error::PayloadError), #[error("A {0} payload is missing.")] MissingPayload(DocumentAdditionFormat), + #[error("{0}")] + IndexError(#[from] IndexError), } impl From> for UpdateLoopError @@ -58,7 +61,6 @@ impl ErrorCode for UpdateLoopError { match self { Self::UnexistingUpdate(_) => Code::NotFound, Self::Internal(_) => Code::Internal, - //Self::IndexActor(e) => e.error_code(), Self::FatalUpdateStoreError => Code::Internal, Self::DocumentFormatError(error) => error.error_code(), Self::PayloadError(error) => match error { @@ -66,6 +68,7 @@ impl ErrorCode for UpdateLoopError { _ => Code::Internal, }, Self::MissingPayload(_) => Code::MissingPayload, + Self::IndexError(e) => e.error_code(), } } } diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index 037cf96b0..f106b87f3 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -26,16 +26,22 @@ use crate::index::{Index, Settings, Unchecked}; use crate::index_controller::update_file_store::UpdateFileStore; use status::UpdateStatus; -use super::index_resolver::HardStateIndexResolver; +use super::index_resolver::index_store::IndexStore; +use super::index_resolver::uuid_store::UuidStore; +use super::index_resolver::IndexResolver; use super::{DocumentAdditionFormat, Update}; pub type UpdateSender = mpsc::Sender; -pub fn create_update_handler( - index_resolver: Arc, +pub fn create_update_handler( + index_resolver: Arc>, db_path: impl AsRef, update_store_size: usize, -) -> anyhow::Result { +) -> anyhow::Result +where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, +{ let path = db_path.as_ref().to_owned(); let (sender, receiver) = mpsc::channel(100); let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?; @@ -95,12 +101,16 @@ pub struct UpdateLoop { } impl UpdateLoop { - pub fn new( + pub fn new( update_db_size: usize, inbox: mpsc::Receiver, path: impl AsRef, - index_resolver: Arc, - ) -> anyhow::Result { + index_resolver: Arc>, + ) -> anyhow::Result + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { let path = path.as_ref().to_owned(); std::fs::create_dir_all(&path)?; diff --git a/meilisearch-lib/src/index_controller/updates/store/dump.rs b/meilisearch-lib/src/index_controller/updates/store/dump.rs index cec5431a8..48e1ec821 100644 --- a/meilisearch-lib/src/index_controller/updates/store/dump.rs +++ b/meilisearch-lib/src/index_controller/updates/store/dump.rs @@ -34,7 +34,7 @@ impl UpdateStore { // txn must *always* be acquired after state lock, or it will dead lock. let txn = self.env.write_txn()?; - let uuids = indexes.iter().map(|i| i.uuid).collect(); + let uuids = indexes.iter().map(|i| i.uuid()).collect(); self.dump_updates(&txn, &uuids, &path)?; diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs index df89d6ecc..81525c3fd 100644 --- a/meilisearch-lib/src/index_controller/updates/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -29,6 +29,8 @@ use codec::*; use super::error::Result; use super::status::{Enqueued, Processing}; use crate::index::Index; +use crate::index_controller::index_resolver::index_store::IndexStore; +use crate::index_controller::index_resolver::uuid_store::UuidStore; use crate::index_controller::updates::*; use crate::EnvSizer; @@ -157,13 +159,17 @@ impl UpdateStore { )) } - pub fn open( + pub fn open( options: EnvOpenOptions, path: impl AsRef, - index_resolver: Arc, + index_resolver: Arc>, must_exit: Arc, update_file_store: UpdateFileStore, - ) -> anyhow::Result> { + ) -> anyhow::Result> + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { let (update_store, mut notification_receiver) = Self::new(options, path, update_file_store)?; let update_store = Arc::new(update_store); @@ -296,10 +302,14 @@ impl UpdateStore { /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update( + fn process_pending_update( &self, - index_resolver: Arc, - ) -> Result> { + index_resolver: Arc>, + ) -> Result> + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; let first_meta = self.pending_queue.first(&rtxn)?; @@ -325,13 +335,17 @@ impl UpdateStore { } } - fn perform_update( + fn perform_update( &self, processing: Processing, - index_resolver: Arc, + index_resolver: Arc>, index_uuid: Uuid, global_id: u64, - ) -> Result> { + ) -> Result> + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { // Process the pending update using the provided user function. let handle = Handle::current(); let update_id = processing.id(); @@ -509,7 +523,7 @@ impl UpdateStore { let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); - let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid).collect(); + let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid()).collect(); for entry in pendings { let ((_, uuid, _), pending) = entry?; if uuids.contains(&uuid) { @@ -518,9 +532,7 @@ impl UpdateStore { .. } = pending.decode()? { - self.update_file_store - .snapshot(content_uuid, &path) - .unwrap(); + self.update_file_store.snapshot(content_uuid, &path)?; } } } @@ -528,8 +540,7 @@ impl UpdateStore { let path = path.as_ref().to_owned(); indexes .par_iter() - .try_for_each(|index| index.snapshot(path.clone())) - .unwrap(); + .try_for_each(|index| index.snapshot(&path))?; Ok(()) } @@ -557,149 +568,217 @@ impl UpdateStore { } } -//#[cfg(test)] -//mod test { -//use super::*; -//use crate::index_controller::{ -//index_actor::{error::IndexActorError, MockIndexActorHandle}, -//UpdateResult, -//}; +#[cfg(test)] +mod test { + use futures::future::ok; + use mockall::predicate::eq; -//use futures::future::ok; + use crate::index::error::IndexError; + use crate::index::test::Mocker; + use crate::index_controller::index_resolver::index_store::MockIndexStore; + use crate::index_controller::index_resolver::uuid_store::MockUuidStore; + use crate::index_controller::updates::status::{Failed, Processed}; -//#[actix_rt::test] -//async fn test_next_id() { -//let dir = tempfile::tempdir_in(".").unwrap(); -//let mut options = EnvOpenOptions::new(); -//let handle = Arc::new(MockIndexActorHandle::new()); -//options.map_size(4096 * 100); -//let update_store = UpdateStore::open( -//options, -//dir.path(), -//handle, -//Arc::new(AtomicBool::new(false)), -//) -//.unwrap(); + use super::*; -//let index1_uuid = Uuid::new_v4(); -//let index2_uuid = Uuid::new_v4(); + #[actix_rt::test] + async fn test_next_id() { + let dir = tempfile::tempdir_in(".").unwrap(); + let mut options = EnvOpenOptions::new(); + let index_store = MockIndexStore::new(); + let uuid_store = MockUuidStore::new(); + let index_resolver = IndexResolver::new(uuid_store, index_store); + let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); + options.map_size(4096 * 100); + let update_store = UpdateStore::open( + options, + dir.path(), + Arc::new(index_resolver), + Arc::new(AtomicBool::new(false)), + update_file_store, + ) + .unwrap(); -//let mut txn = update_store.env.write_txn().unwrap(); -//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); -//txn.commit().unwrap(); -//assert_eq!((0, 0), ids); + let index1_uuid = Uuid::new_v4(); + let index2_uuid = Uuid::new_v4(); -//let mut txn = update_store.env.write_txn().unwrap(); -//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); -//txn.commit().unwrap(); -//assert_eq!((1, 0), ids); + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((0, 0), ids); -//let mut txn = update_store.env.write_txn().unwrap(); -//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); -//txn.commit().unwrap(); -//assert_eq!((2, 1), ids); -//} + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((1, 0), ids); -//#[actix_rt::test] -//async fn test_register_update() { -//let dir = tempfile::tempdir_in(".").unwrap(); -//let mut options = EnvOpenOptions::new(); -//let handle = Arc::new(MockIndexActorHandle::new()); -//options.map_size(4096 * 100); -//let update_store = UpdateStore::open( -//options, -//dir.path(), -//handle, -//Arc::new(AtomicBool::new(false)), -//) -//.unwrap(); -//let meta = UpdateMeta::ClearDocuments; -//let uuid = Uuid::new_v4(); -//let store_clone = update_store.clone(); -//tokio::task::spawn_blocking(move || { -//store_clone.register_update(meta, None, uuid).unwrap(); -//}) -//.await -//.unwrap(); + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((2, 1), ids); + } -//let txn = update_store.env.read_txn().unwrap(); -//assert!(update_store -//.pending_queue -//.get(&txn, &(0, uuid, 0)) -//.unwrap() -//.is_some()); -//} + #[actix_rt::test] + async fn test_register_update() { + let dir = tempfile::tempdir_in(".").unwrap(); + let index_store = MockIndexStore::new(); + let uuid_store = MockUuidStore::new(); + let index_resolver = IndexResolver::new(uuid_store, index_store); + let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let update_store = UpdateStore::open( + options, + dir.path(), + Arc::new(index_resolver), + Arc::new(AtomicBool::new(false)), + update_file_store, + ) + .unwrap(); + let update = Update::ClearDocuments; + let uuid = Uuid::new_v4(); + let store_clone = update_store.clone(); + tokio::task::spawn_blocking(move || { + store_clone.register_update(uuid, update).unwrap(); + }) + .await + .unwrap(); -//#[actix_rt::test] -//async fn test_process_update() { -//let dir = tempfile::tempdir_in(".").unwrap(); -//let mut handle = MockIndexActorHandle::new(); + let txn = update_store.env.read_txn().unwrap(); + assert!(update_store + .pending_queue + .get(&txn, &(0, uuid, 0)) + .unwrap() + .is_some()); + } -//handle -//.expect_update() -//.times(2) -//.returning(|_index_uuid, processing, _file| { -//if processing.id() == 0 { -//Box::pin(ok(Ok(processing.process(UpdateResult::Other)))) -//} else { -//Box::pin(ok(Err( -//processing.fail(IndexActorError::ExistingPrimaryKey.into()) -//))) -//} -//}); + #[actix_rt::test] + async fn test_process_update_success() { + let dir = tempfile::tempdir_in(".").unwrap(); + let index_uuid = Uuid::new_v4(); -//let handle = Arc::new(handle); + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .with(eq(index_uuid)) + .returning(|_uuid| { + let mocker = Mocker::default(); + mocker + .when::>("handle_update") + .once() + .then(|update| Ok(update.process(status::UpdateResult::Other))); -//let mut options = EnvOpenOptions::new(); -//options.map_size(4096 * 100); -//let store = UpdateStore::open( -//options, -//dir.path(), -//handle.clone(), -//Arc::new(AtomicBool::new(false)), -//) -//.unwrap(); + Box::pin(ok(Some(Index::faux(mocker)))) + }); -//// wait a bit for the event loop exit. -//tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let uuid_store = MockUuidStore::new(); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); -//let mut txn = store.env.write_txn().unwrap(); + let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let store = UpdateStore::open( + options, + dir.path(), + index_resolver.clone(), + Arc::new(AtomicBool::new(false)), + update_file_store, + ) + .unwrap(); -//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); -//let uuid = Uuid::new_v4(); + // wait a bit for the event loop exit. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; -//store -//.pending_queue -//.put(&mut txn, &(0, uuid, 0), &update) -//.unwrap(); + let mut txn = store.env.write_txn().unwrap(); -//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); + let update = Enqueued::new(Update::ClearDocuments, 0); -//store -//.pending_queue -//.put(&mut txn, &(1, uuid, 1), &update) -//.unwrap(); + store + .pending_queue + .put(&mut txn, &(0, index_uuid, 0), &update) + .unwrap(); -//txn.commit().unwrap(); + txn.commit().unwrap(); -//// Process the pending, and check that it has been moved to the update databases, and -//// removed from the pending database. -//let store_clone = store.clone(); -//tokio::task::spawn_blocking(move || { -//store_clone.process_pending_update(handle.clone()).unwrap(); -//store_clone.process_pending_update(handle).unwrap(); -//}) -//.await -//.unwrap(); + // Process the pending, and check that it has been moved to the update databases, and + // removed from the pending database. + let store_clone = store.clone(); + tokio::task::spawn_blocking(move || { + store_clone.process_pending_update(index_resolver).unwrap(); + }) + .await + .unwrap(); -//let txn = store.env.read_txn().unwrap(); + let txn = store.env.read_txn().unwrap(); -//assert!(store.pending_queue.first(&txn).unwrap().is_none()); -//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); + assert!(store.pending_queue.first(&txn).unwrap().is_none()); + let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap(); -//assert!(matches!(update, UpdateStatus::Processed(_))); -//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); + assert!(matches!(update, UpdateStatus::Processed(_))); + } -//assert!(matches!(update, UpdateStatus::Failed(_))); -//} -//} + #[actix_rt::test] + async fn test_process_update_failure() { + let dir = tempfile::tempdir_in(".").unwrap(); + let index_uuid = Uuid::new_v4(); + + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .with(eq(index_uuid)) + .returning(|_uuid| { + let mocker = Mocker::default(); + mocker + .when::>("handle_update") + .once() + .then(|update| Err(update.fail(IndexError::ExistingPrimaryKey))); + + Box::pin(ok(Some(Index::faux(mocker)))) + }); + + let uuid_store = MockUuidStore::new(); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); + + let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let store = UpdateStore::open( + options, + dir.path(), + index_resolver.clone(), + Arc::new(AtomicBool::new(false)), + update_file_store, + ) + .unwrap(); + + // wait a bit for the event loop exit. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let mut txn = store.env.write_txn().unwrap(); + + let update = Enqueued::new(Update::ClearDocuments, 0); + + store + .pending_queue + .put(&mut txn, &(0, index_uuid, 0), &update) + .unwrap(); + + txn.commit().unwrap(); + + // Process the pending, and check that it has been moved to the update databases, and + // removed from the pending database. + let store_clone = store.clone(); + tokio::task::spawn_blocking(move || { + store_clone.process_pending_update(index_resolver).unwrap(); + }) + .await + .unwrap(); + + let txn = store.env.read_txn().unwrap(); + + assert!(store.pending_queue.first(&txn).unwrap().is_none()); + let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap(); + + assert!(matches!(update, UpdateStatus::Failed(_))); + } +} diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs index 364a96dcf..b232d11ea 100644 --- a/meilisearch-lib/src/lib.rs +++ b/meilisearch-lib/src/lib.rs @@ -5,7 +5,8 @@ pub mod options; pub mod index; pub mod index_controller; -pub use index_controller::{updates::store::Update, IndexController as MeiliSearch}; +pub use index_controller::updates::store::Update; +pub use index_controller::MeiliSearch; pub use milli;