diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 15047c01..2109e1aa 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.4] - 2025-03-15 + +* Close FD in various case for poll driver + ## [2.5.3] - 2025-03-14 * Fix operation cancelation handling for poll driver diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 382031df..177d043a 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.3" +version = "2.5.4" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.3", optional = true } +ntex-neon = { version = "0.1.4", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_polling/connect.rs b/ntex-net/src/rt_polling/connect.rs index 6963fc80..0208fc90 100644 --- a/ntex-net/src/rt_polling/connect.rs +++ b/ntex-net/src/rt_polling/connect.rs @@ -68,7 +68,6 @@ impl ConnectOps { let id = self.0.connects.borrow_mut().insert(item); self.0.api.register(fd, id, Interest::Writable); - Ok(id) } } diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 5f464ce6..d90c73ba 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,5 +1,5 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, collections::VecDeque, io, rc::Rc, task, task::Poll}; +use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task}; use ntex_neon::driver::{DriverApi, Handler, Interest}; use ntex_neon::{syscall, Runtime}; @@ -125,7 +125,7 @@ impl Handler for StreamOpsHandler { let result = item.context.with_read_buf(|buf| { let chunk = buf.chunk_mut(); let b = chunk.as_mut_ptr(); - Poll::Ready( + task::Poll::Ready( task::ready!(syscall!( break libc::read(item.fd, b as _, chunk.len()) )) @@ -142,27 +142,37 @@ impl Handler for StreamOpsHandler { ) }); - if result.is_pending() { + if item.io.is_some() && result.is_pending() { self.inner.api.register(item.fd, id, Interest::Readable); } } Change::Writable => { let item = &mut streams[id]; let result = item.context.with_write_buf(|buf| { + log::debug!( + "{}: writing {:?} SIZE: {:?}, BUF: {:?}", + item.context.tag(), + item.fd, + buf.len(), + buf, + ); let slice = &buf[..]; syscall!( break libc::write(item.fd, slice.as_ptr() as _, slice.len()) ) }); - if result.is_pending() { + if item.io.is_some() && result.is_pending() { + log::debug!("{}: want write {:?}", item.context.tag(), item.fd,); self.inner.api.register(item.fd, id, Interest::Writable); } } Change::Error(err) => { if let Some(item) = streams.get_mut(id) { item.context.stopped(Some(err)); - self.inner.api.unregister_all(item.fd); + if let Some(_) = item.io.take() { + close(id, item.fd, &self.inner.api); + } } } } @@ -183,7 +193,7 @@ impl Handler for StreamOpsHandler { item.io.is_some() ); if item.io.is_some() { - self.inner.api.unregister_all(item.fd); + close(id, item.fd, &self.inner.api); } } } @@ -193,19 +203,33 @@ impl Handler for StreamOpsHandler { } } +fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle> { + api.unregister_all(fd); + ntex_rt::spawn_blocking(move || { + syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; + syscall!(libc::close(fd)) + }) +} + impl StreamCtl { - pub(crate) async fn close(self) -> io::Result<()> { + pub(crate) fn close(self) -> impl Future> { let (io, fd) = self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd)); - if let Some(io) = io { + let fut = if let Some(io) = io { + log::debug!("Closing ({}), {:?}", self.id, fd); std::mem::forget(io); - - ntex_rt::spawn_blocking(move || syscall!(libc::close(fd))) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - .and_then(crate::helpers::pool_io_err)?; + Some(close(self.id, fd, &self.inner.api)) + } else { + None + }; + async move { + if let Some(fut) = fut { + fut.await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .and_then(crate::helpers::pool_io_err)?; + } + Ok(()) } - Ok(()) } pub(crate) fn with_io(&self, f: F) -> R @@ -257,7 +281,7 @@ impl StreamCtl { let result = item.context.with_read_buf(|buf| { let chunk = buf.chunk_mut(); let b = chunk.as_mut_ptr(); - Poll::Ready( + task::Poll::Ready( task::ready!(syscall!(break libc::read(item.fd, b as _, chunk.len()))) .inspect(|size| { unsafe { buf.advance_mut(*size) }; @@ -272,7 +296,7 @@ impl StreamCtl { ) }); - if result.is_pending() { + if item.io.is_some() && result.is_pending() { self.inner .api .register(item.fd, self.id, Interest::Readable); @@ -284,12 +308,6 @@ impl StreamCtl { self.with(|streams| { let item = &mut streams[self.id]; - log::debug!( - "{}: Resume io write ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); let result = item.context.with_write_buf(|buf| { log::debug!( "{}: Writing io ({}), buf: {:?}", @@ -302,7 +320,7 @@ impl StreamCtl { syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) }); - if result.is_pending() { + if item.io.is_some() && result.is_pending() { log::debug!( "{}: Write is pending ({}), {:?}", item.context.tag(), @@ -347,14 +365,14 @@ impl Drop for StreamCtl { if streams[self.id].ref_count == 0 { let item = streams.remove(self.id); log::debug!( - "{}: Drop io ({}), {:?}, has-io: {}", + "{}: Drop io ({}), {:?}, has-io: {}", item.context.tag(), self.id, item.fd, item.io.is_some() ); if item.io.is_some() { - self.inner.api.unregister_all(item.fd); + close(self.id, item.fd, &self.inner.api); } } self.inner.streams.set(Some(streams)); diff --git a/ntex-net/src/rt_polling/io.rs b/ntex-net/src/rt_polling/io.rs index 4b7a7b1a..d6e3b4d9 100644 --- a/ntex-net/src/rt_polling/io.rs +++ b/ntex-net/src/rt_polling/io.rs @@ -88,9 +88,5 @@ async fn run(ctl: StreamCtl, context: IoContext) { ctl.resume_write(); context.shutdown(st == Status::Shutdown).await; - - ctl.pause_all(); - let result = ctl.close().await; - - context.stopped(result.err()); + context.stopped(ctl.close().await.err()); } diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 265ae6f8..cff51af4 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -248,10 +248,6 @@ where Ok(()) }) }); - // wait for server - if std::env::var("GITHUB_ACTIONS") == Ok("true".to_string()) { - thread::sleep(std::time::Duration::from_millis(150)); - } let (system, server, addr) = rx.recv().unwrap(); diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index d81c8338..6a0bcabc 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -701,10 +701,6 @@ where Ok(()) }) }); - // wait for server - if std::env::var("GITHUB_ACTIONS") == Ok("true".to_string()) { - thread::sleep(std::time::Duration::from_millis(150)); - } let (system, server, addr) = rx.recv().unwrap();