restore time util mod

This commit is contained in:
Nikolay Kim 2021-09-04 08:56:28 +06:00
parent 5a7e3ed589
commit 9511caf1db
3 changed files with 257 additions and 45 deletions

View file

@ -65,7 +65,7 @@ const LOWRES_RESOLUTION: time::Duration = time::Duration::from_millis(5);
/// Resolution is ~5ms /// Resolution is ~5ms
#[inline] #[inline]
pub fn now() -> time::Instant { pub fn now() -> time::Instant {
TIMER.with(|t| t.0.borrow_mut().now(&t.0)) TIMER.with(|t| t.borrow_mut().now(t))
} }
/// Returns the system time corresponding to “now”. /// Returns the system time corresponding to “now”.
@ -73,30 +73,34 @@ pub fn now() -> time::Instant {
/// Resolution is ~5ms /// Resolution is ~5ms
#[inline] #[inline]
pub fn system_time() -> time::SystemTime { pub fn system_time() -> time::SystemTime {
TIMER.with(|t| t.0.borrow_mut().system_time(&t.0)) TIMER.with(|t| t.borrow_mut().system_time(t))
} }
#[derive(Debug)] #[derive(Debug)]
pub struct TimerHandle(usize); pub struct TimerHandle(usize);
impl TimerHandle { impl TimerHandle {
/// Createt new timer and return handle
pub fn new(millis: u64) -> Self { pub fn new(millis: u64) -> Self {
Timer::add_timer(millis) TIMER.with(|t| Timer::add_timer(t, millis))
} }
/// Resets the `TimerHandle` instance to a new deadline. /// Resets the `TimerHandle` instance to a new deadline.
pub fn reset(&self, millis: u64) { pub fn reset(&self, millis: u64) {
Timer::update_timer(self.0, millis); TIMER.with(|t| Timer::update_timer(t, self.0, millis))
} }
pub fn is_elapsed(&self) -> bool { pub fn is_elapsed(&self) -> bool {
Timer::with_entry(self.0, |entry| { TIMER.with(|t| {
entry.flags.contains(TimerEntryFlags::ELAPSED) t.borrow_mut().timers[self.0]
.flags
.contains(TimerEntryFlags::ELAPSED)
}) })
} }
pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> { pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
Timer::with_entry(self.0, |entry| { TIMER.with(|t| {
let entry = &t.borrow_mut().timers[self.0];
if entry.flags.contains(TimerEntryFlags::ELAPSED) { if entry.flags.contains(TimerEntryFlags::ELAPSED) {
Poll::Ready(()) Poll::Ready(())
} else { } else {
@ -109,7 +113,7 @@ impl TimerHandle {
impl Drop for TimerHandle { impl Drop for TimerHandle {
fn drop(&mut self) { fn drop(&mut self) {
Timer::remove_timer(self.0); TIMER.with(|t| t.borrow_mut().remove_timer(self.0));
} }
} }
@ -123,13 +127,11 @@ bitflags::bitflags! {
} }
} }
struct Timer(Rc<RefCell<TimerInner>>);
thread_local! { thread_local! {
static TIMER: Timer = Timer::new(); static TIMER: Rc<RefCell<Timer>>= Rc::new(RefCell::new(Timer::new()));
} }
struct TimerInner { struct Timer {
timers: Slab<TimerEntry>, timers: Slab<TimerEntry>,
elapsed: u64, elapsed: u64,
elapsed_instant: time::Instant, elapsed_instant: time::Instant,
@ -146,33 +148,7 @@ struct TimerInner {
impl Timer { impl Timer {
fn new() -> Self { fn new() -> Self {
Timer(Rc::new(RefCell::new(TimerInner::new()))) Timer {
}
fn with_entry<F, R>(no: usize, f: F) -> R
where
F: Fn(&mut TimerEntry) -> R,
{
TIMER.with(|t| f(&mut t.0.borrow_mut().timers[no]))
}
// Add the timer into the hash bucket
fn add_timer(expires: u64) -> TimerHandle {
TIMER.with(|t| TimerInner::add_timer(&t.0, expires))
}
fn update_timer(handle: usize, expires: u64) {
TIMER.with(|t| TimerInner::update_timer(&t.0, handle, expires));
}
fn remove_timer(handle: usize) {
TIMER.with(|t| t.0.borrow_mut().remove_timer(handle));
}
}
impl TimerInner {
fn new() -> Self {
TimerInner {
buckets: Self::create_buckets(), buckets: Self::create_buckets(),
timers: Slab::default(), timers: Slab::default(),
elapsed: 0, elapsed: 0,
@ -197,7 +173,7 @@ impl TimerInner {
buckets buckets
} }
fn now(&mut self, inner: &Rc<RefCell<TimerInner>>) -> time::Instant { fn now(&mut self, inner: &Rc<RefCell<Timer>>) -> time::Instant {
let cur = self.lowres_time.get(); let cur = self.lowres_time.get();
if let Some(cur) = cur { if let Some(cur) = cur {
cur cur
@ -214,7 +190,7 @@ impl TimerInner {
} }
} }
fn system_time(&mut self, inner: &Rc<RefCell<TimerInner>>) -> time::SystemTime { fn system_time(&mut self, inner: &Rc<RefCell<Timer>>) -> time::SystemTime {
let cur = self.lowres_stime.get(); let cur = self.lowres_stime.get();
if let Some(cur) = cur { if let Some(cur) = cur {
cur cur
@ -555,12 +531,12 @@ impl TimerEntry {
} }
struct TimerDriver { struct TimerDriver {
inner: Rc<RefCell<TimerInner>>, inner: Rc<RefCell<Timer>>,
sleep: Pin<Box<Sleep>>, sleep: Pin<Box<Sleep>>,
} }
impl TimerDriver { impl TimerDriver {
fn start(cell: &Rc<RefCell<TimerInner>>) { fn start(cell: &Rc<RefCell<Timer>>) {
let mut inner = cell.borrow_mut(); let mut inner = cell.borrow_mut();
inner.flags.insert(Flags::TIMER_ACTIVE); inner.flags.insert(Flags::TIMER_ACTIVE);
@ -617,12 +593,12 @@ impl Future for TimerDriver {
} }
struct LowresTimerDriver { struct LowresTimerDriver {
inner: Rc<RefCell<TimerInner>>, inner: Rc<RefCell<Timer>>,
sleep: Pin<Box<Sleep>>, sleep: Pin<Box<Sleep>>,
} }
impl LowresTimerDriver { impl LowresTimerDriver {
fn start(slf: &mut TimerInner, cell: &Rc<RefCell<TimerInner>>) { fn start(slf: &mut Timer, cell: &Rc<RefCell<Timer>>) {
slf.flags.insert(Flags::LOWRES_DRIVER | Flags::LOWRES_TIMER); slf.flags.insert(Flags::LOWRES_DRIVER | Flags::LOWRES_TIMER);
crate::rt::spawn(LowresTimerDriver { crate::rt::spawn(LowresTimerDriver {

View file

@ -5,6 +5,7 @@ pub mod inflight;
pub mod keepalive; pub mod keepalive;
pub mod sink; pub mod sink;
pub mod stream; pub mod stream;
pub mod time;
pub mod timeout; pub mod timeout;
pub mod variant; pub mod variant;

235
ntex/src/util/time.rs Normal file
View file

@ -0,0 +1,235 @@
use std::task::{Context, Poll};
use std::time::{self, Duration, Instant};
use std::{cell::RefCell, convert::Infallible, rc::Rc};
use crate::rt::time::sleep;
use crate::service::{Service, ServiceFactory};
use crate::util::Ready;
#[derive(Clone, Debug)]
pub struct LowResTime(Rc<RefCell<Inner>>);
#[derive(Debug)]
struct Inner {
resolution: Duration,
current: Option<Instant>,
}
impl Inner {
fn new(resolution: Duration) -> Self {
Inner {
resolution,
current: None,
}
}
}
impl LowResTime {
pub fn with(resolution: Duration) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::new(resolution))))
}
pub fn timer(&self) -> LowResTimeService {
LowResTimeService(self.0.clone())
}
}
impl Default for LowResTime {
fn default() -> Self {
LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1)))))
}
}
impl ServiceFactory for LowResTime {
type Request = ();
type Response = Instant;
type Error = Infallible;
type InitError = Infallible;
type Config = ();
type Service = LowResTimeService;
type Future = Ready<Self::Service, Self::InitError>;
#[inline]
fn new_service(&self, _: ()) -> Self::Future {
Ready::Ok(self.timer())
}
}
#[derive(Clone, Debug)]
pub struct LowResTimeService(Rc<RefCell<Inner>>);
impl LowResTimeService {
pub fn with(resolution: Duration) -> LowResTimeService {
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> Instant {
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = Instant::now();
let inner = self.0.clone();
let interval = {
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
crate::rt::spawn(async move {
sleep(interval).await;
inner.borrow_mut().current.take();
});
now
}
}
}
impl Service for LowResTimeService {
type Request = ();
type Response = Instant;
type Error = Infallible;
type Future = Ready<Self::Response, Self::Error>;
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[inline]
fn call(&self, _: ()) -> Self::Future {
Ready::Ok(self.now())
}
}
#[derive(Clone, Debug)]
pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
#[derive(Debug)]
struct SystemTimeInner {
resolution: Duration,
current: Option<time::SystemTime>,
}
impl SystemTimeInner {
fn new(resolution: Duration) -> Self {
SystemTimeInner {
resolution,
current: None,
}
}
}
#[derive(Clone, Debug)]
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
impl SystemTimeService {
pub fn with(resolution: Duration) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> time::SystemTime {
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = time::SystemTime::now();
let inner = self.0.clone();
let interval = {
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
crate::rt::spawn(async move {
sleep(interval).await;
inner.borrow_mut().current.take();
});
now
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::util::lazy;
use std::time::{Duration, SystemTime};
#[crate::rt_test]
async fn low_res_timee() {
let f = LowResTime::default();
let srv = f.new_service(()).await.unwrap();
assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready());
srv.call(()).await.unwrap();
}
/// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
///
/// Expected Behavior: Two back-to-back calls of `SystemTimeService::now()` return the same value.
#[crate::rt_test]
async fn system_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = SystemTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
}
/// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
///
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
#[crate::rt_test]
async fn lowres_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
}
/// State Under Test: `SystemTimeService::now()` updates returned value every resolution period.
///
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[crate::rt_test]
async fn system_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = SystemTimeService::with(resolution);
let first_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
sleep(wait_time).await;
let second_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
assert!(second_time - first_time >= wait_time);
}
/// State Under Test: `LowResTimeService::now()` updates returned value every resolution period.
///
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[crate::rt_test]
async fn lowres_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = LowResTimeService::with(resolution);
let first_time = time_service.now();
sleep(wait_time).await;
let second_time = time_service.now();
assert!(second_time - first_time >= wait_time);
}
}