From e1323b7eae6fc78e3b614cf377c4a0e308a61927 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 31 Mar 2025 15:33:27 +0500 Subject: [PATCH 1/7] Use neon main --- Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index d9e97ef4..3d6e9f07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,8 @@ ntex-util = { path = "ntex-util" } ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } +ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } + [workspace.dependencies] async-channel = "2" async-task = "4.5.0" From 73dee4d224410b34f78adac9e8177749563125d1 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 31 Mar 2025 15:56:57 +0500 Subject: [PATCH 2/7] Update neon api usage --- Cargo.toml | 1 + ntex-net/Cargo.toml | 4 ++-- ntex-net/src/rt_polling/connect.rs | 2 +- ntex-net/src/rt_polling/driver.rs | 2 +- ntex-net/src/rt_uring/connect.rs | 2 +- ntex-net/src/rt_uring/driver.rs | 2 +- ntex-rt/Cargo.toml | 2 +- ntex-rt/src/lib.rs | 2 +- 8 files changed, 9 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3d6e9f07..681247bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } +#ntex-neon = { path = "../dev/neon" } [workspace.dependencies] async-channel = "2" diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 5a72d3eb..7c2ae447 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.10" +version = "2.5.11" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.15", optional = true } +ntex-neon = { version = "0.1.16", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_polling/connect.rs b/ntex-net/src/rt_polling/connect.rs index 8f0f1dc9..a0e2bd83 100644 --- a/ntex-net/src/rt_polling/connect.rs +++ b/ntex-net/src/rt_polling/connect.rs @@ -34,7 +34,7 @@ impl ConnectOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; - rt.driver().register(|api| { + rt.register_handler(|api| { let ops = Rc::new(ConnectOpsInner { api, connects: RefCell::new(Slab::new()), diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 24db553d..38263fc6 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -52,7 +52,7 @@ impl StreamOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; - rt.driver().register(|api| { + rt.register_handler(|api| { let ops = Rc::new(StreamOpsInner { api, feed: RefCell::new(Vec::new()), diff --git a/ntex-net/src/rt_uring/connect.rs b/ntex-net/src/rt_uring/connect.rs index ea9be3e1..2e86eb29 100644 --- a/ntex-net/src/rt_uring/connect.rs +++ b/ntex-net/src/rt_uring/connect.rs @@ -31,7 +31,7 @@ impl ConnectOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; - rt.driver().register(|api| { + rt.register_handler(|api| { if !api.is_supported(opcode::Connect::CODE) { panic!("opcode::Connect is required for io-uring support"); } diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs index d39d69e8..7115b9a7 100644 --- a/ntex-net/src/rt_uring/driver.rs +++ b/ntex-net/src/rt_uring/driver.rs @@ -77,7 +77,7 @@ impl StreamOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; - rt.driver().register(|api| { + rt.register_handler(|api| { if !api.is_supported(opcode::Recv::CODE) { panic!("opcode::Recv is required for io-uring support"); } diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index a5966d76..36387680 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -42,4 +42,4 @@ tok-io = { version = "1", package = "tokio", default-features = false, features "net", ], optional = true } -ntex-neon = { version = "0.1.14", optional = true } +ntex-neon = { version = "0.1.15", optional = true } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index d5d85546..692e7904 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -265,7 +265,7 @@ mod neon { let rt = Runtime::new().unwrap(); log::info!( "Starting neon runtime, driver {:?}", - rt.driver().tp().name() + rt.driver_type().name() ); rt.block_on(fut); From 12d108c8c25c89645dfa8f66b556e410de5b92a6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 31 Mar 2025 19:40:10 +0500 Subject: [PATCH 3/7] Refactor polling impl --- Cargo.toml | 3 + ntex-io/Cargo.toml | 2 +- ntex-io/src/tasks.rs | 12 +- ntex-net/Cargo.toml | 3 +- ntex-net/src/rt_polling/connect.rs | 7 +- ntex-net/src/rt_polling/driver.rs | 224 +++++++++++++++++++++-------- ntex-net/src/rt_polling/mod.rs | 58 ++++++++ ntex/tests/connect.rs | 3 +- ntex/tests/http_awc_client.rs | 9 +- 9 files changed, 238 insertions(+), 83 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 681247bd..e5006289 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,10 @@ ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } +polling = { git = "https://github.com/fafhrd91/polling.git" } + #ntex-neon = { path = "../dev/neon" } +#polling = { path = "../dev/polling" } [workspace.dependencies] async-channel = "2" diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index f55aa5d0..f7a54de3 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.11.1" +version = "2.11.2" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 55f99416..3a078c18 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -739,19 +739,11 @@ impl IoContext { pub fn with_read_buf(&self, f: F) -> Poll<()> where - F: FnOnce(&mut BytesVec) -> Poll>, + F: FnOnce(&mut BytesVec, usize, usize) -> Poll>, { let inner = &self.0 .0; let (hw, lw) = self.0.memory_pool().read_params().unpack(); - let result = inner.buffer.with_read_source(&self.0, |buf| { - // make sure we've got room - let remaining = buf.remaining_mut(); - if remaining < lw { - buf.reserve(hw - remaining); - } - - f(buf) - }); + let result = inner.buffer.with_read_source(&self.0, |buf| f(buf, hw, lw)); // handle buffer changes match result { diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 7c2ae447..12dec037 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -34,7 +34,7 @@ io-uring = ["ntex-neon/io-uring", "dep:io-uring", "socket2"] ntex-service = "3.3" ntex-bytes = "0.1" ntex-http = "0.1" -ntex-io = "2.11.1" +ntex-io = "2.11.2" ntex-rt = "0.4.25" ntex-util = "2.5" @@ -57,3 +57,4 @@ polling = { workspace = true, optional = true } [dev-dependencies] ntex = "2" +oneshot = "0.1" diff --git a/ntex-net/src/rt_polling/connect.rs b/ntex-net/src/rt_polling/connect.rs index a0e2bd83..3123fd16 100644 --- a/ntex-net/src/rt_polling/connect.rs +++ b/ntex-net/src/rt_polling/connect.rs @@ -1,7 +1,7 @@ use std::os::fd::{AsRawFd, RawFd}; use std::{cell::RefCell, io, rc::Rc, task::Poll}; -use ntex_neon::driver::{DriverApi, Event, Handler}; +use ntex_neon::driver::{DriverApi, Event, Handler, PollMode}; use ntex_neon::{syscall, Runtime}; use ntex_util::channel::oneshot::Sender; use slab::Slab; @@ -62,7 +62,9 @@ impl ConnectOps { let item = Item { fd, sender }; let id = self.0.connects.borrow_mut().insert(item); - self.0.api.attach(fd, id as u32, Some(Event::writable(0))); + self.0 + .api + .attach(fd, id as u32, Event::writable(0), PollMode::Oneshot); Ok(id) } } @@ -72,7 +74,6 @@ impl Handler for ConnectOpsBatcher { log::debug!("connect-fd is readable {:?}", id); let mut connects = self.inner.connects.borrow_mut(); - if connects.contains(id) { let item = connects.remove(id); if event.writable { diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 38263fc6..6fd8d2e0 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,11 +1,11 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task, task::Poll}; +use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task::Poll}; -use ntex_neon::driver::{DriverApi, Event, Handler}; +use ntex_neon::driver::{DriverApi, Event, Handler, PollMode}; use ntex_neon::{syscall, Runtime}; use slab::Slab; -use ntex_bytes::BufMut; +use ntex_bytes::{BufMut, BytesVec}; use ntex_io::IoContext; pub(crate) struct StreamCtl { @@ -16,15 +16,17 @@ pub(crate) struct StreamCtl { bitflags::bitflags! { #[derive(Copy, Clone, Debug)] struct Flags: u8 { - const RD = 0b0000_0001; - const WR = 0b0000_0010; + const RD = 0b0000_0001; + const WR = 0b0000_0010; + const ERR = 0b0000_0100; + const RDSH = 0b0000_1000; } } struct StreamItem { io: Option, fd: RawFd, - flags: Flags, + flags: Cell, ref_count: u16, context: IoContext, } @@ -46,6 +48,22 @@ impl StreamItem { fn tag(&self) -> &'static str { self.context.tag() } + + fn contains(&self, flag: Flags) -> bool { + self.flags.get().contains(flag) + } + + fn insert(&self, fl: Flags) { + let mut flags = self.flags.get(); + flags.insert(fl); + self.flags.set(flags); + } + + fn remove(&self, fl: Flags) { + let mut flags = self.flags.get(); + flags.remove(fl); + self.flags.set(flags); + } } impl StreamOps { @@ -75,7 +93,7 @@ impl StreamOps { context, io: Some(io), ref_count: 1, - flags: Flags::empty(), + flags: Cell::new(Flags::empty()), }; StreamCtl { id: streams.insert(item) as u32, @@ -86,7 +104,8 @@ impl StreamOps { self.0.api.attach( fd, stream.id, - Some(Event::new(0, false, false).with_interrupt()), + Event::new(0, false, false).with_interrupt(), + PollMode::Edge, ); stream } @@ -110,38 +129,38 @@ impl Handler for StreamOpsHandler { } log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); - // handle HUP - if ev.is_interrupt() { - item.context.stopped(None); - close(id as u32, item, &self.inner.api, None, true); - return; - } - + let mut changed = false; let mut renew_ev = Event::new(0, false, false).with_interrupt(); + // handle read op if ev.readable { - let res = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let result = task::ready!(syscall!( - break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()) - )); - if let Ok(size) = result { - log::debug!("{}: data {:?}, s: {:?}", item.tag(), item.fd, size); - unsafe { buf.advance_mut(size) }; - } - Poll::Ready(result) - }); + let res = item + .context + .with_read_buf(|buf, hw, lw| read(item, buf, hw, lw)); if res.is_pending() && item.context.is_read_ready() { renew_ev.readable = true; - item.flags.insert(Flags::RD); } else { - item.flags.remove(Flags::RD); + changed = true; + item.remove(Flags::RD); } - } else if item.flags.contains(Flags::RD) { + } else if item.contains(Flags::RD) { renew_ev.readable = true; } + // handle error + if ev.is_err() == Some(true) { + item.insert(Flags::ERR); + } + + // handle HUP + if ev.is_interrupt() { + item.context.stopped(None); + close(id as u32, item, &self.inner.api, None); + return; + } + + // handle write op if ev.writable { let result = item.context.with_write_buf(|buf| { log::debug!("{}: write {:?} s: {:?}", item.tag(), item.fd, buf.len()); @@ -149,15 +168,19 @@ impl Handler for StreamOpsHandler { }); if result.is_pending() { renew_ev.writable = true; - item.flags.insert(Flags::WR); } else { - item.flags.remove(Flags::WR); + changed = true; + item.remove(Flags::WR); } - } else if item.flags.contains(Flags::WR) { + } else if item.contains(Flags::WR) { renew_ev.writable = true; } - self.inner.api.modify(item.fd, id as u32, renew_ev); + if changed { + self.inner + .api + .modify(item.fd, id as u32, renew_ev, PollMode::Edge); + } // delayed drops if self.inner.delayd_drop.get() { @@ -173,7 +196,7 @@ impl Handler for StreamOpsHandler { item.fd, item.io.is_some() ); - close(id, &mut item, &self.inner.api, None, true); + close(id, &mut item, &self.inner.api, None); } } self.inner.delayd_drop.set(false); @@ -191,7 +214,7 @@ impl Handler for StreamOpsHandler { item.fd, err ); - close(id as u32, item, &self.inner.api, Some(err), false); + close(id as u32, item, &self.inner.api, Some(err)); } }) } @@ -209,19 +232,96 @@ impl StreamOpsInner { } } +fn read( + item: &StreamItem, + buf: &mut BytesVec, + hw: usize, + lw: usize, +) -> Poll> { + log::debug!( + "{}: reading fd ({:?}) flags: {:?}", + item.tag(), + item.fd, + item.context.flags() + ); + if item.contains(Flags::RDSH) { + return Poll::Ready(Ok(0)); + } + + let mut total = 0; + loop { + // make sure we've got room + let remaining = buf.remaining_mut(); + if remaining < lw { + buf.reserve(hw - remaining); + } + + let chunk = buf.chunk_mut(); + let chunk_len = chunk.len(); + + let result = + syscall!(break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len())); + if let Poll::Ready(Ok(size)) = result { + unsafe { buf.advance_mut(size) }; + total += size; + //if size != 0 { + if size == chunk_len { + continue; + } + } + + log::debug!( + "{}: read fd ({:?}), s: {:?}, cap: {:?}, result: {:?}", + item.tag(), + item.fd, + total, + buf.remaining_mut(), + result + ); + + return match result { + Poll::Ready(Err(err)) => { + if total > 0 { + item.insert(Flags::ERR); + item.context.stopped(Some(err)); + Poll::Ready(Ok(total)) + } else { + Poll::Ready(Err(err)) + } + } + Poll::Ready(Ok(size)) => { + if size == 0 { + item.insert(Flags::RDSH); + item.context.stopped(None); + } + Poll::Ready(Ok(total)) + } + Poll::Pending => { + if total > 0 { + Poll::Ready(Ok(total)) + } else { + Poll::Pending + } + } + }; + } +} + fn close( id: u32, item: &mut StreamItem, api: &DriverApi, error: Option, - shutdown: bool, ) -> Option>> { if let Some(io) = item.io.take() { log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); mem::forget(io); - if let Some(err) = error { + let shutdown = if let Some(err) = error { item.context.stopped(Some(err)); - } + false + } else { + !item.flags.get().intersects(Flags::ERR | Flags::RDSH) + }; let fd = item.fd; api.detach(fd, id); Some(ntex_rt::spawn_blocking(move || { @@ -240,7 +340,7 @@ impl StreamCtl { let id = self.id as usize; let fut = self.inner.with(|streams| { let item = &mut streams[id]; - close(self.id, item, &self.inner.api, None, false) + close(self.id, item, &self.inner.api, None) }); async move { if let Some(fut) = fut { @@ -263,48 +363,41 @@ impl StreamCtl { pub(crate) fn modify(&self, rd: bool, wr: bool) { self.inner.with(|streams| { let item = &mut streams[self.id as usize]; + if item.contains(Flags::ERR) { + return; + } log::debug!( - "{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}", + "{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}, flags: {:?}", item.tag(), self.id, item.fd, rd, - wr + wr, + item.flags ); + let mut changed = false; let mut event = Event::new(0, false, false).with_interrupt(); if rd { - if item.flags.contains(Flags::RD) { + if item.contains(Flags::RD) { event.readable = true; } else { - let res = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let result = task::ready!(syscall!( - break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()) - )); - if let Ok(size) = result { - log::debug!( - "{}: read {:?}, s: {:?}", - item.tag(), - item.fd, - size - ); - unsafe { buf.advance_mut(size) }; - } - Poll::Ready(result) - }); + let res = item + .context + .with_read_buf(|buf, hw, lw| read(item, buf, hw, lw)); if res.is_pending() && item.context.is_read_ready() { + changed = true; event.readable = true; - item.flags.insert(Flags::RD); + item.insert(Flags::RD); } } } if wr { - if item.flags.contains(Flags::WR) { + if item.contains(Flags::WR) { event.writable = true; } else { let result = item.context.with_write_buf(|buf| { @@ -320,13 +413,18 @@ impl StreamCtl { }); if result.is_pending() { + changed = true; event.writable = true; - item.flags.insert(Flags::WR); + item.insert(Flags::WR); } } } - self.inner.api.modify(item.fd, self.id, event); + if changed { + self.inner + .api + .modify(item.fd, self.id, event, PollMode::Edge); + } }) } } @@ -357,7 +455,7 @@ impl Drop for StreamCtl { item.fd, item.io.is_some() ); - close(self.id, &mut item, &self.inner.api, None, true); + close(self.id, &mut item, &self.inner.api, None); } self.inner.streams.set(Some(streams)); } else { diff --git a/ntex-net/src/rt_polling/mod.rs b/ntex-net/src/rt_polling/mod.rs index b4fb928b..755fda0a 100644 --- a/ntex-net/src/rt_polling/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -67,3 +67,61 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { Socket::from(stream), )?))) } + +#[cfg(test)] +mod tests { + use ntex::{io::Io, time::sleep, time::Millis, util::PoolId}; + use std::sync::{Arc, Mutex}; + + use crate::connect::Connect; + + const DATA: &[u8] = b"Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World"; + + // #[ntex::test] + async fn idle_disconnect() { + PoolId::P5.set_read_params(24, 12); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); + + let server = ntex::server::test_server(move || { + let tx = tx.clone(); + ntex_service::fn_service(move |io: Io<_>| { + tx.lock().unwrap().take().unwrap().send(()).unwrap(); + + async move { + io.write(DATA).unwrap(); + sleep(Millis(250)).await; + io.close(); + Ok::<_, ()>(()) + } + }) + }); + + let msg = Connect::new(server.addr()); + let io = crate::connect::connect(msg).await.unwrap(); + io.set_memory_pool(PoolId::P5.into()); + rx.await.unwrap(); + + io.on_disconnect().await; + } +} diff --git a/ntex/tests/connect.rs b/ntex/tests/connect.rs index 5ecd51b7..523232a8 100644 --- a/ntex/tests/connect.rs +++ b/ntex/tests/connect.rs @@ -1,9 +1,8 @@ use std::{io, rc::Rc}; -use ntex::codec::BytesCodec; -use ntex::connect::Connect; use ntex::io::{types::PeerAddr, Io}; use ntex::service::{chain_factory, fn_service, Pipeline, ServiceFactory}; +use ntex::{codec::BytesCodec, connect::Connect}; use ntex::{server::build_test_server, server::test_server, time, util::Bytes}; #[cfg(feature = "rustls")] diff --git a/ntex/tests/http_awc_client.rs b/ntex/tests/http_awc_client.rs index bd4c7e0a..e80644e9 100644 --- a/ntex/tests/http_awc_client.rs +++ b/ntex/tests/http_awc_client.rs @@ -682,15 +682,18 @@ async fn client_read_until_eof() { for stream in lst.incoming() { if let Ok(mut stream) = stream { let mut b = [0; 1000]; - let _ = stream.read(&mut b).unwrap(); - let _ = stream + log::debug!("Reading request"); + let res = stream.read(&mut b).unwrap(); + log::debug!("Read {:?}", res); + let res = stream .write_all(b"HTTP/1.0 200 OK\r\nconnection: close\r\n\r\nwelcome!"); + log::debug!("Sent {:?}", res); } else { break; } } }); - sleep(Millis(300)).await; + sleep(Millis(500)).await; // client request let req = Client::build() From f157439c3fdc9ce452f58256def8c5310060587a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 1 Apr 2025 02:08:19 +0500 Subject: [PATCH 4/7] use polling fork --- Cargo.toml | 5 +-- ntex-io/src/tasks.rs | 4 +- ntex-net/CHANGES.md | 6 +++ ntex-net/Cargo.toml | 4 +- ntex-net/src/rt_polling/driver.rs | 69 ++++++++++++++----------------- ntex-net/src/rt_polling/io.rs | 10 +++-- ntex-net/src/rt_polling/mod.rs | 3 +- ntex/src/http/test.rs | 2 - 8 files changed, 52 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e5006289..eed34f06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,12 +46,11 @@ ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } -polling = { git = "https://github.com/fafhrd91/polling.git" } - #ntex-neon = { path = "../dev/neon" } -#polling = { path = "../dev/polling" } [workspace.dependencies] +ntex-polling = "3.7.4" + async-channel = "2" async-task = "4.5.0" atomic-waker = "1.1" diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 3a078c18..90e18145 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -670,7 +670,6 @@ impl IoContext { // set buffer back let result = match result { Ok(0) => { - // log::debug!("{}: WROTE ALL {:?}", self.0.tag(), inner.buffer.write_destination_size()); self.0.memory_pool().release_write_buf(buf); Ok(inner.buffer.write_destination_size()) } @@ -680,7 +679,6 @@ impl IoContext { self.0.memory_pool().release_write_buf(b); } let l = buf.len(); - // log::debug!("{}: WROTE SOME {:?}", self.0.tag(), l); inner.buffer.set_write_destination(buf); Ok(l) } @@ -782,7 +780,7 @@ impl IoContext { nbytes ); if !inner.dispatch_task.wake_checked() { - log::error!("Dispatcher waker is not registered"); + log::error!("Dispatcher waker is not registered, bytes: {:?}, flags: {:?}", status.nbytes, inner.flags.get()); } } else { if nbytes >= hw { diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index e60744ef..f6600129 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [2.5.11] - 2025-04-01 + +* Use edge mode for polling driver + +* Use polling fork + ## [2.5.10] - 2025-03-28 * Better closed sockets handling diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 12dec037..0174ce5e 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -27,8 +27,8 @@ compio = ["ntex-rt/compio", "ntex-compio"] # neon runtime neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"] -polling = ["ntex-neon/polling", "dep:polling", "socket2"] io-uring = ["ntex-neon/io-uring", "dep:io-uring", "socket2"] +ntex-polling = ["ntex-neon/ntex-polling", "dep:ntex-polling", "socket2"] [dependencies] ntex-service = "3.3" @@ -53,7 +53,7 @@ thiserror = { workspace = true } # Linux specific dependencies [target.'cfg(target_os = "linux")'.dependencies] io-uring = { workspace = true, optional = true } -polling = { workspace = true, optional = true } +ntex-polling = { workspace = true, optional = true } [dev-dependencies] ntex = "2" diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 6fd8d2e0..39d4872e 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -105,7 +105,7 @@ impl StreamOps { fd, stream.id, Event::new(0, false, false).with_interrupt(), - PollMode::Edge, + PollMode::Oneshot, ); stream } @@ -124,12 +124,12 @@ impl Handler for StreamOpsHandler { return; } let item = &mut streams[id]; - if item.io.is_none() { + if item.io.is_none() || item.contains(Flags::ERR) { + item.context.stopped(None); return; } log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); - let mut changed = false; let mut renew_ev = Event::new(0, false, false).with_interrupt(); // handle read op @@ -141,23 +141,22 @@ impl Handler for StreamOpsHandler { if res.is_pending() && item.context.is_read_ready() { renew_ev.readable = true; } else { - changed = true; item.remove(Flags::RD); } } else if item.contains(Flags::RD) { renew_ev.readable = true; } - // handle error - if ev.is_err() == Some(true) { - item.insert(Flags::ERR); + // handle HUP + if ev.is_interrupt() && !item.contains(Flags::ERR) { + item.context.stopped(None); + close(id as u32, item, &self.inner.api); + return; } - // handle HUP - if ev.is_interrupt() { - item.context.stopped(None); - close(id as u32, item, &self.inner.api, None); - return; + // handle error + if ev.is_err() == Some(true) || ev.is_interrupt() { + item.insert(Flags::ERR); } // handle write op @@ -169,18 +168,15 @@ impl Handler for StreamOpsHandler { if result.is_pending() { renew_ev.writable = true; } else { - changed = true; item.remove(Flags::WR); } } else if item.contains(Flags::WR) { renew_ev.writable = true; } - if changed { - self.inner - .api - .modify(item.fd, id as u32, renew_ev, PollMode::Edge); - } + self.inner + .api + .modify(item.fd, id as u32, renew_ev, PollMode::Oneshot); // delayed drops if self.inner.delayd_drop.get() { @@ -196,7 +192,8 @@ impl Handler for StreamOpsHandler { item.fd, item.io.is_some() ); - close(id, &mut item, &self.inner.api, None); + item.context.stopped(None); + close(id, &mut item, &self.inner.api); } } self.inner.delayd_drop.set(false); @@ -214,7 +211,9 @@ impl Handler for StreamOpsHandler { item.fd, err ); - close(id as u32, item, &self.inner.api, Some(err)); + item.insert(Flags::ERR); + item.context.stopped(Some(err)); + close(id as u32, item, &self.inner.api); } }) } @@ -258,13 +257,12 @@ fn read( let chunk = buf.chunk_mut(); let chunk_len = chunk.len(); + let chunk_ptr = chunk.as_mut_ptr(); - let result = - syscall!(break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len())); + let result = syscall!(break libc::read(item.fd, chunk_ptr as _, chunk.len())); if let Poll::Ready(Ok(size)) = result { unsafe { buf.advance_mut(size) }; total += size; - //if size != 0 { if size == chunk_len { continue; } @@ -281,8 +279,8 @@ fn read( return match result { Poll::Ready(Err(err)) => { + item.insert(Flags::ERR); if total > 0 { - item.insert(Flags::ERR); item.context.stopped(Some(err)); Poll::Ready(Ok(total)) } else { @@ -311,18 +309,12 @@ fn close( id: u32, item: &mut StreamItem, api: &DriverApi, - error: Option, ) -> Option>> { if let Some(io) = item.io.take() { log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); mem::forget(io); - let shutdown = if let Some(err) = error { - item.context.stopped(Some(err)); - false - } else { - !item.flags.get().intersects(Flags::ERR | Flags::RDSH) - }; let fd = item.fd; + let shutdown = !item.flags.get().intersects(Flags::ERR | Flags::RDSH); api.detach(fd, id); Some(ntex_rt::spawn_blocking(move || { if shutdown { @@ -340,7 +332,8 @@ impl StreamCtl { let id = self.id as usize; let fut = self.inner.with(|streams| { let item = &mut streams[id]; - close(self.id, item, &self.inner.api, None) + item.context.stopped(None); + close(self.id, item, &self.inner.api) }); async move { if let Some(fut) = fut { @@ -360,11 +353,11 @@ impl StreamCtl { .with(|streams| f(streams[self.id as usize].io.as_ref())) } - pub(crate) fn modify(&self, rd: bool, wr: bool) { + pub(crate) fn modify(&self, rd: bool, wr: bool) -> bool { self.inner.with(|streams| { let item = &mut streams[self.id as usize]; - if item.contains(Flags::ERR) { - return; + if item.io.is_none() || item.contains(Flags::ERR) { + return false; } log::debug!( @@ -423,8 +416,9 @@ impl StreamCtl { if changed { self.inner .api - .modify(item.fd, self.id, event, PollMode::Edge); + .modify(item.fd, self.id, event, PollMode::Oneshot); } + true }) } } @@ -455,7 +449,8 @@ impl Drop for StreamCtl { item.fd, item.io.is_some() ); - close(self.id, &mut item, &self.inner.api, None); + item.context.stopped(None); + close(self.id, &mut item, &self.inner.api); } self.inner.streams.set(Some(streams)); } else { diff --git a/ntex-net/src/rt_polling/io.rs b/ntex-net/src/rt_polling/io.rs index 990dae8f..254343e5 100644 --- a/ntex-net/src/rt_polling/io.rs +++ b/ntex-net/src/rt_polling/io.rs @@ -82,7 +82,9 @@ async fn run(ctl: StreamCtl, context: IoContext) { }; if modify { - ctl.modify(readable, writable); + if !ctl.modify(readable, writable) { + return Poll::Ready(Status::Terminate); + } } if read.is_pending() && write.is_pending() { @@ -95,7 +97,9 @@ async fn run(ctl: StreamCtl, context: IoContext) { }) .await; - ctl.modify(false, true); - context.shutdown(st == Status::Shutdown).await; + if st != Status::Terminate { + ctl.modify(false, true); + context.shutdown(st == Status::Shutdown).await; + } context.stopped(ctl.close().await.err()); } diff --git a/ntex-net/src/rt_polling/mod.rs b/ntex-net/src/rt_polling/mod.rs index 755fda0a..08732fa7 100644 --- a/ntex-net/src/rt_polling/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -68,6 +68,7 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { )?))) } +#[cfg(all(target_os = "linux", feature = "neon"))] #[cfg(test)] mod tests { use ntex::{io::Io, time::sleep, time::Millis, util::PoolId}; @@ -97,7 +98,7 @@ mod tests { Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World"; - // #[ntex::test] + #[ntex::test] async fn idle_disconnect() { PoolId::P5.set_read_params(24, 12); let (tx, rx) = ::oneshot::channel(); diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 0e4a6559..b3ecee4d 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -252,8 +252,6 @@ where Ok(()) }) }); - thread::sleep(std::time::Duration::from_millis(150)); - let (system, server, addr) = rx.recv().unwrap(); TestServer { From 52faee44070e9680e98bd068774b6a801911c0b6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 1 Apr 2025 14:38:29 +0500 Subject: [PATCH 5/7] wip --- ntex-io/src/tasks.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 90e18145..31681a59 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -780,7 +780,9 @@ impl IoContext { nbytes ); if !inner.dispatch_task.wake_checked() { - log::error!("Dispatcher waker is not registered, bytes: {:?}, flags: {:?}", status.nbytes, inner.flags.get()); + log::error!( + "{}: Dispatcher waker is not registered, bytes: {:?}, flags: {:?}", + self.0.tag(), status.nbytes, self.flags()); } } else { if nbytes >= hw { From 315cf7766888054d534fcc26d35be59c1a1ea2c7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 1 Apr 2025 16:49:57 +0500 Subject: [PATCH 6/7] allow to get number of active ops --- ntex-net/src/lib.rs | 2 +- ntex-net/src/rt_polling/driver.rs | 4 ++++ ntex-net/src/rt_polling/mod.rs | 6 ++++++ ntex-rt/Cargo.toml | 2 +- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/ntex-net/src/lib.rs b/ntex-net/src/lib.rs index ddc272bb..f97cb50c 100644 --- a/ntex-net/src/lib.rs +++ b/ntex-net/src/lib.rs @@ -21,7 +21,7 @@ cfg_if::cfg_if! { mod rt_impl; pub use self::rt_impl::{ from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, - unix_connect_in, + unix_connect_in, active_stream_ops }; } else { pub use self::compat::*; diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 39d4872e..88f95894 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -85,6 +85,10 @@ impl StreamOps { }) } + pub(crate) fn active_ops() -> usize { + Self::current().0.with(|streams| streams.len()) + } + pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { let fd = io.as_raw_fd(); let stream = self.0.with(move |streams| { diff --git a/ntex-net/src/rt_polling/mod.rs b/ntex-net/src/rt_polling/mod.rs index 08732fa7..c17a30d2 100644 --- a/ntex-net/src/rt_polling/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -68,6 +68,12 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { )?))) } +#[doc(hidden)] +/// Get number of active Io objects +pub fn active_stream_ops() -> usize { + self::driver::StreamOps::::active_ops() +} + #[cfg(all(target_os = "linux", feature = "neon"))] #[cfg(test)] mod tests { diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 36387680..4e09c6da 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -42,4 +42,4 @@ tok-io = { version = "1", package = "tokio", default-features = false, features "net", ], optional = true } -ntex-neon = { version = "0.1.15", optional = true } +ntex-neon = { version = "0.1.16", optional = true } From 1fefbf2e6f030a938f79110d377789d965618d74 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 1 Apr 2025 17:10:25 +0500 Subject: [PATCH 7/7] Refactor driver --- ntex-io/src/io.rs | 22 +- ntex-io/src/tasks.rs | 10 +- ntex-net/src/lib.rs | 2 +- ntex-net/src/rt_polling/driver.rs | 386 +++++++++++++----------------- ntex-net/src/rt_polling/io.rs | 27 ++- ntex-net/src/rt_polling/mod.rs | 2 +- ntex-net/src/rt_uring/driver.rs | 4 + ntex-net/src/rt_uring/mod.rs | 6 + 8 files changed, 213 insertions(+), 246 deletions(-) diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 498e249d..a99d0411 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -98,17 +98,19 @@ impl IoState { } pub(super) fn io_stopped(&self, err: Option) { - if err.is_some() { - self.error.set(err); + if !self.flags.get().contains(Flags::IO_STOPPED) { + if err.is_some() { + self.error.set(err); + } + self.read_task.wake(); + self.write_task.wake(); + self.dispatch_task.wake(); + self.notify_disconnect(); + self.handle.take(); + self.insert_flags( + Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS, + ); } - self.read_task.wake(); - self.write_task.wake(); - self.dispatch_task.wake(); - self.notify_disconnect(); - self.handle.take(); - self.insert_flags( - Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS, - ); } /// Gracefully shutdown read and write io tasks diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 31681a59..883ac7ee 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -537,9 +537,7 @@ impl IoContext { self.0.tag(), nbytes ); - if !inner.dispatch_task.wake_checked() { - log::error!("Dispatcher waker is not registered"); - } + inner.dispatch_task.wake(); } else { if nbytes >= hw { // read task is paused because of read back-pressure @@ -779,11 +777,7 @@ impl IoContext { self.0.tag(), nbytes ); - if !inner.dispatch_task.wake_checked() { - log::error!( - "{}: Dispatcher waker is not registered, bytes: {:?}, flags: {:?}", - self.0.tag(), status.nbytes, self.flags()); - } + inner.dispatch_task.wake(); } else { if nbytes >= hw { // read task is paused because of read back-pressure diff --git a/ntex-net/src/lib.rs b/ntex-net/src/lib.rs index f97cb50c..9d2a4387 100644 --- a/ntex-net/src/lib.rs +++ b/ntex-net/src/lib.rs @@ -14,7 +14,7 @@ cfg_if::cfg_if! { mod rt_impl; pub use self::rt_impl::{ from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, - unix_connect_in, + unix_connect_in, active_stream_ops }; } else if #[cfg(all(unix, feature = "neon"))] { #[path = "rt_polling/mod.rs"] diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 88f95894..22c04f50 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,72 +1,50 @@ -use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task::Poll}; +use std::os::fd::RawFd; +use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task::Poll}; use ntex_neon::driver::{DriverApi, Event, Handler, PollMode}; use ntex_neon::{syscall, Runtime}; use slab::Slab; -use ntex_bytes::{BufMut, BytesVec}; +use ntex_bytes::BufMut; use ntex_io::IoContext; -pub(crate) struct StreamCtl { +pub(crate) struct StreamCtl { id: u32, - inner: Rc>, + inner: Rc, } bitflags::bitflags! { #[derive(Copy, Clone, Debug)] struct Flags: u8 { - const RD = 0b0000_0001; - const WR = 0b0000_0010; - const ERR = 0b0000_0100; - const RDSH = 0b0000_1000; + const RD = 0b0000_0001; + const WR = 0b0000_0010; + const RDSH = 0b0000_0100; + const FAILED = 0b0000_1000; + const CLOSED = 0b0001_0000; } } -struct StreamItem { - io: Option, +struct StreamItem { fd: RawFd, - flags: Cell, + flags: Flags, ref_count: u16, context: IoContext, } -pub(crate) struct StreamOps(Rc>); +pub(crate) struct StreamOps(Rc); -struct StreamOpsHandler { - inner: Rc>, +struct StreamOpsHandler { + inner: Rc, } -struct StreamOpsInner { +struct StreamOpsInner { api: DriverApi, delayd_drop: Cell, feed: RefCell>, - streams: Cell>>>>, + streams: Cell>>>, } -impl StreamItem { - fn tag(&self) -> &'static str { - self.context.tag() - } - - fn contains(&self, flag: Flags) -> bool { - self.flags.get().contains(flag) - } - - fn insert(&self, fl: Flags) { - let mut flags = self.flags.get(); - flags.insert(fl); - self.flags.set(flags); - } - - fn remove(&self, fl: Flags) { - let mut flags = self.flags.get(); - flags.remove(fl); - self.flags.set(flags); - } -} - -impl StreamOps { +impl StreamOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; @@ -89,15 +67,13 @@ impl StreamOps { Self::current().0.with(|streams| streams.len()) } - pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { - let fd = io.as_raw_fd(); + pub(crate) fn register(&self, fd: RawFd, context: IoContext) -> StreamCtl { let stream = self.0.with(move |streams| { let item = StreamItem { fd, context, - io: Some(io), ref_count: 1, - flags: Cell::new(Flags::empty()), + flags: Flags::empty(), }; StreamCtl { id: streams.insert(item) as u32, @@ -115,72 +91,61 @@ impl StreamOps { } } -impl Clone for StreamOps { +impl Clone for StreamOps { fn clone(&self) -> Self { Self(self.0.clone()) } } -impl Handler for StreamOpsHandler { +impl Handler for StreamOpsHandler { fn event(&mut self, id: usize, ev: Event) { self.inner.with(|streams| { if !streams.contains(id) { return; } let item = &mut streams[id]; - if item.io.is_none() || item.contains(Flags::ERR) { - item.context.stopped(None); - return; - } + log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); - let mut renew_ev = Event::new(0, false, false).with_interrupt(); - - // handle read op + let mut renew = Event::new(0, false, false).with_interrupt(); if ev.readable { - let res = item - .context - .with_read_buf(|buf, hw, lw| read(item, buf, hw, lw)); - + let res = item.read(); if res.is_pending() && item.context.is_read_ready() { - renew_ev.readable = true; + renew.readable = true; + item.flags.insert(Flags::RD); } else { - item.remove(Flags::RD); + item.flags.remove(Flags::RD); } - } else if item.contains(Flags::RD) { - renew_ev.readable = true; + } else if item.flags.contains(Flags::RD) { + renew.readable = true; } - // handle HUP - if ev.is_interrupt() && !item.contains(Flags::ERR) { - item.context.stopped(None); - close(id as u32, item, &self.inner.api); - return; - } - - // handle error - if ev.is_err() == Some(true) || ev.is_interrupt() { - item.insert(Flags::ERR); - } - - // handle write op if ev.writable { let result = item.context.with_write_buf(|buf| { log::debug!("{}: write {:?} s: {:?}", item.tag(), item.fd, buf.len()); syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len())) }); if result.is_pending() { - renew_ev.writable = true; + renew.writable = true; + item.flags.insert(Flags::WR); } else { - item.remove(Flags::WR); + item.flags.remove(Flags::WR); } - } else if item.contains(Flags::WR) { - renew_ev.writable = true; + } else if item.flags.contains(Flags::WR) { + renew.writable = true; } - self.inner - .api - .modify(item.fd, id as u32, renew_ev, PollMode::Oneshot); + // handle HUP + if ev.is_interrupt() { + item.close(id as u32, &self.inner.api, None, false); + return; + } + + if !item.flags.contains(Flags::CLOSED | Flags::FAILED) { + self.inner + .api + .modify(item.fd, id as u32, renew, PollMode::Oneshot); + } // delayed drops if self.inner.delayd_drop.get() { @@ -190,14 +155,12 @@ impl Handler for StreamOpsHandler { if item.ref_count == 0 { let mut item = streams.remove(id as usize); log::debug!( - "{}: Drop ({}), {:?}, has-io: {}", + "{}: Drop ({:?}), flags: {:?}", item.tag(), - id, item.fd, - item.io.is_some() + item.flags ); - item.context.stopped(None); - close(id, &mut item, &self.inner.api); + item.close(id, &self.inner.api, None, true); } } self.inner.delayd_drop.set(false); @@ -215,18 +178,16 @@ impl Handler for StreamOpsHandler { item.fd, err ); - item.insert(Flags::ERR); - item.context.stopped(Some(err)); - close(id as u32, item, &self.inner.api); + item.close(id as u32, &self.inner.api, Some(err), false); } }) } } -impl StreamOpsInner { +impl StreamOpsInner { fn with(&self, f: F) -> R where - F: FnOnce(&mut Slab>) -> R, + F: FnOnce(&mut Slab) -> R, { let mut streams = self.streams.take().unwrap(); let result = f(&mut streams); @@ -235,110 +196,112 @@ impl StreamOpsInner { } } -fn read( - item: &StreamItem, - buf: &mut BytesVec, - hw: usize, - lw: usize, -) -> Poll> { - log::debug!( - "{}: reading fd ({:?}) flags: {:?}", - item.tag(), - item.fd, - item.context.flags() - ); - if item.contains(Flags::RDSH) { - return Poll::Ready(Ok(0)); +impl StreamItem { + fn tag(&self) -> &'static str { + self.context.tag() } - let mut total = 0; - loop { - // make sure we've got room - let remaining = buf.remaining_mut(); - if remaining < lw { - buf.reserve(hw - remaining); + fn read(&mut self) -> Poll<()> { + let mut flags = self.flags; + let result = self.context.with_read_buf(|buf, hw, lw| { + // prev call result is 0 + if flags.contains(Flags::RDSH) { + return Poll::Ready(Ok(0)); + } + + let mut total = 0; + loop { + // make sure we've got room + let remaining = buf.remaining_mut(); + if remaining < lw { + buf.reserve(hw - remaining); + } + + let chunk = buf.chunk_mut(); + let chunk_len = chunk.len(); + let chunk_ptr = chunk.as_mut_ptr(); + + let result = + syscall!(break libc::read(self.fd, chunk_ptr as _, chunk.len())); + if let Poll::Ready(Ok(size)) = result { + unsafe { buf.advance_mut(size) }; + total += size; + if size == chunk_len { + continue; + } + } + + log::debug!( + "{}: read fd ({:?}), s: {:?}, cap: {:?}, result: {:?}", + self.tag(), + self.fd, + total, + buf.remaining_mut(), + result + ); + + return match result { + Poll::Ready(Err(err)) => { + flags.insert(Flags::FAILED); + if total > 0 { + self.context.stopped(Some(err)); + Poll::Ready(Ok(total)) + } else { + Poll::Ready(Err(err)) + } + } + Poll::Ready(Ok(size)) => { + if size == 0 { + flags.insert(Flags::RDSH); + } + Poll::Ready(Ok(total)) + } + Poll::Pending => { + if total > 0 { + Poll::Ready(Ok(total)) + } else { + Poll::Pending + } + } + }; + } + }); + self.flags = flags; + result + } + + fn close( + &mut self, + id: u32, + api: &DriverApi, + error: Option, + shutdown: bool, + ) -> Option>> { + if !self.flags.contains(Flags::CLOSED) { + log::debug!("{}: Closing ({}), {:?}", self.tag(), id, self.fd); + self.flags.insert(Flags::CLOSED); + self.context.stopped(error); + + let fd = self.fd; + api.detach(fd, id); + Some(ntex_rt::spawn_blocking(move || { + if shutdown { + let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR)); + } + syscall!(libc::close(fd)) + })) + } else { + None } - - let chunk = buf.chunk_mut(); - let chunk_len = chunk.len(); - let chunk_ptr = chunk.as_mut_ptr(); - - let result = syscall!(break libc::read(item.fd, chunk_ptr as _, chunk.len())); - if let Poll::Ready(Ok(size)) = result { - unsafe { buf.advance_mut(size) }; - total += size; - if size == chunk_len { - continue; - } - } - - log::debug!( - "{}: read fd ({:?}), s: {:?}, cap: {:?}, result: {:?}", - item.tag(), - item.fd, - total, - buf.remaining_mut(), - result - ); - - return match result { - Poll::Ready(Err(err)) => { - item.insert(Flags::ERR); - if total > 0 { - item.context.stopped(Some(err)); - Poll::Ready(Ok(total)) - } else { - Poll::Ready(Err(err)) - } - } - Poll::Ready(Ok(size)) => { - if size == 0 { - item.insert(Flags::RDSH); - item.context.stopped(None); - } - Poll::Ready(Ok(total)) - } - Poll::Pending => { - if total > 0 { - Poll::Ready(Ok(total)) - } else { - Poll::Pending - } - } - }; } } -fn close( - id: u32, - item: &mut StreamItem, - api: &DriverApi, -) -> Option>> { - if let Some(io) = item.io.take() { - log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); - mem::forget(io); - let fd = item.fd; - let shutdown = !item.flags.get().intersects(Flags::ERR | Flags::RDSH); - api.detach(fd, id); - Some(ntex_rt::spawn_blocking(move || { - if shutdown { - let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR)); - } - syscall!(libc::close(fd)) - })) - } else { - None - } -} - -impl StreamCtl { +impl StreamCtl { pub(crate) fn close(self) -> impl Future> { let id = self.id as usize; - let fut = self.inner.with(|streams| { - let item = &mut streams[id]; - item.context.stopped(None); - close(self.id, item, &self.inner.api) - }); + let fut = self + .inner + .with(|streams| streams[id].close(self.id, &self.inner.api, None, true)); async move { if let Some(fut) = fut { fut.await @@ -349,52 +312,42 @@ impl StreamCtl { } } - pub(crate) fn with_io(&self, f: F) -> R - where - F: FnOnce(Option<&T>) -> R, - { - self.inner - .with(|streams| f(streams[self.id as usize].io.as_ref())) - } - pub(crate) fn modify(&self, rd: bool, wr: bool) -> bool { self.inner.with(|streams| { let item = &mut streams[self.id as usize]; - if item.io.is_none() || item.contains(Flags::ERR) { + if item.flags.contains(Flags::CLOSED) { return false; } log::debug!( - "{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}, flags: {:?}", + "{}: Modify interest ({:?}) rd: {:?}, wr: {:?}", item.tag(), - self.id, item.fd, rd, - wr, - item.flags + wr ); let mut changed = false; let mut event = Event::new(0, false, false).with_interrupt(); if rd { - if item.contains(Flags::RD) { + if item.flags.contains(Flags::RD) { event.readable = true; } else { - let res = item - .context - .with_read_buf(|buf, hw, lw| read(item, buf, hw, lw)); - + let res = item.read(); if res.is_pending() && item.context.is_read_ready() { changed = true; event.readable = true; - item.insert(Flags::RD); + item.flags.insert(Flags::RD); } } + } else if item.flags.contains(Flags::RD) { + changed = true; + item.flags.remove(Flags::RD); } if wr { - if item.contains(Flags::WR) { + if item.flags.contains(Flags::WR) { event.writable = true; } else { let result = item.context.with_write_buf(|buf| { @@ -412,12 +365,15 @@ impl StreamCtl { if result.is_pending() { changed = true; event.writable = true; - item.insert(Flags::WR); + item.flags.insert(Flags::WR); } } + } else if item.flags.contains(Flags::WR) { + changed = true; + item.flags.remove(Flags::WR); } - if changed { + if changed && !item.flags.contains(Flags::CLOSED | Flags::FAILED) { self.inner .api .modify(item.fd, self.id, event, PollMode::Oneshot); @@ -427,7 +383,7 @@ impl StreamCtl { } } -impl Clone for StreamCtl { +impl Clone for StreamCtl { fn clone(&self) -> Self { self.inner.with(|streams| { streams[self.id as usize].ref_count += 1; @@ -439,7 +395,7 @@ impl Clone for StreamCtl { } } -impl Drop for StreamCtl { +impl Drop for StreamCtl { fn drop(&mut self) { if let Some(mut streams) = self.inner.streams.take() { let id = self.id as usize; @@ -447,14 +403,12 @@ impl Drop for StreamCtl { if streams[id].ref_count == 0 { let mut item = streams.remove(id); log::debug!( - "{}: Drop io ({}), {:?}, has-io: {}", + "{}: Drop io ({:?}), flags: {:?}", item.tag(), - self.id, item.fd, - item.io.is_some() + item.flags ); - item.context.stopped(None); - close(self.id, &mut item, &self.inner.api); + item.close(self.id, &self.inner.api, None, true); } self.inner.streams.set(Some(streams)); } else { diff --git a/ntex-net/src/rt_polling/io.rs b/ntex-net/src/rt_polling/io.rs index 254343e5..2cb57323 100644 --- a/ntex-net/src/rt_polling/io.rs +++ b/ntex-net/src/rt_polling/io.rs @@ -1,4 +1,4 @@ -use std::{any, future::poll_fn, task::Poll}; +use std::{any, future::poll_fn, mem, os::fd::AsRawFd, task::Poll}; use ntex_io::{ types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, @@ -12,11 +12,10 @@ impl IoStream for super::TcpStream { fn start(self, read: ReadContext, _: WriteContext) -> Option> { let io = self.0; let context = read.context(); - let ctl = StreamOps::current().register(io, context.clone()); - let ctl2 = ctl.clone(); + let ctl = StreamOps::current().register(io.as_raw_fd(), context.clone()); spawn(async move { run(ctl, context).await }); - Some(Box::new(HandleWrapper(ctl2))) + Some(Box::new(HandleWrapper(Some(io)))) } } @@ -24,19 +23,20 @@ impl IoStream for super::UnixStream { fn start(self, read: ReadContext, _: WriteContext) -> Option> { let io = self.0; let context = read.context(); - let ctl = StreamOps::current().register(io, context.clone()); + let ctl = StreamOps::current().register(io.as_raw_fd(), context.clone()); spawn(async move { run(ctl, context).await }); + mem::forget(io); None } } -struct HandleWrapper(StreamCtl); +struct HandleWrapper(Option); impl Handle for HandleWrapper { fn query(&self, id: any::TypeId) -> Option> { if id == any::TypeId::of::() { - let addr = self.0.with_io(|io| io.and_then(|io| io.peer_addr().ok())); + let addr = self.0.as_ref().unwrap().peer_addr().ok(); if let Some(addr) = addr.and_then(|addr| addr.as_socket()) { return Some(Box::new(types::PeerAddr(addr))); } @@ -45,13 +45,19 @@ impl Handle for HandleWrapper { } } +impl Drop for HandleWrapper { + fn drop(&mut self) { + mem::forget(self.0.take()); + } +} + #[derive(Copy, Clone, Debug, PartialEq, Eq)] enum Status { Shutdown, Terminate, } -async fn run(ctl: StreamCtl, context: IoContext) { +async fn run(ctl: StreamCtl, context: IoContext) { // Handle io read readiness let st = poll_fn(|cx| { let mut modify = false; @@ -98,8 +104,9 @@ async fn run(ctl: StreamCtl, context: IoContext) { .await; if st != Status::Terminate { - ctl.modify(false, true); - context.shutdown(st == Status::Shutdown).await; + if ctl.modify(false, true) { + context.shutdown(st == Status::Shutdown).await; + } } context.stopped(ctl.close().await.err()); } diff --git a/ntex-net/src/rt_polling/mod.rs b/ntex-net/src/rt_polling/mod.rs index c17a30d2..95f312b1 100644 --- a/ntex-net/src/rt_polling/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -71,7 +71,7 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { #[doc(hidden)] /// Get number of active Io objects pub fn active_stream_ops() -> usize { - self::driver::StreamOps::::active_ops() + self::driver::StreamOps::active_ops() } #[cfg(all(target_os = "linux", feature = "neon"))] diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs index 7115b9a7..2f76509c 100644 --- a/ntex-net/src/rt_uring/driver.rs +++ b/ntex-net/src/rt_uring/driver.rs @@ -124,6 +124,10 @@ impl StreamOps { } } + pub(crate) fn active_ops() -> usize { + Self::current().with(|st| st.streams.len()) + } + fn with(&self, f: F) -> R where F: FnOnce(&mut StreamOpsStorage) -> R, diff --git a/ntex-net/src/rt_uring/mod.rs b/ntex-net/src/rt_uring/mod.rs index 41016d09..6ae53b99 100644 --- a/ntex-net/src/rt_uring/mod.rs +++ b/ntex-net/src/rt_uring/mod.rs @@ -64,3 +64,9 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { Socket::from(stream), )?))) } + +#[doc(hidden)] +/// Get number of active Io objects +pub fn active_stream_ops() -> usize { + self::driver::StreamOps::::active_ops() +}