Fix keep-alive timeout handling (#257)

This commit is contained in:
Nikolay Kim 2023-11-25 21:39:55 +06:00 committed by GitHub
parent 9813dff65d
commit ae766a5629
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 9 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.3.11] - 2023-11-25
* Fix keep-alive timeout handling
## [0.3.10] - 2023-11-23
* Refactor slow frame timeout handling

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.3.10"
version = "0.3.11"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -127,7 +127,8 @@ bitflags::bitflags! {
struct Flags: u8 {
const READY_ERR = 0b0001;
const IO_ERR = 0b0010;
const READ_TIMEOUT = 0b0100;
const KA_TIMEOUT = 0b0100;
const READ_TIMEOUT = 0b1000;
}
}
@ -205,9 +206,6 @@ where
F: IntoService<S, DispatchItem<U>>,
{
let io = IoBoxed::from(io);
// register keepalive timer
io.start_timer(cfg.keepalive_timeout());
io.set_disconnect_timeout(cfg.disconnect_timeout());
let pool = io.memory_pool().pool();
@ -259,8 +257,6 @@ where
///
/// By default keep-alive timeout is set to 30 seconds.
pub fn keepalive_timeout(self, timeout: Seconds) -> Self {
// register keepalive timer
self.inner.shared.io.start_timer(timeout.into());
self.inner.cfg.set_keepalive_timeout(timeout);
self
}
@ -530,11 +526,23 @@ where
}
fn update_timer(&mut self, decoded: &Decoded<<U as Decoder>::Item>) {
log::debug!(
"update timer, item: {:?}, remains: {:?}, consumed: {:?}, flags: {:?}",
decoded.item.is_some(),
decoded.remains,
decoded.consumed,
self.flags
);
// got parsed frame
if decoded.item.is_some() {
self.flags.remove(Flags::READ_TIMEOUT);
self.read_remains = 0;
self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
} else if self.read_remains == 0 && decoded.remains == 0 {
// no new data, start keep-alive timer
if decoded.remains == 0 {
if !self.flags.contains(Flags::KA_TIMEOUT) {
log::debug!("Start keep-alive timer {:?}", self.cfg.keepalive_timeout());
self.flags.insert(Flags::KA_TIMEOUT);
self.shared.io.start_timer(self.cfg.keepalive_timeout());
}
} else if self.flags.contains(Flags::READ_TIMEOUT) {
@ -543,6 +551,7 @@ where
} else if let Some((timeout, max, _)) = self.cfg.frame_read_rate() {
// we got new data but not enough to parse single frame
// start read timer
self.flags.remove(Flags::KA_TIMEOUT);
self.flags.insert(Flags::READ_TIMEOUT);
self.read_remains = decoded.remains as u32;
@ -555,6 +564,8 @@ where
}
fn handle_timeout(&mut self) -> Result<(), DispatchItem<U>> {
log::debug!("handle timeout, flags: {:?}", self.flags);
// check read timer
if self.flags.contains(Flags::READ_TIMEOUT) {
if let Some((timeout, max, rate)) = self.cfg.frame_read_rate() {
@ -1017,6 +1028,55 @@ mod tests {
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
}
#[ntex::test]
async fn test_keepalive2() {
let _ = env_logger::try_init();
let (client, server) = IoTest::create();
client.remote_buffer_cap(1024);
let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();
let (disp, state) = Dispatcher::debug(
server,
BCodec(8),
ntex_service::fn_service(move |msg: DispatchItem<BCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(bytes) => {
data.lock().unwrap().borrow_mut().push(0);
return Ok::<_, ()>(Some(bytes.freeze()));
}
DispatchItem::KeepAliveTimeout => {
data.lock().unwrap().borrow_mut().push(1);
}
_ => (),
}
Ok(None)
}
}),
);
spawn(async move {
disp.inner
.cfg
.set_keepalive_timeout(Seconds(1))
.set_frame_read_rate(Seconds(1), Seconds(2), 2);
let _ = disp.await;
});
client.write("12345678");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"12345678"));
sleep(Millis(1250)).await;
// write side must be closed, dispatcher should fail with keep-alive
let flags = state.flags();
assert!(flags.contains(Flags::IO_STOPPING));
assert!(client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
}
#[ntex::test]
async fn test_read_timeout() {
let (client, server) = IoTest::create();