mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 13:27:39 +03:00
add time::now() and time::system_time(), remove LowResTime and SystemTime services
This commit is contained in:
parent
d3a4b65cf5
commit
4012bb9352
9 changed files with 197 additions and 285 deletions
|
@ -1,5 +1,13 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.4.0-b.8] - 2021-09-01
|
||||||
|
|
||||||
|
* Add `ntex::time::now()` helper, returns low res time.
|
||||||
|
|
||||||
|
* Add `ntex::time::system_time()` helper, returns low res system time.
|
||||||
|
|
||||||
|
* Removed `LowResTime` and `SystemTime` services
|
||||||
|
|
||||||
## [0.4.0-b.7] - 2021-08-31
|
## [0.4.0-b.7] - 2021-08-31
|
||||||
|
|
||||||
* Remove From<u64> for Millis impl
|
* Remove From<u64> for Millis impl
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex"
|
name = "ntex"
|
||||||
version = "0.4.0-b.7"
|
version = "0.4.0-b.8"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "Framework for composable network services"
|
description = "Framework for composable network services"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Instant};
|
use std::{cell::RefCell, collections::BTreeMap, rc::Rc, time::Instant};
|
||||||
|
|
||||||
use crate::framed::State;
|
use crate::framed::State;
|
||||||
use crate::time::{sleep, Millis};
|
use crate::time::{now, sleep, Millis};
|
||||||
use crate::util::HashSet;
|
use crate::util::HashSet;
|
||||||
|
|
||||||
pub struct Timer(Rc<RefCell<Inner>>);
|
pub struct Timer(Rc<RefCell<Inner>>);
|
||||||
|
@ -75,11 +75,11 @@ impl Timer {
|
||||||
if let Some(cur) = cur {
|
if let Some(cur) = cur {
|
||||||
cur
|
cur
|
||||||
} else {
|
} else {
|
||||||
let now = Instant::now();
|
let now_val = now();
|
||||||
let inner = self.0.clone();
|
let inner = self.0.clone();
|
||||||
let interval = {
|
let interval = {
|
||||||
let mut b = inner.borrow_mut();
|
let mut b = inner.borrow_mut();
|
||||||
b.current = Some(now);
|
b.current = Some(now_val);
|
||||||
b.resolution
|
b.resolution
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ impl Timer {
|
||||||
sleep(interval).await;
|
sleep(interval).await;
|
||||||
let empty = {
|
let empty = {
|
||||||
let mut i = inner.borrow_mut();
|
let mut i = inner.borrow_mut();
|
||||||
let now = i.current.take().unwrap_or_else(Instant::now);
|
let now = i.current.take().unwrap_or_else(now);
|
||||||
|
|
||||||
// notify io dispatcher
|
// notify io dispatcher
|
||||||
while let Some(key) = i.notifications.keys().next() {
|
while let Some(key) = i.notifications.keys().next() {
|
||||||
|
@ -109,7 +109,7 @@ impl Timer {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
now
|
now_val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ use crate::http::Protocol;
|
||||||
use crate::rt::spawn;
|
use crate::rt::spawn;
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::task::LocalWaker;
|
use crate::task::LocalWaker;
|
||||||
use crate::time::{sleep, Millis, Sleep};
|
use crate::time::{now, sleep, Millis, Sleep};
|
||||||
use crate::util::{poll_fn, Bytes, HashMap};
|
use crate::util::{poll_fn, Bytes, HashMap};
|
||||||
|
|
||||||
use super::connection::{ConnectionType, IoConnection};
|
use super::connection::{ConnectionType, IoConnection};
|
||||||
|
@ -239,7 +239,7 @@ where
|
||||||
// check if open connection is available
|
// check if open connection is available
|
||||||
// cleanup stale connections at the same time
|
// cleanup stale connections at the same time
|
||||||
if let Some(ref mut connections) = self.available.get_mut(key) {
|
if let Some(ref mut connections) = self.available.get_mut(key) {
|
||||||
let now = Instant::now();
|
let now = now();
|
||||||
while let Some(conn) = connections.pop_back() {
|
while let Some(conn) = connections.pop_back() {
|
||||||
// check if it still usable
|
// check if it still usable
|
||||||
if (now - conn.used) > self.conn_keep_alive
|
if (now - conn.used) > self.conn_keep_alive
|
||||||
|
@ -279,7 +279,7 @@ where
|
||||||
.push_back(AvailableConnection {
|
.push_back(AvailableConnection {
|
||||||
io,
|
io,
|
||||||
created,
|
created,
|
||||||
used: Instant::now(),
|
used: now(),
|
||||||
});
|
});
|
||||||
self.check_availibility();
|
self.check_availibility();
|
||||||
}
|
}
|
||||||
|
@ -480,7 +480,7 @@ where
|
||||||
// h2 connection is ready
|
// h2 connection is ready
|
||||||
let conn = IoConnection::new(
|
let conn = IoConnection::new(
|
||||||
ConnectionType::H2(snd),
|
ConnectionType::H2(snd),
|
||||||
Instant::now(),
|
now(),
|
||||||
Some(this.guard.take().unwrap().consume()),
|
Some(this.guard.take().unwrap().consume()),
|
||||||
);
|
);
|
||||||
if let Err(Ok(conn)) = this.tx.take().unwrap().send(Ok(conn)) {
|
if let Err(Ok(conn)) = this.tx.take().unwrap().send(Ok(conn)) {
|
||||||
|
@ -517,7 +517,7 @@ where
|
||||||
if proto == Protocol::Http1 {
|
if proto == Protocol::Http1 {
|
||||||
let conn = IoConnection::new(
|
let conn = IoConnection::new(
|
||||||
ConnectionType::H1(io),
|
ConnectionType::H1(io),
|
||||||
Instant::now(),
|
now(),
|
||||||
Some(this.guard.take().unwrap().consume()),
|
Some(this.guard.take().unwrap().consume()),
|
||||||
);
|
);
|
||||||
if let Err(Ok(conn)) = this.tx.take().unwrap().send(Ok(conn)) {
|
if let Err(Ok(conn)) = this.tx.take().unwrap().send(Ok(conn)) {
|
||||||
|
|
|
@ -6,12 +6,12 @@ mod types;
|
||||||
mod wheel;
|
mod wheel;
|
||||||
|
|
||||||
pub use self::types::{Millis, Seconds};
|
pub use self::types::{Millis, Seconds};
|
||||||
pub use self::wheel::TimerHandle;
|
pub use self::wheel::{now, system_time, TimerHandle};
|
||||||
|
|
||||||
/// Waits until `duration` has elapsed.
|
/// Waits until `duration` has elapsed.
|
||||||
///
|
///
|
||||||
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
|
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
|
||||||
/// operates at 16.5 millisecond granularity and should not be used for tasks that
|
/// operates at 16 millisecond granularity and should not be used for tasks that
|
||||||
/// require high-resolution timers.
|
/// require high-resolution timers.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn sleep<T: Into<Millis>>(dur: T) -> Sleep {
|
pub fn sleep<T: Into<Millis>>(dur: T) -> Sleep {
|
||||||
|
@ -189,3 +189,60 @@ impl crate::Stream for Interval {
|
||||||
self.poll_tick(cx).map(|_| Some(()))
|
self.poll_tick(cx).map(|_| Some(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::time;
|
||||||
|
|
||||||
|
/// State Under Test: Two calls of `now()` return the same value if they are done within resolution interval.
|
||||||
|
///
|
||||||
|
/// Expected Behavior: Two back-to-back calls of `now()` return the same value.
|
||||||
|
#[crate::rt_test]
|
||||||
|
async fn lowres_time_does_not_immediately_change() {
|
||||||
|
assert_eq!(now(), now());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State Under Test: `now()` updates returned value every ~1ms period.
|
||||||
|
///
|
||||||
|
/// Expected Behavior: Two calls of `now()` made in subsequent resolution interval return different values
|
||||||
|
/// and second value is greater than the first one at least by a 1ms interval.
|
||||||
|
#[crate::rt_test]
|
||||||
|
async fn lowres_time_updates_after_resolution_interval() {
|
||||||
|
let first_time = now();
|
||||||
|
|
||||||
|
sleep(Millis(25)).await;
|
||||||
|
|
||||||
|
let second_time = now();
|
||||||
|
assert!(second_time - first_time >= time::Duration::from_millis(25));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State Under Test: Two calls of `system_time()` return the same value if they are done within 1ms interval.
|
||||||
|
///
|
||||||
|
/// Expected Behavior: Two back-to-back calls of `now()` return the same value.
|
||||||
|
#[crate::rt_test]
|
||||||
|
async fn system_time_service_time_does_not_immediately_change() {
|
||||||
|
assert_eq!(system_time(), system_time());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State Under Test: `system_time()` updates returned value every 1ms period.
|
||||||
|
///
|
||||||
|
/// Expected Behavior: Two calls of `system_time()` made in subsequent resolution interval return different values
|
||||||
|
/// and second value is greater than the first one at least by a resolution interval.
|
||||||
|
#[crate::rt_test]
|
||||||
|
async fn system_time_service_time_updates_after_resolution_interval() {
|
||||||
|
let wait_time = 300;
|
||||||
|
|
||||||
|
let first_time = system_time()
|
||||||
|
.duration_since(time::SystemTime::UNIX_EPOCH)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
sleep(Millis(wait_time)).await;
|
||||||
|
|
||||||
|
let second_time = system_time()
|
||||||
|
.duration_since(time::SystemTime::UNIX_EPOCH)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(second_time - first_time >= time::Duration::from_millis(wait_time));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2,9 +2,8 @@
|
||||||
//!
|
//!
|
||||||
//! Inspired by linux kernel timers system
|
//! Inspired by linux kernel timers system
|
||||||
#![allow(arithmetic_overflow)]
|
#![allow(arithmetic_overflow)]
|
||||||
use std::{
|
use std::cell::{Cell, RefCell};
|
||||||
cell::RefCell, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll, time,
|
use std::{future::Future, mem, pin::Pin, rc::Rc, task, task::Poll, time};
|
||||||
};
|
|
||||||
|
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
|
@ -57,6 +56,23 @@ const fn lvl_offs(n: u64) -> u64 {
|
||||||
const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH);
|
const WHEEL_TIMEOUT_CUTOFF: u64 = lvl_start(LVL_DEPTH);
|
||||||
const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1));
|
const WHEEL_TIMEOUT_MAX: u64 = WHEEL_TIMEOUT_CUTOFF - (lvl_gran(LVL_DEPTH - 1));
|
||||||
const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize);
|
const WHEEL_SIZE: usize = (LVL_SIZE as usize) * (LVL_DEPTH as usize);
|
||||||
|
const ONE_MS: time::Duration = time::Duration::from_millis(1);
|
||||||
|
|
||||||
|
/// Returns an instant corresponding to “now”.
|
||||||
|
///
|
||||||
|
/// Resolution is ~1ms
|
||||||
|
#[inline]
|
||||||
|
pub fn now() -> time::Instant {
|
||||||
|
TIMER.with(|t| t.0.borrow().now())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the system time corresponding to “now”.
|
||||||
|
///
|
||||||
|
/// Resolution is ~1ms
|
||||||
|
#[inline]
|
||||||
|
pub fn system_time() -> time::SystemTime {
|
||||||
|
TIMER.with(|t| t.0.borrow().system_time())
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TimerHandle(usize);
|
pub struct TimerHandle(usize);
|
||||||
|
@ -100,6 +116,7 @@ bitflags::bitflags! {
|
||||||
const DRIVER_STARTED = 0b0000_0001;
|
const DRIVER_STARTED = 0b0000_0001;
|
||||||
const NEEDS_RECALC = 0b0000_0010;
|
const NEEDS_RECALC = 0b0000_0010;
|
||||||
const TIMER_ACTIVE = 0b0000_0100;
|
const TIMER_ACTIVE = 0b0000_0100;
|
||||||
|
const LOWRES_TIMER = 0b0000_1000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,11 +136,16 @@ struct TimerInner {
|
||||||
buckets: Vec<Bucket>,
|
buckets: Vec<Bucket>,
|
||||||
/// Bit field tracking which bucket currently contain entries.
|
/// Bit field tracking which bucket currently contain entries.
|
||||||
occupied: [u64; WHEEL_SIZE],
|
occupied: [u64; WHEEL_SIZE],
|
||||||
|
lowres_time: Cell<Option<time::Instant>>,
|
||||||
|
lowres_stime: Cell<Option<time::SystemTime>>,
|
||||||
|
lowres_driver: LocalWaker,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Timer {
|
impl Timer {
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
Timer(Rc::new(RefCell::new(TimerInner::new())))
|
let inner = Rc::new(RefCell::new(TimerInner::new()));
|
||||||
|
LowresTimerDriver::start(&inner);
|
||||||
|
Timer(inner)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn with_entry<F, R>(no: usize, f: F) -> R
|
fn with_entry<F, R>(no: usize, f: F) -> R
|
||||||
|
@ -158,6 +180,9 @@ impl TimerInner {
|
||||||
flags: Flags::empty(),
|
flags: Flags::empty(),
|
||||||
driver: LocalWaker::new(),
|
driver: LocalWaker::new(),
|
||||||
occupied: [0; WHEEL_SIZE],
|
occupied: [0; WHEEL_SIZE],
|
||||||
|
lowres_time: Cell::new(None),
|
||||||
|
lowres_stime: Cell::new(None),
|
||||||
|
lowres_driver: LocalWaker::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,6 +196,30 @@ impl TimerInner {
|
||||||
buckets
|
buckets
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn now(&self) -> time::Instant {
|
||||||
|
let cur = self.lowres_time.get();
|
||||||
|
if let Some(cur) = cur {
|
||||||
|
cur
|
||||||
|
} else {
|
||||||
|
let now = time::Instant::now();
|
||||||
|
self.lowres_driver.wake();
|
||||||
|
self.lowres_time.set(Some(now));
|
||||||
|
now
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn system_time(&self) -> time::SystemTime {
|
||||||
|
let cur = self.lowres_stime.get();
|
||||||
|
if let Some(cur) = cur {
|
||||||
|
cur
|
||||||
|
} else {
|
||||||
|
let now = time::SystemTime::now();
|
||||||
|
self.lowres_driver.wake();
|
||||||
|
self.lowres_stime.set(Some(now));
|
||||||
|
now
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add the timer into the hash bucket
|
// Add the timer into the hash bucket
|
||||||
fn add_timer(inner: &Rc<RefCell<Self>>, millis: u64) -> TimerHandle {
|
fn add_timer(inner: &Rc<RefCell<Self>>, millis: u64) -> TimerHandle {
|
||||||
let mut slf = inner.borrow_mut();
|
let mut slf = inner.borrow_mut();
|
||||||
|
@ -188,8 +237,7 @@ impl TimerInner {
|
||||||
}
|
}
|
||||||
|
|
||||||
let delta = to_units(
|
let delta = to_units(
|
||||||
(time::Instant::now() + time::Duration::from_millis(millis)
|
(slf.now() + time::Duration::from_millis(millis) - slf.elapsed_instant)
|
||||||
- slf.elapsed_instant)
|
|
||||||
.as_millis() as u64,
|
.as_millis() as u64,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -237,8 +285,7 @@ impl TimerInner {
|
||||||
}
|
}
|
||||||
|
|
||||||
let delta = to_units(
|
let delta = to_units(
|
||||||
(time::Instant::now() + time::Duration::from_millis(millis)
|
(slf.now() + time::Duration::from_millis(millis) - slf.elapsed_instant)
|
||||||
- slf.elapsed_instant)
|
|
||||||
.as_millis() as u64,
|
.as_millis() as u64,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -343,9 +390,9 @@ impl TimerInner {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get instant of the next expiry
|
// Get instant of the next expiry
|
||||||
fn next_expiry(&mut self) -> time::Instant {
|
fn next_expiry(&self) -> time::Instant {
|
||||||
let millis = to_millis(self.next_expiry - self.elapsed);
|
let millis = to_millis(self.next_expiry - self.elapsed);
|
||||||
time::Instant::now() + time::Duration::from_millis(millis)
|
self.now() + time::Duration::from_millis(millis)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn execute_expired_timers(&mut self, instant: time::Instant) {
|
fn execute_expired_timers(&mut self, instant: time::Instant) {
|
||||||
|
@ -550,6 +597,57 @@ impl Future for TimerDriver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct LowresTimerDriver {
|
||||||
|
inner: Rc<RefCell<TimerInner>>,
|
||||||
|
sleep: Pin<Box<Sleep>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LowresTimerDriver {
|
||||||
|
fn start(cell: &Rc<RefCell<TimerInner>>) {
|
||||||
|
let mut inner = cell.borrow_mut();
|
||||||
|
inner.flags.insert(Flags::LOWRES_TIMER);
|
||||||
|
|
||||||
|
crate::rt::spawn(LowresTimerDriver {
|
||||||
|
inner: cell.clone(),
|
||||||
|
sleep: Box::pin(sleep_until(time::Instant::now() + ONE_MS)),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for LowresTimerDriver {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let mut inner = self.inner.borrow_mut();
|
||||||
|
inner.flags = Flags::empty();
|
||||||
|
inner.lowres_time.set(None);
|
||||||
|
inner.lowres_stime.set(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for LowresTimerDriver {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let mut inner = self.inner.borrow_mut();
|
||||||
|
inner.lowres_driver.register(cx.waker());
|
||||||
|
|
||||||
|
if inner.flags.contains(Flags::LOWRES_TIMER) {
|
||||||
|
drop(inner);
|
||||||
|
if Pin::as_mut(&mut self.sleep).poll(cx).is_ready() {
|
||||||
|
let mut inner = self.inner.borrow_mut();
|
||||||
|
inner.lowres_time.set(None);
|
||||||
|
inner.lowres_stime.set(None);
|
||||||
|
inner.flags.remove(Flags::LOWRES_TIMER);
|
||||||
|
}
|
||||||
|
task::Poll::Pending
|
||||||
|
} else {
|
||||||
|
inner.flags.insert(Flags::LOWRES_TIMER);
|
||||||
|
drop(inner);
|
||||||
|
Pin::as_mut(&mut self.sleep).reset(time::Instant::now() + ONE_MS);
|
||||||
|
self.poll(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
|
@ -1,18 +1,15 @@
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::{cell::Cell, convert::Infallible, marker, time::Duration, time::Instant};
|
use std::{cell::Cell, convert::Infallible, marker, time::Duration, time::Instant};
|
||||||
|
|
||||||
use crate::time::{sleep, Millis, Sleep};
|
use crate::time::{now, sleep, Millis, Sleep};
|
||||||
use crate::{util::Ready, Service, ServiceFactory};
|
use crate::{util::Ready, Service, ServiceFactory};
|
||||||
|
|
||||||
use super::time::{LowResTime, LowResTimeService};
|
|
||||||
|
|
||||||
/// KeepAlive service factory
|
/// KeepAlive service factory
|
||||||
///
|
///
|
||||||
/// Controls min time between requests.
|
/// Controls min time between requests.
|
||||||
pub struct KeepAlive<R, E, F> {
|
pub struct KeepAlive<R, E, F> {
|
||||||
f: F,
|
f: F,
|
||||||
ka: Millis,
|
ka: Millis,
|
||||||
time: LowResTime,
|
|
||||||
_t: marker::PhantomData<(R, E)>,
|
_t: marker::PhantomData<(R, E)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,10 +21,9 @@ where
|
||||||
///
|
///
|
||||||
/// ka - keep-alive timeout
|
/// ka - keep-alive timeout
|
||||||
/// err - error factory function
|
/// err - error factory function
|
||||||
pub fn new(ka: Millis, time: LowResTime, err: F) -> Self {
|
pub fn new(ka: Millis, err: F) -> Self {
|
||||||
KeepAlive {
|
KeepAlive {
|
||||||
ka,
|
ka,
|
||||||
time,
|
|
||||||
f: err,
|
f: err,
|
||||||
_t: marker::PhantomData,
|
_t: marker::PhantomData,
|
||||||
}
|
}
|
||||||
|
@ -42,7 +38,6 @@ where
|
||||||
KeepAlive {
|
KeepAlive {
|
||||||
f: self.f.clone(),
|
f: self.f.clone(),
|
||||||
ka: self.ka,
|
ka: self.ka,
|
||||||
time: self.time.clone(),
|
|
||||||
_t: marker::PhantomData,
|
_t: marker::PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -61,18 +56,13 @@ where
|
||||||
type Future = Ready<Self::Service, Self::InitError>;
|
type Future = Ready<Self::Service, Self::InitError>;
|
||||||
|
|
||||||
fn new_service(&self, _: ()) -> Self::Future {
|
fn new_service(&self, _: ()) -> Self::Future {
|
||||||
Ready::Ok(KeepAliveService::new(
|
Ready::Ok(KeepAliveService::new(self.ka, self.f.clone()))
|
||||||
self.ka,
|
|
||||||
self.time.timer(),
|
|
||||||
self.f.clone(),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct KeepAliveService<R, E, F> {
|
pub struct KeepAliveService<R, E, F> {
|
||||||
f: F,
|
f: F,
|
||||||
dur: Millis,
|
dur: Millis,
|
||||||
time: LowResTimeService,
|
|
||||||
sleep: Sleep,
|
sleep: Sleep,
|
||||||
expire: Cell<Instant>,
|
expire: Cell<Instant>,
|
||||||
_t: marker::PhantomData<(R, E)>,
|
_t: marker::PhantomData<(R, E)>,
|
||||||
|
@ -82,13 +72,12 @@ impl<R, E, F> KeepAliveService<R, E, F>
|
||||||
where
|
where
|
||||||
F: Fn() -> E,
|
F: Fn() -> E,
|
||||||
{
|
{
|
||||||
pub fn new(dur: Millis, time: LowResTimeService, f: F) -> Self {
|
pub fn new(dur: Millis, f: F) -> Self {
|
||||||
let expire = Cell::new(time.now() + Duration::from(dur));
|
let expire = Cell::new(now());
|
||||||
|
|
||||||
KeepAliveService {
|
KeepAliveService {
|
||||||
f,
|
f,
|
||||||
dur,
|
dur,
|
||||||
time,
|
|
||||||
expire,
|
expire,
|
||||||
sleep: sleep(dur),
|
sleep: sleep(dur),
|
||||||
_t: marker::PhantomData,
|
_t: marker::PhantomData,
|
||||||
|
@ -108,11 +97,12 @@ where
|
||||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
match self.sleep.poll_elapsed(cx) {
|
match self.sleep.poll_elapsed(cx) {
|
||||||
Poll::Ready(_) => {
|
Poll::Ready(_) => {
|
||||||
let now = self.time.now();
|
let now = now();
|
||||||
if self.expire.get() <= now {
|
let expire = self.expire.get() + Duration::from(self.dur);
|
||||||
|
if expire <= now {
|
||||||
Poll::Ready(Err((self.f)()))
|
Poll::Ready(Err((self.f)()))
|
||||||
} else {
|
} else {
|
||||||
let expire = self.expire.get() - Instant::now();
|
let expire = expire - now;
|
||||||
self.sleep.reset(Millis(expire.as_millis() as u64));
|
self.sleep.reset(Millis(expire.as_millis() as u64));
|
||||||
let _ = self.sleep.poll_elapsed(cx);
|
let _ = self.sleep.poll_elapsed(cx);
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
|
@ -123,7 +113,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&self, req: R) -> Self::Future {
|
fn call(&self, req: R) -> Self::Future {
|
||||||
self.expire.set(self.time.now() + Duration::from(self.dur));
|
self.expire.set(now());
|
||||||
Ready::Ok(req)
|
Ready::Ok(req)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,8 +129,7 @@ mod tests {
|
||||||
|
|
||||||
#[crate::rt_test]
|
#[crate::rt_test]
|
||||||
async fn test_ka() {
|
async fn test_ka() {
|
||||||
let factory =
|
let factory = KeepAlive::new(Millis(100), || TestErr);
|
||||||
KeepAlive::new(Millis(100), LowResTime::new(Millis(10)), || TestErr);
|
|
||||||
let _ = factory.clone();
|
let _ = factory.clone();
|
||||||
|
|
||||||
let service = factory.new_service(()).await.unwrap();
|
let service = factory.new_service(()).await.unwrap();
|
||||||
|
|
|
@ -5,7 +5,6 @@ pub mod inflight;
|
||||||
pub mod keepalive;
|
pub mod keepalive;
|
||||||
pub mod sink;
|
pub mod sink;
|
||||||
pub mod stream;
|
pub mod stream;
|
||||||
pub mod time;
|
|
||||||
pub mod timeout;
|
pub mod timeout;
|
||||||
pub mod variant;
|
pub mod variant;
|
||||||
|
|
||||||
|
|
|
@ -1,239 +0,0 @@
|
||||||
use std::task::{Context, Poll};
|
|
||||||
use std::time::{self, Instant};
|
|
||||||
use std::{cell::RefCell, convert::Infallible, rc::Rc};
|
|
||||||
|
|
||||||
use crate::service::{Service, ServiceFactory};
|
|
||||||
use crate::time::{sleep, Millis};
|
|
||||||
use crate::util::Ready;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct LowResTime(Rc<RefCell<Inner>>);
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Inner {
|
|
||||||
resolution: Millis,
|
|
||||||
current: Option<Instant>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Inner {
|
|
||||||
fn new(resolution: Millis) -> Self {
|
|
||||||
Inner {
|
|
||||||
resolution,
|
|
||||||
current: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LowResTime {
|
|
||||||
/// Create new timer service
|
|
||||||
pub fn new<T: Into<Millis>>(resolution: T) -> LowResTime {
|
|
||||||
LowResTime(Rc::new(RefCell::new(Inner::new(resolution.into()))))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn timer(&self) -> LowResTimeService {
|
|
||||||
LowResTimeService(self.0.clone())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for LowResTime {
|
|
||||||
fn default() -> Self {
|
|
||||||
LowResTime(Rc::new(RefCell::new(Inner::new(Millis(1000)))))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServiceFactory for LowResTime {
|
|
||||||
type Request = ();
|
|
||||||
type Response = Instant;
|
|
||||||
type Error = Infallible;
|
|
||||||
type InitError = Infallible;
|
|
||||||
type Config = ();
|
|
||||||
type Service = LowResTimeService;
|
|
||||||
type Future = Ready<Self::Service, Self::InitError>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn new_service(&self, _: ()) -> Self::Future {
|
|
||||||
Ready::Ok(self.timer())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct LowResTimeService(Rc<RefCell<Inner>>);
|
|
||||||
|
|
||||||
impl LowResTimeService {
|
|
||||||
pub fn new<T: Into<Millis>>(resolution: T) -> LowResTimeService {
|
|
||||||
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution.into()))))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get current time. This function has to be called from
|
|
||||||
/// future's poll method, otherwise it panics.
|
|
||||||
pub fn now(&self) -> Instant {
|
|
||||||
let cur = self.0.borrow().current;
|
|
||||||
if let Some(cur) = cur {
|
|
||||||
cur
|
|
||||||
} else {
|
|
||||||
let now = Instant::now();
|
|
||||||
let inner = self.0.clone();
|
|
||||||
let interval = {
|
|
||||||
let mut b = inner.borrow_mut();
|
|
||||||
b.current = Some(now);
|
|
||||||
b.resolution
|
|
||||||
};
|
|
||||||
|
|
||||||
crate::rt::spawn(async move {
|
|
||||||
sleep(interval).await;
|
|
||||||
inner.borrow_mut().current.take();
|
|
||||||
});
|
|
||||||
now
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Service for LowResTimeService {
|
|
||||||
type Request = ();
|
|
||||||
type Response = Instant;
|
|
||||||
type Error = Infallible;
|
|
||||||
type Future = Ready<Self::Response, Self::Error>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn call(&self, _: ()) -> Self::Future {
|
|
||||||
Ready::Ok(self.now())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct SystemTimeInner {
|
|
||||||
resolution: Millis,
|
|
||||||
current: Option<time::SystemTime>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SystemTimeInner {
|
|
||||||
fn new(resolution: Millis) -> Self {
|
|
||||||
SystemTimeInner {
|
|
||||||
resolution,
|
|
||||||
current: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
|
|
||||||
|
|
||||||
impl SystemTimeService {
|
|
||||||
/// Create new system time service
|
|
||||||
pub fn new<T: Into<Millis>>(resolution: T) -> SystemTimeService {
|
|
||||||
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(
|
|
||||||
resolution.into(),
|
|
||||||
))))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get current time. This function has to be called from
|
|
||||||
/// future's poll method, otherwise it panics.
|
|
||||||
pub fn now(&self) -> time::SystemTime {
|
|
||||||
let cur = self.0.borrow().current;
|
|
||||||
if let Some(cur) = cur {
|
|
||||||
cur
|
|
||||||
} else {
|
|
||||||
let now = time::SystemTime::now();
|
|
||||||
let inner = self.0.clone();
|
|
||||||
let interval = {
|
|
||||||
let mut b = inner.borrow_mut();
|
|
||||||
b.current = Some(now);
|
|
||||||
b.resolution
|
|
||||||
};
|
|
||||||
|
|
||||||
crate::rt::spawn(async move {
|
|
||||||
sleep(interval).await;
|
|
||||||
inner.borrow_mut().current.take();
|
|
||||||
});
|
|
||||||
now
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use crate::{time::sleep, util::lazy};
|
|
||||||
use std::time::{Duration, SystemTime};
|
|
||||||
|
|
||||||
#[crate::rt_test]
|
|
||||||
async fn low_res_timee() {
|
|
||||||
let f = LowResTime::default();
|
|
||||||
let srv = f.new_service(()).await.unwrap();
|
|
||||||
assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready());
|
|
||||||
srv.call(()).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
|
|
||||||
///
|
|
||||||
/// Expected Behavior: Two back-to-back calls of `SystemTimeService::now()` return the same value.
|
|
||||||
#[crate::rt_test]
|
|
||||||
async fn system_time_service_time_does_not_immediately_change() {
|
|
||||||
let resolution = Duration::from_millis(50);
|
|
||||||
|
|
||||||
let time_service = SystemTimeService::new(resolution);
|
|
||||||
assert_eq!(time_service.now(), time_service.now());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
|
|
||||||
///
|
|
||||||
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
|
|
||||||
#[crate::rt_test]
|
|
||||||
async fn lowres_time_service_time_does_not_immediately_change() {
|
|
||||||
let resolution = Duration::from_millis(50);
|
|
||||||
let time_service = LowResTimeService::new(resolution);
|
|
||||||
assert_eq!(time_service.now(), time_service.now());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// State Under Test: `SystemTimeService::now()` updates returned value every resolution period.
|
|
||||||
///
|
|
||||||
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
|
|
||||||
/// and second value is greater than the first one at least by a resolution interval.
|
|
||||||
#[crate::rt_test]
|
|
||||||
async fn system_time_service_time_updates_after_resolution_interval() {
|
|
||||||
let resolution = Duration::from_millis(100);
|
|
||||||
let wait_time = 300;
|
|
||||||
|
|
||||||
let time_service = SystemTimeService::new(resolution);
|
|
||||||
|
|
||||||
let first_time = time_service
|
|
||||||
.now()
|
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
sleep(Millis(wait_time)).await;
|
|
||||||
|
|
||||||
let second_time = time_service
|
|
||||||
.now()
|
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert!(second_time - first_time >= Duration::from_millis(wait_time));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// State Under Test: `LowResTimeService::now()` updates returned value every resolution period.
|
|
||||||
///
|
|
||||||
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
|
|
||||||
/// and second value is greater than the first one at least by a resolution interval.
|
|
||||||
#[crate::rt_test]
|
|
||||||
async fn lowres_time_service_time_updates_after_resolution_interval() {
|
|
||||||
let resolution = Duration::from_millis(100);
|
|
||||||
let wait_time = 300;
|
|
||||||
let time_service = LowResTimeService::new(resolution);
|
|
||||||
|
|
||||||
let first_time = time_service.now();
|
|
||||||
|
|
||||||
sleep(Millis(wait_time)).await;
|
|
||||||
|
|
||||||
let second_time = time_service.now();
|
|
||||||
assert!(second_time - first_time >= Duration::from_millis(wait_time));
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Add table
Add a link
Reference in a new issue