From ae766a5629a0094493e960a1d0fbebdba2c20f7d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 25 Nov 2023 21:39:55 +0600 Subject: [PATCH] Fix keep-alive timeout handling (#257) --- ntex-io/CHANGES.md | 4 +++ ntex-io/Cargo.toml | 2 +- ntex-io/src/dispatcher.rs | 76 ++++++++++++++++++++++++++++++++++----- 3 files changed, 73 insertions(+), 9 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index cc20efbb..2095a8a9 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.11] - 2023-11-25 + +* Fix keep-alive timeout handling + ## [0.3.10] - 2023-11-23 * Refactor slow frame timeout handling diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index fc010e3a..caea8382 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.10" +version = "0.3.11" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 883202b5..df652352 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -127,7 +127,8 @@ bitflags::bitflags! { struct Flags: u8 { const READY_ERR = 0b0001; const IO_ERR = 0b0010; - const READ_TIMEOUT = 0b0100; + const KA_TIMEOUT = 0b0100; + const READ_TIMEOUT = 0b1000; } } @@ -205,9 +206,6 @@ where F: IntoService>, { let io = IoBoxed::from(io); - - // register keepalive timer - io.start_timer(cfg.keepalive_timeout()); io.set_disconnect_timeout(cfg.disconnect_timeout()); let pool = io.memory_pool().pool(); @@ -259,8 +257,6 @@ where /// /// By default keep-alive timeout is set to 30 seconds. pub fn keepalive_timeout(self, timeout: Seconds) -> Self { - // register keepalive timer - self.inner.shared.io.start_timer(timeout.into()); self.inner.cfg.set_keepalive_timeout(timeout); self } @@ -530,11 +526,23 @@ where } fn update_timer(&mut self, decoded: &Decoded<::Item>) { + log::debug!( + "update timer, item: {:?}, remains: {:?}, consumed: {:?}, flags: {:?}", + decoded.item.is_some(), + decoded.remains, + decoded.consumed, + self.flags + ); + // got parsed frame if decoded.item.is_some() { - self.flags.remove(Flags::READ_TIMEOUT); + self.read_remains = 0; + self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT); + } else if self.read_remains == 0 && decoded.remains == 0 { // no new data, start keep-alive timer - if decoded.remains == 0 { + if !self.flags.contains(Flags::KA_TIMEOUT) { + log::debug!("Start keep-alive timer {:?}", self.cfg.keepalive_timeout()); + self.flags.insert(Flags::KA_TIMEOUT); self.shared.io.start_timer(self.cfg.keepalive_timeout()); } } else if self.flags.contains(Flags::READ_TIMEOUT) { @@ -543,6 +551,7 @@ where } else if let Some((timeout, max, _)) = self.cfg.frame_read_rate() { // we got new data but not enough to parse single frame // start read timer + self.flags.remove(Flags::KA_TIMEOUT); self.flags.insert(Flags::READ_TIMEOUT); self.read_remains = decoded.remains as u32; @@ -555,6 +564,8 @@ where } fn handle_timeout(&mut self) -> Result<(), DispatchItem> { + log::debug!("handle timeout, flags: {:?}", self.flags); + // check read timer if self.flags.contains(Flags::READ_TIMEOUT) { if let Some((timeout, max, rate)) = self.cfg.frame_read_rate() { @@ -1017,6 +1028,55 @@ mod tests { assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]); } + #[ntex::test] + async fn test_keepalive2() { + let _ = env_logger::try_init(); + let (client, server) = IoTest::create(); + client.remote_buffer_cap(1024); + + let data = Arc::new(Mutex::new(RefCell::new(Vec::new()))); + let data2 = data.clone(); + + let (disp, state) = Dispatcher::debug( + server, + BCodec(8), + ntex_service::fn_service(move |msg: DispatchItem| { + let data = data2.clone(); + async move { + match msg { + DispatchItem::Item(bytes) => { + data.lock().unwrap().borrow_mut().push(0); + return Ok::<_, ()>(Some(bytes.freeze())); + } + DispatchItem::KeepAliveTimeout => { + data.lock().unwrap().borrow_mut().push(1); + } + _ => (), + } + Ok(None) + } + }), + ); + spawn(async move { + disp.inner + .cfg + .set_keepalive_timeout(Seconds(1)) + .set_frame_read_rate(Seconds(1), Seconds(2), 2); + let _ = disp.await; + }); + + client.write("12345678"); + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"12345678")); + sleep(Millis(1250)).await; + + // write side must be closed, dispatcher should fail with keep-alive + let flags = state.flags(); + assert!(flags.contains(Flags::IO_STOPPING)); + assert!(client.is_closed()); + assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]); + } + #[ntex::test] async fn test_read_timeout() { let (client, server) = IoTest::create();