Add compio runtime support (#407)

This commit is contained in:
Nikolay Kim 2024-08-29 17:02:09 +05:00 committed by GitHub
parent 20011a9120
commit 7944f30f56
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 316 additions and 173 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.14] - 2024-08-29
* Add `compio` runtime support
## [0.4.13] - 2024-04-04
* Use tokio Handle if available

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.4.13"
version = "0.4.14"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -24,6 +24,9 @@ glommio = ["glomm-io", "futures-channel"]
# tokio support
tokio = ["tok-io"]
# compio support
compio = ["comp-io"]
# async-std support
async-std = ["async_std/unstable"]
@ -33,11 +36,14 @@ futures-core = "0.3"
log = "0.4"
oneshot = "0.1"
async_std = { version = "1", package = "async-std", optional = true }
comp-io = { version = "0.11", package = "compio", default-features = false, features = [
"runtime"
], optional = true }
tok-io = { version = "1", package = "tokio", default-features = false, features = [
"rt",
"net",
], optional = true }
async_std = { version = "1", package = "async-std", optional = true }
[target.'cfg(target_os = "linux")'.dependencies]
glomm-io = { version = "0.8", package = "glommio", optional = true }

View file

@ -26,7 +26,9 @@ pub(super) enum ArbiterCommand {
}
/// Arbiters provide an asynchronous execution environment for actors, functions
/// and futures. When an Arbiter is created, it spawns a new OS thread, and
/// and futures.
///
/// When an Arbiter is created, it spawns a new OS thread, and
/// hosts an event loop. Some Arbiter functions execute on the current thread.
pub struct Arbiter {
sender: Sender<ArbiterCommand>,

View file

@ -47,6 +47,291 @@ pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
});
}
#[cfg(feature = "tokio")]
mod tokio {
use std::future::{poll_fn, Future};
use tok_io::runtime::Handle;
pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
if let Ok(hnd) = Handle::try_current() {
log::debug!("Use existing tokio runtime and block on future");
hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
} else {
log::debug!("Create tokio runtime and block on future");
let rt = tok_io::runtime::Builder::new_current_thread()
.enable_all()
//.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
.build()
.unwrap();
tok_io::task::LocalSet::new().block_on(&rt, fut);
}
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
where
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
tok_io::task::spawn_local(async move {
if let Some(ptr) = ptr {
tok_io::pin!(f);
let result = poll_fn(|ctx| {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else {
f.await
}
})
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move { f().await })
}
}
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
use std::task::{ready, Context, Poll};
use std::{fmt, future::poll_fn, future::Future, pin::Pin};
use comp_io::runtime::Runtime;
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
log::debug!("Create compio runtime and block on future");
let rt = Runtime::new().unwrap();
rt.block_on(fut);
}
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated
/// to blocking tasks. This is useful to prevent long-running synchronous
/// operations from blocking the main futures executor.
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + Sync + 'static,
T: Send + 'static,
{
JoinHandle {
fut: Some(comp_io::runtime::spawn_blocking(f)),
}
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
let fut = comp_io::runtime::spawn(async move {
if let Some(ptr) = ptr {
let mut f = std::pin::pin!(f);
let result = poll_fn(|ctx| {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else {
f.await
}
});
JoinHandle { fut: Some(fut) }
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move { f().await })
}
#[derive(Debug, Copy, Clone)]
pub struct JoinError;
impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "JoinError")
}
}
impl std::error::Error for JoinError {}
pub struct JoinHandle<T> {
fut: Option<comp_io::runtime::JoinHandle<T>>,
}
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
self.fut.take().unwrap().detach();
}
}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(
ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
.map_err(|_| JoinError),
)
}
}
}
#[allow(dead_code)]
#[cfg(feature = "async-std")]
mod asyncstd {
use std::future::{poll_fn, Future};
use std::{fmt, pin::Pin, task::ready, task::Context, task::Poll};
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
async_std::task::block_on(fut);
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
JoinHandle {
fut: async_std::task::spawn_local(async move {
if let Some(ptr) = ptr {
let mut f = unsafe { Pin::new_unchecked(&mut f) };
let result = poll_fn(|ctx| {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else {
f.await
}
}),
}
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move { f().await })
}
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated
/// to blocking tasks. This is useful to prevent long-running synchronous
/// operations from blocking the main futures executor.
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
JoinHandle {
fut: async_std::task::spawn_blocking(f),
}
}
#[derive(Debug, Copy, Clone)]
pub struct JoinError;
impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "JoinError")
}
}
impl std::error::Error for JoinError {}
pub struct JoinHandle<T> {
fut: async_std::task::JoinHandle<T>,
}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx))))
}
}
}
#[allow(dead_code)]
#[cfg(all(feature = "glommio", target_os = "linux"))]
mod glommio {
@ -161,181 +446,12 @@ mod glommio {
}
}
#[cfg(feature = "tokio")]
mod tokio {
use std::future::{poll_fn, Future};
use tok_io::runtime::Handle;
pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
if let Ok(hnd) = Handle::try_current() {
hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
} else {
let rt = tok_io::runtime::Builder::new_current_thread()
.enable_all()
//.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
.build()
.unwrap();
tok_io::task::LocalSet::new().block_on(&rt, fut);
}
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
where
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
tok_io::task::spawn_local(async move {
if let Some(ptr) = ptr {
tok_io::pin!(f);
let result = poll_fn(|ctx| {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else {
f.await
}
})
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move { f().await })
}
}
#[allow(dead_code)]
#[cfg(feature = "async-std")]
mod asyncstd {
use futures_core::ready;
use std::future::{poll_fn, Future};
use std::{fmt, pin::Pin, task::Context, task::Poll};
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
async_std::task::block_on(fut);
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
JoinHandle {
fut: async_std::task::spawn_local(async move {
if let Some(ptr) = ptr {
let mut f = unsafe { Pin::new_unchecked(&mut f) };
let result = poll_fn(|ctx| {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else {
f.await
}
}),
}
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move { f().await })
}
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated
/// to blocking tasks. This is useful to prevent long-running synchronous
/// operations from blocking the main futures executor.
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
JoinHandle {
fut: async_std::task::spawn_blocking(f),
}
}
#[derive(Debug, Copy, Clone)]
pub struct JoinError;
impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "JoinError")
}
}
impl std::error::Error for JoinError {}
pub struct JoinHandle<T> {
fut: async_std::task::JoinHandle<T>,
}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx))))
}
}
}
#[cfg(feature = "tokio")]
pub use self::tokio::*;
#[cfg(all(
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "glommio"),
feature = "async-std",
))]
@ -343,16 +459,26 @@ pub use self::asyncstd::*;
#[cfg(all(
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "async-std"),
feature = "glommio"
))]
pub use self::glommio::*;
#[cfg(all(
not(feature = "tokio"),
not(feature = "glommio"),
not(feature = "async-std"),
feature = "compio"
))]
pub use self::compio::*;
/// Runs the provided future, blocking the current thread until the future
/// completes.
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "glommio")
))]
pub fn block_on<F: std::future::Future<Output = ()>>(_: F) {
@ -362,6 +488,7 @@ pub fn block_on<F: std::future::Future<Output = ()>>(_: F) {
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "glommio")
))]
pub fn spawn<F>(_: F) -> std::pin::Pin<Box<dyn std::future::Future<Output = F::Output>>>
@ -374,6 +501,7 @@ where
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "glommio")
))]
mod spawn_blocking_stub {
@ -403,6 +531,7 @@ mod spawn_blocking_stub {
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "glommio")
))]
pub use self::spawn_blocking_stub::*;