diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index e900dfa0..5f079475 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -67,5 +67,6 @@ jobs: cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean - name: Clear the cargo caches + continue-on-error: true run: | cargo-cache diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 6eb9bf0c..64168731 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.17] - 2024-01-05 + +* Allow to set default response payload limit and timeout + ## [0.7.16] - 2023-12-15 * Stop timer before handling UPGRADE h1 requests diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index ff1d5e63..2547f3bc 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.7.16" +version = "0.7.17" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" diff --git a/ntex/src/http/client/builder.rs b/ntex/src/http/client/builder.rs index ee905b34..747754a3 100644 --- a/ntex/src/http/client/builder.rs +++ b/ntex/src/http/client/builder.rs @@ -37,6 +37,8 @@ impl ClientBuilder { config: ClientConfig { headers: HeaderMap::new(), timeout: Millis(5_000), + response_pl_limit: 262_144, + response_pl_timeout: Millis(10_000), connector: Box::new(ConnectorWrapper(Connector::default().finish().into())), }, } @@ -91,6 +93,22 @@ impl ClientBuilder { self } + /// Max size of response payload. + /// By default max size is 256Kb + pub fn response_payload_limit(mut self, limit: usize) -> Self { + self.config.response_pl_limit = limit; + self + } + + /// Set response timeout. + /// + /// Response payload timeout is the total time before a payload must be received. + /// Default value is 10 seconds. + pub fn response_payload_timeout(mut self, timeout: Millis) -> Self { + self.config.response_pl_timeout = timeout; + self + } + /// Add default header. Headers added by this method /// get added to every request. pub fn header(mut self, key: K, value: V) -> Self diff --git a/ntex/src/http/client/connect.rs b/ntex/src/http/client/connect.rs index 2a164447..bd5a3096 100644 --- a/ntex/src/http/client/connect.rs +++ b/ntex/src/http/client/connect.rs @@ -1,11 +1,11 @@ -use std::{fmt, net}; +use std::{fmt, net, rc::Rc}; use crate::http::{body::Body, RequestHeadType}; use crate::{service::Pipeline, service::Service, time::Millis, util::BoxFuture}; use super::error::{ConnectError, SendRequestError}; use super::response::ClientResponse; -use super::{Connect as ClientConnect, Connection}; +use super::{ClientConfig, Connect as ClientConnect, Connection}; pub(super) struct ConnectorWrapper(pub(crate) Pipeline); @@ -27,6 +27,7 @@ pub(super) trait Connect: fmt::Debug { body: Body, addr: Option, timeout: Millis, + cfg: Rc, ) -> BoxFuture<'_, Result>; } @@ -40,6 +41,7 @@ where body: Body, addr: Option, timeout: Millis, + cfg: Rc, ) -> BoxFuture<'_, Result> { Box::pin(async move { // connect to the host @@ -54,7 +56,7 @@ where connection .send_request(head, body, timeout) .await - .map(|(head, payload)| ClientResponse::new(head, payload)) + .map(|(head, payload)| ClientResponse::new(head, payload, cfg)) }) } } diff --git a/ntex/src/http/client/frozen.rs b/ntex/src/http/client/frozen.rs index cad8c503..93424686 100644 --- a/ntex/src/http/client/frozen.rs +++ b/ntex/src/http/client/frozen.rs @@ -6,8 +6,7 @@ use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue}; use crate::http::{Method, RequestHead, RequestHeadType, Uri}; use crate::{time::Millis, util::Bytes, util::Stream}; -use super::sender::SendClientRequest; -use super::ClientConfig; +use super::{sender::SendClientRequest, ClientConfig}; /// `FrozenClientRequest` struct represents clonable client request. /// It could be used to send same request multiple times. diff --git a/ntex/src/http/client/mod.rs b/ntex/src/http/client/mod.rs index 9d2f0867..053ffed5 100644 --- a/ntex/src/http/client/mod.rs +++ b/ntex/src/http/client/mod.rs @@ -70,23 +70,27 @@ pub struct Connect { /// println!("Response: {:?}", res); /// } /// ``` -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Client(Rc); #[derive(Debug)] -struct ClientConfig { +pub(crate) struct ClientConfig { pub(self) connector: Box, pub(self) headers: HeaderMap, pub(self) timeout: Millis, + pub(self) response_pl_limit: usize, + pub(self) response_pl_timeout: Millis, } -impl Default for Client { +impl Default for ClientConfig { fn default() -> Self { - Client(Rc::new(ClientConfig { - connector: Box::new(ConnectorWrapper(Connector::default().finish().into())), + ClientConfig { headers: HeaderMap::new(), timeout: Millis(5_000), - })) + response_pl_limit: 262_144, + response_pl_timeout: Millis(10_000), + connector: Box::new(ConnectorWrapper(Connector::default().finish().into())), + } } } diff --git a/ntex/src/http/client/response.rs b/ntex/src/http/client/response.rs index f8710fe3..beb49041 100644 --- a/ntex/src/http/client/response.rs +++ b/ntex/src/http/client/response.rs @@ -1,6 +1,6 @@ use std::cell::{Ref, RefMut}; use std::task::{Context, Poll}; -use std::{fmt, future::Future, marker::PhantomData, mem, pin::Pin}; +use std::{fmt, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc}; use serde::de::DeserializeOwned; @@ -13,12 +13,13 @@ use crate::http::{HeaderMap, HttpMessage, Payload, ResponseHead, StatusCode, Ver use crate::time::{Deadline, Millis}; use crate::util::{Bytes, BytesMut, Extensions, Stream}; -use super::error::JsonPayloadError; +use super::{error::JsonPayloadError, ClientConfig}; /// Client Response pub struct ClientResponse { pub(crate) head: ResponseHead, pub(crate) payload: Payload, + config: Rc, } impl HttpMessage for ClientResponse { @@ -58,12 +59,20 @@ impl HttpMessage for ClientResponse { impl ClientResponse { /// Create new client response instance - pub(crate) fn new(head: ResponseHead, payload: Payload) -> Self { - ClientResponse { head, payload } + pub(crate) fn new( + head: ResponseHead, + payload: Payload, + config: Rc, + ) -> Self { + ClientResponse { + head, + payload, + config, + } } - pub(crate) fn with_empty_payload(head: ResponseHead) -> Self { - ClientResponse::new(head, Payload::None) + pub(crate) fn with_empty_payload(head: ResponseHead, config: Rc) -> Self { + ClientResponse::new(head, Payload::None, config) } #[inline] @@ -193,7 +202,11 @@ impl MessageBody { MessageBody { length: len, err: None, - fut: Some(ReadBody::new(res.take_payload(), 262_144)), + fut: Some(ReadBody::new( + res.take_payload(), + res.config.response_pl_limit, + res.config.response_pl_timeout, + )), } } @@ -236,7 +249,8 @@ impl Future for MessageBody { } if let Some(len) = this.length.take() { - if len > this.fut.as_ref().unwrap().limit { + let limit = this.fut.as_ref().unwrap().limit; + if limit > 0 && len > limit { return Poll::Ready(Err(PayloadError::Overflow)); } } @@ -264,9 +278,9 @@ where U: DeserializeOwned, { /// Create `JsonBody` for request. - pub fn new(req: &mut ClientResponse) -> Self { + pub fn new(res: &mut ClientResponse) -> Self { // check content-type - let json = if let Ok(Some(mime)) = req.mime_type() { + let json = if let Ok(Some(mime)) = res.mime_type() { mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) } else { false @@ -281,7 +295,7 @@ where } let mut len = None; - if let Some(l) = req.headers().get(&CONTENT_LENGTH) { + if let Some(l) = res.headers().get(&CONTENT_LENGTH) { if let Ok(s) = l.to_str() { if let Ok(l) = s.parse::() { len = Some(l) @@ -292,7 +306,11 @@ where JsonBody { length: len, err: None, - fut: Some(ReadBody::new(req.take_payload(), 65536)), + fut: Some(ReadBody::new( + res.take_payload(), + res.config.response_pl_limit, + res.config.response_pl_timeout, + )), _t: PhantomData, } } @@ -331,7 +349,8 @@ where } if let Some(len) = self.length.take() { - if len > self.fut.as_ref().unwrap().limit { + let limit = self.fut.as_ref().unwrap().limit; + if limit > 0 && len > limit { return Poll::Ready(Err(JsonPayloadError::Payload(PayloadError::Overflow))); } } @@ -353,12 +372,12 @@ struct ReadBody { } impl ReadBody { - fn new(stream: Payload, limit: usize) -> Self { + fn new(stream: Payload, limit: usize, timeout: Millis) -> Self { Self { stream, limit, buf: BytesMut::with_capacity(std::cmp::min(limit, 32768)), - timeout: Deadline::new(Millis(10000)), + timeout: Deadline::new(timeout), } } } @@ -372,7 +391,7 @@ impl Future for ReadBody { loop { return match Pin::new(&mut this.stream).poll_next(cx)? { Poll::Ready(Some(chunk)) => { - if (this.buf.len() + chunk.len()) > this.limit { + if this.limit > 0 && (this.buf.len() + chunk.len()) > this.limit { Poll::Ready(Err(PayloadError::Overflow)) } else { this.buf.extend_from_slice(&chunk); diff --git a/ntex/src/http/client/sender.rs b/ntex/src/http/client/sender.rs index 72955334..d2c91c89 100644 --- a/ntex/src/http/client/sender.rs +++ b/ntex/src/http/client/sender.rs @@ -16,8 +16,7 @@ use crate::http::encoding::Decoder; use crate::http::Payload; use super::error::{FreezeRequestError, InvalidUrl, SendRequestError}; -use super::response::ClientResponse; -use super::ClientConfig; +use super::{ClientConfig, ClientResponse}; #[derive(thiserror::Error, Debug)] pub(crate) enum PrepForSendingError { @@ -136,8 +135,9 @@ impl RequestHeadType { let fut = Box::pin(async move { config + .clone() .connector - .send_request(self, body, addr, timeout) + .send_request(self, body, addr, timeout, config) .await }); diff --git a/ntex/src/http/client/test.rs b/ntex/src/http/client/test.rs index 70c44139..d9198264 100644 --- a/ntex/src/http/client/test.rs +++ b/ntex/src/http/client/test.rs @@ -106,9 +106,9 @@ impl TestResponse { } if let Some(pl) = self.payload { - ClientResponse::new(head, pl) + ClientResponse::new(head, pl, Default::default()) } else { - ClientResponse::new(head, h1::Payload::empty().into()) + ClientResponse::new(head, h1::Payload::empty().into(), Default::default()) } } } @@ -118,8 +118,8 @@ mod tests { use super::*; use crate::http::header; - #[test] - fn test_basics() { + #[crate::rt_test] + async fn test_basics() { let res = { #[cfg(feature = "cookie")] { diff --git a/ntex/src/http/config.rs b/ntex/src/http/config.rs index 1dff57ed..c5a3e65e 100644 --- a/ntex/src/http/config.rs +++ b/ntex/src/http/config.rs @@ -64,8 +64,8 @@ impl Default for ReadRate { fn default() -> Self { ReadRate { rate: 256, - timeout: Seconds(1), - max_timeout: Seconds(4), + timeout: Seconds(5), + max_timeout: Seconds(15), } } } @@ -112,7 +112,7 @@ impl ServiceConfig { headers_read_rate: Some(ReadRate { rate: 256, timeout: client_timeout, - max_timeout: client_timeout + Seconds(3), + max_timeout: client_timeout + Seconds(15), }), payload_read_rate: None, } diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 3114086c..d6c114b1 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -895,7 +895,7 @@ where } } - log::trace!("Timeout during reading"); + log::trace!("{}: Timeout during reading", self.io.tag()); if self.flags.contains(Flags::READ_PL_TIMEOUT) { self.set_payload_error(PayloadError::Io(io::Error::new( io::ErrorKind::TimedOut, diff --git a/ntex/src/ws/client.rs b/ntex/src/ws/client.rs index 615e277b..e36f185f 100644 --- a/ntex/src/ws/client.rs +++ b/ntex/src/ws/client.rs @@ -13,7 +13,7 @@ use nanorand::{Rng, WyRand}; use crate::connect::{Connect, ConnectError, Connector}; use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue, AUTHORIZATION}; -use crate::http::{body::BodySize, client::ClientResponse, error::HttpError, h1}; +use crate::http::{body::BodySize, client, client::ClientResponse, error::HttpError, h1}; use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri}; use crate::io::{ Base, DispatchItem, Dispatcher, DispatcherConfig, Filter, Io, Layer, Sealed, @@ -35,6 +35,7 @@ pub struct WsClient { timeout: Millis, extra_headers: RefCell>, config: DispatcherConfig, + client_cfg: Rc, _t: marker::PhantomData, } @@ -243,7 +244,7 @@ where // response and ws io Ok(WsConnection::new( io, - ClientResponse::with_empty_payload(response), + ClientResponse::with_empty_payload(response, self.client_cfg.clone()), if server_mode { ws::Codec::new().max_size(max_size) } else { @@ -639,6 +640,7 @@ where timeout: inner.timeout, config: inner.config, extra_headers: RefCell::new(None), + client_cfg: Default::default(), _t: marker::PhantomData, }) }