Fix update timer wheel bucket calculation

This commit is contained in:
Nikolay Kim 2021-09-10 08:57:36 +06:00
parent f5d3034e09
commit a30acbd330
4 changed files with 55 additions and 35 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.0-b.13] - 2021-09-12
* Fix update timer wheel bucket calculation
## [0.4.0-b.12] - 2021-09-07
* Fix race in low res timer

View file

@ -782,7 +782,7 @@ mod tests {
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
sleep(Millis(3000)).await;
sleep(Millis(3500)).await;
// write side must be closed, dispatcher should fail with keep-alive
let flags = state.flags();

View file

@ -256,7 +256,7 @@ mod tests {
let elapsed = time::Instant::now() - time;
assert!(
elapsed > time::Duration::from_millis(200)
&& elapsed < time::Duration::from_millis(300),
&& elapsed < time::Duration::from_millis(450),
"elapsed: {:?}",
elapsed
);
@ -266,9 +266,26 @@ mod tests {
let elapsed = time::Instant::now() - time;
assert!(
elapsed > time::Duration::from_millis(200)
&& elapsed < time::Duration::from_millis(300),
&& elapsed < time::Duration::from_millis(450),
"elapsed: {:?}",
elapsed
);
}
#[crate::rt_test]
async fn test_interval_one_sec() {
let int = interval(Millis::ONE_SEC);
for _i in 0..3 {
let time = time::Instant::now();
int.tick().await;
let elapsed = time::Instant::now() - time;
assert!(
elapsed > time::Duration::from_millis(1000)
&& elapsed < time::Duration::from_millis(1200),
"elapsed: {:?}",
elapsed
);
}
}
}

View file

@ -4,7 +4,7 @@
#![allow(arithmetic_overflow)]
use std::cell::{Cell, RefCell};
use std::time::{Duration, Instant, SystemTime};
use std::{future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
use std::{cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
use slab::Slab;
@ -15,9 +15,11 @@ use crate::task::LocalWaker;
const LVL_CLK_SHIFT: u64 = 3;
const LVL_CLK_DIV: u64 = 1 << LVL_CLK_SHIFT;
const LVL_CLK_MASK: u64 = LVL_CLK_DIV - 1;
const fn lvl_shift(n: u64) -> u64 {
n * LVL_CLK_SHIFT
}
const fn lvl_gran(n: u64) -> u64 {
1 << lvl_shift(n)
}
@ -67,7 +69,7 @@ const fn as_millis(dur: Duration) -> u64 {
/// Returns an instant corresponding to “now”.
///
/// Resolution is ~5ms
/// Resolution is 5ms
#[inline]
pub fn now() -> Instant {
TIMER.with(|t| t.borrow_mut().now(t))
@ -75,7 +77,7 @@ pub fn now() -> Instant {
/// Returns the system time corresponding to “now”.
///
/// Resolution is ~5ms
/// Resolution is 5ms
#[inline]
pub fn system_time() -> SystemTime {
TIMER.with(|t| t.borrow_mut().system_time(t))
@ -276,15 +278,16 @@ impl Timer {
let mut slf = inner.borrow_mut();
if millis == 0 {
slf.timers[hnd].flags = TimerEntryFlags::ELAPSED;
slf.remove_timer_bucket(hnd);
return;
}
let now = slf.now(inner);
let delta = if now >= slf.elapsed_time {
to_units(as_millis(now - slf.elapsed_time) + millis)
max(to_units(as_millis(now - slf.elapsed_time) + millis), 1)
} else {
slf.lowres_time.set(Some(slf.elapsed_time));
to_units(millis)
max(to_units(millis), 1)
};
let bucket_expiry = {
@ -295,27 +298,27 @@ impl Timer {
let entry = &mut slf.timers[hnd];
// do not do anything if bucket is the same
if idx == entry.bucket as usize {
return;
}
// remove timer entry from current bucket
// cleanup active timer
if !entry.flags.contains(TimerEntryFlags::ELAPSED) {
// do not do anything if wheel bucket is the same
if idx == entry.bucket as usize {
return;
}
// remove timer entry from current bucket
let b = &mut slf.buckets[entry.bucket as usize];
b.entries.remove(entry.bucket_entry);
if b.entries.is_empty() {
slf.occupied[b.lvl as usize] &= b.bit_n;
}
} else {
entry.flags = TimerEntryFlags::empty();
}
// put timer to new bucket
let bucket = &mut slf.buckets[idx];
let bucket_entry = bucket.add_entry(hnd);
entry.bucket = idx as u16;
entry.bucket_entry = bucket_entry;
entry.flags = TimerEntryFlags::empty();
entry.bucket_entry = bucket.add_entry(hnd);
slf.occupied[bucket.lvl as usize] |= bucket.bit;
bucket_expiry
@ -334,8 +337,12 @@ impl Timer {
}
fn remove_timer(&mut self, handle: usize) {
let entry = self.timers.remove(handle);
self.remove_timer_bucket(handle);
self.timers.remove(handle);
}
fn remove_timer_bucket(&mut self, handle: usize) {
let entry = &mut self.timers[handle];
if !entry.flags.contains(TimerEntryFlags::ELAPSED) {
let b = &mut self.buckets[entry.bucket as usize];
b.entries.remove(entry.bucket_entry);
@ -445,7 +452,7 @@ impl Timer {
}
/// Helper function to calculate the bucket index and bucket expiration
fn calc_index(expires2: u64, lvl: u64) -> (usize, u64) {
fn calc_index(expires: u64, lvl: u64) -> (usize, u64) {
// The timer wheel has to guarantee that a timer does not fire
// early. Early expiry can happen due to:
// - Timer is armed at the edge of a tick
@ -453,7 +460,7 @@ impl Timer {
//
// Round up with level granularity to prevent this.
let expires = (expires2 + lvl_gran(lvl)) >> lvl_shift(lvl);
let expires = (expires + lvl_gran(lvl)) >> lvl_shift(lvl);
(
(lvl_offs(lvl) + (expires & LVL_MASK)) as usize,
expires << lvl_shift(lvl),
@ -554,7 +561,6 @@ impl Future for TimerDriver {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut counter = 0;
let mut inner = self.0.borrow_mut();
inner.driver.register(cx.waker());
@ -566,18 +572,8 @@ impl Future for TimerDriver {
}
loop {
counter += 1;
if Pin::as_mut(&mut inner.driver_sleep).poll(cx).is_ready() {
let now = inner.driver_sleep.deadline();
if counter > 3 {
log::warn!(
"Nested timer call: {:?}, elapsed: {:?} now: {:?}",
counter,
inner.elapsed_time,
now
);
}
inner.elapsed = inner.next_expiry;
inner.elapsed_time = now;
inner.execute_expired_timers();
@ -646,6 +642,7 @@ mod tests {
use super::*;
use crate::time::*;
#[cfg(not(target_os = "macos"))]
#[crate::rt_test]
async fn test_timer() {
crate::rt::spawn(async {
@ -661,7 +658,7 @@ mod tests {
fut2.await;
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(200) && elapsed < Duration::from_millis(270),
elapsed > Duration::from_millis(200) && elapsed < Duration::from_millis(250),
"elapsed: {:?}",
elapsed
);
@ -670,14 +667,16 @@ mod tests {
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(1000)
&& elapsed < Duration::from_millis(1250)
&& elapsed < Duration::from_millis(1200),
"elapsed: {:?}",
elapsed
);
let time = Instant::now();
sleep(Millis(25)).await;
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(20) && elapsed < Duration::from_millis(45)
elapsed > Duration::from_millis(20) && elapsed < Duration::from_millis(40)
);
}
}