Add callback for task spawning

This commit is contained in:
Nikolay Kim 2023-01-03 18:27:40 +01:00
parent f5a7f97c9f
commit 925b757565
4 changed files with 70 additions and 7 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.7] - 2023-01-03
* Add callback for task spawning
## [0.4.6] - 2022-09-20
* Add System::block_on() helper method

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.4.6"
version = "0.4.7"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -29,7 +29,7 @@ async-std = ["async_std/unstable"]
[dependencies]
async-oneshot = "0.5.0"
async-channel = "1.6.1"
async-channel = "1.8.0"
futures-core = "0.3"
log = "0.4"

View file

@ -1,4 +1,6 @@
//! A runtime implementation that runs everything on the current thread.
use std::{cell::RefCell, ptr};
mod arbiter;
mod builder;
mod system;
@ -7,6 +9,34 @@ pub use self::arbiter::Arbiter;
pub use self::builder::{Builder, SystemRunner};
pub use self::system::System;
thread_local! {
static CB: RefCell<(TBefore, TSpawn, TAfter)> = RefCell::new((
Box::new(|| {None}), Box::new(|_| {ptr::null()}), Box::new(|_| {}))
);
}
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
type TSpawn = Box<dyn Fn(*const ()) -> *const ()>;
type TAfter = Box<dyn Fn(*const ())>;
pub unsafe fn spawn_cbs<FBefore, FSpawn, FAfter>(
before: FBefore,
before_spawn: FSpawn,
after_spawn: FAfter,
) where
FBefore: Fn() -> Option<*const ()> + 'static,
FSpawn: Fn(*const ()) -> *const () + 'static,
FAfter: Fn(*const ()) + 'static,
{
CB.with(|cb| {
*cb.borrow_mut() = (
Box::new(before),
Box::new(before_spawn),
Box::new(after_spawn),
);
});
}
#[allow(dead_code)]
#[cfg(all(feature = "glommio", target_os = "linux"))]
mod glommio {
@ -39,11 +69,20 @@ mod glommio {
F: Future + 'static,
F::Output: 'static,
{
let ptr = crate::CB.with(|cb| (&cb.borrow().0)());
JoinHandle {
fut: Either::Left(
glomm_io::spawn_local(async move {
glomm_io::executor().yield_now().await;
f.await
if let Some(ptr) = ptr {
let new_ptr = crate::CB.with(|cb| (&cb.borrow().1)(ptr));
glomm_io::executor().yield_now().await;
let res = f.await;
crate::CB.with(|cb| (&cb.borrow().2)(new_ptr));
res
} else {
glomm_io::executor().yield_now().await;
f.await
}
})
.detach(),
),
@ -131,7 +170,17 @@ mod tokio {
where
F: Future + 'static,
{
tok_io::task::spawn_local(f)
let ptr = crate::CB.with(|cb| (&cb.borrow().0)());
tok_io::task::spawn_local(async move {
if let Some(ptr) = ptr {
let new_ptr = crate::CB.with(|cb| (&cb.borrow().1)(ptr));
let res = f.await;
crate::CB.with(|cb| (&cb.borrow().2)(new_ptr));
res
} else {
f.await
}
})
}
/// Executes a future on the current thread. This does not create a new Arbiter
@ -175,8 +224,18 @@ mod asyncstd {
where
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (&cb.borrow().0)());
JoinHandle {
fut: async_std::task::spawn_local(f),
fut: async_std::task::spawn_local(async move {
if let Some(ptr) = ptr {
let new_ptr = crate::CB.with(|cb| (&cb.borrow().1)(ptr));
let res = f.await;
crate::CB.with(|cb| (&cb.borrow().2)(new_ptr));
res
} else {
f.await
}
}),
}
}

View file

@ -57,7 +57,7 @@ ntex-macros = "0.1.3"
ntex-util = "0.2.0-beta.0"
ntex-bytes = "0.1.18"
ntex-h2 = "0.2.0-beta.0"
ntex-rt = "0.4.6"
ntex-rt = "0.4.7"
ntex-io = "0.2.0-beta.0"
ntex-tls = "0.2.0-beta.0"
ntex-tokio = { version = "0.2.0-beta.0", optional = true }