From 8ee296a399e969425f08f4ee0e4c229ce6c2a89b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 12 Dec 2023 15:35:21 +0600 Subject: [PATCH] Stop dispatcher timers on memory pool pause (#266) --- ntex-io/CHANGES.md | 4 ++++ ntex-io/Cargo.toml | 2 +- ntex-io/src/dispatcher.rs | 10 ++++------ ntex-io/src/io.rs | 12 ++++-------- ntex-io/src/ioref.rs | 34 ++++++++++++++++++---------------- ntex-io/src/timer.rs | 13 ++++++------- ntex/Cargo.toml | 2 +- 7 files changed, 38 insertions(+), 39 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index a803f941..5125cc26 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.15] - 2023-12-12 + +* Stop dispatcher timers on memory pool pause + ## [0.3.14] - 2023-12-10 * Fix KEEP-ALIVE timer handling diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 67d705d4..03c7b068 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.14" +version = "0.3.15" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 9f8201be..dc1e0167 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -307,6 +307,8 @@ where // handle memory pool pressure if slf.pool.poll_ready(cx).is_pending() { + slf.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT); + slf.shared.io.stop_timer(); slf.shared.io.pause(); return Poll::Pending; } @@ -482,18 +484,14 @@ where log::trace!("service is not ready, register dispatch task"); // remove all timers - self.flags.remove(Flags::READ_TIMEOUT); + self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT); self.shared.io.stop_timer(); match ready!(self.shared.io.poll_read_pause(cx)) { IoStatusUpdate::KeepAlive => { log::trace!("keep-alive error, stopping dispatcher during pause"); self.st = DispatcherState::Stop; - if self.flags.contains(Flags::READ_TIMEOUT) { - Poll::Ready(PollService::Item(DispatchItem::ReadTimeout)) - } else { - Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout)) - } + Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout)) } IoStatusUpdate::Stop => { log::trace!("dispatcher is instructed to stop during pause"); diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 034ffbd5..23345eb0 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -45,9 +45,6 @@ bitflags::bitflags! { const DSP_STOP = 0b0001_0000_0000_0000; /// timeout occured const DSP_TIMEOUT = 0b0010_0000_0000_0000; - - /// timer started - const TIMEOUT = 0b1000_0000_0000_0000; } } @@ -68,9 +65,9 @@ pub(crate) struct IoState { pub(super) buffer: Stack, pub(super) filter: Cell<&'static dyn Filter>, pub(super) handle: Cell>>, + pub(super) timeout: Cell, #[allow(clippy::box_collection)] pub(super) on_disconnect: Cell>>>, - pub(super) keepalive: Cell, } impl IoState { @@ -95,7 +92,6 @@ impl IoState { log::trace!("timeout, notify dispatcher"); let mut flags = self.flags.get(); - flags.remove(Flags::TIMEOUT); if !flags.contains(Flags::DSP_TIMEOUT) { flags.insert(Flags::DSP_TIMEOUT); self.flags.set(flags); @@ -170,7 +166,7 @@ impl fmt::Debug for IoState { .field("flags", &self.flags) .field("pool", &self.pool) .field("disconnect_timeout", &self.disconnect_timeout) - .field("keepalive", &self.keepalive) + .field("timeout", &self.timeout) .field("error", &err) .field("buffer", &self.buffer) .finish(); @@ -200,8 +196,8 @@ impl Io { buffer: Stack::new(), filter: Cell::new(NullFilter::get()), handle: Cell::new(None), + timeout: Cell::new(TimerHandle::default()), on_disconnect: Cell::new(None), - keepalive: Cell::new(TimerHandle::default()), }); let filter = Box::new(Base::new(IoRef(inner.clone()))); @@ -256,8 +252,8 @@ impl Io { buffer: Stack::new(), filter: Cell::new(NullFilter::get()), handle: Cell::new(None), + timeout: Cell::new(TimerHandle::default()), on_disconnect: Cell::new(None), - keepalive: Cell::new(TimerHandle::default()), }); let state = mem::replace(&mut self.0, IoRef(inner)); diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index ed033ad7..2881c080 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -214,7 +214,7 @@ impl IoRef { #[inline] /// current timer handle pub fn timer_handle(&self) -> timer::TimerHandle { - self.0.keepalive.get() + self.0.timeout.get() } #[doc(hidden)] @@ -222,7 +222,7 @@ impl IoRef { #[inline] /// current timer deadline pub fn timer_deadline(&self) -> time::Instant { - self.0.keepalive.get().instant() + self.0.timeout.get().instant() } #[inline] @@ -234,37 +234,39 @@ impl IoRef { #[inline] /// Start timer pub fn start_timer_secs(&self, timeout: Seconds) -> timer::TimerHandle { + let cur_hnd = self.0.timeout.get(); + if !timeout.is_zero() { - if self.flags().contains(Flags::TIMEOUT) { - let old_hnd = self.0.keepalive.get(); - let hnd = timer::update(old_hnd, timeout, self); - if old_hnd != hnd { - self.0.keepalive.set(hnd); + if cur_hnd.is_set() { + let hnd = timer::update(cur_hnd, timeout, self); + if hnd != cur_hnd { + log::debug!("update timer {:?}", timeout); + self.0.timeout.set(hnd); } hnd } else { log::debug!("start timer {:?}", timeout); - self.0.insert_flags(Flags::TIMEOUT); let hnd = timer::register(timeout, self); - self.0.keepalive.set(hnd); + self.0.timeout.set(hnd); hnd } } else { - if self.flags().contains(Flags::TIMEOUT) { - self.0.remove_flags(Flags::TIMEOUT); - timer::unregister(self.0.keepalive.get(), self); + if cur_hnd.is_set() { + timer::unregister(cur_hnd, self); + self.0.timeout.set(timer::TimerHandle::ZERO); } - Default::default() + timer::TimerHandle::ZERO } } #[inline] /// Stop timer pub fn stop_timer(&self) { - if self.flags().contains(Flags::TIMEOUT) { + let hnd = self.0.timeout.get(); + if hnd.is_set() { log::debug!("unregister timer"); - self.0.remove_flags(Flags::TIMEOUT); - timer::unregister(self.0.keepalive.get(), self) + self.0.timeout.set(timer::TimerHandle::ZERO); + timer::unregister(hnd, self) } } diff --git a/ntex-io/src/timer.rs b/ntex-io/src/timer.rs index 20f3fe4b..cf0b1580 100644 --- a/ntex-io/src/timer.rs +++ b/ntex-io/src/timer.rs @@ -26,6 +26,12 @@ thread_local! { pub struct TimerHandle(u32); impl TimerHandle { + pub const ZERO: TimerHandle = TimerHandle(0); + + pub fn is_set(&self) -> bool { + self.0 != 0 + } + pub fn remains(&self) -> Seconds { TIMER.with(|timer| { let cur = timer.current.get(); @@ -67,13 +73,6 @@ impl InnerMut { fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) { if let Some(states) = self.notifications.get_mut(&hnd.0) { states.remove(&io.0); - if states.is_empty() { - if let Some(items) = self.notifications.remove(&hnd.0) { - if self.cache.len() <= CAP { - self.cache.push_back(items); - } - } - } } } } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index eaebc477..c8c58dd7 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -58,7 +58,7 @@ ntex-util = "0.3.4" ntex-bytes = "0.1.21" ntex-h2 = "0.4.4" ntex-rt = "0.4.11" -ntex-io = "0.3.14" +ntex-io = "0.3.15" ntex-tls = "0.3.2" ntex-tokio = { version = "0.3.1", optional = true } ntex-glommio = { version = "0.3.1", optional = true }