Refactor http/1 keep-alive timer (#248)

This commit is contained in:
Nikolay Kim 2023-11-14 16:54:04 +06:00 committed by GitHub
parent f07c0576a7
commit f9759a4ddc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 403 additions and 197 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.7.11] - 2023-11-xx
* Refactor http/1 timeouts
## [0.7.10] - 2023-11-12
* Start http client timeout after sending body

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.7.10"
version = "0.7.11"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -12,36 +12,34 @@ use crate::http::response::Response;
use crate::http::service::HttpService;
use crate::io::{Filter, Io, IoRef};
use crate::service::{boxed, IntoService, IntoServiceFactory, Service, ServiceFactory};
use crate::time::{Millis, Seconds};
use crate::time::Seconds;
/// A http service builder
///
/// This type can be used to construct an instance of `http service` through a
/// builder-like pattern.
pub struct HttpServiceBuilder<F, S, X = ExpectHandler, U = UpgradeHandler<F>> {
keep_alive: KeepAlive,
client_timeout: Millis,
client_disconnect: Seconds,
handshake_timeout: Millis,
config: ServiceConfig,
expect: X,
upgrade: Option<U>,
on_request: Option<OnRequest>,
h2config: h2::Config,
_t: PhantomData<(F, S)>,
}
impl<F, S> HttpServiceBuilder<F, S, ExpectHandler, UpgradeHandler<F>> {
/// Create instance of `ServiceConfigBuilder`
pub fn new() -> Self {
HttpServiceBuilder::with_config(ServiceConfig::default())
}
#[doc(hidden)]
/// Create instance of `ServiceConfigBuilder`
pub fn with_config(config: ServiceConfig) -> Self {
HttpServiceBuilder {
keep_alive: KeepAlive::Timeout(Seconds(5)),
client_timeout: Millis::from_secs(3),
client_disconnect: Seconds(3),
handshake_timeout: Millis::from_secs(5),
config,
expect: ExpectHandler,
upgrade: None,
on_request: None,
h2config: h2::Config::server(),
_t: PhantomData,
}
}
@ -64,10 +62,11 @@ where
///
/// By default keep alive is set to a 5 seconds.
pub fn keep_alive<W: Into<KeepAlive>>(mut self, val: W) -> Self {
self.keep_alive = val.into();
self.config.keepalive(val);
self
}
#[deprecated(since = "0.7.11", note = "Use .headers_read_rate() method")]
/// Set server client timeout for first request.
///
/// Defines a timeout for reading client request header. If a client does not transmit
@ -78,8 +77,7 @@ where
///
/// By default client timeout is set to 3 seconds.
pub fn client_timeout(mut self, timeout: Seconds) -> Self {
self.client_timeout = timeout.into();
self.h2config.client_timeout(timeout);
self.config.client_timeout(timeout);
self
}
@ -92,8 +90,7 @@ where
///
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self {
self.client_disconnect = timeout;
self.h2config.disconnect_timeout(timeout);
self.config.disconnect_timeout(timeout);
self
}
@ -104,8 +101,41 @@ where
///
/// By default handshake timeout is set to 5 seconds.
pub fn ssl_handshake_timeout(mut self, timeout: Seconds) -> Self {
self.handshake_timeout = timeout.into();
self.h2config.handshake_timeout(timeout);
self.config.ssl_handshake_timeout(timeout);
self
}
/// Set read rate parameters for request headers.
///
/// Set max timeout for reading request headers. If the client
/// sends `rate` amount of data, increase the timeout by 1 second for every.
/// But no more than `max_timeout` timeout.
///
/// By default headers read rate is set to 1sec with max timeout 5sec.
pub fn headers_read_rate(
mut self,
timeout: Seconds,
max_timeout: Seconds,
rate: u16,
) -> Self {
self.config.headers_read_rate(timeout, max_timeout, rate);
self
}
/// Set read rate parameters for request's payload.
///
/// Set max timeout for reading payload. If the client
/// sends `rate` amount of data, increase the timeout by 1 second for every.
/// But no more than `max_timeout` timeout.
///
/// By default payload read rate is disabled.
pub fn payload_read_rate(
mut self,
timeout: Seconds,
max_timeout: Seconds,
rate: u16,
) -> Self {
self.config.payload_read_rate(timeout, max_timeout, rate);
self
}
@ -115,7 +145,7 @@ where
where
O: FnOnce(&h2::Config) -> R,
{
let _ = f(&self.h2config);
let _ = f(&self.config.h2config);
self
}
@ -131,14 +161,10 @@ where
X1::InitError: fmt::Debug,
{
HttpServiceBuilder {
keep_alive: self.keep_alive,
client_timeout: self.client_timeout,
client_disconnect: self.client_disconnect,
handshake_timeout: self.handshake_timeout,
config: self.config,
expect: expect.into_factory(),
upgrade: self.upgrade,
on_request: self.on_request,
h2config: self.h2config,
_t: PhantomData,
}
}
@ -155,14 +181,10 @@ where
U1::InitError: fmt::Debug,
{
HttpServiceBuilder {
keep_alive: self.keep_alive,
client_timeout: self.client_timeout,
client_disconnect: self.client_disconnect,
handshake_timeout: self.handshake_timeout,
config: self.config,
expect: self.expect,
upgrade: Some(upgrade.into_factory()),
on_request: self.on_request,
h2config: self.h2config,
_t: PhantomData,
}
}
@ -188,14 +210,7 @@ where
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
{
let cfg = ServiceConfig::new(
self.keep_alive,
self.client_timeout,
self.client_disconnect,
self.handshake_timeout,
self.h2config,
);
H1Service::with_config(cfg, service.into_factory())
H1Service::with_config(self.config, service.into_factory())
.expect(self.expect)
.upgrade(self.upgrade)
.on_request(self.on_request)
@ -210,15 +225,7 @@ where
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
{
let cfg = ServiceConfig::new(
self.keep_alive,
self.client_timeout,
self.client_disconnect,
self.handshake_timeout,
self.h2config,
);
H2Service::with_config(cfg, service.into_factory())
H2Service::with_config(self.config, service.into_factory())
}
/// Finish service configuration and create `HttpService` instance.
@ -230,14 +237,7 @@ where
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
{
let cfg = ServiceConfig::new(
self.keep_alive,
self.client_timeout,
self.client_disconnect,
self.handshake_timeout,
self.h2config,
);
HttpService::with_config(cfg, service.into_factory())
HttpService::with_config(self.config, service.into_factory())
.expect(self.expect)
.upgrade(self.upgrade)
.on_request(self.on_request)

View file

@ -40,24 +40,33 @@ impl From<Option<usize>> for KeepAlive {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
/// Http service configuration
pub struct ServiceConfig(pub(super) Rc<Inner>);
#[derive(Debug)]
pub(super) struct Inner {
pub struct ServiceConfig {
pub(super) keep_alive: Millis,
pub(super) client_timeout: Millis,
pub(super) client_disconnect: Seconds,
pub(super) ka_enabled: bool,
pub(super) timer: DateService,
pub(super) ssl_handshake_timeout: Millis,
pub(super) h2config: h2::Config,
pub(super) headers_read_rate: Option<ReadRate>,
pub(super) payload_read_rate: Option<ReadRate>,
pub(super) timer: DateService,
}
impl Clone for ServiceConfig {
fn clone(&self) -> Self {
ServiceConfig(self.0.clone())
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) struct ReadRate {
pub(super) rate: u16,
pub(super) timeout: time::Duration,
pub(super) max_timeout: time::Duration,
}
impl Default for ReadRate {
fn default() -> Self {
ReadRate {
rate: 256,
timeout: time::Duration::from_secs(1),
max_timeout: time::Duration::from_secs(4),
}
}
}
@ -89,15 +98,133 @@ impl ServiceConfig {
};
let keep_alive = if ka_enabled { keep_alive } else { Millis::ZERO };
ServiceConfig(Rc::new(Inner {
keep_alive,
ka_enabled,
client_timeout,
ServiceConfig {
client_disconnect,
ssl_handshake_timeout,
h2config,
keep_alive,
ka_enabled,
timer: DateService::new(),
}))
headers_read_rate: Some(ReadRate {
rate: 256,
timeout: client_timeout.into(),
max_timeout: (client_timeout + Millis(3_000)).into(),
}),
payload_read_rate: None,
}
}
pub(crate) fn client_timeout(&mut self, timeout: Seconds) {
if timeout.is_zero() {
self.headers_read_rate = None;
} else {
let mut rate = self.headers_read_rate.clone().unwrap_or_default();
rate.timeout = timeout.into();
self.headers_read_rate = Some(rate);
}
}
/// Set server keep-alive setting
///
/// By default keep alive is set to a 5 seconds.
pub fn keepalive<W: Into<KeepAlive>>(&mut self, val: W) -> &mut Self {
let (keep_alive, ka_enabled) = match val.into() {
KeepAlive::Timeout(val) => (Millis::from(val), true),
KeepAlive::Os => (Millis::ZERO, true),
KeepAlive::Disabled => (Millis::ZERO, false),
};
let keep_alive = if ka_enabled { keep_alive } else { Millis::ZERO };
self.keep_alive = keep_alive;
self.ka_enabled = ka_enabled;
self
}
/// Set keep-alive timeout in seconds.
///
/// To disable timeout set value to 0.
///
/// By default keep-alive timeout is set to 30 seconds.
pub fn keepalive_timeout(&mut self, timeout: Seconds) -> &mut Self {
self.keep_alive = timeout.into();
self.ka_enabled = !timeout.is_zero();
self
}
/// Set connection disconnect timeout.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the connection get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 1 seconds.
pub fn disconnect_timeout(&mut self, timeout: Seconds) -> &mut Self {
self.client_disconnect = timeout;
self.h2config.disconnect_timeout(timeout);
self
}
/// Set server ssl handshake timeout.
///
/// Defines a timeout for connection ssl handshake negotiation.
/// To disable timeout set value to 0.
///
/// By default handshake timeout is set to 5 seconds.
pub fn ssl_handshake_timeout(&mut self, timeout: Seconds) -> &mut Self {
self.ssl_handshake_timeout = timeout.into();
self.h2config.handshake_timeout(timeout);
self
}
/// Set read rate parameters for request headers.
///
/// Set max timeout for reading request headers. If the client
/// sends `rate` amount of data, increase the timeout by 1 second for every.
/// But no more than `max_timeout` timeout.
///
/// By default headers read rate is set to 1sec with max timeout 5sec.
pub fn headers_read_rate(
&mut self,
timeout: Seconds,
max_timeout: Seconds,
rate: u16,
) -> &mut Self {
if !timeout.is_zero() {
self.headers_read_rate = Some(ReadRate {
rate,
timeout: timeout.into(),
max_timeout: max_timeout.into(),
});
} else {
self.headers_read_rate = None;
}
self
}
/// Set read rate parameters for request's payload.
///
/// Set max timeout for reading payload. If the client
/// sends `rate` amount of data, increase the timeout by 1 second for every.
/// But no more than `max_timeout` timeout.
///
/// By default payload read rate is disabled.
pub fn payload_read_rate(
&mut self,
timeout: Seconds,
max_timeout: Seconds,
rate: u16,
) -> &mut Self {
if !timeout.is_zero() {
self.payload_read_rate = Some(ReadRate {
rate,
timeout: timeout.into(),
max_timeout: max_timeout.into(),
});
} else {
self.payload_read_rate = None;
}
self
}
}
@ -108,10 +235,11 @@ pub(super) struct DispatcherConfig<S, X, U> {
pub(super) expect: Pipeline<X>,
pub(super) upgrade: Option<Pipeline<U>>,
pub(super) keep_alive: Duration,
pub(super) client_timeout: Duration,
pub(super) client_disconnect: Seconds,
pub(super) h2config: h2::Config,
pub(super) ka_enabled: bool,
pub(super) headers_read_rate: Option<ReadRate>,
pub(super) payload_read_rate: Option<ReadRate>,
pub(super) timer: DateService,
pub(super) on_request: Option<Pipeline<OnRequest>>,
}
@ -129,12 +257,13 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
expect: expect.into(),
upgrade: upgrade.map(|v| v.into()),
on_request: on_request.map(|v| v.into()),
keep_alive: Duration::from(cfg.0.keep_alive),
client_timeout: Duration::from(cfg.0.client_timeout),
client_disconnect: cfg.0.client_disconnect,
ka_enabled: cfg.0.ka_enabled,
h2config: cfg.0.h2config.clone(),
timer: cfg.0.timer.clone(),
keep_alive: Duration::from(cfg.keep_alive),
client_disconnect: cfg.client_disconnect.into(),
ka_enabled: cfg.ka_enabled,
headers_read_rate: cfg.headers_read_rate,
payload_read_rate: cfg.payload_read_rate,
h2config: cfg.h2config.clone(),
timer: cfg.timer.clone(),
}
}
@ -142,6 +271,14 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
pub(super) fn keep_alive_enabled(&self) -> bool {
self.ka_enabled
}
pub(super) fn headers_read_rate(&self) -> Option<&ReadRate> {
self.headers_read_rate.as_ref()
}
pub(super) fn payload_read_rate(&self) -> Option<&ReadRate> {
self.payload_read_rate.as_ref()
}
}
const DATE_VALUE_LENGTH_HDR: usize = 39;

View file

@ -82,7 +82,7 @@ impl Codec {
flags: Cell::new(flags),
decoder: decoder::MessageDecoder::default(),
version: Cell::new(Version::HTTP_11),
ctype: Cell::new(ConnectionType::Close),
ctype: Cell::new(ConnectionType::KeepAlive),
encoder: encoder::MessageEncoder::default(),
}
}
@ -99,12 +99,6 @@ impl Codec {
self.ctype.get() == ConnectionType::KeepAlive
}
#[inline]
/// Check if keep-alive enabled on server level
pub fn keepalive_enabled(&self) -> bool {
self.flags.get().contains(Flags::KEEPALIVE_ENABLED)
}
pub(super) fn set_ctype(&self, ctype: ConnectionType) {
self.ctype.set(ctype)
}
@ -139,11 +133,14 @@ impl Decoder for Codec {
flags.set(Flags::HEAD, head.method == Method::HEAD);
self.flags.set(flags);
self.version.set(head.version);
self.ctype.set(head.connection_type());
if self.ctype.get() == ConnectionType::KeepAlive
let ctype = head.connection_type();
if ctype == ConnectionType::KeepAlive
&& !flags.contains(Flags::KEEPALIVE_ENABLED)
{
self.ctype.set(ConnectionType::Close)
} else {
self.ctype.set(ctype)
}
if let PayloadType::Stream(_) = payload {
@ -249,6 +246,6 @@ mod tests {
);
let _item = codec.decode(&mut buf).unwrap().unwrap();
assert!(codec.upgrade());
assert!(!codec.keepalive_enabled());
assert!(!codec.keepalive());
}
}

