diff --git a/Cargo.toml b/Cargo.toml index e5006289..eed34f06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,12 +46,11 @@ ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } -polling = { git = "https://github.com/fafhrd91/polling.git" } - #ntex-neon = { path = "../dev/neon" } -#polling = { path = "../dev/polling" } [workspace.dependencies] +ntex-polling = "3.7.4" + async-channel = "2" async-task = "4.5.0" atomic-waker = "1.1" diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 3a078c18..90e18145 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -670,7 +670,6 @@ impl IoContext { // set buffer back let result = match result { Ok(0) => { - // log::debug!("{}: WROTE ALL {:?}", self.0.tag(), inner.buffer.write_destination_size()); self.0.memory_pool().release_write_buf(buf); Ok(inner.buffer.write_destination_size()) } @@ -680,7 +679,6 @@ impl IoContext { self.0.memory_pool().release_write_buf(b); } let l = buf.len(); - // log::debug!("{}: WROTE SOME {:?}", self.0.tag(), l); inner.buffer.set_write_destination(buf); Ok(l) } @@ -782,7 +780,7 @@ impl IoContext { nbytes ); if !inner.dispatch_task.wake_checked() { - log::error!("Dispatcher waker is not registered"); + log::error!("Dispatcher waker is not registered, bytes: {:?}, flags: {:?}", status.nbytes, inner.flags.get()); } } else { if nbytes >= hw { diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index e60744ef..f6600129 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [2.5.11] - 2025-04-01 + +* Use edge mode for polling driver + +* Use polling fork + ## [2.5.10] - 2025-03-28 * Better closed sockets handling diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 12dec037..0174ce5e 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -27,8 +27,8 @@ compio = ["ntex-rt/compio", "ntex-compio"] # neon runtime neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"] -polling = ["ntex-neon/polling", "dep:polling", "socket2"] io-uring = ["ntex-neon/io-uring", "dep:io-uring", "socket2"] +ntex-polling = ["ntex-neon/ntex-polling", "dep:ntex-polling", "socket2"] [dependencies] ntex-service = "3.3" @@ -53,7 +53,7 @@ thiserror = { workspace = true } # Linux specific dependencies [target.'cfg(target_os = "linux")'.dependencies] io-uring = { workspace = true, optional = true } -polling = { workspace = true, optional = true } +ntex-polling = { workspace = true, optional = true } [dev-dependencies] ntex = "2" diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 6fd8d2e0..39d4872e 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -105,7 +105,7 @@ impl StreamOps { fd, stream.id, Event::new(0, false, false).with_interrupt(), - PollMode::Edge, + PollMode::Oneshot, ); stream } @@ -124,12 +124,12 @@ impl Handler for StreamOpsHandler { return; } let item = &mut streams[id]; - if item.io.is_none() { + if item.io.is_none() || item.contains(Flags::ERR) { + item.context.stopped(None); return; } log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); - let mut changed = false; let mut renew_ev = Event::new(0, false, false).with_interrupt(); // handle read op @@ -141,23 +141,22 @@ impl Handler for StreamOpsHandler { if res.is_pending() && item.context.is_read_ready() { renew_ev.readable = true; } else { - changed = true; item.remove(Flags::RD); } } else if item.contains(Flags::RD) { renew_ev.readable = true; } - // handle error - if ev.is_err() == Some(true) { - item.insert(Flags::ERR); + // handle HUP + if ev.is_interrupt() && !item.contains(Flags::ERR) { + item.context.stopped(None); + close(id as u32, item, &self.inner.api); + return; } - // handle HUP - if ev.is_interrupt() { - item.context.stopped(None); - close(id as u32, item, &self.inner.api, None); - return; + // handle error + if ev.is_err() == Some(true) || ev.is_interrupt() { + item.insert(Flags::ERR); } // handle write op @@ -169,18 +168,15 @@ impl Handler for StreamOpsHandler { if result.is_pending() { renew_ev.writable = true; } else { - changed = true; item.remove(Flags::WR); } } else if item.contains(Flags::WR) { renew_ev.writable = true; } - if changed { - self.inner - .api - .modify(item.fd, id as u32, renew_ev, PollMode::Edge); - } + self.inner + .api + .modify(item.fd, id as u32, renew_ev, PollMode::Oneshot); // delayed drops if self.inner.delayd_drop.get() { @@ -196,7 +192,8 @@ impl Handler for StreamOpsHandler { item.fd, item.io.is_some() ); - close(id, &mut item, &self.inner.api, None); + item.context.stopped(None); + close(id, &mut item, &self.inner.api); } } self.inner.delayd_drop.set(false); @@ -214,7 +211,9 @@ impl Handler for StreamOpsHandler { item.fd, err ); - close(id as u32, item, &self.inner.api, Some(err)); + item.insert(Flags::ERR); + item.context.stopped(Some(err)); + close(id as u32, item, &self.inner.api); } }) } @@ -258,13 +257,12 @@ fn read( let chunk = buf.chunk_mut(); let chunk_len = chunk.len(); + let chunk_ptr = chunk.as_mut_ptr(); - let result = - syscall!(break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len())); + let result = syscall!(break libc::read(item.fd, chunk_ptr as _, chunk.len())); if let Poll::Ready(Ok(size)) = result { unsafe { buf.advance_mut(size) }; total += size; - //if size != 0 { if size == chunk_len { continue; } @@ -281,8 +279,8 @@ fn read( return match result { Poll::Ready(Err(err)) => { + item.insert(Flags::ERR); if total > 0 { - item.insert(Flags::ERR); item.context.stopped(Some(err)); Poll::Ready(Ok(total)) } else { @@ -311,18 +309,12 @@ fn close( id: u32, item: &mut StreamItem, api: &DriverApi, - error: Option, ) -> Option>> { if let Some(io) = item.io.take() { log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); mem::forget(io); - let shutdown = if let Some(err) = error { - item.context.stopped(Some(err)); - false - } else { - !item.flags.get().intersects(Flags::ERR | Flags::RDSH) - }; let fd = item.fd; + let shutdown = !item.flags.get().intersects(Flags::ERR | Flags::RDSH); api.detach(fd, id); Some(ntex_rt::spawn_blocking(move || { if shutdown { @@ -340,7 +332,8 @@ impl StreamCtl { let id = self.id as usize; let fut = self.inner.with(|streams| { let item = &mut streams[id]; - close(self.id, item, &self.inner.api, None) + item.context.stopped(None); + close(self.id, item, &self.inner.api) }); async move { if let Some(fut) = fut { @@ -360,11 +353,11 @@ impl StreamCtl { .with(|streams| f(streams[self.id as usize].io.as_ref())) } - pub(crate) fn modify(&self, rd: bool, wr: bool) { + pub(crate) fn modify(&self, rd: bool, wr: bool) -> bool { self.inner.with(|streams| { let item = &mut streams[self.id as usize]; - if item.contains(Flags::ERR) { - return; + if item.io.is_none() || item.contains(Flags::ERR) { + return false; } log::debug!( @@ -423,8 +416,9 @@ impl StreamCtl { if changed { self.inner .api - .modify(item.fd, self.id, event, PollMode::Edge); + .modify(item.fd, self.id, event, PollMode::Oneshot); } + true }) } } @@ -455,7 +449,8 @@ impl Drop for StreamCtl { item.fd, item.io.is_some() ); - close(self.id, &mut item, &self.inner.api, None); + item.context.stopped(None); + close(self.id, &mut item, &self.inner.api); } self.inner.streams.set(Some(streams)); } else { diff --git a/ntex-net/src/rt_polling/io.rs b/ntex-net/src/rt_polling/io.rs index 990dae8f..254343e5 100644 --- a/ntex-net/src/rt_polling/io.rs +++ b/ntex-net/src/rt_polling/io.rs @@ -82,7 +82,9 @@ async fn run(ctl: StreamCtl, context: IoContext) { }; if modify { - ctl.modify(readable, writable); + if !ctl.modify(readable, writable) { + return Poll::Ready(Status::Terminate); + } } if read.is_pending() && write.is_pending() { @@ -95,7 +97,9 @@ async fn run(ctl: StreamCtl, context: IoContext) { }) .await; - ctl.modify(false, true); - context.shutdown(st == Status::Shutdown).await; + if st != Status::Terminate { + ctl.modify(false, true); + context.shutdown(st == Status::Shutdown).await; + } context.stopped(ctl.close().await.err()); } diff --git a/ntex-net/src/rt_polling/mod.rs b/ntex-net/src/rt_polling/mod.rs index 755fda0a..08732fa7 100644 --- a/ntex-net/src/rt_polling/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -68,6 +68,7 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { )?))) } +#[cfg(all(target_os = "linux", feature = "neon"))] #[cfg(test)] mod tests { use ntex::{io::Io, time::sleep, time::Millis, util::PoolId}; @@ -97,7 +98,7 @@ mod tests { Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World"; - // #[ntex::test] + #[ntex::test] async fn idle_disconnect() { PoolId::P5.set_read_params(24, 12); let (tx, rx) = ::oneshot::channel(); diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 0e4a6559..b3ecee4d 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -252,8 +252,6 @@ where Ok(()) }) }); - thread::sleep(std::time::Duration::from_millis(150)); - let (system, server, addr) = rx.recv().unwrap(); TestServer {