1747: Add new error types for document additions r=curquiza a=MarinPostma

Adds the missing errors for the documents routes, as specified.

close #1691
close #1690


Co-authored-by: mpostma <postma.marin@protonmail.com>
bors[bot] 2021-09-30 15:16:44 +00:00 committed by GitHub
commit 81993b6a15
8 changed files with 103 additions and 140 deletions
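
Summary sketch (not part of the diff): the changes below introduce four new error codes — missing_content_type, invalid_content_type, malformed_payload, missing_payload — and collapse the per-content-type document routes into single add/update handlers that inspect the Content-Type header themselves. As a rough restatement of the code-to-status mapping added to meilisearch-error, with a hypothetical `describe` helper:

use meilisearch_error::Code;

// Hypothetical helper, for illustration only: the (name, HTTP status) pairs that the
// new variants resolve to in the `impl Code` hunk below.
fn describe(code: &Code) -> (&'static str, u16) {
    match code {
        Code::MissingContentType => ("missing_content_type", 415),
        Code::InvalidContentType => ("invalid_content_type", 415),
        Code::MalformedPayload => ("malformed_payload", 400),
        Code::MissingPayload => ("missing_payload", 400),
        _ => ("other", 0),
    }
}

fn main() {
    assert_eq!(describe(&Code::MissingPayload), ("missing_payload", 400));
}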

View File

@@ -83,6 +83,11 @@ pub enum Code {
     DumpAlreadyInProgress,
     DumpProcessFailed,
+    InvalidContentType,
+    MissingContentType,
+    MalformedPayload,
+    MissingPayload,
 }

 impl Code {
@@ -154,6 +159,14 @@ impl Code {
             DumpProcessFailed => {
                 ErrCode::internal("dump_process_failed", StatusCode::INTERNAL_SERVER_ERROR)
             }
+            MissingContentType => {
+                ErrCode::invalid("missing_content_type", StatusCode::UNSUPPORTED_MEDIA_TYPE)
+            }
+            MalformedPayload => ErrCode::invalid("malformed_payload", StatusCode::BAD_REQUEST),
+            InvalidContentType => {
+                ErrCode::invalid("invalid_content_type", StatusCode::UNSUPPORTED_MEDIA_TYPE)
+            }
+            MissingPayload => ErrCode::invalid("missing_payload", StatusCode::BAD_REQUEST),
         }
     }

View File

@@ -9,6 +9,23 @@ use aweb::error::{JsonPayloadError, QueryPayloadError};
 use meilisearch_error::{Code, ErrorCode};
 use serde::{Deserialize, Serialize};

+#[derive(Debug, thiserror::Error)]
+pub enum MeilisearchHttpError {
+    #[error("A Content-Type header is missing. Accepted values for the Content-Type header are: \"application/json\", \"application/x-ndjson\", \"text/csv\"")]
+    MissingContentType,
+    #[error("The Content-Type \"{0}\" is invalid. Accepted values for the Content-Type header are: \"application/json\", \"application/x-ndjson\", \"text/csv\"")]
+    InvalidContentType(String),
+}
+
+impl ErrorCode for MeilisearchHttpError {
+    fn error_code(&self) -> Code {
+        match self {
+            MeilisearchHttpError::MissingContentType => Code::MissingContentType,
+            MeilisearchHttpError::InvalidContentType(_) => Code::InvalidContentType,
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct ResponseError {
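
A minimal usage sketch (not from the commit), assuming the `thiserror` attributes and `ErrorCode` impl above: the derived `Display` yields the documented messages, and `error_code()` selects the new codes.

use meilisearch_error::{Code, ErrorCode};
// `MeilisearchHttpError` as defined in the hunk above (meilisearch-http's error module).

fn main() {
    let err = MeilisearchHttpError::InvalidContentType("text/plain".to_string());
    // Display comes from the #[error(...)] attribute on the variant.
    println!("{}", err);
    // The ErrorCode impl routes the variant to the matching Code.
    assert!(matches!(err.error_code(), Code::InvalidContentType));
}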

View File

@@ -1,6 +1,6 @@
 use actix_web::error::PayloadError;
 use actix_web::web::Bytes;
-use actix_web::{web, HttpResponse};
+use actix_web::{web, HttpRequest, HttpResponse};
 use futures::{Stream, StreamExt};
 use log::debug;
 use meilisearch_lib::index_controller::{DocumentAdditionFormat, Update};
@@ -10,7 +10,7 @@ use serde::Deserialize;
 use serde_json::Value;
 use tokio::sync::mpsc;

-use crate::error::ResponseError;
+use crate::error::{MeilisearchHttpError, ResponseError};
 use crate::extractors::authentication::{policies::*, GuardedData};
 use crate::extractors::payload::Payload;
 use crate::routes::IndexParam;
@@ -18,29 +18,6 @@ use crate::routes::IndexParam;
 const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0;
 const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20;

-macro_rules! guard_content_type {
-    ($fn_name:ident, $guard_value:literal) => {
-        fn $fn_name(head: &actix_web::dev::RequestHead) -> bool {
-            if let Some(content_type) = head.headers.get("Content-Type") {
-                content_type
-                    .to_str()
-                    .map(|v| v.contains($guard_value))
-                    .unwrap_or(false)
-            } else {
-                false
-            }
-        }
-    };
-}
-
-guard_content_type!(guard_ndjson, "application/x-ndjson");
-guard_content_type!(guard_csv, "text/csv");
-guard_content_type!(guard_json, "application/json");
-
-fn empty_application_type(head: &actix_web::dev::RequestHead) -> bool {
-    head.headers.get("Content-Type").is_none()
-}
-
 /// This is required because Payload is not Sync nor Send
 fn payload_to_stream(mut payload: Payload) -> impl Stream<Item = Result<Bytes, PayloadError>> {
     let (snd, recv) = mpsc::channel(1);
@@ -62,26 +39,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(
         web::resource("")
            .route(web::get().to(get_all_documents))
-            // replace documents routes
-            .route(
-                web::post()
-                    .guard(empty_application_type)
-                    .to(HttpResponse::UnsupportedMediaType),
-            )
-            .route(web::post().guard(guard_json).to(add_documents_json))
-            .route(web::post().guard(guard_ndjson).to(add_documents_ndjson))
-            .route(web::post().guard(guard_csv).to(add_documents_csv))
-            .route(web::post().to(HttpResponse::UnsupportedMediaType))
-            // update documents routes
-            .route(
-                web::put()
-                    .guard(empty_application_type)
-                    .to(HttpResponse::UnsupportedMediaType),
-            )
-            .route(web::put().guard(guard_json).to(update_documents_json))
-            .route(web::put().guard(guard_ndjson).to(update_documents_ndjson))
-            .route(web::put().guard(guard_csv).to(update_documents_csv))
-            .route(web::put().to(HttpResponse::UnsupportedMediaType))
+            .route(web::post().to(add_documents))
+            .route(web::put().to(update_documents))
            .route(web::delete().to(clear_all_documents)),
     )
     // this route needs to be before the /documents/{document_id} to match properly
@@ -165,127 +124,76 @@ pub struct UpdateDocumentsQuery {
     primary_key: Option<String>,
 }

-pub async fn add_documents_json(
+pub async fn add_documents(
     meilisearch: GuardedData<Private, MeiliSearch>,
     path: web::Path<IndexParam>,
     params: web::Query<UpdateDocumentsQuery>,
     body: Payload,
+    req: HttpRequest,
 ) -> Result<HttpResponse, ResponseError> {
+    debug!("called with params: {:?}", params);
     document_addition(
+        req.headers()
+            .get("Content-type")
+            .map(|s| s.to_str().unwrap_or("unkown")),
         meilisearch,
-        path,
-        params,
+        path.into_inner().index_uid,
+        params.into_inner().primary_key,
         body,
-        DocumentAdditionFormat::Json,
         IndexDocumentsMethod::ReplaceDocuments,
     )
     .await
 }

-pub async fn add_documents_ndjson(
+pub async fn update_documents(
     meilisearch: GuardedData<Private, MeiliSearch>,
     path: web::Path<IndexParam>,
     params: web::Query<UpdateDocumentsQuery>,
     body: Payload,
+    req: HttpRequest,
 ) -> Result<HttpResponse, ResponseError> {
+    debug!("called with params: {:?}", params);
     document_addition(
+        req.headers()
+            .get("Content-type")
+            .map(|s| s.to_str().unwrap_or("unkown")),
         meilisearch,
-        path,
-        params,
+        path.into_inner().index_uid,
+        params.into_inner().primary_key,
         body,
-        DocumentAdditionFormat::Ndjson,
-        IndexDocumentsMethod::ReplaceDocuments,
-    )
-    .await
-}
-pub async fn add_documents_csv(
-    meilisearch: GuardedData<Private, MeiliSearch>,
-    path: web::Path<IndexParam>,
-    params: web::Query<UpdateDocumentsQuery>,
-    body: Payload,
-) -> Result<HttpResponse, ResponseError> {
-    document_addition(
-        meilisearch,
-        path,
-        params,
-        body,
-        DocumentAdditionFormat::Csv,
-        IndexDocumentsMethod::ReplaceDocuments,
-    )
-    .await
-}
-pub async fn update_documents_json(
-    meilisearch: GuardedData<Private, MeiliSearch>,
-    path: web::Path<IndexParam>,
-    params: web::Query<UpdateDocumentsQuery>,
-    body: Payload,
-) -> Result<HttpResponse, ResponseError> {
-    document_addition(
-        meilisearch,
-        path,
-        params,
-        body,
-        DocumentAdditionFormat::Json,
         IndexDocumentsMethod::UpdateDocuments,
     )
     .await
 }

-pub async fn update_documents_ndjson(
-    meilisearch: GuardedData<Private, MeiliSearch>,
-    path: web::Path<IndexParam>,
-    params: web::Query<UpdateDocumentsQuery>,
-    body: Payload,
-) -> Result<HttpResponse, ResponseError> {
-    document_addition(
-        meilisearch,
-        path,
-        params,
-        body,
-        DocumentAdditionFormat::Ndjson,
-        IndexDocumentsMethod::UpdateDocuments,
-    )
-    .await
-}
-pub async fn update_documents_csv(
-    meilisearch: GuardedData<Private, MeiliSearch>,
-    path: web::Path<IndexParam>,
-    params: web::Query<UpdateDocumentsQuery>,
-    body: Payload,
-) -> Result<HttpResponse, ResponseError> {
-    document_addition(
-        meilisearch,
-        path,
-        params,
-        body,
-        DocumentAdditionFormat::Csv,
-        IndexDocumentsMethod::UpdateDocuments,
-    )
-    .await
-}

 /// Route used when the payload type is "application/json"
 /// Used to add or replace documents
 async fn document_addition(
+    content_type: Option<&str>,
     meilisearch: GuardedData<Private, MeiliSearch>,
-    path: web::Path<IndexParam>,
-    params: web::Query<UpdateDocumentsQuery>,
+    index_uid: String,
+    primary_key: Option<String>,
     body: Payload,
-    format: DocumentAdditionFormat,
     method: IndexDocumentsMethod,
 ) -> Result<HttpResponse, ResponseError> {
-    debug!("called with params: {:?}", params);
+    let format = match content_type {
+        Some("application/json") => DocumentAdditionFormat::Json,
+        Some("application/x-ndjson") => DocumentAdditionFormat::Ndjson,
+        Some("text/csv") => DocumentAdditionFormat::Csv,
+        Some(other) => {
+            return Err(MeilisearchHttpError::InvalidContentType(other.to_string()).into())
+        }
+        None => return Err(MeilisearchHttpError::MissingContentType.into()),
+    };
     let update = Update::DocumentAddition {
         payload: Box::new(payload_to_stream(body)),
-        primary_key: params.primary_key.clone(),
+        primary_key,
         method,
         format,
     };
-    let update_status = meilisearch
-        .register_update(path.into_inner().index_uid, update, true)
-        .await?;
+    let update_status = meilisearch.register_update(index_uid, update, true).await?;
     debug!("returns: {:?}", update_status);
     Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
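
The guard-based routes removed above are replaced by explicit dispatch inside the shared handler; conceptually the Content-Type handling reduces to a plain match, sketched here standalone with hypothetical names (`Format` and `detect_format` are illustrative only, not project code):

// Standalone restatement of the dispatch now done at the top of `document_addition`.
#[derive(Debug, PartialEq)]
enum Format {
    Json,
    Ndjson,
    Csv,
}

fn detect_format(content_type: Option<&str>) -> Result<Format, String> {
    match content_type {
        Some("application/json") => Ok(Format::Json),
        Some("application/x-ndjson") => Ok(Format::Ndjson),
        Some("text/csv") => Ok(Format::Csv),
        Some(other) => Err(format!("The Content-Type \"{}\" is invalid.", other)),
        None => Err("A Content-Type header is missing.".to_string()),
    }
}

fn main() {
    assert_eq!(detect_format(Some("text/csv")), Ok(Format::Csv));
    assert!(detect_format(None).is_err());
}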

View File

@@ -361,10 +361,8 @@ mod test {
        indexes::documents::clear_all_documents,
        indexes::documents::delete_documents,
-        indexes::documents::update_documents_json,
-        indexes::documents::update_documents_csv,
-        indexes::documents::add_documents_json,
-        indexes::documents::add_documents_csv,
+        indexes::documents::update_documents,
+        indexes::documents::add_documents,
        indexes::documents::delete_document,
        indexes::updates::get_all_updates_status,

View File

@@ -2,6 +2,7 @@ use std::fmt;
 use std::io::{self, Read, Result as IoResult, Seek, Write};

 use csv::{Reader as CsvReader, StringRecordsIntoIter};
+use meilisearch_error::{Code, ErrorCode};
 use milli::documents::DocumentBatchBuilder;
 use serde_json::{Deserializer, Map, Value};
@@ -35,6 +36,15 @@ pub enum DocumentFormatError {
     ),
 }

+impl ErrorCode for DocumentFormatError {
+    fn error_code(&self) -> Code {
+        match self {
+            DocumentFormatError::Internal(_) => Code::Internal,
+            DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
+        }
+    }
+}
+
 internal_error!(DocumentFormatError: milli::documents::Error, io::Error);

 macro_rules! malformed {

View File

@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::fmt;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::Duration;
@@ -75,6 +76,16 @@ pub enum DocumentAdditionFormat {
     Ndjson,
 }

+impl fmt::Display for DocumentAdditionFormat {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            DocumentAdditionFormat::Json => write!(f, "json"),
+            DocumentAdditionFormat::Ndjson => write!(f, "ndjson"),
+            DocumentAdditionFormat::Csv => write!(f, "csv"),
+        }
+    }
+}
+
 #[derive(Serialize, Debug)]
 #[serde(rename_all = "camelCase")]
 pub struct Stats {
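
Quick illustration (not part of the diff): this Display impl is what the new MissingPayload error later in the commit interpolates through its "A {0} payload is missing." message.

fn main() {
    // `DocumentAdditionFormat` as defined in the hunk above.
    let f = DocumentAdditionFormat::Json;
    assert_eq!(f.to_string(), "json");
    assert_eq!(format!("A {} payload is missing.", f), "A json payload is missing.");
}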

View File

@@ -5,7 +5,7 @@ use meilisearch_error::{Code, ErrorCode};
 use crate::{
     document_formats::DocumentFormatError,
-    index_controller::update_file_store::UpdateFileStoreError,
+    index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat},
 };

 pub type Result<T> = std::result::Result<T, UpdateLoopError>;
@@ -22,12 +22,12 @@ pub enum UpdateLoopError {
     )]
     FatalUpdateStoreError,
     #[error("{0}")]
-    InvalidPayload(#[from] DocumentFormatError),
-    #[error("{0}")]
-    MalformedPayload(Box<dyn Error + Send + Sync + 'static>),
+    DocumentFormatError(#[from] DocumentFormatError),
     // TODO: The reference to actix has to go.
     #[error("{0}")]
     PayloadError(#[from] actix_web::error::PayloadError),
+    #[error("A {0} payload is missing.")]
+    MissingPayload(DocumentAdditionFormat),
 }

 impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError
@@ -60,12 +60,12 @@ impl ErrorCode for UpdateLoopError {
             Self::Internal(_) => Code::Internal,
             //Self::IndexActor(e) => e.error_code(),
             Self::FatalUpdateStoreError => Code::Internal,
-            Self::InvalidPayload(_) => Code::BadRequest,
-            Self::MalformedPayload(_) => Code::BadRequest,
+            Self::DocumentFormatError(error) => error.error_code(),
             Self::PayloadError(error) => match error {
                 actix_web::error::PayloadError::Overflow => Code::PayloadTooLarge,
                 _ => Code::Internal,
             },
+            Self::MissingPayload(_) => Code::MissingPayload,
         }
     }
 }

View File

@@ -3,7 +3,7 @@ mod message;
 pub mod status;
 pub mod store;

-use std::io;
+use std::io::{self, BufRead, BufReader};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
@@ -191,9 +191,15 @@ impl UpdateLoop {
                 method,
                 format,
             } => {
-                let reader = StreamReader::new(payload);
+                let mut reader = BufReader::new(StreamReader::new(payload));
                 let (content_uuid, mut update_file) = self.update_file_store.new_update()?;
                 tokio::task::spawn_blocking(move || -> Result<_> {
+                    // check if the payload is empty, and return an error
+                    reader.fill_buf()?;
+                    if reader.buffer().is_empty() {
+                        return Err(UpdateLoopError::MissingPayload(format));
+                    }
                     match format {
                         DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
                         DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?,
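
The empty-payload check relies on `BufRead::fill_buf`, which loads data into the reader's internal buffer without consuming it, so the subsequent `read_json` / `read_csv` call still sees the full payload. A self-contained sketch of the same pattern over an in-memory reader (illustration only, not project code):

use std::io::{BufRead, BufReader, Cursor, Read, Result};

// Returns true when the underlying stream yields no bytes at all.
fn is_empty_payload<R: Read>(reader: &mut BufReader<R>) -> Result<bool> {
    // fill_buf() peeks: it fills the internal buffer but does not advance the reader.
    reader.fill_buf()?;
    Ok(reader.buffer().is_empty())
}

fn main() -> Result<()> {
    assert!(is_empty_payload(&mut BufReader::new(Cursor::new(Vec::new())))?);
    assert!(!is_empty_payload(&mut BufReader::new(Cursor::new(b"{\"id\": 1}".to_vec())))?);
    Ok(())
}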