From 432791356c83ab79a3824e12fe50925ed7f8a4b5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 2 Sep 2024 15:46:31 +0500 Subject: [PATCH] Allow to override block_on function (#411) --- ntex-rt/CHANGES.md | 6 ++ ntex-rt/Cargo.toml | 2 +- ntex-rt/src/arbiter.rs | 14 +++- ntex-rt/src/builder.rs | 149 +++++++++++++++++++++++++---------------- ntex-rt/src/lib.rs | 1 + ntex-rt/src/system.rs | 70 ++++++++++++++++--- 6 files changed, 173 insertions(+), 69 deletions(-) diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index dedccfb1..c06e2d59 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.4.16] - 2024-09-02 + +* Allow to override block_on function + +* Add stack size configuration + ## [0.4.15] - 2024-08-30 * No runtime compatibility diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index b9a3bb53..d5449dfc 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.4.15" +version = "0.4.16" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index ee25b280..f2c5e939 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -91,11 +91,19 @@ impl Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("ntex-rt:worker:{}", id); let sys = System::current(); + let config = sys.config(); let (arb_tx, arb_rx) = unbounded(); let arb_tx2 = arb_tx.clone(); - let handle = thread::Builder::new() - .name(name.clone()) + let builder = if sys.config().stack_size > 0 { + thread::Builder::new() + .name(name.clone()) + .stack_size(sys.config().stack_size) + } else { + thread::Builder::new().name(name.clone()) + }; + + let handle = builder .spawn(move || { let arb = Arbiter::with_sender(arb_tx); @@ -104,7 +112,7 @@ impl Arbiter { System::set_current(sys); - crate::block_on(async move { + config.block_on(async move { // start arbiter controller let _ = crate::spawn(ArbiterController { stop: Some(stop), diff --git a/ntex-rt/src/builder.rs b/ntex-rt/src/builder.rs index fef01e11..597e107a 100644 --- a/ntex-rt/src/builder.rs +++ b/ntex-rt/src/builder.rs @@ -1,10 +1,9 @@ -#![allow(clippy::let_underscore_future)] -use std::{cell::RefCell, future::Future, io, rc::Rc}; +use std::{future::Future, io, pin::Pin, sync::Arc}; use async_channel::unbounded; use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter}; -use crate::System; +use crate::{system::SystemConfig, System}; /// Builder struct for a ntex runtime. /// @@ -16,6 +15,10 @@ pub struct Builder { name: String, /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. stop_on_panic: bool, + /// New thread stack size + stack_size: usize, + /// Block on fn + block_on: Option>>) + Sync + Send>>, } impl Builder { @@ -23,6 +26,8 @@ impl Builder { Builder { name: "ntex".into(), stop_on_panic: false, + stack_size: 0, + block_on: None, } } @@ -41,16 +46,36 @@ impl Builder { self } + /// Sets the size of the stack (in bytes) for the new thread. + pub fn stack_size(mut self, size: usize) -> Self { + self.stack_size = size; + self + } + + /// Use custom block_on function + pub fn block_on(mut self, block_on: F) -> Self + where + F: Fn(Pin>>) + Sync + Send + 'static, + { + self.block_on = Some(Arc::new(block_on)); + self + } + /// Create new System. /// /// This method panics if it can not create tokio runtime pub fn finish(self) -> SystemRunner { let (stop_tx, stop) = oneshot::channel(); let (sys_sender, sys_receiver) = unbounded(); - let stop_on_panic = self.stop_on_panic; + + let config = SystemConfig { + block_on: self.block_on, + stack_size: self.stack_size, + stop_on_panic: self.stop_on_panic, + }; let (arb, arb_controller) = Arbiter::new_system(); - let system = System::construct(sys_sender, arb, stop_on_panic); + let system = System::construct(sys_sender, arb, config); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); @@ -97,23 +122,30 @@ impl SystemRunner { stop, arb, arb_controller, + system, .. } = self; // run loop - match block_on(stop, arb, arb_controller, f).take()? { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) + system.config().block_on(async move { + f()?; + + let _ = crate::spawn(arb); + let _ = crate::spawn(arb_controller); + match stop.await { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } } + Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")), } - Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")), - } + }) } /// Execute a future and wait for result. @@ -126,14 +158,15 @@ impl SystemRunner { let SystemRunner { arb, arb_controller, + system, .. } = self; - // run loop - match block_on(fut, arb, arb_controller, || Ok(())).take() { - Ok(result) => result, - Err(_) => unreachable!(), - } + system.config().block_on(async move { + let _ = crate::spawn(arb); + let _ = crate::spawn(arb_controller); + fut.await + }) } #[cfg(feature = "tokio")] @@ -160,41 +193,6 @@ impl SystemRunner { } } -pub struct BlockResult(Rc>>); - -impl BlockResult { - pub fn take(self) -> T { - self.0.borrow_mut().take().unwrap() - } -} - -#[inline] -fn block_on( - fut: F, - arb: SystemArbiter, - arb_controller: ArbiterController, - f: F1, -) -> BlockResult> -where - F: Future + 'static, - R: 'static, - F1: FnOnce() -> io::Result<()> + 'static, -{ - let result = Rc::new(RefCell::new(None)); - let result_inner = result.clone(); - crate::block_on(async move { - let _ = crate::spawn(arb); - let _ = crate::spawn(arb_controller); - if let Err(e) = f() { - *result_inner.borrow_mut() = Some(Err(e)); - } else { - let r = fut.await; - *result_inner.borrow_mut() = Some(Ok(r)); - } - }); - BlockResult(result) -} - #[cfg(test)] mod tests { use std::sync::mpsc; @@ -235,4 +233,43 @@ mod tests { let id2 = rx.recv().unwrap(); assert_eq!(id, id2); } + + #[cfg(feature = "tokio")] + #[test] + fn test_block_on() { + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || { + let runner = crate::System::build() + .stop_on_panic(true) + .block_on(|fut| { + let rt = tok_io::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + tok_io::task::LocalSet::new().block_on(&rt, fut); + }) + .finish(); + + tx.send(runner.system()).unwrap(); + let _ = runner.run_until_stop(); + }); + let s = System::new("test"); + + let sys = rx.recv().unwrap(); + let id = sys.id(); + let (tx, rx) = mpsc::channel(); + sys.arbiter().exec_fn(move || { + let _ = tx.send(System::current().id()); + }); + let id2 = rx.recv().unwrap(); + assert_eq!(id, id2); + + let id2 = s + .block_on(sys.arbiter().exec(|| System::current().id())) + .unwrap(); + assert_eq!(id, id2); + + sys.stop(); + } } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 4c371b31..b58a4258 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(clippy::type_complexity, clippy::let_underscore_future)] //! A runtime implementation that runs everything on the current thread. use std::{cell::RefCell, ptr}; diff --git a/ntex-rt/src/system.rs b/ntex-rt/src/system.rs index dc2fddec..86b783ad 100644 --- a/ntex-rt/src/system.rs +++ b/ntex-rt/src/system.rs @@ -1,5 +1,7 @@ +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; +use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc}; + use async_channel::Sender; -use std::{cell::RefCell, sync::atomic::AtomicUsize, sync::atomic::Ordering}; use super::arbiter::{Arbiter, SystemCommand}; use super::builder::{Builder, SystemRunner}; @@ -12,7 +14,15 @@ pub struct System { id: usize, sys: Sender, arbiter: Arbiter, - stop_on_panic: bool, + config: SystemConfig, +} + +#[derive(Clone)] +pub(super) struct SystemConfig { + pub(super) stack_size: usize, + pub(super) stop_on_panic: bool, + pub(super) block_on: + Option>>) + Sync + Send>>, } thread_local!( @@ -24,12 +34,12 @@ impl System { pub(super) fn construct( sys: Sender, arbiter: Arbiter, - stop_on_panic: bool, + config: SystemConfig, ) -> Self { let sys = System { sys, + config, arbiter, - stop_on_panic, id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), }; System::set_current(sys.clone()); @@ -83,18 +93,60 @@ impl System { let _ = self.sys.try_send(SystemCommand::Exit(code)); } - pub(super) fn sys(&self) -> &Sender { - &self.sys - } - /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an /// uncaught panic is thrown from a worker thread. pub fn stop_on_panic(&self) -> bool { - self.stop_on_panic + self.config.stop_on_panic } /// System arbiter pub fn arbiter(&self) -> &Arbiter { &self.arbiter } + + pub(super) fn sys(&self) -> &Sender { + &self.sys + } + + /// System config + pub(super) fn config(&self) -> SystemConfig { + self.config.clone() + } +} + +impl SystemConfig { + /// Execute a future with custom `block_on` method and wait for result. + #[inline] + pub(super) fn block_on(&self, fut: F) -> R + where + F: Future + 'static, + R: 'static, + { + // run loop + let result = Rc::new(RefCell::new(None)); + let result_inner = result.clone(); + + if let Some(block_on) = &self.block_on { + (*block_on)(Box::pin(async move { + let r = fut.await; + *result_inner.borrow_mut() = Some(r); + })); + } else { + crate::block_on(Box::pin(async move { + let r = fut.await; + *result_inner.borrow_mut() = Some(r); + })); + } + let res = result.borrow_mut().take().unwrap(); + res + } +} + +impl fmt::Debug for SystemConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SystemConfig") + .field("stack_size", &self.stack_size) + .field("stop_on_panic", &self.stop_on_panic) + .finish() + } }