diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 68bb59d5..7c36064b 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, collections::VecDeque, fmt, io, ptr, rc::Rc, task, task::Poll}; +use std::{cell::Cell, collections::VecDeque, io, rc::Rc, task, task::Poll}; use ntex_neon::driver::op::{CloseSocket, Handler, Interest}; use ntex_neon::driver::{AsRawFd, DriverApi, RawFd}; @@ -372,21 +372,3 @@ impl Drop for StreamCtl { } } } - -impl PartialEq for StreamCtl { - #[inline] - fn eq(&self, other: &StreamCtl) -> bool { - self.id == other.id && ptr::eq(&self.inner, &other.inner) - } -} - -impl fmt::Debug for StreamCtl { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.with(|streams| { - f.debug_struct("StreamCtl") - .field("id", &self.id) - .field("io", &streams[self.id].io) - .finish() - }) - } -} diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index 5676ac12..f2ab4736 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -2,6 +2,10 @@ ## [0.4.27] - 2025-03-14 +* Add srbiters pings ttl + +* Retrieves a list of all arbiters in the system + * Add "neon" runtime support * Drop glommio support diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 4fde0c17..e8ff8910 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -31,7 +31,7 @@ neon = ["ntex-neon"] [dependencies] async-channel = "2" -futures-core = "0.3" +futures-timer = "3.0" log = "0.4" oneshot = "0.1" @@ -43,3 +43,6 @@ tok-io = { version = "1", package = "tokio", default-features = false, features ], optional = true } ntex-neon = { version = "0.1", optional = true } + +[dev-dependencies] +env_logger = "0.11" diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index 6379acc5..48a673ca 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -1,22 +1,17 @@ #![allow(clippy::let_underscore_future)] use std::any::{Any, TypeId}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{ready, Context, Poll}; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread}; use async_channel::{unbounded, Receiver, Sender}; -use futures_core::stream::Stream; -use crate::system::System; +use crate::system::{FnExec, Id, System, SystemCommand}; thread_local!( static ADDR: RefCell> = const { RefCell::new(None) }; static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); -type ServerCommandRx = Pin>>; -type ArbiterCommandRx = Pin>>; - pub(super) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(super) enum ArbiterCommand { @@ -31,13 +26,16 @@ pub(super) enum ArbiterCommand { /// When an Arbiter is created, it spawns a new OS thread, and /// hosts an event loop. Some Arbiter functions execute on the current thread. pub struct Arbiter { + id: usize, + pub(crate) sys_id: usize, + name: Arc, sender: Sender, thread_handle: Option>, } impl fmt::Debug for Arbiter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Arbiter") + write!(f, "Arbiter({:?})", self.name.as_ref()) } } @@ -49,26 +47,20 @@ impl Default for Arbiter { impl Clone for Arbiter { fn clone(&self) -> Self { - Self::with_sender(self.sender.clone()) + Self::with_sender(self.sys_id, self.id, self.name.clone(), self.sender.clone()) } } impl Arbiter { #[allow(clippy::borrowed_box)] - pub(super) fn new_system() -> (Self, ArbiterController) { + pub(super) fn new_system(name: String) -> (Self, ArbiterController) { let (tx, rx) = unbounded(); - let arb = Arbiter::with_sender(tx); + let arb = Arbiter::with_sender(0, 0, Arc::new(name), tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); STORAGE.with(|cell| cell.borrow_mut().clear()); - ( - arb, - ArbiterController { - stop: None, - rx: Box::pin(rx), - }, - ) + (arb, ArbiterController { rx, stop: None }) } /// Returns the current thread's arbiter's address. If no Arbiter is present, then this @@ -85,27 +77,37 @@ impl Arbiter { let _ = self.sender.try_send(ArbiterCommand::Stop); } - /// Spawn new thread and run event loop in spawned thread. + /// Spawn new thread and run runtime in spawned thread. /// Returns address of newly created arbiter. pub fn new() -> Arbiter { + let name = format!("ntex-rt:worker:{}", COUNT.load(Ordering::Relaxed) + 1); + Arbiter::with_name(name) + } + + /// Spawn new thread and run runtime in spawned thread. + /// Returns address of newly created arbiter. + pub fn with_name(name: String) -> Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("ntex-rt:worker:{}", id); let sys = System::current(); + let name2 = Arc::new(name.clone()); let config = sys.config(); let (arb_tx, arb_rx) = unbounded(); let arb_tx2 = arb_tx.clone(); let builder = if sys.config().stack_size > 0 { thread::Builder::new() - .name(name.clone()) + .name(name) .stack_size(sys.config().stack_size) } else { - thread::Builder::new().name(name.clone()) + thread::Builder::new().name(name) }; + let name = name2.clone(); + let sys_id = sys.id(); + let handle = builder .spawn(move || { - let arb = Arbiter::with_sender(arb_tx); + let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx); let (stop, stop_rx) = oneshot::channel(); STORAGE.with(|cell| cell.borrow_mut().clear()); @@ -114,16 +116,19 @@ impl Arbiter { config.block_on(async move { // start arbiter controller - let _ = crate::spawn(ArbiterController { - stop: Some(stop), - rx: Box::pin(arb_rx), - }); + let _ = crate::spawn( + ArbiterController { + stop: Some(stop), + rx: arb_rx, + } + .run(), + ); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); // register arbiter let _ = System::current() .sys() - .try_send(SystemCommand::RegisterArbiter(id, arb)); + .try_send(SystemCommand::RegisterArbiter(Id(id), arb)); // run loop let _ = stop_rx.await; @@ -132,18 +137,46 @@ impl Arbiter { // unregister arbiter let _ = System::current() .sys() - .try_send(SystemCommand::UnregisterArbiter(id)); + .try_send(SystemCommand::UnregisterArbiter(Id(id))); }) .unwrap_or_else(|err| { panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err) }); Arbiter { + id, + name, + sys_id: sys_id.0, sender: arb_tx2, thread_handle: Some(handle), } } + fn with_sender( + sys_id: usize, + id: usize, + name: Arc, + sender: Sender, + ) -> Self { + Self { + id, + sys_id, + name, + sender, + thread_handle: None, + } + } + + /// Id of the arbiter + pub fn id(&self) -> Id { + Id(self.id) + } + + /// Name of the arbiter + pub fn name(&self) -> &str { + self.name.as_ref() + } + /// Send a future to the Arbiter's thread, and spawn it. pub fn spawn(&self, future: F) where @@ -170,11 +203,9 @@ impl Arbiter { let _ = self .sender .try_send(ArbiterCommand::ExecuteFn(Box::new(move || { - let fut = f(); - let fut = Box::pin(async { - let _ = tx.send(fut.await); + crate::spawn(async move { + let _ = tx.send(f().await); }); - crate::spawn(fut); }))); rx } @@ -255,13 +286,6 @@ impl Arbiter { }) } - fn with_sender(sender: Sender) -> Self { - Self { - sender, - thread_handle: None, - } - } - /// Wait for the event loop to stop by joining the underlying thread (if have Some). pub fn join(&mut self) -> thread::Result<()> { if let Some(thread_handle) = self.thread_handle.take() { @@ -272,9 +296,17 @@ impl Arbiter { } } +impl Eq for Arbiter {} + +impl PartialEq for Arbiter { + fn eq(&self, other: &Self) -> bool { + self.id == other.id && self.sys_id == other.sys_id + } +} + pub(crate) struct ArbiterController { stop: Option>, - rx: ArbiterCommandRx, + rx: Receiver, } impl Drop for ArbiterController { @@ -290,118 +322,28 @@ impl Drop for ArbiterController { } } -impl Future for ArbiterController { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +impl ArbiterController { + pub(super) async fn run(mut self) { loop { - match Pin::new(&mut self.rx).poll_next(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(item)) => match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - return Poll::Ready(()); - } - ArbiterCommand::Execute(fut) => { - let _ = crate::spawn(fut); - } - ArbiterCommand::ExecuteFn(f) => { - f.call_box(); - } - }, - Poll::Pending => return Poll::Pending, - } - } - } -} - -#[derive(Debug)] -pub(super) enum SystemCommand { - Exit(i32), - RegisterArbiter(usize, Arbiter), - UnregisterArbiter(usize), -} - -pub(super) struct SystemArbiter { - stop: Option>, - commands: ServerCommandRx, - arbiters: HashMap, -} - -impl SystemArbiter { - pub(super) fn new( - stop: oneshot::Sender, - commands: Receiver, - ) -> Self { - SystemArbiter { - commands: Box::pin(commands), - stop: Some(stop), - arbiters: HashMap::new(), - } - } -} - -impl fmt::Debug for SystemArbiter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SystemArbiter") - .field("arbiters", &self.arbiters) - .finish() - } -} - -impl Future for SystemArbiter { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - let cmd = ready!(Pin::new(&mut self.commands).poll_next(cx)); - log::debug!("Received system command: {:?}", cmd); - match cmd { - None => { - log::debug!("System stopped"); - return Poll::Ready(()); + match self.rx.recv().await { + Ok(ArbiterCommand::Stop) => { + if let Some(stop) = self.stop.take() { + let _ = stop.send(0); + }; + break; } - Some(cmd) => match cmd { - SystemCommand::Exit(code) => { - log::debug!("Stopping system with {} code", code); - - // stop arbiters - for arb in self.arbiters.values() { - arb.stop(); - } - // stop event loop - if let Some(stop) = self.stop.take() { - let _ = stop.send(code); - } - } - SystemCommand::RegisterArbiter(name, hnd) => { - self.arbiters.insert(name, hnd); - } - SystemCommand::UnregisterArbiter(name) => { - self.arbiters.remove(&name); - } - }, + Ok(ArbiterCommand::Execute(fut)) => { + let _ = crate::spawn(fut); + } + Ok(ArbiterCommand::ExecuteFn(f)) => { + f.call_box(); + } + Err(_) => break, } } } } -pub(super) trait FnExec: Send + 'static { - fn call_box(self: Box); -} - -impl FnExec for F -where - F: FnOnce() + Send + 'static, -{ - #[allow(clippy::boxed_local)] - fn call_box(self: Box) { - (*self)() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/ntex-rt/src/builder.rs b/ntex-rt/src/builder.rs index 597e107a..e16c01cb 100644 --- a/ntex-rt/src/builder.rs +++ b/ntex-rt/src/builder.rs @@ -1,9 +1,9 @@ -use std::{future::Future, io, pin::Pin, sync::Arc}; +use std::{future::Future, io, marker::PhantomData, pin::Pin, rc::Rc, sync::Arc}; use async_channel::unbounded; -use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter}; -use crate::{system::SystemConfig, System}; +use crate::arbiter::{Arbiter, ArbiterController}; +use crate::system::{System, SystemCommand, SystemConfig, SystemSupport}; /// Builder struct for a ntex runtime. /// @@ -17,6 +17,8 @@ pub struct Builder { stop_on_panic: bool, /// New thread stack size stack_size: usize, + /// Arbiters ping interval + ping_interval: usize, /// Block on fn block_on: Option>>) + Sync + Send>>, } @@ -28,6 +30,7 @@ impl Builder { stop_on_panic: false, stack_size: 0, block_on: None, + ping_interval: 1000, } } @@ -52,6 +55,15 @@ impl Builder { self } + /// Sets ping interval for spawned arbiters. + /// + /// Interval is in milliseconds. By default 5000 milliseconds is set. + /// To disable pings set value to zero. + pub fn ping_interval(mut self, interval: usize) -> Self { + self.ping_interval = interval; + self + } + /// Use custom block_on function pub fn block_on(mut self, block_on: F) -> Self where @@ -74,18 +86,20 @@ impl Builder { stop_on_panic: self.stop_on_panic, }; - let (arb, arb_controller) = Arbiter::new_system(); - let system = System::construct(sys_sender, arb, config); + let (arb, controller) = Arbiter::new_system(self.name.clone()); + let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone())); + let system = System::construct(sys_sender, arb.clone(), config); // system arbiter - let arb = SystemArbiter::new(stop_tx, sys_receiver); + let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval); // init system arbiter and run configuration method SystemRunner { stop, - arb, - arb_controller, + support, + controller, system, + _t: PhantomData, } } } @@ -94,9 +108,10 @@ impl Builder { #[must_use = "SystemRunner must be run"] pub struct SystemRunner { stop: oneshot::Receiver, - arb: SystemArbiter, - arb_controller: ArbiterController, + support: SystemSupport, + controller: ArbiterController, system: System, + _t: PhantomData>, } impl SystemRunner { @@ -113,15 +128,14 @@ impl SystemRunner { /// This function will start event loop and will finish once the /// `System::stop()` function is called. - #[inline] pub fn run(self, f: F) -> io::Result<()> where F: FnOnce() -> io::Result<()> + 'static, { let SystemRunner { + controller, stop, - arb, - arb_controller, + support, system, .. } = self; @@ -130,8 +144,8 @@ impl SystemRunner { system.config().block_on(async move { f()?; - let _ = crate::spawn(arb); - let _ = crate::spawn(arb_controller); + let _ = crate::spawn(support.run()); + let _ = crate::spawn(controller.run()); match stop.await { Ok(code) => { if code != 0 { @@ -149,22 +163,21 @@ impl SystemRunner { } /// Execute a future and wait for result. - #[inline] pub fn block_on(self, fut: F) -> R where F: Future + 'static, R: 'static, { let SystemRunner { - arb, - arb_controller, + controller, + support, system, .. } = self; system.config().block_on(async move { - let _ = crate::spawn(arb); - let _ = crate::spawn(arb_controller); + let _ = crate::spawn(support.run()); + let _ = crate::spawn(controller.run()); fut.await }) } @@ -177,16 +190,16 @@ impl SystemRunner { R: 'static, { let SystemRunner { - arb, - arb_controller, + controller, + support, .. } = self; // run loop tok_io::task::LocalSet::new() .run_until(async move { - let _ = crate::spawn(arb); - let _ = crate::spawn(arb_controller); + let _ = crate::spawn(support.run()); + let _ = crate::spawn(controller.run()); fut.await }) .await @@ -242,6 +255,7 @@ mod tests { thread::spawn(move || { let runner = crate::System::build() .stop_on_panic(true) + .ping_interval(25) .block_on(|fut| { let rt = tok_io::runtime::Builder::new_current_thread() .enable_all() @@ -270,6 +284,18 @@ mod tests { .unwrap(); assert_eq!(id, id2); + let (tx, rx) = mpsc::channel(); + sys.arbiter().spawn(async move { + futures_timer::Delay::new(std::time::Duration::from_millis(100)).await; + + let recs = System::list_arbiter_pings(Arbiter::current().id(), |recs| { + recs.unwrap().clone() + }); + let _ = tx.send(recs); + }); + let recs = rx.recv().unwrap(); + + assert!(!recs.is_empty()); sys.stop(); } } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 8c3ebf83..37715f2e 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -8,7 +8,7 @@ mod system; pub use self::arbiter::Arbiter; pub use self::builder::{Builder, SystemRunner}; -pub use self::system::System; +pub use self::system::{Id, PingRecord, System}; thread_local! { static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new(( diff --git a/ntex-rt/src/system.rs b/ntex-rt/src/system.rs index 86b783ad..257f81ed 100644 --- a/ntex-rt/src/system.rs +++ b/ntex-rt/src/system.rs @@ -1,13 +1,31 @@ +use std::collections::{HashMap, VecDeque}; use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; +use std::time::{Duration, Instant}; use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc}; -use async_channel::Sender; +use async_channel::{Receiver, Sender}; +use futures_timer::Delay; -use super::arbiter::{Arbiter, SystemCommand}; +use super::arbiter::Arbiter; use super::builder::{Builder, SystemRunner}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); +thread_local!( + static ARBITERS: RefCell = RefCell::new(Arbiters::default()); + static PINGS: RefCell>> = + RefCell::new(HashMap::default()); +); + +#[derive(Default)] +struct Arbiters { + all: HashMap, + list: Vec, +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +pub struct Id(pub(crate) usize); + /// System is a runtime manager. #[derive(Clone, Debug)] pub struct System { @@ -33,14 +51,17 @@ impl System { /// Constructs new system and sets it as current pub(super) fn construct( sys: Sender, - arbiter: Arbiter, + mut arbiter: Arbiter, config: SystemConfig, ) -> Self { + let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst); + arbiter.sys_id = id; + let sys = System { + id, sys, config, arbiter, - id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), }; System::set_current(sys.clone()); sys @@ -79,8 +100,8 @@ impl System { } /// System id - pub fn id(&self) -> usize { - self.id + pub fn id(&self) -> Id { + Id(self.id) } /// Stop the system @@ -104,6 +125,34 @@ impl System { &self.arbiter } + /// Retrieves a list of all arbiters in the system. + /// + /// This method should be called from the thread where the system has been initialized, + /// typically the "main" thread. + pub fn list_arbiters(f: F) -> R + where + F: FnOnce(&[Arbiter]) -> R, + { + ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref())) + } + + /// Retrieves a list of last pings records for specified arbiter. + /// + /// This method should be called from the thread where the system has been initialized, + /// typically the "main" thread. + pub fn list_arbiter_pings(id: Id, f: F) -> R + where + F: FnOnce(Option<&VecDeque>) -> R, + { + PINGS.with(|pings| { + if let Some(recs) = pings.borrow().get(&id) { + f(Some(recs)) + } else { + f(None) + } + }) + } + pub(super) fn sys(&self) -> &Sender { &self.sys } @@ -150,3 +199,173 @@ impl fmt::Debug for SystemConfig { .finish() } } + +#[derive(Debug)] +pub(super) enum SystemCommand { + Exit(i32), + RegisterArbiter(Id, Arbiter), + UnregisterArbiter(Id), +} + +pub(super) struct SystemSupport { + stop: Option>, + commands: Receiver, + ping_interval: Duration, +} + +impl SystemSupport { + pub(super) fn new( + stop: oneshot::Sender, + commands: Receiver, + ping_interval: usize, + ) -> Self { + Self { + commands, + stop: Some(stop), + ping_interval: Duration::from_millis(ping_interval as u64), + } + } + + pub(super) async fn run(mut self) { + ARBITERS.with(move |arbs| { + let mut arbiters = arbs.borrow_mut(); + arbiters.all.clear(); + arbiters.list.clear(); + }); + + loop { + match self.commands.recv().await { + Ok(SystemCommand::Exit(code)) => { + log::debug!("Stopping system with {} code", code); + + // stop arbiters + ARBITERS.with(move |arbs| { + let mut arbiters = arbs.borrow_mut(); + for arb in arbiters.list.drain(..) { + arb.stop(); + } + arbiters.all.clear(); + }); + + // stop event loop + if let Some(stop) = self.stop.take() { + let _ = stop.send(code); + } + } + Ok(SystemCommand::RegisterArbiter(id, hnd)) => { + crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval)); + ARBITERS.with(move |arbs| { + let mut arbiters = arbs.borrow_mut(); + arbiters.all.insert(id, hnd.clone()); + arbiters.list.push(hnd); + }); + } + Ok(SystemCommand::UnregisterArbiter(id)) => { + ARBITERS.with(move |arbs| { + let mut arbiters = arbs.borrow_mut(); + if let Some(hnd) = arbiters.all.remove(&id) { + for (idx, arb) in arbiters.list.iter().enumerate() { + if &hnd == arb { + arbiters.list.remove(idx); + break; + } + } + } + }); + } + Err(_) => { + log::debug!("System stopped"); + return; + } + } + } + } +} + +#[derive(Copy, Clone, Debug)] +pub struct PingRecord { + /// Ping start time + pub start: Instant, + /// Round-trip time, if value is not set then ping is in process + pub rtt: Option, +} + +async fn ping_arbiter(arb: Arbiter, interval: Duration) { + loop { + Delay::new(interval).await; + + // check if arbiter is still active + let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id())); + + if !is_alive { + PINGS.with(|pings| pings.borrow_mut().remove(&arb.id())); + break; + } + + // calc ttl + let start = Instant::now(); + PINGS.with(|pings| { + let mut p = pings.borrow_mut(); + let recs = p.entry(arb.id()).or_default(); + recs.push_front(PingRecord { start, rtt: None }); + recs.truncate(10); + }); + + let result = arb + .spawn_with(|| async { + yield_to().await; + }) + .await; + + if result.is_err() { + break; + } + + PINGS.with(|pings| { + pings + .borrow_mut() + .get_mut(&arb.id()) + .unwrap() + .front_mut() + .unwrap() + .rtt = Some(Instant::now() - start); + }); + } +} + +async fn yield_to() { + use std::task::{Context, Poll}; + + struct Yield { + completed: bool, + } + + impl Future for Yield { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.completed { + return Poll::Ready(()); + } + self.completed = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + Yield { completed: false }.await; +} + +pub(super) trait FnExec: Send + 'static { + fn call_box(self: Box); +} + +impl FnExec for F +where + F: FnOnce() + Send + 'static, +{ + #[allow(clippy::boxed_local)] + fn call_box(self: Box) { + (*self)() + } +}