diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 2109e1aa..a082b258 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [2.5.5] - 2025-03-17 + +* Add check for required io-uring opcodes + +* Handle io-uring cancelation + ## [2.5.4] - 2025-03-15 * Close FD in various case for poll driver diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 177d043a..8f75d422 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.4" +version = "2.5.5" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.4", optional = true } +ntex-neon = { version = "0.1.5", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_uring/connect.rs b/ntex-net/src/rt_uring/connect.rs index 715c4763..ea9be3e1 100644 --- a/ntex-net/src/rt_uring/connect.rs +++ b/ntex-net/src/rt_uring/connect.rs @@ -32,6 +32,10 @@ impl ConnectOps { Runtime::value(|rt| { let mut inner = None; rt.driver().register(|api| { + if !api.is_supported(opcode::Connect::CODE) { + panic!("opcode::Connect is required for io-uring support"); + } + let ops = Rc::new(ConnectOpsInner { api, ops: RefCell::new(Slab::new()), diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs index 6a2ba777..f2a88d11 100644 --- a/ntex-net/src/rt_uring/driver.rs +++ b/ntex-net/src/rt_uring/driver.rs @@ -13,11 +13,22 @@ pub(crate) struct StreamCtl { inner: Rc>, } +bitflags::bitflags! { + #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + struct Flags: u8 { + const RD_CANCELING = 0b0000_0001; + const RD_REISSUE = 0b0000_0010; + const WR_CANCELING = 0b0001_0000; + const WR_REISSUE = 0b0010_0000; + } +} + struct StreamItem { io: Option, fd: Fd, context: IoContext, ref_count: usize, + flags: Flags, rd_op: Option, wr_op: Option, } @@ -61,6 +72,16 @@ impl StreamOps { Runtime::value(|rt| { let mut inner = None; rt.driver().register(|api| { + if !api.is_supported(opcode::Recv::CODE) { + panic!("opcode::Recv is required for io-uring support"); + } + if !api.is_supported(opcode::Send::CODE) { + panic!("opcode::Send is required for io-uring support"); + } + if !api.is_supported(opcode::Close::CODE) { + panic!("opcode::Close is required for io-uring support"); + } + let mut ops = Slab::new(); ops.insert(Operation::Nop); @@ -88,6 +109,7 @@ impl StreamOps { ref_count: 1, rd_op: None, wr_op: None, + flags: Flags::empty(), }; let id = self.0.storage.borrow_mut().streams.insert(item); StreamCtl { @@ -116,10 +138,19 @@ impl Handler for StreamOpsHandler { match storage.ops.remove(user_data) { Operation::Recv { id, buf, context } => { - log::debug!("{}: Recv canceled {:?}", context.tag(), id,); + log::debug!("{}: Recv canceled {:?}", context.tag(), id); context.release_read_buf(buf); if let Some(item) = storage.streams.get_mut(id) { item.rd_op.take(); + item.flags.remove(Flags::RD_CANCELING); + if item.flags.contains(Flags::RD_REISSUE) { + item.flags.remove(Flags::RD_REISSUE); + + let result = storage.recv(id, Some(context)); + if let Some((id, op)) = result { + self.inner.api.submit(id, op); + } + } } } Operation::Send { id, buf, context } => { @@ -127,6 +158,15 @@ impl Handler for StreamOpsHandler { context.release_write_buf(buf); if let Some(item) = storage.streams.get_mut(id) { item.wr_op.take(); + item.flags.remove(Flags::WR_CANCELING); + if item.flags.contains(Flags::WR_REISSUE) { + item.flags.remove(Flags::WR_REISSUE); + + let result = storage.send(id, Some(context)); + if let Some((id, op)) = result { + self.inner.api.submit(id, op); + } + } } } Operation::Nop | Operation::Close { .. } => {} @@ -151,12 +191,11 @@ impl Handler for StreamOpsHandler { // reset op reference if let Some(item) = storage.streams.get_mut(id) { log::debug!( - "{}: Recv completed {:?}, res: {:?}, buf({}): {:?}", + "{}: Recv completed {:?}, res: {:?}, buf({})", context.tag(), item.fd, result, - buf.remaining_mut(), - buf, + buf.remaining_mut() ); item.rd_op.take(); } @@ -173,21 +212,24 @@ impl Handler for StreamOpsHandler { } Operation::Send { id, buf, context } => { // reset op reference - if let Some(item) = storage.streams.get_mut(id) { + let fd = if let Some(item) = storage.streams.get_mut(id) { log::debug!( - "{}: Send completed: {:?}, res: {:?}", + "{}: Send completed: {:?}, res: {:?}, buf({})", context.tag(), item.fd, - result + result, + buf.len() ); item.wr_op.take(); - } + Some(item.fd) + } else { + None + }; // set read buf - if context - .set_write_buf(result.map(|size| size as usize), buf) - .is_pending() - { + let result = context.set_write_buf(result.map(|size| size as usize), buf); + if result.is_pending() { + log::debug!("{}: Need to send more: {:?}", context.tag(), fd); if let Some((id, op)) = storage.send(id, Some(context)) { self.inner.api.submit(id, op); } @@ -230,11 +272,10 @@ impl StreamOpsStorage { if item.rd_op.is_none() { if let Poll::Ready(mut buf) = item.context.get_read_buf() { log::debug!( - "{}: Recv resume ({}), {:?} - {:?} = {:?}", + "{}: Recv resume ({}), {:?} rem: {:?}", item.context.tag(), id, item.fd, - buf, buf.remaining_mut() ); @@ -252,6 +293,8 @@ impl StreamOpsStorage { item.rd_op = NonZeroU32::new(op_id as u32); return Some((op_id as u32, op)); } + } else if item.flags.contains(Flags::RD_CANCELING) { + item.flags.insert(Flags::RD_REISSUE); } None } @@ -262,11 +305,11 @@ impl StreamOpsStorage { if item.wr_op.is_none() { if let Poll::Ready(buf) = item.context.get_write_buf() { log::debug!( - "{}: Send resume ({}), {:?} {:?}", + "{}: Send resume ({}), {:?} len: {:?}", item.context.tag(), id, item.fd, - buf + buf.len() ); let slice = buf.chunk(); @@ -283,6 +326,8 @@ impl StreamOpsStorage { item.wr_op = NonZeroU32::new(op_id as u32); return Some((op_id as u32, op)); } + } else if item.flags.contains(Flags::WR_CANCELING) { + item.flags.insert(Flags::WR_REISSUE); } None } @@ -350,13 +395,16 @@ impl StreamCtl { let item = &mut storage.streams[self.id]; if let Some(rd_op) = item.rd_op { - log::debug!( - "{}: Recv to pause ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); - self.inner.api.cancel(rd_op.get()); + if !item.flags.contains(Flags::RD_CANCELING) { + log::debug!( + "{}: Recv to pause ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); + item.flags.insert(Flags::RD_CANCELING); + self.inner.api.cancel(rd_op.get()); + } } } }