Unify keep-alive timers

This commit is contained in:
Nikolay Kim 2021-12-30 18:15:51 +06:00
parent 7e6735a044
commit 2cc1c0e3d1
12 changed files with 113 additions and 173 deletions

View file

@ -1,6 +1,6 @@
<div align="center">
<p><h1>ntex</h1> </p>
<p><strong>Framework for composable network services. This is personal project. This project uses *unsafe*!</strong> </p>
<p><strong>Framework for composable network services. This project uses *unsafe*!</strong> </p>
<p>
[![build status](https://github.com/ntex-rs/ntex/workflows/CI%20%28Linux%29/badge.svg?branch=master&event=push)](https://github.com/ntex-rs/ntex/actions?query=workflow%3A"CI+(Linux)")

View file

@ -2,6 +2,8 @@
## [0.1.0] - 2021-12-30
* Unify keep-alive timers
* Add Io::poll_status_update() method to use instead of register_dispatcher()
* Reset DSP_STOP and DSP_KEEPALIVE flags

View file

@ -4,10 +4,10 @@ use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time}
use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder};
use ntex_service::{IntoService, Service};
use ntex_util::time::{now, Seconds};
use ntex_util::time::Seconds;
use ntex_util::{future::Either, ready};
use crate::{rt::spawn, DispatchItem, IoBoxed, IoRef, IoStatusUpdate, RecvError, Timer};
use crate::{rt::spawn, DispatchItem, IoBoxed, IoRef, IoStatusUpdate, RecvError};
type Response<U> = <U as Encoder>::Item;
@ -30,8 +30,8 @@ pin_project_lite::pin_project! {
bitflags::bitflags! {
struct Flags: u8 {
const READY_ERR = 0b0001;
const IO_ERR = 0b0010;
const READY_ERR = 0b0001;
const IO_ERR = 0b0010;
}
}
@ -42,9 +42,7 @@ where
{
io: IoBoxed,
st: Cell<DispatcherState>,
timer: Timer,
ka_timeout: Cell<Seconds>,
ka_updated: Cell<time::Instant>,
ka_timeout: Cell<time::Duration>,
error: Cell<Option<S::Error>>,
flags: Cell<Flags>,
shared: Rc<DispatcherShared<S, U>>,
@ -95,29 +93,21 @@ where
U: Decoder + Encoder + 'static,
{
/// Construct new `Dispatcher` instance.
pub fn new<Io, F: IntoService<S, DispatchItem<U>>>(
io: Io,
codec: U,
service: F,
timer: Timer,
) -> Self
pub fn new<Io, F: IntoService<S, DispatchItem<U>>>(io: Io, codec: U, service: F) -> Self
where
IoBoxed: From<Io>,
{
let io = IoBoxed::from(io);
let updated = now();
let ka_timeout = Cell::new(Seconds(30));
let ka_timeout = Cell::new(Seconds(30).into());
// register keepalive timer
let expire = updated + time::Duration::from(ka_timeout.get());
timer.register(expire, expire, &io);
io.start_keepalive_timer(ka_timeout.get());
Dispatcher {
service: service.into_service(),
fut: None,
inner: DispatcherInner {
pool: io.memory_pool().pool(),
ka_updated: Cell::new(updated),
error: Cell::new(None),
flags: Cell::new(Flags::empty()),
st: Cell::new(DispatcherState::Processing),
@ -127,7 +117,6 @@ where
inflight: Cell::new(0),
}),
io,
timer,
ka_timeout,
},
}
@ -139,15 +128,11 @@ where
///
/// By default keep-alive timeout is set to 30 seconds.
pub fn keepalive_timeout(self, timeout: Seconds) -> Self {
let ka_timeout = time::Duration::from(timeout);
// register keepalive timer
let prev = self.inner.ka_updated.get() + time::Duration::from(self.inner.ka());
if timeout.is_zero() {
self.inner.timer.unregister(prev, &self.inner.io);
} else {
let expire = self.inner.ka_updated.get() + time::Duration::from(timeout);
self.inner.timer.register(expire, prev, &self.inner.io);
}
self.inner.ka_timeout.set(timeout);
self.inner.io.start_keepalive_timer(ka_timeout);
self.inner.ka_timeout.set(ka_timeout);
self
}
@ -436,14 +421,6 @@ where
}
}
fn ka(&self) -> Seconds {
self.ka_timeout.get()
}
fn ka_enabled(&self) -> bool {
self.ka_timeout.get().non_zero()
}
fn insert_flags(&self, f: Flags) {
let mut flags = self.flags.get();
flags.insert(f);
@ -452,26 +429,13 @@ where
/// update keep-alive timer
fn update_keepalive(&self) {
if self.ka_enabled() {
let updated = now();
if updated != self.ka_updated.get() {
let ka = time::Duration::from(self.ka());
self.timer
.register(updated + ka, self.ka_updated.get() + ka, &self.io);
self.ka_updated.set(updated);
}
}
self.io.start_keepalive_timer(self.ka_timeout.get());
}
/// unregister keep-alive timer
fn unregister_keepalive(&self) {
if self.ka_enabled() {
self.ka_timeout.set(Seconds::ZERO);
self.timer.unregister(
self.ka_updated.get() + time::Duration::from(self.ka()),
&self.io,
);
}
self.io.remove_keepalive_timer();
self.ka_timeout.set(time::Duration::ZERO);
}
}
@ -524,32 +488,26 @@ mod tests {
service: F,
) -> (Self, State) {
let state = Io::new(io);
let timer = Timer::default();
let ka_timeout = Cell::new(Seconds(1));
let ka_updated = now();
let ka_timeout = Cell::new(Seconds(1).into());
let shared = Rc::new(DispatcherShared {
codec: codec,
error: Cell::new(None),
inflight: Cell::new(0),
});
let inner = State(state.get_ref());
let expire = ka_updated + Duration::from_millis(500);
timer.register(expire, expire, &state);
state.start_keepalive_timer(Duration::from_millis(500));
(
Dispatcher {
service: service.into_service(),
fut: None,
inner: DispatcherInner {
ka_updated: Cell::new(ka_updated),
error: Cell::new(None),
flags: Cell::new(super::Flags::empty()),
st: Cell::new(DispatcherState::Processing),
pool: state.memory_pool().pool(),
io: state.into(),
shared,
timer,
ka_timeout,
},
},

View file

@ -1,6 +1,6 @@
use std::cell::{Cell, RefCell};
use std::task::{Context, Poll};
use std::{fmt, future::Future, hash, io, mem, ops::Deref, pin::Pin, ptr, rc::Rc};
use std::{fmt, future::Future, hash, io, mem, ops::Deref, pin::Pin, ptr, rc::Rc, time};
use ntex_bytes::{BytesMut, PoolId, PoolRef};
use ntex_codec::{Decoder, Encoder};
@ -9,7 +9,7 @@ use ntex_util::{future::poll_fn, future::Either, task::LocalWaker, time::Millis}
use super::filter::{Base, NullFilter};
use super::seal::Sealed;
use super::tasks::{ReadContext, WriteContext};
use super::{Filter, FilterFactory, Handle, IoStatusUpdate, IoStream, RecvError};
use super::{timer, Filter, FilterFactory, Handle, IoStatusUpdate, IoStream, RecvError};
bitflags::bitflags! {
pub struct Flags: u16 {
@ -65,6 +65,7 @@ pub(crate) struct IoState {
pub(super) filter: Cell<&'static dyn Filter>,
pub(super) handle: Cell<Option<Box<dyn Handle>>>,
pub(super) on_disconnect: RefCell<Vec<Option<LocalWaker>>>,
keepalive: Cell<Option<time::Instant>>,
}
impl IoState {
@ -253,6 +254,7 @@ impl Io {
filter: Cell::new(NullFilter::get()),
handle: Cell::new(None),
on_disconnect: RefCell::new(Vec::new()),
keepalive: Cell::new(None),
});
let filter = Box::new(Base::new(IoRef(inner.clone())));
@ -302,19 +304,34 @@ impl<F> Io<F> {
self.0 .0.flags.get()
}
#[inline]
#[allow(clippy::should_implement_trait)]
/// Get `IoRef` reference
pub fn as_ref(&self) -> &IoRef {
&self.0
}
#[inline]
/// Get instance of `IoRef`
pub fn get_ref(&self) -> IoRef {
self.0.clone()
}
#[inline]
/// Start keep-alive timer
pub fn start_keepalive_timer(&self, timeout: time::Duration) {
if let Some(expire) = self.0 .0.keepalive.take() {
timer::unregister(expire, &self.0)
}
if timeout != time::Duration::ZERO {
self.0
.0
.keepalive
.set(Some(timer::register(timeout, &self.0)));
}
}
#[inline]
/// Remove keep-alive timer
pub fn remove_keepalive_timer(&self) {
if let Some(expire) = self.0 .0.keepalive.take() {
timer::unregister(expire, &self.0)
}
}
/// Get current io error
fn error(&self) -> Option<io::Error> {
self.0 .0.error.take()
@ -643,8 +660,16 @@ impl<F> Io<F> {
}
}
impl<F> AsRef<IoRef> for Io<F> {
fn as_ref(&self) -> &IoRef {
&self.0
}
}
impl<F> Drop for Io<F> {
fn drop(&mut self) {
self.remove_keepalive_timer();
if let FilterItem::Ptr(p) = self.1 {
if p.is_null() {
return;

View file

@ -15,7 +15,7 @@ mod io;
mod ioref;
mod seal;
mod tasks;
mod time;
mod timer;
pub mod utils;
#[cfg(feature = "async-std")]
@ -34,7 +34,6 @@ pub use self::filter::Base;
pub use self::io::{Io, IoRef};
pub use self::seal::{IoBoxed, Sealed};
pub use self::tasks::{ReadContext, WriteContext};
pub use self::time::Timer;
pub use self::utils::filter;
/// Status for read task

View file

@ -1,28 +1,24 @@
use std::{
cell::RefCell, collections::BTreeMap, collections::HashSet, rc::Rc, time::Instant,
};
use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Duration, time::Instant};
use ntex_util::time::{now, sleep, Millis};
use ntex_util::HashSet;
use crate::{io::IoState, rt::spawn, IoRef};
pub struct Timer(Rc<RefCell<Inner>>);
thread_local! {
static TIMER: Rc<RefCell<Inner>> = Rc::new(RefCell::new(
Inner {
running: false,
notifications: BTreeMap::default(),
}));
}
struct Inner {
running: bool,
resolution: Millis,
notifications: BTreeMap<Instant, HashSet<Rc<IoState>, fxhash::FxBuildHasher>>,
notifications: BTreeMap<Instant, HashSet<Rc<IoState>>>,
}
impl Inner {
fn new(resolution: Millis) -> Self {
Inner {
resolution,
running: false,
notifications: BTreeMap::default(),
}
}
fn unregister(&mut self, expire: Instant, io: &IoRef) {
if let Some(states) = self.notifications.get_mut(&expire) {
states.remove(&io.0);
@ -33,28 +29,12 @@ impl Inner {
}
}
impl Clone for Timer {
fn clone(&self) -> Self {
Timer(self.0.clone())
}
}
pub(crate) fn register(timeout: Duration, io: &IoRef) -> Instant {
let expire = now() + timeout;
impl Default for Timer {
fn default() -> Self {
Timer::new(Millis::ONE_SEC)
}
}
TIMER.with(|timer| {
let mut inner = timer.borrow_mut();
impl Timer {
/// Create new timer with resolution in milliseconds
pub fn new(resolution: Millis) -> Timer {
Timer(Rc::new(RefCell::new(Inner::new(resolution))))
}
pub fn register(&self, expire: Instant, previous: Instant, io: &IoRef) {
let mut inner = self.0.borrow_mut();
inner.unregister(previous, io);
inner
.notifications
.entry(expire)
@ -63,12 +43,11 @@ impl Timer {
if !inner.running {
inner.running = true;
let interval = inner.resolution;
let inner = self.0.clone();
let inner = timer.clone();
spawn(async move {
loop {
sleep(interval).await;
sleep(Millis::ONE_SEC).await;
{
let mut i = inner.borrow_mut();
let now_time = now();
@ -94,9 +73,13 @@ impl Timer {
}
});
}
}
});
pub fn unregister(&self, expire: Instant, io: &IoRef) {
self.0.borrow_mut().unregister(expire, io);
}
expire
}
pub(crate) fn unregister(expire: Instant, io: &IoRef) {
TIMER.with(|timer| {
let _ = timer.borrow_mut().unregister(expire, io);
})
}

View file

@ -1,10 +1,8 @@
use std::{cell::Cell, ptr::copy_nonoverlapping, rc::Rc, time};
use std::{cell::Cell, ptr::copy_nonoverlapping, rc::Rc, time, time::Duration};
use crate::http::{Request, Response};
use crate::io::{IoRef, Timer};
use crate::service::boxed::BoxService;
use crate::time::{sleep, Millis, Seconds, Sleep};
use crate::util::BytesMut;
use crate::time::{now, sleep, Millis, Seconds, Sleep};
use crate::{io::IoRef, service::boxed::BoxService, util::BytesMut};
#[derive(Debug, PartialEq, Clone, Copy)]
/// Server keep-alive setting
@ -49,7 +47,6 @@ pub(super) struct Inner {
pub(super) ka_enabled: bool,
pub(super) timer: DateService,
pub(super) ssl_handshake_timeout: Millis,
pub(super) timer_h1: Timer,
}
impl Clone for ServiceConfig {
@ -91,7 +88,6 @@ impl ServiceConfig {
client_disconnect,
ssl_handshake_timeout,
timer: DateService::new(),
timer_h1: Timer::default(),
}))
}
}
@ -102,12 +98,11 @@ pub(super) struct DispatcherConfig<S, X, U> {
pub(super) service: S,
pub(super) expect: X,
pub(super) upgrade: Option<U>,
pub(super) keep_alive: Millis,
pub(super) client_timeout: Millis,
pub(super) keep_alive: Duration,
pub(super) client_timeout: Duration,
pub(super) client_disconnect: Seconds,
pub(super) ka_enabled: bool,
pub(super) timer: DateService,
pub(super) timer_h1: Timer,
pub(super) on_request: Option<OnRequest>,
}
@ -124,12 +119,11 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
expect,
upgrade,
on_request,
keep_alive: cfg.0.keep_alive,
client_timeout: cfg.0.client_timeout,
keep_alive: Duration::from(cfg.0.keep_alive),
client_timeout: Duration::from(cfg.0.client_timeout),
client_disconnect: cfg.0.client_disconnect,
ka_enabled: cfg.0.ka_enabled,
timer: cfg.0.timer.clone(),
timer_h1: cfg.0.timer_h1.clone(),
}
}
@ -140,13 +134,20 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
/// Return keep-alive timer Sleep is configured.
pub(super) fn keep_alive_timer(&self) -> Option<Sleep> {
self.keep_alive.map(sleep)
if self.keep_alive != Duration::ZERO {
Some(sleep(self.keep_alive))
} else {
None
}
}
/// Keep-alive expire time
pub(super) fn keep_alive_expire(&self) -> Option<time::Instant> {
self.keep_alive
.map(|t| self.timer.now() + time::Duration::from(t))
if self.keep_alive != Duration::ZERO {
Some(now() + self.keep_alive)
} else {
None
}
}
}

View file

@ -1,10 +1,9 @@
//! Framed transport dispatcher
use std::task::{Context, Poll};
use std::{error::Error, fmt, future::Future, io, marker, pin::Pin, rc::Rc, time};
use std::{error::Error, fmt, future::Future, io, marker, pin::Pin, rc::Rc};
use crate::io::{Filter, Io, IoRef, RecvError};
use crate::service::Service;
use crate::{time::now, util::ready, util::Bytes};
use crate::{service::Service, util::ready, util::Bytes};
use crate::http;
use crate::http::body::{BodySize, MessageBody, ResponseBody};
@ -70,7 +69,6 @@ struct DispatcherInner<F, S, B, X, U> {
codec: Codec,
state: IoRef,
config: Rc<DispatcherConfig<S, X, U>>,
expire: time::Instant,
error: Option<DispatchError>,
payload: Option<(PayloadDecoder, PayloadSender)>,
_t: marker::PhantomData<(S, B)>,
@ -90,29 +88,24 @@ where
{
/// Construct new `Dispatcher` instance with outgoing messages stream.
pub(in crate::http) fn new(io: Io<F>, config: Rc<DispatcherConfig<S, X, U>>) -> Self {
let mut expire = now();
let state = io.get_ref();
let codec = Codec::new(config.timer.clone(), config.keep_alive_enabled());
io.set_disconnect_timeout(config.client_disconnect.into());
// slow-request timer
if config.client_timeout.non_zero() {
expire += time::Duration::from(config.client_timeout);
config.timer_h1.register(expire, expire, &state);
}
io.start_keepalive_timer(config.client_timeout);
Dispatcher {
call: CallState::None,
st: State::ReadRequest,
inner: DispatcherInner {
codec,
state,
config,
io: Some(io),
flags: Flags::empty(),
error: None,
payload: None,
codec,
state,
config,
expire,
_t: marker::PhantomData,
},
}
@ -275,10 +268,7 @@ where
// unregister slow-request timer
if !this.inner.flags.contains(Flags::STARTED) {
this.inner.flags.insert(Flags::STARTED);
this.inner
.config
.timer_h1
.unregister(this.inner.expire, &this.inner.state);
this.inner.io().remove_keepalive_timer();
}
if upgrade {
@ -436,21 +426,15 @@ where
fn unregister_keepalive(&mut self) {
if self.flags.contains(Flags::KEEPALIVE) {
self.config.timer_h1.unregister(self.expire, &self.state);
self.io().remove_keepalive_timer();
self.flags.remove(Flags::KEEPALIVE);
}
}
fn reset_keepalive(&mut self) {
// re-register keep-alive
if self.flags.contains(Flags::KEEPALIVE) && self.config.keep_alive.non_zero() {
let expire = now() + time::Duration::from(self.config.keep_alive);
if expire != self.expire {
self.config
.timer_h1
.register(expire, self.expire, &self.state);
self.expire = expire;
}
if self.flags.contains(Flags::KEEPALIVE) {
self.io().start_keepalive_timer(self.config.keep_alive);
}
}

View file

@ -49,10 +49,10 @@ where
) -> Self {
// keep-alive timer
let (ka_expire, ka_timer) = if let Some(delay) = timeout {
let expire = config.timer.now() + std::time::Duration::from(config.keep_alive);
let expire = config.timer.now() + config.keep_alive;
(expire, Some(delay))
} else if let Some(delay) = config.keep_alive_timer() {
let expire = config.timer.now() + std::time::Duration::from(config.keep_alive);
let expire = config.timer.now() + config.keep_alive;
(expire, Some(delay))
} else {
(now(), None)

View file

@ -743,7 +743,7 @@ impl WsConnection<Sealed> {
},
);
Dispatcher::new(self.io, self.codec, service, Default::default()).await
Dispatcher::new(self.io, self.codec, service).await
}
}

View file

@ -5,7 +5,7 @@ use std::{cell::Cell, future::Future, io, pin::Pin};
use ntex::codec::BytesCodec;
use ntex::http::test::server as test_server;
use ntex::http::{body, h1, test, HttpService, Request, Response, StatusCode};
use ntex::io::{DispatchItem, Dispatcher, Io, Timer};
use ntex::io::{DispatchItem, Dispatcher, Io};
use ntex::service::{fn_factory, Service};
use ntex::ws::{handshake, handshake_response};
use ntex::{util::ByteString, util::Bytes, util::Ready, ws};
@ -49,7 +49,7 @@ impl Service<(Request, Io, h1::Codec)> for WsService {
io.encode((res, body::BodySize::None).into(), &codec)
.unwrap();
Dispatcher::new(io.seal(), ws::Codec::new(), service, Timer::default())
Dispatcher::new(io.seal(), ws::Codec::new(), service)
.await
.map_err(|_| panic!())
};

View file

@ -38,13 +38,7 @@ async fn test_simple() {
.unwrap();
// start websocket service
Dispatcher::new(
io.seal(),
ws::Codec::default(),
ws_service,
Default::default(),
)
.await
Dispatcher::new(io.seal(), ws::Codec::default(), ws_service).await
}
})
.finish(|_| Ready::Ok::<_, io::Error>(Response::NotFound()))
@ -94,13 +88,7 @@ async fn test_transport() {
.unwrap();
// start websocket service
Dispatcher::new(
io.seal(),
ws::Codec::default(),
ws_service,
Default::default(),
)
.await
Dispatcher::new(io.seal(), ws::Codec::default(), ws_service).await
}
})
.finish(|_| Ready::Ok::<_, io::Error>(Response::NotFound()))