diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 6739a088..385f15fa 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -13,11 +13,20 @@ pub(crate) struct StreamCtl { inner: Rc>, } +bitflags::bitflags! { + #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + struct Flags: u8 { + const RD = 0b0000_0001; + const WR = 0b0000_0010; + } +} + struct StreamItem { io: Option, fd: RawFd, + flags: Flags, + ref_count: u16, context: IoContext, - ref_count: usize, } pub(crate) struct StreamOps(Rc>); @@ -67,6 +76,7 @@ impl StreamOps { context, io: Some(io), ref_count: 1, + flags: Flags::empty(), }; let stream = self.with(move |streams| { let id = streams.insert(item) as u32; @@ -146,7 +156,10 @@ impl Handler for StreamOpsHandler { renew_ev.readable = true; } } + } else if item.flags.contains(Flags::RD) { + renew_ev.readable = true; } + if ev.writable { let result = item.context.with_write_buf(|buf| { log::debug!( @@ -166,10 +179,10 @@ impl Handler for StreamOpsHandler { }); if item.io.is_some() && result.is_pending() { - if item.context.is_write_ready() { - renew_ev.writable = true; - } + renew_ev.writable = true; } + } else if item.flags.contains(Flags::WR) { + renew_ev.writable = true; } if ev.is_interrupt() { @@ -179,6 +192,8 @@ impl Handler for StreamOpsHandler { } continue; } else { + item.flags.set(Flags::RD, renew_ev.readable); + item.flags.set(Flags::WR, renew_ev.writable); self.inner.api.modify(item.fd, id as u32, renew_ev); } } @@ -257,6 +272,7 @@ impl StreamCtl { pub(crate) fn modify(&self, readable: bool, writable: bool) { self.with(|streams| { let item = &mut streams[self.id as usize]; + item.flags = Flags::empty(); log::debug!( "{}: Modify interest ({}), {:?} read: {:?}, write: {:?}", @@ -292,6 +308,7 @@ impl StreamCtl { if item.io.is_some() && result.is_pending() { if item.context.is_read_ready() { event.readable = true; + item.flags.insert(Flags::RD); } } } @@ -310,9 +327,8 @@ impl StreamCtl { }); if item.io.is_some() && result.is_pending() { - if item.context.is_write_ready() { - event.writable = true; - } + event.writable = true; + item.flags.insert(Flags::WR); } } diff --git a/ntex-net/src/rt_polling/mod.rs b/ntex-net/src/rt_polling/mod.rs index 671b8493..b4fb928b 100644 --- a/ntex-net/src/rt_polling/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -8,6 +8,9 @@ pub(crate) mod connect; mod driver; mod io; +#[cfg(not(target_pointer_width = "64"))] +compile_error!("Only 64bit platforms are supported"); + /// Tcp stream wrapper for neon TcpStream struct TcpStream(socket2::Socket);