From 728ab919a35c7873b2e2d9446a38c6fbc4f15f66 Mon Sep 17 00:00:00 2001
From: Ruangyot Nanchiang <Rayato159@gmail.com>
Date: Fri, 28 Mar 2025 04:12:34 +0700
Subject: [PATCH 1/9] Expose WebStack for external wrapper support in
 downstream crates (#542)

* add public ServiceConfig::register constructor to support external configuration (#250)

* fix: doctest ServiceConfig::register() error (#250)

* add unit testing for ServiceConfig::register()

* replace pub(crate) to pub in ServiceConfig::new() (#250)

* replace pub to pub(crate) for ServiceConfig::new() and add pub for mod ntex::web::stack instead

* remove unsed DefaultError import in config.rs tests

---------

Co-authored-by: RuangyotN <ruangyotn@skyller.co>
---
 ntex/src/web/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ntex/src/web/mod.rs b/ntex/src/web/mod.rs
index 6c3d37b1..8d9adf4d 100644
--- a/ntex/src/web/mod.rs
+++ b/ntex/src/web/mod.rs
@@ -82,7 +82,7 @@ mod route;
 mod scope;
 mod server;
 mod service;
-mod stack;
+pub mod stack;
 pub mod test;
 pub mod types;
 mod util;

From f647ad2eac60b4c9b70461498a3a881a4ecd72c6 Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Thu, 27 Mar 2025 22:16:51 +0100
Subject: [PATCH 2/9] Update tests (#544)

---
 ntex-io/Cargo.toml     | 1 -
 ntex-macros/Cargo.toml | 1 -
 ntex-net/Cargo.toml    | 3 +--
 ntex-rt/Cargo.toml     | 3 ---
 ntex/src/http/test.rs  | 1 +
 5 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml
index 6a1e881d..f55aa5d0 100644
--- a/ntex-io/Cargo.toml
+++ b/ntex-io/Cargo.toml
@@ -28,4 +28,3 @@ pin-project-lite = "0.2"
 [dev-dependencies]
 ntex = "2"
 rand = "0.8"
-env_logger = "0.11"
diff --git a/ntex-macros/Cargo.toml b/ntex-macros/Cargo.toml
index f6cad0e2..a5bcf67d 100644
--- a/ntex-macros/Cargo.toml
+++ b/ntex-macros/Cargo.toml
@@ -18,4 +18,3 @@ proc-macro2 = "^1"
 [dev-dependencies]
 ntex = "2"
 futures = "0.3"
-env_logger = "0.11"
diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml
index 8e7335c6..c800c00b 100644
--- a/ntex-net/Cargo.toml
+++ b/ntex-net/Cargo.toml
@@ -40,7 +40,7 @@ ntex-util = "2.5"
 
 ntex-tokio = { version = "0.5.3", optional = true }
 ntex-compio = { version = "0.2.4", optional = true }
-ntex-neon = { version = "0.1.11", optional = true }
+ntex-neon = { version = "0.1.13", optional = true }
 
 bitflags = { workspace = true }
 cfg-if = { workspace = true }
@@ -57,4 +57,3 @@ polling = { workspace = true, optional = true }
 
 [dev-dependencies]
 ntex = "2"
-env_logger = "0.11"
diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml
index e133ceb4..2b5aa5d0 100644
--- a/ntex-rt/Cargo.toml
+++ b/ntex-rt/Cargo.toml
@@ -43,6 +43,3 @@ tok-io = { version = "1", package = "tokio", default-features = false, features
 ], optional = true }
 
 ntex-neon = { version = "0.1.11", optional = true }
-
-[dev-dependencies]
-env_logger = "0.11"
diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs
index c9ecdb0b..0e4a6559 100644
--- a/ntex/src/http/test.rs
+++ b/ntex/src/http/test.rs
@@ -252,6 +252,7 @@ where
             Ok(())
         })
     });
+    thread::sleep(std::time::Duration::from_millis(150));
 
     let (system, server, addr) = rx.recv().unwrap();
 

