Use tokio Handle if available (#339)

This commit is contained in:
Nikolay Kim 2024-04-04 21:14:41 +05:00 committed by GitHub
parent 395cf694e5
commit 9bd05487de
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 44 additions and 12 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.13] - 2024-04-04
* Use tokio Handle if available
## [0.4.12] - 2024-03-25
* Relax Arbiter::exec() generic param

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.4.12"
version = "0.4.13"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -28,7 +28,7 @@ tokio = ["tok-io"]
async-std = ["async_std/unstable"]
[dependencies]
async-channel = "2.1"
async-channel = "2"
futures-core = "0.3"
log = "0.4"
oneshot = "0.1"

View file

@ -135,6 +135,29 @@ impl SystemRunner {
Err(_) => unreachable!(),
}
}
#[cfg(feature = "tokio")]
/// Execute a future and wait for result.
pub async fn run_local<F, R>(self, fut: F) -> R
where
F: Future<Output = R> + 'static,
R: 'static,
{
let SystemRunner {
arb,
arb_controller,
..
} = self;
// run loop
tok_io::task::LocalSet::new()
.run_until(async move {
let _ = crate::spawn(arb);
let _ = crate::spawn(arb_controller);
fut.await
})
.await
}
}
pub struct BlockResult<T>(Rc<RefCell<Option<T>>>);
@ -159,7 +182,7 @@ where
{
let result = Rc::new(RefCell::new(None));
let result_inner = result.clone();
crate::block_on(Box::pin(async move {
crate::block_on(async move {
let _ = crate::spawn(arb);
let _ = crate::spawn(arb_controller);
if let Err(e) = f() {
@ -168,7 +191,7 @@ where
let r = fut.await;
*result_inner.borrow_mut() = Some(Ok(r));
}
}));
});
BlockResult(result)
}

View file

@ -163,17 +163,22 @@ mod glommio {
#[cfg(feature = "tokio")]
mod tokio {
use std::future::{poll_fn, Future};
use tok_io::runtime::Handle;
pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
let rt = tok_io::runtime::Builder::new_current_thread()
.enable_all()
// .unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
.build()
.unwrap();
tok_io::task::LocalSet::new().block_on(&rt, fut);
if let Ok(hnd) = Handle::try_current() {
hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
} else {
let rt = tok_io::runtime::Builder::new_current_thread()
.enable_all()
//.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
.build()
.unwrap();
tok_io::task::LocalSet::new().block_on(&rt, fut);
}
}
/// Spawn a future on the current thread. This does not create a new Arbiter