add enter/exit fn for spawn_cbs (#195)

This commit is contained in:
Will Brown 2023-04-06 13:39:56 -04:00 committed by GitHub
parent 369c43968e
commit 9de3f3060f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -10,29 +10,39 @@ pub use self::builder::{Builder, SystemRunner};
pub use self::system::System; pub use self::system::System;
thread_local! { thread_local! {
static CB: RefCell<(TBefore, TSpawn, TAfter)> = RefCell::new(( static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((
Box::new(|| {None}), Box::new(|_| {ptr::null()}), Box::new(|_| {})) Box::new(|| {None}), Box::new(|_| {ptr::null()}), Box::new(|_| {}), Box::new(|_| {}))
); );
} }
type TBefore = Box<dyn Fn() -> Option<*const ()>>; type TBefore = Box<dyn Fn() -> Option<*const ()>>;
type TSpawn = Box<dyn Fn(*const ()) -> *const ()>; type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
type TExit = Box<dyn Fn(*const ())>;
type TAfter = Box<dyn Fn(*const ())>; type TAfter = Box<dyn Fn(*const ())>;
pub unsafe fn spawn_cbs<FBefore, FSpawn, FAfter>( /// # Safety
///
/// The user must ensure that the pointer returned by `before` is `'static`. It will become
/// owned by the spawned task for the life of the task. Ownership of the pointer will be
/// returned to the user at the end of the task via `after`. The pointer is opaque to the
/// runtime.
pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
before: FBefore, before: FBefore,
before_spawn: FSpawn, enter: FEnter,
after_spawn: FAfter, exit: FExit,
after: FAfter,
) where ) where
FBefore: Fn() -> Option<*const ()> + 'static, FBefore: Fn() -> Option<*const ()> + 'static,
FSpawn: Fn(*const ()) -> *const () + 'static, FEnter: Fn(*const ()) -> *const () + 'static,
FExit: Fn(*const ()) + 'static,
FAfter: Fn(*const ()) + 'static, FAfter: Fn(*const ()) + 'static,
{ {
CB.with(|cb| { CB.with(|cb| {
*cb.borrow_mut() = ( *cb.borrow_mut() = (
Box::new(before), Box::new(before),
Box::new(before_spawn), Box::new(enter),
Box::new(after_spawn), Box::new(exit),
Box::new(after),
); );
}); });
} }
@ -40,7 +50,8 @@ pub unsafe fn spawn_cbs<FBefore, FSpawn, FAfter>(
#[allow(dead_code)] #[allow(dead_code)]
#[cfg(all(feature = "glommio", target_os = "linux"))] #[cfg(all(feature = "glommio", target_os = "linux"))]
mod glommio { mod glommio {
use std::{future::Future, pin::Pin, task::Context, task::Poll}; use std::future::{poll_fn, Future};
use std::{pin::Pin, task::Context, task::Poll};
use futures_channel::oneshot::Canceled; use futures_channel::oneshot::Canceled;
use glomm_io::task; use glomm_io::task;
@ -64,7 +75,7 @@ mod glommio {
/// ///
/// This function panics if ntex system is not running. /// This function panics if ntex system is not running.
#[inline] #[inline]
pub fn spawn<F>(f: F) -> JoinHandle<F::Output> pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
where where
F: Future + 'static, F: Future + 'static,
F::Output: 'static, F::Output: 'static,
@ -74,11 +85,17 @@ mod glommio {
fut: Either::Left( fut: Either::Left(
glomm_io::spawn_local(async move { glomm_io::spawn_local(async move {
if let Some(ptr) = ptr { if let Some(ptr) = ptr {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
glomm_io::executor().yield_now().await; glomm_io::executor().yield_now().await;
let res = f.await; let mut f = unsafe { Pin::new_unchecked(&mut f) };
crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); let result = poll_fn(|ctx| {
res let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else { } else {
glomm_io::executor().yield_now().await; glomm_io::executor().yield_now().await;
f.await f.await
@ -145,7 +162,7 @@ mod glommio {
#[cfg(feature = "tokio")] #[cfg(feature = "tokio")]
mod tokio { mod tokio {
use std::future::Future; use std::future::{poll_fn, Future};
pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle}; pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
/// Runs the provided future, blocking the current thread until the future /// Runs the provided future, blocking the current thread until the future
@ -174,10 +191,16 @@ mod tokio {
let ptr = crate::CB.with(|cb| (cb.borrow().0)()); let ptr = crate::CB.with(|cb| (cb.borrow().0)());
tok_io::task::spawn_local(async move { tok_io::task::spawn_local(async move {
if let Some(ptr) = ptr { if let Some(ptr) = ptr {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); tok_io::pin!(f);
let res = f.await; let result = poll_fn(|ctx| {
crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
res let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else { } else {
f.await f.await
} }
@ -205,7 +228,8 @@ mod tokio {
#[cfg(feature = "async-std")] #[cfg(feature = "async-std")]
mod asyncstd { mod asyncstd {
use futures_core::ready; use futures_core::ready;
use std::{fmt, future::Future, pin::Pin, task::Context, task::Poll}; use std::future::{poll_fn, Future};
use std::{fmt, pin::Pin, task::Context, task::Poll};
/// Runs the provided future, blocking the current thread until the future /// Runs the provided future, blocking the current thread until the future
/// completes. /// completes.
@ -221,7 +245,7 @@ mod asyncstd {
/// ///
/// This function panics if ntex system is not running. /// This function panics if ntex system is not running.
#[inline] #[inline]
pub fn spawn<F>(f: F) -> JoinHandle<F::Output> pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
where where
F: Future + 'static, F: Future + 'static,
{ {
@ -229,10 +253,16 @@ mod asyncstd {
JoinHandle { JoinHandle {
fut: async_std::task::spawn_local(async move { fut: async_std::task::spawn_local(async move {
if let Some(ptr) = ptr { if let Some(ptr) = ptr {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); let mut f = unsafe { Pin::new_unchecked(&mut f) };
let res = f.await; let result = poll_fn(|ctx| {
crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
res let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else { } else {
f.await f.await
} }