diff --git a/Cargo.toml b/Cargo.toml index 871d9de2..d9e97ef4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,10 @@ ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } [workspace.dependencies] +async-channel = "2" async-task = "4.5.0" +atomic-waker = "1.1" +core_affinity = "0.8" bitflags = "2" cfg_aliases = "0.2.1" cfg-if = "1.0.0" @@ -57,7 +60,8 @@ fxhash = "0.2" libc = "0.2.164" log = "0.4" io-uring = "0.7.4" -polling = "3.3.0" +oneshot = "0.1" +polling = "3.7.4" nohash-hasher = "0.2.0" scoped-tls = "1.0.1" slab = "0.4.9" diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index c800c00b..46cf5cc4 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.13", optional = true } +ntex-neon = { version = "0.1.14", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 2b5aa5d0..a5966d76 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -42,4 +42,4 @@ tok-io = { version = "1", package = "tokio", default-features = false, features "net", ], optional = true } -ntex-neon = { version = "0.1.11", optional = true } +ntex-neon = { version = "0.1.14", optional = true } diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index 0d8dabc5..546a92ff 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.7.3] - 2025-03-28 + +* Better worker availability handling + ## [2.7.2] - 2025-03-27 * Handle paused state diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index bca5f8b2..dcfa8332 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.7.2" +version = "2.7.3" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -22,13 +22,13 @@ ntex-service = "3.4" ntex-rt = "0.4" ntex-util = "2.8" -async-channel = "2" -async-broadcast = "0.7" -core_affinity = "0.8" -polling = "3.3" -log = "0.4" -socket2 = "0.5" -oneshot = { version = "0.1", default-features = false, features = ["async"] } +async-channel = { workspace = true } +atomic-waker = { workspace = true } +core_affinity = { workspace = true } +oneshot = { workspace = true } +polling = { workspace = true } +log = { workspace = true } +socket2 = { workspace = true } [dev-dependencies] ntex = "2" diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs index ca558a54..f0719750 100644 --- a/ntex-server/src/manager.rs +++ b/ntex-server/src/manager.rs @@ -55,7 +55,7 @@ impl ServerManager { let no_signals = cfg.no_signals; let shared = Arc::new(ServerShared { - paused: AtomicBool::new(false), + paused: AtomicBool::new(true), }); let mgr = ServerManager(Rc::new(Inner { cfg, @@ -139,7 +139,6 @@ impl ServerManager { fn start_worker(mgr: ServerManager, cid: Option) { let _ = ntex_rt::spawn(async move { let id = mgr.next_id(); - let mut wrk = Worker::start(id, mgr.factory(), cid); loop { @@ -212,10 +211,9 @@ impl HandleCmdState { match upd { Update::Available(worker) => { self.workers.push(worker); - if !self.workers.is_empty() { + self.workers.sort(); + if self.workers.len() == 1 { self.mgr.resume(); - } else { - self.workers.sort(); } } Update::Unavailable(worker) => { @@ -234,6 +232,9 @@ impl HandleCmdState { if let Err(item) = self.workers[0].send(item) { self.backlog.push_back(item); self.workers.remove(0); + if self.workers.is_empty() { + self.mgr.pause(); + } break; } } diff --git a/ntex-server/src/net/accept.rs b/ntex-server/src/net/accept.rs index 31793d82..7694d286 100644 --- a/ntex-server/src/net/accept.rs +++ b/ntex-server/src/net/accept.rs @@ -203,14 +203,10 @@ impl Accept { let mut timeout = Some(Duration::ZERO); loop { if let Err(e) = self.poller.wait(&mut events, timeout) { - if e.kind() == io::ErrorKind::Interrupted { - continue; - } else { + if e.kind() != io::ErrorKind::Interrupted { panic!("Cannot wait for events in poller: {}", e) } - } - - if timeout.is_some() { + } else if timeout.is_some() { timeout = None; let _ = self.tx.take().unwrap().send(()); } diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index b9092d0f..b791817d 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -2,8 +2,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{ready, Context, Poll}; use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc}; -use async_broadcast::{self as bus, broadcast}; use async_channel::{unbounded, Receiver, Sender}; +use atomic_waker::AtomicWaker; use core_affinity::CoreId; use ntex_rt::{spawn, Arbiter}; @@ -151,10 +151,8 @@ impl Worker { if self.failed.load(Ordering::Acquire) { WorkerStatus::Failed } else { - // cleanup updates - while self.avail.notify.try_recv().is_ok() {} - - if self.avail.notify.recv_direct().await.is_err() { + self.avail.wait_for_update().await; + if self.avail.failed() { self.failed.store(true, Ordering::Release); } self.status() @@ -196,46 +194,79 @@ impl Future for WorkerStop { #[derive(Debug, Clone)] struct WorkerAvailability { - notify: bus::Receiver<()>, - available: Arc, + inner: Arc, } #[derive(Debug, Clone)] struct WorkerAvailabilityTx { - notify: bus::Sender<()>, - available: Arc, + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + waker: AtomicWaker, + updated: AtomicBool, + available: AtomicBool, + failed: AtomicBool, } impl WorkerAvailability { fn create() -> (Self, WorkerAvailabilityTx) { - let (mut tx, rx) = broadcast(16); - tx.set_overflow(true); + let inner = Arc::new(Inner { + waker: AtomicWaker::new(), + updated: AtomicBool::new(false), + available: AtomicBool::new(false), + failed: AtomicBool::new(false), + }); let avail = WorkerAvailability { - notify: rx, - available: Arc::new(AtomicBool::new(false)), - }; - let avail_tx = WorkerAvailabilityTx { - notify: tx, - available: avail.available.clone(), + inner: inner.clone(), }; + let avail_tx = WorkerAvailabilityTx { inner }; (avail, avail_tx) } + fn failed(&self) -> bool { + self.inner.failed.load(Ordering::Acquire) + } + fn available(&self) -> bool { - self.available.load(Ordering::Acquire) + self.inner.available.load(Ordering::Acquire) + } + + async fn wait_for_update(&self) { + poll_fn(|cx| { + if self.inner.updated.load(Ordering::Acquire) { + self.inner.updated.store(false, Ordering::Release); + Poll::Ready(()) + } else { + self.inner.waker.register(cx.waker()); + Poll::Pending + } + }) + .await; } } impl WorkerAvailabilityTx { fn set(&self, val: bool) { - let old = self.available.swap(val, Ordering::Release); - if !old && val { - let _ = self.notify.try_broadcast(()); + let old = self.inner.available.swap(val, Ordering::Release); + if old != val { + self.inner.updated.store(true, Ordering::Release); + self.inner.waker.wake(); } } } +impl Drop for WorkerAvailabilityTx { + fn drop(&mut self) { + self.inner.failed.store(true, Ordering::Release); + self.inner.updated.store(true, Ordering::Release); + self.inner.available.store(false, Ordering::Release); + self.inner.waker.wake(); + } +} + /// Service worker /// /// Worker accepts message via unbounded channel and starts processing. @@ -256,10 +287,13 @@ where let mut recv = std::pin::pin!(wrk.rx.recv()); let fut = poll_fn(|cx| { match svc.poll_ready(cx) { - Poll::Ready(res) => { - res?; + Poll::Ready(Ok(())) => { wrk.availability.set(true); } + Poll::Ready(Err(err)) => { + wrk.availability.set(false); + return Poll::Ready(Err(err)); + } Poll::Pending => { wrk.availability.set(false); return Poll::Pending; @@ -287,7 +321,6 @@ where let _ = ntex_rt::spawn(async move { svc.shutdown().await; }); - wrk.availability.set(false); } Either::Right(Some(Shutdown { timeout, result })) => { wrk.availability.set(false); @@ -302,6 +335,7 @@ where return; } Either::Left(Ok(false)) | Either::Right(None) => { + wrk.availability.set(false); stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await; return; } @@ -311,7 +345,6 @@ where loop { match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await { Either::Left(Ok(service)) => { - wrk.availability.set(true); svc = Pipeline::new(service).bind(); break; } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index da30a2ba..1a947b47 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -68,7 +68,7 @@ ntex-service = "3.4" ntex-macros = "0.1" ntex-util = "2.8" ntex-bytes = "0.1.27" -ntex-server = "2.7" +ntex-server = "2.7.3" ntex-h2 = "1.8.6" ntex-rt = "0.4.27" ntex-io = "2.11"