implement custom payload

This commit is contained in:
marin postma 2021-06-23 13:55:16 +02:00
parent 71226feb74
commit 1c13100948
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
4 changed files with 65 additions and 7 deletions

View File

@ -1,7 +1,6 @@
use actix_web::web::Payload;
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
use super::Data; use crate::{Data, Payload};
use crate::index::{Checked, Settings}; use crate::index::{Checked, Settings};
use crate::index_controller::{error::Result, IndexMetadata, IndexSettings, UpdateStatus}; use crate::index_controller::{error::Result, IndexMetadata, IndexSettings, UpdateStatus};

View File

@ -3,7 +3,7 @@ use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use actix_web::web::{Bytes, Payload}; use actix_web::web::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use log::error; use log::error;
@ -25,6 +25,7 @@ use uuid_resolver::{error::UuidResolverError, UuidResolverHandle};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt; use crate::option::Opt;
use error::Result; use error::Result;
use crate::Payload;
use self::dump_actor::load_dump; use self::dump_actor::load_dump;
use self::error::IndexControllerError; use self::error::IndexControllerError;

View File

@ -10,10 +10,13 @@ pub mod routes;
#[cfg(all(not(debug_assertions), feature = "analytics"))] #[cfg(all(not(debug_assertions), feature = "analytics"))]
pub mod analytics; pub mod analytics;
use std::{pin::Pin, task::{Context, Poll}};
pub use self::data::Data; pub use self::data::Data;
use futures::{Stream, future::{Ready, ready}};
pub use option::Opt; pub use option::Opt;
use actix_web::{HttpResponse, web}; use actix_web::{FromRequest, HttpRequest, dev, error::PayloadError, web};
pub fn configure_data(config: &mut web::ServiceConfig, data: Data) { pub fn configure_data(config: &mut web::ServiceConfig, data: Data) {
let http_payload_size_limit = data.http_payload_size_limit(); let http_payload_size_limit = data.http_payload_size_limit();
@ -35,6 +38,7 @@ pub fn configure_data(config: &mut web::ServiceConfig, data: Data) {
#[cfg(feature = "mini-dashboard")] #[cfg(feature = "mini-dashboard")]
pub fn dashboard(config: &mut web::ServiceConfig, enable_frontend: bool) { pub fn dashboard(config: &mut web::ServiceConfig, enable_frontend: bool) {
use actix_web_static_files::Resource; use actix_web_static_files::Resource;
use actix_web::HttpResponse;
mod dashboard { mod dashboard {
include!(concat!(env!("OUT_DIR"), "/generated.rs")); include!(concat!(env!("OUT_DIR"), "/generated.rs"));
@ -102,3 +106,57 @@ macro_rules! create_app {
.wrap(middleware::NormalizePath::new(middleware::TrailingSlash::Trim)) .wrap(middleware::NormalizePath::new(middleware::TrailingSlash::Trim))
}}; }};
} }
pub struct Payload {
payload: dev::Payload,
limit: usize,
}
pub struct PayloadConfig {
limit: usize,
}
impl Default for PayloadConfig {
fn default() -> Self {
Self { limit: 256 * 1024 }
}
}
impl FromRequest for Payload {
type Config = PayloadConfig;
type Error = PayloadError;
type Future = Ready<Result<Payload, Self::Error>>;
#[inline]
fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future {
let limit = req.app_data::<PayloadConfig>().map(|c| c.limit).unwrap_or(Self::Config::default().limit);
ready(Ok(Payload { payload: payload.take(), limit }))
}
}
impl Stream for Payload {
type Item = Result<web::Bytes, PayloadError>;
#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.payload).poll_next(cx) {
Poll::Ready(Some(result)) => {
match result {
Ok(bytes) => {
match self.limit.checked_sub(bytes.len()) {
Some(new_limit) => {
self.limit = new_limit;
Poll::Ready(Some(Ok(bytes)))
}
None => Poll::Ready(Some(Err(PayloadError::Overflow))),
}
}
x => Poll::Ready(Some(x)),
}
},
otherwise => otherwise,
}
}
}

View File

@ -9,7 +9,7 @@ use serde_json::Value;
use crate::error::ResponseError; use crate::error::ResponseError;
use crate::helpers::Authentication; use crate::helpers::Authentication;
use crate::routes::IndexParam; use crate::routes::IndexParam;
use crate::Data; use crate::{Data, Payload};
const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0; const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0;
const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20; const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20;
@ -129,7 +129,7 @@ async fn add_documents(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: web::Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let update_status = data let update_status = data
.add_documents( .add_documents(
@ -173,7 +173,7 @@ async fn update_documents(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: web::Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let update = data let update = data
.add_documents( .add_documents(