diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index dc18cb14..15047c01 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.3] - 2025-03-14 + +* Fix operation cancelation handling for poll driver + ## [2.5.2] - 2025-03-14 * Fix operation cancelation handling for io-uring driver diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index f2ed2a79..382031df 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.2" +version = "2.5.3" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index b830fcca..5f464ce6 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -8,15 +8,6 @@ use slab::Slab; use ntex_bytes::BufMut; use ntex_io::IoContext; -bitflags::bitflags! { - #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] - struct Flags: u8 { - const ERROR = 0b0000_0001; - const RD = 0b0000_0010; - const WR = 0b0000_0100; - } -} - pub(crate) struct StreamCtl { id: usize, inner: Rc>, @@ -26,7 +17,6 @@ struct StreamItem { io: Option, fd: RawFd, context: IoContext, - flags: Flags, ref_count: usize, } @@ -76,7 +66,6 @@ impl StreamOps { context, fd: io.as_raw_fd(), io: Some(io), - flags: Flags::empty(), ref_count: 1, }; self.with(|streams| { @@ -147,17 +136,14 @@ impl Handler for StreamOpsHandler { item.context.tag(), item.fd, size, - buf + buf, ); }), ) }); if result.is_pending() { - item.flags.insert(Flags::RD); self.inner.api.register(item.fd, id, Interest::Readable); - } else { - item.flags.remove(Flags::RD); } } Change::Writable => { @@ -170,20 +156,13 @@ impl Handler for StreamOpsHandler { }); if result.is_pending() { - item.flags.insert(Flags::WR); self.inner.api.register(item.fd, id, Interest::Writable); - } else { - item.flags.remove(Flags::WR); } } Change::Error(err) => { if let Some(item) = streams.get_mut(id) { item.context.stopped(Some(err)); - if !item.flags.contains(Flags::ERROR) { - item.flags.insert(Flags::ERROR); - item.flags.remove(Flags::RD | Flags::WR); - self.inner.api.unregister_all(item.fd); - } + self.inner.api.unregister_all(item.fd); } } } @@ -193,11 +172,16 @@ impl Handler for StreamOpsHandler { let mut feed = self.inner.feed.take().unwrap(); for id in feed.drain(..) { let item = &mut streams[id]; - log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd); - item.ref_count -= 1; if item.ref_count == 0 { let item = streams.remove(id); + log::debug!( + "{}: Drop io ({}), {:?}, has-io: {}", + item.context.tag(), + id, + item.fd, + item.io.is_some() + ); if item.io.is_some() { self.inner.api.unregister_all(item.fd); } @@ -235,16 +219,13 @@ impl StreamCtl { self.with(|streams| { let item = &mut streams[self.id]; - if item.flags.intersects(Flags::RD | Flags::WR) { - log::debug!( - "{}: Pause all io ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); - item.flags.remove(Flags::RD | Flags::WR); - self.inner.api.unregister_all(item.fd); - } + log::debug!( + "{}: Pause all io ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); + self.inner.api.unregister_all(item.fd); }) } @@ -258,10 +239,7 @@ impl StreamCtl { self.id, item.fd ); - if item.flags.contains(Flags::RD) { - item.flags.remove(Flags::RD); - self.inner.api.unregister(item.fd, Interest::Readable); - } + self.inner.api.unregister(item.fd, Interest::Readable); }) } @@ -275,8 +253,26 @@ impl StreamCtl { self.id, item.fd ); - if !item.flags.contains(Flags::RD) { - item.flags.insert(Flags::RD); + + let result = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let b = chunk.as_mut_ptr(); + Poll::Ready( + task::ready!(syscall!(break libc::read(item.fd, b as _, chunk.len()))) + .inspect(|size| { + unsafe { buf.advance_mut(*size) }; + log::debug!( + "{}: {:?}, SIZE: {:?}, BUF: {:?}", + item.context.tag(), + item.fd, + size, + buf, + ); + }), + ) + }); + + if result.is_pending() { self.inner .api .register(item.fd, self.id, Interest::Readable); @@ -288,38 +284,35 @@ impl StreamCtl { self.with(|streams| { let item = &mut streams[self.id]; - if !item.flags.contains(Flags::WR) { + log::debug!( + "{}: Resume io write ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); + let result = item.context.with_write_buf(|buf| { log::debug!( - "{}: Resume io write ({}), {:?}", + "{}: Writing io ({}), buf: {:?}", item.context.tag(), self.id, - item.fd + buf.len() ); - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: Writing io ({}), buf: {:?}", - item.context.tag(), - self.id, - buf.len() - ); - let slice = &buf[..]; - syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) - }); + let slice = &buf[..]; + syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) + }); - if result.is_pending() { - log::debug!( - "{}: Write is pending ({}), {:?}", - item.context.tag(), - self.id, - item.context.flags() - ); + if result.is_pending() { + log::debug!( + "{}: Write is pending ({}), {:?}", + item.context.tag(), + self.id, + item.context.flags() + ); - item.flags.insert(Flags::WR); - self.inner - .api - .register(item.fd, self.id, Interest::Writable); - } + self.inner + .api + .register(item.fd, self.id, Interest::Writable); } }) } @@ -350,16 +343,16 @@ impl Clone for StreamCtl { impl Drop for StreamCtl { fn drop(&mut self) { if let Some(mut streams) = self.inner.streams.take() { - log::debug!( - "{}: Drop io ({}), {:?}", - streams[self.id].context.tag(), - self.id, - streams[self.id].fd - ); - streams[self.id].ref_count -= 1; if streams[self.id].ref_count == 0 { let item = streams.remove(self.id); + log::debug!( + "{}: Drop io ({}), {:?}, has-io: {}", + item.context.tag(), + self.id, + item.fd, + item.io.is_some() + ); if item.io.is_some() { self.inner.api.unregister_all(item.fd); }