diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index a16145fc..e60744ef 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.10] - 2025-03-28 + +* Better closed sockets handling + ## [2.5.9] - 2025-03-27 * Handle closed sockets diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 46cf5cc4..5a72d3eb 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.9" +version = "2.5.10" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.14", optional = true } +ntex-neon = { version = "0.1.15", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index c179a77a..24db553d 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,5 +1,5 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task, task::Poll}; +use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task, task::Poll}; use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; @@ -18,7 +18,6 @@ bitflags::bitflags! { struct Flags: u8 { const RD = 0b0000_0001; const WR = 0b0000_0010; - const CLOSED = 0b0000_0100; } } @@ -106,18 +105,15 @@ impl Handler for StreamOpsHandler { return; } let item = &mut streams[id]; - log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); - - if item.flags.contains(Flags::CLOSED) { + if item.io.is_none() { return; } + log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); // handle HUP if ev.is_interrupt() { item.context.stopped(None); - if item.io.take().is_some() { - close(id as u32, item, &self.inner.api); - } + close(id as u32, item, &self.inner.api, None, true); return; } @@ -177,9 +173,7 @@ impl Handler for StreamOpsHandler { item.fd, item.io.is_some() ); - if item.io.is_some() { - close(id, &mut item, &self.inner.api); - } + close(id, &mut item, &self.inner.api, None, true); } } self.inner.delayd_drop.set(false); @@ -197,10 +191,7 @@ impl Handler for StreamOpsHandler { item.fd, err ); - item.context.stopped(Some(err)); - if item.io.take().is_some() { - close(id as u32, item, &self.inner.api); - } + close(id as u32, item, &self.inner.api, Some(err), false); } }) } @@ -222,14 +213,26 @@ fn close( id: u32, item: &mut StreamItem, api: &DriverApi, -) -> ntex_rt::JoinHandle> { - let fd = item.fd; - item.flags.insert(Flags::CLOSED); - api.detach(fd, id); - ntex_rt::spawn_blocking(move || { - syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; - syscall!(libc::close(fd)) - }) + error: Option, + shutdown: bool, +) -> Option>> { + if let Some(io) = item.io.take() { + log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); + mem::forget(io); + if let Some(err) = error { + item.context.stopped(Some(err)); + } + let fd = item.fd; + api.detach(fd, id); + Some(ntex_rt::spawn_blocking(move || { + if shutdown { + let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR)); + } + syscall!(libc::close(fd)) + })) + } else { + None + } } impl StreamCtl { @@ -237,13 +240,7 @@ impl StreamCtl { let id = self.id as usize; let fut = self.inner.with(|streams| { let item = &mut streams[id]; - if let Some(io) = item.io.take() { - log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); - std::mem::forget(io); - Some(close(self.id, item, &self.inner.api)) - } else { - None - } + close(self.id, item, &self.inner.api, None, false) }); async move { if let Some(fut) = fut { @@ -360,9 +357,7 @@ impl Drop for StreamCtl { item.fd, item.io.is_some() ); - if item.io.is_some() { - close(self.id, &mut item, &self.inner.api); - } + close(self.id, &mut item, &self.inner.api, None, true); } self.inner.streams.set(Some(streams)); } else { diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index dcfa8332..a88be635 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.7.3" +version = "2.7.4" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs index f0719750..9d0bfe8d 100644 --- a/ntex-server/src/manager.rs +++ b/ntex-server/src/manager.rs @@ -180,7 +180,7 @@ impl HandleCmdState { fn process(&mut self, mut item: F::Item) { loop { if !self.workers.is_empty() { - if self.next > self.workers.len() { + if self.next >= self.workers.len() { self.next = self.workers.len() - 1; } match self.workers[self.next].send(item) { diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 1a947b47..0ea37469 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -68,11 +68,11 @@ ntex-service = "3.4" ntex-macros = "0.1" ntex-util = "2.8" ntex-bytes = "0.1.27" -ntex-server = "2.7.3" +ntex-server = "2.7.4" ntex-h2 = "1.8.6" ntex-rt = "0.4.27" ntex-io = "2.11" -ntex-net = "2.5.8" +ntex-net = "2.5.10" ntex-tls = "2.3" base64 = "0.22" diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index 75227c2c..921310a8 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -425,11 +425,12 @@ async fn test_h2_service_error() { assert_eq!(bytes, Bytes::from_static(b"error")); } -struct SetOnDrop(Arc); +struct SetOnDrop(Arc, Arc>>>); impl Drop for SetOnDrop { fn drop(&mut self) { self.0.fetch_add(1, Ordering::Relaxed); + let _ = self.1.lock().unwrap().take().unwrap().send(()); } } @@ -437,17 +438,20 @@ impl Drop for SetOnDrop { async fn test_h2_client_drop() -> io::Result<()> { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build() .h2(move |req: Request| { - let count = count.clone(); + let st = SetOnDrop(count.clone(), tx.clone()); async move { - let _st = SetOnDrop(count); assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_2); - sleep(Seconds(100)).await; + sleep(Seconds(30)).await; + drop(st); Ok::<_, io::Error>(Response::Ok().finish()) } }) @@ -455,9 +459,9 @@ async fn test_h2_client_drop() -> io::Result<()> { .map_err(|_| ()) }); - let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await; + let result = timeout(Millis(1500), srv.srequest(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(250)).await; + let _ = timeout(Millis(1500), rx).await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) } diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index a4c1d05f..0227573b 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -723,11 +723,12 @@ async fn test_h1_service_error() { assert_eq!(bytes, Bytes::from_static(b"error")); } -struct SetOnDrop(Arc); +struct SetOnDrop(Arc, Option<::oneshot::Sender<()>>); impl Drop for SetOnDrop { fn drop(&mut self) { self.0.fetch_add(1, Ordering::Relaxed); + let _ = self.1.take().unwrap().send(()); } } @@ -735,24 +736,28 @@ impl Drop for SetOnDrop { async fn test_h1_client_drop() -> io::Result<()> { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build().h1(move |req: Request| { + let tx = tx.clone(); let count = count.clone(); async move { - let _st = SetOnDrop(count); + let _st = SetOnDrop(count, tx.lock().unwrap().take()); assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_11); - sleep(Millis(500)).await; + sleep(Millis(50000)).await; Ok::<_, io::Error>(Response::Ok().finish()) } }) }); - let result = timeout(Millis(100), srv.request(Method::GET, "/").send()).await; + let result = timeout(Millis(1500), srv.request(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(1000)).await; + let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) }