Refactor slow frame timeout handling (#256)

This commit is contained in:
Nikolay Kim 2023-11-23 19:03:55 +06:00 committed by GitHub
parent 111450539e
commit 9813dff65d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 100 additions and 106 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.3.10] - 2023-11-23
* Refactor slow frame timeout handling
## [0.3.9] - 2023-11-21 ## [0.3.9] - 2023-11-21
* Remove slow frame timer if service is not ready * Remove slow frame timer if service is not ready

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "0.3.9" version = "0.3.10"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -9,8 +9,6 @@ use ntex_util::{future::Either, ready, spawn};
use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError}; use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError};
const ONE_SEC: time::Duration = time::Duration::from_secs(1);
type Response<U> = <U as Encoder>::Item; type Response<U> = <U as Encoder>::Item;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -92,8 +90,8 @@ impl DispatcherConfig {
/// Set read rate parameters for single frame. /// Set read rate parameters for single frame.
/// ///
/// Set max timeout for reading single frame. If the client /// Set read timeout, max timeout and rate for reading payload. If the client
/// sends `rate` amount of data, increase the timeout by 1 second for every. /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
/// But no more than `max_timeout` timeout. /// But no more than `max_timeout` timeout.
/// ///
/// By default frame read rate is disabled. /// By default frame read rate is disabled.
@ -144,7 +142,8 @@ where
shared: Rc<DispatcherShared<S, U>>, shared: Rc<DispatcherShared<S, U>>,
pool: Pool, pool: Pool,
cfg: DispatcherConfig, cfg: DispatcherConfig,
read_bytes: u32, read_remains: u32,
read_remains_prev: u32,
read_max_timeout: time::Instant, read_max_timeout: time::Instant,
} }
@ -227,7 +226,8 @@ where
cfg: cfg.clone(), cfg: cfg.clone(),
error: None, error: None,
flags: Flags::empty(), flags: Flags::empty(),
read_bytes: 0, read_remains: 0,
read_remains_prev: 0,
read_max_timeout: now(), read_max_timeout: now(),
st: DispatcherState::Processing, st: DispatcherState::Processing,
}, },
@ -324,58 +324,48 @@ where
let item = match ready!(slf.poll_service(cx)) { let item = match ready!(slf.poll_service(cx)) {
PollService::Ready => { PollService::Ready => {
// decode incoming bytes if buffer is ready // decode incoming bytes if buffer is ready
if slf.shared.io.is_closed() { match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) {
log::trace!("io has been closed, stop dispatcher"); Ok(decoded) => {
slf.st = DispatcherState::Stop; slf.update_timer(&decoded);
DispatchItem::Disconnect(None) if let Some(el) = decoded.item {
} else { DispatchItem::Item(el)
match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) } else {
{ return Poll::Pending;
Ok(decoded) => {
slf.update_timer(&decoded);
if let Some(el) = decoded.item {
DispatchItem::Item(el)
} else {
return Poll::Pending;
}
} }
Err(RecvError::KeepAlive) => { }
log::trace!( Err(RecvError::KeepAlive) => {
"keep-alive error, stopping dispatcher" if let Err(err) = slf.handle_timeout() {
);
slf.st = DispatcherState::Stop;
if slf.flags.contains(Flags::READ_TIMEOUT) {
DispatchItem::ReadTimeout
} else {
DispatchItem::KeepAliveTimeout
}
}
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
slf.st = DispatcherState::Stop; slf.st = DispatcherState::Stop;
err
} else {
continue; continue;
} }
Err(RecvError::WriteBackpressure) => { }
// instruct write task to notify dispatcher when data is flushed Err(RecvError::Stop) => {
slf.st = DispatcherState::Backpressure; log::trace!("dispatcher is instructed to stop");
DispatchItem::WBackPressureEnabled slf.st = DispatcherState::Stop;
} continue;
Err(RecvError::Decoder(err)) => { }
log::trace!( Err(RecvError::WriteBackpressure) => {
"decoder error, stopping dispatcher: {:?}", // instruct write task to notify dispatcher when data is flushed
err slf.st = DispatcherState::Backpressure;
); DispatchItem::WBackPressureEnabled
slf.st = DispatcherState::Stop; }
DispatchItem::DecoderError(err) Err(RecvError::Decoder(err)) => {
} log::trace!(
Err(RecvError::PeerGone(err)) => { "decoder error, stopping dispatcher: {:?}",
log::trace!( err
"peer is gone, stopping dispatcher: {:?}", );
err slf.st = DispatcherState::Stop;
); DispatchItem::DecoderError(err)
slf.st = DispatcherState::Stop; }
DispatchItem::Disconnect(err) Err(RecvError::PeerGone(err)) => {
} log::trace!(
"peer is gone, stopping dispatcher: {:?}",
err
);
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(err)
} }
} }
} }
@ -499,9 +489,7 @@ where
log::trace!("service is not ready, register dispatch task"); log::trace!("service is not ready, register dispatch task");
// remove all timers // remove all timers
if self.flags.contains(Flags::READ_TIMEOUT) { self.flags.remove(Flags::READ_TIMEOUT);
self.flags.remove(Flags::READ_TIMEOUT);
}
self.shared.io.stop_timer(); self.shared.io.stop_timer();
match ready!(self.shared.io.poll_read_pause(cx)) { match ready!(self.shared.io.poll_read_pause(cx)) {
@ -542,55 +530,56 @@ where
} }
fn update_timer(&mut self, decoded: &Decoded<<U as Decoder>::Item>) { fn update_timer(&mut self, decoded: &Decoded<<U as Decoder>::Item>) {
// we got parsed frame // got parsed frame
if decoded.item.is_some() { if decoded.item.is_some() {
// remove all timers self.flags.remove(Flags::READ_TIMEOUT);
if self.flags.contains(Flags::READ_TIMEOUT) { // no new data, start keep-alive timer
self.flags.remove(Flags::READ_TIMEOUT);
}
self.shared.io.stop_timer();
} else if self.flags.contains(Flags::READ_TIMEOUT) {
// update read timer
if let Some((_, max, rate)) = self.cfg.frame_read_rate() {
let bytes = decoded.remains as u32;
let delta = if bytes > self.read_bytes {
(bytes - self.read_bytes).try_into().unwrap_or(u16::MAX)
} else {
bytes.try_into().unwrap_or(u16::MAX)
};
// read rate higher than min rate
if delta >= rate {
let n = now();
let next = self.shared.io.timer_deadline() + ONE_SEC;
let new_timeout = if n >= next { ONE_SEC } else { next - n };
// extend timeout
if max.is_zero() || (n + new_timeout) <= self.read_max_timeout {
self.shared.io.stop_timer();
self.shared.io.start_timer(new_timeout);
// store current buf size for future rate calculation
self.read_bytes = bytes;
}
}
}
} else {
// no new data then start keep-alive timer
if decoded.remains == 0 { if decoded.remains == 0 {
self.shared.io.start_timer(self.cfg.keepalive_timeout()); self.shared.io.start_timer(self.cfg.keepalive_timeout());
} else if let Some((period, max, _)) = self.cfg.frame_read_rate() { }
// we got new data but not enough to parse single frame } else if self.flags.contains(Flags::READ_TIMEOUT) {
// start read timer // received new data but not enough for parsing complete frame
self.flags.insert(Flags::READ_TIMEOUT); self.read_remains = decoded.remains as u32;
} else if let Some((timeout, max, _)) = self.cfg.frame_read_rate() {
// we got new data but not enough to parse single frame
// start read timer
self.flags.insert(Flags::READ_TIMEOUT);
self.read_bytes = decoded.remains as u32; self.read_remains = decoded.remains as u32;
self.shared.io.start_timer(period); self.read_remains_prev = 0;
if !max.is_zero() { if !max.is_zero() {
self.read_max_timeout = now() + max; self.read_max_timeout = now() + max;
}
self.shared.io.start_timer(timeout);
}
}
fn handle_timeout(&mut self) -> Result<(), DispatchItem<U>> {
// check read timer
if self.flags.contains(Flags::READ_TIMEOUT) {
if let Some((timeout, max, rate)) = self.cfg.frame_read_rate() {
let total = (self.read_remains - self.read_remains_prev)
.try_into()
.unwrap_or(u16::MAX);
// read rate, start timer for next period
if total > rate {
self.read_remains_prev = self.read_remains;
self.read_remains = 0;
if max.is_zero() || (!max.is_zero() && now() < self.read_max_timeout) {
log::trace!("Frame read rate {:?}, extend timer", total);
self.shared.io.start_timer(timeout);
return Ok(());
}
log::trace!("Max payload timeout has been reached");
} }
return Err(DispatchItem::ReadTimeout);
} }
} }
log::trace!("Keep-alive error, stopping dispatcher");
Err(DispatchItem::KeepAliveTimeout)
} }
} }
@ -690,7 +679,8 @@ mod tests {
flags: super::Flags::empty(), flags: super::Flags::empty(),
st: DispatcherState::Processing, st: DispatcherState::Processing,
read_max_timeout: time::Instant::now(), read_max_timeout: time::Instant::now(),
read_bytes: 0, read_remains: 0,
read_remains_prev: 0,
pool, pool,
shared, shared,
cfg, cfg,

View file

@ -58,7 +58,7 @@ ntex-util = "0.3.4"
ntex-bytes = "0.1.21" ntex-bytes = "0.1.21"
ntex-h2 = "0.4.4" ntex-h2 = "0.4.4"
ntex-rt = "0.4.11" ntex-rt = "0.4.11"
ntex-io = "0.3.9" ntex-io = "0.3.10"
ntex-tls = "0.3.2" ntex-tls = "0.3.2"
ntex-tokio = { version = "0.3.1", optional = true } ntex-tokio = { version = "0.3.1", optional = true }
ntex-glommio = { version = "0.3.1", optional = true } ntex-glommio = { version = "0.3.1", optional = true }

View file

@ -123,8 +123,8 @@ where
/// Set read rate parameters for request's payload. /// Set read rate parameters for request's payload.
/// ///
/// Set max timeout for reading payload. If the client /// Set read timeout, max timeout and rate for reading payload. If the client
/// sends `rate` amount of data, increase the timeout by 1 second for every. /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
/// But no more than `max_timeout` timeout. /// But no more than `max_timeout` timeout.
/// ///
/// By default payload read rate is disabled. /// By default payload read rate is disabled.

View file

@ -204,8 +204,8 @@ impl ServiceConfig {
/// Set read rate parameters for request's payload. /// Set read rate parameters for request's payload.
/// ///
/// Set time pariod for reading payload. Client must /// Set read timeout, max timeout and rate for reading payload. If the client
/// sends `rate` amount of data per one time period. /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
/// But no more than `max_timeout` timeout. /// But no more than `max_timeout` timeout.
/// ///
/// By default payload read rate is disabled. /// By default payload read rate is disabled.