mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 02:27:40 +08:00
let you specify your task id
This commit is contained in:
parent
5ee6aaddc4
commit
eb25b07390
@ -48,6 +48,8 @@ impl From<DateField> for Code {
|
|||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error("{1}")]
|
#[error("{1}")]
|
||||||
WithCustomErrorCode(Code, Box<Self>),
|
WithCustomErrorCode(Code, Box<Self>),
|
||||||
|
#[error("Received bad task id: {received} should be >= to {expected}.")]
|
||||||
|
BadTaskId { received: TaskId, expected: TaskId },
|
||||||
#[error("Index `{0}` not found.")]
|
#[error("Index `{0}` not found.")]
|
||||||
IndexNotFound(String),
|
IndexNotFound(String),
|
||||||
#[error("Index `{0}` already exists.")]
|
#[error("Index `{0}` already exists.")]
|
||||||
@ -161,6 +163,7 @@ impl Error {
|
|||||||
match self {
|
match self {
|
||||||
Error::IndexNotFound(_)
|
Error::IndexNotFound(_)
|
||||||
| Error::WithCustomErrorCode(_, _)
|
| Error::WithCustomErrorCode(_, _)
|
||||||
|
| Error::BadTaskId { .. }
|
||||||
| Error::IndexAlreadyExists(_)
|
| Error::IndexAlreadyExists(_)
|
||||||
| Error::SwapDuplicateIndexFound(_)
|
| Error::SwapDuplicateIndexFound(_)
|
||||||
| Error::SwapDuplicateIndexesFound(_)
|
| Error::SwapDuplicateIndexesFound(_)
|
||||||
@ -205,6 +208,7 @@ impl ErrorCode for Error {
|
|||||||
fn error_code(&self) -> Code {
|
fn error_code(&self) -> Code {
|
||||||
match self {
|
match self {
|
||||||
Error::WithCustomErrorCode(code, _) => *code,
|
Error::WithCustomErrorCode(code, _) => *code,
|
||||||
|
Error::BadTaskId { .. } => Code::BadRequest,
|
||||||
Error::IndexNotFound(_) => Code::IndexNotFound,
|
Error::IndexNotFound(_) => Code::IndexNotFound,
|
||||||
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
|
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
|
||||||
Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound,
|
Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound,
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -265,7 +265,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
|||||||
.name(String::from("register-snapshot-tasks"))
|
.name(String::from("register-snapshot-tasks"))
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
thread::sleep(snapshot_delay);
|
thread::sleep(snapshot_delay);
|
||||||
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
|
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation, None) {
|
||||||
error!("Error while registering snapshot: {}", e);
|
error!("Error while registering snapshot: {}", e);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -11,7 +11,7 @@ use crate::analytics::Analytics;
|
|||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
use crate::routes::SummarizedTaskView;
|
use crate::routes::{get_task_id, SummarizedTaskView};
|
||||||
|
|
||||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||||
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
|
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
|
||||||
@ -29,8 +29,9 @@ pub async fn create_dump(
|
|||||||
keys: auth_controller.list_keys()?,
|
keys: auth_controller.list_keys()?,
|
||||||
instance_uid: analytics.instance_uid().cloned(),
|
instance_uid: analytics.instance_uid().cloned(),
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Create dump");
|
debug!(returns = ?task, "Create dump");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -7,7 +7,7 @@ use bstr::ByteSlice as _;
|
|||||||
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
||||||
use deserr::Deserr;
|
use deserr::Deserr;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use index_scheduler::IndexScheduler;
|
use index_scheduler::{IndexScheduler, TaskId};
|
||||||
use meilisearch_types::deserr::query_params::Param;
|
use meilisearch_types::deserr::query_params::Param;
|
||||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
||||||
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
||||||
@ -36,7 +36,7 @@ use crate::extractors::authentication::policies::*;
|
|||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::extractors::payload::Payload;
|
use crate::extractors::payload::Payload;
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
||||||
use crate::search::parse_filter;
|
use crate::search::parse_filter;
|
||||||
|
|
||||||
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
||||||
@ -130,9 +130,10 @@ pub async fn delete_document(
|
|||||||
index_uid: index_uid.to_string(),
|
index_uid: index_uid.to_string(),
|
||||||
documents_ids: vec![document_id],
|
documents_ids: vec![document_id],
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
debug!(returns = ?task, "Delete document");
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -277,6 +278,7 @@ pub async fn replace_documents(
|
|||||||
analytics.add_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
analytics.add_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
||||||
|
|
||||||
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task = document_addition(
|
let task = document_addition(
|
||||||
extract_mime_type(&req)?,
|
extract_mime_type(&req)?,
|
||||||
index_scheduler,
|
index_scheduler,
|
||||||
@ -285,6 +287,7 @@ pub async fn replace_documents(
|
|||||||
params.csv_delimiter,
|
params.csv_delimiter,
|
||||||
body,
|
body,
|
||||||
IndexDocumentsMethod::ReplaceDocuments,
|
IndexDocumentsMethod::ReplaceDocuments,
|
||||||
|
uid,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@ -309,6 +312,7 @@ pub async fn update_documents(
|
|||||||
analytics.update_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
analytics.update_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
||||||
|
|
||||||
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task = document_addition(
|
let task = document_addition(
|
||||||
extract_mime_type(&req)?,
|
extract_mime_type(&req)?,
|
||||||
index_scheduler,
|
index_scheduler,
|
||||||
@ -317,6 +321,7 @@ pub async fn update_documents(
|
|||||||
params.csv_delimiter,
|
params.csv_delimiter,
|
||||||
body,
|
body,
|
||||||
IndexDocumentsMethod::UpdateDocuments,
|
IndexDocumentsMethod::UpdateDocuments,
|
||||||
|
uid,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@ -334,6 +339,7 @@ async fn document_addition(
|
|||||||
csv_delimiter: Option<u8>,
|
csv_delimiter: Option<u8>,
|
||||||
mut body: Payload,
|
mut body: Payload,
|
||||||
method: IndexDocumentsMethod,
|
method: IndexDocumentsMethod,
|
||||||
|
task_id: Option<TaskId>,
|
||||||
allow_index_creation: bool,
|
allow_index_creation: bool,
|
||||||
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
||||||
let format = match (
|
let format = match (
|
||||||
@ -450,7 +456,7 @@ async fn document_addition(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let scheduler = index_scheduler.clone();
|
let scheduler = index_scheduler.clone();
|
||||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
|
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id)).await? {
|
||||||
Ok(task) => task,
|
Ok(task) => task,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
index_scheduler.delete_update_file(uuid)?;
|
index_scheduler.delete_update_file(uuid)?;
|
||||||
@ -480,8 +486,9 @@ pub async fn delete_documents_batch(
|
|||||||
|
|
||||||
let task =
|
let task =
|
||||||
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Delete documents by batch");
|
debug!(returns = ?task, "Delete documents by batch");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -516,8 +523,9 @@ pub async fn delete_documents_by_filter(
|
|||||||
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
||||||
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
||||||
|
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Delete documents by filter");
|
debug!(returns = ?task, "Delete documents by filter");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -533,8 +541,9 @@ pub async fn clear_all_documents(
|
|||||||
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
|
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
|
||||||
|
|
||||||
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Delete all documents");
|
debug!(returns = ?task, "Delete all documents");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -17,7 +17,7 @@ use serde_json::json;
|
|||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use super::{Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
use super::{get_task_id, Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
||||||
@ -137,8 +137,9 @@ pub async fn create_index(
|
|||||||
);
|
);
|
||||||
|
|
||||||
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
|
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
debug!(returns = ?task, "Create index");
|
debug!(returns = ?task, "Create index");
|
||||||
|
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -206,8 +207,9 @@ pub async fn update_index(
|
|||||||
primary_key: body.primary_key,
|
primary_key: body.primary_key,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Update index");
|
debug!(returns = ?task, "Update index");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -216,11 +218,13 @@ pub async fn update_index(
|
|||||||
pub async fn delete_index(
|
pub async fn delete_index(
|
||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
|
req: HttpRequest,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
|
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
debug!(returns = ?task, "Delete index");
|
debug!(returns = ?task, "Delete index");
|
||||||
|
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -15,7 +15,7 @@ use tracing::debug;
|
|||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::routes::SummarizedTaskView;
|
use crate::routes::{get_task_id, SummarizedTaskView};
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! make_setting_route {
|
macro_rules! make_setting_route {
|
||||||
@ -34,7 +34,7 @@ macro_rules! make_setting_route {
|
|||||||
use $crate::extractors::authentication::policies::*;
|
use $crate::extractors::authentication::policies::*;
|
||||||
use $crate::extractors::authentication::GuardedData;
|
use $crate::extractors::authentication::GuardedData;
|
||||||
use $crate::extractors::sequential_extractor::SeqHandler;
|
use $crate::extractors::sequential_extractor::SeqHandler;
|
||||||
use $crate::routes::SummarizedTaskView;
|
use $crate::routes::{get_task_id, SummarizedTaskView};
|
||||||
|
|
||||||
pub async fn delete(
|
pub async fn delete(
|
||||||
index_scheduler: GuardedData<
|
index_scheduler: GuardedData<
|
||||||
@ -42,6 +42,7 @@ macro_rules! make_setting_route {
|
|||||||
Data<IndexScheduler>,
|
Data<IndexScheduler>,
|
||||||
>,
|
>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
|
req: HttpRequest,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
|
|
||||||
@ -56,8 +57,9 @@ macro_rules! make_setting_route {
|
|||||||
is_deletion: true,
|
is_deletion: true,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
|
||||||
.await??
|
.await??
|
||||||
.into();
|
.into();
|
||||||
|
|
||||||
@ -105,8 +107,9 @@ macro_rules! make_setting_route {
|
|||||||
is_deletion: false,
|
is_deletion: false,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
|
||||||
.await??
|
.await??
|
||||||
.into();
|
.into();
|
||||||
|
|
||||||
@ -767,8 +770,9 @@ pub async fn update_all(
|
|||||||
is_deletion: false,
|
is_deletion: false,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Update all settings");
|
debug!(returns = ?task, "Update all settings");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -790,6 +794,7 @@ pub async fn get_all(
|
|||||||
pub async fn delete_all(
|
pub async fn delete_all(
|
||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
|
req: HttpRequest,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
|
|
||||||
@ -803,8 +808,9 @@ pub async fn delete_all(
|
|||||||
is_deletion: true,
|
is_deletion: true,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Delete all settings");
|
debug!(returns = ?task, "Delete all settings");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -4,7 +4,7 @@ use actix_web::web::Data;
|
|||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
use index_scheduler::IndexScheduler;
|
use index_scheduler::IndexScheduler;
|
||||||
use meilisearch_auth::AuthController;
|
use meilisearch_auth::AuthController;
|
||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::{Code, ResponseError};
|
||||||
use meilisearch_types::settings::{Settings, Unchecked};
|
use meilisearch_types::settings::{Settings, Unchecked};
|
||||||
use meilisearch_types::tasks::{Kind, Status, Task, TaskId};
|
use meilisearch_types::tasks::{Kind, Status, Task, TaskId};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -45,6 +45,34 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
|||||||
.service(web::scope("/experimental-features").configure(features::configure));
|
.service(web::scope("/experimental-features").configure(features::configure));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_task_id(req: &HttpRequest) -> Result<Option<TaskId>, ResponseError> {
|
||||||
|
let task_id = req
|
||||||
|
.headers()
|
||||||
|
.get("TaskId")
|
||||||
|
.map(|header| {
|
||||||
|
header.to_str().map_err(|e| {
|
||||||
|
ResponseError::from_msg(
|
||||||
|
format!("TaskId is not a valid utf-8 string: {e}"),
|
||||||
|
Code::BadRequest,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?
|
||||||
|
.map(|s| {
|
||||||
|
s.parse::<TaskId>().map_err(|e| {
|
||||||
|
ResponseError::from_msg(
|
||||||
|
format!(
|
||||||
|
"Could not parse the TaskId as a {}: {e}",
|
||||||
|
std::any::type_name::<TaskId>(),
|
||||||
|
),
|
||||||
|
Code::BadRequest,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?;
|
||||||
|
Ok(task_id)
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct SummarizedTaskView {
|
pub struct SummarizedTaskView {
|
||||||
|
@ -10,7 +10,7 @@ use crate::analytics::Analytics;
|
|||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
use crate::routes::SummarizedTaskView;
|
use crate::routes::{get_task_id, SummarizedTaskView};
|
||||||
|
|
||||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||||
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot))));
|
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot))));
|
||||||
@ -24,8 +24,9 @@ pub async fn create_snapshot(
|
|||||||
analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req));
|
analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req));
|
||||||
|
|
||||||
let task = KindWithContent::SnapshotCreation;
|
let task = KindWithContent::SnapshotCreation;
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Create snapshot");
|
debug!(returns = ?task, "Create snapshot");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -10,7 +10,7 @@ use meilisearch_types::index_uid::IndexUid;
|
|||||||
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
|
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
use super::SummarizedTaskView;
|
use super::{get_task_id, SummarizedTaskView};
|
||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::error::MeilisearchHttpError;
|
use crate::error::MeilisearchHttpError;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
@ -60,7 +60,8 @@ pub async fn swap_indexes(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let task = KindWithContent::IndexSwap { swaps };
|
let task = KindWithContent::IndexSwap { swaps };
|
||||||
|
let uid = get_task_id(&req)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,7 @@ use time::macros::format_description;
|
|||||||
use time::{Date, Duration, OffsetDateTime, Time};
|
use time::{Date, Duration, OffsetDateTime, Time};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
|
||||||
use super::SummarizedTaskView;
|
use super::{get_task_id, SummarizedTaskView};
|
||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
@ -197,7 +197,9 @@ async fn cancel_tasks(
|
|||||||
let task_cancelation =
|
let task_cancelation =
|
||||||
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
|
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
|
||||||
|
|
||||||
let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
|
let uid = get_task_id(&req)?;
|
||||||
|
let task =
|
||||||
|
task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??;
|
||||||
let task: SummarizedTaskView = task.into();
|
let task: SummarizedTaskView = task.into();
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(task))
|
Ok(HttpResponse::Ok().json(task))
|
||||||
@ -242,7 +244,8 @@ async fn delete_tasks(
|
|||||||
let task_deletion =
|
let task_deletion =
|
||||||
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
|
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
|
||||||
|
|
||||||
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
|
let uid = get_task_id(&req)?;
|
||||||
|
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??;
|
||||||
let task: SummarizedTaskView = task.into();
|
let task: SummarizedTaskView = task.into();
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(task))
|
Ok(HttpResponse::Ok().json(task))
|
||||||
|
@ -199,3 +199,74 @@ async fn error_create_with_invalid_index_uid() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn send_task_id() {
|
||||||
|
let server = Server::new().await;
|
||||||
|
let app = server.init_web_app().await;
|
||||||
|
let index = server.index("catto");
|
||||||
|
let (response, code) = index.create(None).await;
|
||||||
|
snapshot!(code, @"202 Accepted");
|
||||||
|
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
|
||||||
|
{
|
||||||
|
"taskUid": 0,
|
||||||
|
"indexUid": "catto",
|
||||||
|
"status": "enqueued",
|
||||||
|
"type": "indexCreation",
|
||||||
|
"enqueuedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
|
let body = serde_json::to_string(&json!({
|
||||||
|
"uid": "doggo",
|
||||||
|
"primaryKey": None::<&str>,
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
let req = test::TestRequest::post()
|
||||||
|
.uri("/indexes")
|
||||||
|
.insert_header(("TaskId", "25"))
|
||||||
|
.insert_header(ContentType::json())
|
||||||
|
.set_payload(body)
|
||||||
|
.to_request();
|
||||||
|
|
||||||
|
let res = test::call_service(&app, req).await;
|
||||||
|
snapshot!(res.status(), @"202 Accepted");
|
||||||
|
|
||||||
|
let bytes = test::read_body(res).await;
|
||||||
|
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
|
||||||
|
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
|
||||||
|
{
|
||||||
|
"taskUid": 25,
|
||||||
|
"indexUid": "doggo",
|
||||||
|
"status": "enqueued",
|
||||||
|
"type": "indexCreation",
|
||||||
|
"enqueuedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
|
let body = serde_json::to_string(&json!({
|
||||||
|
"uid": "girafo",
|
||||||
|
"primaryKey": None::<&str>,
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
let req = test::TestRequest::post()
|
||||||
|
.uri("/indexes")
|
||||||
|
.insert_header(("TaskId", "12"))
|
||||||
|
.insert_header(ContentType::json())
|
||||||
|
.set_payload(body)
|
||||||
|
.to_request();
|
||||||
|
|
||||||
|
let res = test::call_service(&app, req).await;
|
||||||
|
snapshot!(res.status(), @"400 Bad Request");
|
||||||
|
|
||||||
|
let bytes = test::read_body(res).await;
|
||||||
|
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
|
||||||
|
snapshot!(json_string!(response), @r###"
|
||||||
|
{
|
||||||
|
"message": "Received bad task id: 12 should be >= to 26.",
|
||||||
|
"code": "bad_request",
|
||||||
|
"type": "invalid_request",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#bad_request"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user