Simplify neon poll impl (#537)

This commit is contained in:
Nikolay Kim 2025-03-21 08:21:45 +01:00 committed by GitHub
parent bf6b1d6c79
commit 5484009c92
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 217 additions and 260 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.5.7] - 2025-03-21
* Simplify neon poll impl
## [2.5.6] - 2025-03-20 ## [2.5.6] - 2025-03-20
* Redesign neon poll support * Redesign neon poll support

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-net" name = "ntex-net"
version = "2.5.6" version = "2.5.7"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntexwork utils for ntex framework" description = "ntexwork utils for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -40,7 +40,7 @@ ntex-util = "2.5"
ntex-tokio = { version = "0.5.3", optional = true } ntex-tokio = { version = "0.5.3", optional = true }
ntex-compio = { version = "0.2.4", optional = true } ntex-compio = { version = "0.2.4", optional = true }
ntex-neon = { version = "0.1.6", optional = true } ntex-neon = { version = "0.1.7", optional = true }
bitflags = { workspace = true } bitflags = { workspace = true }
cfg-if = { workspace = true } cfg-if = { workspace = true }

View file

@ -1,5 +1,5 @@
use std::os::fd::{AsRawFd, RawFd}; use std::os::fd::{AsRawFd, RawFd};
use std::{cell::RefCell, collections::VecDeque, io, rc::Rc, task::Poll}; use std::{cell::RefCell, io, rc::Rc, task::Poll};
use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::driver::{DriverApi, Event, Handler};
use ntex_neon::{syscall, Runtime}; use ntex_neon::{syscall, Runtime};
@ -17,7 +17,6 @@ enum Change {
} }
struct ConnectOpsBatcher { struct ConnectOpsBatcher {
feed: VecDeque<(usize, Change)>,
inner: Rc<ConnectOpsInner>, inner: Rc<ConnectOpsInner>,
} }
@ -41,10 +40,7 @@ impl ConnectOps {
connects: RefCell::new(Slab::new()), connects: RefCell::new(Slab::new()),
}); });
inner = Some(ops.clone()); inner = Some(ops.clone());
Box::new(ConnectOpsBatcher { Box::new(ConnectOpsBatcher { inner: ops })
inner: ops,
feed: VecDeque::new(),
})
}); });
ConnectOps(inner.unwrap()) ConnectOps(inner.unwrap())
@ -74,55 +70,42 @@ impl ConnectOps {
impl Handler for ConnectOpsBatcher { impl Handler for ConnectOpsBatcher {
fn event(&mut self, id: usize, event: Event) { fn event(&mut self, id: usize, event: Event) {
log::debug!("connect-fd is readable {:?}", id); log::debug!("connect-fd is readable {:?}", id);
self.feed.push_back((id, Change::Event(event)));
}
fn error(&mut self, id: usize, err: io::Error) {
self.feed.push_back((id, Change::Error(err)));
}
fn commit(&mut self) {
if self.feed.is_empty() {
return;
}
log::debug!("Commit connect driver changes, num: {:?}", self.feed.len());
let mut connects = self.inner.connects.borrow_mut(); let mut connects = self.inner.connects.borrow_mut();
for (id, change) in self.feed.drain(..) { if connects.contains(id) {
if connects.contains(id) { let item = connects.remove(id);
let item = connects.remove(id); if event.writable {
match change { let mut err: libc::c_int = 0;
Change::Event(event) => { let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
if event.writable {
let mut err: libc::c_int = 0;
let mut err_len =
std::mem::size_of::<libc::c_int>() as libc::socklen_t;
let res = syscall!(libc::getsockopt( let res = syscall!(libc::getsockopt(
item.fd.as_raw_fd(), item.fd.as_raw_fd(),
libc::SOL_SOCKET, libc::SOL_SOCKET,
libc::SO_ERROR, libc::SO_ERROR,
&mut err as *mut _ as *mut _, &mut err as *mut _ as *mut _,
&mut err_len &mut err_len
)); ));
let res = if err == 0 { let res = if err == 0 {
res.map(|_| ()) res.map(|_| ())
} else { } else {
Err(io::Error::from_raw_os_error(err)) Err(io::Error::from_raw_os_error(err))
}; };
self.inner.api.detach(item.fd, id as u32); self.inner.api.detach(item.fd, id as u32);
let _ = item.sender.send(res); let _ = item.sender.send(res);
}
}
Change::Error(err) => {
let _ = item.sender.send(Err(err));
self.inner.api.detach(item.fd, id as u32);
}
}
} }
} }
} }
fn error(&mut self, id: usize, err: io::Error) {
let mut connects = self.inner.connects.borrow_mut();
if connects.contains(id) {
let item = connects.remove(id);
let _ = item.sender.send(Err(err));
self.inner.api.detach(item.fd, id as u32);
}
}
} }

View file

@ -1,5 +1,5 @@
use std::os::fd::{AsRawFd, RawFd}; use std::os::fd::{AsRawFd, RawFd};
use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task}; use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task, task::Poll};
use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::driver::{DriverApi, Event, Handler};
use ntex_neon::{syscall, Runtime}; use ntex_neon::{syscall, Runtime};
@ -14,7 +14,7 @@ pub(crate) struct StreamCtl<T> {
} }
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] #[derive(Copy, Clone, Debug)]
struct Flags: u8 { struct Flags: u8 {
const RD = 0b0000_0001; const RD = 0b0000_0001;
const WR = 0b0000_0010; const WR = 0b0000_0010;
@ -31,23 +31,23 @@ struct StreamItem<T> {
pub(crate) struct StreamOps<T>(Rc<StreamOpsInner<T>>); pub(crate) struct StreamOps<T>(Rc<StreamOpsInner<T>>);
#[derive(Debug)]
enum Change {
Event(Event),
Error(io::Error),
}
struct StreamOpsHandler<T> { struct StreamOpsHandler<T> {
feed: VecDeque<(usize, Change)>,
inner: Rc<StreamOpsInner<T>>, inner: Rc<StreamOpsInner<T>>,
} }
struct StreamOpsInner<T> { struct StreamOpsInner<T> {
api: DriverApi, api: DriverApi,
feed: Cell<Option<VecDeque<u32>>>, delayd_drop: Cell<bool>,
feed: RefCell<Vec<u32>>,
streams: Cell<Option<Box<Slab<StreamItem<T>>>>>, streams: Cell<Option<Box<Slab<StreamItem<T>>>>>,
} }
impl<T> StreamItem<T> {
fn tag(&self) -> &'static str {
self.context.tag()
}
}
impl<T: AsRawFd + 'static> StreamOps<T> { impl<T: AsRawFd + 'static> StreamOps<T> {
pub(crate) fn current() -> Self { pub(crate) fn current() -> Self {
Runtime::value(|rt| { Runtime::value(|rt| {
@ -55,14 +55,12 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
rt.driver().register(|api| { rt.driver().register(|api| {
let ops = Rc::new(StreamOpsInner { let ops = Rc::new(StreamOpsInner {
api, api,
feed: Cell::new(Some(VecDeque::new())), feed: RefCell::new(Vec::new()),
delayd_drop: Cell::new(false),
streams: Cell::new(Some(Box::new(Slab::new()))), streams: Cell::new(Some(Box::new(Slab::new()))),
}); });
inner = Some(ops.clone()); inner = Some(ops.clone());
Box::new(StreamOpsHandler { Box::new(StreamOpsHandler { inner: ops })
inner: ops,
feed: VecDeque::new(),
})
}); });
StreamOps(inner.unwrap()) StreamOps(inner.unwrap())
@ -71,34 +69,27 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> { pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> {
let fd = io.as_raw_fd(); let fd = io.as_raw_fd();
let item = StreamItem { let stream = self.0.with(move |streams| {
fd, let item = StreamItem {
context, fd,
io: Some(io), context,
ref_count: 1, io: Some(io),
flags: Flags::empty(), ref_count: 1,
}; flags: Flags::empty(),
let stream = self.with(move |streams| { };
let id = streams.insert(item) as u32;
StreamCtl { StreamCtl {
id, id: streams.insert(item) as u32,
inner: self.0.clone(), inner: self.0.clone(),
} }
}); });
self.0.api.attach(fd, stream.id, None); self.0.api.attach(
fd,
stream.id,
Some(Event::new(0, false, false).with_interrupt()),
);
stream stream
} }
fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut Slab<StreamItem<T>>) -> R,
{
let mut inner = self.0.streams.take().unwrap();
let result = f(&mut inner);
self.0.streams.set(Some(inner));
result
}
} }
impl<T> Clone for StreamOps<T> { impl<T> Clone for StreamOps<T> {
@ -108,128 +99,112 @@ impl<T> Clone for StreamOps<T> {
} }
impl<T> Handler for StreamOpsHandler<T> { impl<T> Handler for StreamOpsHandler<T> {
fn event(&mut self, id: usize, event: Event) { fn event(&mut self, id: usize, ev: Event) {
log::debug!("FD is readable {:?}", id); log::debug!("FD event {:?} event: {:?}", id, ev);
self.feed.push_back((id, Change::Event(event)));
self.inner.with(|streams| {
if !streams.contains(id) {
return;
}
let item = &mut streams[id];
// handle HUP
if ev.is_interrupt() {
item.context.stopped(None);
if item.io.take().is_some() {
close(id as u32, item.fd, &self.inner.api);
}
return;
}
let mut renew_ev = Event::new(0, false, false).with_interrupt();
if ev.readable {
let res = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let result = task::ready!(syscall!(
break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len())
));
if let Ok(size) = result {
log::debug!("{}: data {:?}, s: {:?}", item.tag(), item.fd, size);
unsafe { buf.advance_mut(size) };
}
Poll::Ready(result)
});
if res.is_pending() && item.context.is_read_ready() {
renew_ev.readable = true;
item.flags.insert(Flags::RD);
} else {
item.flags.remove(Flags::RD);
}
} else if item.flags.contains(Flags::RD) {
renew_ev.readable = true;
}
if ev.writable {
let result = item.context.with_write_buf(|buf| {
log::debug!("{}: write {:?} s: {:?}", item.tag(), item.fd, buf.len());
syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len()))
});
if result.is_pending() {
renew_ev.writable = true;
item.flags.insert(Flags::WR);
} else {
item.flags.remove(Flags::WR);
}
} else if item.flags.contains(Flags::WR) {
renew_ev.writable = true;
}
self.inner.api.modify(item.fd, id as u32, renew_ev);
// delayed drops
if self.inner.delayd_drop.get() {
for id in self.inner.feed.borrow_mut().drain(..) {
let item = &mut streams[id as usize];
item.ref_count -= 1;
if item.ref_count == 0 {
let item = streams.remove(id as usize);
log::debug!(
"{}: Drop ({}), {:?}, has-io: {}",
item.tag(),
id,
item.fd,
item.io.is_some()
);
if item.io.is_some() {
close(id, item.fd, &self.inner.api);
}
}
}
self.inner.delayd_drop.set(false);
}
});
} }
fn error(&mut self, id: usize, err: io::Error) { fn error(&mut self, id: usize, err: io::Error) {
log::debug!("FD is failed {:?}, err: {:?}", id, err); self.inner.with(|streams| {
self.feed.push_back((id, Change::Error(err))); if let Some(item) = streams.get_mut(id) {
log::debug!("FD is failed ({}) {:?}, err: {:?}", id, item.fd, err);
item.context.stopped(Some(err));
if item.io.take().is_some() {
close(id as u32, item.fd, &self.inner.api);
}
}
})
} }
}
fn commit(&mut self) { impl<T> StreamOpsInner<T> {
if self.feed.is_empty() { fn with<F, R>(&self, f: F) -> R
return; where
} F: FnOnce(&mut Slab<StreamItem<T>>) -> R,
log::debug!("Commit changes, num: {:?}", self.feed.len()); {
let mut streams = self.streams.take().unwrap();
let mut streams = self.inner.streams.take().unwrap(); let result = f(&mut streams);
self.streams.set(Some(streams));
for (id, change) in self.feed.drain(..) { result
match change {
Change::Event(ev) => {
let item = &mut streams[id];
let mut renew_ev = Event::new(0, false, false).with_interrupt();
if ev.readable {
let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr();
task::Poll::Ready(
task::ready!(syscall!(
break libc::read(item.fd, b as _, chunk.len())
))
.inspect(|size| {
unsafe { buf.advance_mut(*size) };
log::debug!(
"{}: {:?}, SIZE: {:?}",
item.context.tag(),
item.fd,
size
);
}),
)
});
if item.io.is_some() && result.is_pending() {
if item.context.is_read_ready() {
renew_ev.readable = true;
}
}
} else if item.flags.contains(Flags::RD) {
renew_ev.readable = true;
}
if ev.writable {
let result = item.context.with_write_buf(|buf| {
log::debug!(
"{}: writing {:?} SIZE: {:?}",
item.context.tag(),
item.fd,
buf.len()
);
let slice = &buf[..];
syscall!(
break libc::write(
item.fd,
slice.as_ptr() as _,
slice.len()
)
)
});
if item.io.is_some() && result.is_pending() {
renew_ev.writable = true;
}
} else if item.flags.contains(Flags::WR) {
renew_ev.writable = true;
}
if ev.is_interrupt() {
item.context.stopped(None);
if let Some(_) = item.io.take() {
close(id as u32, item.fd, &self.inner.api);
}
continue;
} else {
item.flags.set(Flags::RD, renew_ev.readable);
item.flags.set(Flags::WR, renew_ev.writable);
self.inner.api.modify(item.fd, id as u32, renew_ev);
}
}
Change::Error(err) => {
if let Some(item) = streams.get_mut(id) {
item.context.stopped(Some(err));
if let Some(_) = item.io.take() {
close(id as u32, item.fd, &self.inner.api);
}
}
}
}
}
// extra
let mut feed = self.inner.feed.take().unwrap();
for id in feed.drain(..) {
let item = &mut streams[id as usize];
item.ref_count -= 1;
if item.ref_count == 0 {
let item = streams.remove(id as usize);
log::debug!(
"{}: Drop io ({}), {:?}, has-io: {}",
item.context.tag(),
id,
item.fd,
item.io.is_some()
);
if item.io.is_some() {
close(id, item.fd, &self.inner.api);
}
}
}
self.inner.feed.set(Some(feed));
self.inner.streams.set(Some(streams));
} }
} }
@ -244,7 +219,9 @@ fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle<io::Result<
impl<T> StreamCtl<T> { impl<T> StreamCtl<T> {
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> { pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
let id = self.id as usize; let id = self.id as usize;
let (io, fd) = self.with(|streams| (streams[id].io.take(), streams[id].fd)); let (io, fd) = self
.inner
.with(|streams| (streams[id].io.take(), streams[id].fd));
let fut = if let Some(io) = io { let fut = if let Some(io) = io {
log::debug!("Closing ({}), {:?}", id, fd); log::debug!("Closing ({}), {:?}", id, fd);
std::mem::forget(io); std::mem::forget(io);
@ -266,90 +243,84 @@ impl<T> StreamCtl<T> {
where where
F: FnOnce(Option<&T>) -> R, F: FnOnce(Option<&T>) -> R,
{ {
self.with(|streams| f(streams[self.id as usize].io.as_ref())) self.inner
.with(|streams| f(streams[self.id as usize].io.as_ref()))
} }
pub(crate) fn modify(&self, readable: bool, writable: bool) { pub(crate) fn modify(&self, rd: bool, wr: bool) {
self.with(|streams| { self.inner.with(|streams| {
let item = &mut streams[self.id as usize]; let item = &mut streams[self.id as usize];
item.flags = Flags::empty();
log::debug!( log::debug!(
"{}: Modify interest ({}), {:?} read: {:?}, write: {:?}", "{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}",
item.context.tag(), item.tag(),
self.id, self.id,
item.fd, item.fd,
readable, rd,
writable wr
); );
let mut event = Event::new(0, false, false).with_interrupt(); let mut event = Event::new(0, false, false).with_interrupt();
if readable { if rd {
let result = item.context.with_read_buf(|buf| { if item.flags.contains(Flags::RD) {
let chunk = buf.chunk_mut(); event.readable = true;
let b = chunk.as_mut_ptr(); } else {
task::Poll::Ready( let res = item.context.with_read_buf(|buf| {
task::ready!(syscall!( let chunk = buf.chunk_mut();
break libc::read(item.fd, b as _, chunk.len()) let result = task::ready!(syscall!(
)) break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len())
.inspect(|size| { ));
unsafe { buf.advance_mut(*size) }; if let Ok(size) = result {
log::debug!( log::debug!(
"{}: {:?}, SIZE: {:?}", "{}: read {:?}, s: {:?}",
item.context.tag(), item.tag(),
item.fd, item.fd,
size size
); );
}), unsafe { buf.advance_mut(size) };
) }
}); Poll::Ready(result)
});
if item.io.is_some() && result.is_pending() { if res.is_pending() && item.context.is_read_ready() {
if item.context.is_read_ready() {
event.readable = true; event.readable = true;
item.flags.insert(Flags::RD); item.flags.insert(Flags::RD);
} }
} }
} }
if writable { if wr {
let result = item.context.with_write_buf(|buf| { if item.flags.contains(Flags::WR) {
log::debug!(
"{}: Writing io ({}), buf: {:?}",
item.context.tag(),
self.id,
buf.len()
);
let slice = &buf[..];
syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len()))
});
if item.io.is_some() && result.is_pending() {
event.writable = true; event.writable = true;
item.flags.insert(Flags::WR); } else {
let result = item.context.with_write_buf(|buf| {
log::debug!(
"{}: Writing ({}), buf: {:?}",
item.tag(),
self.id,
buf.len()
);
syscall!(
break libc::write(item.fd, buf[..].as_ptr() as _, buf.len())
)
});
if result.is_pending() {
event.writable = true;
item.flags.insert(Flags::WR);
}
} }
} }
self.inner.api.modify(item.fd, self.id as u32, event); self.inner.api.modify(item.fd, self.id, event);
}) })
} }
fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut Slab<StreamItem<T>>) -> R,
{
let mut inner = self.inner.streams.take().unwrap();
let result = f(&mut inner);
self.inner.streams.set(Some(inner));
result
}
} }
impl<T> Clone for StreamCtl<T> { impl<T> Clone for StreamCtl<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
self.with(|streams| { self.inner.with(|streams| {
streams[self.id as usize].ref_count += 1; streams[self.id as usize].ref_count += 1;
Self { Self {
id: self.id, id: self.id,
@ -368,7 +339,7 @@ impl<T> Drop for StreamCtl<T> {
let item = streams.remove(id); let item = streams.remove(id);
log::debug!( log::debug!(
"{}: Drop io ({}), {:?}, has-io: {}", "{}: Drop io ({}), {:?}, has-io: {}",
item.context.tag(), item.tag(),
self.id, self.id,
item.fd, item.fd,
item.io.is_some() item.io.is_some()
@ -379,9 +350,8 @@ impl<T> Drop for StreamCtl<T> {
} }
self.inner.streams.set(Some(streams)); self.inner.streams.set(Some(streams));
} else { } else {
let mut feed = self.inner.feed.take().unwrap(); self.inner.delayd_drop.set(true);
feed.push_back(self.id); self.inner.feed.borrow_mut().push(self.id);
self.inner.feed.set(Some(feed));
} }
} }
} }