Fix KEEP-ALIVE timer handling (#264)

This commit is contained in:
Nikolay Kim 2023-12-10 08:12:33 +06:00 committed by GitHub
parent c9993afa89
commit df613e6f2d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 172 additions and 85 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.3.14] - 2023-12-10
* Fix KEEP-ALIVE timer handling
## [0.3.13] - 2023-12-02
* Optimize KEEP-ALIVE timer

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.3.13"
version = "0.3.14"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -151,7 +151,7 @@ bitflags::bitflags! {
const READY_ERR = 0b00001;
const IO_ERR = 0b00010;
const KA_ENABLED = 0b00100;
const NO_KA_TIMEOUT = 0b01000;
const KA_TIMEOUT = 0b01000;
const READ_TIMEOUT = 0b10000;
}
}
@ -233,9 +233,9 @@ where
io.set_disconnect_timeout(cfg.disconnect_timeout());
let flags = if cfg.keepalive_timeout_secs().is_zero() {
Flags::NO_KA_TIMEOUT
Flags::empty()
} else {
Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT
Flags::KA_ENABLED
};
let pool = io.memory_pool().pool();
@ -526,21 +526,20 @@ where
// got parsed frame
if decoded.item.is_some() {
self.read_remains = 0;
self.flags.remove(Flags::READ_TIMEOUT);
self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
} else if self.flags.contains(Flags::READ_TIMEOUT) {
// received new data but not enough for parsing complete frame
self.read_remains = decoded.remains as u32;
} else if self.read_remains == 0 && decoded.remains == 0 {
// no new data, start keep-alive timer
if self
.flags
.contains(Flags::NO_KA_TIMEOUT | Flags::KA_ENABLED)
if self.flags.contains(Flags::KA_ENABLED)
&& !self.flags.contains(Flags::KA_TIMEOUT)
{
log::debug!(
"Start keep-alive timer {:?}",
self.cfg.keepalive_timeout_secs()
);
self.flags.remove(Flags::NO_KA_TIMEOUT);
self.flags.insert(Flags::KA_TIMEOUT);
self.shared
.io
.start_timer_secs(self.cfg.keepalive_timeout_secs());
@ -548,8 +547,7 @@ where
} else if let Some((timeout, max, _)) = self.cfg.frame_read_rate_params() {
// we got new data but not enough to parse single frame
// start read timer
self.flags
.insert(Flags::READ_TIMEOUT | Flags::NO_KA_TIMEOUT);
self.flags.insert(Flags::READ_TIMEOUT);
self.read_remains = decoded.remains as u32;
self.read_remains_prev = 0;
@ -682,9 +680,9 @@ mod tests {
state.set_disconnect_timeout(cfg.disconnect_timeout());
let flags = if cfg.keepalive_timeout_secs().is_zero() {
super::Flags::NO_KA_TIMEOUT
super::Flags::empty()
} else {
super::Flags::KA_ENABLED | super::Flags::NO_KA_TIMEOUT
super::Flags::KA_ENABLED
};
let inner = State(state.get_ref());
@ -750,9 +748,7 @@ mod tests {
client.close().await;
assert!(client.is_server_dropped());
assert!(
format!("{:?}", super::Flags::NO_KA_TIMEOUT.clone()).contains("NO_KA_TIMEOUT")
);
assert!(format!("{:?}", super::Flags::KA_TIMEOUT.clone()).contains("KA_TIMEOUT"));
}
#[ntex::test]
@ -999,8 +995,6 @@ mod tests {
#[ntex::test]
async fn test_keepalive() {
let _ = env_logger::try_init();
let (client, server) = IoTest::create();
client.remote_buffer_cap(1024);
client.write("GET /test HTTP/1\r\n\r\n");
@ -1099,6 +1093,64 @@ mod tests {
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
}
/// Update keep-alive timer after receiving frame
#[ntex::test]
async fn test_keepalive3() {
let (client, server) = IoTest::create();
client.remote_buffer_cap(1024);
let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();
let cfg = DispatcherConfig::default()
.set_keepalive_timeout(Seconds(2))
.set_frame_read_rate(Seconds(1), Seconds(2), 2)
.clone();
let (disp, _) = Dispatcher::debug_cfg(
server,
BCodec(1),
ntex_service::fn_service(move |msg: DispatchItem<BCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(bytes) => {
data.lock().unwrap().borrow_mut().push(0);
return Ok::<_, ()>(Some(bytes.freeze()));
}
DispatchItem::KeepAliveTimeout => {
data.lock().unwrap().borrow_mut().push(1);
}
_ => (),
}
Ok(None)
}
}),
cfg,
);
spawn(async move {
let _ = disp.await;
});
client.write("1");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"1"));
sleep(Millis(750)).await;
client.write("2");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"2"));
sleep(Millis(750)).await;
client.write("3");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"3"));
sleep(Millis(750)).await;
assert!(!client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 0, 0]);
}
#[ntex::test]
async fn test_read_timeout() {
let (client, server) = IoTest::create();

View file

@ -234,18 +234,26 @@ impl IoRef {
#[inline]
/// Start timer
pub fn start_timer_secs(&self, timeout: Seconds) -> timer::TimerHandle {
if self.flags().contains(Flags::TIMEOUT) {
timer::unregister(self.0.keepalive.get(), self);
}
if !timeout.is_zero() {
log::debug!("start timer {:?}", timeout);
self.0.insert_flags(Flags::TIMEOUT);
let hnd = timer::register(timeout, self);
self.0.keepalive.set(hnd);
hnd
if self.flags().contains(Flags::TIMEOUT) {
let old_hnd = self.0.keepalive.get();
let hnd = timer::update(old_hnd, timeout, self);
if old_hnd != hnd {
self.0.keepalive.set(hnd);
}
hnd
} else {
log::debug!("start timer {:?}", timeout);
self.0.insert_flags(Flags::TIMEOUT);
let hnd = timer::register(timeout, self);
self.0.keepalive.set(hnd);
hnd
}
} else {
self.0.remove_flags(Flags::TIMEOUT);
if self.flags().contains(Flags::TIMEOUT) {
self.0.remove_flags(Flags::TIMEOUT);
timer::unregister(self.0.keepalive.get(), self);
}
Default::default()
}
}

View file

@ -1,6 +1,6 @@
#![allow(clippy::mutable_key_type)]
use std::collections::{BTreeMap, VecDeque};
use std::{cell::RefCell, ops, rc::Rc, time::Duration, time::Instant};
use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant};
use ntex_util::time::{now, sleep, Seconds};
use ntex_util::{spawn, HashSet};
@ -11,14 +11,15 @@ const CAP: usize = 64;
const SEC: Duration = Duration::from_secs(1);
thread_local! {
static TIMER: Rc<RefCell<Inner>> = Rc::new(RefCell::new(
Inner {
running: false,
base: Instant::now(),
current: 0,
static TIMER: Inner = Inner {
running: Cell::new(false),
base: Cell::new(Instant::now()),
current: Cell::new(0),
storage: RefCell::new(InnerMut {
cache: VecDeque::with_capacity(CAP),
notifications: BTreeMap::default(),
}));
})
}
}
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
@ -27,7 +28,7 @@ pub struct TimerHandle(u32);
impl TimerHandle {
pub fn remains(&self) -> Seconds {
TIMER.with(|timer| {
let cur = timer.borrow().current;
let cur = timer.current.get();
if self.0 <= cur {
Seconds::ZERO
} else {
@ -37,7 +38,7 @@ impl TimerHandle {
}
pub fn instant(&self) -> Instant {
TIMER.with(|timer| timer.borrow().base + Duration::from_secs(self.0 as u64))
TIMER.with(|timer| timer.base.get() + Duration::from_secs(self.0 as u64))
}
}
@ -51,14 +52,18 @@ impl ops::Add<Seconds> for TimerHandle {
}
struct Inner {
running: bool,
base: Instant,
current: u32,
running: Cell<bool>,
base: Cell<Instant>,
current: Cell<u32>,
storage: RefCell<InnerMut>,
}
struct InnerMut {
cache: VecDeque<HashSet<Rc<IoState>>>,
notifications: BTreeMap<u32, HashSet<Rc<IoState>>>,
}
impl Inner {
impl InnerMut {
fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
if let Some(states) = self.notifications.get_mut(&hnd.0) {
states.remove(&io.0);
@ -73,52 +78,69 @@ impl Inner {
}
}
pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) {
TIMER.with(|timer| {
timer.storage.borrow_mut().unregister(hnd, io);
})
}
pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle {
TIMER.with(|timer| {
let new_hnd = timer.current.get() + timeout.0 as u32;
if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 {
hnd
} else {
timer.storage.borrow_mut().unregister(hnd, io);
register(timeout, io)
}
})
}
pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
TIMER.with(|timer| {
let mut inner = timer.borrow_mut();
// setup current delta
if !inner.running {
inner.current = (now() - inner.base).as_secs() as u32;
if !timer.running.get() {
timer
.current
.set((now() - timer.base.get()).as_secs() as u32);
}
let hnd = inner.current + timeout.0 as u32;
let hnd = {
let hnd = timer.current.get() + timeout.0 as u32;
let mut inner = timer.storage.borrow_mut();
// search existing key
let hnd = if let Some((hnd, _)) = inner.notifications.range(hnd..hnd + 1).next() {
*hnd
} else {
let items = inner.cache.pop_front().unwrap_or_default();
inner.notifications.insert(hnd, items);
hnd
// insert key
if let Some(item) = inner.notifications.range_mut(hnd..hnd + 1).next() {
item.1.insert(io.0.clone());
*item.0
} else {
let mut items = inner.cache.pop_front().unwrap_or_default();
items.insert(io.0.clone());
inner.notifications.insert(hnd, items);
hnd
}
};
inner
.notifications
.get_mut(&hnd)
.unwrap()
.insert(io.0.clone());
if !inner.running {
inner.running = true;
let inner = timer.clone();
if !timer.running.get() {
timer.running.set(true);
spawn(async move {
let guard = TimerGuard(inner.clone());
let guard = TimerGuard;
loop {
sleep(SEC).await;
{
let mut i = inner.borrow_mut();
i.current += 1;
let stop = TIMER.with(|timer| {
timer.current.set(timer.current.get() + 1);
// notify io dispatcher
while let Some(key) = i.notifications.keys().next() {
let current = timer.current.get();
let mut inner = timer.storage.borrow_mut();
while let Some(key) = inner.notifications.keys().next() {
let key = *key;
if key <= i.current {
let mut items = i.notifications.remove(&key).unwrap();
if key <= current {
let mut items = inner.notifications.remove(&key).unwrap();
items.drain().for_each(|st| st.notify_timeout());
if i.cache.len() <= CAP {
i.cache.push_back(items);
if inner.cache.len() <= CAP {
inner.cache.push_back(items);
}
} else {
break;
@ -126,10 +148,16 @@ pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
}
// new tick
if i.notifications.is_empty() {
i.running = false;
break;
if inner.notifications.is_empty() {
timer.running.set(false);
true
} else {
false
}
});
if stop {
break;
}
}
drop(guard);
@ -140,18 +168,13 @@ pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
})
}
struct TimerGuard(Rc<RefCell<Inner>>);
struct TimerGuard;
impl Drop for TimerGuard {
fn drop(&mut self) {
let mut inner = self.0.borrow_mut();
inner.running = false;
inner.notifications.clear();
TIMER.with(|timer| {
timer.running.set(false);
timer.storage.borrow_mut().notifications.clear();
})
}
}
pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) {
TIMER.with(|timer| {
timer.borrow_mut().unregister(hnd, io);
})
}

View file

@ -58,7 +58,7 @@ ntex-util = "0.3.4"
ntex-bytes = "0.1.21"
ntex-h2 = "0.4.4"
ntex-rt = "0.4.11"
ntex-io = "0.3.13"
ntex-io = "0.3.14"
ntex-tls = "0.3.2"
ntex-tokio = { version = "0.3.1", optional = true }
ntex-glommio = { version = "0.3.1", optional = true }