mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-05 13:57:39 +03:00
Fix KeepAlive::Os support for h1 dispatcher
This commit is contained in:
parent
ff33374810
commit
96ca5c3542
3 changed files with 22 additions and 25 deletions
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
## [0.2.0-b.12] - 2021-02-18
|
## [0.2.0-b.12] - 2021-02-18
|
||||||
|
|
||||||
|
* http: Fix KeepAlive::Os support for h1 dispatcher
|
||||||
|
|
||||||
* Handle EINTR in server accept loop
|
* Handle EINTR in server accept loop
|
||||||
|
|
||||||
* Fix double registation for accept back-pressure
|
* Fix double registation for accept back-pressure
|
||||||
|
|
|
@ -99,7 +99,7 @@ pub(super) struct DispatcherConfig<S, X, U> {
|
||||||
pub(super) service: S,
|
pub(super) service: S,
|
||||||
pub(super) expect: X,
|
pub(super) expect: X,
|
||||||
pub(super) upgrade: Option<U>,
|
pub(super) upgrade: Option<U>,
|
||||||
pub(super) keep_alive: u64,
|
pub(super) keep_alive: Duration,
|
||||||
pub(super) client_timeout: u64,
|
pub(super) client_timeout: u64,
|
||||||
pub(super) client_disconnect: u64,
|
pub(super) client_disconnect: u64,
|
||||||
pub(super) ka_enabled: bool,
|
pub(super) ka_enabled: bool,
|
||||||
|
@ -118,7 +118,7 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
|
||||||
service,
|
service,
|
||||||
expect,
|
expect,
|
||||||
upgrade,
|
upgrade,
|
||||||
keep_alive: cfg.0.keep_alive,
|
keep_alive: Duration::from_secs(cfg.0.keep_alive),
|
||||||
client_timeout: cfg.0.client_timeout,
|
client_timeout: cfg.0.client_timeout,
|
||||||
client_disconnect: cfg.0.client_disconnect,
|
client_disconnect: cfg.0.client_disconnect,
|
||||||
ka_enabled: cfg.0.ka_enabled,
|
ka_enabled: cfg.0.ka_enabled,
|
||||||
|
@ -134,10 +134,8 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
|
||||||
|
|
||||||
/// Return keep-alive timer delay is configured.
|
/// Return keep-alive timer delay is configured.
|
||||||
pub(super) fn keep_alive_timer(&self) -> Option<Delay> {
|
pub(super) fn keep_alive_timer(&self) -> Option<Delay> {
|
||||||
if self.keep_alive != 0 {
|
if self.keep_alive.as_secs() != 0 {
|
||||||
Some(delay_until(
|
Some(delay_until(self.timer.now() + self.keep_alive))
|
||||||
self.timer.now() + Duration::from_secs(self.keep_alive),
|
|
||||||
))
|
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -145,8 +143,8 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
|
||||||
|
|
||||||
/// Keep-alive expire time
|
/// Keep-alive expire time
|
||||||
pub(super) fn keep_alive_expire(&self) -> Option<Instant> {
|
pub(super) fn keep_alive_expire(&self) -> Option<Instant> {
|
||||||
if self.keep_alive != 0 {
|
if self.keep_alive.as_secs() != 0 {
|
||||||
Some(self.timer.now() + Duration::from_secs(self.keep_alive))
|
Some(self.timer.now() + self.keep_alive)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,9 +111,7 @@ where
|
||||||
on_connect_data: Option<Box<dyn DataFactory>>,
|
on_connect_data: Option<Box<dyn DataFactory>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let codec = Codec::new(config.timer.clone(), config.keep_alive_enabled());
|
let codec = Codec::new(config.timer.clone(), config.keep_alive_enabled());
|
||||||
|
let state = IoState::new().disconnect_timeout(config.client_disconnect as u16);
|
||||||
let state = IoState::new();
|
|
||||||
state.set_disconnect_timeout(config.client_disconnect as u16);
|
|
||||||
|
|
||||||
let mut expire = config.timer_h1.now();
|
let mut expire = config.timer_h1.now();
|
||||||
let io = Rc::new(RefCell::new(io));
|
let io = Rc::new(RefCell::new(io));
|
||||||
|
@ -171,8 +169,6 @@ where
|
||||||
let next = match this.call.project() {
|
let next = match this.call.project() {
|
||||||
// handle SERVICE call
|
// handle SERVICE call
|
||||||
CallStateProject::Service { fut } => {
|
CallStateProject::Service { fut } => {
|
||||||
// we have to loop because of read back-pressure,
|
|
||||||
// check Poll::Pending processing
|
|
||||||
match fut.poll(cx) {
|
match fut.poll(cx) {
|
||||||
Poll::Ready(result) => match result {
|
Poll::Ready(result) => match result {
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
|
@ -314,15 +310,14 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.head().expect() {
|
if req.head().expect() {
|
||||||
// call service
|
// Handle normal requests with EXPECT: 100-Continue` header
|
||||||
*this.st = State::Call;
|
*this.st = State::Call;
|
||||||
// Handle `EXPECT: 100-Continue` header
|
|
||||||
this.call.set(CallState::Expect {
|
this.call.set(CallState::Expect {
|
||||||
fut: this.inner.config.expect.call(req),
|
fut: this.inner.config.expect.call(req),
|
||||||
});
|
});
|
||||||
} else if upgrade {
|
} else if upgrade {
|
||||||
log::trace!("prep io for upgrade handler");
|
|
||||||
// Handle UPGRADE request
|
// Handle UPGRADE request
|
||||||
|
log::trace!("prep io for upgrade handler");
|
||||||
this.inner.state.dsp_stop_io(cx.waker());
|
this.inner.state.dsp_stop_io(cx.waker());
|
||||||
*this.st = State::Upgrade(Some(req));
|
*this.st = State::Upgrade(Some(req));
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
|
@ -352,8 +347,8 @@ where
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
log::trace!("malformed request: {:?}", err);
|
|
||||||
// Malformed requests, respond with 400
|
// Malformed requests, respond with 400
|
||||||
|
log::trace!("malformed request: {:?}", err);
|
||||||
let (res, body) =
|
let (res, body) =
|
||||||
Response::BadRequest().finish().into_parts();
|
Response::BadRequest().finish().into_parts();
|
||||||
this.inner.error = Some(DispatchError::Parse(err));
|
this.inner.error = Some(DispatchError::Parse(err));
|
||||||
|
@ -480,9 +475,10 @@ where
|
||||||
|
|
||||||
fn reset_keepalive(&mut self) {
|
fn reset_keepalive(&mut self) {
|
||||||
// re-register keep-alive
|
// re-register keep-alive
|
||||||
if self.flags.contains(Flags::KEEPALIVE) {
|
if self.flags.contains(Flags::KEEPALIVE) && self.config.keep_alive.as_secs() != 0
|
||||||
let expire = self.config.timer_h1.now()
|
{
|
||||||
+ time::Duration::from_secs(self.config.keep_alive);
|
let expire = self.config.timer_h1.now() + self.config.keep_alive;
|
||||||
|
if expire != self.expire {
|
||||||
self.config
|
self.config
|
||||||
.timer_h1
|
.timer_h1
|
||||||
.register(expire, self.expire, &self.state);
|
.register(expire, self.expire, &self.state);
|
||||||
|
@ -490,6 +486,7 @@ where
|
||||||
self.state.reset_keepalive();
|
self.state.reset_keepalive();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_error<E>(&mut self, err: E, critical: bool) -> State<B>
|
fn handle_error<E>(&mut self, err: E, critical: bool) -> State<B>
|
||||||
where
|
where
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue