diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 5e85b876..43db609c 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,13 @@ # Changes +## [0.4.0-b.8] - 2021-09-01 + +* Add `ntex::time::now()` helper, returns low res time. + +* Add `ntex::time::system_time()` helper, returns low res system time. + +* Removed `LowResTime` and `SystemTime` services + ## [0.4.0-b.7] - 2021-08-31 * Remove From for Millis impl diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index e2110c45..82cde462 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.4.0-b.7" +version = "0.4.0-b.8" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" diff --git a/ntex/src/framed/time.rs b/ntex/src/framed/time.rs index 648cfd31..e1105546 100644 --- a/ntex/src/framed/time.rs +++ b/ntex/src/framed/time.rs @@ -1,7 +1,7 @@ use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Instant}; use crate::framed::State; -use crate::time::{sleep, Millis}; +use crate::time::{now, sleep, Millis}; use crate::util::HashSet; pub struct Timer(Rc>); @@ -75,11 +75,11 @@ impl Timer { if let Some(cur) = cur { cur } else { - let now = Instant::now(); + let now_val = now(); let inner = self.0.clone(); let interval = { let mut b = inner.borrow_mut(); - b.current = Some(now); + b.current = Some(now_val); b.resolution }; @@ -87,7 +87,7 @@ impl Timer { sleep(interval).await; let empty = { let mut i = inner.borrow_mut(); - let now = i.current.take().unwrap_or_else(Instant::now); + let now = i.current.take().unwrap_or_else(now); // notify io dispatcher while let Some(key) = i.notifications.keys().next() { @@ -109,7 +109,7 @@ impl Timer { } }); - now + now_val } } } diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 48d349a4..1526d409 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -11,7 +11,7 @@ use crate::http::Protocol; use crate::rt::spawn; use crate::service::Service; use crate::task::LocalWaker; -use crate::time::{sleep, Millis, Sleep}; +use crate::time::{now, sleep, Millis, Sleep}; use crate::util::{poll_fn, Bytes, HashMap}; use super::connection::{ConnectionType, IoConnection}; @@ -239,7 +239,7 @@ where // check if open connection is available // cleanup stale connections at the same time if let Some(ref mut connections) = self.available.get_mut(key) { - let now = Instant::now(); + let now = now(); while let Some(conn) = connections.pop_back() { // check if it still usable if (now - conn.used) > self.conn_keep_alive @@ -279,7 +279,7 @@ where .push_back(AvailableConnection { io, created, - used: Instant::now(), + used: now(), }); self.check_availibility(); } @@ -480,7 +480,7 @@ where // h2 connection is ready let conn = IoConnection::new( ConnectionType::H2(snd), - Instant::now(), + now(), Some(this.guard.take().unwrap().consume()), ); if let Err(Ok(conn)) = this.tx.take().unwrap().send(Ok(conn)) { @@ -517,7 +517,7 @@ where if proto == Protocol::Http1 { let conn = IoConnection::new( ConnectionType::H1(io), - Instant::now(), + now(), Some(this.guard.take().unwrap().consume()), ); if let Err(Ok(conn)) = this.tx.take().unwrap().send(Ok(conn)) { diff --git a/ntex/src/time/mod.rs b/ntex/src/time/mod.rs index 33571e56..fc40b87b 100644 --- a/ntex/src/time/mod.rs +++ b/ntex/src/time/mod.rs @@ -6,12 +6,12 @@ mod types; mod wheel; pub use self::types::{Millis, Seconds}; -pub use self::wheel::TimerHandle; +pub use self::wheel::{now, system_time, TimerHandle}; /// Waits until `duration` has elapsed. /// /// No work is performed while awaiting on the sleep future to complete. `Sleep` -/// operates at 16.5 millisecond granularity and should not be used for tasks that +/// operates at 16 millisecond granularity and should not be used for tasks that /// require high-resolution timers. #[inline] pub fn sleep>(dur: T) -> Sleep { @@ -189,3 +189,60 @@ impl crate::Stream for Interval { self.poll_tick(cx).map(|_| Some(())) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::time; + + /// State Under Test: Two calls of `now()` return the same value if they are done within resolution interval. + /// + /// Expected Behavior: Two back-to-back calls of `now()` return the same value. + #[crate::rt_test] + async fn lowres_time_does_not_immediately_change() { + assert_eq!(now(), now()); + } + + /// State Under Test: `now()` updates returned value every ~1ms period. + /// + /// Expected Behavior: Two calls of `now()` made in subsequent resolution interval return different values + /// and second value is greater than the first one at least by a 1ms interval. + #[crate::rt_test] + async fn lowres_time_updates_after_resolution_interval() { + let first_time = now(); + + sleep(Millis(25)).await; + + let second_time = now(); + assert!(second_time - first_time >= time::Duration::from_millis(25)); + } + + /// State Under Test: Two calls of `system_time()` return the same value if they are done within 1ms interval. + /// + /// Expected Behavior: Two back-to-back calls of `now()` return the same value. + #[crate::rt_test] + async fn system_time_service_time_does_not_immediately_change() { + assert_eq!(system_time(), system_time()); + } + + /// State Under Test: `system_time()` updates returned value every 1ms period. + /// + /// Expected Behavior: Two calls of `system_time()` made in subsequent resolution interval return different values + /// and second value is greater than the first one at least by a resolution interval. + #[crate::rt_test] + async fn system_time_service_time_updates_after_resolution_interval() { + let wait_time = 300; + + let first_time = system_time() + .duration_since(time::SystemTime::UNIX_EPOCH) + .unwrap(); + + sleep(Millis(wait_time)).await; + + let second_time = system_time() + .duration_since(time::SystemTime::UNIX_EPOCH) + .unwrap(); + + assert!(second_time - first_time >= time::Duration::from_millis(wait_time)); + } +} diff --git a/ntex/src/time/wheel.rs b/ntex/src/time/wheel.rs index 47ea015b..5d02e582 100644 --- a/ntex/src/time/wheel.rs +++ b/ntex/src/time/wheel.rs @@ -2,9 +2,8 @@ //! //! Inspired by linux kernel timers system #![allow(arithmetic_overflow)] -use std::{ - cell::RefCell, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll, time, -}; +use std::cell::{Cell, RefCell}; +use std::{future::Future, mem, pin::Pin, rc::Rc, task, task::Poll, time}; use slab::Slab; @@ -57,6 +56,23 @@ const fn lvl_offs(n: u64) -> u64 { const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH); const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1)); const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize); +const ONE_MS: time::Duration = time::Duration::from_millis(1); + +/// Returns an instant corresponding to “now”. +/// +/// Resolution is ~1ms +#[inline] +pub fn now() -> time::Instant { + TIMER.with(|t| t.0.borrow().now()) +} + +/// Returns the system time corresponding to “now”. +/// +/// Resolution is ~1ms +#[inline] +pub fn system_time() -> time::SystemTime { + TIMER.with(|t| t.0.borrow().system_time()) +} #[derive(Debug)] pub struct TimerHandle(usize); @@ -100,6 +116,7 @@ bitflags::bitflags! { const DRIVER_STARTED = 0b0000_0001; const NEEDS_RECALC = 0b0000_0010; const TIMER_ACTIVE = 0b0000_0100; + const LOWRES_TIMER = 0b0000_1000; } } @@ -119,11 +136,16 @@ struct TimerInner { buckets: Vec, /// Bit field tracking which bucket currently contain entries. occupied: [u64; WHEEL_SIZE], + lowres_time: Cell>, + lowres_stime: Cell>, + lowres_driver: LocalWaker, } impl Timer { fn new() -> Self { - Timer(Rc::new(RefCell::new(TimerInner::new()))) + let inner = Rc::new(RefCell::new(TimerInner::new())); + LowresTimerDriver::start(&inner); + Timer(inner) } fn with_entry(no: usize, f: F) -> R @@ -158,6 +180,9 @@ impl TimerInner { flags: Flags::empty(), driver: LocalWaker::new(), occupied: [0; WHEEL_SIZE], + lowres_time: Cell::new(None), + lowres_stime: Cell::new(None), + lowres_driver: LocalWaker::new(), } } @@ -171,6 +196,30 @@ impl TimerInner { buckets } + fn now(&self) -> time::Instant { + let cur = self.lowres_time.get(); + if let Some(cur) = cur { + cur + } else { + let now = time::Instant::now(); + self.lowres_driver.wake(); + self.lowres_time.set(Some(now)); + now + } + } + + fn system_time(&self) -> time::SystemTime { + let cur = self.lowres_stime.get(); + if let Some(cur) = cur { + cur + } else { + let now = time::SystemTime::now(); + self.lowres_driver.wake(); + self.lowres_stime.set(Some(now)); + now + } + } + // Add the timer into the hash bucket fn add_timer(inner: &Rc>, millis: u64) -> TimerHandle { let mut slf = inner.borrow_mut(); @@ -188,8 +237,7 @@ impl TimerInner { } let delta = to_units( - (time::Instant::now() + time::Duration::from_millis(millis) - - slf.elapsed_instant) + (slf.now() + time::Duration::from_millis(millis) - slf.elapsed_instant) .as_millis() as u64, ); @@ -237,8 +285,7 @@ impl TimerInner { } let delta = to_units( - (time::Instant::now() + time::Duration::from_millis(millis) - - slf.elapsed_instant) + (slf.now() + time::Duration::from_millis(millis) - slf.elapsed_instant) .as_millis() as u64, ); @@ -343,9 +390,9 @@ impl TimerInner { } // Get instant of the next expiry - fn next_expiry(&mut self) -> time::Instant { + fn next_expiry(&self) -> time::Instant { let millis = to_millis(self.next_expiry - self.elapsed); - time::Instant::now() + time::Duration::from_millis(millis) + self.now() + time::Duration::from_millis(millis) } fn execute_expired_timers(&mut self, instant: time::Instant) { @@ -550,6 +597,57 @@ impl Future for TimerDriver { } } +struct LowresTimerDriver { + inner: Rc>, + sleep: Pin>, +} + +impl LowresTimerDriver { + fn start(cell: &Rc>) { + let mut inner = cell.borrow_mut(); + inner.flags.insert(Flags::LOWRES_TIMER); + + crate::rt::spawn(LowresTimerDriver { + inner: cell.clone(), + sleep: Box::pin(sleep_until(time::Instant::now() + ONE_MS)), + }); + } +} + +impl Drop for LowresTimerDriver { + fn drop(&mut self) { + let mut inner = self.inner.borrow_mut(); + inner.flags = Flags::empty(); + inner.lowres_time.set(None); + inner.lowres_stime.set(None); + } +} + +impl Future for LowresTimerDriver { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut inner = self.inner.borrow_mut(); + inner.lowres_driver.register(cx.waker()); + + if inner.flags.contains(Flags::LOWRES_TIMER) { + drop(inner); + if Pin::as_mut(&mut self.sleep).poll(cx).is_ready() { + let mut inner = self.inner.borrow_mut(); + inner.lowres_time.set(None); + inner.lowres_stime.set(None); + inner.flags.remove(Flags::LOWRES_TIMER); + } + task::Poll::Pending + } else { + inner.flags.insert(Flags::LOWRES_TIMER); + drop(inner); + Pin::as_mut(&mut self.sleep).reset(time::Instant::now() + ONE_MS); + self.poll(cx) + } + } +} + #[cfg(test)] mod tests { use std::time::{Duration, Instant}; diff --git a/ntex/src/util/keepalive.rs b/ntex/src/util/keepalive.rs index 0e76f563..d6b20da5 100644 --- a/ntex/src/util/keepalive.rs +++ b/ntex/src/util/keepalive.rs @@ -1,18 +1,15 @@ use std::task::{Context, Poll}; use std::{cell::Cell, convert::Infallible, marker, time::Duration, time::Instant}; -use crate::time::{sleep, Millis, Sleep}; +use crate::time::{now, sleep, Millis, Sleep}; use crate::{util::Ready, Service, ServiceFactory}; -use super::time::{LowResTime, LowResTimeService}; - /// KeepAlive service factory /// /// Controls min time between requests. pub struct KeepAlive { f: F, ka: Millis, - time: LowResTime, _t: marker::PhantomData<(R, E)>, } @@ -24,10 +21,9 @@ where /// /// ka - keep-alive timeout /// err - error factory function - pub fn new(ka: Millis, time: LowResTime, err: F) -> Self { + pub fn new(ka: Millis, err: F) -> Self { KeepAlive { ka, - time, f: err, _t: marker::PhantomData, } @@ -42,7 +38,6 @@ where KeepAlive { f: self.f.clone(), ka: self.ka, - time: self.time.clone(), _t: marker::PhantomData, } } @@ -61,18 +56,13 @@ where type Future = Ready; fn new_service(&self, _: ()) -> Self::Future { - Ready::Ok(KeepAliveService::new( - self.ka, - self.time.timer(), - self.f.clone(), - )) + Ready::Ok(KeepAliveService::new(self.ka, self.f.clone())) } } pub struct KeepAliveService { f: F, dur: Millis, - time: LowResTimeService, sleep: Sleep, expire: Cell, _t: marker::PhantomData<(R, E)>, @@ -82,13 +72,12 @@ impl KeepAliveService where F: Fn() -> E, { - pub fn new(dur: Millis, time: LowResTimeService, f: F) -> Self { - let expire = Cell::new(time.now() + Duration::from(dur)); + pub fn new(dur: Millis, f: F) -> Self { + let expire = Cell::new(now()); KeepAliveService { f, dur, - time, expire, sleep: sleep(dur), _t: marker::PhantomData, @@ -108,11 +97,12 @@ where fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { match self.sleep.poll_elapsed(cx) { Poll::Ready(_) => { - let now = self.time.now(); - if self.expire.get() <= now { + let now = now(); + let expire = self.expire.get() + Duration::from(self.dur); + if expire <= now { Poll::Ready(Err((self.f)())) } else { - let expire = self.expire.get() - Instant::now(); + let expire = expire - now; self.sleep.reset(Millis(expire.as_millis() as u64)); let _ = self.sleep.poll_elapsed(cx); Poll::Ready(Ok(())) @@ -123,7 +113,7 @@ where } fn call(&self, req: R) -> Self::Future { - self.expire.set(self.time.now() + Duration::from(self.dur)); + self.expire.set(now()); Ready::Ok(req) } } @@ -139,8 +129,7 @@ mod tests { #[crate::rt_test] async fn test_ka() { - let factory = - KeepAlive::new(Millis(100), LowResTime::new(Millis(10)), || TestErr); + let factory = KeepAlive::new(Millis(100), || TestErr); let _ = factory.clone(); let service = factory.new_service(()).await.unwrap(); diff --git a/ntex/src/util/mod.rs b/ntex/src/util/mod.rs index 884c5d69..b531a19d 100644 --- a/ntex/src/util/mod.rs +++ b/ntex/src/util/mod.rs @@ -5,7 +5,6 @@ pub mod inflight; pub mod keepalive; pub mod sink; pub mod stream; -pub mod time; pub mod timeout; pub mod variant; diff --git a/ntex/src/util/time.rs b/ntex/src/util/time.rs deleted file mode 100644 index a878232a..00000000 --- a/ntex/src/util/time.rs +++ /dev/null @@ -1,239 +0,0 @@ -use std::task::{Context, Poll}; -use std::time::{self, Instant}; -use std::{cell::RefCell, convert::Infallible, rc::Rc}; - -use crate::service::{Service, ServiceFactory}; -use crate::time::{sleep, Millis}; -use crate::util::Ready; - -#[derive(Clone, Debug)] -pub struct LowResTime(Rc>); - -#[derive(Debug)] -struct Inner { - resolution: Millis, - current: Option, -} - -impl Inner { - fn new(resolution: Millis) -> Self { - Inner { - resolution, - current: None, - } - } -} - -impl LowResTime { - /// Create new timer service - pub fn new>(resolution: T) -> LowResTime { - LowResTime(Rc::new(RefCell::new(Inner::new(resolution.into())))) - } - - pub fn timer(&self) -> LowResTimeService { - LowResTimeService(self.0.clone()) - } -} - -impl Default for LowResTime { - fn default() -> Self { - LowResTime(Rc::new(RefCell::new(Inner::new(Millis(1000))))) - } -} - -impl ServiceFactory for LowResTime { - type Request = (); - type Response = Instant; - type Error = Infallible; - type InitError = Infallible; - type Config = (); - type Service = LowResTimeService; - type Future = Ready; - - #[inline] - fn new_service(&self, _: ()) -> Self::Future { - Ready::Ok(self.timer()) - } -} - -#[derive(Clone, Debug)] -pub struct LowResTimeService(Rc>); - -impl LowResTimeService { - pub fn new>(resolution: T) -> LowResTimeService { - LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution.into())))) - } - - /// Get current time. This function has to be called from - /// future's poll method, otherwise it panics. - pub fn now(&self) -> Instant { - let cur = self.0.borrow().current; - if let Some(cur) = cur { - cur - } else { - let now = Instant::now(); - let inner = self.0.clone(); - let interval = { - let mut b = inner.borrow_mut(); - b.current = Some(now); - b.resolution - }; - - crate::rt::spawn(async move { - sleep(interval).await; - inner.borrow_mut().current.take(); - }); - now - } - } -} - -impl Service for LowResTimeService { - type Request = (); - type Response = Instant; - type Error = Infallible; - type Future = Ready; - - #[inline] - fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - #[inline] - fn call(&self, _: ()) -> Self::Future { - Ready::Ok(self.now()) - } -} - -#[derive(Clone, Debug)] -pub struct SystemTime(Rc>); - -#[derive(Debug)] -struct SystemTimeInner { - resolution: Millis, - current: Option, -} - -impl SystemTimeInner { - fn new(resolution: Millis) -> Self { - SystemTimeInner { - resolution, - current: None, - } - } -} - -#[derive(Clone, Debug)] -pub struct SystemTimeService(Rc>); - -impl SystemTimeService { - /// Create new system time service - pub fn new>(resolution: T) -> SystemTimeService { - SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new( - resolution.into(), - )))) - } - - /// Get current time. This function has to be called from - /// future's poll method, otherwise it panics. - pub fn now(&self) -> time::SystemTime { - let cur = self.0.borrow().current; - if let Some(cur) = cur { - cur - } else { - let now = time::SystemTime::now(); - let inner = self.0.clone(); - let interval = { - let mut b = inner.borrow_mut(); - b.current = Some(now); - b.resolution - }; - - crate::rt::spawn(async move { - sleep(interval).await; - inner.borrow_mut().current.take(); - }); - now - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{time::sleep, util::lazy}; - use std::time::{Duration, SystemTime}; - - #[crate::rt_test] - async fn low_res_timee() { - let f = LowResTime::default(); - let srv = f.new_service(()).await.unwrap(); - assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready()); - srv.call(()).await.unwrap(); - } - - /// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`. - /// - /// Expected Behavior: Two back-to-back calls of `SystemTimeService::now()` return the same value. - #[crate::rt_test] - async fn system_time_service_time_does_not_immediately_change() { - let resolution = Duration::from_millis(50); - - let time_service = SystemTimeService::new(resolution); - assert_eq!(time_service.now(), time_service.now()); - } - - /// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`. - /// - /// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value. - #[crate::rt_test] - async fn lowres_time_service_time_does_not_immediately_change() { - let resolution = Duration::from_millis(50); - let time_service = LowResTimeService::new(resolution); - assert_eq!(time_service.now(), time_service.now()); - } - - /// State Under Test: `SystemTimeService::now()` updates returned value every resolution period. - /// - /// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values - /// and second value is greater than the first one at least by a resolution interval. - #[crate::rt_test] - async fn system_time_service_time_updates_after_resolution_interval() { - let resolution = Duration::from_millis(100); - let wait_time = 300; - - let time_service = SystemTimeService::new(resolution); - - let first_time = time_service - .now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - sleep(Millis(wait_time)).await; - - let second_time = time_service - .now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - assert!(second_time - first_time >= Duration::from_millis(wait_time)); - } - - /// State Under Test: `LowResTimeService::now()` updates returned value every resolution period. - /// - /// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values - /// and second value is greater than the first one at least by a resolution interval. - #[crate::rt_test] - async fn lowres_time_service_time_updates_after_resolution_interval() { - let resolution = Duration::from_millis(100); - let wait_time = 300; - let time_service = LowResTimeService::new(resolution); - - let first_time = time_service.now(); - - sleep(Millis(wait_time)).await; - - let second_time = time_service.now(); - assert!(second_time - first_time >= Duration::from_millis(wait_time)); - } -}