Maintain interest info for poll driver (#536)

This commit is contained in:
Nikolay Kim 2025-03-20 08:56:31 +01:00 committed by GitHub
parent e3f58cce27
commit bf6b1d6c79
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 26 additions and 7 deletions

View file

@ -13,11 +13,20 @@ pub(crate) struct StreamCtl<T> {
inner: Rc<StreamOpsInner<T>>,
}
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const RD = 0b0000_0001;
const WR = 0b0000_0010;
}
}
struct StreamItem<T> {
io: Option<T>,
fd: RawFd,
flags: Flags,
ref_count: u16,
context: IoContext,
ref_count: usize,
}
pub(crate) struct StreamOps<T>(Rc<StreamOpsInner<T>>);
@ -67,6 +76,7 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
context,
io: Some(io),
ref_count: 1,
flags: Flags::empty(),
};
let stream = self.with(move |streams| {
let id = streams.insert(item) as u32;
@ -146,7 +156,10 @@ impl<T> Handler for StreamOpsHandler<T> {
renew_ev.readable = true;
}
}
} else if item.flags.contains(Flags::RD) {
renew_ev.readable = true;
}
if ev.writable {
let result = item.context.with_write_buf(|buf| {
log::debug!(
@ -166,10 +179,10 @@ impl<T> Handler for StreamOpsHandler<T> {
});
if item.io.is_some() && result.is_pending() {
if item.context.is_write_ready() {
renew_ev.writable = true;
}
renew_ev.writable = true;
}
} else if item.flags.contains(Flags::WR) {
renew_ev.writable = true;
}
if ev.is_interrupt() {
@ -179,6 +192,8 @@ impl<T> Handler for StreamOpsHandler<T> {
}
continue;
} else {
item.flags.set(Flags::RD, renew_ev.readable);
item.flags.set(Flags::WR, renew_ev.writable);
self.inner.api.modify(item.fd, id as u32, renew_ev);
}
}
@ -257,6 +272,7 @@ impl<T> StreamCtl<T> {
pub(crate) fn modify(&self, readable: bool, writable: bool) {
self.with(|streams| {
let item = &mut streams[self.id as usize];
item.flags = Flags::empty();
log::debug!(
"{}: Modify interest ({}), {:?} read: {:?}, write: {:?}",
@ -292,6 +308,7 @@ impl<T> StreamCtl<T> {
if item.io.is_some() && result.is_pending() {
if item.context.is_read_ready() {
event.readable = true;
item.flags.insert(Flags::RD);
}
}
}
@ -310,9 +327,8 @@ impl<T> StreamCtl<T> {
});
if item.io.is_some() && result.is_pending() {
if item.context.is_write_ready() {
event.writable = true;
}
event.writable = true;
item.flags.insert(Flags::WR);
}
}

View file

@ -8,6 +8,9 @@ pub(crate) mod connect;
mod driver;
mod io;
#[cfg(not(target_pointer_width = "64"))]
compile_error!("Only 64bit platforms are supported");
/// Tcp stream wrapper for neon TcpStream
struct TcpStream(socket2::Socket);