diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index cd21c545..5132bd29 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -66,7 +66,7 @@ jobs: if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') continue-on-error: true run: | - cargo tarpaulin --out Xml --all --all-features -- --skip time:: + cargo tarpaulin --out Xml --all --all-features - name: Upload to Codecov if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index a4982e09..929f7a86 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.2] - 2021-12-10 + +* Set spawn fn to ntex-util + ## [0.3.1] - 2021-08-28 * Re-export time as different module diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index a3c8082d..a54b9620 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.3.1" +version = "0.3.2" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -16,6 +16,6 @@ name = "ntex_rt" path = "src/lib.rs" [dependencies] -ntex-util = "0.1.0" +ntex-util = "0.1.2" pin-project-lite = "0.2" tokio = { version = "1", default-features = false, features = ["rt", "net", "time", "signal", "sync"] } diff --git a/ntex-rt/src/builder.rs b/ntex-rt/src/builder.rs index 5d73c770..a7ba02a3 100644 --- a/ntex-rt/src/builder.rs +++ b/ntex-rt/src/builder.rs @@ -87,7 +87,10 @@ impl Builder { // start the system arbiter let _ = local.spawn_local(arb); - AsyncSystemRunner { stop, system } + AsyncSystemRunner { + stop, + _system: system, + } } fn create_runtime(self, f: F) -> SystemRunner @@ -99,6 +102,11 @@ impl Builder { let rt = Runtime::new().unwrap(); + // set ntex-util spawn fn + ntex_util::set_spawn_fn(|fut| { + tokio::task::spawn_local(fut); + }); + // system arbiter let system = System::construct( sys_sender, @@ -111,14 +119,18 @@ impl Builder { // init system arbiter and run configuration method rt.block_on(lazy(move |_| f())); - SystemRunner { rt, stop, system } + SystemRunner { + rt, + stop, + _system: system, + } } } #[derive(Debug)] pub struct AsyncSystemRunner { stop: Receiver, - system: System, + _system: System, } impl AsyncSystemRunner { @@ -152,7 +164,7 @@ impl AsyncSystemRunner { pub struct SystemRunner { rt: Runtime, stop: Receiver, - system: System, + _system: System, } impl SystemRunner { diff --git a/ntex-rt/src/runtime.rs b/ntex-rt/src/runtime.rs index d9389476..364ea299 100644 --- a/ntex-rt/src/runtime.rs +++ b/ntex-rt/src/runtime.rs @@ -89,6 +89,11 @@ impl Runtime { where F: Future, { + // set ntex-util spawn fn + ntex_util::set_spawn_fn(|fut| { + crate::spawn(fut); + }); + self.local.block_on(&self.rt, f) } } diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 7dc4cb9e..1b066aa4 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -16,8 +16,8 @@ name = "ntex_service" path = "src/lib.rs" [dependencies] -ntex-util = "0.1.1" +ntex-util = "0.1.2" pin-project-lite = "0.2.6" [dev-dependencies] -ntex = "0.3.13" +ntex = "0.4.13" diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index 042e0171..6377ce1c 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.1.2] - 2021-12-10 + +* move in ntex::time utils + +* replace tokio::time with futures-timer + ## [0.1.1] - 2021-04-11 * next renamed to stream_recv diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 02cf4572..fe88e1a4 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "0.1.1" +version = "0.1.2" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,10 +17,17 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2" +log = "0.4" slab = "0.4" +futures-timer = "3.0.2" futures-core = { version = "0.3.18", default-features = false, features = ["alloc"] } futures-sink = { version = "0.3.18", default-features = false, features = ["alloc"] } pin-project-lite = "0.2.6" +backtrace = "*" + [dev-dependencies] -ntex = "0.3.14" +ntex = "0.4.10" +ntex-rt = "0.3.2" +ntex-macros = "0.1.3" +futures-util = { version = "0.3.18", default-features = false, features = ["alloc"] } diff --git a/ntex-util/src/channel/condition.rs b/ntex-util/src/channel/condition.rs index 9ca6ecc6..ac00dc9f 100644 --- a/ntex-util/src/channel/condition.rs +++ b/ntex-util/src/channel/condition.rs @@ -99,7 +99,7 @@ mod tests { use super::*; use crate::future::lazy; - #[ntex::test] + #[ntex_macros::rt_test2] #[allow(clippy::unit_cmp)] async fn test_condition() { let cond = Condition::new(); @@ -127,7 +127,7 @@ mod tests { assert_eq!(waiter2.await, ()); } - #[ntex::test] + #[ntex_macros::rt_test2] async fn test_condition_poll() { let cond = Condition::new(); let waiter = cond.wait(); diff --git a/ntex-util/src/channel/oneshot.rs b/ntex-util/src/channel/oneshot.rs index fe9f6134..8446afc6 100644 --- a/ntex-util/src/channel/oneshot.rs +++ b/ntex-util/src/channel/oneshot.rs @@ -103,7 +103,7 @@ mod tests { use super::*; use crate::future::lazy; - #[ntex::test] + #[ntex_macros::rt_test2] async fn test_oneshot() { let (tx, rx) = channel(); tx.send("test").unwrap(); diff --git a/ntex-util/src/channel/pool.rs b/ntex-util/src/channel/pool.rs index b0c4568e..b900da44 100644 --- a/ntex-util/src/channel/pool.rs +++ b/ntex-util/src/channel/pool.rs @@ -177,7 +177,7 @@ mod tests { use super::*; use crate::future::lazy; - #[ntex::test] + #[ntex_macros::rt_test2] async fn test_pool() { let p = new(); let (tx, rx) = p.channel(); diff --git a/ntex-util/src/lib.rs b/ntex-util/src/lib.rs index 9b7b34e7..70568dc8 100644 --- a/ntex-util/src/lib.rs +++ b/ntex-util/src/lib.rs @@ -1,7 +1,41 @@ //! Utilities for ntex framework +use std::{cell::RefCell, future::Future, pin::Pin}; + pub mod channel; pub mod future; pub mod task; +pub mod time; pub use futures_core::{ready, Stream}; pub use futures_sink::Sink; + +thread_local! { + #[allow(clippy::type_complexity)] + static SPAWNER: RefCell>>)>> = RefCell::new(Box::new(|_| { + panic!("spawn fn is not configured"); + })); +} + +/// Spawn a future on the current thread. +/// +/// # Panics +/// +/// This function panics if spawn fn is not set. +#[inline] +pub fn spawn(fut: F) +where + F: Future + 'static, +{ + SPAWNER.with(move |f| { + (*f.borrow())(Box::pin(fut)); + }); +} + +pub fn set_spawn_fn(f: F) +where + F: Fn(Pin>>) + 'static, +{ + SPAWNER.with(|ctx| { + *ctx.borrow_mut() = Box::new(f); + }); +} diff --git a/ntex/src/time/mod.rs b/ntex-util/src/time/mod.rs similarity index 91% rename from ntex/src/time/mod.rs rename to ntex-util/src/time/mod.rs index 7753a1d1..aa538d2d 100644 --- a/ntex/src/time/mod.rs +++ b/ntex-util/src/time/mod.rs @@ -1,5 +1,4 @@ //! Utilities for tracking time. - use std::{future::Future, pin::Pin, task, task::Poll}; mod types; @@ -164,7 +163,7 @@ impl Interval { #[inline] pub async fn tick(&self) { - crate::util::poll_fn(|cx| self.poll_tick(cx)).await; + crate::future::poll_fn(|cx| self.poll_tick(cx)).await; } #[inline] @@ -192,24 +191,32 @@ impl crate::Stream for Interval { #[cfg(test)] mod tests { - use super::*; - use futures::StreamExt; + use futures_util::StreamExt; use std::time; + use super::*; + /// State Under Test: Two calls of `now()` return the same value if they are done within resolution interval. /// /// Expected Behavior: Two back-to-back calls of `now()` return the same value. - #[crate::rt_test] + #[ntex_macros::rt_test2] async fn lowres_time_does_not_immediately_change() { - assert_eq!(now(), now()); + crate::set_spawn_fn(|f| { + ntex_rt::spawn(f); + }); + assert_eq!(now(), now()) } /// State Under Test: `now()` updates returned value every ~1ms period. /// /// Expected Behavior: Two calls of `now()` made in subsequent resolution interval return different values /// and second value is greater than the first one at least by a 1ms interval. - #[crate::rt_test] + #[ntex_macros::rt_test2] async fn lowres_time_updates_after_resolution_interval() { + crate::set_spawn_fn(|f| { + ntex_rt::spawn(f); + }); + let first_time = now(); sleep(Millis(25)).await; @@ -221,8 +228,12 @@ mod tests { /// State Under Test: Two calls of `system_time()` return the same value if they are done within 1ms interval. /// /// Expected Behavior: Two back-to-back calls of `now()` return the same value. - #[crate::rt_test] + #[ntex_macros::rt_test2] async fn system_time_service_time_does_not_immediately_change() { + crate::set_spawn_fn(|f| { + ntex_rt::spawn(f); + }); + assert_eq!(system_time(), system_time()); } @@ -230,8 +241,12 @@ mod tests { /// /// Expected Behavior: Two calls of `system_time()` made in subsequent resolution interval return different values /// and second value is greater than the first one at least by a resolution interval. - #[crate::rt_test] + #[ntex_macros::rt_test2] async fn system_time_service_time_updates_after_resolution_interval() { + crate::set_spawn_fn(|f| { + ntex_rt::spawn(f); + }); + let wait_time = 300; let first_time = system_time() @@ -247,9 +262,12 @@ mod tests { assert!(second_time - first_time >= time::Duration::from_millis(wait_time)); } - #[cfg(not(target_os = "macos"))] - #[crate::rt_test] + #[ntex_macros::rt_test2] async fn test_interval() { + crate::set_spawn_fn(|f| { + ntex_rt::spawn(f); + }); + let mut int = interval(Millis(250)); let time = time::Instant::now(); @@ -273,9 +291,12 @@ mod tests { ); } - #[cfg(not(target_os = "macos"))] - #[crate::rt_test] + #[ntex_macros::rt_test2] async fn test_interval_one_sec() { + crate::set_spawn_fn(|f| { + ntex_rt::spawn(f); + }); + let int = interval(Millis::ONE_SEC); for _i in 0..3 { diff --git a/ntex/src/time/types.rs b/ntex-util/src/time/types.rs similarity index 88% rename from ntex/src/time/types.rs rename to ntex-util/src/time/types.rs index f941c535..2a88fe6a 100644 --- a/ntex/src/time/types.rs +++ b/ntex-util/src/time/types.rs @@ -1,19 +1,5 @@ use std::{convert::TryInto, ops}; -// /// A measurement of a monotonically nondecreasing clock. Opaque and useful only with Duration. -// /// -// /// Instants are always guaranteed to be no less than any previously -// /// measured instant when created, and are often useful for tasks such as measuring -// /// benchmarks or timing how long an operation takes. -// #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -// pub struct Instant(u64); - -// impl Instant { -// pub fn now() -> Instant { -// todo!() -// } -// } - /// A Duration type to represent a span of time. #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Millis(pub u64); diff --git a/ntex/src/time/wheel.rs b/ntex-util/src/time/wheel.rs similarity index 91% rename from ntex/src/time/wheel.rs rename to ntex-util/src/time/wheel.rs index 60647525..6fd54f1a 100644 --- a/ntex/src/time/wheel.rs +++ b/ntex-util/src/time/wheel.rs @@ -6,9 +6,9 @@ use std::cell::RefCell; use std::time::{Duration, Instant, SystemTime}; use std::{cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll}; +use futures_timer::Delay; use slab::Slab; -use crate::rt::time_driver::{sleep_until, Sleep}; use crate::task::LocalWaker; // Clock divisor for the next level @@ -140,14 +140,14 @@ struct Timer { next_expiry: u64, flags: Flags, driver: LocalWaker, - driver_sleep: Pin>, + driver_sleep: Delay, buckets: Vec, /// Bit field tracking which bucket currently contain entries. occupied: [u64; WHEEL_SIZE], lowres_time: Option, lowres_stime: Option, lowres_driver: LocalWaker, - lowres_driver_sleep: Pin>, + lowres_driver_sleep: Delay, } impl Timer { @@ -160,12 +160,12 @@ impl Timer { next_expiry: u64::MAX, flags: Flags::empty(), driver: LocalWaker::new(), - driver_sleep: Box::pin(sleep_until(Instant::now())), + driver_sleep: Delay::new(Duration::ZERO), occupied: [0; WHEEL_SIZE], lowres_time: None, lowres_stime: None, lowres_driver: LocalWaker::new(), - lowres_driver_sleep: Box::pin(sleep_until(Instant::now())), + lowres_driver_sleep: Delay::new(Duration::ZERO), } } @@ -542,10 +542,9 @@ struct TimerDriver(Rc>); impl TimerDriver { fn start(slf: &mut Timer, cell: &Rc>) { slf.flags.insert(Flags::DRIVER_STARTED); - let deadline = Instant::now() + Duration::from_millis(slf.next_expiry_ms()); - slf.driver_sleep = Box::pin(sleep_until(deadline)); + slf.driver_sleep = Delay::new(Duration::from_millis(slf.next_expiry_ms())); - crate::rt::spawn(TimerDriver(cell.clone())); + crate::spawn(TimerDriver(cell.clone())); } } @@ -565,27 +564,26 @@ impl Future for TimerDriver { if inner.flags.contains(Flags::DRIVER_RECALC) { inner.flags.remove(Flags::DRIVER_RECALC); let now = Instant::now(); - let deadline = if let Some(diff) = - now.checked_duration_since(inner.elapsed_time()) - { - now + Duration::from_millis(inner.next_expiry_ms()).saturating_sub(diff) - } else { - now + Duration::from_millis(inner.next_expiry_ms()) - }; - Pin::as_mut(&mut inner.driver_sleep).reset(deadline); + let deadline = + if let Some(diff) = now.checked_duration_since(inner.elapsed_time()) { + Duration::from_millis(inner.next_expiry_ms()).saturating_sub(diff) + } else { + Duration::from_millis(inner.next_expiry_ms()) + }; + inner.driver_sleep.reset(deadline); } loop { - if Pin::as_mut(&mut inner.driver_sleep).poll(cx).is_ready() { - let now = inner.driver_sleep.deadline(); + if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() { + let now = Instant::now(); inner.elapsed = inner.next_expiry; inner.elapsed_time = Some(now); inner.execute_expired_timers(); if let Some(next_expiry) = inner.next_pending_bucket() { inner.next_expiry = next_expiry; - let deadline = now + Duration::from_millis(inner.next_expiry_ms()); - Pin::as_mut(&mut inner.driver_sleep).reset(deadline); + let dur = Duration::from_millis(inner.next_expiry_ms()); + inner.driver_sleep.reset(dur); continue; } else { inner.next_expiry = u64::MAX; @@ -602,10 +600,9 @@ struct LowresTimerDriver(Rc>); impl LowresTimerDriver { fn start(slf: &mut Timer, cell: &Rc>) { slf.flags.insert(Flags::LOWRES_DRIVER); - slf.lowres_driver_sleep = - Box::pin(sleep_until(Instant::now() + LOWRES_RESOLUTION)); + slf.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION); - crate::rt::spawn(LowresTimerDriver(cell.clone())); + crate::spawn(LowresTimerDriver(cell.clone())); } } @@ -624,10 +621,7 @@ impl Future for LowresTimerDriver { loop { if inner.flags.contains(Flags::LOWRES_TIMER) { - if Pin::as_mut(&mut inner.lowres_driver_sleep) - .poll(cx) - .is_ready() - { + if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() { inner.lowres_time = None; inner.lowres_stime = None; inner.flags.remove(Flags::LOWRES_TIMER); @@ -635,22 +629,24 @@ impl Future for LowresTimerDriver { return Poll::Pending; } else { inner.flags.insert(Flags::LOWRES_TIMER); - Pin::as_mut(&mut inner.lowres_driver_sleep) - .reset(Instant::now() + LOWRES_RESOLUTION); + inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION); } } } } -#[cfg(not(target_os = "macos"))] #[cfg(test)] mod tests { use super::*; use crate::time::{interval, sleep, Millis}; - #[crate::rt_test] + #[ntex_macros::rt_test2] async fn test_timer() { - crate::rt::spawn(async { + crate::set_spawn_fn(|f| { + ntex_rt::spawn(f); + }); + + crate::spawn(async { let s = interval(Millis(25)); loop { s.tick().await; @@ -663,7 +659,7 @@ mod tests { fut2.await; let elapsed = Instant::now() - time; assert!( - elapsed > Duration::from_millis(200) && elapsed < Duration::from_millis(250), + elapsed > Duration::from_millis(200) && elapsed < Duration::from_millis(500), "elapsed: {:?}", elapsed ); @@ -672,7 +668,7 @@ mod tests { let elapsed = Instant::now() - time; assert!( elapsed > Duration::from_millis(1000) - && elapsed < Duration::from_millis(1200), + && elapsed < Duration::from_millis(3000), // osx "elapsed: {:?}", elapsed ); @@ -681,7 +677,7 @@ mod tests { sleep(Millis(25)).await; let elapsed = Instant::now() - time; assert!( - elapsed > Duration::from_millis(20) && elapsed < Duration::from_millis(40) + elapsed > Duration::from_millis(20) && elapsed < Duration::from_millis(50) ); } } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 92654eba..8fbea1d8 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.14] - 2021-12-xx + +* Move ntex::time to ntex-util crate + ## [0.4.13] - 2021-12-07 * server: Rename .apply/.apply_async to .on_worker_start() diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 2c867e0f..9a2984a7 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.4.13" +version = "0.4.14" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -44,11 +44,11 @@ http-framework = ["h2", "http", "httparse", [dependencies] ntex-codec = "0.5.1" -ntex-rt = "0.3.1" +ntex-rt = "0.3.2" ntex-router = "0.5.1" ntex-service = "0.2.1" ntex-macros = "0.1.3" -ntex-util = "0.1.1" +ntex-util = "0.1.2" ntex-bytes = "0.1.7" base64 = "0.13" diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index a9bffbd8..5731ce16 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -40,7 +40,6 @@ pub mod framed; pub mod http; pub mod server; pub mod testing; -pub mod time; pub mod util; #[cfg(feature = "http-framework")] pub mod web; @@ -74,3 +73,8 @@ pub mod rt { pub mod service { pub use ntex_service::*; } + +pub mod time { + //! Utilities for tracking time. + pub use ntex_util::time::*; +}