use polling fork

This commit is contained in:
Nikolay Kim 2025-04-01 02:08:19 +05:00
parent 12d108c8c2
commit f157439c3f
8 changed files with 52 additions and 51 deletions

View file

@ -46,12 +46,11 @@ ntex-compio = { path = "ntex-compio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-neon = { git = "https://github.com/ntex-rs/neon.git" }
polling = { git = "https://github.com/fafhrd91/polling.git" }
#ntex-neon = { path = "../dev/neon" }
#polling = { path = "../dev/polling" }
[workspace.dependencies]
ntex-polling = "3.7.4"
async-channel = "2"
async-task = "4.5.0"
atomic-waker = "1.1"

View file

@ -670,7 +670,6 @@ impl IoContext {
// set buffer back
let result = match result {
Ok(0) => {
// log::debug!("{}: WROTE ALL {:?}", self.0.tag(), inner.buffer.write_destination_size());
self.0.memory_pool().release_write_buf(buf);
Ok(inner.buffer.write_destination_size())
}
@ -680,7 +679,6 @@ impl IoContext {
self.0.memory_pool().release_write_buf(b);
}
let l = buf.len();
// log::debug!("{}: WROTE SOME {:?}", self.0.tag(), l);
inner.buffer.set_write_destination(buf);
Ok(l)
}
@ -782,7 +780,7 @@ impl IoContext {
nbytes
);
if !inner.dispatch_task.wake_checked() {
log::error!("Dispatcher waker is not registered");
log::error!("Dispatcher waker is not registered, bytes: {:?}, flags: {:?}", status.nbytes, inner.flags.get());
}
} else {
if nbytes >= hw {

View file

@ -1,5 +1,11 @@
# Changes
## [2.5.11] - 2025-04-01
* Use edge mode for polling driver
* Use polling fork
## [2.5.10] - 2025-03-28
* Better closed sockets handling

View file

@ -27,8 +27,8 @@ compio = ["ntex-rt/compio", "ntex-compio"]
# neon runtime
neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"]
polling = ["ntex-neon/polling", "dep:polling", "socket2"]
io-uring = ["ntex-neon/io-uring", "dep:io-uring", "socket2"]
ntex-polling = ["ntex-neon/ntex-polling", "dep:ntex-polling", "socket2"]
[dependencies]
ntex-service = "3.3"
@ -53,7 +53,7 @@ thiserror = { workspace = true }
# Linux specific dependencies
[target.'cfg(target_os = "linux")'.dependencies]
io-uring = { workspace = true, optional = true }
polling = { workspace = true, optional = true }
ntex-polling = { workspace = true, optional = true }
[dev-dependencies]
ntex = "2"

View file

@ -105,7 +105,7 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
fd,
stream.id,
Event::new(0, false, false).with_interrupt(),
PollMode::Edge,
PollMode::Oneshot,
);
stream
}
@ -124,12 +124,12 @@ impl<T> Handler for StreamOpsHandler<T> {
return;
}
let item = &mut streams[id];
if item.io.is_none() {
if item.io.is_none() || item.contains(Flags::ERR) {
item.context.stopped(None);
return;
}
log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev);
let mut changed = false;
let mut renew_ev = Event::new(0, false, false).with_interrupt();
// handle read op
@ -141,23 +141,22 @@ impl<T> Handler for StreamOpsHandler<T> {
if res.is_pending() && item.context.is_read_ready() {
renew_ev.readable = true;
} else {
changed = true;
item.remove(Flags::RD);
}
} else if item.contains(Flags::RD) {
renew_ev.readable = true;
}
// handle error
if ev.is_err() == Some(true) {
item.insert(Flags::ERR);
// handle HUP
if ev.is_interrupt() && !item.contains(Flags::ERR) {
item.context.stopped(None);
close(id as u32, item, &self.inner.api);
return;
}
// handle HUP
if ev.is_interrupt() {
item.context.stopped(None);
close(id as u32, item, &self.inner.api, None);
return;
// handle error
if ev.is_err() == Some(true) || ev.is_interrupt() {
item.insert(Flags::ERR);
}
// handle write op
@ -169,18 +168,15 @@ impl<T> Handler for StreamOpsHandler<T> {
if result.is_pending() {
renew_ev.writable = true;
} else {
changed = true;
item.remove(Flags::WR);
}
} else if item.contains(Flags::WR) {
renew_ev.writable = true;
}
if changed {
self.inner
.api
.modify(item.fd, id as u32, renew_ev, PollMode::Edge);
}
self.inner
.api
.modify(item.fd, id as u32, renew_ev, PollMode::Oneshot);
// delayed drops
if self.inner.delayd_drop.get() {
@ -196,7 +192,8 @@ impl<T> Handler for StreamOpsHandler<T> {
item.fd,
item.io.is_some()
);
close(id, &mut item, &self.inner.api, None);
item.context.stopped(None);
close(id, &mut item, &self.inner.api);
}
}
self.inner.delayd_drop.set(false);
@ -214,7 +211,9 @@ impl<T> Handler for StreamOpsHandler<T> {
item.fd,
err
);
close(id as u32, item, &self.inner.api, Some(err));
item.insert(Flags::ERR);
item.context.stopped(Some(err));
close(id as u32, item, &self.inner.api);
}
})
}
@ -258,13 +257,12 @@ fn read<T>(
let chunk = buf.chunk_mut();
let chunk_len = chunk.len();
let chunk_ptr = chunk.as_mut_ptr();
let result =
syscall!(break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()));
let result = syscall!(break libc::read(item.fd, chunk_ptr as _, chunk.len()));
if let Poll::Ready(Ok(size)) = result {
unsafe { buf.advance_mut(size) };
total += size;
//if size != 0 {
if size == chunk_len {
continue;
}
@ -281,8 +279,8 @@ fn read<T>(
return match result {
Poll::Ready(Err(err)) => {
item.insert(Flags::ERR);
if total > 0 {
item.insert(Flags::ERR);
item.context.stopped(Some(err));
Poll::Ready(Ok(total))
} else {
@ -311,18 +309,12 @@ fn close<T>(
id: u32,
item: &mut StreamItem<T>,
api: &DriverApi,
error: Option<io::Error>,
) -> Option<ntex_rt::JoinHandle<io::Result<i32>>> {
if let Some(io) = item.io.take() {
log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd);
mem::forget(io);
let shutdown = if let Some(err) = error {
item.context.stopped(Some(err));
false
} else {
!item.flags.get().intersects(Flags::ERR | Flags::RDSH)
};
let fd = item.fd;
let shutdown = !item.flags.get().intersects(Flags::ERR | Flags::RDSH);
api.detach(fd, id);
Some(ntex_rt::spawn_blocking(move || {
if shutdown {
@ -340,7 +332,8 @@ impl<T> StreamCtl<T> {
let id = self.id as usize;
let fut = self.inner.with(|streams| {
let item = &mut streams[id];
close(self.id, item, &self.inner.api, None)
item.context.stopped(None);
close(self.id, item, &self.inner.api)
});
async move {
if let Some(fut) = fut {
@ -360,11 +353,11 @@ impl<T> StreamCtl<T> {
.with(|streams| f(streams[self.id as usize].io.as_ref()))
}
pub(crate) fn modify(&self, rd: bool, wr: bool) {
pub(crate) fn modify(&self, rd: bool, wr: bool) -> bool {
self.inner.with(|streams| {
let item = &mut streams[self.id as usize];
if item.contains(Flags::ERR) {
return;
if item.io.is_none() || item.contains(Flags::ERR) {
return false;
}
log::debug!(
@ -423,8 +416,9 @@ impl<T> StreamCtl<T> {
if changed {
self.inner
.api
.modify(item.fd, self.id, event, PollMode::Edge);
.modify(item.fd, self.id, event, PollMode::Oneshot);
}
true
})
}
}
@ -455,7 +449,8 @@ impl<T> Drop for StreamCtl<T> {
item.fd,
item.io.is_some()
);
close(self.id, &mut item, &self.inner.api, None);
item.context.stopped(None);
close(self.id, &mut item, &self.inner.api);
}
self.inner.streams.set(Some(streams));
} else {

View file

@ -82,7 +82,9 @@ async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
};
if modify {
ctl.modify(readable, writable);
if !ctl.modify(readable, writable) {
return Poll::Ready(Status::Terminate);
}
}
if read.is_pending() && write.is_pending() {
@ -95,7 +97,9 @@ async fn run<T>(ctl: StreamCtl<T>, context: IoContext) {
})
.await;
ctl.modify(false, true);
context.shutdown(st == Status::Shutdown).await;
if st != Status::Terminate {
ctl.modify(false, true);
context.shutdown(st == Status::Shutdown).await;
}
context.stopped(ctl.close().await.err());
}

View file

@ -68,6 +68,7 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
)?)))
}
#[cfg(all(target_os = "linux", feature = "neon"))]
#[cfg(test)]
mod tests {
use ntex::{io::Io, time::sleep, time::Millis, util::PoolId};
@ -97,7 +98,7 @@ mod tests {
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World";
// #[ntex::test]
#[ntex::test]
async fn idle_disconnect() {
PoolId::P5.set_read_params(24, 12);
let (tx, rx) = ::oneshot::channel();

View file

@ -252,8 +252,6 @@ where
Ok(())
})
});
thread::sleep(std::time::Duration::from_millis(150));
let (system, server, addr) = rx.recv().unwrap();
TestServer {