View file

@ -1,9 +1,12 @@
//! Framed transport dispatcher
use std::task::{Context, Poll};
use std::{cell::RefCell, error::Error, future::Future, io, marker, pin::Pin, rc::Rc};
use std::{
cell::RefCell, error::Error, future::Future, io, marker, pin::Pin, rc::Rc, time,
};
use crate::io::{Filter, Io, IoBoxed, IoRef, IoStatusUpdate, RecvError};
use crate::service::{Pipeline, PipelineCall, Service};
use crate::time::now;
use crate::util::{ready, Bytes};
use crate::http;
@ -18,21 +21,22 @@ use super::decoder::{PayloadDecoder, PayloadItem, PayloadType};
use super::payload::{Payload, PayloadSender, PayloadStatus};
use super::{codec::Codec, Message};
const ONE_SEC: time::Duration = time::Duration::from_secs(1);
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Flags: u16 {
/// We parsed one complete request message
const STARTED = 0b0000_0001;
/// Keep-alive is enabled on current connection
const KEEPALIVE = 0b0000_0010;
/// Keep-alive is registered
const KEEPALIVE_REG = 0b0000_0100;
pub struct Flags: u8 {
/// Upgrade request
const UPGRADE = 0b0000_1000;
const UPGRADE = 0b0000_0001;
/// Handling upgrade
const UPGRADE_HND = 0b0001_0000;
const UPGRADE_HND = 0b0000_0010;
/// Stop after sending payload
const SENDPAYLOAD_AND_STOP = 0b0010_0000;
const SENDPAYLOAD_AND_STOP = 0b0000_0100;
/// Read headers timer is enabled
const READ_HDRS_TIMEOUT = 0b0001_0000;
/// Read headers payload is enabled
const READ_PL_TIMEOUT = 0b0010_0000;
}
}
@ -54,8 +58,6 @@ enum State<B> {
Call,
#[error("State::ReadRequest")]
ReadRequest,
#[error("State::ReadFirstRequest")]
ReadFirstRequest,
#[error("State::ReadPayload")]
ReadPayload,
#[error("State::SendPayload")]
@ -93,6 +95,8 @@ struct DispatcherInner<F, S, B, X, U> {
config: Rc<DispatcherConfig<S, X, U>>,
error: Option<DispatchError>,
payload: Option<(PayloadDecoder, PayloadSender)>,
read_bytes: u32,
read_max_timeout: time::Instant,
_t: marker::PhantomData<(S, B)>,
}
@ -113,16 +117,16 @@ where
io.set_disconnect_timeout(config.client_disconnect);
// slow-request timer
let flags = if config.client_timeout.is_zero() {
Flags::empty()
let flags = if let Some(cfg) = config.headers_read_rate() {
io.start_timer(cfg.timeout);
Flags::READ_HDRS_TIMEOUT
} else {
io.start_timer(config.client_timeout);
Flags::KEEPALIVE_REG
Flags::empty()
};
Dispatcher {
call: CallState::None,
st: State::ReadFirstRequest,
st: State::ReadRequest,
inner: DispatcherInner {
io,
flags,
@ -130,6 +134,8 @@ where
config,
error: None,
payload: None,
read_bytes: 0,
read_max_timeout: now(),
_t: marker::PhantomData,
},
}
@ -307,7 +313,7 @@ where
*this.st = State::Stop;
this.inner.error = Some(e);
} else {
*this.st = this.inner.switch_to_read_request();
*this.st = State::ReadRequest;
}
}
// send response body
@ -374,11 +380,6 @@ where
}
}
}
// read first request and call service
State::ReadFirstRequest => {
*this.st = ready!(this.inner.read_request(cx, &mut this.call));
this.inner.flags.insert(Flags::STARTED);
}
// stop io tasks and call upgrade service
State::Upgrade(ref mut req) => {
let io = this.inner.io.take();
@ -401,7 +402,7 @@ where
}
// prepare to shutdown
State::Stop => {
this.inner.unregister_keepalive();
this.inner.io.stop_timer();
return if let Err(e) = ready!(this.inner.io.poll_shutdown(cx)) {
// get io error
@ -432,29 +433,6 @@ where
B: MessageBody,
X: Service<Request>,
{
fn switch_to_read_request(&mut self) -> State<B> {
// connection is not keep-alive, disconnect
if !self.flags.contains(Flags::KEEPALIVE) || !self.codec.keepalive_enabled() {
self.io.close();
State::Stop
} else {
// register keep-alive timer
if self.flags.contains(Flags::KEEPALIVE) {
self.flags.remove(Flags::KEEPALIVE);
self.flags.insert(Flags::KEEPALIVE_REG);
self.io.start_timer(self.config.keep_alive);
}
State::ReadRequest
}
}
fn unregister_keepalive(&mut self) {
if self.flags.contains(Flags::KEEPALIVE_REG) {
self.io.stop_keepalive_timer();
self.flags.remove(Flags::KEEPALIVE | Flags::KEEPALIVE_REG);
}
}
fn handle_error<E>(&mut self, err: E, critical: bool) -> State<B>
where
E: ResponseError + 'static,
@ -477,27 +455,28 @@ where
}
}
/// Handle normal requests
fn service_call(&self, req: Request) -> CallState<S, X> {
// Handle normal requests
CallState::Service {
fut: self.config.service.call_nowait(req),
}
}
/// Handle filter fut
fn service_filter(&self, req: Request, f: &Pipeline<OnRequest>) -> CallState<S, X> {
// Handle filter fut
CallState::Filter {
fut: f.call_nowait((req, self.io.get_ref())),
}
}
/// Handle normal requests with EXPECT: 100-Continue` header
fn service_expect(&self, req: Request) -> CallState<S, X> {
// Handle normal requests with EXPECT: 100-Continue` header
CallState::Expect {
fut: self.config.expect.call_nowait(req),
}
}
/// Handle upgrade requests
fn service_upgrade(&mut self, mut req: Request) -> CallState<S, X> {
// Move io into request
let io: IoBoxed = self.io.take().into();
@ -505,7 +484,6 @@ where
io.get_ref(),
RefCell::new(Some(Box::new((io, self.codec.clone())))),
)));
// Handle upgrade requests
CallState::ServiceUpgrade {
fut: self.config.service.call_nowait(req),
}
@ -519,19 +497,28 @@ where
log::trace!("trying to read http message");
loop {
let result = ready!(self.io.poll_recv(&self.codec, cx));
// let result = ready!(self.io.poll_recv(&self.codec, cx));
let result = match self.io.poll_recv_decode(&self.codec, cx) {
Ok(decoded) => {
if let Some(st) =
self.update_timer(decoded.item.is_some(), decoded.remains)
{
return Poll::Ready(st);
}
if let Some(item) = decoded.item {
Ok(item)
} else {
return Poll::Pending;
}
}
Err(err) => Err(err),
};
// decode incoming bytes stream
return match result {
Ok((mut req, pl)) => {
log::trace!("http message is received: {:?} and payload {:?}", req, pl);
// keep-alive timer
if self.flags.contains(Flags::KEEPALIVE_REG) {
self.flags.remove(Flags::KEEPALIVE_REG);
self.io.stop_keepalive_timer();
}
// configure request payload
let upgrade = match pl {
PayloadType::None => false,
@ -602,12 +589,13 @@ where
Poll::Ready(State::Stop)
}
Err(RecvError::KeepAlive) => {
// keep-alive timeout
if !self.flags.contains(Flags::STARTED) {
if self.flags.contains(Flags::READ_HDRS_TIMEOUT) {
log::trace!("slow request timeout");
let (req, body) = Response::RequestTimeout().finish().into_parts();
let _ = self.send_response(req, body.into_body());
self.error = Some(DispatchError::SlowRequestTimeout);
} else if self.flags.contains(Flags::READ_PL_TIMEOUT) {
log::trace!("slow payload timeout");
} else {
log::trace!("keep-alive timeout, close connection");
}
@ -638,8 +626,6 @@ where
if result.is_err() {
State::Stop
} else {
self.flags.set(Flags::KEEPALIVE, self.codec.keepalive());
match body.size() {
BodySize::None | BodySize::Empty => {
if self.error.is_some() {
@ -647,7 +633,7 @@ where
} else if self.payload.is_some() {
State::ReadPayload
} else {
self.switch_to_read_request()
State::ReadRequest
}
}
_ => State::SendPayload { body },
@ -681,7 +667,7 @@ where
} else if self.payload.is_some() {
Some(State::ReadPayload)
} else {
Some(self.switch_to_read_request())
Some(State::ReadRequest)
}
}
Some(Err(e)) => {
@ -712,6 +698,61 @@ where
Poll::Ready(IoStatusUpdate::WriteBackpressure) => false,
}
}
fn update_timer(&mut self, received: bool, remains: usize) -> Option<State<B>> {
// we got parsed frame
if received {
// remove all timers
self.flags
.remove(Flags::READ_HDRS_TIMEOUT | Flags::READ_PL_TIMEOUT);
self.io.stop_timer();
} else if self.flags.contains(Flags::READ_HDRS_TIMEOUT) {
// update read timer
if let Some(ref cfg) = self.config.headers_read_rate {
let bytes = remains as u32;
let delta = (bytes - self.read_bytes).try_into().unwrap_or(u16::MAX);
if delta >= cfg.rate {
let n = now();
let next = self.io.timer_deadline() + ONE_SEC;
let new_timeout = if n >= next { ONE_SEC } else { next - n };
// max timeout
if cfg.max_timeout.is_zero()
|| (n + new_timeout) <= self.read_max_timeout
{
self.read_bytes = bytes;
self.io.stop_timer();
self.io.start_timer(new_timeout);
}
}
}
} else {
// no new data then start keep-alive timer
if remains == 0 {
if self.codec.keepalive() {
if self.config.keep_alive_enabled() {
self.io.start_timer(self.config.keep_alive);
}
} else {
self.io.close();
return Some(State::Stop);
}
} else if let Some(ref cfg) = self.config.headers_read_rate {
// we got new data but not enough to parse single frame
// start read timer
self.flags.insert(Flags::READ_HDRS_TIMEOUT);
self.read_bytes = remains as u32;
self.io.start_timer(cfg.timeout);
if !cfg.max_timeout.is_zero() {
self.read_max_timeout = now() + cfg.max_timeout;
}
}
}
None
}
}
/// Process request's payload
@ -973,7 +1014,6 @@ mod tests {
#[crate::rt_test]
async fn test_pipeline_with_payload() {
env_logger::init();
let (client, server) = Io::create();
client.remote_buffer_cap(4096);
let mut decoder = ClientCodec::default();

View file

@ -6,7 +6,7 @@ use crate::http::error::{DispatchError, ResponseError};
use crate::http::{request::Request, response::Response};
use crate::io::{types, Filter, Io};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::{time::Millis, util::BoxFuture};
use crate::util::BoxFuture;
use super::codec::Codec;
use super::dispatcher::Dispatcher;
@ -19,8 +19,6 @@ pub struct H1Service<F, S, B, X = ExpectHandler, U = UpgradeHandler<F>> {
expect: X,
upgrade: Option<U>,
on_request: RefCell<Option<OnRequest>>,
#[allow(dead_code)]
handshake_timeout: Millis,
_t: marker::PhantomData<(F, B)>,
}
@ -38,13 +36,12 @@ where
service: U,
) -> Self {
H1Service {
cfg,
srv: service.into_factory(),
expect: ExpectHandler,
upgrade: None,
on_request: RefCell::new(None),
handshake_timeout: cfg.0.ssl_handshake_timeout,
_t: marker::PhantomData,
cfg,
}
}
}
@ -84,7 +81,7 @@ mod openssl {
InitError = (),
> {
Acceptor::new(acceptor)
.timeout(self.handshake_timeout)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(SslError::Ssl)
.map_init_err(|_| panic!())
@ -130,7 +127,7 @@ mod rustls {
InitError = (),
> {
Acceptor::from(config)
.timeout(self.handshake_timeout)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(|e| SslError::Ssl(Box::new(e)))
.map_init_err(|_| panic!())
@ -160,7 +157,6 @@ where
srv: self.srv,
upgrade: self.upgrade,
on_request: self.on_request,
handshake_timeout: self.handshake_timeout,
_t: marker::PhantomData,
}
}
@ -177,7 +173,6 @@ where
srv: self.srv,
expect: self.expect,
on_request: self.on_request,
handshake_timeout: self.handshake_timeout,
_t: marker::PhantomData,
}
}

View file

@ -70,7 +70,7 @@ mod openssl {
InitError = S::InitError,
> {
Acceptor::new(acceptor)
.timeout(self.cfg.0.ssl_handshake_timeout)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(SslError::Ssl)
.map_init_err(|_| panic!())
@ -109,7 +109,7 @@ mod rustls {
config.alpn_protocols = protos;
Acceptor::from(config)
.timeout(self.cfg.0.ssl_handshake_timeout)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(|e| SslError::Ssl(Box::new(e)))
.map_init_err(|_| panic!())

View file

@ -3,12 +3,11 @@ use std::{cell, error, fmt, future, marker, pin::Pin, rc::Rc};
use crate::io::{types, Filter, Io};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::time::{Millis, Seconds};
use crate::util::BoxFuture;
use super::body::MessageBody;
use super::builder::HttpServiceBuilder;
use super::config::{DispatcherConfig, KeepAlive, OnRequest, ServiceConfig};
use super::config::{DispatcherConfig, OnRequest, ServiceConfig};
use super::error::{DispatchError, ResponseError};
use super::request::Request;
use super::response::Response;
@ -36,6 +35,12 @@ where
pub fn build() -> HttpServiceBuilder<F, S> {
HttpServiceBuilder::new()
}
#[doc(hidden)]
/// Create builder for `HttpService` instance.
pub fn build_with_config(cfg: ServiceConfig) -> HttpServiceBuilder<F, S> {
HttpServiceBuilder::with_config(cfg)
}
}
impl<F, S, B> HttpService<F, S, B>
@ -49,13 +54,7 @@ where
{
/// Create new `HttpService` instance.
pub fn new<U: IntoServiceFactory<S, Request>>(service: U) -> Self {
let cfg = ServiceConfig::new(
KeepAlive::Timeout(Seconds(5)),
Millis(5_000),
Seconds::ONE,
Millis(5_000),
ntex_h2::Config::server(),
);
let cfg = ServiceConfig::default();
HttpService {
cfg,
@ -175,7 +174,7 @@ mod openssl {
InitError = (),
> {
Acceptor::new(acceptor)
.timeout(self.cfg.0.ssl_handshake_timeout)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(SslError::Ssl)
.map_init_err(|_| panic!())
@ -222,7 +221,7 @@ mod rustls {
config.alpn_protocols = protos;
Acceptor::from(config)
.timeout(self.cfg.0.ssl_handshake_timeout)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(|e| SslError::Ssl(Box::new(e)))
.map_init_err(|_| panic!())

View file

@ -613,21 +613,21 @@ where
let cfg =
AppConfig::new(false, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.h1(map_config(factory(), move |_| cfg.clone()))
}),
HttpVer::Http2 => builder.listen("test", tcp, move |_| {
let cfg =
AppConfig::new(false, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.h2(map_config(factory(), move |_| cfg.clone()))
}),
HttpVer::Both => builder.listen("test", tcp, move |_| {
let cfg =
AppConfig::new(false, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.finish(map_config(factory(), move |_| cfg.clone()))
}),
},
@ -637,7 +637,7 @@ where
let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.h1(map_config(factory(), move |_| cfg.clone()))
.openssl(acceptor.clone())
}),
@ -645,7 +645,7 @@ where
let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.h2(map_config(factory(), move |_| cfg.clone()))
.openssl(acceptor.clone())
}),
@ -653,7 +653,7 @@ where
let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.finish(map_config(factory(), move |_| cfg.clone()))
.openssl(acceptor.clone())
}),
@ -664,7 +664,7 @@ where
let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.h1(map_config(factory(), move |_| cfg.clone()))
.rustls(config.clone())
}),
@ -672,7 +672,7 @@ where
let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.h2(map_config(factory(), move |_| cfg.clone()))
.rustls(config.clone())
}),
@ -680,7 +680,7 @@ where
let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build()
.client_timeout(ctimeout)
.headers_read_rate(ctimeout, Seconds::ZERO, 256)
.finish(map_config(factory(), move |_| cfg.clone()))
.rustls(config.clone())
}),

