diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index eb7a632f..447f8b98 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.0-b.5] - 2021-08-28 + +* Cleanup timer wheel on driver drop + ## [0.4.0-b.4] - 2021-08-28 * Reduce timer resolution diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index e75c6a41..ef239a5e 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.4.0-b.4" +version = "0.4.0-b.5" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" diff --git a/ntex/src/time/wheel.rs b/ntex/src/time/wheel.rs index 18aed430..1d4489f8 100644 --- a/ntex/src/time/wheel.rs +++ b/ntex/src/time/wheel.rs @@ -2,7 +2,9 @@ //! //! Inspired by linux kernel timers system #![allow(arithmetic_overflow)] -use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task, task::Poll, time}; +use std::{ + cell::RefCell, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll, time, +}; use slab::Slab; @@ -147,15 +149,8 @@ impl Timer { impl TimerInner { fn new() -> Self { - let mut buckets = Vec::with_capacity(WHEEL_SIZE); - for idx in 0..WHEEL_SIZE { - let lvl = idx / (LVL_SIZE as usize); - let offs = idx % (LVL_SIZE as usize); - buckets.push(Bucket::new(lvl, offs)) - } - TimerInner { - buckets, + buckets: Self::create_buckets(), timers: Slab::default(), elapsed: 0, elapsed_instant: time::Instant::now(), @@ -166,9 +161,32 @@ impl TimerInner { } } + fn create_buckets() -> Vec { + let mut buckets = Vec::with_capacity(WHEEL_SIZE); + for idx in 0..WHEEL_SIZE { + let lvl = idx / (LVL_SIZE as usize); + let offs = idx % (LVL_SIZE as usize); + buckets.push(Bucket::new(lvl, offs)) + } + buckets + } + // Add the timer into the hash bucket fn add_timer(inner: &Rc>, millis: u64) -> TimerHandle { let mut slf = inner.borrow_mut(); + if millis == 0 { + let entry = slf.timers.vacant_entry(); + let no = entry.key(); + + entry.insert(TimerEntry { + bucket_entry: 0, + bucket: 0, + task: LocalWaker::new(), + flags: TimerEntryFlags::ELAPSED, + }); + return TimerHandle(no); + } + let delta = to_units( (time::Instant::now() + time::Duration::from_millis(millis) - slf.elapsed_instant) @@ -213,6 +231,11 @@ impl TimerInner { fn update_timer(inner: &Rc>, hnd: usize, millis: u64) { let mut slf = inner.borrow_mut(); + if millis == 0 { + slf.timers[hnd].flags = TimerEntryFlags::ELAPSED; + return; + } + let delta = to_units( (time::Instant::now() + time::Duration::from_millis(millis) - slf.elapsed_instant) @@ -449,12 +472,9 @@ impl TimerEntry { } } -pin_project_lite::pin_project! { - struct TimerDriver { - inner: Rc>, - #[pin] - sleep: Sleep, - } +struct TimerDriver { + inner: Rc>, + sleep: Pin>, } impl TimerDriver { @@ -464,46 +484,105 @@ impl TimerDriver { crate::rt::spawn(TimerDriver { inner: cell.clone(), - sleep: sleep_until(inner.next_expiry()), + sleep: Box::pin(sleep_until(inner.next_expiry())), }); } } +impl Drop for TimerDriver { + fn drop(&mut self) { + let mut inner = self.inner.borrow_mut(); + + // mark all old timers as elapsed + let mut buckets = mem::take(&mut inner.buckets); + for b in &mut buckets { + for no in b.entries.drain() { + inner.timers[no].flags.insert(TimerEntryFlags::ELAPSED); + } + } + + // cleanup info + inner.flags = Flags::empty(); + inner.buckets = buckets; + inner.occupied = [0; WHEEL_SIZE]; + } +} + impl Future for TimerDriver { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - let mut inner = this.inner.borrow_mut(); + let mut inner = self.inner.borrow_mut(); inner.driver.register(cx.waker()); if inner.flags.contains(Flags::NEEDS_RECALC) { inner.flags.remove(Flags::NEEDS_RECALC); inner.flags.insert(Flags::TIMER_ACTIVE); - this.sleep.reset(inner.next_expiry()); + let exp = inner.next_expiry(); drop(inner); + Pin::as_mut(&mut self.sleep).reset(exp); return self.poll(cx); - } else if inner.flags.contains(Flags::TIMER_ACTIVE) - && this.sleep.poll(cx).is_ready() - { + } else if inner.flags.contains(Flags::TIMER_ACTIVE) { drop(inner); - this = self.as_mut().project(); - let mut inner = this.inner.borrow_mut(); - let instant = this.sleep.deadline(); - inner.execute_expired_timers(instant); + let result = Pin::as_mut(&mut self.sleep).poll(cx).is_ready(); + if result { + let instant = self.sleep.deadline(); + let mut inner = self.inner.borrow_mut(); + inner.execute_expired_timers(instant); - if let Some(next_expiry) = inner.next_pending_bucket() { - inner.next_expiry = next_expiry; - inner.flags.insert(Flags::TIMER_ACTIVE); - this.sleep.reset(inner.next_expiry()); - drop(inner); - return self.poll(cx); - } else { - inner.next_expiry = u64::MAX; - inner.flags.remove(Flags::TIMER_ACTIVE); + if let Some(next_expiry) = inner.next_pending_bucket() { + inner.next_expiry = next_expiry; + inner.flags.insert(Flags::TIMER_ACTIVE); + let exp = inner.next_expiry(); + drop(inner); + Pin::as_mut(&mut self.sleep).reset(exp); + return self.poll(cx); + } else { + inner.next_expiry = u64::MAX; + inner.flags.remove(Flags::TIMER_ACTIVE); + } } } - Poll::Pending } } + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use crate::time::*; + + #[crate::rt_test] + async fn test_timer() { + crate::rt::spawn(async { + let s = interval(25); + loop { + s.tick().await; + } + }); + let time = Instant::now(); + let fut1 = sleep(Millis(1000)); + let fut2 = sleep(Millis(200)); + + fut2.await; + let elapsed = Instant::now() - time; + assert!( + elapsed > Duration::from_millis(200) && elapsed < Duration::from_millis(250) + ); + + fut1.await; + let elapsed = Instant::now() - time; + assert!( + elapsed > Duration::from_millis(1000) + && elapsed < Duration::from_millis(1200) + ); + + let time = Instant::now(); + sleep(Millis(25)).await; + let elapsed = Instant::now() - time; + assert!( + elapsed > Duration::from_millis(20) && elapsed < Duration::from_millis(45) + ); + } +}