4442: Send custom task r=ManyTheFish a=irevoire

This PR has already been merged on main but was supposed to be merged on `release-v1.7.0` thus we need to merge it a second time; sorry 😓 

### This PR implements the necessary parameters for the High Availability

Introduce a new CLI flag called `--experimental-replication-parameters` that changes a few behaviors in the engine:
- [The auto-deletion of tasks is disabled](https://specs.meilisearch.com/specifications/text/0060-tasks-api.html#_2-technical-details)
- Upon registering a task, you can choose its task ID by sending a new header: `TaskId: 456645`. It must be a valid number, which must be superior to the last task id ever seen.
- Add the ability to « dry-register » a task. That means meilisearch will answer to you with a valid task ID like everything went well, but won’t actually write anything in the database. To do that, you need to use the `DryRun: true` header.
- Specification’s here: https://github.com/meilisearch/specifications/pull/266

Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-02-26 15:20:16 +00:00 committed by GitHub
commit 6dcb5219a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1331 additions and 434 deletions

View File

@ -1,5 +1,5 @@
use std::fs::File as StdFile; use std::fs::File as StdFile;
use std::ops::{Deref, DerefMut}; use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
@ -22,20 +22,6 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
impl Deref for File {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for File {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct FileStore { pub struct FileStore {
path: PathBuf, path: PathBuf,
@ -56,7 +42,7 @@ impl FileStore {
let file = NamedTempFile::new_in(&self.path)?; let file = NamedTempFile::new_in(&self.path)?;
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let path = self.path.join(uuid.to_string()); let path = self.path.join(uuid.to_string());
let update_file = File { file, path }; let update_file = File { file: Some(file), path };
Ok((uuid, update_file)) Ok((uuid, update_file))
} }
@ -67,7 +53,7 @@ impl FileStore {
let file = NamedTempFile::new_in(&self.path)?; let file = NamedTempFile::new_in(&self.path)?;
let uuid = Uuid::from_u128(uuid); let uuid = Uuid::from_u128(uuid);
let path = self.path.join(uuid.to_string()); let path = self.path.join(uuid.to_string());
let update_file = File { file, path }; let update_file = File { file: Some(file), path };
Ok((uuid, update_file)) Ok((uuid, update_file))
} }
@ -136,16 +122,40 @@ impl FileStore {
pub struct File { pub struct File {
path: PathBuf, path: PathBuf,
file: NamedTempFile, file: Option<NamedTempFile>,
} }
impl File { impl File {
pub fn dry_file() -> Result<Self> {
Ok(Self { path: PathBuf::new(), file: None })
}
pub fn persist(self) -> Result<()> { pub fn persist(self) -> Result<()> {
self.file.persist(&self.path)?; if let Some(file) = self.file {
file.persist(&self.path)?;
}
Ok(()) Ok(())
} }
} }
impl Write for File {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if let Some(file) = self.file.as_mut() {
file.write(buf)
} else {
Ok(buf.len())
}
}
fn flush(&mut self) -> std::io::Result<()> {
if let Some(file) = self.file.as_mut() {
file.flush()
} else {
Ok(())
}
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::io::Write; use std::io::Write;

View File

@ -48,6 +48,8 @@ impl From<DateField> for Code {
pub enum Error { pub enum Error {
#[error("{1}")] #[error("{1}")]
WithCustomErrorCode(Code, Box<Self>), WithCustomErrorCode(Code, Box<Self>),
#[error("Received bad task id: {received} should be >= to {expected}.")]
BadTaskId { received: TaskId, expected: TaskId },
#[error("Index `{0}` not found.")] #[error("Index `{0}` not found.")]
IndexNotFound(String), IndexNotFound(String),
#[error("Index `{0}` already exists.")] #[error("Index `{0}` already exists.")]
@ -161,6 +163,7 @@ impl Error {
match self { match self {
Error::IndexNotFound(_) Error::IndexNotFound(_)
| Error::WithCustomErrorCode(_, _) | Error::WithCustomErrorCode(_, _)
| Error::BadTaskId { .. }
| Error::IndexAlreadyExists(_) | Error::IndexAlreadyExists(_)
| Error::SwapDuplicateIndexFound(_) | Error::SwapDuplicateIndexFound(_)
| Error::SwapDuplicateIndexesFound(_) | Error::SwapDuplicateIndexesFound(_)
@ -205,6 +208,7 @@ impl ErrorCode for Error {
fn error_code(&self) -> Code { fn error_code(&self) -> Code {
match self { match self {
Error::WithCustomErrorCode(code, _) => *code, Error::WithCustomErrorCode(code, _) => *code,
Error::BadTaskId { .. } => Code::BadRequest,
Error::IndexNotFound(_) => Code::IndexNotFound, Error::IndexNotFound(_) => Code::IndexNotFound,
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists, Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound, Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound,

View File

@ -15,6 +15,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let IndexScheduler { let IndexScheduler {
autobatching_enabled, autobatching_enabled,
cleanup_enabled: _,
must_stop_processing: _, must_stop_processing: _,
processing_tasks, processing_tasks,
file_store, file_store,

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,90 @@
---
source: index-scheduler/src/lib.rs
---
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 1,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": {
"message": "Index `doggo` already exists.",
"code": "index_already_exists",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_already_exists"
},
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "failed",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 2,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 3,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]

View File

@ -0,0 +1,90 @@
---
source: index-scheduler/src/lib.rs
---
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 1,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": {
"message": "Index `doggo` already exists.",
"code": "index_already_exists",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_already_exists"
},
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "failed",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 2,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 3,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]

View File

@ -1,6 +1,6 @@
use std::fmt::{self, Debug, Display}; use std::fmt::{self, Debug, Display};
use std::fs::File; use std::fs::File;
use std::io::{self, Seek, Write}; use std::io::{self, BufWriter, Write};
use std::marker::PhantomData; use std::marker::PhantomData;
use memmap2::MmapOptions; use memmap2::MmapOptions;
@ -104,8 +104,8 @@ impl ErrorCode for DocumentFormatError {
} }
/// Reads CSV from input and write an obkv batch to writer. /// Reads CSV from input and write an obkv batch to writer.
pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result<u64> { pub fn read_csv(file: &File, writer: impl Write, delimiter: u8) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? }; let mmap = unsafe { MmapOptions::new().map(file)? };
let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref()); let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?; builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?;
@ -117,8 +117,8 @@ pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result
} }
/// Reads JSON from temporary file and write an obkv batch to writer. /// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> { pub fn read_json(file: &File, writer: impl Write) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? }; let mmap = unsafe { MmapOptions::new().map(file)? };
let mut deserializer = serde_json::Deserializer::from_slice(&mmap); let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
@ -151,8 +151,8 @@ pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
} }
/// Reads JSON from temporary file and write an obkv batch to writer. /// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> { pub fn read_ndjson(file: &File, writer: impl Write) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? }; let mmap = unsafe { MmapOptions::new().map(file)? };
for result in serde_json::Deserializer::from_slice(&mmap).into_iter() { for result in serde_json::Deserializer::from_slice(&mmap).into_iter() {

View File

@ -253,6 +253,7 @@ struct Infos {
env: String, env: String,
experimental_enable_metrics: bool, experimental_enable_metrics: bool,
experimental_logs_mode: LogMode, experimental_logs_mode: LogMode,
experimental_replication_parameters: bool,
experimental_enable_logs_route: bool, experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool, experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize, experimental_max_number_of_batched_tasks: usize,
@ -292,6 +293,7 @@ impl From<Opt> for Infos {
db_path, db_path,
experimental_enable_metrics, experimental_enable_metrics,
experimental_logs_mode, experimental_logs_mode,
experimental_replication_parameters,
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
experimental_max_number_of_batched_tasks, experimental_max_number_of_batched_tasks,
@ -340,6 +342,7 @@ impl From<Opt> for Infos {
env, env,
experimental_enable_metrics, experimental_enable_metrics,
experimental_logs_mode, experimental_logs_mode,
experimental_replication_parameters,
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
db_path: db_path != PathBuf::from("./data.ms"), db_path: db_path != PathBuf::from("./data.ms"),

View File

@ -131,6 +131,7 @@ gen_seq! { SeqFromRequestFut3; A B C }
gen_seq! { SeqFromRequestFut4; A B C D } gen_seq! { SeqFromRequestFut4; A B C D }
gen_seq! { SeqFromRequestFut5; A B C D E } gen_seq! { SeqFromRequestFut5; A B C D E }
gen_seq! { SeqFromRequestFut6; A B C D E F } gen_seq! { SeqFromRequestFut6; A B C D E F }
gen_seq! { SeqFromRequestFut7; A B C D E F G }
pin_project! { pin_project! {
#[project = ExtractProj] #[project = ExtractProj]

View File

@ -265,7 +265,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
.name(String::from("register-snapshot-tasks")) .name(String::from("register-snapshot-tasks"))
.spawn(move || loop { .spawn(move || loop {
thread::sleep(snapshot_delay); thread::sleep(snapshot_delay);
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) { if let Err(e) =
index_scheduler.register(KindWithContent::SnapshotCreation, None, false)
{
error!("Error while registering snapshot: {}", e); error!("Error while registering snapshot: {}", e);
} }
}) })
@ -300,6 +302,7 @@ fn open_or_create_database_unchecked(
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
indexer_config: (&opt.indexer_options).try_into()?, indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true, autobatching_enabled: true,
cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
@ -468,6 +471,7 @@ pub fn configure_data(
.app_data(web::Data::from(analytics)) .app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs_route)) .app_data(web::Data::new(logs_route))
.app_data(web::Data::new(logs_stderr)) .app_data(web::Data::new(logs_stderr))
.app_data(web::Data::new(opt.clone()))
.app_data( .app_data(
web::JsonConfig::default() web::JsonConfig::default()
.limit(http_payload_size_limit) .limit(http_payload_size_limit)

View File

@ -52,6 +52,7 @@ const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS";
const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR"; const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR";
const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL"; const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE"; const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE";
const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE"; const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS"; const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str = const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
@ -358,6 +359,16 @@ pub struct Opt {
#[serde(default)] #[serde(default)]
pub experimental_enable_logs_route: bool, pub experimental_enable_logs_route: bool,
/// Enable multiple features that helps you to run meilisearch in a replicated context.
/// For more information, see: <https://github.com/orgs/meilisearch/discussions/725>
///
/// - /!\ Disable the automatic clean up of old processed tasks, you're in charge of that now
/// - Lets you specify a custom task ID upon registering a task
/// - Lets you execute dry-register a task (get an answer from the route but nothing is actually registered in meilisearch and it won't be processed)
#[clap(long, env = MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS)]
#[serde(default)]
pub experimental_replication_parameters: bool,
/// Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652> /// Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652>
#[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)] #[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)]
#[serde(default)] #[serde(default)]
@ -465,6 +476,7 @@ impl Opt {
experimental_enable_metrics, experimental_enable_metrics,
experimental_logs_mode, experimental_logs_mode,
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_replication_parameters,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
} = self; } = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_DB_PATH, db_path);
@ -525,6 +537,10 @@ impl Opt {
MEILI_EXPERIMENTAL_LOGS_MODE, MEILI_EXPERIMENTAL_LOGS_MODE,
experimental_logs_mode.to_string(), experimental_logs_mode.to_string(),
); );
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS,
experimental_replication_parameters.to_string(),
);
export_to_env_if_not_present( export_to_env_if_not_present(
MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE, MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE,
experimental_enable_logs_route.to_string(), experimental_enable_logs_route.to_string(),

View File

@ -11,7 +11,8 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::SummarizedTaskView; use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::Opt;
pub fn configure(cfg: &mut web::ServiceConfig) { pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump)))); cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
@ -21,6 +22,7 @@ pub async fn create_dump(
index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>,
auth_controller: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<AuthController>>, auth_controller: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<AuthController>>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
analytics.publish("Dump Created".to_string(), json!({}), Some(&req)); analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
@ -29,8 +31,12 @@ pub async fn create_dump(
keys: auth_controller.list_keys()?, keys: auth_controller.list_keys()?,
instance_uid: analytics.instance_uid().cloned(), instance_uid: analytics.instance_uid().cloned(),
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Create dump"); debug!(returns = ?task, "Create dump");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))

View File

@ -7,7 +7,7 @@ use bstr::ByteSlice as _;
use deserr::actix_web::{AwebJson, AwebQueryParameter}; use deserr::actix_web::{AwebJson, AwebQueryParameter};
use deserr::Deserr; use deserr::Deserr;
use futures::StreamExt; use futures::StreamExt;
use index_scheduler::IndexScheduler; use index_scheduler::{IndexScheduler, TaskId};
use meilisearch_types::deserr::query_params::Param; use meilisearch_types::deserr::query_params::Param;
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError}; use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType}; use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
@ -36,8 +36,11 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::payload::Payload; use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; use crate::routes::{
get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
};
use crate::search::parse_filter; use crate::search::parse_filter;
use crate::Opt;
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| { static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
@ -119,6 +122,7 @@ pub async fn delete_document(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<DocumentParam>, path: web::Path<DocumentParam>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let DocumentParam { index_uid, document_id } = path.into_inner(); let DocumentParam { index_uid, document_id } = path.into_inner();
@ -130,9 +134,13 @@ pub async fn delete_document(
index_uid: index_uid.to_string(), index_uid: index_uid.to_string(),
documents_ids: vec![document_id], documents_ids: vec![document_id],
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
debug!(returns = ?task, "Delete document"); .await??
.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
} }
@ -267,6 +275,7 @@ pub async fn replace_documents(
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>, params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
body: Payload, body: Payload,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -277,6 +286,8 @@ pub async fn replace_documents(
analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req); analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = document_addition( let task = document_addition(
extract_mime_type(&req)?, extract_mime_type(&req)?,
index_scheduler, index_scheduler,
@ -285,6 +296,8 @@ pub async fn replace_documents(
params.csv_delimiter, params.csv_delimiter,
body, body,
IndexDocumentsMethod::ReplaceDocuments, IndexDocumentsMethod::ReplaceDocuments,
uid,
dry_run,
allow_index_creation, allow_index_creation,
) )
.await?; .await?;
@ -299,6 +312,7 @@ pub async fn update_documents(
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>, params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
body: Payload, body: Payload,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -309,6 +323,8 @@ pub async fn update_documents(
analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req); analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = document_addition( let task = document_addition(
extract_mime_type(&req)?, extract_mime_type(&req)?,
index_scheduler, index_scheduler,
@ -317,6 +333,8 @@ pub async fn update_documents(
params.csv_delimiter, params.csv_delimiter,
body, body,
IndexDocumentsMethod::UpdateDocuments, IndexDocumentsMethod::UpdateDocuments,
uid,
dry_run,
allow_index_creation, allow_index_creation,
) )
.await?; .await?;
@ -334,6 +352,8 @@ async fn document_addition(
csv_delimiter: Option<u8>, csv_delimiter: Option<u8>,
mut body: Payload, mut body: Payload,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
task_id: Option<TaskId>,
dry_run: bool,
allow_index_creation: bool, allow_index_creation: bool,
) -> Result<SummarizedTaskView, MeilisearchHttpError> { ) -> Result<SummarizedTaskView, MeilisearchHttpError> {
let format = match ( let format = match (
@ -366,7 +386,7 @@ async fn document_addition(
} }
}; };
let (uuid, mut update_file) = index_scheduler.create_update_file()?; let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?;
let temp_file = match tempfile() { let temp_file = match tempfile() {
Ok(file) => file, Ok(file) => file,
@ -405,11 +425,9 @@ async fn document_addition(
let read_file = buffer.into_inner().into_std().await; let read_file = buffer.into_inner().into_std().await;
let documents_count = tokio::task::spawn_blocking(move || { let documents_count = tokio::task::spawn_blocking(move || {
let documents_count = match format { let documents_count = match format {
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?, PayloadType::Json => read_json(&read_file, &mut update_file)?,
PayloadType::Csv { delimiter } => { PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
read_csv(&read_file, update_file.as_file_mut(), delimiter)? PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
}
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
}; };
// we NEED to persist the file here because we moved the `udpate_file` in another task. // we NEED to persist the file here because we moved the `udpate_file` in another task.
update_file.persist()?; update_file.persist()?;
@ -450,7 +468,9 @@ async fn document_addition(
}; };
let scheduler = index_scheduler.clone(); let scheduler = index_scheduler.clone();
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? { let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
.await?
{
Ok(task) => task, Ok(task) => task,
Err(e) => { Err(e) => {
index_scheduler.delete_update_file(uuid)?; index_scheduler.delete_update_file(uuid)?;
@ -466,6 +486,7 @@ pub async fn delete_documents_batch(
index_uid: web::Path<String>, index_uid: web::Path<String>,
body: web::Json<Vec<Value>>, body: web::Json<Vec<Value>>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by batch"); debug!(parameters = ?body, "Delete documents by batch");
@ -480,8 +501,12 @@ pub async fn delete_documents_batch(
let task = let task =
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete documents by batch"); debug!(returns = ?task, "Delete documents by batch");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -499,6 +524,7 @@ pub async fn delete_documents_by_filter(
index_uid: web::Path<String>, index_uid: web::Path<String>,
body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>, body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by filter"); debug!(parameters = ?body, "Delete documents by filter");
@ -516,8 +542,12 @@ pub async fn delete_documents_by_filter(
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?; .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete documents by filter"); debug!(returns = ?task, "Delete documents by filter");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -527,14 +557,19 @@ pub async fn clear_all_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>, index_uid: web::Path<String>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req); analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete all documents"); debug!(returns = ?task, "Delete all documents");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))

View File

@ -17,11 +17,13 @@ use serde_json::json;
use time::OffsetDateTime; use time::OffsetDateTime;
use tracing::debug; use tracing::debug;
use super::{Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; use super::{get_task_id, Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::analytics::Analytics; use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::is_dry_run;
use crate::Opt;
pub mod documents; pub mod documents;
pub mod facet_search; pub mod facet_search;
@ -123,6 +125,7 @@ pub async fn create_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>,
body: AwebJson<IndexCreateRequest, DeserrJsonError>, body: AwebJson<IndexCreateRequest, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Create index"); debug!(parameters = ?body, "Create index");
@ -137,8 +140,12 @@ pub async fn create_index(
); );
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key }; let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Create index"); debug!(returns = ?task, "Create index");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -190,6 +197,7 @@ pub async fn update_index(
index_uid: web::Path<String>, index_uid: web::Path<String>,
body: AwebJson<UpdateIndexRequest, DeserrJsonError>, body: AwebJson<UpdateIndexRequest, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Update index"); debug!(parameters = ?body, "Update index");
@ -206,8 +214,12 @@ pub async fn update_index(
primary_key: body.primary_key, primary_key: body.primary_key,
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Update index"); debug!(returns = ?task, "Update index");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -216,11 +228,17 @@ pub async fn update_index(
pub async fn delete_index( pub async fn delete_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>, index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() }; let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete index"); debug!(returns = ?task, "Delete index");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))

View File

@ -15,7 +15,8 @@ use tracing::debug;
use crate::analytics::Analytics; use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::routes::SummarizedTaskView; use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::Opt;
#[macro_export] #[macro_export]
macro_rules! make_setting_route { macro_rules! make_setting_route {
@ -34,7 +35,8 @@ macro_rules! make_setting_route {
use $crate::extractors::authentication::policies::*; use $crate::extractors::authentication::policies::*;
use $crate::extractors::authentication::GuardedData; use $crate::extractors::authentication::GuardedData;
use $crate::extractors::sequential_extractor::SeqHandler; use $crate::extractors::sequential_extractor::SeqHandler;
use $crate::routes::SummarizedTaskView; use $crate::Opt;
use $crate::routes::{is_dry_run, get_task_id, SummarizedTaskView};
pub async fn delete( pub async fn delete(
index_scheduler: GuardedData< index_scheduler: GuardedData<
@ -42,6 +44,8 @@ macro_rules! make_setting_route {
Data<IndexScheduler>, Data<IndexScheduler>,
>, >,
index_uid: web::Path<String>, index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -56,8 +60,10 @@ macro_rules! make_setting_route {
is_deletion: true, is_deletion: true,
allow_index_creation, allow_index_creation,
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)) tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await?? .await??
.into(); .into();
@ -73,6 +79,7 @@ macro_rules! make_setting_route {
index_uid: actix_web::web::Path<String>, index_uid: actix_web::web::Path<String>,
body: deserr::actix_web::AwebJson<Option<$type>, $err_ty>, body: deserr::actix_web::AwebJson<Option<$type>, $err_ty>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
$analytics_var: web::Data<dyn Analytics>, $analytics_var: web::Data<dyn Analytics>,
) -> std::result::Result<HttpResponse, ResponseError> { ) -> std::result::Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -105,8 +112,10 @@ macro_rules! make_setting_route {
is_deletion: false, is_deletion: false,
allow_index_creation, allow_index_creation,
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)) tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await?? .await??
.into(); .into();
@ -652,6 +661,7 @@ pub async fn update_all(
index_uid: web::Path<String>, index_uid: web::Path<String>,
body: AwebJson<Settings<Unchecked>, DeserrJsonError>, body: AwebJson<Settings<Unchecked>, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -767,8 +777,12 @@ pub async fn update_all(
is_deletion: false, is_deletion: false,
allow_index_creation, allow_index_creation,
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Update all settings"); debug!(returns = ?task, "Update all settings");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -790,6 +804,8 @@ pub async fn get_all(
pub async fn delete_all( pub async fn delete_all(
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>, index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -803,8 +819,12 @@ pub async fn delete_all(
is_deletion: true, is_deletion: true,
allow_index_creation, allow_index_creation,
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete all settings"); debug!(returns = ?task, "Delete all settings");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))

View File

@ -4,7 +4,7 @@ use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse}; use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::IndexScheduler; use index_scheduler::IndexScheduler;
use meilisearch_auth::AuthController; use meilisearch_auth::AuthController;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::{Code, ResponseError};
use meilisearch_types::settings::{Settings, Unchecked}; use meilisearch_types::settings::{Settings, Unchecked};
use meilisearch_types::tasks::{Kind, Status, Task, TaskId}; use meilisearch_types::tasks::{Kind, Status, Task, TaskId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -15,6 +15,7 @@ use tracing::debug;
use crate::analytics::Analytics; use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::Opt;
const PAGINATION_DEFAULT_LIMIT: usize = 20; const PAGINATION_DEFAULT_LIMIT: usize = 20;
@ -45,6 +46,56 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::scope("/experimental-features").configure(features::configure)); .service(web::scope("/experimental-features").configure(features::configure));
} }
pub fn get_task_id(req: &HttpRequest, opt: &Opt) -> Result<Option<TaskId>, ResponseError> {
if !opt.experimental_replication_parameters {
return Ok(None);
}
let task_id = req
.headers()
.get("TaskId")
.map(|header| {
header.to_str().map_err(|e| {
ResponseError::from_msg(
format!("TaskId is not a valid utf-8 string: {e}"),
Code::BadRequest,
)
})
})
.transpose()?
.map(|s| {
s.parse::<TaskId>().map_err(|e| {
ResponseError::from_msg(
format!(
"Could not parse the TaskId as a {}: {e}",
std::any::type_name::<TaskId>(),
),
Code::BadRequest,
)
})
})
.transpose()?;
Ok(task_id)
}
pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result<bool, ResponseError> {
if !opt.experimental_replication_parameters {
return Ok(false);
}
Ok(req
.headers()
.get("DryRun")
.map(|header| {
header.to_str().map_err(|e| {
ResponseError::from_msg(
format!("DryRun is not a valid utf-8 string: {e}"),
Code::BadRequest,
)
})
})
.transpose()?
.map_or(false, |s| s.to_lowercase() == "true"))
}
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct SummarizedTaskView { pub struct SummarizedTaskView {

View File

@ -10,7 +10,8 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::SummarizedTaskView; use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::Opt;
pub fn configure(cfg: &mut web::ServiceConfig) { pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot)))); cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot))));
@ -19,13 +20,18 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
pub async fn create_snapshot( pub async fn create_snapshot(
index_scheduler: GuardedData<ActionPolicy<{ actions::SNAPSHOTS_CREATE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::SNAPSHOTS_CREATE }>, Data<IndexScheduler>>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req)); analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req));
let task = KindWithContent::SnapshotCreation; let task = KindWithContent::SnapshotCreation;
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Create snapshot"); debug!(returns = ?task, "Create snapshot");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))

View File

@ -10,12 +10,13 @@ use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::tasks::{IndexSwap, KindWithContent}; use meilisearch_types::tasks::{IndexSwap, KindWithContent};
use serde_json::json; use serde_json::json;
use super::SummarizedTaskView; use super::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::analytics::Analytics; use crate::analytics::Analytics;
use crate::error::MeilisearchHttpError; use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::Opt;
pub fn configure(cfg: &mut web::ServiceConfig) { pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(swap_indexes)))); cfg.service(web::resource("").route(web::post().to(SeqHandler(swap_indexes))));
@ -32,6 +33,7 @@ pub async fn swap_indexes(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_SWAP }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_SWAP }>, Data<IndexScheduler>>,
params: AwebJson<Vec<SwapIndexesPayload>, DeserrJsonError>, params: AwebJson<Vec<SwapIndexesPayload>, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner(); let params = params.into_inner();
@ -60,7 +62,11 @@ pub async fn swap_indexes(
} }
let task = KindWithContent::IndexSwap { swaps }; let task = KindWithContent::IndexSwap { swaps };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
} }

View File

@ -18,11 +18,12 @@ use time::macros::format_description;
use time::{Date, Duration, OffsetDateTime, Time}; use time::{Date, Duration, OffsetDateTime, Time};
use tokio::task; use tokio::task;
use super::SummarizedTaskView; use super::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::analytics::Analytics; use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::Opt;
const DEFAULT_LIMIT: u32 = 20; const DEFAULT_LIMIT: u32 = 20;
@ -161,6 +162,7 @@ async fn cancel_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_CANCEL }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_CANCEL }>, Data<IndexScheduler>>,
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>, params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner(); let params = params.into_inner();
@ -197,7 +199,11 @@ async fn cancel_tasks(
let task_cancelation = let task_cancelation =
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks }; KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??; let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task =
task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid, dry_run))
.await??;
let task: SummarizedTaskView = task.into(); let task: SummarizedTaskView = task.into();
Ok(HttpResponse::Ok().json(task)) Ok(HttpResponse::Ok().json(task))
@ -207,6 +213,7 @@ async fn delete_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>,
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>, params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner(); let params = params.into_inner();
@ -242,7 +249,10 @@ async fn delete_tasks(
let task_deletion = let task_deletion =
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks }; KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??; let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid, dry_run))
.await??;
let task: SummarizedTaskView = task.into(); let task: SummarizedTaskView = task.into();
Ok(HttpResponse::Ok().json(task)) Ok(HttpResponse::Ok().json(task))

View File

@ -100,16 +100,11 @@ impl Index<'_> {
pub async fn raw_add_documents( pub async fn raw_add_documents(
&self, &self,
payload: &str, payload: &str,
content_type: Option<&str>, headers: Vec<(&str, &str)>,
query_parameter: &str, query_parameter: &str,
) -> (Value, StatusCode) { ) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents{}", urlencode(self.uid.as_ref()), query_parameter); let url = format!("/indexes/{}/documents{}", urlencode(self.uid.as_ref()), query_parameter);
self.service.post_str(url, payload, headers).await
if let Some(content_type) = content_type {
self.service.post_str(url, payload, vec![("Content-Type", content_type)]).await
} else {
self.service.post_str(url, payload, Vec::new()).await
}
} }
pub async fn update_documents( pub async fn update_documents(

View File

@ -1,10 +1,11 @@
use actix_web::test; use actix_web::test;
use meili_snap::{json_string, snapshot}; use meili_snap::{json_string, snapshot};
use meilisearch::Opt;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::common::encoder::Encoder; use crate::common::encoder::Encoder;
use crate::common::{GetAllDocumentsOptions, Server, Value}; use crate::common::{default_settings, GetAllDocumentsOptions, Server, Value};
use crate::json; use crate::json;
/// This is the basic usage of our API and every other tests uses the content-type application/json /// This is the basic usage of our API and every other tests uses the content-type application/json
@ -2157,3 +2158,49 @@ async fn batch_several_documents_addition() {
assert_eq!(code, 200, "failed with `{}`", response); assert_eq!(code, 200, "failed with `{}`", response);
assert_eq!(response["results"].as_array().unwrap().len(), 120); assert_eq!(response["results"].as_array().unwrap().len(), 120);
} }
#[actix_rt::test]
async fn dry_register_file() {
let temp = tempfile::tempdir().unwrap();
let options =
Opt { experimental_replication_parameters: true, ..default_settings(temp.path()) };
let server = Server::new_with_options(options).await.unwrap();
let index = server.index("tamo");
let documents = r#"
{
"id": "12",
"doggo": "kefir"
}
"#;
let (response, code) = index
.raw_add_documents(
documents,
vec![("Content-Type", "application/json"), ("DryRun", "true")],
"",
)
.await;
snapshot!(response, @r###"
{
"taskUid": 0,
"indexUid": "tamo",
"status": "enqueued",
"type": "documentAdditionOrUpdate",
"enqueuedAt": "[date]"
}
"###);
snapshot!(code, @"202 Accepted");
let (response, code) = index.get_task(response.uid()).await;
snapshot!(response, @r###"
{
"message": "Task `0` not found.",
"code": "task_not_found",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#task_not_found"
}
"###);
snapshot!(code, @"404 Not Found");
}

View File

@ -209,7 +209,8 @@ async fn replace_documents_missing_payload() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (response, code) = index.raw_add_documents("", Some("application/json"), "").await; let (response, code) =
index.raw_add_documents("", vec![("Content-Type", "application/json")], "").await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -220,7 +221,8 @@ async fn replace_documents_missing_payload() {
} }
"###); "###);
let (response, code) = index.raw_add_documents("", Some("application/x-ndjson"), "").await; let (response, code) =
index.raw_add_documents("", vec![("Content-Type", "application/x-ndjson")], "").await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -231,7 +233,8 @@ async fn replace_documents_missing_payload() {
} }
"###); "###);
let (response, code) = index.raw_add_documents("", Some("text/csv"), "").await; let (response, code) =
index.raw_add_documents("", vec![("Content-Type", "text/csv")], "").await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -287,7 +290,7 @@ async fn replace_documents_missing_content_type() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (response, code) = index.raw_add_documents("", None, "").await; let (response, code) = index.raw_add_documents("", Vec::new(), "").await;
snapshot!(code, @"415 Unsupported Media Type"); snapshot!(code, @"415 Unsupported Media Type");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -299,7 +302,7 @@ async fn replace_documents_missing_content_type() {
"###); "###);
// even with a csv delimiter specified this error is triggered first // even with a csv delimiter specified this error is triggered first
let (response, code) = index.raw_add_documents("", None, "?csvDelimiter=;").await; let (response, code) = index.raw_add_documents("", Vec::new(), "?csvDelimiter=;").await;
snapshot!(code, @"415 Unsupported Media Type"); snapshot!(code, @"415 Unsupported Media Type");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -345,7 +348,7 @@ async fn replace_documents_bad_content_type() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (response, code) = index.raw_add_documents("", Some("doggo"), "").await; let (response, code) = index.raw_add_documents("", vec![("Content-Type", "doggo")], "").await;
snapshot!(code, @"415 Unsupported Media Type"); snapshot!(code, @"415 Unsupported Media Type");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -379,8 +382,9 @@ async fn replace_documents_bad_csv_delimiter() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (response, code) = let (response, code) = index
index.raw_add_documents("", Some("application/json"), "?csvDelimiter").await; .raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter")
.await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -391,8 +395,9 @@ async fn replace_documents_bad_csv_delimiter() {
} }
"###); "###);
let (response, code) = let (response, code) = index
index.raw_add_documents("", Some("application/json"), "?csvDelimiter=doggo").await; .raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter=doggo")
.await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -404,7 +409,11 @@ async fn replace_documents_bad_csv_delimiter() {
"###); "###);
let (response, code) = index let (response, code) = index
.raw_add_documents("", Some("application/json"), &format!("?csvDelimiter={}", encode("🍰"))) .raw_add_documents(
"",
vec![("Content-Type", "application/json")],
&format!("?csvDelimiter={}", encode("🍰")),
)
.await; .await;
snapshot!(code, @"400 Bad Request"); snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
@ -469,8 +478,9 @@ async fn replace_documents_csv_delimiter_with_bad_content_type() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (response, code) = let (response, code) = index
index.raw_add_documents("", Some("application/json"), "?csvDelimiter=a").await; .raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter=a")
.await;
snapshot!(code, @"415 Unsupported Media Type"); snapshot!(code, @"415 Unsupported Media Type");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {
@ -481,8 +491,9 @@ async fn replace_documents_csv_delimiter_with_bad_content_type() {
} }
"###); "###);
let (response, code) = let (response, code) = index
index.raw_add_documents("", Some("application/x-ndjson"), "?csvDelimiter=a").await; .raw_add_documents("", vec![("Content-Type", "application/x-ndjson")], "?csvDelimiter=a")
.await;
snapshot!(code, @"415 Unsupported Media Type"); snapshot!(code, @"415 Unsupported Media Type");
snapshot!(json_string!(response), @r###" snapshot!(json_string!(response), @r###"
{ {

View File

@ -2,9 +2,10 @@ use actix_web::http::header::ContentType;
use actix_web::test; use actix_web::test;
use http::header::ACCEPT_ENCODING; use http::header::ACCEPT_ENCODING;
use meili_snap::{json_string, snapshot}; use meili_snap::{json_string, snapshot};
use meilisearch::Opt;
use crate::common::encoder::Encoder; use crate::common::encoder::Encoder;
use crate::common::{Server, Value}; use crate::common::{default_settings, Server, Value};
use crate::json; use crate::json;
#[actix_rt::test] #[actix_rt::test]
@ -199,3 +200,79 @@ async fn error_create_with_invalid_index_uid() {
} }
"###); "###);
} }
#[actix_rt::test]
async fn send_task_id() {
let temp = tempfile::tempdir().unwrap();
let options =
Opt { experimental_replication_parameters: true, ..default_settings(temp.path()) };
let server = Server::new_with_options(options).await.unwrap();
let app = server.init_web_app().await;
let index = server.index("catto");
let (response, code) = index.create(None).await;
snapshot!(code, @"202 Accepted");
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
{
"taskUid": 0,
"indexUid": "catto",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
let body = serde_json::to_string(&json!({
"uid": "doggo",
"primaryKey": None::<&str>,
}))
.unwrap();
let req = test::TestRequest::post()
.uri("/indexes")
.insert_header(("TaskId", "25"))
.insert_header(ContentType::json())
.set_payload(body)
.to_request();
let res = test::call_service(&app, req).await;
snapshot!(res.status(), @"202 Accepted");
let bytes = test::read_body(res).await;
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
{
"taskUid": 25,
"indexUid": "doggo",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
let body = serde_json::to_string(&json!({
"uid": "girafo",
"primaryKey": None::<&str>,
}))
.unwrap();
let req = test::TestRequest::post()
.uri("/indexes")
.insert_header(("TaskId", "12"))
.insert_header(ContentType::json())
.set_payload(body)
.to_request();
let res = test::call_service(&app, req).await;
snapshot!(res.status(), @"400 Bad Request");
let bytes = test::read_body(res).await;
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
snapshot!(json_string!(response), @r###"
{
"message": "Received bad task id: 12 should be >= to 26.",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
}