move payload to own module

This commit is contained in:
marin postma 2021-06-23 14:56:02 +02:00
parent 834995b130
commit a838238a63
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
6 changed files with 79 additions and 75 deletions

View File

@ -1,8 +1,9 @@
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
use crate::extractors::payload::Payload;
use crate::index::{Checked, Settings}; use crate::index::{Checked, Settings};
use crate::index_controller::{error::Result, IndexMetadata, IndexSettings, UpdateStatus}; use crate::index_controller::{error::Result, IndexMetadata, IndexSettings, UpdateStatus};
use crate::{Data, Payload}; use crate::Data;
impl Data { impl Data {
pub async fn add_documents( pub async fn add_documents(

View File

@ -0,0 +1 @@
pub mod payload;

View File

@ -0,0 +1,69 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_http::error::PayloadError;
use actix_web::{dev, web, FromRequest, HttpRequest};
use futures::future::{ready, Ready};
use futures::Stream;
pub struct Payload {
payload: dev::Payload,
limit: usize,
}
pub struct PayloadConfig {
limit: usize,
}
impl PayloadConfig {
pub fn new(limit: usize) -> Self {
Self { limit }
}
}
impl Default for PayloadConfig {
fn default() -> Self {
Self { limit: 256 * 1024 }
}
}
impl FromRequest for Payload {
type Config = PayloadConfig;
type Error = PayloadError;
type Future = Ready<Result<Payload, Self::Error>>;
#[inline]
fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future {
let limit = req
.app_data::<PayloadConfig>()
.map(|c| c.limit)
.unwrap_or(Self::Config::default().limit);
ready(Ok(Payload {
payload: payload.take(),
limit,
}))
}
}
impl Stream for Payload {
type Item = Result<web::Bytes, PayloadError>;
#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.payload).poll_next(cx) {
Poll::Ready(Some(result)) => match result {
Ok(bytes) => match self.limit.checked_sub(bytes.len()) {
Some(new_limit) => {
self.limit = new_limit;
Poll::Ready(Some(Ok(bytes)))
}
None => Poll::Ready(Some(Err(PayloadError::Overflow))),
},
x => Poll::Ready(Some(x)),
},
otherwise => otherwise,
}
}
}

View File

@ -22,9 +22,9 @@ use update_actor::UpdateActorHandle;
pub use updates::*; pub use updates::*;
use uuid_resolver::{error::UuidResolverError, UuidResolverHandle}; use uuid_resolver::{error::UuidResolverError, UuidResolverHandle};
use crate::extractors::payload::Payload;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt; use crate::option::Opt;
use crate::Payload;
use error::Result; use error::Result;
use self::dump_actor::load_dump; use self::dump_actor::load_dump;

View File

@ -1,6 +1,7 @@
pub mod data; pub mod data;
#[macro_use] #[macro_use]
pub mod error; pub mod error;
pub mod extractors;
pub mod helpers; pub mod helpers;
mod index; mod index;
mod index_controller; mod index_controller;
@ -10,19 +11,12 @@ pub mod routes;
#[cfg(all(not(debug_assertions), feature = "analytics"))] #[cfg(all(not(debug_assertions), feature = "analytics"))]
pub mod analytics; pub mod analytics;
use std::{
pin::Pin,
task::{Context, Poll},
};
pub use self::data::Data; pub use self::data::Data;
use futures::{
future::{ready, Ready},
Stream,
};
pub use option::Opt; pub use option::Opt;
use actix_web::{dev, error::PayloadError, web, FromRequest, HttpRequest}; use actix_web::web;
use extractors::payload::PayloadConfig;
pub fn configure_data(config: &mut web::ServiceConfig, data: Data) { pub fn configure_data(config: &mut web::ServiceConfig, data: Data) {
let http_payload_size_limit = data.http_payload_size_limit(); let http_payload_size_limit = data.http_payload_size_limit();
@ -114,65 +108,3 @@ macro_rules! create_app {
)) ))
}}; }};
} }
pub struct Payload {
payload: dev::Payload,
limit: usize,
}
pub struct PayloadConfig {
limit: usize,
}
impl PayloadConfig {
pub fn new(limit: usize) -> Self {
Self { limit }
}
}
impl Default for PayloadConfig {
fn default() -> Self {
Self { limit: 256 * 1024 }
}
}
impl FromRequest for Payload {
type Config = PayloadConfig;
type Error = PayloadError;
type Future = Ready<Result<Payload, Self::Error>>;
#[inline]
fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future {
let limit = req
.app_data::<PayloadConfig>()
.map(|c| c.limit)
.unwrap_or(Self::Config::default().limit);
ready(Ok(Payload {
payload: payload.take(),
limit,
}))
}
}
impl Stream for Payload {
type Item = Result<web::Bytes, PayloadError>;
#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.payload).poll_next(cx) {
Poll::Ready(Some(result)) => match result {
Ok(bytes) => match self.limit.checked_sub(bytes.len()) {
Some(new_limit) => {
self.limit = new_limit;
Poll::Ready(Some(Ok(bytes)))
}
None => Poll::Ready(Some(Err(PayloadError::Overflow))),
},
x => Poll::Ready(Some(x)),
},
otherwise => otherwise,
}
}
}

View File

@ -7,9 +7,10 @@ use serde::Deserialize;
use serde_json::Value; use serde_json::Value;
use crate::error::ResponseError; use crate::error::ResponseError;
use crate::extractors::payload::Payload;
use crate::helpers::Authentication; use crate::helpers::Authentication;
use crate::routes::IndexParam; use crate::routes::IndexParam;
use crate::{Data, Payload}; use crate::Data;
const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0; const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0;
const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20; const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20;