From 8f2d5056c9a8aab11217273b8b51fb55522ef9df Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Fri, 28 Mar 2025 02:10:25 +0100
Subject: [PATCH 3/9] Return PayloadError::Incomplete on server disconnect
 (#545)

---
 ntex-io/src/tasks.rs             | 24 +++-------
 ntex-rt/src/lib.rs               |  8 +++-
 ntex/CHANGES.md                  |  6 +++
 ntex/Cargo.toml                  |  2 +-
 ntex/src/http/client/h1proto.rs  | 82 ++++++++++++++++++--------------
 ntex/src/http/client/response.rs |  5 +-
 6 files changed, 70 insertions(+), 57 deletions(-)

diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs
index 4a04196b..55f99416 100644
--- a/ntex-io/src/tasks.rs
+++ b/ntex-io/src/tasks.rs
@@ -537,7 +537,9 @@ impl IoContext {
                                 self.0.tag(),
                                 nbytes
                             );
-                            inner.dispatch_task.wake();
+                            if !inner.dispatch_task.wake_checked() {
+                                log::error!("Dispatcher waker is not registered");
+                            }
                         } else {
                             if nbytes >= hw {
                                 // read task is paused because of read back-pressure
@@ -735,22 +737,6 @@ impl IoContext {
         false
     }
 
-    pub fn is_write_ready(&self) -> bool {
-        if let Some(waker) = self.0 .0.write_task.take() {
-            let ready = self
-                .0
-                .filter()
-                .poll_write_ready(&mut Context::from_waker(&waker));
-            if !matches!(
-                ready,
-                Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
-            ) {
-                return true;
-            }
-        }
-        false
-    }
-
     pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
     where
         F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
@@ -803,7 +789,9 @@ impl IoContext {
                                 self.0.tag(),
                                 nbytes
                             );
-                            inner.dispatch_task.wake();
+                            if !inner.dispatch_task.wake_checked() {
+                                log::error!("Dispatcher waker is not registered");
+                            }
                         } else {
                             if nbytes >= hw {
                                 // read task is paused because of read back-pressure
diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs
index 1ffd7fe7..d5d85546 100644
--- a/ntex-rt/src/lib.rs
+++ b/ntex-rt/src/lib.rs
@@ -112,6 +112,8 @@ mod tokio {
     ///
     /// This function panics if ntex system is not running.
     #[inline]
+    #[doc(hidden)]
+    #[deprecated]
     pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
     where
         F: FnOnce() -> R + 'static,
@@ -196,6 +198,8 @@ mod compio {
     ///
     /// This function panics if ntex system is not running.
     #[inline]
+    #[doc(hidden)]
+    #[deprecated]
     pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
     where
         F: FnOnce() -> R + 'static,
@@ -323,6 +327,8 @@ mod neon {
     ///
     /// This function panics if ntex system is not running.
     #[inline]
+    #[doc(hidden)]
+    #[deprecated]
     pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
     where
         F: FnOnce() -> R + 'static,
@@ -377,7 +383,7 @@ mod neon {
 
     impl<T> JoinHandle<T> {
         pub fn is_finished(&self) -> bool {
-            false
+            self.fut.is_none()
         }
     }
 
diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md
index cb75aabc..6ef4b5ef 100644
--- a/ntex/CHANGES.md
+++ b/ntex/CHANGES.md
@@ -1,5 +1,11 @@
 # Changes
 
+## [2.12.4] - 2025-03-28
+
+* http: Return PayloadError::Incomplete on server disconnect
+
+* web: Expose WebStack for external wrapper support in downstream crates #542
+
 ## [2.12.3] - 2025-03-22
 
 * web: Export web::app_service::AppService #534
diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml
index 09655c7a..da30a2ba 100644
--- a/ntex/Cargo.toml
+++ b/ntex/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "ntex"
-version = "2.12.3"
+version = "2.12.4"
 authors = ["ntex contributors <team@ntex.rs>"]
 description = "Framework for composable network services"
 readme = "README.md"
diff --git a/ntex/src/http/client/h1proto.rs b/ntex/src/http/client/h1proto.rs
index 06572418..28871225 100644
--- a/ntex/src/http/client/h1proto.rs
+++ b/ntex/src/http/client/h1proto.rs
@@ -1,13 +1,11 @@
-use std::{
-    future::poll_fn, io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant,
-};
+use std::{future::poll_fn, io, io::Write, pin::Pin, task, task::Poll, time::Instant};
 
 use crate::http::body::{BodySize, MessageBody};
 use crate::http::error::PayloadError;
-use crate::http::h1;
 use crate::http::header::{HeaderMap, HeaderValue, HOST};
 use crate::http::message::{RequestHeadType, ResponseHead};
 use crate::http::payload::{Payload, PayloadStream};
+use crate::http::{h1, Version};
 use crate::io::{IoBoxed, RecvError};
 use crate::time::{timeout_checked, Millis};
 use crate::util::{ready, BufMut, Bytes, BytesMut, Stream};
@@ -101,7 +99,13 @@ where
             Ok((head, Payload::None))
         }
         _ => {
-            let pl: PayloadStream = Box::pin(PlStream::new(io, codec, created, pool));
+            let pl: PayloadStream = Box::pin(PlStream::new(
+                io,
+                codec,
+                created,
+                pool,
+                head.version == Version::HTTP_10,
+            ));
             Ok((head, pl.into()))
         }
     }
@@ -137,6 +141,7 @@ pub(super) struct PlStream {
     io: Option<IoBoxed>,
     codec: h1::ClientPayloadCodec,
     created: Instant,
+    http_10: bool,
     pool: Option<Acquired>,
 }
 
@@ -146,12 +151,14 @@ impl PlStream {
         codec: h1::ClientCodec,
         created: Instant,
         pool: Option<Acquired>,
+        http_10: bool,
     ) -> Self {
         PlStream {
             io: Some(io),
             codec: codec.into_payload_codec(),
             created,
             pool,
+            http_10,
         }
     }
 }
@@ -161,41 +168,46 @@ impl Stream for PlStream {
 
     fn poll_next(
         mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
+        cx: &mut task::Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         let mut this = self.as_mut();
         loop {
-            return Poll::Ready(Some(
-                match ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx)) {
-                    Ok(chunk) => {
-                        if let Some(chunk) = chunk {
-                            Ok(chunk)
-                        } else {
-                            release_connection(
-                                this.io.take().unwrap(),
-                                !this.codec.keepalive(),
-                                this.created,
-                                this.pool.take(),
-                            );
-                            return Poll::Ready(None);
-                        }
+            let item = ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx));
+            return Poll::Ready(Some(match item {
+                Ok(chunk) => {
+                    if let Some(chunk) = chunk {
+                        Ok(chunk)
+                    } else {
+                        release_connection(
+                            this.io.take().unwrap(),
+                            !this.codec.keepalive(),
+                            this.created,
+                            this.pool.take(),
+                        );
+                        return Poll::Ready(None);
                     }
-                    Err(RecvError::KeepAlive) => {
-                        Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into())
+                }
+                Err(RecvError::KeepAlive) => {
+                    Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into())
+                }
+                Err(RecvError::Stop) => {
+                    Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped").into())
+                }
+                Err(RecvError::WriteBackpressure) => {
+                    ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?;
+                    continue;
+                }
+                Err(RecvError::Decoder(err)) => Err(err),
+                Err(RecvError::PeerGone(Some(err))) => {
+                    Err(PayloadError::Incomplete(Some(err)))
+                }
+                Err(RecvError::PeerGone(None)) => {
+                    if this.http_10 {
+                        return Poll::Ready(None);
                     }
-                    Err(RecvError::Stop) => {
-                        Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped")
-                            .into())
-                    }
-                    Err(RecvError::WriteBackpressure) => {
-                        ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?;
-                        continue;
-                    }
-                    Err(RecvError::Decoder(err)) => Err(err),
-                    Err(RecvError::PeerGone(Some(err))) => Err(err.into()),
-                    Err(RecvError::PeerGone(None)) => return Poll::Ready(None),
-                },
-            ));
+                    Err(PayloadError::Incomplete(None))
+                }
+            }));
         }
     }
 }
