From ca4ceefd0e578cdd94fab1c6cd777b56db45acb5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 17 Mar 2025 20:38:06 +0100 Subject: [PATCH] Handle io-uring cancelation --- ntex-net/CHANGES.md | 2 + ntex-net/src/rt_uring/driver.rs | 84 ++++++++++++++++++++++++--------- ntex-net/src/rt_uring/io.rs | 17 ++++++- 3 files changed, 78 insertions(+), 25 deletions(-) diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index d2c24373..b305c885 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -4,6 +4,8 @@ * Add check for required io-uring opcodes +* Handle io-uring cancelation + ## [2.5.4] - 2025-03-15 * Close FD in various case for poll driver diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs index f2368d1a..f2a88d11 100644 --- a/ntex-net/src/rt_uring/driver.rs +++ b/ntex-net/src/rt_uring/driver.rs @@ -13,11 +13,22 @@ pub(crate) struct StreamCtl { inner: Rc>, } +bitflags::bitflags! { + #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + struct Flags: u8 { + const RD_CANCELING = 0b0000_0001; + const RD_REISSUE = 0b0000_0010; + const WR_CANCELING = 0b0001_0000; + const WR_REISSUE = 0b0010_0000; + } +} + struct StreamItem { io: Option, fd: Fd, context: IoContext, ref_count: usize, + flags: Flags, rd_op: Option, wr_op: Option, } @@ -98,6 +109,7 @@ impl StreamOps { ref_count: 1, rd_op: None, wr_op: None, + flags: Flags::empty(), }; let id = self.0.storage.borrow_mut().streams.insert(item); StreamCtl { @@ -126,10 +138,19 @@ impl Handler for StreamOpsHandler { match storage.ops.remove(user_data) { Operation::Recv { id, buf, context } => { - log::debug!("{}: Recv canceled {:?}", context.tag(), id,); + log::debug!("{}: Recv canceled {:?}", context.tag(), id); context.release_read_buf(buf); if let Some(item) = storage.streams.get_mut(id) { item.rd_op.take(); + item.flags.remove(Flags::RD_CANCELING); + if item.flags.contains(Flags::RD_REISSUE) { + item.flags.remove(Flags::RD_REISSUE); + + let result = storage.recv(id, Some(context)); + if let Some((id, op)) = result { + self.inner.api.submit(id, op); + } + } } } Operation::Send { id, buf, context } => { @@ -137,6 +158,15 @@ impl Handler for StreamOpsHandler { context.release_write_buf(buf); if let Some(item) = storage.streams.get_mut(id) { item.wr_op.take(); + item.flags.remove(Flags::WR_CANCELING); + if item.flags.contains(Flags::WR_REISSUE) { + item.flags.remove(Flags::WR_REISSUE); + + let result = storage.send(id, Some(context)); + if let Some((id, op)) = result { + self.inner.api.submit(id, op); + } + } } } Operation::Nop | Operation::Close { .. } => {} @@ -161,12 +191,11 @@ impl Handler for StreamOpsHandler { // reset op reference if let Some(item) = storage.streams.get_mut(id) { log::debug!( - "{}: Recv completed {:?}, res: {:?}, buf({}): {:?}", + "{}: Recv completed {:?}, res: {:?}, buf({})", context.tag(), item.fd, result, - buf.remaining_mut(), - buf, + buf.remaining_mut() ); item.rd_op.take(); } @@ -183,21 +212,24 @@ impl Handler for StreamOpsHandler { } Operation::Send { id, buf, context } => { // reset op reference - if let Some(item) = storage.streams.get_mut(id) { + let fd = if let Some(item) = storage.streams.get_mut(id) { log::debug!( - "{}: Send completed: {:?}, res: {:?}", + "{}: Send completed: {:?}, res: {:?}, buf({})", context.tag(), item.fd, - result + result, + buf.len() ); item.wr_op.take(); - } + Some(item.fd) + } else { + None + }; // set read buf - if context - .set_write_buf(result.map(|size| size as usize), buf) - .is_pending() - { + let result = context.set_write_buf(result.map(|size| size as usize), buf); + if result.is_pending() { + log::debug!("{}: Need to send more: {:?}", context.tag(), fd); if let Some((id, op)) = storage.send(id, Some(context)) { self.inner.api.submit(id, op); } @@ -240,11 +272,10 @@ impl StreamOpsStorage { if item.rd_op.is_none() { if let Poll::Ready(mut buf) = item.context.get_read_buf() { log::debug!( - "{}: Recv resume ({}), {:?} - {:?} = {:?}", + "{}: Recv resume ({}), {:?} rem: {:?}", item.context.tag(), id, item.fd, - buf, buf.remaining_mut() ); @@ -262,6 +293,8 @@ impl StreamOpsStorage { item.rd_op = NonZeroU32::new(op_id as u32); return Some((op_id as u32, op)); } + } else if item.flags.contains(Flags::RD_CANCELING) { + item.flags.insert(Flags::RD_REISSUE); } None } @@ -272,11 +305,11 @@ impl StreamOpsStorage { if item.wr_op.is_none() { if let Poll::Ready(buf) = item.context.get_write_buf() { log::debug!( - "{}: Send resume ({}), {:?} {:?}", + "{}: Send resume ({}), {:?} len: {:?}", item.context.tag(), id, item.fd, - buf + buf.len() ); let slice = buf.chunk(); @@ -293,6 +326,8 @@ impl StreamOpsStorage { item.wr_op = NonZeroU32::new(op_id as u32); return Some((op_id as u32, op)); } + } else if item.flags.contains(Flags::WR_CANCELING) { + item.flags.insert(Flags::WR_REISSUE); } None } @@ -360,13 +395,16 @@ impl StreamCtl { let item = &mut storage.streams[self.id]; if let Some(rd_op) = item.rd_op { - log::debug!( - "{}: Recv to pause ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); - self.inner.api.cancel(rd_op.get()); + if !item.flags.contains(Flags::RD_CANCELING) { + log::debug!( + "{}: Recv to pause ({}), {:?}", + item.context.tag(), + self.id, + item.fd + ); + item.flags.insert(Flags::RD_CANCELING); + self.inner.api.cancel(rd_op.get()); + } } } } diff --git a/ntex-net/src/rt_uring/io.rs b/ntex-net/src/rt_uring/io.rs index 2f111ad7..7a22f66c 100644 --- a/ntex-net/src/rt_uring/io.rs +++ b/ntex-net/src/rt_uring/io.rs @@ -54,7 +54,19 @@ enum Status { async fn run(ctl: StreamCtl, context: IoContext) { // Handle io readiness let st = poll_fn(|cx| { - let read = match context.poll_read_ready(cx) { + let read_st = context.poll_read_ready(cx); + let write_st = context.poll_write_ready(cx); + + // log::debug!( + // "{}: io ctl read: {:?} write: {:?}, flags: {:?}", + // context.tag(), + // read_st, + // write_st, + // context.flags() + // ); + + //let read = match context.poll_read_ready(cx) { + let read = match read_st { Poll::Ready(ReadStatus::Ready) => { ctl.resume_read(); Poll::Pending @@ -66,7 +78,8 @@ async fn run(ctl: StreamCtl, context: IoContext) { } }; - let write = match context.poll_write_ready(cx) { + // let write = match context.poll_write_ready(cx) { + let write = match write_st { Poll::Ready(WriteStatus::Ready) => { ctl.resume_write(); Poll::Pending