From 9bd05487de0293ed7035a2bdf3b0b5b5409f1bb2 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 4 Apr 2024 21:14:41 +0500 Subject: [PATCH] Use tokio Handle if available (#339) --- ntex-rt/CHANGES.md | 4 ++++ ntex-rt/Cargo.toml | 4 ++-- ntex-rt/src/builder.rs | 27 +++++++++++++++++++++++++-- ntex-rt/src/lib.rs | 17 +++++++++++------ ntex-server/Cargo.toml | 4 ++-- 5 files changed, 44 insertions(+), 12 deletions(-) diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index cacd5dfc..820597b8 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.13] - 2024-04-04 + +* Use tokio Handle if available + ## [0.4.12] - 2024-03-25 * Relax Arbiter::exec() generic param diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 4ebee9c4..9110d8bb 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.4.12" +version = "0.4.13" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -28,7 +28,7 @@ tokio = ["tok-io"] async-std = ["async_std/unstable"] [dependencies] -async-channel = "2.1" +async-channel = "2" futures-core = "0.3" log = "0.4" oneshot = "0.1" diff --git a/ntex-rt/src/builder.rs b/ntex-rt/src/builder.rs index 55db7bd0..fef01e11 100644 --- a/ntex-rt/src/builder.rs +++ b/ntex-rt/src/builder.rs @@ -135,6 +135,29 @@ impl SystemRunner { Err(_) => unreachable!(), } } + + #[cfg(feature = "tokio")] + /// Execute a future and wait for result. + pub async fn run_local(self, fut: F) -> R + where + F: Future + 'static, + R: 'static, + { + let SystemRunner { + arb, + arb_controller, + .. + } = self; + + // run loop + tok_io::task::LocalSet::new() + .run_until(async move { + let _ = crate::spawn(arb); + let _ = crate::spawn(arb_controller); + fut.await + }) + .await + } } pub struct BlockResult(Rc>>); @@ -159,7 +182,7 @@ where { let result = Rc::new(RefCell::new(None)); let result_inner = result.clone(); - crate::block_on(Box::pin(async move { + crate::block_on(async move { let _ = crate::spawn(arb); let _ = crate::spawn(arb_controller); if let Err(e) = f() { @@ -168,7 +191,7 @@ where let r = fut.await; *result_inner.borrow_mut() = Some(Ok(r)); } - })); + }); BlockResult(result) } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index d17d8ecf..fd686073 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -163,17 +163,22 @@ mod glommio { #[cfg(feature = "tokio")] mod tokio { use std::future::{poll_fn, Future}; + use tok_io::runtime::Handle; pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle}; /// Runs the provided future, blocking the current thread until the future /// completes. pub fn block_on>(fut: F) { - let rt = tok_io::runtime::Builder::new_current_thread() - .enable_all() - // .unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime) - .build() - .unwrap(); - tok_io::task::LocalSet::new().block_on(&rt, fut); + if let Ok(hnd) = Handle::try_current() { + hnd.block_on(tok_io::task::LocalSet::new().run_until(fut)); + } else { + let rt = tok_io::runtime::Builder::new_current_thread() + .enable_all() + //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + tok_io::task::LocalSet::new().block_on(&rt, fut); + } } /// Spawn a future on the current thread. This does not create a new Arbiter diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index 322fe4ef..76893170 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -19,10 +19,10 @@ path = "src/lib.rs" ntex-bytes = "0.1.24" ntex-net = "1.0" ntex-service = "2.0" -ntex-rt = "0.4.12" +ntex-rt = "0.4.13" ntex-util = "1.0" -async-channel = "2.2" +async-channel = "2" async-broadcast = "0.7" polling = "3.3" log = "0.4"