diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 124b5693..cc20efbb 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.10] - 2023-11-23 + +* Refactor slow frame timeout handling + ## [0.3.9] - 2023-11-21 * Remove slow frame timer if service is not ready diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 763f2e3a..fc010e3a 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.9" +version = "0.3.10" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 1d362c2b..883202b5 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -9,8 +9,6 @@ use ntex_util::{future::Either, ready, spawn}; use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError}; -const ONE_SEC: time::Duration = time::Duration::from_secs(1); - type Response = ::Item; #[derive(Clone, Debug)] @@ -92,8 +90,8 @@ impl DispatcherConfig { /// Set read rate parameters for single frame. /// - /// Set max timeout for reading single frame. If the client - /// sends `rate` amount of data, increase the timeout by 1 second for every. + /// Set read timeout, max timeout and rate for reading payload. If the client + /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds. /// But no more than `max_timeout` timeout. /// /// By default frame read rate is disabled. @@ -144,7 +142,8 @@ where shared: Rc>, pool: Pool, cfg: DispatcherConfig, - read_bytes: u32, + read_remains: u32, + read_remains_prev: u32, read_max_timeout: time::Instant, } @@ -227,7 +226,8 @@ where cfg: cfg.clone(), error: None, flags: Flags::empty(), - read_bytes: 0, + read_remains: 0, + read_remains_prev: 0, read_max_timeout: now(), st: DispatcherState::Processing, }, @@ -324,58 +324,48 @@ where let item = match ready!(slf.poll_service(cx)) { PollService::Ready => { // decode incoming bytes if buffer is ready - if slf.shared.io.is_closed() { - log::trace!("io has been closed, stop dispatcher"); - slf.st = DispatcherState::Stop; - DispatchItem::Disconnect(None) - } else { - match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) - { - Ok(decoded) => { - slf.update_timer(&decoded); - if let Some(el) = decoded.item { - DispatchItem::Item(el) - } else { - return Poll::Pending; - } + match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) { + Ok(decoded) => { + slf.update_timer(&decoded); + if let Some(el) = decoded.item { + DispatchItem::Item(el) + } else { + return Poll::Pending; } - Err(RecvError::KeepAlive) => { - log::trace!( - "keep-alive error, stopping dispatcher" - ); - slf.st = DispatcherState::Stop; - if slf.flags.contains(Flags::READ_TIMEOUT) { - DispatchItem::ReadTimeout - } else { - DispatchItem::KeepAliveTimeout - } - } - Err(RecvError::Stop) => { - log::trace!("dispatcher is instructed to stop"); + } + Err(RecvError::KeepAlive) => { + if let Err(err) = slf.handle_timeout() { slf.st = DispatcherState::Stop; + err + } else { continue; } - Err(RecvError::WriteBackpressure) => { - // instruct write task to notify dispatcher when data is flushed - slf.st = DispatcherState::Backpressure; - DispatchItem::WBackPressureEnabled - } - Err(RecvError::Decoder(err)) => { - log::trace!( - "decoder error, stopping dispatcher: {:?}", - err - ); - slf.st = DispatcherState::Stop; - DispatchItem::DecoderError(err) - } - Err(RecvError::PeerGone(err)) => { - log::trace!( - "peer is gone, stopping dispatcher: {:?}", - err - ); - slf.st = DispatcherState::Stop; - DispatchItem::Disconnect(err) - } + } + Err(RecvError::Stop) => { + log::trace!("dispatcher is instructed to stop"); + slf.st = DispatcherState::Stop; + continue; + } + Err(RecvError::WriteBackpressure) => { + // instruct write task to notify dispatcher when data is flushed + slf.st = DispatcherState::Backpressure; + DispatchItem::WBackPressureEnabled + } + Err(RecvError::Decoder(err)) => { + log::trace!( + "decoder error, stopping dispatcher: {:?}", + err + ); + slf.st = DispatcherState::Stop; + DispatchItem::DecoderError(err) + } + Err(RecvError::PeerGone(err)) => { + log::trace!( + "peer is gone, stopping dispatcher: {:?}", + err + ); + slf.st = DispatcherState::Stop; + DispatchItem::Disconnect(err) } } } @@ -499,9 +489,7 @@ where log::trace!("service is not ready, register dispatch task"); // remove all timers - if self.flags.contains(Flags::READ_TIMEOUT) { - self.flags.remove(Flags::READ_TIMEOUT); - } + self.flags.remove(Flags::READ_TIMEOUT); self.shared.io.stop_timer(); match ready!(self.shared.io.poll_read_pause(cx)) { @@ -542,55 +530,56 @@ where } fn update_timer(&mut self, decoded: &Decoded<::Item>) { - // we got parsed frame + // got parsed frame if decoded.item.is_some() { - // remove all timers - if self.flags.contains(Flags::READ_TIMEOUT) { - self.flags.remove(Flags::READ_TIMEOUT); - } - self.shared.io.stop_timer(); - } else if self.flags.contains(Flags::READ_TIMEOUT) { - // update read timer - if let Some((_, max, rate)) = self.cfg.frame_read_rate() { - let bytes = decoded.remains as u32; - let delta = if bytes > self.read_bytes { - (bytes - self.read_bytes).try_into().unwrap_or(u16::MAX) - } else { - bytes.try_into().unwrap_or(u16::MAX) - }; - - // read rate higher than min rate - if delta >= rate { - let n = now(); - let next = self.shared.io.timer_deadline() + ONE_SEC; - let new_timeout = if n >= next { ONE_SEC } else { next - n }; - - // extend timeout - if max.is_zero() || (n + new_timeout) <= self.read_max_timeout { - self.shared.io.stop_timer(); - self.shared.io.start_timer(new_timeout); - - // store current buf size for future rate calculation - self.read_bytes = bytes; - } - } - } - } else { - // no new data then start keep-alive timer + self.flags.remove(Flags::READ_TIMEOUT); + // no new data, start keep-alive timer if decoded.remains == 0 { self.shared.io.start_timer(self.cfg.keepalive_timeout()); - } else if let Some((period, max, _)) = self.cfg.frame_read_rate() { - // we got new data but not enough to parse single frame - // start read timer - self.flags.insert(Flags::READ_TIMEOUT); + } + } else if self.flags.contains(Flags::READ_TIMEOUT) { + // received new data but not enough for parsing complete frame + self.read_remains = decoded.remains as u32; + } else if let Some((timeout, max, _)) = self.cfg.frame_read_rate() { + // we got new data but not enough to parse single frame + // start read timer + self.flags.insert(Flags::READ_TIMEOUT); - self.read_bytes = decoded.remains as u32; - self.shared.io.start_timer(period); - if !max.is_zero() { - self.read_max_timeout = now() + max; + self.read_remains = decoded.remains as u32; + self.read_remains_prev = 0; + if !max.is_zero() { + self.read_max_timeout = now() + max; + } + self.shared.io.start_timer(timeout); + } + } + + fn handle_timeout(&mut self) -> Result<(), DispatchItem> { + // check read timer + if self.flags.contains(Flags::READ_TIMEOUT) { + if let Some((timeout, max, rate)) = self.cfg.frame_read_rate() { + let total = (self.read_remains - self.read_remains_prev) + .try_into() + .unwrap_or(u16::MAX); + + // read rate, start timer for next period + if total > rate { + self.read_remains_prev = self.read_remains; + self.read_remains = 0; + + if max.is_zero() || (!max.is_zero() && now() < self.read_max_timeout) { + log::trace!("Frame read rate {:?}, extend timer", total); + self.shared.io.start_timer(timeout); + return Ok(()); + } + log::trace!("Max payload timeout has been reached"); } + return Err(DispatchItem::ReadTimeout); } } + + log::trace!("Keep-alive error, stopping dispatcher"); + Err(DispatchItem::KeepAliveTimeout) } } @@ -690,7 +679,8 @@ mod tests { flags: super::Flags::empty(), st: DispatcherState::Processing, read_max_timeout: time::Instant::now(), - read_bytes: 0, + read_remains: 0, + read_remains_prev: 0, pool, shared, cfg, diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 0e458d12..2406033c 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -58,7 +58,7 @@ ntex-util = "0.3.4" ntex-bytes = "0.1.21" ntex-h2 = "0.4.4" ntex-rt = "0.4.11" -ntex-io = "0.3.9" +ntex-io = "0.3.10" ntex-tls = "0.3.2" ntex-tokio = { version = "0.3.1", optional = true } ntex-glommio = { version = "0.3.1", optional = true } diff --git a/ntex/src/http/builder.rs b/ntex/src/http/builder.rs index fb87f3ba..859cdba5 100644 --- a/ntex/src/http/builder.rs +++ b/ntex/src/http/builder.rs @@ -123,8 +123,8 @@ where /// Set read rate parameters for request's payload. /// - /// Set max timeout for reading payload. If the client - /// sends `rate` amount of data, increase the timeout by 1 second for every. + /// Set read timeout, max timeout and rate for reading payload. If the client + /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds. /// But no more than `max_timeout` timeout. /// /// By default payload read rate is disabled. diff --git a/ntex/src/http/config.rs b/ntex/src/http/config.rs index a35ecbd1..9f743066 100644 --- a/ntex/src/http/config.rs +++ b/ntex/src/http/config.rs @@ -204,8 +204,8 @@ impl ServiceConfig { /// Set read rate parameters for request's payload. /// - /// Set time pariod for reading payload. Client must - /// sends `rate` amount of data per one time period. + /// Set read timeout, max timeout and rate for reading payload. If the client + /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds. /// But no more than `max_timeout` timeout. /// /// By default payload read rate is disabled.