View file

@ -17,8 +17,8 @@ use ntex::{service::fn_service, util::Bytes, util::Ready, web::error};
async fn test_h1() {
let srv = test_server(|| {
HttpService::build()
.headers_read_rate(Seconds(1), Seconds::ZERO, 256)
.keep_alive(KeepAlive::Disabled)
.client_timeout(Seconds(1))
.disconnect_timeout(Seconds(1))
.h1(|req: Request| {
assert!(req.peer_addr().is_some());
@ -35,8 +35,8 @@ async fn test_h1_2() {
let srv = test_server(|| {
HttpService::build()
.keep_alive(KeepAlive::Disabled)
.client_timeout(Seconds(1))
.disconnect_timeout(Seconds(1))
.headers_read_rate(Seconds(1), Seconds::ZERO, 256)
.finish(|req: Request| {
assert!(req.peer_addr().is_some());
assert_eq!(req.version(), Version::HTTP_11);
@ -147,14 +147,25 @@ async fn test_chunked_payload() {
#[ntex::test]
async fn test_slow_request() {
const DATA: &[u8] = b"GET /test/tests/test HTTP/1.1\r\n";
let srv = test_server(|| {
HttpService::build()
.client_timeout(Seconds(1))
.headers_read_rate(Seconds(1), Seconds(2), 4)
.finish(|_| Ready::Ok::<_, io::Error>(Response::Ok().finish()))
});
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n");
let _ = stream.write_all(DATA);
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 408 Request Timeout"));
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(&DATA[..5]);
sleep(Millis(1100)).await;
let _ = stream.write_all(&DATA[5..20]);
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 408 Request Timeout"));

View file

@ -51,9 +51,14 @@ impl Service<(Request, Io, h1::Codec)> for WsService {
io.encode((res, body::BodySize::None).into(), &codec)
.unwrap();
Dispatcher::new(io.seal(), ws::Codec::new(), service)
.await
.map_err(|_| panic!())
Dispatcher::with_config(
io.seal(),
ws::Codec::new(),
service,
&Default::default(),
)
.await
.map_err(|_| panic!())
};
Box::pin(fut)

View file

@ -3,7 +3,7 @@ use std::io;
use ntex::codec::BytesCodec;
use ntex::http::test::server as test_server;
use ntex::http::{body::BodySize, h1, HttpService, Request, Response};
use ntex::io::{DispatchItem, Dispatcher, Io};
use ntex::io::{DispatchItem, Dispatcher, DispatcherConfig, Io};
use ntex::service::{fn_factory_with_config, fn_service};
use ntex::web::{self, App, HttpRequest};
use ntex::ws::{self, handshake_response};
@ -40,7 +40,13 @@ async fn test_simple() {
.unwrap();
// start websocket service
Dispatcher::new(io.seal(), ws::Codec::default(), ws_service).await
Dispatcher::with_config(
io.seal(),
ws::Codec::default(),
ws_service,
&Default::default(),
)
.await
}
})
.finish(|_| Ready::Ok::<_, io::Error>(Response::NotFound()))
@ -90,7 +96,13 @@ async fn test_transport() {
.unwrap();
// start websocket service
Dispatcher::new(io.seal(), ws::Codec::default(), ws_service).await
Dispatcher::with_config(
io.seal(),
ws::Codec::default(),
ws_service,
&Default::default(),
)
.await
}
})
.finish(|_| Ready::Ok::<_, io::Error>(Response::NotFound()))
@ -119,9 +131,15 @@ async fn test_keepalive_timeout() {
.unwrap();
// start websocket service
Dispatcher::new(io.seal(), ws::Codec::default(), ws_service)
.keepalive_timeout(Seconds::ZERO)
.await
let cfg = DispatcherConfig::default();
cfg.set_keepalive_timeout(Seconds::ZERO);
Dispatcher::with_config(
io.seal(),
ws::Codec::default(),
ws_service,
&cfg,
)
.await
}
})
.finish(|_| Ready::Ok::<_, io::Error>(Response::NotFound()))