Allow to set default response payload limit and timeout (#276)

This commit is contained in:
Nikolay Kim 2024-01-06 00:09:57 +06:00 committed by GitHub
parent 90cdab9c2a
commit e3971e2d59
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 90 additions and 41 deletions

View file

@ -67,5 +67,6 @@ jobs:
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
- name: Clear the cargo caches - name: Clear the cargo caches
continue-on-error: true
run: | run: |
cargo-cache cargo-cache

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.7.17] - 2024-01-05
* Allow to set default response payload limit and timeout
## [0.7.16] - 2023-12-15 ## [0.7.16] - 2023-12-15
* Stop timer before handling UPGRADE h1 requests * Stop timer before handling UPGRADE h1 requests

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.7.16" version = "0.7.17"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"

View file

@ -37,6 +37,8 @@ impl ClientBuilder {
config: ClientConfig { config: ClientConfig {
headers: HeaderMap::new(), headers: HeaderMap::new(),
timeout: Millis(5_000), timeout: Millis(5_000),
response_pl_limit: 262_144,
response_pl_timeout: Millis(10_000),
connector: Box::new(ConnectorWrapper(Connector::default().finish().into())), connector: Box::new(ConnectorWrapper(Connector::default().finish().into())),
}, },
} }
@ -91,6 +93,22 @@ impl ClientBuilder {
self self
} }
/// Max size of response payload.
/// By default max size is 256Kb
pub fn response_payload_limit(mut self, limit: usize) -> Self {
self.config.response_pl_limit = limit;
self
}
/// Set response timeout.
///
/// Response payload timeout is the total time before a payload must be received.
/// Default value is 10 seconds.
pub fn response_payload_timeout(mut self, timeout: Millis) -> Self {
self.config.response_pl_timeout = timeout;
self
}
/// Add default header. Headers added by this method /// Add default header. Headers added by this method
/// get added to every request. /// get added to every request.
pub fn header<K, V>(mut self, key: K, value: V) -> Self pub fn header<K, V>(mut self, key: K, value: V) -> Self

View file

@ -1,11 +1,11 @@
use std::{fmt, net}; use std::{fmt, net, rc::Rc};
use crate::http::{body::Body, RequestHeadType}; use crate::http::{body::Body, RequestHeadType};
use crate::{service::Pipeline, service::Service, time::Millis, util::BoxFuture}; use crate::{service::Pipeline, service::Service, time::Millis, util::BoxFuture};
use super::error::{ConnectError, SendRequestError}; use super::error::{ConnectError, SendRequestError};
use super::response::ClientResponse; use super::response::ClientResponse;
use super::{Connect as ClientConnect, Connection}; use super::{ClientConfig, Connect as ClientConnect, Connection};
pub(super) struct ConnectorWrapper<T>(pub(crate) Pipeline<T>); pub(super) struct ConnectorWrapper<T>(pub(crate) Pipeline<T>);
@ -27,6 +27,7 @@ pub(super) trait Connect: fmt::Debug {
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
timeout: Millis, timeout: Millis,
cfg: Rc<ClientConfig>,
) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>>; ) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>>;
} }
@ -40,6 +41,7 @@ where
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
timeout: Millis, timeout: Millis,
cfg: Rc<ClientConfig>,
) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>> { ) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>> {
Box::pin(async move { Box::pin(async move {
// connect to the host // connect to the host
@ -54,7 +56,7 @@ where
connection connection
.send_request(head, body, timeout) .send_request(head, body, timeout)
.await .await
.map(|(head, payload)| ClientResponse::new(head, payload)) .map(|(head, payload)| ClientResponse::new(head, payload, cfg))
}) })
} }
} }

View file

@ -6,8 +6,7 @@ use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::http::{Method, RequestHead, RequestHeadType, Uri}; use crate::http::{Method, RequestHead, RequestHeadType, Uri};
use crate::{time::Millis, util::Bytes, util::Stream}; use crate::{time::Millis, util::Bytes, util::Stream};
use super::sender::SendClientRequest; use super::{sender::SendClientRequest, ClientConfig};
use super::ClientConfig;
/// `FrozenClientRequest` struct represents clonable client request. /// `FrozenClientRequest` struct represents clonable client request.
/// It could be used to send same request multiple times. /// It could be used to send same request multiple times.

