Handle io-uring cancelation

This commit is contained in:
Nikolay Kim 2025-03-17 20:38:06 +01:00
parent 1f3406d8bc
commit ca4ceefd0e
3 changed files with 78 additions and 25 deletions

View file

@ -4,6 +4,8 @@
* Add check for required io-uring opcodes * Add check for required io-uring opcodes
* Handle io-uring cancelation
## [2.5.4] - 2025-03-15 ## [2.5.4] - 2025-03-15
* Close FD in various case for poll driver * Close FD in various case for poll driver

View file

@ -13,11 +13,22 @@ pub(crate) struct StreamCtl<T> {
inner: Rc<StreamOpsInner<T>>, inner: Rc<StreamOpsInner<T>>,
} }
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const RD_CANCELING = 0b0000_0001;
const RD_REISSUE = 0b0000_0010;
const WR_CANCELING = 0b0001_0000;
const WR_REISSUE = 0b0010_0000;
}
}
struct StreamItem<T> { struct StreamItem<T> {
io: Option<T>, io: Option<T>,
fd: Fd, fd: Fd,
context: IoContext, context: IoContext,
ref_count: usize, ref_count: usize,
flags: Flags,
rd_op: Option<NonZeroU32>, rd_op: Option<NonZeroU32>,
wr_op: Option<NonZeroU32>, wr_op: Option<NonZeroU32>,
} }
@ -98,6 +109,7 @@ impl<T: os::fd::AsRawFd + 'static> StreamOps<T> {
ref_count: 1, ref_count: 1,
rd_op: None, rd_op: None,
wr_op: None, wr_op: None,
flags: Flags::empty(),
}; };
let id = self.0.storage.borrow_mut().streams.insert(item); let id = self.0.storage.borrow_mut().streams.insert(item);
StreamCtl { StreamCtl {
@ -126,10 +138,19 @@ impl<T> Handler for StreamOpsHandler<T> {
match storage.ops.remove(user_data) { match storage.ops.remove(user_data) {
Operation::Recv { id, buf, context } => { Operation::Recv { id, buf, context } => {
log::debug!("{}: Recv canceled {:?}", context.tag(), id,); log::debug!("{}: Recv canceled {:?}", context.tag(), id);
context.release_read_buf(buf); context.release_read_buf(buf);
if let Some(item) = storage.streams.get_mut(id) { if let Some(item) = storage.streams.get_mut(id) {
item.rd_op.take(); item.rd_op.take();
item.flags.remove(Flags::RD_CANCELING);
if item.flags.contains(Flags::RD_REISSUE) {
item.flags.remove(Flags::RD_REISSUE);
let result = storage.recv(id, Some(context));
if let Some((id, op)) = result {
self.inner.api.submit(id, op);
}
}
} }
} }
Operation::Send { id, buf, context } => { Operation::Send { id, buf, context } => {
@ -137,6 +158,15 @@ impl<T> Handler for StreamOpsHandler<T> {
context.release_write_buf(buf); context.release_write_buf(buf);
if let Some(item) = storage.streams.get_mut(id) { if let Some(item) = storage.streams.get_mut(id) {
item.wr_op.take(); item.wr_op.take();
item.flags.remove(Flags::WR_CANCELING);
if item.flags.contains(Flags::WR_REISSUE) {
item.flags.remove(Flags::WR_REISSUE);
let result = storage.send(id, Some(context));
if let Some((id, op)) = result {
self.inner.api.submit(id, op);
}
}
} }
} }
Operation::Nop | Operation::Close { .. } => {} Operation::Nop | Operation::Close { .. } => {}
@ -161,12 +191,11 @@ impl<T> Handler for StreamOpsHandler<T> {
// reset op reference // reset op reference
if let Some(item) = storage.streams.get_mut(id) { if let Some(item) = storage.streams.get_mut(id) {
log::debug!( log::debug!(
"{}: Recv completed {:?}, res: {:?}, buf({}): {:?}", "{}: Recv completed {:?}, res: {:?}, buf({})",
context.tag(), context.tag(),
item.fd, item.fd,
result, result,
buf.remaining_mut(), buf.remaining_mut()
buf,
); );
item.rd_op.take(); item.rd_op.take();
} }
@ -183,21 +212,24 @@ impl<T> Handler for StreamOpsHandler<T> {
} }
Operation::Send { id, buf, context } => { Operation::Send { id, buf, context } => {
// reset op reference // reset op reference
if let Some(item) = storage.streams.get_mut(id) { let fd = if let Some(item) = storage.streams.get_mut(id) {
log::debug!( log::debug!(
"{}: Send completed: {:?}, res: {:?}", "{}: Send completed: {:?}, res: {:?}, buf({})",
context.tag(), context.tag(),
item.fd, item.fd,
result result,
buf.len()
); );
item.wr_op.take(); item.wr_op.take();
} Some(item.fd)
} else {
None
};
// set read buf // set read buf
if context let result = context.set_write_buf(result.map(|size| size as usize), buf);
.set_write_buf(result.map(|size| size as usize), buf) if result.is_pending() {
.is_pending() log::debug!("{}: Need to send more: {:?}", context.tag(), fd);
{
if let Some((id, op)) = storage.send(id, Some(context)) { if let Some((id, op)) = storage.send(id, Some(context)) {
self.inner.api.submit(id, op); self.inner.api.submit(id, op);
} }
@ -240,11 +272,10 @@ impl<T> StreamOpsStorage<T> {
if item.rd_op.is_none() { if item.rd_op.is_none() {
if let Poll::Ready(mut buf) = item.context.get_read_buf() { if let Poll::Ready(mut buf) = item.context.get_read_buf() {
log::debug!( log::debug!(
"{}: Recv resume ({}), {:?} - {:?} = {:?}", "{}: Recv resume ({}), {:?} rem: {:?}",
item.context.tag(), item.context.tag(),
id, id,
item.fd, item.fd,
buf,
buf.remaining_mut() buf.remaining_mut()
); );
@ -262,6 +293,8 @@ impl<T> StreamOpsStorage<T> {
item.rd_op = NonZeroU32::new(op_id as u32); item.rd_op = NonZeroU32::new(op_id as u32);
return Some((op_id as u32, op)); return Some((op_id as u32, op));
} }
} else if item.flags.contains(Flags::RD_CANCELING) {
item.flags.insert(Flags::RD_REISSUE);
} }
None None
} }
@ -272,11 +305,11 @@ impl<T> StreamOpsStorage<T> {
if item.wr_op.is_none() { if item.wr_op.is_none() {
if let Poll::Ready(buf) = item.context.get_write_buf() { if let Poll::Ready(buf) = item.context.get_write_buf() {
log::debug!( log::debug!(
"{}: Send resume ({}), {:?} {:?}", "{}: Send resume ({}), {:?} len: {:?}",
item.context.tag(), item.context.tag(),
id, id,
item.fd, item.fd,
buf buf.len()
); );
let slice = buf.chunk(); let slice = buf.chunk();
@ -293,6 +326,8 @@ impl<T> StreamOpsStorage<T> {
item.wr_op = NonZeroU32::new(op_id as u32); item.wr_op = NonZeroU32::new(op_id as u32);
return Some((op_id as u32, op)); return Some((op_id as u32, op));
} }
} else if item.flags.contains(Flags::WR_CANCELING) {
item.flags.insert(Flags::WR_REISSUE);
} }
None None
} }
@ -360,15 +395,18 @@ impl<T> StreamCtl<T> {
let item = &mut storage.streams[self.id]; let item = &mut storage.streams[self.id];
if let Some(rd_op) = item.rd_op { if let Some(rd_op) = item.rd_op {
if !item.flags.contains(Flags::RD_CANCELING) {
log::debug!( log::debug!(
"{}: Recv to pause ({}), {:?}", "{}: Recv to pause ({}), {:?}",
item.context.tag(), item.context.tag(),
self.id, self.id,
item.fd item.fd
); );
item.flags.insert(Flags::RD_CANCELING);
self.inner.api.cancel(rd_op.get()); self.inner.api.cancel(rd_op.get());
} }
} }
}
} }
impl<T> Clone for StreamCtl<T> { impl<T> Clone for StreamCtl<T> {

View file

@ -54,7 +54,19 @@ enum Status {
async fn run<T>(ctl: StreamCtl<T>, context: IoContext) { async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
// Handle io readiness // Handle io readiness
let st = poll_fn(|cx| { let st = poll_fn(|cx| {
let read = match context.poll_read_ready(cx) { let read_st = context.poll_read_ready(cx);
let write_st = context.poll_write_ready(cx);
// log::debug!(
// "{}: io ctl read: {:?} write: {:?}, flags: {:?}",
// context.tag(),
// read_st,
// write_st,
// context.flags()
// );
//let read = match context.poll_read_ready(cx) {
let read = match read_st {
Poll::Ready(ReadStatus::Ready) => { Poll::Ready(ReadStatus::Ready) => {
ctl.resume_read(); ctl.resume_read();
Poll::Pending Poll::Pending
@ -66,7 +78,8 @@ async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
} }
}; };
let write = match context.poll_write_ready(cx) { // let write = match context.poll_write_ready(cx) {
let write = match write_st {
Poll::Ready(WriteStatus::Ready) => { Poll::Ready(WriteStatus::Ready) => {
ctl.resume_write(); ctl.resume_write();
Poll::Pending Poll::Pending