ntex-rt improvements (#519)

This commit is contained in:
Nikolay Kim 2025-03-13 02:07:26 +05:00 committed by GitHub
parent 2db266ca0c
commit ecfc2936b5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 373 additions and 197 deletions

View file

@ -1,4 +1,4 @@
use std::{cell::Cell, collections::VecDeque, fmt, io, ptr, rc::Rc, task, task::Poll};
use std::{cell::Cell, collections::VecDeque, io, rc::Rc, task, task::Poll};
use ntex_neon::driver::op::{CloseSocket, Handler, Interest};
use ntex_neon::driver::{AsRawFd, DriverApi, RawFd};
@ -372,21 +372,3 @@ impl<T> Drop for StreamCtl<T> {
}
}
}
impl<T> PartialEq for StreamCtl<T> {
#[inline]
fn eq(&self, other: &StreamCtl<T>) -> bool {
self.id == other.id && ptr::eq(&self.inner, &other.inner)
}
}
impl<T: fmt::Debug> fmt::Debug for StreamCtl<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.with(|streams| {
f.debug_struct("StreamCtl")
.field("id", &self.id)
.field("io", &streams[self.id].io)
.finish()
})
}
}

View file

@ -2,6 +2,10 @@
## [0.4.27] - 2025-03-14
* Add srbiters pings ttl
* Retrieves a list of all arbiters in the system
* Add "neon" runtime support
* Drop glommio support

View file

@ -31,7 +31,7 @@ neon = ["ntex-neon"]
[dependencies]
async-channel = "2"
futures-core = "0.3"
futures-timer = "3.0"
log = "0.4"
oneshot = "0.1"
@ -43,3 +43,6 @@ tok-io = { version = "1", package = "tokio", default-features = false, features
], optional = true }
ntex-neon = { version = "0.1", optional = true }
[dev-dependencies]
env_logger = "0.11"

View file

