mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 21:37:58 +03:00
parent
adb74edf01
commit
a35535da32
5 changed files with 347 additions and 270 deletions
|
@ -293,6 +293,7 @@ where
|
|||
///
|
||||
/// This type allows you to wait on a sequence of instants with a certain
|
||||
/// duration between each instant.
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
#[derive(Debug)]
|
||||
pub struct Interval {
|
||||
hnd: TimerHandle,
|
||||
|
@ -350,6 +351,8 @@ mod tests {
|
|||
/// Expected Behavior: Two back-to-back calls of `now()` return the same value.
|
||||
#[ntex_macros::rt_test2]
|
||||
async fn lowres_time_does_not_immediately_change() {
|
||||
let _ = sleep(Seconds(1));
|
||||
|
||||
assert_eq!(now(), now())
|
||||
}
|
||||
|
||||
|
@ -359,6 +362,8 @@ mod tests {
|
|||
/// and second value is greater than the first one at least by a 1ms interval.
|
||||
#[ntex_macros::rt_test2]
|
||||
async fn lowres_time_updates_after_resolution_interval() {
|
||||
let _ = sleep(Seconds(1));
|
||||
|
||||
let first_time = now();
|
||||
|
||||
sleep(Millis(25)).await;
|
||||
|
@ -372,6 +377,8 @@ mod tests {
|
|||
/// Expected Behavior: Two back-to-back calls of `now()` return the same value.
|
||||
#[ntex_macros::rt_test2]
|
||||
async fn system_time_service_time_does_not_immediately_change() {
|
||||
let _ = sleep(Seconds(1));
|
||||
|
||||
assert_eq!(system_time(), system_time());
|
||||
assert_eq!(system_time(), query_system_time());
|
||||
}
|
||||
|
@ -382,6 +389,8 @@ mod tests {
|
|||
/// and second value is greater than the first one at least by a resolution interval.
|
||||
#[ntex_macros::rt_test2]
|
||||
async fn system_time_service_time_updates_after_resolution_interval() {
|
||||
let _ = sleep(Seconds(1));
|
||||
|
||||
let wait_time = 300;
|
||||
|
||||
let first_time = system_time()
|
||||
|
@ -399,6 +408,8 @@ mod tests {
|
|||
|
||||
#[ntex_macros::rt_test2]
|
||||
async fn test_sleep_0() {
|
||||
let _ = sleep(Seconds(1));
|
||||
|
||||
let first_time = now();
|
||||
sleep(Millis(0)).await;
|
||||
let second_time = now();
|
||||
|
@ -429,6 +440,8 @@ mod tests {
|
|||
|
||||
#[ntex_macros::rt_test2]
|
||||
async fn test_deadline() {
|
||||
let _ = sleep(Seconds(1));
|
||||
|
||||
let first_time = now();
|
||||
let dl = deadline(Millis(1));
|
||||
dl.await;
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
//!
|
||||
//! Inspired by linux kernel timers system
|
||||
#![allow(arithmetic_overflow)]
|
||||
use std::cell::RefCell;
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
|
||||
|
||||
|
@ -72,7 +72,7 @@ const fn as_millis(dur: Duration) -> u64 {
|
|||
/// Resolution is 5ms
|
||||
#[inline]
|
||||
pub fn now() -> Instant {
|
||||
TIMER.with(|t| t.borrow_mut().now(t))
|
||||
TIMER.with(Timer::now)
|
||||
}
|
||||
|
||||
/// Returns the system time corresponding to “now”.
|
||||
|
@ -80,7 +80,7 @@ pub fn now() -> Instant {
|
|||
/// Resolution is 5ms
|
||||
#[inline]
|
||||
pub fn system_time() -> SystemTime {
|
||||
TIMER.with(|t| t.borrow_mut().system_time(t))
|
||||
TIMER.with(Timer::system_time)
|
||||
}
|
||||
|
||||
/// Returns the system time corresponding to “now”.
|
||||
|
@ -88,8 +88,9 @@ pub fn system_time() -> SystemTime {
|
|||
/// If low resolution system time is not set, use system time.
|
||||
/// This method does not start timer driver.
|
||||
#[inline]
|
||||
#[doc(hidden)]
|
||||
pub fn query_system_time() -> SystemTime {
|
||||
TIMER.with(|t| t.borrow().query_system_time())
|
||||
TIMER.with(Timer::system_time)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -98,21 +99,21 @@ pub struct TimerHandle(usize);
|
|||
impl TimerHandle {
|
||||
/// Createt new timer and return handle
|
||||
pub fn new(millis: u64) -> Self {
|
||||
TIMER.with(|t| Timer::add_timer(t, millis))
|
||||
TIMER.with(|t| t.add_timer(millis))
|
||||
}
|
||||
|
||||
/// Resets the `TimerHandle` instance to a new deadline.
|
||||
pub fn reset(&self, millis: u64) {
|
||||
TIMER.with(|t| Timer::update_timer(t, self.0, millis))
|
||||
TIMER.with(|t| t.update_timer(self.0, millis))
|
||||
}
|
||||
|
||||
pub fn is_elapsed(&self) -> bool {
|
||||
TIMER.with(|t| t.borrow_mut().timers[self.0].bucket.is_none())
|
||||
TIMER.with(|t| t.0.inner.borrow().timers[self.0].bucket.is_none())
|
||||
}
|
||||
|
||||
pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
|
||||
TIMER.with(|t| {
|
||||
let entry = &t.borrow().timers[self.0];
|
||||
let entry = &t.0.inner.borrow().timers[self.0];
|
||||
if entry.bucket.is_none() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
|
@ -125,7 +126,7 @@ impl TimerHandle {
|
|||
|
||||
impl Drop for TimerHandle {
|
||||
fn drop(&mut self) {
|
||||
TIMER.with(|t| t.borrow_mut().remove_timer(self.0));
|
||||
TIMER.with(|t| t.remove_timer(self.0));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -135,47 +136,56 @@ bitflags::bitflags! {
|
|||
const DRIVER_RECALC = 0b0000_0010;
|
||||
const LOWRES_TIMER = 0b0000_1000;
|
||||
const LOWRES_DRIVER = 0b0001_0000;
|
||||
const RUNNING = 0b0010_0000;
|
||||
}
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static TIMER: Rc<RefCell<Timer>>= Rc::new(RefCell::new(Timer::new()));
|
||||
static TIMER: Timer = Timer::new();
|
||||
}
|
||||
|
||||
struct Timer {
|
||||
timers: Slab<TimerEntry>,
|
||||
elapsed: u64,
|
||||
elapsed_time: Option<Instant>,
|
||||
next_expiry: u64,
|
||||
flags: Flags,
|
||||
struct Timer(Rc<TimerInner>);
|
||||
|
||||
struct TimerInner {
|
||||
elapsed: Cell<u64>,
|
||||
elapsed_time: Cell<Option<Instant>>,
|
||||
next_expiry: Cell<u64>,
|
||||
flags: Cell<Flags>,
|
||||
driver: LocalWaker,
|
||||
lowres_time: Cell<Option<Instant>>,
|
||||
lowres_stime: Cell<Option<SystemTime>>,
|
||||
lowres_driver: LocalWaker,
|
||||
inner: RefCell<TimerMod>,
|
||||
}
|
||||
|
||||
struct TimerMod {
|
||||
timers: Slab<TimerEntry>,
|
||||
driver_sleep: Delay,
|
||||
buckets: Vec<Bucket>,
|
||||
/// Bit field tracking which bucket currently contain entries.
|
||||
occupied: [u64; WHEEL_SIZE],
|
||||
lowres_time: Option<Instant>,
|
||||
lowres_stime: Option<SystemTime>,
|
||||
lowres_driver: LocalWaker,
|
||||
lowres_driver_sleep: Delay,
|
||||
}
|
||||
|
||||
impl Timer {
|
||||
fn new() -> Self {
|
||||
Timer {
|
||||
buckets: Self::create_buckets(),
|
||||
timers: Slab::default(),
|
||||
elapsed: 0,
|
||||
elapsed_time: None,
|
||||
next_expiry: u64::MAX,
|
||||
flags: Flags::empty(),
|
||||
Timer(Rc::new(TimerInner {
|
||||
elapsed: Cell::new(0),
|
||||
elapsed_time: Cell::new(None),
|
||||
next_expiry: Cell::new(u64::MAX),
|
||||
flags: Cell::new(Flags::empty()),
|
||||
driver: LocalWaker::new(),
|
||||
driver_sleep: Delay::new(Duration::ZERO),
|
||||
occupied: [0; WHEEL_SIZE],
|
||||
lowres_time: None,
|
||||
lowres_stime: None,
|
||||
lowres_time: Cell::new(None),
|
||||
lowres_stime: Cell::new(None),
|
||||
lowres_driver: LocalWaker::new(),
|
||||
lowres_driver_sleep: Delay::new(Duration::ZERO),
|
||||
}
|
||||
inner: RefCell::new(TimerMod {
|
||||
buckets: Self::create_buckets(),
|
||||
timers: Slab::default(),
|
||||
driver_sleep: Delay::new(Duration::ZERO),
|
||||
occupied: [0; WHEEL_SIZE],
|
||||
lowres_driver_sleep: Delay::new(Duration::ZERO),
|
||||
}),
|
||||
}))
|
||||
}
|
||||
|
||||
fn create_buckets() -> Vec<Bucket> {
|
||||
|
@ -188,61 +198,52 @@ impl Timer {
|
|||
buckets
|
||||
}
|
||||
|
||||
fn now(&mut self, inner: &Rc<RefCell<Timer>>) -> Instant {
|
||||
if let Some(cur) = self.lowres_time {
|
||||
fn now(&self) -> Instant {
|
||||
if let Some(cur) = self.0.lowres_time.get() {
|
||||
cur
|
||||
} else {
|
||||
let now = Instant::now();
|
||||
self.lowres_time = Some(now);
|
||||
|
||||
if self.flags.contains(Flags::LOWRES_DRIVER) {
|
||||
self.lowres_driver.wake();
|
||||
} else {
|
||||
LowresTimerDriver::start(self, inner);
|
||||
let flags = self.0.flags.get();
|
||||
if flags.contains(Flags::RUNNING) {
|
||||
self.0.lowres_time.set(Some(now));
|
||||
|
||||
if flags.contains(Flags::LOWRES_DRIVER) {
|
||||
self.0.lowres_driver.wake();
|
||||
} else {
|
||||
LowresTimerDriver::start(self.0.clone());
|
||||
}
|
||||
}
|
||||
now
|
||||
}
|
||||
}
|
||||
|
||||
fn system_time(&mut self, inner: &Rc<RefCell<Timer>>) -> SystemTime {
|
||||
if let Some(cur) = self.lowres_stime {
|
||||
fn system_time(&self) -> SystemTime {
|
||||
if let Some(cur) = self.0.lowres_stime.get() {
|
||||
cur
|
||||
} else {
|
||||
let now = SystemTime::now();
|
||||
self.lowres_stime = Some(now);
|
||||
let flags = self.0.flags.get();
|
||||
|
||||
if self.flags.contains(Flags::LOWRES_DRIVER) {
|
||||
self.lowres_driver.wake();
|
||||
} else {
|
||||
LowresTimerDriver::start(self, inner);
|
||||
if flags.contains(Flags::RUNNING) {
|
||||
self.0.lowres_stime.set(Some(now));
|
||||
|
||||
if flags.contains(Flags::LOWRES_DRIVER) {
|
||||
self.0.lowres_driver.wake();
|
||||
} else {
|
||||
LowresTimerDriver::start(self.0.clone());
|
||||
}
|
||||
}
|
||||
now
|
||||
}
|
||||
}
|
||||
|
||||
fn query_system_time(&self) -> SystemTime {
|
||||
if let Some(cur) = self.lowres_stime {
|
||||
cur
|
||||
} else {
|
||||
SystemTime::now()
|
||||
}
|
||||
}
|
||||
|
||||
fn elapsed_time(&mut self) -> Instant {
|
||||
if let Some(elapsed_time) = self.elapsed_time {
|
||||
elapsed_time
|
||||
} else {
|
||||
let elapsed_time = Instant::now();
|
||||
self.elapsed_time = Some(elapsed_time);
|
||||
elapsed_time
|
||||
}
|
||||
}
|
||||
|
||||
/// Add the timer into the hash bucket
|
||||
fn add_timer(inner: &Rc<RefCell<Self>>, millis: u64) -> TimerHandle {
|
||||
let mut slf = inner.borrow_mut();
|
||||
fn add_timer(&self, millis: u64) -> TimerHandle {
|
||||
if millis == 0 {
|
||||
let entry = slf.timers.vacant_entry();
|
||||
let mut inner = self.0.inner.borrow_mut();
|
||||
|
||||
let entry = inner.timers.vacant_entry();
|
||||
let no = entry.key();
|
||||
|
||||
entry.insert(TimerEntry {
|
||||
|
@ -253,8 +254,12 @@ impl Timer {
|
|||
return TimerHandle(no);
|
||||
}
|
||||
|
||||
let now = slf.now(inner);
|
||||
let elapsed_time = slf.elapsed_time();
|
||||
let mut flags = self.0.flags.get();
|
||||
flags.insert(Flags::RUNNING);
|
||||
self.0.flags.set(flags);
|
||||
|
||||
let now = self.now();
|
||||
let elapsed_time = self.0.elapsed_time();
|
||||
let delta = if now >= elapsed_time {
|
||||
to_units(as_millis(now - elapsed_time) + millis)
|
||||
} else {
|
||||
|
@ -262,33 +267,24 @@ impl Timer {
|
|||
};
|
||||
|
||||
let (no, bucket_expiry) = {
|
||||
let slf = &mut *slf;
|
||||
|
||||
// crate timer entry
|
||||
let (idx, bucket_expiry) =
|
||||
slf.calc_wheel_index(slf.elapsed.wrapping_add(delta), delta);
|
||||
let entry = slf.timers.vacant_entry();
|
||||
let no = entry.key();
|
||||
let bucket = &mut slf.buckets[idx];
|
||||
let bucket_entry = bucket.add_entry(no);
|
||||
let (idx, bucket_expiry) = self
|
||||
.0
|
||||
.calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
|
||||
|
||||
entry.insert(TimerEntry {
|
||||
bucket_entry,
|
||||
bucket: Some(idx as u16),
|
||||
task: LocalWaker::new(),
|
||||
});
|
||||
slf.occupied[bucket.lvl as usize] |= bucket.bit;
|
||||
let no = self.0.inner.borrow_mut().add_entry(idx);
|
||||
(no, bucket_expiry)
|
||||
};
|
||||
|
||||
// Check whether new bucket expire earlier
|
||||
if bucket_expiry < slf.next_expiry {
|
||||
slf.next_expiry = bucket_expiry;
|
||||
if slf.flags.contains(Flags::DRIVER_STARTED) {
|
||||
slf.flags.insert(Flags::DRIVER_RECALC);
|
||||
slf.driver.wake();
|
||||
if bucket_expiry < self.0.next_expiry.get() {
|
||||
self.0.next_expiry.set(bucket_expiry);
|
||||
if flags.contains(Flags::DRIVER_STARTED) {
|
||||
flags.insert(Flags::DRIVER_RECALC);
|
||||
self.0.flags.set(flags);
|
||||
self.0.driver.wake();
|
||||
} else {
|
||||
TimerDriver::start(&mut slf, inner);
|
||||
TimerDriver::start(self.0.clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -296,16 +292,15 @@ impl Timer {
|
|||
}
|
||||
|
||||
/// Update existing timer
|
||||
fn update_timer(inner: &Rc<RefCell<Self>>, hnd: usize, millis: u64) {
|
||||
let mut slf = inner.borrow_mut();
|
||||
fn update_timer(&self, hnd: usize, millis: u64) {
|
||||
if millis == 0 {
|
||||
slf.remove_timer_bucket(hnd);
|
||||
slf.timers[hnd].bucket = None;
|
||||
self.remove_timer_bucket(hnd);
|
||||
self.0.inner.borrow_mut().timers[hnd].bucket = None;
|
||||
return;
|
||||
}
|
||||
|
||||
let now = slf.now(inner);
|
||||
let elapsed_time = slf.elapsed_time();
|
||||
let now = self.now();
|
||||
let elapsed_time = self.0.elapsed_time();
|
||||
let delta = if now >= elapsed_time {
|
||||
max(to_units(as_millis(now - elapsed_time) + millis), 1)
|
||||
} else {
|
||||
|
@ -313,56 +308,63 @@ impl Timer {
|
|||
};
|
||||
|
||||
let bucket_expiry = {
|
||||
let slf = &mut *slf;
|
||||
|
||||
// calc bucket
|
||||
let (idx, bucket_expiry) =
|
||||
slf.calc_wheel_index(slf.elapsed.wrapping_add(delta), delta);
|
||||
let (idx, bucket_expiry) = self
|
||||
.0
|
||||
.calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta);
|
||||
|
||||
let entry = &mut slf.timers[hnd];
|
||||
self.0.inner.borrow_mut().update_entry(hnd, idx);
|
||||
|
||||
// cleanup active timer
|
||||
if let Some(bucket) = entry.bucket {
|
||||
// do not do anything if wheel bucket is the same
|
||||
if idx == bucket as usize {
|
||||
return;
|
||||
}
|
||||
|
||||
// remove timer entry from current bucket
|
||||
let b = &mut slf.buckets[bucket as usize];
|
||||
b.entries.remove(entry.bucket_entry);
|
||||
if b.entries.is_empty() {
|
||||
slf.occupied[b.lvl as usize] &= b.bit_n;
|
||||
}
|
||||
}
|
||||
|
||||
// put timer to new bucket
|
||||
let bucket = &mut slf.buckets[idx];
|
||||
entry.bucket = Some(idx as u16);
|
||||
entry.bucket_entry = bucket.add_entry(hnd);
|
||||
|
||||
slf.occupied[bucket.lvl as usize] |= bucket.bit;
|
||||
bucket_expiry
|
||||
};
|
||||
|
||||
// Check whether new bucket expire earlier
|
||||
if bucket_expiry < slf.next_expiry {
|
||||
slf.next_expiry = bucket_expiry;
|
||||
if slf.flags.contains(Flags::DRIVER_STARTED) {
|
||||
slf.flags.insert(Flags::DRIVER_RECALC);
|
||||
slf.driver.wake();
|
||||
if bucket_expiry < self.0.next_expiry.get() {
|
||||
self.0.next_expiry.set(bucket_expiry);
|
||||
let mut flags = self.0.flags.get();
|
||||
if flags.contains(Flags::DRIVER_STARTED) {
|
||||
flags.insert(Flags::DRIVER_RECALC);
|
||||
self.0.flags.set(flags);
|
||||
self.0.driver.wake();
|
||||
} else {
|
||||
TimerDriver::start(&mut slf, inner);
|
||||
TimerDriver::start(self.0.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_timer(&mut self, handle: usize) {
|
||||
self.remove_timer_bucket(handle);
|
||||
self.timers.remove(handle);
|
||||
fn remove_timer(&self, handle: usize) {
|
||||
self.0.inner.borrow_mut().remove_timer_bucket(handle, true)
|
||||
}
|
||||
|
||||
fn remove_timer_bucket(&mut self, handle: usize) {
|
||||
fn remove_timer_bucket(&self, handle: usize) {
|
||||
self.0.inner.borrow_mut().remove_timer_bucket(handle, false)
|
||||
}
|
||||
}
|
||||
|
||||
impl TimerMod {
|
||||
fn execute_expired_timers(&mut self, mut clk: u64) {
|
||||
for lvl in 0..LVL_DEPTH {
|
||||
let idx = (clk & LVL_MASK) + lvl * LVL_SIZE;
|
||||
let b = &mut self.buckets[idx as usize];
|
||||
if !b.entries.is_empty() {
|
||||
self.occupied[b.lvl as usize] &= b.bit_n;
|
||||
for no in b.entries.drain() {
|
||||
if let Some(timer) = self.timers.get_mut(no) {
|
||||
timer.complete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Is it time to look at the next level?
|
||||
if (clk & LVL_CLK_MASK) != 0 {
|
||||
break;
|
||||
}
|
||||
// Shift clock for the next level granularity
|
||||
clk >>= LVL_CLK_SHIFT;
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_timer_bucket(&mut self, handle: usize, remove_handle: bool) {
|
||||
let entry = &mut self.timers[handle];
|
||||
if let Some(bucket) = entry.bucket {
|
||||
let b = &mut self.buckets[bucket as usize];
|
||||
|
@ -371,16 +373,129 @@ impl Timer {
|
|||
self.occupied[b.lvl as usize] &= b.bit_n;
|
||||
}
|
||||
}
|
||||
|
||||
if remove_handle {
|
||||
self.timers.remove(handle);
|
||||
}
|
||||
}
|
||||
|
||||
fn add_entry(&mut self, idx: usize) -> usize {
|
||||
let entry = self.timers.vacant_entry();
|
||||
let no = entry.key();
|
||||
let bucket = &mut self.buckets[idx];
|
||||
let bucket_entry = bucket.add_entry(no);
|
||||
|
||||
entry.insert(TimerEntry {
|
||||
bucket_entry,
|
||||
bucket: Some(idx as u16),
|
||||
task: LocalWaker::new(),
|
||||
});
|
||||
self.occupied[bucket.lvl as usize] |= bucket.bit;
|
||||
|
||||
no
|
||||
}
|
||||
|
||||
fn update_entry(&mut self, hnd: usize, idx: usize) {
|
||||
let entry = &mut self.timers[hnd];
|
||||
|
||||
// cleanup active timer
|
||||
if let Some(bucket) = entry.bucket {
|
||||
// do not do anything if wheel bucket is the same
|
||||
if idx == bucket as usize {
|
||||
return;
|
||||
}
|
||||
|
||||
// remove timer entry from current bucket
|
||||
let b = &mut self.buckets[bucket as usize];
|
||||
b.entries.remove(entry.bucket_entry);
|
||||
if b.entries.is_empty() {
|
||||
self.occupied[b.lvl as usize] &= b.bit_n;
|
||||
}
|
||||
}
|
||||
|
||||
// put timer to new bucket
|
||||
let bucket = &mut self.buckets[idx];
|
||||
entry.bucket = Some(idx as u16);
|
||||
entry.bucket_entry = bucket.add_entry(hnd);
|
||||
|
||||
self.occupied[bucket.lvl as usize] |= bucket.bit;
|
||||
}
|
||||
}
|
||||
|
||||
impl TimerInner {
|
||||
fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) {
|
||||
if delta < lvl_start(1) {
|
||||
Self::calc_index(expires, 0)
|
||||
} else if delta < lvl_start(2) {
|
||||
Self::calc_index(expires, 1)
|
||||
} else if delta < lvl_start(3) {
|
||||
Self::calc_index(expires, 2)
|
||||
} else if delta < lvl_start(4) {
|
||||
Self::calc_index(expires, 3)
|
||||
} else if delta < lvl_start(5) {
|
||||
Self::calc_index(expires, 4)
|
||||
} else if delta < lvl_start(6) {
|
||||
Self::calc_index(expires, 5)
|
||||
} else if delta < lvl_start(7) {
|
||||
Self::calc_index(expires, 6)
|
||||
} else if delta < lvl_start(8) {
|
||||
Self::calc_index(expires, 7)
|
||||
} else {
|
||||
// Force expire obscene large timeouts to expire at the
|
||||
// capacity limit of the wheel.
|
||||
if delta >= WHEEL_TIMEOUT_CUTOFF {
|
||||
Self::calc_index(
|
||||
self.elapsed.get().wrapping_add(WHEEL_TIMEOUT_MAX),
|
||||
LVL_DEPTH - 1,
|
||||
)
|
||||
} else {
|
||||
Self::calc_index(expires, LVL_DEPTH - 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to calculate the bucket index and bucket expiration
|
||||
fn calc_index(expires: u64, lvl: u64) -> (usize, u64) {
|
||||
// The timer wheel has to guarantee that a timer does not fire
|
||||
// early. Early expiry can happen due to:
|
||||
// - Timer is armed at the edge of a tick
|
||||
// - Truncation of the expiry time in the outer wheel levels
|
||||
//
|
||||
// Round up with level granularity to prevent this.
|
||||
|
||||
let expires = (expires + lvl_gran(lvl)) >> lvl_shift(lvl);
|
||||
(
|
||||
(lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
|
||||
expires << lvl_shift(lvl),
|
||||
)
|
||||
}
|
||||
|
||||
fn elapsed_time(&self) -> Instant {
|
||||
if let Some(elapsed_time) = self.elapsed_time.get() {
|
||||
elapsed_time
|
||||
} else {
|
||||
let elapsed_time = Instant::now();
|
||||
self.elapsed_time.set(Some(elapsed_time));
|
||||
elapsed_time
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_expired_timers(&self) {
|
||||
self.inner
|
||||
.borrow_mut()
|
||||
.execute_expired_timers(self.next_expiry.get());
|
||||
}
|
||||
|
||||
/// Find next expiration bucket
|
||||
fn next_pending_bucket(&mut self) -> Option<u64> {
|
||||
let mut clk = self.elapsed;
|
||||
fn next_pending_bucket(&self) -> Option<u64> {
|
||||
let inner = self.inner.borrow_mut();
|
||||
|
||||
let mut clk = self.elapsed.get();
|
||||
let mut next = u64::MAX;
|
||||
|
||||
for lvl in 0..LVL_DEPTH {
|
||||
let lvl_clk = clk & LVL_CLK_MASK;
|
||||
let occupied = self.occupied[lvl as usize];
|
||||
let occupied = inner.occupied[lvl as usize];
|
||||
let pos = if occupied == 0 {
|
||||
-1
|
||||
} else {
|
||||
|
@ -415,100 +530,32 @@ impl Timer {
|
|||
}
|
||||
|
||||
/// Get next expiry time in millis
|
||||
fn next_expiry_ms(&mut self) -> u64 {
|
||||
to_millis(self.next_expiry.saturating_sub(self.elapsed))
|
||||
fn next_expiry_ms(&self) -> u64 {
|
||||
to_millis(self.next_expiry.get().saturating_sub(self.elapsed.get()))
|
||||
}
|
||||
|
||||
fn execute_expired_timers(&mut self) {
|
||||
let mut clk = self.next_expiry;
|
||||
|
||||
for lvl in 0..LVL_DEPTH {
|
||||
let idx = (clk & LVL_MASK) + lvl * LVL_SIZE;
|
||||
let b = &mut self.buckets[idx as usize];
|
||||
if !b.entries.is_empty() {
|
||||
self.occupied[b.lvl as usize] &= b.bit_n;
|
||||
fn stop_wheel(&self) {
|
||||
// mark all timers as elapsed
|
||||
if let Ok(mut inner) = self.inner.try_borrow_mut() {
|
||||
let mut buckets = mem::take(&mut inner.buckets);
|
||||
for b in &mut buckets {
|
||||
for no in b.entries.drain() {
|
||||
if let Some(timer) = self.timers.get_mut(no) {
|
||||
timer.complete();
|
||||
}
|
||||
inner.timers[no].bucket = None;
|
||||
}
|
||||
}
|
||||
|
||||
// Is it time to look at the next level?
|
||||
if (clk & LVL_CLK_MASK) != 0 {
|
||||
break;
|
||||
}
|
||||
// Shift clock for the next level granularity
|
||||
clk >>= LVL_CLK_SHIFT;
|
||||
// cleanup info
|
||||
self.flags.set(Flags::empty());
|
||||
self.next_expiry.set(u64::MAX);
|
||||
self.elapsed.set(0);
|
||||
self.elapsed_time.set(None);
|
||||
self.lowres_time.set(None);
|
||||
self.lowres_stime.set(None);
|
||||
|
||||
inner.buckets = buckets;
|
||||
inner.occupied = [0; WHEEL_SIZE];
|
||||
}
|
||||
}
|
||||
|
||||
fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) {
|
||||
if delta < lvl_start(1) {
|
||||
Self::calc_index(expires, 0)
|
||||
} else if delta < lvl_start(2) {
|
||||
Self::calc_index(expires, 1)
|
||||
} else if delta < lvl_start(3) {
|
||||
Self::calc_index(expires, 2)
|
||||
} else if delta < lvl_start(4) {
|
||||
Self::calc_index(expires, 3)
|
||||
} else if delta < lvl_start(5) {
|
||||
Self::calc_index(expires, 4)
|
||||
} else if delta < lvl_start(6) {
|
||||
Self::calc_index(expires, 5)
|
||||
} else if delta < lvl_start(7) {
|
||||
Self::calc_index(expires, 6)
|
||||
} else if delta < lvl_start(8) {
|
||||
Self::calc_index(expires, 7)
|
||||
} else {
|
||||
// Force expire obscene large timeouts to expire at the
|
||||
// capacity limit of the wheel.
|
||||
if delta >= WHEEL_TIMEOUT_CUTOFF {
|
||||
Self::calc_index(
|
||||
self.elapsed.wrapping_add(WHEEL_TIMEOUT_MAX),
|
||||
LVL_DEPTH - 1,
|
||||
)
|
||||
} else {
|
||||
Self::calc_index(expires, LVL_DEPTH - 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to calculate the bucket index and bucket expiration
|
||||
fn calc_index(expires: u64, lvl: u64) -> (usize, u64) {
|
||||
// The timer wheel has to guarantee that a timer does not fire
|
||||
// early. Early expiry can happen due to:
|
||||
// - Timer is armed at the edge of a tick
|
||||
// - Truncation of the expiry time in the outer wheel levels
|
||||
//
|
||||
// Round up with level granularity to prevent this.
|
||||
|
||||
let expires = (expires + lvl_gran(lvl)) >> lvl_shift(lvl);
|
||||
(
|
||||
(lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
|
||||
expires << lvl_shift(lvl),
|
||||
)
|
||||
}
|
||||
|
||||
fn stop_wheel(&mut self) {
|
||||
// mark all timers as elapsed
|
||||
let mut buckets = mem::take(&mut self.buckets);
|
||||
for b in &mut buckets {
|
||||
for no in b.entries.drain() {
|
||||
self.timers[no].bucket = None;
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup info
|
||||
self.flags = Flags::empty();
|
||||
self.buckets = buckets;
|
||||
self.occupied = [0; WHEEL_SIZE];
|
||||
self.next_expiry = u64::MAX;
|
||||
self.elapsed = 0;
|
||||
self.elapsed_time = None;
|
||||
self.lowres_time = None;
|
||||
self.lowres_stime = None;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -553,22 +600,23 @@ impl TimerEntry {
|
|||
}
|
||||
}
|
||||
|
||||
struct TimerDriver(Rc<RefCell<Timer>>);
|
||||
struct TimerDriver(Rc<TimerInner>);
|
||||
|
||||
impl TimerDriver {
|
||||
fn start(slf: &mut Timer, cell: &Rc<RefCell<Timer>>) {
|
||||
slf.flags.insert(Flags::DRIVER_STARTED);
|
||||
slf.driver_sleep = Delay::new(Duration::from_millis(slf.next_expiry_ms()));
|
||||
fn start(timer: Rc<TimerInner>) {
|
||||
let mut flags = timer.flags.get();
|
||||
flags.insert(Flags::DRIVER_STARTED);
|
||||
timer.flags.set(flags);
|
||||
timer.inner.borrow_mut().driver_sleep =
|
||||
Delay::new(Duration::from_millis(timer.next_expiry_ms()));
|
||||
|
||||
crate::spawn(TimerDriver(cell.clone()));
|
||||
crate::spawn(TimerDriver(timer));
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimerDriver {
|
||||
fn drop(&mut self) {
|
||||
if let Ok(mut timer) = self.0.try_borrow_mut() {
|
||||
timer.stop_wheel();
|
||||
}
|
||||
self.0.stop_wheel();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -576,36 +624,41 @@ impl Future for TimerDriver {
|
|||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
let mut inner = self.0.borrow_mut();
|
||||
inner.driver.register(cx.waker());
|
||||
self.0.driver.register(cx.waker());
|
||||
|
||||
let mut flags = self.0.flags.get();
|
||||
if flags.contains(Flags::DRIVER_RECALC) {
|
||||
flags.remove(Flags::DRIVER_RECALC);
|
||||
self.0.flags.set(flags);
|
||||
|
||||
if inner.flags.contains(Flags::DRIVER_RECALC) {
|
||||
inner.flags.remove(Flags::DRIVER_RECALC);
|
||||
let now = Instant::now();
|
||||
let deadline =
|
||||
if let Some(diff) = now.checked_duration_since(inner.elapsed_time()) {
|
||||
Duration::from_millis(inner.next_expiry_ms()).saturating_sub(diff)
|
||||
if let Some(diff) = now.checked_duration_since(self.0.elapsed_time()) {
|
||||
Duration::from_millis(self.0.next_expiry_ms()).saturating_sub(diff)
|
||||
} else {
|
||||
Duration::from_millis(inner.next_expiry_ms())
|
||||
Duration::from_millis(self.0.next_expiry_ms())
|
||||
};
|
||||
inner.driver_sleep.reset(deadline);
|
||||
self.0.inner.borrow_mut().driver_sleep.reset(deadline);
|
||||
}
|
||||
|
||||
loop {
|
||||
if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() {
|
||||
if Pin::new(&mut self.0.inner.borrow_mut().driver_sleep)
|
||||
.poll(cx)
|
||||
.is_ready()
|
||||
{
|
||||
let now = Instant::now();
|
||||
inner.elapsed = inner.next_expiry;
|
||||
inner.elapsed_time = Some(now);
|
||||
inner.execute_expired_timers();
|
||||
self.0.elapsed.set(self.0.next_expiry.get());
|
||||
self.0.elapsed_time.set(Some(now));
|
||||
self.0.execute_expired_timers();
|
||||
|
||||
if let Some(next_expiry) = inner.next_pending_bucket() {
|
||||
inner.next_expiry = next_expiry;
|
||||
let dur = Duration::from_millis(inner.next_expiry_ms());
|
||||
inner.driver_sleep.reset(dur);
|
||||
if let Some(next_expiry) = self.0.next_pending_bucket() {
|
||||
self.0.next_expiry.set(next_expiry);
|
||||
let dur = Duration::from_millis(self.0.next_expiry_ms());
|
||||
self.0.inner.borrow_mut().driver_sleep.reset(dur);
|
||||
continue;
|
||||
} else {
|
||||
inner.next_expiry = u64::MAX;
|
||||
inner.elapsed_time = None;
|
||||
self.0.next_expiry.set(u64::MAX);
|
||||
self.0.elapsed_time.set(None);
|
||||
}
|
||||
}
|
||||
return Poll::Pending;
|
||||
|
@ -613,22 +666,22 @@ impl Future for TimerDriver {
|
|||
}
|
||||
}
|
||||
|
||||
struct LowresTimerDriver(Rc<RefCell<Timer>>);
|
||||
struct LowresTimerDriver(Rc<TimerInner>);
|
||||
|
||||
impl LowresTimerDriver {
|
||||
fn start(slf: &mut Timer, cell: &Rc<RefCell<Timer>>) {
|
||||
slf.flags.insert(Flags::LOWRES_DRIVER);
|
||||
slf.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION);
|
||||
fn start(timer: Rc<TimerInner>) {
|
||||
let mut flags = timer.flags.get();
|
||||
flags.insert(Flags::LOWRES_DRIVER);
|
||||
timer.flags.set(flags);
|
||||
timer.inner.borrow_mut().lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION);
|
||||
|
||||
crate::spawn(LowresTimerDriver(cell.clone()));
|
||||
crate::spawn(LowresTimerDriver(timer));
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LowresTimerDriver {
|
||||
fn drop(&mut self) {
|
||||
if let Ok(mut timer) = self.0.try_borrow_mut() {
|
||||
timer.stop_wheel();
|
||||
}
|
||||
self.0.stop_wheel();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -636,22 +689,29 @@ impl Future for LowresTimerDriver {
|
|||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
let mut inner = self.0.borrow_mut();
|
||||
inner.lowres_driver.register(cx.waker());
|
||||
self.0.lowres_driver.register(cx.waker());
|
||||
|
||||
loop {
|
||||
if inner.flags.contains(Flags::LOWRES_TIMER) {
|
||||
if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() {
|
||||
inner.lowres_time = None;
|
||||
inner.lowres_stime = None;
|
||||
inner.flags.remove(Flags::LOWRES_TIMER);
|
||||
}
|
||||
return Poll::Pending;
|
||||
} else {
|
||||
inner.flags.insert(Flags::LOWRES_TIMER);
|
||||
inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION);
|
||||
}
|
||||
let mut flags = self.0.flags.get();
|
||||
if !flags.contains(Flags::LOWRES_TIMER) {
|
||||
flags.insert(Flags::LOWRES_TIMER);
|
||||
self.0.flags.set(flags);
|
||||
self.0
|
||||
.inner
|
||||
.borrow_mut()
|
||||
.lowres_driver_sleep
|
||||
.reset(LOWRES_RESOLUTION);
|
||||
}
|
||||
|
||||
if Pin::new(&mut self.0.inner.borrow_mut().lowres_driver_sleep)
|
||||
.poll(cx)
|
||||
.is_ready()
|
||||
{
|
||||
self.0.lowres_time.set(None);
|
||||
self.0.lowres_stime.set(None);
|
||||
flags.remove(Flags::LOWRES_TIMER);
|
||||
self.0.flags.set(flags);
|
||||
}
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue