diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs index a38ddaab2..a78b0d11b 100644 --- a/meilisearch/src/analytics/segment_analytics.rs +++ b/meilisearch/src/analytics/segment_analytics.rs @@ -250,6 +250,7 @@ impl super::Analytics for SegmentAnalytics { struct Infos { env: String, experimental_enable_metrics: bool, + experimental_ha_parameters: bool, experimental_enable_logs_route: bool, experimental_reduce_indexing_memory_usage: bool, experimental_max_number_of_batched_tasks: usize, @@ -288,6 +289,7 @@ impl From for Infos { let Opt { db_path, experimental_enable_metrics, + experimental_ha_parameters, experimental_enable_logs_route, experimental_reduce_indexing_memory_usage, experimental_max_number_of_batched_tasks, @@ -335,6 +337,7 @@ impl From for Infos { Self { env, experimental_enable_metrics, + experimental_ha_parameters, experimental_enable_logs_route, experimental_reduce_indexing_memory_usage, db_path: db_path != PathBuf::from("./data.ms"), diff --git a/meilisearch/src/extractors/sequential_extractor.rs b/meilisearch/src/extractors/sequential_extractor.rs index c04210616..23d6cb997 100644 --- a/meilisearch/src/extractors/sequential_extractor.rs +++ b/meilisearch/src/extractors/sequential_extractor.rs @@ -131,6 +131,7 @@ gen_seq! { SeqFromRequestFut3; A B C } gen_seq! { SeqFromRequestFut4; A B C D } gen_seq! { SeqFromRequestFut5; A B C D E } gen_seq! { SeqFromRequestFut6; A B C D E F } +gen_seq! { SeqFromRequestFut7; A B C D E F G } pin_project! { #[project = ExtractProj] diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 328b9e9b2..2d9dec485 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -453,6 +453,7 @@ pub fn configure_data( .app_data(auth) .app_data(web::Data::from(analytics)) .app_data(web::Data::new(logs)) + .app_data(web::Data::new(opt.clone())) .app_data( web::JsonConfig::default() .limit(http_payload_size_limit) diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 9586a3f6f..4dd17d546 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -51,6 +51,7 @@ const MEILI_IGNORE_MISSING_DUMP: &str = "MEILI_IGNORE_MISSING_DUMP"; const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS"; const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR"; const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL"; +const MEILI_EXPERIMENTAL_HA_PARAMETERS: &str = "MEILI_EXPERIMENTAL_HA_PARAMETERS"; const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE"; const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS"; const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str = @@ -317,6 +318,17 @@ pub struct Opt { #[serde(default)] pub experimental_enable_logs_route: bool, + /// Enable multiple features that helps you to run meilisearch in a high availability context. + /// TODO: TAMO: Update the discussion link + /// For more information, see: + /// + /// - /!\ Disable the automatic clean up of old processed tasks, you're in charge of that now + /// - Lets you specify a custom task ID upon registering a task + /// - Lets you execute dry-register a task (get an answer from the route but nothing is actually registered in meilisearch and it won't be processed) + #[clap(long, env = MEILI_EXPERIMENTAL_HA_PARAMETERS)] + #[serde(default)] + pub experimental_ha_parameters: bool, + /// Experimental RAM reduction during indexing, do not use in production, see: #[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)] #[serde(default)] @@ -423,6 +435,7 @@ impl Opt { no_analytics, experimental_enable_metrics, experimental_enable_logs_route, + experimental_ha_parameters, experimental_reduce_indexing_memory_usage, } = self; export_to_env_if_not_present(MEILI_DB_PATH, db_path); @@ -479,6 +492,10 @@ impl Opt { MEILI_EXPERIMENTAL_ENABLE_METRICS, experimental_enable_metrics.to_string(), ); + export_to_env_if_not_present( + MEILI_EXPERIMENTAL_HA_PARAMETERS, + experimental_ha_parameters.to_string(), + ); export_to_env_if_not_present( MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE, experimental_enable_logs_route.to_string(), diff --git a/meilisearch/src/routes/dump.rs b/meilisearch/src/routes/dump.rs index 8f44070d8..56231a759 100644 --- a/meilisearch/src/routes/dump.rs +++ b/meilisearch/src/routes/dump.rs @@ -12,6 +12,7 @@ use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; use crate::routes::{get_task_id, SummarizedTaskView}; +use crate::Opt; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump)))); @@ -21,6 +22,7 @@ pub async fn create_dump( index_scheduler: GuardedData, Data>, auth_controller: GuardedData, Data>, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { analytics.publish("Dump Created".to_string(), json!({}), Some(&req)); @@ -29,7 +31,7 @@ pub async fn create_dump( keys: auth_controller.list_keys()?, instance_uid: analytics.instance_uid().cloned(), }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 492f039cf..5bf7eaa8d 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -38,6 +38,7 @@ use crate::extractors::payload::Payload; use crate::extractors::sequential_extractor::SeqHandler; use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; use crate::search::parse_filter; +use crate::Opt; static ACCEPTED_CONTENT_TYPE: Lazy> = Lazy::new(|| { vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] @@ -119,6 +120,7 @@ pub async fn delete_document( index_scheduler: GuardedData, Data>, path: web::Path, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { let DocumentParam { index_uid, document_id } = path.into_inner(); @@ -130,7 +132,7 @@ pub async fn delete_document( index_uid: index_uid.to_string(), documents_ids: vec![document_id], }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!("returns: {:?}", task); @@ -268,6 +270,7 @@ pub async fn replace_documents( params: AwebQueryParameter, body: Payload, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; @@ -278,7 +281,7 @@ pub async fn replace_documents( analytics.add_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req); let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task = document_addition( extract_mime_type(&req)?, index_scheduler, @@ -302,6 +305,7 @@ pub async fn update_documents( params: AwebQueryParameter, body: Payload, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; @@ -312,7 +316,7 @@ pub async fn update_documents( analytics.update_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req); let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task = document_addition( extract_mime_type(&req)?, index_scheduler, @@ -472,6 +476,7 @@ pub async fn delete_documents_batch( index_uid: web::Path, body: web::Json>, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { debug!(parameters = ?body, "Delete documents by batch"); @@ -486,7 +491,7 @@ pub async fn delete_documents_batch( let task = KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); @@ -506,6 +511,7 @@ pub async fn delete_documents_by_filter( index_uid: web::Path, body: AwebJson, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { debug!(parameters = ?body, "Delete documents by filter"); @@ -523,7 +529,7 @@ pub async fn delete_documents_by_filter( .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?; let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); @@ -535,13 +541,14 @@ pub async fn clear_all_documents( index_scheduler: GuardedData, Data>, index_uid: web::Path, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; analytics.delete_documents(DocumentDeletionKind::ClearAll, &req); let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); diff --git a/meilisearch/src/routes/indexes/mod.rs b/meilisearch/src/routes/indexes/mod.rs index 6451d930d..59a1f0e64 100644 --- a/meilisearch/src/routes/indexes/mod.rs +++ b/meilisearch/src/routes/indexes/mod.rs @@ -22,6 +22,7 @@ use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::extractors::sequential_extractor::SeqHandler; +use crate::Opt; pub mod documents; pub mod facet_search; @@ -123,6 +124,7 @@ pub async fn create_index( index_scheduler: GuardedData, Data>, body: AwebJson, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { debug!(parameters = ?body, "Create index"); @@ -137,7 +139,7 @@ pub async fn create_index( ); let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Create index"); @@ -191,6 +193,7 @@ pub async fn update_index( index_uid: web::Path, body: AwebJson, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { debug!(parameters = ?body, "Update index"); @@ -207,7 +210,7 @@ pub async fn update_index( primary_key: body.primary_key, }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); @@ -219,10 +222,11 @@ pub async fn delete_index( index_scheduler: GuardedData, Data>, index_uid: web::Path, req: HttpRequest, + opt: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); debug!(returns = ?task, "Delete index"); diff --git a/meilisearch/src/routes/indexes/settings.rs b/meilisearch/src/routes/indexes/settings.rs index 9fbd84161..6e43bce41 100644 --- a/meilisearch/src/routes/indexes/settings.rs +++ b/meilisearch/src/routes/indexes/settings.rs @@ -16,6 +16,7 @@ use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::routes::{get_task_id, SummarizedTaskView}; +use crate::Opt; #[macro_export] macro_rules! make_setting_route { @@ -34,6 +35,7 @@ macro_rules! make_setting_route { use $crate::extractors::authentication::policies::*; use $crate::extractors::authentication::GuardedData; use $crate::extractors::sequential_extractor::SeqHandler; + use $crate::Opt; use $crate::routes::{get_task_id, SummarizedTaskView}; pub async fn delete( @@ -43,6 +45,7 @@ macro_rules! make_setting_route { >, index_uid: web::Path, req: HttpRequest, + opt: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; @@ -57,7 +60,7 @@ macro_rules! make_setting_route { is_deletion: true, allow_index_creation, }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)) .await?? @@ -75,6 +78,7 @@ macro_rules! make_setting_route { index_uid: actix_web::web::Path, body: deserr::actix_web::AwebJson, $err_ty>, req: HttpRequest, + opt: web::Data, $analytics_var: web::Data, ) -> std::result::Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; @@ -107,7 +111,7 @@ macro_rules! make_setting_route { is_deletion: false, allow_index_creation, }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)) .await?? @@ -655,6 +659,7 @@ pub async fn update_all( index_uid: web::Path, body: AwebJson, DeserrJsonError>, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; @@ -770,7 +775,7 @@ pub async fn update_all( is_deletion: false, allow_index_creation, }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); @@ -795,6 +800,7 @@ pub async fn delete_all( index_scheduler: GuardedData, Data>, index_uid: web::Path, req: HttpRequest, + opt: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; @@ -808,7 +814,7 @@ pub async fn delete_all( is_deletion: true, allow_index_creation, }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs index 61a9f3352..2dc89b150 100644 --- a/meilisearch/src/routes/mod.rs +++ b/meilisearch/src/routes/mod.rs @@ -15,6 +15,7 @@ use tracing::debug; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; +use crate::Opt; const PAGINATION_DEFAULT_LIMIT: usize = 20; @@ -45,7 +46,10 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::scope("/experimental-features").configure(features::configure)); } -pub fn get_task_id(req: &HttpRequest) -> Result, ResponseError> { +pub fn get_task_id(req: &HttpRequest, opt: &Opt) -> Result, ResponseError> { + if !opt.experimental_ha_parameters { + return Ok(None); + } let task_id = req .headers() .get("TaskId") diff --git a/meilisearch/src/routes/snapshot.rs b/meilisearch/src/routes/snapshot.rs index 28dbac85f..6b3178126 100644 --- a/meilisearch/src/routes/snapshot.rs +++ b/meilisearch/src/routes/snapshot.rs @@ -11,6 +11,7 @@ use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; use crate::routes::{get_task_id, SummarizedTaskView}; +use crate::Opt; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot)))); @@ -19,12 +20,13 @@ pub fn configure(cfg: &mut web::ServiceConfig) { pub async fn create_snapshot( index_scheduler: GuardedData, Data>, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req)); let task = KindWithContent::SnapshotCreation; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); diff --git a/meilisearch/src/routes/swap_indexes.rs b/meilisearch/src/routes/swap_indexes.rs index 64268dbfa..f8adeeb18 100644 --- a/meilisearch/src/routes/swap_indexes.rs +++ b/meilisearch/src/routes/swap_indexes.rs @@ -16,6 +16,7 @@ use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::extractors::sequential_extractor::SeqHandler; +use crate::Opt; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(swap_indexes)))); @@ -32,6 +33,7 @@ pub async fn swap_indexes( index_scheduler: GuardedData, Data>, params: AwebJson, DeserrJsonError>, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { let params = params.into_inner(); @@ -60,7 +62,7 @@ pub async fn swap_indexes( } let task = KindWithContent::IndexSwap { swaps }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task: SummarizedTaskView = tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); Ok(HttpResponse::Accepted().json(task)) diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs index 26e1c43f8..279b57e3d 100644 --- a/meilisearch/src/routes/tasks.rs +++ b/meilisearch/src/routes/tasks.rs @@ -23,6 +23,7 @@ use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; +use crate::Opt; const DEFAULT_LIMIT: u32 = 20; @@ -161,6 +162,7 @@ async fn cancel_tasks( index_scheduler: GuardedData, Data>, params: AwebQueryParameter, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { let params = params.into_inner(); @@ -197,7 +199,7 @@ async fn cancel_tasks( let task_cancelation = KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??; let task: SummarizedTaskView = task.into(); @@ -209,6 +211,7 @@ async fn delete_tasks( index_scheduler: GuardedData, Data>, params: AwebQueryParameter, req: HttpRequest, + opt: web::Data, analytics: web::Data, ) -> Result { let params = params.into_inner(); @@ -244,7 +247,7 @@ async fn delete_tasks( let task_deletion = KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks }; - let uid = get_task_id(&req)?; + let uid = get_task_id(&req, &opt)?; let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??; let task: SummarizedTaskView = task.into(); diff --git a/meilisearch/tests/index/create_index.rs b/meilisearch/tests/index/create_index.rs index b9f755f35..7a678624c 100644 --- a/meilisearch/tests/index/create_index.rs +++ b/meilisearch/tests/index/create_index.rs @@ -2,9 +2,10 @@ use actix_web::http::header::ContentType; use actix_web::test; use http::header::ACCEPT_ENCODING; use meili_snap::{json_string, snapshot}; +use meilisearch::Opt; use crate::common::encoder::Encoder; -use crate::common::{Server, Value}; +use crate::common::{default_settings, Server, Value}; use crate::json; #[actix_rt::test] @@ -202,7 +203,11 @@ async fn error_create_with_invalid_index_uid() { #[actix_rt::test] async fn send_task_id() { - let server = Server::new().await; + let temp = tempfile::tempdir().unwrap(); + + let options = Opt { experimental_ha_parameters: true, ..default_settings(temp.path()) }; + let server = Server::new_with_options(options).await.unwrap(); + let app = server.init_web_app().await; let index = server.index("catto"); let (response, code) = index.create(None).await;