mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 21:07:39 +03:00
Add compio runtime support
This commit is contained in:
parent
20011a9120
commit
c355ca2587
5 changed files with 316 additions and 173 deletions
|
@ -14,6 +14,8 @@ members = [
|
|||
"ntex-tls",
|
||||
"ntex-macros",
|
||||
"ntex-util",
|
||||
|
||||
"ntex-compio",
|
||||
"ntex-glommio",
|
||||
"ntex-tokio",
|
||||
"ntex-async-std",
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## [0.4.14] - 2024-08-29
|
||||
|
||||
* Add `compio` runtime support
|
||||
|
||||
## [0.4.13] - 2024-04-04
|
||||
|
||||
* Use tokio Handle if available
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-rt"
|
||||
version = "0.4.13"
|
||||
version = "0.4.14"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "ntex runtime"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@ -24,6 +24,9 @@ glommio = ["glomm-io", "futures-channel"]
|
|||
# tokio support
|
||||
tokio = ["tok-io"]
|
||||
|
||||
# compio support
|
||||
compio = ["comp-io"]
|
||||
|
||||
# async-std support
|
||||
async-std = ["async_std/unstable"]
|
||||
|
||||
|
@ -33,11 +36,14 @@ futures-core = "0.3"
|
|||
log = "0.4"
|
||||
oneshot = "0.1"
|
||||
|
||||
async_std = { version = "1", package = "async-std", optional = true }
|
||||
comp-io = { version = "0.11", package = "compio", default-features = false, features = [
|
||||
"runtime"
|
||||
], optional = true }
|
||||
tok-io = { version = "1", package = "tokio", default-features = false, features = [
|
||||
"rt",
|
||||
"net",
|
||||
], optional = true }
|
||||
async_std = { version = "1", package = "async-std", optional = true }
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
glomm-io = { version = "0.8", package = "glommio", optional = true }
|
||||
|
|
|
@ -26,7 +26,9 @@ pub(super) enum ArbiterCommand {
|
|||
}
|
||||
|
||||
/// Arbiters provide an asynchronous execution environment for actors, functions
|
||||
/// and futures. When an Arbiter is created, it spawns a new OS thread, and
|
||||
/// and futures.
|
||||
///
|
||||
/// When an Arbiter is created, it spawns a new OS thread, and
|
||||
/// hosts an event loop. Some Arbiter functions execute on the current thread.
|
||||
pub struct Arbiter {
|
||||
sender: Sender<ArbiterCommand>,
|
||||
|
|
|
@ -47,6 +47,291 @@ pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
|
|||
});
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
mod tokio {
|
||||
use std::future::{poll_fn, Future};
|
||||
use tok_io::runtime::Handle;
|
||||
pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
pub fn block_on<F: Future<Output = ()>>(fut: F) {
|
||||
if let Ok(hnd) = Handle::try_current() {
|
||||
log::debug!("Use existing tokio runtime and block on future");
|
||||
hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
|
||||
} else {
|
||||
log::debug!("Create tokio runtime and block on future");
|
||||
|
||||
let rt = tok_io::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
//.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
|
||||
.build()
|
||||
.unwrap();
|
||||
tok_io::task::LocalSet::new().block_on(&rt, fut);
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
|
||||
where
|
||||
F: Future + 'static,
|
||||
{
|
||||
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
|
||||
tok_io::task::spawn_local(async move {
|
||||
if let Some(ptr) = ptr {
|
||||
tok_io::pin!(f);
|
||||
let result = poll_fn(|ctx| {
|
||||
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
|
||||
let result = f.as_mut().poll(ctx);
|
||||
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
|
||||
result
|
||||
})
|
||||
.await;
|
||||
crate::CB.with(|cb| (cb.borrow().3)(ptr));
|
||||
result
|
||||
} else {
|
||||
f.await
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future + 'static,
|
||||
{
|
||||
spawn(async move { f().await })
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[cfg(feature = "compio")]
|
||||
mod compio {
|
||||
use std::task::{ready, Context, Poll};
|
||||
use std::{fmt, future::poll_fn, future::Future, pin::Pin};
|
||||
|
||||
use comp_io::runtime::Runtime;
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
pub fn block_on<F: Future<Output = ()>>(fut: F) {
|
||||
log::debug!("Create compio runtime and block on future");
|
||||
let rt = Runtime::new().unwrap();
|
||||
rt.block_on(fut);
|
||||
}
|
||||
|
||||
/// Spawns a blocking task.
|
||||
///
|
||||
/// The task will be spawned onto a thread pool specifically dedicated
|
||||
/// to blocking tasks. This is useful to prevent long-running synchronous
|
||||
/// operations from blocking the main futures executor.
|
||||
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + Sync + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
JoinHandle {
|
||||
fut: Some(comp_io::runtime::spawn_blocking(f)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
|
||||
where
|
||||
F: Future + 'static,
|
||||
{
|
||||
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
|
||||
let fut = comp_io::runtime::spawn(async move {
|
||||
if let Some(ptr) = ptr {
|
||||
let mut f = std::pin::pin!(f);
|
||||
let result = poll_fn(|ctx| {
|
||||
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
|
||||
let result = f.as_mut().poll(ctx);
|
||||
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
|
||||
result
|
||||
})
|
||||
.await;
|
||||
crate::CB.with(|cb| (cb.borrow().3)(ptr));
|
||||
result
|
||||
} else {
|
||||
f.await
|
||||
}
|
||||
});
|
||||
|
||||
JoinHandle { fut: Some(fut) }
|
||||
}
|
||||
|
||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future + 'static,
|
||||
{
|
||||
spawn(async move { f().await })
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct JoinError;
|
||||
|
||||
impl fmt::Display for JoinError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "JoinError")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for JoinError {}
|
||||
|
||||
pub struct JoinHandle<T> {
|
||||
fut: Option<comp_io::runtime::JoinHandle<T>>,
|
||||
}
|
||||
|
||||
impl<T> Drop for JoinHandle<T> {
|
||||
fn drop(&mut self) {
|
||||
self.fut.take().unwrap().detach();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for JoinHandle<T> {
|
||||
type Output = Result<T, JoinError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Poll::Ready(
|
||||
ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
|
||||
.map_err(|_| JoinError),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[cfg(feature = "async-std")]
|
||||
mod asyncstd {
|
||||
use std::future::{poll_fn, Future};
|
||||
use std::{fmt, pin::Pin, task::ready, task::Context, task::Poll};
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
pub fn block_on<F: Future<Output = ()>>(fut: F) {
|
||||
async_std::task::block_on(fut);
|
||||
}
|
||||
|
||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
|
||||
where
|
||||
F: Future + 'static,
|
||||
{
|
||||
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
|
||||
JoinHandle {
|
||||
fut: async_std::task::spawn_local(async move {
|
||||
if let Some(ptr) = ptr {
|
||||
let mut f = unsafe { Pin::new_unchecked(&mut f) };
|
||||
let result = poll_fn(|ctx| {
|
||||
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
|
||||
let result = f.as_mut().poll(ctx);
|
||||
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
|
||||
result
|
||||
})
|
||||
.await;
|
||||
crate::CB.with(|cb| (cb.borrow().3)(ptr));
|
||||
result
|
||||
} else {
|
||||
f.await
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future + 'static,
|
||||
{
|
||||
spawn(async move { f().await })
|
||||
}
|
||||
|
||||
/// Spawns a blocking task.
|
||||
///
|
||||
/// The task will be spawned onto a thread pool specifically dedicated
|
||||
/// to blocking tasks. This is useful to prevent long-running synchronous
|
||||
/// operations from blocking the main futures executor.
|
||||
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
JoinHandle {
|
||||
fut: async_std::task::spawn_blocking(f),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct JoinError;
|
||||
|
||||
impl fmt::Display for JoinError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "JoinError")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for JoinError {}
|
||||
|
||||
pub struct JoinHandle<T> {
|
||||
fut: async_std::task::JoinHandle<T>,
|
||||
}
|
||||
|
||||
impl<T> Future for JoinHandle<T> {
|
||||
type Output = Result<T, JoinError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[cfg(all(feature = "glommio", target_os = "linux"))]
|
||||
mod glommio {
|
||||
|
@ -161,181 +446,12 @@ mod glommio {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
mod tokio {
|
||||
use std::future::{poll_fn, Future};
|
||||
use tok_io::runtime::Handle;
|
||||
pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
pub fn block_on<F: Future<Output = ()>>(fut: F) {
|
||||
if let Ok(hnd) = Handle::try_current() {
|
||||
hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
|
||||
} else {
|
||||
let rt = tok_io::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
//.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
|
||||
.build()
|
||||
.unwrap();
|
||||
tok_io::task::LocalSet::new().block_on(&rt, fut);
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
|
||||
where
|
||||
F: Future + 'static,
|
||||
{
|
||||
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
|
||||
tok_io::task::spawn_local(async move {
|
||||
if let Some(ptr) = ptr {
|
||||
tok_io::pin!(f);
|
||||
let result = poll_fn(|ctx| {
|
||||
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
|
||||
let result = f.as_mut().poll(ctx);
|
||||
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
|
||||
result
|
||||
})
|
||||
.await;
|
||||
crate::CB.with(|cb| (cb.borrow().3)(ptr));
|
||||
result
|
||||
} else {
|
||||
f.await
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future + 'static,
|
||||
{
|
||||
spawn(async move { f().await })
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[cfg(feature = "async-std")]
|
||||
mod asyncstd {
|
||||
use futures_core::ready;
|
||||
use std::future::{poll_fn, Future};
|
||||
use std::{fmt, pin::Pin, task::Context, task::Poll};
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
pub fn block_on<F: Future<Output = ()>>(fut: F) {
|
||||
async_std::task::block_on(fut);
|
||||
}
|
||||
|
||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
|
||||
where
|
||||
F: Future + 'static,
|
||||
{
|
||||
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
|
||||
JoinHandle {
|
||||
fut: async_std::task::spawn_local(async move {
|
||||
if let Some(ptr) = ptr {
|
||||
let mut f = unsafe { Pin::new_unchecked(&mut f) };
|
||||
let result = poll_fn(|ctx| {
|
||||
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
|
||||
let result = f.as_mut().poll(ctx);
|
||||
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
|
||||
result
|
||||
})
|
||||
.await;
|
||||
crate::CB.with(|cb| (cb.borrow().3)(ptr));
|
||||
result
|
||||
} else {
|
||||
f.await
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future + 'static,
|
||||
{
|
||||
spawn(async move { f().await })
|
||||
}
|
||||
|
||||
/// Spawns a blocking task.
|
||||
///
|
||||
/// The task will be spawned onto a thread pool specifically dedicated
|
||||
/// to blocking tasks. This is useful to prevent long-running synchronous
|
||||
/// operations from blocking the main futures executor.
|
||||
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
JoinHandle {
|
||||
fut: async_std::task::spawn_blocking(f),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct JoinError;
|
||||
|
||||
impl fmt::Display for JoinError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "JoinError")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for JoinError {}
|
||||
|
||||
pub struct JoinHandle<T> {
|
||||
fut: async_std::task::JoinHandle<T>,
|
||||
}
|
||||
|
||||
impl<T> Future for JoinHandle<T> {
|
||||
type Output = Result<T, JoinError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use self::tokio::*;
|
||||
|
||||
#[cfg(all(
|
||||
not(feature = "tokio"),
|
||||
not(feature = "compio"),
|
||||
not(feature = "glommio"),
|
||||
feature = "async-std",
|
||||
))]
|
||||
|
@ -343,16 +459,26 @@ pub use self::asyncstd::*;
|
|||
|
||||
#[cfg(all(
|
||||
not(feature = "tokio"),
|
||||
not(feature = "compio"),
|
||||
not(feature = "async-std"),
|
||||
feature = "glommio"
|
||||
))]
|
||||
pub use self::glommio::*;
|
||||
|
||||
#[cfg(all(
|
||||
not(feature = "tokio"),
|
||||
not(feature = "glommio"),
|
||||
not(feature = "async-std"),
|
||||
feature = "compio"
|
||||
))]
|
||||
pub use self::compio::*;
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
#[cfg(all(
|
||||
not(feature = "tokio"),
|
||||
not(feature = "async-std"),
|
||||
not(feature = "compio"),
|
||||
not(feature = "glommio")
|
||||
))]
|
||||
pub fn block_on<F: std::future::Future<Output = ()>>(_: F) {
|
||||
|
@ -362,6 +488,7 @@ pub fn block_on<F: std::future::Future<Output = ()>>(_: F) {
|
|||
#[cfg(all(
|
||||
not(feature = "tokio"),
|
||||
not(feature = "async-std"),
|
||||
not(feature = "compio"),
|
||||
not(feature = "glommio")
|
||||
))]
|
||||
pub fn spawn<F>(_: F) -> std::pin::Pin<Box<dyn std::future::Future<Output = F::Output>>>
|
||||
|
@ -374,6 +501,7 @@ where
|
|||
#[cfg(all(
|
||||
not(feature = "tokio"),
|
||||
not(feature = "async-std"),
|
||||
not(feature = "compio"),
|
||||
not(feature = "glommio")
|
||||
))]
|
||||
mod spawn_blocking_stub {
|
||||
|
@ -403,6 +531,7 @@ mod spawn_blocking_stub {
|
|||
#[cfg(all(
|
||||
not(feature = "tokio"),
|
||||
not(feature = "async-std"),
|
||||
not(feature = "compio"),
|
||||
not(feature = "glommio")
|
||||
))]
|
||||
pub use self::spawn_blocking_stub::*;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue