Fix operation cancelation handling for poll driver (#528)

This commit is contained in:
Nikolay Kim 2025-03-15 01:19:35 +05:00 committed by GitHub
parent a83ed4c3fa
commit f15c3203b1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 71 additions and 74 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.5.3] - 2025-03-14
* Fix operation cancelation handling for poll driver
## [2.5.2] - 2025-03-14 ## [2.5.2] - 2025-03-14
* Fix operation cancelation handling for io-uring driver * Fix operation cancelation handling for io-uring driver

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-net" name = "ntex-net"
version = "2.5.2" version = "2.5.3"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntexwork utils for ntex framework" description = "ntexwork utils for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -8,15 +8,6 @@ use slab::Slab;
use ntex_bytes::BufMut; use ntex_bytes::BufMut;
use ntex_io::IoContext; use ntex_io::IoContext;
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const ERROR = 0b0000_0001;
const RD = 0b0000_0010;
const WR = 0b0000_0100;
}
}
pub(crate) struct StreamCtl<T> { pub(crate) struct StreamCtl<T> {
id: usize, id: usize,
inner: Rc<StreamOpsInner<T>>, inner: Rc<StreamOpsInner<T>>,
@ -26,7 +17,6 @@ struct StreamItem<T> {
io: Option<T>, io: Option<T>,
fd: RawFd, fd: RawFd,
context: IoContext, context: IoContext,
flags: Flags,
ref_count: usize, ref_count: usize,
} }
@ -76,7 +66,6 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
context, context,
fd: io.as_raw_fd(), fd: io.as_raw_fd(),
io: Some(io), io: Some(io),
flags: Flags::empty(),
ref_count: 1, ref_count: 1,
}; };
self.with(|streams| { self.with(|streams| {
@ -147,17 +136,14 @@ impl<T> Handler for StreamOpsHandler<T> {
item.context.tag(), item.context.tag(),
item.fd, item.fd,
size, size,
buf buf,
); );
}), }),
) )
}); });
if result.is_pending() { if result.is_pending() {
item.flags.insert(Flags::RD);
self.inner.api.register(item.fd, id, Interest::Readable); self.inner.api.register(item.fd, id, Interest::Readable);
} else {
item.flags.remove(Flags::RD);
} }
} }
Change::Writable => { Change::Writable => {
@ -170,34 +156,32 @@ impl<T> Handler for StreamOpsHandler<T> {
}); });
if result.is_pending() { if result.is_pending() {
item.flags.insert(Flags::WR);
self.inner.api.register(item.fd, id, Interest::Writable); self.inner.api.register(item.fd, id, Interest::Writable);
} else {
item.flags.remove(Flags::WR);
} }
} }
Change::Error(err) => { Change::Error(err) => {
if let Some(item) = streams.get_mut(id) { if let Some(item) = streams.get_mut(id) {
item.context.stopped(Some(err)); item.context.stopped(Some(err));
if !item.flags.contains(Flags::ERROR) {
item.flags.insert(Flags::ERROR);
item.flags.remove(Flags::RD | Flags::WR);
self.inner.api.unregister_all(item.fd); self.inner.api.unregister_all(item.fd);
} }
} }
} }
} }
}
// extra // extra
let mut feed = self.inner.feed.take().unwrap(); let mut feed = self.inner.feed.take().unwrap();
for id in feed.drain(..) { for id in feed.drain(..) {
let item = &mut streams[id]; let item = &mut streams[id];
log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd);
item.ref_count -= 1; item.ref_count -= 1;
if item.ref_count == 0 { if item.ref_count == 0 {
let item = streams.remove(id); let item = streams.remove(id);
log::debug!(
"{}: Drop io ({}), {:?}, has-io: {}",
item.context.tag(),
id,
item.fd,
item.io.is_some()
);
if item.io.is_some() { if item.io.is_some() {
self.inner.api.unregister_all(item.fd); self.inner.api.unregister_all(item.fd);
} }
@ -235,16 +219,13 @@ impl<T> StreamCtl<T> {
self.with(|streams| { self.with(|streams| {
let item = &mut streams[self.id]; let item = &mut streams[self.id];
if item.flags.intersects(Flags::RD | Flags::WR) {
log::debug!( log::debug!(
"{}: Pause all io ({}), {:?}", "{}: Pause all io ({}), {:?}",
item.context.tag(), item.context.tag(),
self.id, self.id,
item.fd item.fd
); );
item.flags.remove(Flags::RD | Flags::WR);
self.inner.api.unregister_all(item.fd); self.inner.api.unregister_all(item.fd);
}
}) })
} }
@ -258,10 +239,7 @@ impl<T> StreamCtl<T> {
self.id, self.id,
item.fd item.fd
); );
if item.flags.contains(Flags::RD) {
item.flags.remove(Flags::RD);
self.inner.api.unregister(item.fd, Interest::Readable); self.inner.api.unregister(item.fd, Interest::Readable);
}
}) })
} }
@ -275,8 +253,26 @@ impl<T> StreamCtl<T> {
self.id, self.id,
item.fd item.fd
); );
if !item.flags.contains(Flags::RD) {
item.flags.insert(Flags::RD); let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr();
Poll::Ready(
task::ready!(syscall!(break libc::read(item.fd, b as _, chunk.len())))
.inspect(|size| {
unsafe { buf.advance_mut(*size) };
log::debug!(
"{}: {:?}, SIZE: {:?}, BUF: {:?}",
item.context.tag(),
item.fd,
size,
buf,
);
}),
)
});
if result.is_pending() {
self.inner self.inner
.api .api
.register(item.fd, self.id, Interest::Readable); .register(item.fd, self.id, Interest::Readable);
@ -288,7 +284,6 @@ impl<T> StreamCtl<T> {
self.with(|streams| { self.with(|streams| {
let item = &mut streams[self.id]; let item = &mut streams[self.id];
if !item.flags.contains(Flags::WR) {
log::debug!( log::debug!(
"{}: Resume io write ({}), {:?}", "{}: Resume io write ({}), {:?}",
item.context.tag(), item.context.tag(),
@ -315,12 +310,10 @@ impl<T> StreamCtl<T> {
item.context.flags() item.context.flags()
); );
item.flags.insert(Flags::WR);
self.inner self.inner
.api .api
.register(item.fd, self.id, Interest::Writable); .register(item.fd, self.id, Interest::Writable);
} }
}
}) })
} }
@ -350,16 +343,16 @@ impl<T> Clone for StreamCtl<T> {
impl<T> Drop for StreamCtl<T> { impl<T> Drop for StreamCtl<T> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(mut streams) = self.inner.streams.take() { if let Some(mut streams) = self.inner.streams.take() {
log::debug!(
"{}: Drop io ({}), {:?}",
streams[self.id].context.tag(),
self.id,
streams[self.id].fd
);
streams[self.id].ref_count -= 1; streams[self.id].ref_count -= 1;
if streams[self.id].ref_count == 0 { if streams[self.id].ref_count == 0 {
let item = streams.remove(self.id); let item = streams.remove(self.id);
log::debug!(
"{}: Drop io ({}), {:?}, has-io: {}",
item.context.tag(),
self.id,
item.fd,
item.io.is_some()
);
if item.io.is_some() { if item.io.is_some() {
self.inner.api.unregister_all(item.fd); self.inner.api.unregister_all(item.fd);
} }