From c355ca2587edb057bc9e3af2f03f0b3ac7d4cc8e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 29 Aug 2024 16:58:52 +0500 Subject: [PATCH] Add compio runtime support --- Cargo.toml | 2 + ntex-rt/CHANGES.md | 4 + ntex-rt/Cargo.toml | 10 +- ntex-rt/src/arbiter.rs | 4 +- ntex-rt/src/lib.rs | 469 ++++++++++++++++++++++++++--------------- 5 files changed, 316 insertions(+), 173 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fcacfd41..d058ae65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,8 @@ members = [ "ntex-tls", "ntex-macros", "ntex-util", + + "ntex-compio", "ntex-glommio", "ntex-tokio", "ntex-async-std", diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index 820597b8..967ae179 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.14] - 2024-08-29 + +* Add `compio` runtime support + ## [0.4.13] - 2024-04-04 * Use tokio Handle if available diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 9110d8bb..e8a72a0b 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.4.13" +version = "0.4.14" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -24,6 +24,9 @@ glommio = ["glomm-io", "futures-channel"] # tokio support tokio = ["tok-io"] +# compio support +compio = ["comp-io"] + # async-std support async-std = ["async_std/unstable"] @@ -33,11 +36,14 @@ futures-core = "0.3" log = "0.4" oneshot = "0.1" +async_std = { version = "1", package = "async-std", optional = true } +comp-io = { version = "0.11", package = "compio", default-features = false, features = [ + "runtime" +], optional = true } tok-io = { version = "1", package = "tokio", default-features = false, features = [ "rt", "net", ], optional = true } -async_std = { version = "1", package = "async-std", optional = true } [target.'cfg(target_os = "linux")'.dependencies] glomm-io = { version = "0.8", package = "glommio", optional = true } diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index 17e1d674..ee25b280 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -26,7 +26,9 @@ pub(super) enum ArbiterCommand { } /// Arbiters provide an asynchronous execution environment for actors, functions -/// and futures. When an Arbiter is created, it spawns a new OS thread, and +/// and futures. +/// +/// When an Arbiter is created, it spawns a new OS thread, and /// hosts an event loop. Some Arbiter functions execute on the current thread. pub struct Arbiter { sender: Sender, diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 18150d70..ee74235f 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -47,6 +47,291 @@ pub unsafe fn spawn_cbs( }); } +#[cfg(feature = "tokio")] +mod tokio { + use std::future::{poll_fn, Future}; + use tok_io::runtime::Handle; + pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle}; + + /// Runs the provided future, blocking the current thread until the future + /// completes. + pub fn block_on>(fut: F) { + if let Ok(hnd) = Handle::try_current() { + log::debug!("Use existing tokio runtime and block on future"); + hnd.block_on(tok_io::task::LocalSet::new().run_until(fut)); + } else { + log::debug!("Create tokio runtime and block on future"); + + let rt = tok_io::runtime::Builder::new_current_thread() + .enable_all() + //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + tok_io::task::LocalSet::new().block_on(&rt, fut); + } + } + + /// Spawn a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for spawning futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn(f: F) -> tok_io::task::JoinHandle + where + F: Future + 'static, + { + let ptr = crate::CB.with(|cb| (cb.borrow().0)()); + tok_io::task::spawn_local(async move { + if let Some(ptr) = ptr { + tok_io::pin!(f); + let result = poll_fn(|ctx| { + let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); + let result = f.as_mut().poll(ctx); + crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); + result + }) + .await; + crate::CB.with(|cb| (cb.borrow().3)(ptr)); + result + } else { + f.await + } + }) + } + + /// Executes a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for executing futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn_fn(f: F) -> tok_io::task::JoinHandle + where + F: FnOnce() -> R + 'static, + R: Future + 'static, + { + spawn(async move { f().await }) + } +} + +#[allow(dead_code)] +#[cfg(feature = "compio")] +mod compio { + use std::task::{ready, Context, Poll}; + use std::{fmt, future::poll_fn, future::Future, pin::Pin}; + + use comp_io::runtime::Runtime; + + /// Runs the provided future, blocking the current thread until the future + /// completes. + pub fn block_on>(fut: F) { + log::debug!("Create compio runtime and block on future"); + let rt = Runtime::new().unwrap(); + rt.block_on(fut); + } + + /// Spawns a blocking task. + /// + /// The task will be spawned onto a thread pool specifically dedicated + /// to blocking tasks. This is useful to prevent long-running synchronous + /// operations from blocking the main futures executor. + pub fn spawn_blocking(f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + Sync + 'static, + T: Send + 'static, + { + JoinHandle { + fut: Some(comp_io::runtime::spawn_blocking(f)), + } + } + + /// Spawn a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for spawning futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn(f: F) -> JoinHandle + where + F: Future + 'static, + { + let ptr = crate::CB.with(|cb| (cb.borrow().0)()); + let fut = comp_io::runtime::spawn(async move { + if let Some(ptr) = ptr { + let mut f = std::pin::pin!(f); + let result = poll_fn(|ctx| { + let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); + let result = f.as_mut().poll(ctx); + crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); + result + }) + .await; + crate::CB.with(|cb| (cb.borrow().3)(ptr)); + result + } else { + f.await + } + }); + + JoinHandle { fut: Some(fut) } + } + + /// Executes a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for executing futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn_fn(f: F) -> JoinHandle + where + F: FnOnce() -> R + 'static, + R: Future + 'static, + { + spawn(async move { f().await }) + } + + #[derive(Debug, Copy, Clone)] + pub struct JoinError; + + impl fmt::Display for JoinError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "JoinError") + } + } + + impl std::error::Error for JoinError {} + + pub struct JoinHandle { + fut: Option>, + } + + impl Drop for JoinHandle { + fn drop(&mut self) { + self.fut.take().unwrap().detach(); + } + } + + impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready( + ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx)) + .map_err(|_| JoinError), + ) + } + } +} + +#[allow(dead_code)] +#[cfg(feature = "async-std")] +mod asyncstd { + use std::future::{poll_fn, Future}; + use std::{fmt, pin::Pin, task::ready, task::Context, task::Poll}; + + /// Runs the provided future, blocking the current thread until the future + /// completes. + pub fn block_on>(fut: F) { + async_std::task::block_on(fut); + } + + /// Spawn a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for spawning futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn(mut f: F) -> JoinHandle + where + F: Future + 'static, + { + let ptr = crate::CB.with(|cb| (cb.borrow().0)()); + JoinHandle { + fut: async_std::task::spawn_local(async move { + if let Some(ptr) = ptr { + let mut f = unsafe { Pin::new_unchecked(&mut f) }; + let result = poll_fn(|ctx| { + let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); + let result = f.as_mut().poll(ctx); + crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); + result + }) + .await; + crate::CB.with(|cb| (cb.borrow().3)(ptr)); + result + } else { + f.await + } + }), + } + } + + /// Executes a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for executing futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn_fn(f: F) -> JoinHandle + where + F: FnOnce() -> R + 'static, + R: Future + 'static, + { + spawn(async move { f().await }) + } + + /// Spawns a blocking task. + /// + /// The task will be spawned onto a thread pool specifically dedicated + /// to blocking tasks. This is useful to prevent long-running synchronous + /// operations from blocking the main futures executor. + pub fn spawn_blocking(f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + JoinHandle { + fut: async_std::task::spawn_blocking(f), + } + } + + #[derive(Debug, Copy, Clone)] + pub struct JoinError; + + impl fmt::Display for JoinError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "JoinError") + } + } + + impl std::error::Error for JoinError {} + + pub struct JoinHandle { + fut: async_std::task::JoinHandle, + } + + impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx)))) + } + } +} + #[allow(dead_code)] #[cfg(all(feature = "glommio", target_os = "linux"))] mod glommio { @@ -161,181 +446,12 @@ mod glommio { } } -#[cfg(feature = "tokio")] -mod tokio { - use std::future::{poll_fn, Future}; - use tok_io::runtime::Handle; - pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle}; - - /// Runs the provided future, blocking the current thread until the future - /// completes. - pub fn block_on>(fut: F) { - if let Ok(hnd) = Handle::try_current() { - hnd.block_on(tok_io::task::LocalSet::new().run_until(fut)); - } else { - let rt = tok_io::runtime::Builder::new_current_thread() - .enable_all() - //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime) - .build() - .unwrap(); - tok_io::task::LocalSet::new().block_on(&rt, fut); - } - } - - /// Spawn a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for spawning futures on the current - /// thread. - /// - /// # Panics - /// - /// This function panics if ntex system is not running. - #[inline] - pub fn spawn(f: F) -> tok_io::task::JoinHandle - where - F: Future + 'static, - { - let ptr = crate::CB.with(|cb| (cb.borrow().0)()); - tok_io::task::spawn_local(async move { - if let Some(ptr) = ptr { - tok_io::pin!(f); - let result = poll_fn(|ctx| { - let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); - let result = f.as_mut().poll(ctx); - crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); - result - }) - .await; - crate::CB.with(|cb| (cb.borrow().3)(ptr)); - result - } else { - f.await - } - }) - } - - /// Executes a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for executing futures on the current - /// thread. - /// - /// # Panics - /// - /// This function panics if ntex system is not running. - #[inline] - pub fn spawn_fn(f: F) -> tok_io::task::JoinHandle - where - F: FnOnce() -> R + 'static, - R: Future + 'static, - { - spawn(async move { f().await }) - } -} - -#[allow(dead_code)] -#[cfg(feature = "async-std")] -mod asyncstd { - use futures_core::ready; - use std::future::{poll_fn, Future}; - use std::{fmt, pin::Pin, task::Context, task::Poll}; - - /// Runs the provided future, blocking the current thread until the future - /// completes. - pub fn block_on>(fut: F) { - async_std::task::block_on(fut); - } - - /// Spawn a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for spawning futures on the current - /// thread. - /// - /// # Panics - /// - /// This function panics if ntex system is not running. - #[inline] - pub fn spawn(mut f: F) -> JoinHandle - where - F: Future + 'static, - { - let ptr = crate::CB.with(|cb| (cb.borrow().0)()); - JoinHandle { - fut: async_std::task::spawn_local(async move { - if let Some(ptr) = ptr { - let mut f = unsafe { Pin::new_unchecked(&mut f) }; - let result = poll_fn(|ctx| { - let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); - let result = f.as_mut().poll(ctx); - crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); - result - }) - .await; - crate::CB.with(|cb| (cb.borrow().3)(ptr)); - result - } else { - f.await - } - }), - } - } - - /// Executes a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for executing futures on the current - /// thread. - /// - /// # Panics - /// - /// This function panics if ntex system is not running. - #[inline] - pub fn spawn_fn(f: F) -> JoinHandle - where - F: FnOnce() -> R + 'static, - R: Future + 'static, - { - spawn(async move { f().await }) - } - - /// Spawns a blocking task. - /// - /// The task will be spawned onto a thread pool specifically dedicated - /// to blocking tasks. This is useful to prevent long-running synchronous - /// operations from blocking the main futures executor. - pub fn spawn_blocking(f: F) -> JoinHandle - where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, - { - JoinHandle { - fut: async_std::task::spawn_blocking(f), - } - } - - #[derive(Debug, Copy, Clone)] - pub struct JoinError; - - impl fmt::Display for JoinError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "JoinError") - } - } - - impl std::error::Error for JoinError {} - - pub struct JoinHandle { - fut: async_std::task::JoinHandle, - } - - impl Future for JoinHandle { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx)))) - } - } -} - #[cfg(feature = "tokio")] pub use self::tokio::*; #[cfg(all( not(feature = "tokio"), + not(feature = "compio"), not(feature = "glommio"), feature = "async-std", ))] @@ -343,16 +459,26 @@ pub use self::asyncstd::*; #[cfg(all( not(feature = "tokio"), + not(feature = "compio"), not(feature = "async-std"), feature = "glommio" ))] pub use self::glommio::*; +#[cfg(all( + not(feature = "tokio"), + not(feature = "glommio"), + not(feature = "async-std"), + feature = "compio" +))] +pub use self::compio::*; + /// Runs the provided future, blocking the current thread until the future /// completes. #[cfg(all( not(feature = "tokio"), not(feature = "async-std"), + not(feature = "compio"), not(feature = "glommio") ))] pub fn block_on>(_: F) { @@ -362,6 +488,7 @@ pub fn block_on>(_: F) { #[cfg(all( not(feature = "tokio"), not(feature = "async-std"), + not(feature = "compio"), not(feature = "glommio") ))] pub fn spawn(_: F) -> std::pin::Pin>> @@ -374,6 +501,7 @@ where #[cfg(all( not(feature = "tokio"), not(feature = "async-std"), + not(feature = "compio"), not(feature = "glommio") ))] mod spawn_blocking_stub { @@ -403,6 +531,7 @@ mod spawn_blocking_stub { #[cfg(all( not(feature = "tokio"), not(feature = "async-std"), + not(feature = "compio"), not(feature = "glommio") ))] pub use self::spawn_blocking_stub::*;