Refactor ntex::time (#68)

* move time to util crate

* update changes
This commit is contained in:
Nikolay Kim 2021-12-10 18:20:16 +06:00 committed by GitHub
parent 42296e9239
commit 2188d92725
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 160 additions and 81 deletions

View file

@ -66,7 +66,7 @@ jobs:
if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo tarpaulin --out Xml --all --all-features -- --skip time::
cargo tarpaulin --out Xml --all --all-features
- name: Upload to Codecov
if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')

View file

@ -1,5 +1,9 @@
# Changes
## [0.3.2] - 2021-12-10
* Set spawn fn to ntex-util
## [0.3.1] - 2021-08-28
* Re-export time as different module

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.3.1"
version = "0.3.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -16,6 +16,6 @@ name = "ntex_rt"
path = "src/lib.rs"
[dependencies]
ntex-util = "0.1.0"
ntex-util = "0.1.2"
pin-project-lite = "0.2"
tokio = { version = "1", default-features = false, features = ["rt", "net", "time", "signal", "sync"] }

View file

@ -87,7 +87,10 @@ impl Builder {
// start the system arbiter
let _ = local.spawn_local(arb);
AsyncSystemRunner { stop, system }
AsyncSystemRunner {
stop,
_system: system,
}
}
fn create_runtime<F>(self, f: F) -> SystemRunner
@ -99,6 +102,11 @@ impl Builder {
let rt = Runtime::new().unwrap();
// set ntex-util spawn fn
ntex_util::set_spawn_fn(|fut| {
tokio::task::spawn_local(fut);
});
// system arbiter
let system = System::construct(
sys_sender,
@ -111,14 +119,18 @@ impl Builder {
// init system arbiter and run configuration method
rt.block_on(lazy(move |_| f()));
SystemRunner { rt, stop, system }
SystemRunner {
rt,
stop,
_system: system,
}
}
}
#[derive(Debug)]
pub struct AsyncSystemRunner {
stop: Receiver<i32>,
system: System,
_system: System,
}
impl AsyncSystemRunner {
@ -152,7 +164,7 @@ impl AsyncSystemRunner {
pub struct SystemRunner {
rt: Runtime,
stop: Receiver<i32>,
system: System,
_system: System,
}
impl SystemRunner {

View file

@ -89,6 +89,11 @@ impl Runtime {
where
F: Future,
{
// set ntex-util spawn fn
ntex_util::set_spawn_fn(|fut| {
crate::spawn(fut);
});
self.local.block_on(&self.rt, f)
}
}

View file

@ -16,8 +16,8 @@ name = "ntex_service"
path = "src/lib.rs"
[dependencies]
ntex-util = "0.1.1"
ntex-util = "0.1.2"
pin-project-lite = "0.2.6"
[dev-dependencies]
ntex = "0.3.13"
ntex = "0.4.13"

View file

@ -1,5 +1,11 @@
# Changes
## [0.1.2] - 2021-12-10
* move in ntex::time utils
* replace tokio::time with futures-timer
## [0.1.1] - 2021-04-11
* next renamed to stream_recv

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "0.1.1"
version = "0.1.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -17,10 +17,17 @@ path = "src/lib.rs"
[dependencies]
bitflags = "1.2"
log = "0.4"
slab = "0.4"
futures-timer = "3.0.2"
futures-core = { version = "0.3.18", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.18", default-features = false, features = ["alloc"] }
pin-project-lite = "0.2.6"
backtrace = "*"
[dev-dependencies]
ntex = "0.3.14"
ntex = "0.4.10"
ntex-rt = "0.3.2"
ntex-macros = "0.1.3"
futures-util = { version = "0.3.18", default-features = false, features = ["alloc"] }

View file

@ -99,7 +99,7 @@ mod tests {
use super::*;
use crate::future::lazy;
#[ntex::test]
#[ntex_macros::rt_test2]
#[allow(clippy::unit_cmp)]
async fn test_condition() {
let cond = Condition::new();
@ -127,7 +127,7 @@ mod tests {
assert_eq!(waiter2.await, ());
}
#[ntex::test]
#[ntex_macros::rt_test2]
async fn test_condition_poll() {
let cond = Condition::new();
let waiter = cond.wait();

View file

@ -103,7 +103,7 @@ mod tests {
use super::*;
use crate::future::lazy;
#[ntex::test]
#[ntex_macros::rt_test2]
async fn test_oneshot() {
let (tx, rx) = channel();
tx.send("test").unwrap();

View file

@ -177,7 +177,7 @@ mod tests {
use super::*;
use crate::future::lazy;
#[ntex::test]
#[ntex_macros::rt_test2]
async fn test_pool() {
let p = new();
let (tx, rx) = p.channel();

View file

@ -1,7 +1,41 @@
//! Utilities for ntex framework
use std::{cell::RefCell, future::Future, pin::Pin};
pub mod channel;
pub mod future;
pub mod task;
pub mod time;
pub use futures_core::{ready, Stream};
pub use futures_sink::Sink;
thread_local! {
#[allow(clippy::type_complexity)]
static SPAWNER: RefCell<Box<dyn Fn(Pin<Box<dyn Future<Output = ()>>>)>> = RefCell::new(Box::new(|_| {
panic!("spawn fn is not configured");
}));
}
/// Spawn a future on the current thread.
///
/// # Panics
///
/// This function panics if spawn fn is not set.
#[inline]
pub fn spawn<F>(fut: F)
where
F: Future<Output = ()> + 'static,
{
SPAWNER.with(move |f| {
(*f.borrow())(Box::pin(fut));
});
}
pub fn set_spawn_fn<F>(f: F)
where
F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static,
{
SPAWNER.with(|ctx| {
*ctx.borrow_mut() = Box::new(f);
});
}

View file

@ -1,5 +1,4 @@
//! Utilities for tracking time.
use std::{future::Future, pin::Pin, task, task::Poll};
mod types;
@ -164,7 +163,7 @@ impl Interval {
#[inline]
pub async fn tick(&self) {
crate::util::poll_fn(|cx| self.poll_tick(cx)).await;
crate::future::poll_fn(|cx| self.poll_tick(cx)).await;
}
#[inline]
@ -192,24 +191,32 @@ impl crate::Stream for Interval {
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use futures_util::StreamExt;
use std::time;
use super::*;
/// State Under Test: Two calls of `now()` return the same value if they are done within resolution interval.
///
/// Expected Behavior: Two back-to-back calls of `now()` return the same value.
#[crate::rt_test]
#[ntex_macros::rt_test2]
async fn lowres_time_does_not_immediately_change() {
assert_eq!(now(), now());
crate::set_spawn_fn(|f| {
ntex_rt::spawn(f);
});
assert_eq!(now(), now())
}
/// State Under Test: `now()` updates returned value every ~1ms period.
///
/// Expected Behavior: Two calls of `now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a 1ms interval.
#[crate::rt_test]
#[ntex_macros::rt_test2]
async fn lowres_time_updates_after_resolution_interval() {
crate::set_spawn_fn(|f| {
ntex_rt::spawn(f);
});
let first_time = now();
sleep(Millis(25)).await;
@ -221,8 +228,12 @@ mod tests {
/// State Under Test: Two calls of `system_time()` return the same value if they are done within 1ms interval.
///
/// Expected Behavior: Two back-to-back calls of `now()` return the same value.
#[crate::rt_test]
#[ntex_macros::rt_test2]
async fn system_time_service_time_does_not_immediately_change() {
crate::set_spawn_fn(|f| {
ntex_rt::spawn(f);
});
assert_eq!(system_time(), system_time());
}
@ -230,8 +241,12 @@ mod tests {
///
/// Expected Behavior: Two calls of `system_time()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[crate::rt_test]
#[ntex_macros::rt_test2]
async fn system_time_service_time_updates_after_resolution_interval() {
crate::set_spawn_fn(|f| {
ntex_rt::spawn(f);
});
let wait_time = 300;
let first_time = system_time()
@ -247,9 +262,12 @@ mod tests {
assert!(second_time - first_time >= time::Duration::from_millis(wait_time));
}
#[cfg(not(target_os = "macos"))]
#[crate::rt_test]
#[ntex_macros::rt_test2]
async fn test_interval() {
crate::set_spawn_fn(|f| {
ntex_rt::spawn(f);
});
let mut int = interval(Millis(250));
let time = time::Instant::now();
@ -273,9 +291,12 @@ mod tests {
);
}
#[cfg(not(target_os = "macos"))]
#[crate::rt_test]
#[ntex_macros::rt_test2]
async fn test_interval_one_sec() {
crate::set_spawn_fn(|f| {
ntex_rt::spawn(f);
});
let int = interval(Millis::ONE_SEC);
for _i in 0..3 {

View file

@ -1,19 +1,5 @@
use std::{convert::TryInto, ops};
// /// A measurement of a monotonically nondecreasing clock. Opaque and useful only with Duration.
// ///
// /// Instants are always guaranteed to be no less than any previously
// /// measured instant when created, and are often useful for tasks such as measuring
// /// benchmarks or timing how long an operation takes.
// #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
// pub struct Instant(u64);
// impl Instant {
// pub fn now() -> Instant {
// todo!()
// }
// }
/// A Duration type to represent a span of time.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Millis(pub u64);

View file

@ -6,9 +6,9 @@ use std::cell::RefCell;
use std::time::{Duration, Instant, SystemTime};
use std::{cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll};
use futures_timer::Delay;
use slab::Slab;
use crate::rt::time_driver::{sleep_until, Sleep};
use crate::task::LocalWaker;
// Clock divisor for the next level
@ -140,14 +140,14 @@ struct Timer {
next_expiry: u64,
flags: Flags,
driver: LocalWaker,
driver_sleep: Pin<Box<Sleep>>,
driver_sleep: Delay,
buckets: Vec<Bucket>,
/// Bit field tracking which bucket currently contain entries.
occupied: [u64; WHEEL_SIZE],
lowres_time: Option<Instant>,
lowres_stime: Option<SystemTime>,
lowres_driver: LocalWaker,
lowres_driver_sleep: Pin<Box<Sleep>>,
lowres_driver_sleep: Delay,
}
impl Timer {
@ -160,12 +160,12 @@ impl Timer {
next_expiry: u64::MAX,
flags: Flags::empty(),
driver: LocalWaker::new(),
driver_sleep: Box::pin(sleep_until(Instant::now())),
driver_sleep: Delay::new(Duration::ZERO),
occupied: [0; WHEEL_SIZE],
lowres_time: None,
lowres_stime: None,
lowres_driver: LocalWaker::new(),
lowres_driver_sleep: Box::pin(sleep_until(Instant::now())),
lowres_driver_sleep: Delay::new(Duration::ZERO),
}
}
@ -542,10 +542,9 @@ struct TimerDriver(Rc<RefCell<Timer>>);
impl TimerDriver {
fn start(slf: &mut Timer, cell: &Rc<RefCell<Timer>>) {
slf.flags.insert(Flags::DRIVER_STARTED);
let deadline = Instant::now() + Duration::from_millis(slf.next_expiry_ms());
slf.driver_sleep = Box::pin(sleep_until(deadline));
slf.driver_sleep = Delay::new(Duration::from_millis(slf.next_expiry_ms()));
crate::rt::spawn(TimerDriver(cell.clone()));
crate::spawn(TimerDriver(cell.clone()));
}
}
@ -565,27 +564,26 @@ impl Future for TimerDriver {
if inner.flags.contains(Flags::DRIVER_RECALC) {
inner.flags.remove(Flags::DRIVER_RECALC);
let now = Instant::now();
let deadline = if let Some(diff) =
now.checked_duration_since(inner.elapsed_time())
{
now + Duration::from_millis(inner.next_expiry_ms()).saturating_sub(diff)
let deadline =
if let Some(diff) = now.checked_duration_since(inner.elapsed_time()) {
Duration::from_millis(inner.next_expiry_ms()).saturating_sub(diff)
} else {
now + Duration::from_millis(inner.next_expiry_ms())
Duration::from_millis(inner.next_expiry_ms())
};
Pin::as_mut(&mut inner.driver_sleep).reset(deadline);
inner.driver_sleep.reset(deadline);
}
loop {
if Pin::as_mut(&mut inner.driver_sleep).poll(cx).is_ready() {
let now = inner.driver_sleep.deadline();
if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() {
let now = Instant::now();
inner.elapsed = inner.next_expiry;
inner.elapsed_time = Some(now);
inner.execute_expired_timers();
if let Some(next_expiry) = inner.next_pending_bucket() {
inner.next_expiry = next_expiry;
let deadline = now + Duration::from_millis(inner.next_expiry_ms());
Pin::as_mut(&mut inner.driver_sleep).reset(deadline);
let dur = Duration::from_millis(inner.next_expiry_ms());
inner.driver_sleep.reset(dur);
continue;
} else {
inner.next_expiry = u64::MAX;
@ -602,10 +600,9 @@ struct LowresTimerDriver(Rc<RefCell<Timer>>);
impl LowresTimerDriver {
fn start(slf: &mut Timer, cell: &Rc<RefCell<Timer>>) {
slf.flags.insert(Flags::LOWRES_DRIVER);
slf.lowres_driver_sleep =
Box::pin(sleep_until(Instant::now() + LOWRES_RESOLUTION));
slf.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION);
crate::rt::spawn(LowresTimerDriver(cell.clone()));
crate::spawn(LowresTimerDriver(cell.clone()));
}
}
@ -624,10 +621,7 @@ impl Future for LowresTimerDriver {
loop {
if inner.flags.contains(Flags::LOWRES_TIMER) {
if Pin::as_mut(&mut inner.lowres_driver_sleep)
.poll(cx)
.is_ready()
{
if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() {
inner.lowres_time = None;
inner.lowres_stime = None;
inner.flags.remove(Flags::LOWRES_TIMER);
@ -635,22 +629,24 @@ impl Future for LowresTimerDriver {
return Poll::Pending;
} else {
inner.flags.insert(Flags::LOWRES_TIMER);
Pin::as_mut(&mut inner.lowres_driver_sleep)
.reset(Instant::now() + LOWRES_RESOLUTION);
inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION);
}
}
}
}
#[cfg(not(target_os = "macos"))]
#[cfg(test)]
mod tests {
use super::*;
use crate::time::{interval, sleep, Millis};
#[crate::rt_test]
#[ntex_macros::rt_test2]
async fn test_timer() {
crate::rt::spawn(async {
crate::set_spawn_fn(|f| {
ntex_rt::spawn(f);
});
crate::spawn(async {
let s = interval(Millis(25));
loop {
s.tick().await;
@ -663,7 +659,7 @@ mod tests {
fut2.await;
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(200) && elapsed < Duration::from_millis(250),
elapsed > Duration::from_millis(200) && elapsed < Duration::from_millis(500),
"elapsed: {:?}",
elapsed
);
@ -672,7 +668,7 @@ mod tests {
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(1000)
&& elapsed < Duration::from_millis(1200),
&& elapsed < Duration::from_millis(3000), // osx
"elapsed: {:?}",
elapsed
);
@ -681,7 +677,7 @@ mod tests {
sleep(Millis(25)).await;
let elapsed = Instant::now() - time;
assert!(
elapsed > Duration::from_millis(20) && elapsed < Duration::from_millis(40)
elapsed > Duration::from_millis(20) && elapsed < Duration::from_millis(50)
);
}
}

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.14] - 2021-12-xx
* Move ntex::time to ntex-util crate
## [0.4.13] - 2021-12-07
* server: Rename .apply/.apply_async to .on_worker_start()

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.4.13"
version = "0.4.14"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -44,11 +44,11 @@ http-framework = ["h2", "http", "httparse",
[dependencies]
ntex-codec = "0.5.1"
ntex-rt = "0.3.1"
ntex-rt = "0.3.2"
ntex-router = "0.5.1"
ntex-service = "0.2.1"
ntex-macros = "0.1.3"
ntex-util = "0.1.1"
ntex-util = "0.1.2"
ntex-bytes = "0.1.7"
base64 = "0.13"

View file

@ -40,7 +40,6 @@ pub mod framed;
pub mod http;
pub mod server;
pub mod testing;
pub mod time;
pub mod util;
#[cfg(feature = "http-framework")]
pub mod web;
@ -74,3 +73,8 @@ pub mod rt {
pub mod service {
pub use ntex_service::*;
}
pub mod time {
//! Utilities for tracking time.
pub use ntex_util::time::*;
}