do not use lowres time for timer driver

This commit is contained in:
Nikolay Kim 2021-09-04 10:04:58 +06:00
parent db8a56c62f
commit d93555231c
2 changed files with 65 additions and 68 deletions

View file

@ -3,7 +3,8 @@
//! Inspired by linux kernel timers system
#![allow(arithmetic_overflow)]
use std::cell::{Cell, RefCell};
use std::{future::Future, mem, pin::Pin, rc::Rc, task, task::Poll, time};
use std::time::{Duration, Instant, SystemTime};
use std::{future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
use slab::Slab;
@ -58,13 +59,13 @@ const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1));
const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize);
// Low res time resolution
const LOWRES_RESOLUTION: time::Duration = time::Duration::from_millis(5);
const LOWRES_RESOLUTION: Duration = Duration::from_millis(5);
/// Returns an instant corresponding to “now”.
///
/// Resolution is ~5ms
#[inline]
pub fn now() -> time::Instant {
pub fn now() -> Instant {
TIMER.with(|t| t.borrow_mut().now(t))
}
@ -72,7 +73,7 @@ pub fn now() -> time::Instant {
///
/// Resolution is ~5ms
#[inline]
pub fn system_time() -> time::SystemTime {
pub fn system_time() -> SystemTime {
TIMER.with(|t| t.borrow_mut().system_time(t))
}
@ -134,15 +135,15 @@ thread_local! {
struct Timer {
timers: Slab<TimerEntry>,
elapsed: u64,
elapsed_instant: time::Instant,
elapsed_instant: Instant,
next_expiry: u64,
flags: Flags,
driver: LocalWaker,
buckets: Vec<Bucket>,
/// Bit field tracking which bucket currently contain entries.
occupied: [u64; WHEEL_SIZE],
lowres_time: Cell<Option<time::Instant>>,
lowres_stime: Cell<Option<time::SystemTime>>,
lowres_time: Cell<Option<Instant>>,
lowres_stime: Cell<Option<SystemTime>>,
lowres_driver: LocalWaker,
}
@ -152,7 +153,7 @@ impl Timer {
buckets: Self::create_buckets(),
timers: Slab::default(),
elapsed: 0,
elapsed_instant: time::Instant::now(),
elapsed_instant: Instant::now(),
next_expiry: u64::MAX,
flags: Flags::empty(),
driver: LocalWaker::new(),
@ -173,12 +174,12 @@ impl Timer {
buckets
}
fn now(&mut self, inner: &Rc<RefCell<Timer>>) -> time::Instant {
fn now(&mut self, inner: &Rc<RefCell<Timer>>) -> Instant {
let cur = self.lowres_time.get();
if let Some(cur) = cur {
cur
} else {
let now = time::Instant::now();
let now = Instant::now();
self.lowres_time.set(Some(now));
if self.flags.contains(Flags::LOWRES_DRIVER) {
@ -190,12 +191,12 @@ impl Timer {
}
}
fn system_time(&mut self, inner: &Rc<RefCell<Timer>>) -> time::SystemTime {
fn system_time(&mut self, inner: &Rc<RefCell<Timer>>) -> SystemTime {
let cur = self.lowres_stime.get();
if let Some(cur) = cur {
cur
} else {
let now = time::SystemTime::now();
let now = SystemTime::now();
self.lowres_stime.set(Some(now));
if self.flags.contains(Flags::LOWRES_DRIVER) {
@ -223,7 +224,7 @@ impl Timer {
return TimerHandle(no);
}
let expire = slf.now(inner) + time::Duration::from_millis(millis);
let expire = slf.now(inner) + Duration::from_millis(millis);
let delta = if expire > slf.elapsed_instant {
to_units((expire - slf.elapsed_instant).as_millis() as u64)
} else {
@ -273,7 +274,7 @@ impl Timer {
return;
}
let expire = slf.now(inner) + time::Duration::from_millis(millis);
let expire = slf.now(inner) + Duration::from_millis(millis);
let delta = if expire > slf.elapsed_instant {
to_units((expire - slf.elapsed_instant).as_millis() as u64)
} else {
@ -381,9 +382,8 @@ impl Timer {
}
// Get instant of the next expiry
fn next_expiry(&mut self, inner: &Rc<RefCell<Self>>) -> time::Instant {
let millis = to_millis(self.next_expiry - self.elapsed);
self.now(inner) + time::Duration::from_millis(millis)
fn next_expiry_ms(&mut self) -> u64 {
to_millis(self.next_expiry - self.elapsed)
}
fn execute_expired_timers(&mut self) {
@ -472,7 +472,7 @@ impl Timer {
self.occupied = [0; WHEEL_SIZE];
self.next_expiry = u64::MAX;
self.elapsed = 0;
self.elapsed_instant = time::Instant::now();
self.elapsed_instant = Instant::now();
self.lowres_time.set(None);
self.lowres_stime.set(None);
}
@ -539,9 +539,10 @@ impl TimerDriver {
let mut inner = cell.borrow_mut();
inner.flags.insert(Flags::TIMER_ACTIVE);
let deadline = Instant::now() + Duration::from_millis(inner.next_expiry_ms());
crate::rt::spawn(TimerDriver {
inner: cell.clone(),
sleep: Box::pin(sleep_until(inner.next_expiry(cell))),
sleep: Box::pin(sleep_until(deadline)),
});
}
}
@ -562,9 +563,10 @@ impl Future for TimerDriver {
if inner.flags.contains(Flags::NEEDS_RECALC) {
inner.flags.remove(Flags::NEEDS_RECALC);
inner.flags.insert(Flags::TIMER_ACTIVE);
let exp = inner.next_expiry(&self.inner);
let deadline =
Instant::now() + Duration::from_millis(inner.next_expiry_ms());
drop(inner);
Pin::as_mut(&mut self.sleep).reset(exp);
Pin::as_mut(&mut self.sleep).reset(deadline);
return self.poll(cx);
} else if inner.flags.contains(Flags::TIMER_ACTIVE) {
drop(inner);
@ -575,23 +577,16 @@ impl Future for TimerDriver {
inner.elapsed_instant = now;
inner.execute_expired_timers();
loop {
if let Some(next_expiry) = inner.next_pending_bucket() {
if inner.next_expiry == next_expiry {
inner.elapsed += 1;
continue;
}
inner.next_expiry = next_expiry;
inner.flags.insert(Flags::TIMER_ACTIVE);
let exp = inner.next_expiry(&self.inner);
drop(inner);
Pin::as_mut(&mut self.sleep).reset(exp);
return self.poll(cx);
} else {
inner.next_expiry = u64::MAX;
inner.flags.remove(Flags::TIMER_ACTIVE);
}
break;
if let Some(next_expiry) = inner.next_pending_bucket() {
inner.next_expiry = next_expiry;
inner.flags.insert(Flags::TIMER_ACTIVE);
let deadline = now + Duration::from_millis(inner.next_expiry_ms());
drop(inner);
Pin::as_mut(&mut self.sleep).reset(deadline);
return self.poll(cx);
} else {
inner.next_expiry = u64::MAX;
inner.flags.remove(Flags::TIMER_ACTIVE);
}
}
}
@ -610,7 +605,7 @@ impl LowresTimerDriver {
crate::rt::spawn(LowresTimerDriver {
inner: cell.clone(),
sleep: Box::pin(sleep_until(time::Instant::now() + LOWRES_RESOLUTION)),
sleep: Box::pin(sleep_until(Instant::now() + LOWRES_RESOLUTION)),
});
}
}
@ -640,7 +635,7 @@ impl Future for LowresTimerDriver {
} else {
inner.flags.insert(Flags::LOWRES_TIMER);
drop(inner);
Pin::as_mut(&mut self.sleep).reset(time::Instant::now() + LOWRES_RESOLUTION);
Pin::as_mut(&mut self.sleep).reset(Instant::now() + LOWRES_RESOLUTION);
self.poll(cx)
}
}
@ -648,8 +643,6 @@ impl Future for LowresTimerDriver {
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use super::*;
use crate::time::*;

View file

@ -1,9 +1,9 @@
use std::task::{Context, Poll};
use std::time::{self, Duration, Instant};
use std::time::{self, Instant};
use std::{cell::RefCell, convert::Infallible, rc::Rc};
use crate::rt::time_driver::sleep_until;
use crate::service::{Service, ServiceFactory};
use crate::time::{sleep, Millis};
use crate::util::Ready;
#[derive(Clone, Debug)]
@ -11,12 +11,12 @@ pub struct LowResTime(Rc<RefCell<Inner>>);
#[derive(Debug)]
struct Inner {
resolution: Duration,
resolution: Millis,
current: Option<Instant>,
}
impl Inner {
fn new(resolution: Duration) -> Self {
fn new(resolution: Millis) -> Self {
Inner {
resolution,
current: None,
@ -25,8 +25,9 @@ impl Inner {
}
impl LowResTime {
pub fn with(resolution: Duration) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::new(resolution))))
/// Create new timer service
pub fn new<T: Into<Millis>>(resolution: T) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::new(resolution.into()))))
}
pub fn timer(&self) -> LowResTimeService {
@ -36,7 +37,7 @@ impl LowResTime {
impl Default for LowResTime {
fn default() -> Self {
LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1)))))
LowResTime(Rc::new(RefCell::new(Inner::new(Millis(1000)))))
}
}
@ -59,8 +60,8 @@ impl ServiceFactory for LowResTime {
pub struct LowResTimeService(Rc<RefCell<Inner>>);
impl LowResTimeService {
pub fn with(resolution: Duration) -> LowResTimeService {
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution))))
pub fn new<T: Into<Millis>>(resolution: T) -> LowResTimeService {
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution.into()))))
}
/// Get current time. This function has to be called from
@ -79,7 +80,7 @@ impl LowResTimeService {
};
crate::rt::spawn(async move {
sleep_until(Instant::now() + interval).await;
sleep(interval).await;
inner.borrow_mut().current.take();
});
now
@ -109,12 +110,12 @@ pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
#[derive(Debug)]
struct SystemTimeInner {
resolution: Duration,
resolution: Millis,
current: Option<time::SystemTime>,
}
impl SystemTimeInner {
fn new(resolution: Duration) -> Self {
fn new(resolution: Millis) -> Self {
SystemTimeInner {
resolution,
current: None,
@ -126,8 +127,11 @@ impl SystemTimeInner {
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
impl SystemTimeService {
pub fn with(resolution: Duration) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution))))
/// Create new system time service
pub fn new<T: Into<Millis>>(resolution: T) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(
resolution.into(),
))))
}
/// Get current time. This function has to be called from
@ -146,7 +150,7 @@ impl SystemTimeService {
};
crate::rt::spawn(async move {
sleep_until(Instant::now() + interval).await;
sleep(interval).await;
inner.borrow_mut().current.take();
});
now
@ -157,7 +161,7 @@ impl SystemTimeService {
#[cfg(test)]
mod tests {
use super::*;
use crate::util::lazy;
use crate::{time::sleep, util::lazy};
use std::time::{Duration, SystemTime};
#[crate::rt_test]
@ -175,7 +179,7 @@ mod tests {
async fn system_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = SystemTimeService::with(resolution);
let time_service = SystemTimeService::new(resolution);
assert_eq!(time_service.now(), time_service.now());
}
@ -185,7 +189,7 @@ mod tests {
#[crate::rt_test]
async fn lowres_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::with(resolution);
let time_service = LowResTimeService::new(resolution);
assert_eq!(time_service.now(), time_service.now());
}
@ -196,23 +200,23 @@ mod tests {
#[crate::rt_test]
async fn system_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let wait_time = 300;
let time_service = SystemTimeService::with(resolution);
let time_service = SystemTimeService::new(resolution);
let first_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
sleep_until(Instant::now() + wait_time).await;
sleep(Millis(wait_time)).await;
let second_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
assert!(second_time - first_time >= wait_time);
assert!(second_time - first_time >= Duration::from_millis(wait_time));
}
/// State Under Test: `LowResTimeService::now()` updates returned value every resolution period.
@ -222,14 +226,14 @@ mod tests {
#[crate::rt_test]
async fn lowres_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = LowResTimeService::with(resolution);
let wait_time = 300;
let time_service = LowResTimeService::new(resolution);
let first_time = time_service.now();
sleep_until(Instant::now() + wait_time).await;
sleep(Millis(wait_time)).await;
let second_time = time_service.now();
assert!(second_time - first_time >= wait_time);
assert!(second_time - first_time >= Duration::from_millis(wait_time));
}
}