@ -1,22 +1,17 @@
#![allow(clippy::let_underscore_future)]
use std::any::{Any, TypeId};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{ready, Context, Poll};
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
use async_channel::{unbounded, Receiver, Sender};
use futures_core::stream::Stream;
use crate::system::System;
use crate::system::{FnExec, Id, System, SystemCommand};
thread_local!(
static ADDR: RefCell<Option<Arbiter>> = const { RefCell::new(None) };
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
type ServerCommandRx = Pin<Box<dyn Stream<Item = SystemCommand>>>;
type ArbiterCommandRx = Pin<Box<dyn Stream<Item = ArbiterCommand>>>;
pub(super) static COUNT: AtomicUsize = AtomicUsize::new(0);
pub(super) enum ArbiterCommand {
@ -31,13 +26,16 @@ pub(super) enum ArbiterCommand {
/// When an Arbiter is created, it spawns a new OS thread, and
/// hosts an event loop. Some Arbiter functions execute on the current thread.
pub struct Arbiter {
id: usize,
pub(crate) sys_id: usize,
name: Arc<String>,
sender: Sender<ArbiterCommand>,
thread_handle: Option<thread::JoinHandle<()>>,
}
impl fmt::Debug for Arbiter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Arbiter")
write!(f, "Arbiter({:?})", self.name.as_ref())
}
}
@ -49,26 +47,20 @@ impl Default for Arbiter {
impl Clone for Arbiter {
fn clone(&self) -> Self {
Self::with_sender(self.sender.clone())
Self::with_sender(self.sys_id, self.id, self.name.clone(), self.sender.clone())
}
}
impl Arbiter {
#[allow(clippy::borrowed_box)]
pub(super) fn new_system() -> (Self, ArbiterController) {
pub(super) fn new_system(name: String) -> (Self, ArbiterController) {
let (tx, rx) = unbounded();
let arb = Arbiter::with_sender(tx);
let arb = Arbiter::with_sender(0, 0, Arc::new(name), tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());
(
arb,
ArbiterController {
stop: None,
rx: Box::pin(rx),
},
)
(arb, ArbiterController { rx, stop: None })
}
/// Returns the current thread's arbiter's address. If no Arbiter is present, then this
@ -85,27 +77,37 @@ impl Arbiter {
let _ = self.sender.try_send(ArbiterCommand::Stop);
}
/// Spawn new thread and run event loop in spawned thread.
/// Spawn new thread and run runtime in spawned thread.
/// Returns address of newly created arbiter.
pub fn new() -> Arbiter {
let name = format!("ntex-rt:worker:{}", COUNT.load(Ordering::Relaxed) + 1);
Arbiter::with_name(name)
}
/// Spawn new thread and run runtime in spawned thread.
/// Returns address of newly created arbiter.
pub fn with_name(name: String) -> Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("ntex-rt:worker:{}", id);
let sys = System::current();
let name2 = Arc::new(name.clone());
let config = sys.config();
let (arb_tx, arb_rx) = unbounded();
let arb_tx2 = arb_tx.clone();
let builder = if sys.config().stack_size > 0 {
thread::Builder::new()
.name(name.clone())
.name(name)
.stack_size(sys.config().stack_size)
} else {
thread::Builder::new().name(name.clone())
thread::Builder::new().name(name)
};
let name = name2.clone();
let sys_id = sys.id();
let handle = builder
.spawn(move || {
let arb = Arbiter::with_sender(arb_tx);
let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx);
let (stop, stop_rx) = oneshot::channel();
STORAGE.with(|cell| cell.borrow_mut().clear());
@ -114,16 +116,19 @@ impl Arbiter {
config.block_on(async move {
// start arbiter controller
let _ = crate::spawn(ArbiterController {
stop: Some(stop),
rx: Box::pin(arb_rx),
});
let _ = crate::spawn(
ArbiterController {
stop: Some(stop),
rx: arb_rx,
}
.run(),
);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
// register arbiter
let _ = System::current()
.sys()
.try_send(SystemCommand::RegisterArbiter(id, arb));
.try_send(SystemCommand::RegisterArbiter(Id(id), arb));
// run loop
let _ = stop_rx.await;
@ -132,18 +137,46 @@ impl Arbiter {
// unregister arbiter
let _ = System::current()
.sys()
.try_send(SystemCommand::UnregisterArbiter(id));
.try_send(SystemCommand::UnregisterArbiter(Id(id)));
})
.unwrap_or_else(|err| {
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
});
Arbiter {
id,
name,
sys_id: sys_id.0,
sender: arb_tx2,
thread_handle: Some(handle),
}
}
fn with_sender(
sys_id: usize,
id: usize,
name: Arc<String>,
sender: Sender<ArbiterCommand>,
) -> Self {
Self {
id,
sys_id,
name,
sender,
thread_handle: None,
}
}
/// Id of the arbiter
pub fn id(&self) -> Id {
Id(self.id)
}
/// Name of the arbiter
pub fn name(&self) -> &str {
self.name.as_ref()
}
/// Send a future to the Arbiter's thread, and spawn it.
pub fn spawn<F>(&self, future: F)
where
@ -170,11 +203,9 @@ impl Arbiter {
let _ = self
.sender
.try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
let fut = f();
let fut = Box::pin(async {
let _ = tx.send(fut.await);
crate::spawn(async move {
let _ = tx.send(f().await);
});
crate::spawn(fut);
})));
rx
}
@ -255,13 +286,6 @@ impl Arbiter {
})
}
fn with_sender(sender: Sender<ArbiterCommand>) -> Self {
Self {
sender,
thread_handle: None,
}
}
/// Wait for the event loop to stop by joining the underlying thread (if have Some).
pub fn join(&mut self) -> thread::Result<()> {
if let Some(thread_handle) = self.thread_handle.take() {
@ -272,9 +296,17 @@ impl Arbiter {
}
}
impl Eq for Arbiter {}
impl PartialEq for Arbiter {
fn eq(&self, other: &Self) -> bool {
self.id == other.id && self.sys_id == other.sys_id
}
}
pub(crate) struct ArbiterController {
stop: Option<oneshot::Sender<i32>>,
rx: ArbiterCommandRx,
rx: Receiver<ArbiterCommand>,
}
impl Drop for ArbiterController {
@ -290,118 +322,28 @@ impl Drop for ArbiterController {
}
}
impl Future for ArbiterController {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
impl ArbiterController {
pub(super) async fn run(mut self) {
loop {
match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => {
if let Some(stop) = self.stop.take() {
let _ = stop.send(0);
};
return Poll::Ready(());
}
ArbiterCommand::Execute(fut) => {
let _ = crate::spawn(fut);
}
ArbiterCommand::ExecuteFn(f) => {
f.call_box();
}
},
Poll::Pending => return Poll::Pending,
}
}
}
}
#[derive(Debug)]
pub(super) enum SystemCommand {
Exit(i32),
RegisterArbiter(usize, Arbiter),
UnregisterArbiter(usize),
}
pub(super) struct SystemArbiter {
stop: Option<oneshot::Sender<i32>>,
commands: ServerCommandRx,
arbiters: HashMap<usize, Arbiter>,
}
impl SystemArbiter {
pub(super) fn new(
stop: oneshot::Sender<i32>,
commands: Receiver<SystemCommand>,
) -> Self {
SystemArbiter {
commands: Box::pin(commands),
stop: Some(stop),
arbiters: HashMap::new(),
}
}
}
impl fmt::Debug for SystemArbiter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SystemArbiter")
.field("arbiters", &self.arbiters)
.finish()
}
}
impl Future for SystemArbiter {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let cmd = ready!(Pin::new(&mut self.commands).poll_next(cx));
log::debug!("Received system command: {:?}", cmd);
match cmd {
None => {
log::debug!("System stopped");
return Poll::Ready(());
match self.rx.recv().await {
Ok(ArbiterCommand::Stop) => {
if let Some(stop) = self.stop.take() {
let _ = stop.send(0);
};
break;
}
Some(cmd) => match cmd {
SystemCommand::Exit(code) => {
log::debug!("Stopping system with {} code", code);
// stop arbiters
for arb in self.arbiters.values() {
arb.stop();
}
// stop event loop
if let Some(stop) = self.stop.take() {
let _ = stop.send(code);
}
}
SystemCommand::RegisterArbiter(name, hnd) => {
self.arbiters.insert(name, hnd);
}
SystemCommand::UnregisterArbiter(name) => {
self.arbiters.remove(&name);
}
},
Ok(ArbiterCommand::Execute(fut)) => {
let _ = crate::spawn(fut);
}
Ok(ArbiterCommand::ExecuteFn(f)) => {
f.call_box();
}
Err(_) => break,
}
}
}
}
pub(super) trait FnExec: Send + 'static {
fn call_box(self: Box<Self>);
}
impl<F> FnExec for F
where
F: FnOnce() + Send + 'static,
{
#[allow(clippy::boxed_local)]
fn call_box(self: Box<Self>) {
(*self)()
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -1,9 +1,9 @@
use std::{future::Future, io, pin::Pin, sync::Arc};
use std::{future::Future, io, marker::PhantomData, pin::Pin, rc::Rc, sync::Arc};
use async_channel::unbounded;
use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter};
use crate::{system::SystemConfig, System};
use crate::arbiter::{Arbiter, ArbiterController};
use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
/// Builder struct for a ntex runtime.
///
@ -17,6 +17,8 @@ pub struct Builder {
stop_on_panic: bool,
/// New thread stack size
stack_size: usize,
/// Arbiters ping interval
ping_interval: usize,
/// Block on fn
block_on: Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>>,
}
@ -28,6 +30,7 @@ impl Builder {
stop_on_panic: false,
stack_size: 0,
block_on: None,
ping_interval: 1000,
}
}
@ -52,6 +55,15 @@ impl Builder {
self
}
/// Sets ping interval for spawned arbiters.
///
/// Interval is in milliseconds. By default 5000 milliseconds is set.
/// To disable pings set value to zero.
pub fn ping_interval(mut self, interval: usize) -> Self {
self.ping_interval = interval;
self
}
/// Use custom block_on function
pub fn block_on<F>(mut self, block_on: F) -> Self
where
@ -74,18 +86,20 @@ impl Builder {
stop_on_panic: self.stop_on_panic,
};
let (arb, arb_controller) = Arbiter::new_system();
let system = System::construct(sys_sender, arb, config);
let (arb, controller) = Arbiter::new_system(self.name.clone());
let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
let system = System::construct(sys_sender, arb.clone(), config);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
// init system arbiter and run configuration method
SystemRunner {
stop,
arb,
arb_controller,
support,
controller,
system,
_t: PhantomData,
}
}
}
@ -94,9 +108,10 @@ impl Builder {
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
stop: oneshot::Receiver<i32>,
arb: SystemArbiter,
arb_controller: ArbiterController,
support: SystemSupport,
controller: ArbiterController,
system: System,
_t: PhantomData<Rc<()>>,
}
impl SystemRunner {
@ -113,15 +128,14 @@ impl SystemRunner {
/// This function will start event loop and will finish once the
/// `System::stop()` function is called.
#[inline]
pub fn run<F>(self, f: F) -> io::Result<()>
where
F: FnOnce() -> io::Result<()> + 'static,
{
let SystemRunner {
controller,
stop,
arb,
arb_controller,
support,
system,
..
} = self;
@ -130,8 +144,8 @@ impl SystemRunner {
system.config().block_on(async move {
f()?;
let _ = crate::spawn(arb);
let _ = crate::spawn(arb_controller);
let _ = crate::spawn(support.run());
let _ = crate::spawn(controller.run());
match stop.await {
Ok(code) => {
if code != 0 {
@ -149,22 +163,21 @@ impl SystemRunner {
}
/// Execute a future and wait for result.
#[inline]
pub fn block_on<F, R>(self, fut: F) -> R
where
F: Future<Output = R> + 'static,
R: 'static,
{
let SystemRunner {
arb,
arb_controller,
controller,
support,
system,
..
} = self;
system.config().block_on(async move {
let _ = crate::spawn(arb);
let _ = crate::spawn(arb_controller);
let _ = crate::spawn(support.run());
let _ = crate::spawn(controller.run());
fut.await
})
}
@ -177,16 +190,16 @@ impl SystemRunner {
R: 'static,
{
let SystemRunner {
arb,
arb_controller,
controller,
support,
..
} = self;
// run loop
tok_io::task::LocalSet::new()
.run_until(async move {
let _ = crate::spawn(arb);
let _ = crate::spawn(arb_controller);
let _ = crate::spawn(support.run());
let _ = crate::spawn(controller.run());
fut.await
})
.await
@ -242,6 +255,7 @@ mod tests {
thread::spawn(move || {
let runner = crate::System::build()
.stop_on_panic(true)
.ping_interval(25)
.block_on(|fut| {
let rt = tok_io::runtime::Builder::new_current_thread()
.enable_all()
@ -270,6 +284,18 @@ mod tests {
.unwrap();
assert_eq!(id, id2);
let (tx, rx) = mpsc::channel();
sys.arbiter().spawn(async move {
futures_timer::Delay::new(std::time::Duration::from_millis(100)).await;
let recs = System::list_arbiter_pings(Arbiter::current().id(), |recs| {
recs.unwrap().clone()
});
let _ = tx.send(recs);
});
let recs = rx.recv().unwrap();
assert!(!recs.is_empty());
sys.stop();
}
}

View file

@ -8,7 +8,7 @@ mod system;
pub use self::arbiter::Arbiter;
pub use self::builder::{Builder, SystemRunner};
pub use self::system::System;
pub use self::system::{Id, PingRecord, System};
thread_local! {
static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((

View file

@ -1,13 +1,31 @@
use std::collections::{HashMap, VecDeque};
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use std::time::{Duration, Instant};
use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc};
use async_channel::Sender;
use async_channel::{Receiver, Sender};
use futures_timer::Delay;
use super::arbiter::{Arbiter, SystemCommand};
use super::arbiter::Arbiter;
use super::builder::{Builder, SystemRunner};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
thread_local!(
static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
RefCell::new(HashMap::default());
);
#[derive(Default)]
struct Arbiters {
all: HashMap<Id, Arbiter>,
list: Vec<Arbiter>,
}
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Id(pub(crate) usize);
/// System is a runtime manager.
#[derive(Clone, Debug)]
pub struct System {
@ -33,14 +51,17 @@ impl System {
/// Constructs new system and sets it as current
pub(super) fn construct(
sys: Sender<SystemCommand>,
arbiter: Arbiter,
mut arbiter: Arbiter,
config: SystemConfig,
) -> Self {
let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
arbiter.sys_id = id;
let sys = System {
id,
sys,
config,
arbiter,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
};
System::set_current(sys.clone());
sys
@ -79,8 +100,8 @@ impl System {
}
/// System id
pub fn id(&self) -> usize {
self.id
pub fn id(&self) -> Id {
Id(self.id)
}
/// Stop the system
@ -104,6 +125,34 @@ impl System {
&self.arbiter
}
/// Retrieves a list of all arbiters in the system.
///
/// This method should be called from the thread where the system has been initialized,
/// typically the "main" thread.
pub fn list_arbiters<F, R>(f: F) -> R
where
F: FnOnce(&[Arbiter]) -> R,
{
ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
}
/// Retrieves a list of last pings records for specified arbiter.
///
/// This method should be called from the thread where the system has been initialized,
/// typically the "main" thread.
pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
where
F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
{
PINGS.with(|pings| {
if let Some(recs) = pings.borrow().get(&id) {
f(Some(recs))
} else {
f(None)
}
})
}
pub(super) fn sys(&self) -> &Sender<SystemCommand> {
&self.sys
}
@ -150,3 +199,173 @@ impl fmt::Debug for SystemConfig {
.finish()
}
}
#[derive(Debug)]
pub(super) enum SystemCommand {
Exit(i32),
RegisterArbiter(Id, Arbiter),
UnregisterArbiter(Id),
}
pub(super) struct SystemSupport {
stop: Option<oneshot::Sender<i32>>,
commands: Receiver<SystemCommand>,
ping_interval: Duration,
}
impl SystemSupport {
pub(super) fn new(
stop: oneshot::Sender<i32>,
commands: Receiver<SystemCommand>,
ping_interval: usize,
) -> Self {
Self {
commands,
stop: Some(stop),
ping_interval: Duration::from_millis(ping_interval as u64),
}
}
pub(super) async fn run(mut self) {
ARBITERS.with(move |arbs| {
let mut arbiters = arbs.borrow_mut();
arbiters.all.clear();
arbiters.list.clear();
});
loop {
match self.commands.recv().await {
Ok(SystemCommand::Exit(code)) => {
log::debug!("Stopping system with {} code", code);
// stop arbiters
ARBITERS.with(move |arbs| {
let mut arbiters = arbs.borrow_mut();
for arb in arbiters.list.drain(..) {
arb.stop();
}
arbiters.all.clear();
});
// stop event loop
if let Some(stop) = self.stop.take() {
let _ = stop.send(code);
}
}
Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
ARBITERS.with(move |arbs| {
let mut arbiters = arbs.borrow_mut();
arbiters.all.insert(id, hnd.clone());
arbiters.list.push(hnd);
});
}
Ok(SystemCommand::UnregisterArbiter(id)) => {
ARBITERS.with(move |arbs| {
let mut arbiters = arbs.borrow_mut();
if let Some(hnd) = arbiters.all.remove(&id) {
for (idx, arb) in arbiters.list.iter().enumerate() {
if &hnd == arb {
arbiters.list.remove(idx);
break;
}
}
}
});
}
Err(_) => {
log::debug!("System stopped");
return;
}
}
}
}
}
#[derive(Copy, Clone, Debug)]
pub struct PingRecord {
/// Ping start time
pub start: Instant,
/// Round-trip time, if value is not set then ping is in process
pub rtt: Option<Duration>,
}
async fn ping_arbiter(arb: Arbiter, interval: Duration) {
loop {
Delay::new(interval).await;
// check if arbiter is still active
let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
if !is_alive {
PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
break;
}
// calc ttl
let start = Instant::now();
PINGS.with(|pings| {
let mut p = pings.borrow_mut();
let recs = p.entry(arb.id()).or_default();
recs.push_front(PingRecord { start, rtt: None });
recs.truncate(10);
});
let result = arb
.spawn_with(|| async {
yield_to().await;
})
.await;
if result.is_err() {
break;
}
PINGS.with(|pings| {
pings
.borrow_mut()
.get_mut(&arb.id())
.unwrap()
.front_mut()
.unwrap()
.rtt = Some(Instant::now() - start);
});
}
}
async fn yield_to() {
use std::task::{Context, Poll};
struct Yield {
completed: bool,
}
impl Future for Yield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.completed {
return Poll::Ready(());
}
self.completed = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
Yield { completed: false }.await;
}
pub(super) trait FnExec: Send + 'static {
fn call_box(self: Box<Self>);
}
impl<F> FnExec for F
where
F: FnOnce() + Send + 'static,
{
#[allow(clippy::boxed_local)]
fn call_box(self: Box<Self>) {
(*self)()
}
}