diff --git a/ntex/src/http/client/response.rs b/ntex/src/http/client/response.rs
index c68b6e73..9a450687 100644
--- a/ntex/src/http/client/response.rs
+++ b/ntex/src/http/client/response.rs
@@ -387,8 +387,8 @@ impl Future for ReadBody {
         let this = self.get_mut();
 
         loop {
-            return match Pin::new(&mut this.stream).poll_next(cx)? {
-                Poll::Ready(Some(chunk)) => {
+            return match Pin::new(&mut this.stream).poll_next(cx) {
+                Poll::Ready(Some(Ok(chunk))) => {
                     if this.limit > 0 && (this.buf.len() + chunk.len()) > this.limit {
                         Poll::Ready(Err(PayloadError::Overflow))
                     } else {
@@ -397,6 +397,7 @@ impl Future for ReadBody {
                     }
                 }
                 Poll::Ready(None) => Poll::Ready(Ok(this.buf.split().freeze())),
+                Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
                 Poll::Pending => {
                     if this.timeout.poll_elapsed(cx).is_ready() {
                         Poll::Ready(Err(PayloadError::Incomplete(Some(

From e9a12841511609b105058a984546f41bfd55f681 Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Fri, 28 Mar 2025 08:51:44 +0100
Subject: [PATCH 4/9] Better worker availability handling (#546)

---
 Cargo.toml                    |  6 ++-
 ntex-net/Cargo.toml           |  2 +-
 ntex-rt/Cargo.toml            |  2 +-
 ntex-server/CHANGES.md        |  4 ++
 ntex-server/Cargo.toml        | 16 +++----
 ntex-server/src/manager.rs    | 11 ++---
 ntex-server/src/net/accept.rs |  8 +---
 ntex-server/src/wrk.rs        | 83 ++++++++++++++++++++++++-----------
 ntex/Cargo.toml               |  2 +-
 9 files changed, 86 insertions(+), 48 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 871d9de2..d9e97ef4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -46,7 +46,10 @@ ntex-compio = { path = "ntex-compio" }
 ntex-tokio = { path = "ntex-tokio" }
 
 [workspace.dependencies]
+async-channel = "2"
 async-task = "4.5.0"
+atomic-waker = "1.1"
+core_affinity = "0.8"
 bitflags = "2"
 cfg_aliases = "0.2.1"
 cfg-if = "1.0.0"
@@ -57,7 +60,8 @@ fxhash = "0.2"
 libc = "0.2.164"
 log = "0.4"
 io-uring = "0.7.4"
-polling = "3.3.0"
+oneshot = "0.1"
+polling = "3.7.4"
 nohash-hasher = "0.2.0"
 scoped-tls = "1.0.1"
 slab = "0.4.9"
diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml
index c800c00b..46cf5cc4 100644
--- a/ntex-net/Cargo.toml
+++ b/ntex-net/Cargo.toml
@@ -40,7 +40,7 @@ ntex-util = "2.5"
 
 ntex-tokio = { version = "0.5.3", optional = true }
 ntex-compio = { version = "0.2.4", optional = true }
-ntex-neon = { version = "0.1.13", optional = true }
+ntex-neon = { version = "0.1.14", optional = true }
 
 bitflags = { workspace = true }
 cfg-if = { workspace = true }
diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml
index 2b5aa5d0..a5966d76 100644
--- a/ntex-rt/Cargo.toml
+++ b/ntex-rt/Cargo.toml
@@ -42,4 +42,4 @@ tok-io = { version = "1", package = "tokio", default-features = false, features
     "net",
 ], optional = true }
 
-ntex-neon = { version = "0.1.11", optional = true }
+ntex-neon = { version = "0.1.14", optional = true }
diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md
index 0d8dabc5..546a92ff 100644
--- a/ntex-server/CHANGES.md
+++ b/ntex-server/CHANGES.md
@@ -1,5 +1,9 @@
 # Changes
 
+## [2.7.3] - 2025-03-28
+
+* Better worker availability handling
+
 ## [2.7.2] - 2025-03-27
 
 * Handle paused state
diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml
index bca5f8b2..dcfa8332 100644
--- a/ntex-server/Cargo.toml
+++ b/ntex-server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "ntex-server"
-version = "2.7.2"
+version = "2.7.3"
 authors = ["ntex contributors <team@ntex.rs>"]
 description = "Server for ntex framework"
 keywords = ["network", "framework", "async", "futures"]
@@ -22,13 +22,13 @@ ntex-service = "3.4"
 ntex-rt = "0.4"
 ntex-util = "2.8"
 
-async-channel = "2"
-async-broadcast = "0.7"
-core_affinity = "0.8"
-polling = "3.3"
-log = "0.4"
-socket2 = "0.5"
-oneshot = { version = "0.1", default-features = false, features = ["async"] }
+async-channel = { workspace = true }
+atomic-waker = { workspace = true }
+core_affinity = { workspace = true }
+oneshot = { workspace = true }
+polling = { workspace = true }
+log = { workspace = true }
+socket2 = { workspace = true }
 
 [dev-dependencies]
 ntex = "2"
diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs
index ca558a54..f0719750 100644
--- a/ntex-server/src/manager.rs
+++ b/ntex-server/src/manager.rs
@@ -55,7 +55,7 @@ impl<F: ServerConfiguration> ServerManager<F> {
 
         let no_signals = cfg.no_signals;
         let shared = Arc::new(ServerShared {
-            paused: AtomicBool::new(false),
+            paused: AtomicBool::new(true),
         });
         let mgr = ServerManager(Rc::new(Inner {
             cfg,
@@ -139,7 +139,6 @@ impl<F: ServerConfiguration> ServerManager<F> {
 fn start_worker<F: ServerConfiguration>(mgr: ServerManager<F>, cid: Option<CoreId>) {
     let _ = ntex_rt::spawn(async move {
         let id = mgr.next_id();
-
         let mut wrk = Worker::start(id, mgr.factory(), cid);
 
         loop {
@@ -212,10 +211,9 @@ impl<F: ServerConfiguration> HandleCmdState<F> {
         match upd {
             Update::Available(worker) => {
                 self.workers.push(worker);
-                if !self.workers.is_empty() {
+                self.workers.sort();
+                if self.workers.len() == 1 {
                     self.mgr.resume();
-                } else {
-                    self.workers.sort();
                 }
             }
             Update::Unavailable(worker) => {
@@ -234,6 +232,9 @@ impl<F: ServerConfiguration> HandleCmdState<F> {
                 if let Err(item) = self.workers[0].send(item) {
                     self.backlog.push_back(item);
                     self.workers.remove(0);
+                    if self.workers.is_empty() {
+                        self.mgr.pause();
+                    }
                     break;
                 }
             }
diff --git a/ntex-server/src/net/accept.rs b/ntex-server/src/net/accept.rs
index 31793d82..7694d286 100644
--- a/ntex-server/src/net/accept.rs
+++ b/ntex-server/src/net/accept.rs
@@ -203,14 +203,10 @@ impl Accept {
         let mut timeout = Some(Duration::ZERO);
         loop {
             if let Err(e) = self.poller.wait(&mut events, timeout) {
-                if e.kind() == io::ErrorKind::Interrupted {
-                    continue;
-                } else {
+                if e.kind() != io::ErrorKind::Interrupted {
                     panic!("Cannot wait for events in poller: {}", e)
                 }
-            }
-
-            if timeout.is_some() {
+            } else if timeout.is_some() {
                 timeout = None;
                 let _ = self.tx.take().unwrap().send(());
             }
diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs
index b9092d0f..b791817d 100644
--- a/ntex-server/src/wrk.rs
+++ b/ntex-server/src/wrk.rs
@@ -2,8 +2,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::task::{ready, Context, Poll};
 use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc};
 
-use async_broadcast::{self as bus, broadcast};
 use async_channel::{unbounded, Receiver, Sender};
+use atomic_waker::AtomicWaker;
 use core_affinity::CoreId;
 
 use ntex_rt::{spawn, Arbiter};
@@ -151,10 +151,8 @@ impl<T> Worker<T> {
         if self.failed.load(Ordering::Acquire) {
             WorkerStatus::Failed
         } else {
-            // cleanup updates
-            while self.avail.notify.try_recv().is_ok() {}
-
-            if self.avail.notify.recv_direct().await.is_err() {
+            self.avail.wait_for_update().await;
+            if self.avail.failed() {
                 self.failed.store(true, Ordering::Release);
             }
             self.status()
@@ -196,46 +194,79 @@ impl Future for WorkerStop {
 
 #[derive(Debug, Clone)]
 struct WorkerAvailability {
-    notify: bus::Receiver<()>,
-    available: Arc<AtomicBool>,
+    inner: Arc<Inner>,
 }
 
 #[derive(Debug, Clone)]
 struct WorkerAvailabilityTx {
-    notify: bus::Sender<()>,
-    available: Arc<AtomicBool>,
+    inner: Arc<Inner>,
+}
+
+#[derive(Debug)]
+struct Inner {
+    waker: AtomicWaker,
+    updated: AtomicBool,
+    available: AtomicBool,
+    failed: AtomicBool,
 }
 
 impl WorkerAvailability {
     fn create() -> (Self, WorkerAvailabilityTx) {
-        let (mut tx, rx) = broadcast(16);
-        tx.set_overflow(true);
+        let inner = Arc::new(Inner {
+            waker: AtomicWaker::new(),
+            updated: AtomicBool::new(false),
+            available: AtomicBool::new(false),
+            failed: AtomicBool::new(false),
+        });
 
         let avail = WorkerAvailability {
-            notify: rx,
-            available: Arc::new(AtomicBool::new(false)),
-        };
-        let avail_tx = WorkerAvailabilityTx {
-            notify: tx,
-            available: avail.available.clone(),
+            inner: inner.clone(),
         };
+        let avail_tx = WorkerAvailabilityTx { inner };
         (avail, avail_tx)
     }
 
+    fn failed(&self) -> bool {
+        self.inner.failed.load(Ordering::Acquire)
+    }
+
     fn available(&self) -> bool {
-        self.available.load(Ordering::Acquire)
+        self.inner.available.load(Ordering::Acquire)
+    }
+
+    async fn wait_for_update(&self) {
+        poll_fn(|cx| {
+            if self.inner.updated.load(Ordering::Acquire) {
+                self.inner.updated.store(false, Ordering::Release);
+                Poll::Ready(())
+            } else {
+                self.inner.waker.register(cx.waker());
+                Poll::Pending
+            }
+        })
+        .await;
     }
 }
 
 impl WorkerAvailabilityTx {
     fn set(&self, val: bool) {
-        let old = self.available.swap(val, Ordering::Release);
-        if !old && val {
-            let _ = self.notify.try_broadcast(());
+        let old = self.inner.available.swap(val, Ordering::Release);
+        if old != val {
+            self.inner.updated.store(true, Ordering::Release);
+            self.inner.waker.wake();
         }
     }
 }
 
+impl Drop for WorkerAvailabilityTx {
+    fn drop(&mut self) {
+        self.inner.failed.store(true, Ordering::Release);
+        self.inner.updated.store(true, Ordering::Release);
+        self.inner.available.store(false, Ordering::Release);
+        self.inner.waker.wake();
+    }
+}
+
 /// Service worker
 ///
 /// Worker accepts message via unbounded channel and starts processing.
@@ -256,10 +287,13 @@ where
         let mut recv = std::pin::pin!(wrk.rx.recv());
         let fut = poll_fn(|cx| {
             match svc.poll_ready(cx) {
-                Poll::Ready(res) => {
-                    res?;
+                Poll::Ready(Ok(())) => {
                     wrk.availability.set(true);
                 }
+                Poll::Ready(Err(err)) => {
+                    wrk.availability.set(false);
+                    return Poll::Ready(Err(err));
+                }
                 Poll::Pending => {
                     wrk.availability.set(false);
                     return Poll::Pending;
@@ -287,7 +321,6 @@ where
                 let _ = ntex_rt::spawn(async move {
                     svc.shutdown().await;
                 });
-                wrk.availability.set(false);
             }
             Either::Right(Some(Shutdown { timeout, result })) => {
                 wrk.availability.set(false);
@@ -302,6 +335,7 @@ where
                 return;
             }
             Either::Left(Ok(false)) | Either::Right(None) => {
+                wrk.availability.set(false);
                 stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await;
                 return;
             }
@@ -311,7 +345,6 @@ where
         loop {
             match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
                 Either::Left(Ok(service)) => {
-                    wrk.availability.set(true);
                     svc = Pipeline::new(service).bind();
                     break;
                 }
diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml
index da30a2ba..1a947b47 100644
--- a/ntex/Cargo.toml
+++ b/ntex/Cargo.toml
@@ -68,7 +68,7 @@ ntex-service = "3.4"
 ntex-macros = "0.1"
 ntex-util = "2.8"
 ntex-bytes = "0.1.27"
-ntex-server = "2.7"
+ntex-server = "2.7.3"
 ntex-h2 = "1.8.6"
 ntex-rt = "0.4.27"
 ntex-io = "2.11"

From 30928d019ce044b18fe5624b855fbbcee6cd02ae Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Fri, 28 Mar 2025 09:11:59 +0100
Subject: [PATCH 5/9] Improve tests (#547)

---
 ntex/tests/http_openssl.rs | 11 ++++++++---
 ntex/tests/http_server.rs  | 11 ++++++++---
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs
index 75227c2c..e46765af 100644
--- a/ntex/tests/http_openssl.rs
+++ b/ntex/tests/http_openssl.rs
@@ -425,11 +425,12 @@ async fn test_h2_service_error() {
     assert_eq!(bytes, Bytes::from_static(b"error"));
 }
 
-struct SetOnDrop(Arc<AtomicUsize>);
+struct SetOnDrop(Arc<AtomicUsize>, Arc<Mutex<Option<::oneshot::Sender<()>>>>);
 
 impl Drop for SetOnDrop {
     fn drop(&mut self) {
         self.0.fetch_add(1, Ordering::Relaxed);
+        let _ = self.1.lock().unwrap().take().unwrap().send(());
     }
 }
 
@@ -437,14 +438,18 @@ impl Drop for SetOnDrop {
 async fn test_h2_client_drop() -> io::Result<()> {
     let count = Arc::new(AtomicUsize::new(0));
     let count2 = count.clone();
+    let (tx, rx) = ::oneshot::channel();
+    let tx = Arc::new(Mutex::new(Some(tx)));
 
     let srv = test_server(move || {
+        let tx = tx.clone();
         let count = count2.clone();
         HttpService::build()
             .h2(move |req: Request| {
+                let tx = tx.clone();
                 let count = count.clone();
                 async move {
-                    let _st = SetOnDrop(count);
+                    let _st = SetOnDrop(count, tx);
                     assert!(req.peer_addr().is_some());
                     assert_eq!(req.version(), Version::HTTP_2);
                     sleep(Seconds(100)).await;
@@ -457,7 +462,7 @@ async fn test_h2_client_drop() -> io::Result<()> {
 
     let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await;
     assert!(result.is_err());
-    sleep(Millis(250)).await;
+    let _ = rx.await;
     assert_eq!(count.load(Ordering::Relaxed), 1);
     Ok(())
 }
diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs
index a4c1d05f..64ab4ede 100644
--- a/ntex/tests/http_server.rs
+++ b/ntex/tests/http_server.rs
@@ -723,11 +723,12 @@ async fn test_h1_service_error() {
     assert_eq!(bytes, Bytes::from_static(b"error"));
 }
 
-struct SetOnDrop(Arc<AtomicUsize>);
+struct SetOnDrop(Arc<AtomicUsize>, Option<::oneshot::Sender<()>>);
 
 impl Drop for SetOnDrop {
     fn drop(&mut self) {
         self.0.fetch_add(1, Ordering::Relaxed);
+        let _ = self.1.take().unwrap().send(());
     }
 }
 
@@ -735,13 +736,17 @@ impl Drop for SetOnDrop {
 async fn test_h1_client_drop() -> io::Result<()> {
     let count = Arc::new(AtomicUsize::new(0));
     let count2 = count.clone();
+    let (tx, rx) = ::oneshot::channel();
+    let tx = Arc::new(Mutex::new(Some(tx)));
 
     let srv = test_server(move || {
+        let tx = tx.clone();
         let count = count2.clone();
         HttpService::build().h1(move |req: Request| {
+            let tx = tx.clone();
             let count = count.clone();
             async move {
-                let _st = SetOnDrop(count);
+                let _st = SetOnDrop(count, tx.lock().unwrap().take());
                 assert!(req.peer_addr().is_some());
                 assert_eq!(req.version(), Version::HTTP_11);
                 sleep(Millis(500)).await;
@@ -752,7 +757,7 @@ async fn test_h1_client_drop() -> io::Result<()> {
 
     let result = timeout(Millis(100), srv.request(Method::GET, "/").send()).await;
     assert!(result.is_err());
-    sleep(Millis(1000)).await;
+    let _ = rx.await;
     assert_eq!(count.load(Ordering::Relaxed), 1);
     Ok(())
 }

From f6fe9c3e10d7eb182a52b8d6c6aebef4ee8c7910 Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Fri, 28 Mar 2025 10:07:10 +0100
Subject: [PATCH 6/9] Improve tests (#548)

---
 ntex/tests/http_openssl.rs | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs
index e46765af..2d2c98f9 100644
--- a/ntex/tests/http_openssl.rs
+++ b/ntex/tests/http_openssl.rs
@@ -446,13 +446,12 @@ async fn test_h2_client_drop() -> io::Result<()> {
         let count = count2.clone();
         HttpService::build()
             .h2(move |req: Request| {
-                let tx = tx.clone();
-                let count = count.clone();
+                let st = SetOnDrop(count.clone(), tx.clone());
                 async move {
-                    let _st = SetOnDrop(count, tx);
                     assert!(req.peer_addr().is_some());
                     assert_eq!(req.version(), Version::HTTP_2);
-                    sleep(Seconds(100)).await;
+                    sleep(Seconds(30)).await;
+                    drop(st);
                     Ok::<_, io::Error>(Response::Ok().finish())
                 }
             })
@@ -460,9 +459,9 @@ async fn test_h2_client_drop() -> io::Result<()> {
             .map_err(|_| ())
     });
 
-    let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await;
+    let result = timeout(Millis(150), srv.srequest(Method::GET, "/").send()).await;
     assert!(result.is_err());
-    let _ = rx.await;
+    let _ = timeout(Millis(1500), rx).await;
     assert_eq!(count.load(Ordering::Relaxed), 1);
     Ok(())
 }

From e4f24ee41f2fec056c782c63ba2e3bea1f041a1b Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Fri, 28 Mar 2025 11:39:24 +0100
Subject: [PATCH 7/9] Handle flaky tests

---
 ntex/tests/http_openssl.rs | 2 +-
 ntex/tests/http_server.rs  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs
index 2d2c98f9..921310a8 100644
--- a/ntex/tests/http_openssl.rs
+++ b/ntex/tests/http_openssl.rs
@@ -459,7 +459,7 @@ async fn test_h2_client_drop() -> io::Result<()> {
             .map_err(|_| ())
     });
 
-    let result = timeout(Millis(150), srv.srequest(Method::GET, "/").send()).await;
+    let result = timeout(Millis(1500), srv.srequest(Method::GET, "/").send()).await;
     assert!(result.is_err());
     let _ = timeout(Millis(1500), rx).await;
     assert_eq!(count.load(Ordering::Relaxed), 1);
diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs
index 64ab4ede..0227573b 100644
--- a/ntex/tests/http_server.rs
+++ b/ntex/tests/http_server.rs
@@ -749,13 +749,13 @@ async fn test_h1_client_drop() -> io::Result<()> {
                 let _st = SetOnDrop(count, tx.lock().unwrap().take());
                 assert!(req.peer_addr().is_some());
                 assert_eq!(req.version(), Version::HTTP_11);
-                sleep(Millis(500)).await;
+                sleep(Millis(50000)).await;
                 Ok::<_, io::Error>(Response::Ok().finish())
             }
         })
     });
 
-    let result = timeout(Millis(100), srv.request(Method::GET, "/").send()).await;
+    let result = timeout(Millis(1500), srv.request(Method::GET, "/").send()).await;
     assert!(result.is_err());
     let _ = rx.await;
     assert_eq!(count.load(Ordering::Relaxed), 1);

From f5ee55d598810f56379f789b2fbcd163b8730f03 Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Fri, 28 Mar 2025 21:06:11 +0100
Subject: [PATCH 8/9] Handle socket close for poll driver (#549)

---
 ntex-net/CHANGES.md               |  4 ++
 ntex-net/Cargo.toml               |  2 +-
 ntex-net/src/rt_polling/driver.rs | 61 ++++++++++++++-----------------
 ntex-server/Cargo.toml            |  2 +-
 ntex-server/src/manager.rs        |  2 +-
 5 files changed, 35 insertions(+), 36 deletions(-)

diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md
index a16145fc..e60744ef 100644
--- a/ntex-net/CHANGES.md
+++ b/ntex-net/CHANGES.md
@@ -1,5 +1,9 @@
 # Changes
 
+## [2.5.10] - 2025-03-28
+
+* Better closed sockets handling
+
 ## [2.5.9] - 2025-03-27
 
 * Handle closed sockets
diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml
index 46cf5cc4..0142a911 100644
--- a/ntex-net/Cargo.toml
+++ b/ntex-net/Cargo.toml
@@ -40,7 +40,7 @@ ntex-util = "2.5"
 
 ntex-tokio = { version = "0.5.3", optional = true }
 ntex-compio = { version = "0.2.4", optional = true }
-ntex-neon = { version = "0.1.14", optional = true }
+ntex-neon = { version = "0.1.15", optional = true }
 
 bitflags = { workspace = true }
 cfg-if = { workspace = true }
diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs
index c179a77a..24db553d 100644
--- a/ntex-net/src/rt_polling/driver.rs
+++ b/ntex-net/src/rt_polling/driver.rs
@@ -1,5 +1,5 @@
 use std::os::fd::{AsRawFd, RawFd};
-use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task, task::Poll};
+use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task, task::Poll};
 
 use ntex_neon::driver::{DriverApi, Event, Handler};
 use ntex_neon::{syscall, Runtime};
@@ -18,7 +18,6 @@ bitflags::bitflags! {
     struct Flags: u8 {
         const RD     = 0b0000_0001;
         const WR     = 0b0000_0010;
-        const CLOSED = 0b0000_0100;
     }
 }
 
@@ -106,18 +105,15 @@ impl<T> Handler for StreamOpsHandler<T> {
                 return;
             }
             let item = &mut streams[id];
-            log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev);
-
-            if item.flags.contains(Flags::CLOSED) {
+            if item.io.is_none() {
                 return;
             }
+            log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev);
 
             // handle HUP
             if ev.is_interrupt() {
                 item.context.stopped(None);
-                if item.io.take().is_some() {
-                    close(id as u32, item, &self.inner.api);
-                }
+                close(id as u32, item, &self.inner.api, None, true);
                 return;
             }
 
@@ -177,9 +173,7 @@ impl<T> Handler for StreamOpsHandler<T> {
                             item.fd,
                             item.io.is_some()
                         );
-                        if item.io.is_some() {
-                            close(id, &mut item, &self.inner.api);
-                        }
+                        close(id, &mut item, &self.inner.api, None, true);
                     }
                 }
                 self.inner.delayd_drop.set(false);
@@ -197,10 +191,7 @@ impl<T> Handler for StreamOpsHandler<T> {
                     item.fd,
                     err
                 );
-                item.context.stopped(Some(err));
-                if item.io.take().is_some() {
-                    close(id as u32, item, &self.inner.api);
-                }
+                close(id as u32, item, &self.inner.api, Some(err), false);
             }
         })
     }
@@ -222,14 +213,26 @@ fn close<T>(
     id: u32,
     item: &mut StreamItem<T>,
     api: &DriverApi,
-) -> ntex_rt::JoinHandle<io::Result<i32>> {
-    let fd = item.fd;
-    item.flags.insert(Flags::CLOSED);
-    api.detach(fd, id);
-    ntex_rt::spawn_blocking(move || {
-        syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?;
-        syscall!(libc::close(fd))
-    })
+    error: Option<io::Error>,
+    shutdown: bool,
+) -> Option<ntex_rt::JoinHandle<io::Result<i32>>> {
+    if let Some(io) = item.io.take() {
+        log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd);
+        mem::forget(io);
+        if let Some(err) = error {
+            item.context.stopped(Some(err));
+        }
+        let fd = item.fd;
+        api.detach(fd, id);
+        Some(ntex_rt::spawn_blocking(move || {
+            if shutdown {
+                let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR));
+            }
+            syscall!(libc::close(fd))
+        }))
+    } else {
+        None
+    }
 }
 
 impl<T> StreamCtl<T> {
@@ -237,13 +240,7 @@ impl<T> StreamCtl<T> {
         let id = self.id as usize;
         let fut = self.inner.with(|streams| {
             let item = &mut streams[id];
-            if let Some(io) = item.io.take() {
-                log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd);
-                std::mem::forget(io);
-                Some(close(self.id, item, &self.inner.api))
-            } else {
-                None
-            }
+            close(self.id, item, &self.inner.api, None, false)
         });
         async move {
             if let Some(fut) = fut {
@@ -360,9 +357,7 @@ impl<T> Drop for StreamCtl<T> {
                     item.fd,
                     item.io.is_some()
                 );
-                if item.io.is_some() {
-                    close(self.id, &mut item, &self.inner.api);
-                }
+                close(self.id, &mut item, &self.inner.api, None, true);
             }
             self.inner.streams.set(Some(streams));
         } else {
diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml
index dcfa8332..a88be635 100644
--- a/ntex-server/Cargo.toml
+++ b/ntex-server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "ntex-server"
-version = "2.7.3"
+version = "2.7.4"
 authors = ["ntex contributors <team@ntex.rs>"]
 description = "Server for ntex framework"
 keywords = ["network", "framework", "async", "futures"]
diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs
index f0719750..9d0bfe8d 100644
--- a/ntex-server/src/manager.rs
+++ b/ntex-server/src/manager.rs
@@ -180,7 +180,7 @@ impl<F: ServerConfiguration> HandleCmdState<F> {
     fn process(&mut self, mut item: F::Item) {
         loop {
             if !self.workers.is_empty() {
-                if self.next > self.workers.len() {
+                if self.next >= self.workers.len() {
                     self.next = self.workers.len() - 1;
                 }
                 match self.workers[self.next].send(item) {

From 01d3a2440b074e0f77a089bc4d5e1496b4acdf76 Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Fri, 28 Mar 2025 21:26:07 +0100
Subject: [PATCH 9/9] Prepare net release (#550)

---
 ntex-net/Cargo.toml | 2 +-
 ntex/Cargo.toml     | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml
index 0142a911..5a72d3eb 100644
--- a/ntex-net/Cargo.toml
+++ b/ntex-net/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "ntex-net"
-version = "2.5.9"
+version = "2.5.10"
 authors = ["ntex contributors <team@ntex.rs>"]
 description = "ntexwork utils for ntex framework"
 keywords = ["network", "framework", "async", "futures"]
diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml
index 1a947b47..0ea37469 100644
--- a/ntex/Cargo.toml
+++ b/ntex/Cargo.toml
@@ -68,11 +68,11 @@ ntex-service = "3.4"
 ntex-macros = "0.1"
 ntex-util = "2.8"
 ntex-bytes = "0.1.27"
-ntex-server = "2.7.3"
+ntex-server = "2.7.4"
 ntex-h2 = "1.8.6"
 ntex-rt = "0.4.27"
 ntex-io = "2.11"
-ntex-net = "2.5.8"
+ntex-net = "2.5.10"
 ntex-tls = "2.3"
 
 base64 = "0.22"