From df613e6f2d9a75a47c5b9d9e128c96d189c977ee Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 10 Dec 2023 08:12:33 +0600 Subject: [PATCH] Fix KEEP-ALIVE timer handling (#264) --- ntex-io/CHANGES.md | 4 ++ ntex-io/Cargo.toml | 2 +- ntex-io/src/dispatcher.rs | 86 +++++++++++++++++++----- ntex-io/src/ioref.rs | 28 +++++--- ntex-io/src/timer.rs | 135 ++++++++++++++++++++++---------------- ntex/Cargo.toml | 2 +- 6 files changed, 172 insertions(+), 85 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 9c440558..a803f941 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.14] - 2023-12-10 + +* Fix KEEP-ALIVE timer handling + ## [0.3.13] - 2023-12-02 * Optimize KEEP-ALIVE timer diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index be6999ea..67d705d4 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.13" +version = "0.3.14" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 93d9ac6e..9f8201be 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -151,7 +151,7 @@ bitflags::bitflags! { const READY_ERR = 0b00001; const IO_ERR = 0b00010; const KA_ENABLED = 0b00100; - const NO_KA_TIMEOUT = 0b01000; + const KA_TIMEOUT = 0b01000; const READ_TIMEOUT = 0b10000; } } @@ -233,9 +233,9 @@ where io.set_disconnect_timeout(cfg.disconnect_timeout()); let flags = if cfg.keepalive_timeout_secs().is_zero() { - Flags::NO_KA_TIMEOUT + Flags::empty() } else { - Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT + Flags::KA_ENABLED }; let pool = io.memory_pool().pool(); @@ -526,21 +526,20 @@ where // got parsed frame if decoded.item.is_some() { self.read_remains = 0; - self.flags.remove(Flags::READ_TIMEOUT); + self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT); } else if self.flags.contains(Flags::READ_TIMEOUT) { // received new data but not enough for parsing complete frame self.read_remains = decoded.remains as u32; } else if self.read_remains == 0 && decoded.remains == 0 { // no new data, start keep-alive timer - if self - .flags - .contains(Flags::NO_KA_TIMEOUT | Flags::KA_ENABLED) + if self.flags.contains(Flags::KA_ENABLED) + && !self.flags.contains(Flags::KA_TIMEOUT) { log::debug!( "Start keep-alive timer {:?}", self.cfg.keepalive_timeout_secs() ); - self.flags.remove(Flags::NO_KA_TIMEOUT); + self.flags.insert(Flags::KA_TIMEOUT); self.shared .io .start_timer_secs(self.cfg.keepalive_timeout_secs()); @@ -548,8 +547,7 @@ where } else if let Some((timeout, max, _)) = self.cfg.frame_read_rate_params() { // we got new data but not enough to parse single frame // start read timer - self.flags - .insert(Flags::READ_TIMEOUT | Flags::NO_KA_TIMEOUT); + self.flags.insert(Flags::READ_TIMEOUT); self.read_remains = decoded.remains as u32; self.read_remains_prev = 0; @@ -682,9 +680,9 @@ mod tests { state.set_disconnect_timeout(cfg.disconnect_timeout()); let flags = if cfg.keepalive_timeout_secs().is_zero() { - super::Flags::NO_KA_TIMEOUT + super::Flags::empty() } else { - super::Flags::KA_ENABLED | super::Flags::NO_KA_TIMEOUT + super::Flags::KA_ENABLED }; let inner = State(state.get_ref()); @@ -750,9 +748,7 @@ mod tests { client.close().await; assert!(client.is_server_dropped()); - assert!( - format!("{:?}", super::Flags::NO_KA_TIMEOUT.clone()).contains("NO_KA_TIMEOUT") - ); + assert!(format!("{:?}", super::Flags::KA_TIMEOUT.clone()).contains("KA_TIMEOUT")); } #[ntex::test] @@ -999,8 +995,6 @@ mod tests { #[ntex::test] async fn test_keepalive() { - let _ = env_logger::try_init(); - let (client, server) = IoTest::create(); client.remote_buffer_cap(1024); client.write("GET /test HTTP/1\r\n\r\n"); @@ -1099,6 +1093,64 @@ mod tests { assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]); } + /// Update keep-alive timer after receiving frame + #[ntex::test] + async fn test_keepalive3() { + let (client, server) = IoTest::create(); + client.remote_buffer_cap(1024); + + let data = Arc::new(Mutex::new(RefCell::new(Vec::new()))); + let data2 = data.clone(); + + let cfg = DispatcherConfig::default() + .set_keepalive_timeout(Seconds(2)) + .set_frame_read_rate(Seconds(1), Seconds(2), 2) + .clone(); + + let (disp, _) = Dispatcher::debug_cfg( + server, + BCodec(1), + ntex_service::fn_service(move |msg: DispatchItem| { + let data = data2.clone(); + async move { + match msg { + DispatchItem::Item(bytes) => { + data.lock().unwrap().borrow_mut().push(0); + return Ok::<_, ()>(Some(bytes.freeze())); + } + DispatchItem::KeepAliveTimeout => { + data.lock().unwrap().borrow_mut().push(1); + } + _ => (), + } + Ok(None) + } + }), + cfg, + ); + spawn(async move { + let _ = disp.await; + }); + + client.write("1"); + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"1")); + sleep(Millis(750)).await; + + client.write("2"); + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"2")); + + sleep(Millis(750)).await; + client.write("3"); + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"3")); + + sleep(Millis(750)).await; + assert!(!client.is_closed()); + assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 0, 0]); + } + #[ntex::test] async fn test_read_timeout() { let (client, server) = IoTest::create(); diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 2d95445d..ed033ad7 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -234,18 +234,26 @@ impl IoRef { #[inline] /// Start timer pub fn start_timer_secs(&self, timeout: Seconds) -> timer::TimerHandle { - if self.flags().contains(Flags::TIMEOUT) { - timer::unregister(self.0.keepalive.get(), self); - } - if !timeout.is_zero() { - log::debug!("start timer {:?}", timeout); - self.0.insert_flags(Flags::TIMEOUT); - let hnd = timer::register(timeout, self); - self.0.keepalive.set(hnd); - hnd + if self.flags().contains(Flags::TIMEOUT) { + let old_hnd = self.0.keepalive.get(); + let hnd = timer::update(old_hnd, timeout, self); + if old_hnd != hnd { + self.0.keepalive.set(hnd); + } + hnd + } else { + log::debug!("start timer {:?}", timeout); + self.0.insert_flags(Flags::TIMEOUT); + let hnd = timer::register(timeout, self); + self.0.keepalive.set(hnd); + hnd + } } else { - self.0.remove_flags(Flags::TIMEOUT); + if self.flags().contains(Flags::TIMEOUT) { + self.0.remove_flags(Flags::TIMEOUT); + timer::unregister(self.0.keepalive.get(), self); + } Default::default() } } diff --git a/ntex-io/src/timer.rs b/ntex-io/src/timer.rs index ee09b01a..20f3fe4b 100644 --- a/ntex-io/src/timer.rs +++ b/ntex-io/src/timer.rs @@ -1,6 +1,6 @@ #![allow(clippy::mutable_key_type)] use std::collections::{BTreeMap, VecDeque}; -use std::{cell::RefCell, ops, rc::Rc, time::Duration, time::Instant}; +use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant}; use ntex_util::time::{now, sleep, Seconds}; use ntex_util::{spawn, HashSet}; @@ -11,14 +11,15 @@ const CAP: usize = 64; const SEC: Duration = Duration::from_secs(1); thread_local! { - static TIMER: Rc> = Rc::new(RefCell::new( - Inner { - running: false, - base: Instant::now(), - current: 0, + static TIMER: Inner = Inner { + running: Cell::new(false), + base: Cell::new(Instant::now()), + current: Cell::new(0), + storage: RefCell::new(InnerMut { cache: VecDeque::with_capacity(CAP), notifications: BTreeMap::default(), - })); + }) + } } #[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] @@ -27,7 +28,7 @@ pub struct TimerHandle(u32); impl TimerHandle { pub fn remains(&self) -> Seconds { TIMER.with(|timer| { - let cur = timer.borrow().current; + let cur = timer.current.get(); if self.0 <= cur { Seconds::ZERO } else { @@ -37,7 +38,7 @@ impl TimerHandle { } pub fn instant(&self) -> Instant { - TIMER.with(|timer| timer.borrow().base + Duration::from_secs(self.0 as u64)) + TIMER.with(|timer| timer.base.get() + Duration::from_secs(self.0 as u64)) } } @@ -51,14 +52,18 @@ impl ops::Add for TimerHandle { } struct Inner { - running: bool, - base: Instant, - current: u32, + running: Cell, + base: Cell, + current: Cell, + storage: RefCell, +} + +struct InnerMut { cache: VecDeque>>, notifications: BTreeMap>>, } -impl Inner { +impl InnerMut { fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) { if let Some(states) = self.notifications.get_mut(&hnd.0) { states.remove(&io.0); @@ -73,52 +78,69 @@ impl Inner { } } +pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) { + TIMER.with(|timer| { + timer.storage.borrow_mut().unregister(hnd, io); + }) +} + +pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle { + TIMER.with(|timer| { + let new_hnd = timer.current.get() + timeout.0 as u32; + if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 { + hnd + } else { + timer.storage.borrow_mut().unregister(hnd, io); + register(timeout, io) + } + }) +} + pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle { TIMER.with(|timer| { - let mut inner = timer.borrow_mut(); - // setup current delta - if !inner.running { - inner.current = (now() - inner.base).as_secs() as u32; + if !timer.running.get() { + timer + .current + .set((now() - timer.base.get()).as_secs() as u32); } - let hnd = inner.current + timeout.0 as u32; + let hnd = { + let hnd = timer.current.get() + timeout.0 as u32; + let mut inner = timer.storage.borrow_mut(); - // search existing key - let hnd = if let Some((hnd, _)) = inner.notifications.range(hnd..hnd + 1).next() { - *hnd - } else { - let items = inner.cache.pop_front().unwrap_or_default(); - inner.notifications.insert(hnd, items); - hnd + // insert key + if let Some(item) = inner.notifications.range_mut(hnd..hnd + 1).next() { + item.1.insert(io.0.clone()); + *item.0 + } else { + let mut items = inner.cache.pop_front().unwrap_or_default(); + items.insert(io.0.clone()); + inner.notifications.insert(hnd, items); + hnd + } }; - inner - .notifications - .get_mut(&hnd) - .unwrap() - .insert(io.0.clone()); - - if !inner.running { - inner.running = true; - let inner = timer.clone(); + if !timer.running.get() { + timer.running.set(true); spawn(async move { - let guard = TimerGuard(inner.clone()); + let guard = TimerGuard; loop { sleep(SEC).await; - { - let mut i = inner.borrow_mut(); - i.current += 1; + let stop = TIMER.with(|timer| { + timer.current.set(timer.current.get() + 1); // notify io dispatcher - while let Some(key) = i.notifications.keys().next() { + let current = timer.current.get(); + let mut inner = timer.storage.borrow_mut(); + while let Some(key) = inner.notifications.keys().next() { let key = *key; - if key <= i.current { - let mut items = i.notifications.remove(&key).unwrap(); + if key <= current { + let mut items = inner.notifications.remove(&key).unwrap(); items.drain().for_each(|st| st.notify_timeout()); - if i.cache.len() <= CAP { - i.cache.push_back(items); + if inner.cache.len() <= CAP { + inner.cache.push_back(items); } } else { break; @@ -126,10 +148,16 @@ pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle { } // new tick - if i.notifications.is_empty() { - i.running = false; - break; + if inner.notifications.is_empty() { + timer.running.set(false); + true + } else { + false } + }); + + if stop { + break; } } drop(guard); @@ -140,18 +168,13 @@ pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle { }) } -struct TimerGuard(Rc>); +struct TimerGuard; impl Drop for TimerGuard { fn drop(&mut self) { - let mut inner = self.0.borrow_mut(); - inner.running = false; - inner.notifications.clear(); + TIMER.with(|timer| { + timer.running.set(false); + timer.storage.borrow_mut().notifications.clear(); + }) } } - -pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) { - TIMER.with(|timer| { - timer.borrow_mut().unregister(hnd, io); - }) -} diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 16009a15..eaebc477 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -58,7 +58,7 @@ ntex-util = "0.3.4" ntex-bytes = "0.1.21" ntex-h2 = "0.4.4" ntex-rt = "0.4.11" -ntex-io = "0.3.13" +ntex-io = "0.3.14" ntex-tls = "0.3.2" ntex-tokio = { version = "0.3.1", optional = true } ntex-glommio = { version = "0.3.1", optional = true }