split csv and json document routes

This commit is contained in:
mpostma 2021-09-29 00:12:25 +02:00
parent 6e8a3fe8de
commit 911630000f
4 changed files with 49 additions and 48 deletions

View File

@ -18,7 +18,6 @@ use crate::routes::IndexParam;
const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0; const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0;
const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20; const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20;
/*
macro_rules! guard_content_type { macro_rules! guard_content_type {
($fn_name:ident, $guard_value:literal) => { ($fn_name:ident, $guard_value:literal) => {
fn $fn_name(head: &actix_web::dev::RequestHead) -> bool { fn $fn_name(head: &actix_web::dev::RequestHead) -> bool {
@ -33,9 +32,8 @@ macro_rules! guard_content_type {
} }
}; };
} }
guard_content_type!(guard_json, "application/json"); guard_content_type!(guard_json, "application/json");
*/ guard_content_type!(guard_csv, "application/csv");
/// This is required because Payload is not Sync nor Send /// This is required because Payload is not Sync nor Send
fn payload_to_stream(mut payload: Payload) -> impl Stream<Item = Result<Bytes, PayloadError>> { fn payload_to_stream(mut payload: Payload) -> impl Stream<Item = Result<Bytes, PayloadError>> {
@ -48,22 +46,6 @@ fn payload_to_stream(mut payload: Payload) -> impl Stream<Item = Result<Bytes, P
tokio_stream::wrappers::ReceiverStream::new(recv) tokio_stream::wrappers::ReceiverStream::new(recv)
} }
fn guard_json(head: &actix_web::dev::RequestHead) -> bool {
if let Some(_content_type) = head.headers.get("Content-Type") {
// CURRENTLY AND FOR THIS RELEASE ONLY WE DECIDED TO INTERPRET ALL CONTENT-TYPES AS JSON
true
/*
content_type
.to_str()
.map(|v| v.contains("application/json"))
.unwrap_or(false)
*/
} else {
// if no content-type is specified we still accept the data as json!
true
}
}
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct DocumentParam { pub struct DocumentParam {
index_uid: String, index_uid: String,
@ -74,8 +56,10 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service( cfg.service(
web::resource("") web::resource("")
.route(web::get().to(get_all_documents)) .route(web::get().to(get_all_documents))
.route(web::post().guard(guard_json).to(add_documents)) .route(web::post().guard(guard_json).to(add_documents_json))
.route(web::put().guard(guard_json).to(update_documents)) .route(web::post().guard(guard_csv).to(add_documents_csv))
.route(web::post().guard(guard_json).to(update_documents_json))
.route(web::post().guard(guard_csv).to(update_documents_csv))
.route(web::delete().to(clear_all_documents)), .route(web::delete().to(clear_all_documents)),
) )
// this route needs to be before the /documents/{document_id} to match properly // this route needs to be before the /documents/{document_id} to match properly
@ -159,43 +143,57 @@ pub struct UpdateDocumentsQuery {
primary_key: Option<String>, primary_key: Option<String>,
} }
/// Route used when the payload type is "application/json" pub async fn add_documents_json(
/// Used to add or replace documents
pub async fn add_documents(
meilisearch: GuardedData<Private, MeiliSearch>, meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params); document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Json, IndexDocumentsMethod::ReplaceDocuments).await
let update = Update::DocumentAddition {
payload: Box::new(payload_to_stream(body)),
primary_key: params.primary_key.clone(),
method: IndexDocumentsMethod::ReplaceDocuments,
format: DocumentAdditionFormat::Json,
};
let update_status = meilisearch
.register_update(path.into_inner().index_uid, update, true)
.await?;
debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
} }
/// Route used when the payload type is "application/json" pub async fn add_documents_csv(
/// Used to add or replace documents
pub async fn update_documents(
meilisearch: GuardedData<Private, MeiliSearch>, meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> {
document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Csv, IndexDocumentsMethod::ReplaceDocuments).await
}
pub async fn update_documents_json(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Json, IndexDocumentsMethod::UpdateDocuments).await
}
pub async fn update_documents_csv(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Csv, IndexDocumentsMethod::UpdateDocuments).await
}
/// Route used when the payload type is "application/json"
/// Used to add or replace documents
async fn document_addition(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
format: DocumentAdditionFormat,
method: IndexDocumentsMethod,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params); debug!("called with params: {:?}", params);
let update = Update::DocumentAddition { let update = Update::DocumentAddition {
payload: Box::new(payload_to_stream(body)), payload: Box::new(payload_to_stream(body)),
primary_key: params.primary_key.clone(), primary_key: params.primary_key.clone(),
method: IndexDocumentsMethod::UpdateDocuments, method,
format: DocumentAdditionFormat::Json, format,
}; };
let update_status = meilisearch let update_status = meilisearch
.register_update(path.into_inner().index_uid, update, true) .register_update(path.into_inner().index_uid, update, true)

View File

@ -361,8 +361,10 @@ mod test {
indexes::documents::clear_all_documents, indexes::documents::clear_all_documents,
indexes::documents::delete_documents, indexes::documents::delete_documents,
indexes::documents::update_documents, indexes::documents::update_documents_json,
indexes::documents::add_documents, indexes::documents::update_documents_csv,
indexes::documents::add_documents_json,
indexes::documents::add_documents_csv,
indexes::documents::delete_document, indexes::documents::delete_document,
indexes::updates::get_all_updates_status, indexes::updates::get_all_updates_status,

View File

@ -152,8 +152,8 @@ async fn document_add_create_index_bad_uid() {
async fn document_update_create_index_bad_uid() { async fn document_update_create_index_bad_uid() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("883 fj!"); let index = server.index("883 fj!");
let (_response, code) = index.update_documents(json!([]), None).await; let (response, code) = index.update_documents(json!([]), None).await;
assert_eq!(code, 400); assert_eq!(code, 400, "{}", response);
} }
#[actix_rt::test] #[actix_rt::test]

View File

@ -67,8 +67,9 @@ impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Rea
// TODO: optimize buf filling // TODO: optimize buf filling
match self.current.take() { match self.current.take() {
Some(mut bytes) => { Some(mut bytes) => {
let copied = bytes.split_to(buf.len()); let split_at = bytes.len().min(buf.len());
buf.copy_from_slice(&copied); let copied = bytes.split_to(split_at);
buf[..split_at].copy_from_slice(&copied);
if !bytes.is_empty() { if !bytes.is_empty() {
self.current.replace(bytes); self.current.replace(bytes);
} }