View file

@ -70,23 +70,27 @@ pub struct Connect {
/// println!("Response: {:?}", res); /// println!("Response: {:?}", res);
/// } /// }
/// ``` /// ```
#[derive(Debug, Clone)] #[derive(Debug, Clone, Default)]
pub struct Client(Rc<ClientConfig>); pub struct Client(Rc<ClientConfig>);
#[derive(Debug)] #[derive(Debug)]
struct ClientConfig { pub(crate) struct ClientConfig {
pub(self) connector: Box<dyn HttpConnect>, pub(self) connector: Box<dyn HttpConnect>,
pub(self) headers: HeaderMap, pub(self) headers: HeaderMap,
pub(self) timeout: Millis, pub(self) timeout: Millis,
pub(self) response_pl_limit: usize,
pub(self) response_pl_timeout: Millis,
} }
impl Default for Client { impl Default for ClientConfig {
fn default() -> Self { fn default() -> Self {
Client(Rc::new(ClientConfig { ClientConfig {
connector: Box::new(ConnectorWrapper(Connector::default().finish().into())),
headers: HeaderMap::new(), headers: HeaderMap::new(),
timeout: Millis(5_000), timeout: Millis(5_000),
})) response_pl_limit: 262_144,
response_pl_timeout: Millis(10_000),
connector: Box::new(ConnectorWrapper(Connector::default().finish().into())),
}
} }
} }

View file

@ -1,6 +1,6 @@
use std::cell::{Ref, RefMut}; use std::cell::{Ref, RefMut};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, future::Future, marker::PhantomData, mem, pin::Pin}; use std::{fmt, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
@ -13,12 +13,13 @@ use crate::http::{HeaderMap, HttpMessage, Payload, ResponseHead, StatusCode, Ver
use crate::time::{Deadline, Millis}; use crate::time::{Deadline, Millis};
use crate::util::{Bytes, BytesMut, Extensions, Stream}; use crate::util::{Bytes, BytesMut, Extensions, Stream};
use super::error::JsonPayloadError; use super::{error::JsonPayloadError, ClientConfig};
/// Client Response /// Client Response
pub struct ClientResponse { pub struct ClientResponse {
pub(crate) head: ResponseHead, pub(crate) head: ResponseHead,
pub(crate) payload: Payload, pub(crate) payload: Payload,
config: Rc<ClientConfig>,
} }
impl HttpMessage for ClientResponse { impl HttpMessage for ClientResponse {
@ -58,12 +59,20 @@ impl HttpMessage for ClientResponse {
impl ClientResponse { impl ClientResponse {
/// Create new client response instance /// Create new client response instance
pub(crate) fn new(head: ResponseHead, payload: Payload) -> Self { pub(crate) fn new(
ClientResponse { head, payload } head: ResponseHead,
payload: Payload,
config: Rc<ClientConfig>,
) -> Self {
ClientResponse {
head,
payload,
config,
}
} }
pub(crate) fn with_empty_payload(head: ResponseHead) -> Self { pub(crate) fn with_empty_payload(head: ResponseHead, config: Rc<ClientConfig>) -> Self {
ClientResponse::new(head, Payload::None) ClientResponse::new(head, Payload::None, config)
} }
#[inline] #[inline]
@ -193,7 +202,11 @@ impl MessageBody {
MessageBody { MessageBody {
length: len, length: len,
err: None, err: None,
fut: Some(ReadBody::new(res.take_payload(), 262_144)), fut: Some(ReadBody::new(
res.take_payload(),
res.config.response_pl_limit,
res.config.response_pl_timeout,
)),
} }
} }
@ -236,7 +249,8 @@ impl Future for MessageBody {
} }
if let Some(len) = this.length.take() { if let Some(len) = this.length.take() {
if len > this.fut.as_ref().unwrap().limit { let limit = this.fut.as_ref().unwrap().limit;
if limit > 0 && len > limit {
return Poll::Ready(Err(PayloadError::Overflow)); return Poll::Ready(Err(PayloadError::Overflow));
} }
} }
@ -264,9 +278,9 @@ where
U: DeserializeOwned, U: DeserializeOwned,
{ {
/// Create `JsonBody` for request. /// Create `JsonBody` for request.
pub fn new(req: &mut ClientResponse) -> Self { pub fn new(res: &mut ClientResponse) -> Self {
// check content-type // check content-type
let json = if let Ok(Some(mime)) = req.mime_type() { let json = if let Ok(Some(mime)) = res.mime_type() {
mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON)
} else { } else {
false false
@ -281,7 +295,7 @@ where
} }
let mut len = None; let mut len = None;
if let Some(l) = req.headers().get(&CONTENT_LENGTH) { if let Some(l) = res.headers().get(&CONTENT_LENGTH) {
if let Ok(s) = l.to_str() { if let Ok(s) = l.to_str() {
if let Ok(l) = s.parse::<usize>() { if let Ok(l) = s.parse::<usize>() {
len = Some(l) len = Some(l)
@ -292,7 +306,11 @@ where
JsonBody { JsonBody {
length: len, length: len,
err: None, err: None,
fut: Some(ReadBody::new(req.take_payload(), 65536)), fut: Some(ReadBody::new(
res.take_payload(),
res.config.response_pl_limit,
res.config.response_pl_timeout,
)),
_t: PhantomData, _t: PhantomData,
} }
} }
@ -331,7 +349,8 @@ where
} }
if let Some(len) = self.length.take() { if let Some(len) = self.length.take() {
if len > self.fut.as_ref().unwrap().limit { let limit = self.fut.as_ref().unwrap().limit;
if limit > 0 && len > limit {
return Poll::Ready(Err(JsonPayloadError::Payload(PayloadError::Overflow))); return Poll::Ready(Err(JsonPayloadError::Payload(PayloadError::Overflow)));
} }
} }
@ -353,12 +372,12 @@ struct ReadBody {
} }
impl ReadBody { impl ReadBody {
fn new(stream: Payload, limit: usize) -> Self { fn new(stream: Payload, limit: usize, timeout: Millis) -> Self {
Self { Self {
stream, stream,
limit, limit,
buf: BytesMut::with_capacity(std::cmp::min(limit, 32768)), buf: BytesMut::with_capacity(std::cmp::min(limit, 32768)),
timeout: Deadline::new(Millis(10000)), timeout: Deadline::new(timeout),
} }
} }
} }
@ -372,7 +391,7 @@ impl Future for ReadBody {
loop { loop {
return match Pin::new(&mut this.stream).poll_next(cx)? { return match Pin::new(&mut this.stream).poll_next(cx)? {
Poll::Ready(Some(chunk)) => { Poll::Ready(Some(chunk)) => {
if (this.buf.len() + chunk.len()) > this.limit { if this.limit > 0 && (this.buf.len() + chunk.len()) > this.limit {
Poll::Ready(Err(PayloadError::Overflow)) Poll::Ready(Err(PayloadError::Overflow))
} else { } else {
this.buf.extend_from_slice(&chunk); this.buf.extend_from_slice(&chunk);

View file

@ -16,8 +16,7 @@ use crate::http::encoding::Decoder;
use crate::http::Payload; use crate::http::Payload;
use super::error::{FreezeRequestError, InvalidUrl, SendRequestError}; use super::error::{FreezeRequestError, InvalidUrl, SendRequestError};
use super::response::ClientResponse; use super::{ClientConfig, ClientResponse};
use super::ClientConfig;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub(crate) enum PrepForSendingError { pub(crate) enum PrepForSendingError {
@ -136,8 +135,9 @@ impl RequestHeadType {
let fut = Box::pin(async move { let fut = Box::pin(async move {
config config
.clone()
.connector .connector
.send_request(self, body, addr, timeout) .send_request(self, body, addr, timeout, config)
.await .await
}); });

View file

@ -106,9 +106,9 @@ impl TestResponse {
} }
if let Some(pl) = self.payload { if let Some(pl) = self.payload {
ClientResponse::new(head, pl) ClientResponse::new(head, pl, Default::default())
} else { } else {
ClientResponse::new(head, h1::Payload::empty().into()) ClientResponse::new(head, h1::Payload::empty().into(), Default::default())
} }
} }
} }
@ -118,8 +118,8 @@ mod tests {
use super::*; use super::*;
use crate::http::header; use crate::http::header;
#[test] #[crate::rt_test]
fn test_basics() { async fn test_basics() {
let res = { let res = {
#[cfg(feature = "cookie")] #[cfg(feature = "cookie")]
{ {

View file

@ -64,8 +64,8 @@ impl Default for ReadRate {
fn default() -> Self { fn default() -> Self {
ReadRate { ReadRate {
rate: 256, rate: 256,
timeout: Seconds(1), timeout: Seconds(5),
max_timeout: Seconds(4), max_timeout: Seconds(15),
} }
} }
} }
@ -112,7 +112,7 @@ impl ServiceConfig {
headers_read_rate: Some(ReadRate { headers_read_rate: Some(ReadRate {
rate: 256, rate: 256,
timeout: client_timeout, timeout: client_timeout,
max_timeout: client_timeout + Seconds(3), max_timeout: client_timeout + Seconds(15),
}), }),
payload_read_rate: None, payload_read_rate: None,
} }

View file

@ -895,7 +895,7 @@ where
} }
} }
log::trace!("Timeout during reading"); log::trace!("{}: Timeout during reading", self.io.tag());
if self.flags.contains(Flags::READ_PL_TIMEOUT) { if self.flags.contains(Flags::READ_PL_TIMEOUT) {
self.set_payload_error(PayloadError::Io(io::Error::new( self.set_payload_error(PayloadError::Io(io::Error::new(
io::ErrorKind::TimedOut, io::ErrorKind::TimedOut,

View file

@ -13,7 +13,7 @@ use nanorand::{Rng, WyRand};
use crate::connect::{Connect, ConnectError, Connector}; use crate::connect::{Connect, ConnectError, Connector};
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue, AUTHORIZATION}; use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue, AUTHORIZATION};
use crate::http::{body::BodySize, client::ClientResponse, error::HttpError, h1}; use crate::http::{body::BodySize, client, client::ClientResponse, error::HttpError, h1};
use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri}; use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri};
use crate::io::{ use crate::io::{
Base, DispatchItem, Dispatcher, DispatcherConfig, Filter, Io, Layer, Sealed, Base, DispatchItem, Dispatcher, DispatcherConfig, Filter, Io, Layer, Sealed,
@ -35,6 +35,7 @@ pub struct WsClient<F, T> {
timeout: Millis, timeout: Millis,
extra_headers: RefCell<Option<HeaderMap>>, extra_headers: RefCell<Option<HeaderMap>>,
config: DispatcherConfig, config: DispatcherConfig,
client_cfg: Rc<client::ClientConfig>,
_t: marker::PhantomData<F>, _t: marker::PhantomData<F>,
} }
@ -243,7 +244,7 @@ where
// response and ws io // response and ws io
Ok(WsConnection::new( Ok(WsConnection::new(
io, io,
ClientResponse::with_empty_payload(response), ClientResponse::with_empty_payload(response, self.client_cfg.clone()),
if server_mode { if server_mode {
ws::Codec::new().max_size(max_size) ws::Codec::new().max_size(max_size)
} else { } else {
@ -639,6 +640,7 @@ where
timeout: inner.timeout, timeout: inner.timeout,
config: inner.config, config: inner.config,
extra_headers: RefCell::new(None), extra_headers: RefCell::new(None),
client_cfg: Default::default(),
_t: marker::PhantomData, _t: marker::PhantomData,
}) })
} }