Cleanup timer wheel on driver drop

This commit is contained in:
Nikolay Kim 2021-08-29 03:18:41 +06:00
parent 05c0f51ad4
commit 2b35f5d980
3 changed files with 120 additions and 37 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.4.0-b.5] - 2021-08-28
* Cleanup timer wheel on driver drop
## [0.4.0-b.4] - 2021-08-28 ## [0.4.0-b.4] - 2021-08-28
* Reduce timer resolution * Reduce timer resolution

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.4.0-b.4" version = "0.4.0-b.5"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"

View file

@ -2,7 +2,9 @@
//! //!
//! Inspired by linux kernel timers system //! Inspired by linux kernel timers system
#![allow(arithmetic_overflow)] #![allow(arithmetic_overflow)]
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task, task::Poll, time}; use std::{
cell::RefCell, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll, time,
};
use slab::Slab; use slab::Slab;
@ -147,15 +149,8 @@ impl Timer {
impl TimerInner { impl TimerInner {
fn new() -> Self { fn new() -> Self {
let mut buckets = Vec::with_capacity(WHEEL_SIZE);
for idx in 0..WHEEL_SIZE {
let lvl = idx / (LVL_SIZE as usize);
let offs = idx % (LVL_SIZE as usize);
buckets.push(Bucket::new(lvl, offs))
}
TimerInner { TimerInner {
buckets, buckets: Self::create_buckets(),
timers: Slab::default(), timers: Slab::default(),
elapsed: 0, elapsed: 0,
elapsed_instant: time::Instant::now(), elapsed_instant: time::Instant::now(),
@ -166,9 +161,32 @@ impl TimerInner {
} }
} }
fn create_buckets() -> Vec<Bucket> {
let mut buckets = Vec::with_capacity(WHEEL_SIZE);
for idx in 0..WHEEL_SIZE {
let lvl = idx / (LVL_SIZE as usize);
let offs = idx % (LVL_SIZE as usize);
buckets.push(Bucket::new(lvl, offs))
}
buckets
}
// Add the timer into the hash bucket // Add the timer into the hash bucket
fn add_timer(inner: &Rc<RefCell<Self>>, millis: u64) -> TimerHandle { fn add_timer(inner: &Rc<RefCell<Self>>, millis: u64) -> TimerHandle {
let mut slf = inner.borrow_mut(); let mut slf = inner.borrow_mut();
if millis == 0 {
let entry = slf.timers.vacant_entry();
let no = entry.key();
entry.insert(TimerEntry {
bucket_entry: 0,
bucket: 0,
task: LocalWaker::new(),
flags: TimerEntryFlags::ELAPSED,
});
return TimerHandle(no);
}
let delta = to_units( let delta = to_units(
(time::Instant::now() + time::Duration::from_millis(millis) (time::Instant::now() + time::Duration::from_millis(millis)
- slf.elapsed_instant) - slf.elapsed_instant)
@ -213,6 +231,11 @@ impl TimerInner {
fn update_timer(inner: &Rc<RefCell<Self>>, hnd: usize, millis: u64) { fn update_timer(inner: &Rc<RefCell<Self>>, hnd: usize, millis: u64) {
let mut slf = inner.borrow_mut(); let mut slf = inner.borrow_mut();
if millis == 0 {
slf.timers[hnd].flags = TimerEntryFlags::ELAPSED;
return;
}
let delta = to_units( let delta = to_units(
(time::Instant::now() + time::Duration::from_millis(millis) (time::Instant::now() + time::Duration::from_millis(millis)
- slf.elapsed_instant) - slf.elapsed_instant)
@ -449,12 +472,9 @@ impl TimerEntry {
} }
} }
pin_project_lite::pin_project! { struct TimerDriver {
struct TimerDriver {
inner: Rc<RefCell<TimerInner>>, inner: Rc<RefCell<TimerInner>>,
#[pin] sleep: Pin<Box<Sleep>>,
sleep: Sleep,
}
} }
impl TimerDriver { impl TimerDriver {
@ -464,46 +484,105 @@ impl TimerDriver {
crate::rt::spawn(TimerDriver { crate::rt::spawn(TimerDriver {
inner: cell.clone(), inner: cell.clone(),
sleep: sleep_until(inner.next_expiry()), sleep: Box::pin(sleep_until(inner.next_expiry())),
}); });
} }
} }
impl Drop for TimerDriver {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();
// mark all old timers as elapsed
let mut buckets = mem::take(&mut inner.buckets);
for b in &mut buckets {
for no in b.entries.drain() {
inner.timers[no].flags.insert(TimerEntryFlags::ELAPSED);
}
}
// cleanup info
inner.flags = Flags::empty();
inner.buckets = buckets;
inner.occupied = [0; WHEEL_SIZE];
}
}
impl Future for TimerDriver { impl Future for TimerDriver {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project(); let mut inner = self.inner.borrow_mut();
let mut inner = this.inner.borrow_mut();
inner.driver.register(cx.waker()); inner.driver.register(cx.waker());
if inner.flags.contains(Flags::NEEDS_RECALC) { if inner.flags.contains(Flags::NEEDS_RECALC) {
inner.flags.remove(Flags::NEEDS_RECALC); inner.flags.remove(Flags::NEEDS_RECALC);
inner.flags.insert(Flags::TIMER_ACTIVE); inner.flags.insert(Flags::TIMER_ACTIVE);
this.sleep.reset(inner.next_expiry()); let exp = inner.next_expiry();
drop(inner); drop(inner);
Pin::as_mut(&mut self.sleep).reset(exp);
return self.poll(cx); return self.poll(cx);
} else if inner.flags.contains(Flags::TIMER_ACTIVE) } else if inner.flags.contains(Flags::TIMER_ACTIVE) {
&& this.sleep.poll(cx).is_ready()
{
drop(inner); drop(inner);
this = self.as_mut().project(); let result = Pin::as_mut(&mut self.sleep).poll(cx).is_ready();
let mut inner = this.inner.borrow_mut(); if result {
let instant = this.sleep.deadline(); let instant = self.sleep.deadline();
let mut inner = self.inner.borrow_mut();
inner.execute_expired_timers(instant); inner.execute_expired_timers(instant);
if let Some(next_expiry) = inner.next_pending_bucket() { if let Some(next_expiry) = inner.next_pending_bucket() {
inner.next_expiry = next_expiry; inner.next_expiry = next_expiry;
inner.flags.insert(Flags::TIMER_ACTIVE); inner.flags.insert(Flags::TIMER_ACTIVE);
this.sleep.reset(inner.next_expiry()); let exp = inner.next_expiry();
drop(inner); drop(inner);
Pin::as_mut(&mut self.sleep).reset(exp);
return self.poll(cx); return self.poll(cx);
} else { } else {
inner.next_expiry = u64::MAX; inner.next_expiry = u64::MAX;
inner.flags.remove(Flags::TIMER_ACTIVE); inner.flags.remove(Flags::TIMER_ACTIVE);
} }
} }
}
Poll::Pending Poll::Pending
} }
} }
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use crate::time::*;
#[crate::rt_test]
async fn test_timer() {
crate::rt::spawn(async {
let s = interval(25);
loop {
s.tick().await;
}
});
let time = Instant::now();
let fut1 = sleep(Millis(1000));
let fut2 = sleep(Millis(200));
fut2.await;
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(200) && elapsed < Duration::from_millis(250)
);
fut1.await;
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(1000)
&& elapsed < Duration::from_millis(1200)
);
let time = Instant::now();
sleep(Millis(25)).await;
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(20) && elapsed < Duration::from_millis(45)
);
}
}