diff --git a/Cargo.toml b/Cargo.toml index d9e97ef4..eed34f06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,12 @@ ntex-util = { path = "ntex-util" } ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } +ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } +#ntex-neon = { path = "../dev/neon" } + [workspace.dependencies] +ntex-polling = "3.7.4" + async-channel = "2" async-task = "4.5.0" atomic-waker = "1.1" diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index f55aa5d0..f7a54de3 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.11.1" +version = "2.11.2" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 498e249d..a99d0411 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -98,17 +98,19 @@ impl IoState { } pub(super) fn io_stopped(&self, err: Option) { - if err.is_some() { - self.error.set(err); + if !self.flags.get().contains(Flags::IO_STOPPED) { + if err.is_some() { + self.error.set(err); + } + self.read_task.wake(); + self.write_task.wake(); + self.dispatch_task.wake(); + self.notify_disconnect(); + self.handle.take(); + self.insert_flags( + Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS, + ); } - self.read_task.wake(); - self.write_task.wake(); - self.dispatch_task.wake(); - self.notify_disconnect(); - self.handle.take(); - self.insert_flags( - Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS, - ); } /// Gracefully shutdown read and write io tasks diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 55f99416..883ac7ee 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -537,9 +537,7 @@ impl IoContext { self.0.tag(), nbytes ); - if !inner.dispatch_task.wake_checked() { - log::error!("Dispatcher waker is not registered"); - } + inner.dispatch_task.wake(); } else { if nbytes >= hw { // read task is paused because of read back-pressure @@ -670,7 +668,6 @@ impl IoContext { // set buffer back let result = match result { Ok(0) => { - // log::debug!("{}: WROTE ALL {:?}", self.0.tag(), inner.buffer.write_destination_size()); self.0.memory_pool().release_write_buf(buf); Ok(inner.buffer.write_destination_size()) } @@ -680,7 +677,6 @@ impl IoContext { self.0.memory_pool().release_write_buf(b); } let l = buf.len(); - // log::debug!("{}: WROTE SOME {:?}", self.0.tag(), l); inner.buffer.set_write_destination(buf); Ok(l) } @@ -739,19 +735,11 @@ impl IoContext { pub fn with_read_buf(&self, f: F) -> Poll<()> where - F: FnOnce(&mut BytesVec) -> Poll>, + F: FnOnce(&mut BytesVec, usize, usize) -> Poll>, { let inner = &self.0 .0; let (hw, lw) = self.0.memory_pool().read_params().unpack(); - let result = inner.buffer.with_read_source(&self.0, |buf| { - // make sure we've got room - let remaining = buf.remaining_mut(); - if remaining < lw { - buf.reserve(hw - remaining); - } - - f(buf) - }); + let result = inner.buffer.with_read_source(&self.0, |buf| f(buf, hw, lw)); // handle buffer changes match result { @@ -789,9 +777,7 @@ impl IoContext { self.0.tag(), nbytes ); - if !inner.dispatch_task.wake_checked() { - log::error!("Dispatcher waker is not registered"); - } + inner.dispatch_task.wake(); } else { if nbytes >= hw { // read task is paused because of read back-pressure diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index e60744ef..f6600129 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [2.5.11] - 2025-04-01 + +* Use edge mode for polling driver + +* Use polling fork + ## [2.5.10] - 2025-03-28 * Better closed sockets handling diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 5a72d3eb..0174ce5e 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.10" +version = "2.5.11" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -27,20 +27,20 @@ compio = ["ntex-rt/compio", "ntex-compio"] # neon runtime neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"] -polling = ["ntex-neon/polling", "dep:polling", "socket2"] io-uring = ["ntex-neon/io-uring", "dep:io-uring", "socket2"] +ntex-polling = ["ntex-neon/ntex-polling", "dep:ntex-polling", "socket2"] [dependencies] ntex-service = "3.3" ntex-bytes = "0.1" ntex-http = "0.1" -ntex-io = "2.11.1" +ntex-io = "2.11.2" ntex-rt = "0.4.25" ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.15", optional = true } +ntex-neon = { version = "0.1.16", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } @@ -53,7 +53,8 @@ thiserror = { workspace = true } # Linux specific dependencies [target.'cfg(target_os = "linux")'.dependencies] io-uring = { workspace = true, optional = true } -polling = { workspace = true, optional = true } +ntex-polling = { workspace = true, optional = true } [dev-dependencies] ntex = "2" +oneshot = "0.1" diff --git a/ntex-net/src/lib.rs b/ntex-net/src/lib.rs index ddc272bb..9d2a4387 100644 --- a/ntex-net/src/lib.rs +++ b/ntex-net/src/lib.rs @@ -14,14 +14,14 @@ cfg_if::cfg_if! { mod rt_impl; pub use self::rt_impl::{ from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, - unix_connect_in, + unix_connect_in, active_stream_ops }; } else if #[cfg(all(unix, feature = "neon"))] { #[path = "rt_polling/mod.rs"] mod rt_impl; pub use self::rt_impl::{ from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect, - unix_connect_in, + unix_connect_in, active_stream_ops }; } else { pub use self::compat::*; diff --git a/ntex-net/src/rt_polling/connect.rs b/ntex-net/src/rt_polling/connect.rs index 8f0f1dc9..3123fd16 100644 --- a/ntex-net/src/rt_polling/connect.rs +++ b/ntex-net/src/rt_polling/connect.rs @@ -1,7 +1,7 @@ use std::os::fd::{AsRawFd, RawFd}; use std::{cell::RefCell, io, rc::Rc, task::Poll}; -use ntex_neon::driver::{DriverApi, Event, Handler}; +use ntex_neon::driver::{DriverApi, Event, Handler, PollMode}; use ntex_neon::{syscall, Runtime}; use ntex_util::channel::oneshot::Sender; use slab::Slab; @@ -34,7 +34,7 @@ impl ConnectOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; - rt.driver().register(|api| { + rt.register_handler(|api| { let ops = Rc::new(ConnectOpsInner { api, connects: RefCell::new(Slab::new()), @@ -62,7 +62,9 @@ impl ConnectOps { let item = Item { fd, sender }; let id = self.0.connects.borrow_mut().insert(item); - self.0.api.attach(fd, id as u32, Some(Event::writable(0))); + self.0 + .api + .attach(fd, id as u32, Event::writable(0), PollMode::Oneshot); Ok(id) } } @@ -72,7 +74,6 @@ impl Handler for ConnectOpsBatcher { log::debug!("connect-fd is readable {:?}", id); let mut connects = self.inner.connects.borrow_mut(); - if connects.contains(id) { let item = connects.remove(id); if event.writable { diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 24db553d..22c04f50 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,16 +1,16 @@ -use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task, task::Poll}; +use std::os::fd::RawFd; +use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task::Poll}; -use ntex_neon::driver::{DriverApi, Event, Handler}; +use ntex_neon::driver::{DriverApi, Event, Handler, PollMode}; use ntex_neon::{syscall, Runtime}; use slab::Slab; use ntex_bytes::BufMut; use ntex_io::IoContext; -pub(crate) struct StreamCtl { +pub(crate) struct StreamCtl { id: u32, - inner: Rc>, + inner: Rc, } bitflags::bitflags! { @@ -18,41 +18,37 @@ bitflags::bitflags! { struct Flags: u8 { const RD = 0b0000_0001; const WR = 0b0000_0010; + const RDSH = 0b0000_0100; + const FAILED = 0b0000_1000; + const CLOSED = 0b0001_0000; } } -struct StreamItem { - io: Option, +struct StreamItem { fd: RawFd, flags: Flags, ref_count: u16, context: IoContext, } -pub(crate) struct StreamOps(Rc>); +pub(crate) struct StreamOps(Rc); -struct StreamOpsHandler { - inner: Rc>, +struct StreamOpsHandler { + inner: Rc, } -struct StreamOpsInner { +struct StreamOpsInner { api: DriverApi, delayd_drop: Cell, feed: RefCell>, - streams: Cell>>>>, + streams: Cell>>>, } -impl StreamItem { - fn tag(&self) -> &'static str { - self.context.tag() - } -} - -impl StreamOps { +impl StreamOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; - rt.driver().register(|api| { + rt.register_handler(|api| { let ops = Rc::new(StreamOpsInner { api, feed: RefCell::new(Vec::new()), @@ -67,13 +63,15 @@ impl StreamOps { }) } - pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { - let fd = io.as_raw_fd(); + pub(crate) fn active_ops() -> usize { + Self::current().0.with(|streams| streams.len()) + } + + pub(crate) fn register(&self, fd: RawFd, context: IoContext) -> StreamCtl { let stream = self.0.with(move |streams| { let item = StreamItem { fd, context, - io: Some(io), ref_count: 1, flags: Flags::empty(), }; @@ -86,60 +84,40 @@ impl StreamOps { self.0.api.attach( fd, stream.id, - Some(Event::new(0, false, false).with_interrupt()), + Event::new(0, false, false).with_interrupt(), + PollMode::Oneshot, ); stream } } -impl Clone for StreamOps { +impl Clone for StreamOps { fn clone(&self) -> Self { Self(self.0.clone()) } } -impl Handler for StreamOpsHandler { +impl Handler for StreamOpsHandler { fn event(&mut self, id: usize, ev: Event) { self.inner.with(|streams| { if !streams.contains(id) { return; } let item = &mut streams[id]; - if item.io.is_none() { - return; - } + log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); - // handle HUP - if ev.is_interrupt() { - item.context.stopped(None); - close(id as u32, item, &self.inner.api, None, true); - return; - } - - let mut renew_ev = Event::new(0, false, false).with_interrupt(); - + let mut renew = Event::new(0, false, false).with_interrupt(); if ev.readable { - let res = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let result = task::ready!(syscall!( - break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()) - )); - if let Ok(size) = result { - log::debug!("{}: data {:?}, s: {:?}", item.tag(), item.fd, size); - unsafe { buf.advance_mut(size) }; - } - Poll::Ready(result) - }); - + let res = item.read(); if res.is_pending() && item.context.is_read_ready() { - renew_ev.readable = true; + renew.readable = true; item.flags.insert(Flags::RD); } else { item.flags.remove(Flags::RD); } } else if item.flags.contains(Flags::RD) { - renew_ev.readable = true; + renew.readable = true; } if ev.writable { @@ -148,16 +126,26 @@ impl Handler for StreamOpsHandler { syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len())) }); if result.is_pending() { - renew_ev.writable = true; + renew.writable = true; item.flags.insert(Flags::WR); } else { item.flags.remove(Flags::WR); } } else if item.flags.contains(Flags::WR) { - renew_ev.writable = true; + renew.writable = true; } - self.inner.api.modify(item.fd, id as u32, renew_ev); + // handle HUP + if ev.is_interrupt() { + item.close(id as u32, &self.inner.api, None, false); + return; + } + + if !item.flags.contains(Flags::CLOSED | Flags::FAILED) { + self.inner + .api + .modify(item.fd, id as u32, renew, PollMode::Oneshot); + } // delayed drops if self.inner.delayd_drop.get() { @@ -167,13 +155,12 @@ impl Handler for StreamOpsHandler { if item.ref_count == 0 { let mut item = streams.remove(id as usize); log::debug!( - "{}: Drop ({}), {:?}, has-io: {}", + "{}: Drop ({:?}), flags: {:?}", item.tag(), - id, item.fd, - item.io.is_some() + item.flags ); - close(id, &mut item, &self.inner.api, None, true); + item.close(id, &self.inner.api, None, true); } } self.inner.delayd_drop.set(false); @@ -191,16 +178,16 @@ impl Handler for StreamOpsHandler { item.fd, err ); - close(id as u32, item, &self.inner.api, Some(err), false); + item.close(id as u32, &self.inner.api, Some(err), false); } }) } } -impl StreamOpsInner { +impl StreamOpsInner { fn with(&self, f: F) -> R where - F: FnOnce(&mut Slab>) -> R, + F: FnOnce(&mut Slab) -> R, { let mut streams = self.streams.take().unwrap(); let result = f(&mut streams); @@ -209,39 +196,112 @@ impl StreamOpsInner { } } -fn close( - id: u32, - item: &mut StreamItem, - api: &DriverApi, - error: Option, - shutdown: bool, -) -> Option>> { - if let Some(io) = item.io.take() { - log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); - mem::forget(io); - if let Some(err) = error { - item.context.stopped(Some(err)); - } - let fd = item.fd; - api.detach(fd, id); - Some(ntex_rt::spawn_blocking(move || { - if shutdown { - let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR)); +impl StreamItem { + fn tag(&self) -> &'static str { + self.context.tag() + } + + fn read(&mut self) -> Poll<()> { + let mut flags = self.flags; + let result = self.context.with_read_buf(|buf, hw, lw| { + // prev call result is 0 + if flags.contains(Flags::RDSH) { + return Poll::Ready(Ok(0)); } - syscall!(libc::close(fd)) - })) - } else { - None + + let mut total = 0; + loop { + // make sure we've got room + let remaining = buf.remaining_mut(); + if remaining < lw { + buf.reserve(hw - remaining); + } + + let chunk = buf.chunk_mut(); + let chunk_len = chunk.len(); + let chunk_ptr = chunk.as_mut_ptr(); + + let result = + syscall!(break libc::read(self.fd, chunk_ptr as _, chunk.len())); + if let Poll::Ready(Ok(size)) = result { + unsafe { buf.advance_mut(size) }; + total += size; + if size == chunk_len { + continue; + } + } + + log::debug!( + "{}: read fd ({:?}), s: {:?}, cap: {:?}, result: {:?}", + self.tag(), + self.fd, + total, + buf.remaining_mut(), + result + ); + + return match result { + Poll::Ready(Err(err)) => { + flags.insert(Flags::FAILED); + if total > 0 { + self.context.stopped(Some(err)); + Poll::Ready(Ok(total)) + } else { + Poll::Ready(Err(err)) + } + } + Poll::Ready(Ok(size)) => { + if size == 0 { + flags.insert(Flags::RDSH); + } + Poll::Ready(Ok(total)) + } + Poll::Pending => { + if total > 0 { + Poll::Ready(Ok(total)) + } else { + Poll::Pending + } + } + }; + } + }); + self.flags = flags; + result + } + + fn close( + &mut self, + id: u32, + api: &DriverApi, + error: Option, + shutdown: bool, + ) -> Option>> { + if !self.flags.contains(Flags::CLOSED) { + log::debug!("{}: Closing ({}), {:?}", self.tag(), id, self.fd); + self.flags.insert(Flags::CLOSED); + self.context.stopped(error); + + let fd = self.fd; + api.detach(fd, id); + Some(ntex_rt::spawn_blocking(move || { + if shutdown { + let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR)); + } + syscall!(libc::close(fd)) + })) + } else { + None + } } } -impl StreamCtl { +impl StreamCtl { pub(crate) fn close(self) -> impl Future> { let id = self.id as usize; - let fut = self.inner.with(|streams| { - let item = &mut streams[id]; - close(self.id, item, &self.inner.api, None, false) - }); + let fut = self + .inner + .with(|streams| streams[id].close(self.id, &self.inner.api, None, true)); async move { if let Some(fut) = fut { fut.await @@ -252,55 +312,38 @@ impl StreamCtl { } } - pub(crate) fn with_io(&self, f: F) -> R - where - F: FnOnce(Option<&T>) -> R, - { - self.inner - .with(|streams| f(streams[self.id as usize].io.as_ref())) - } - - pub(crate) fn modify(&self, rd: bool, wr: bool) { + pub(crate) fn modify(&self, rd: bool, wr: bool) -> bool { self.inner.with(|streams| { let item = &mut streams[self.id as usize]; + if item.flags.contains(Flags::CLOSED) { + return false; + } log::debug!( - "{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}", + "{}: Modify interest ({:?}) rd: {:?}, wr: {:?}", item.tag(), - self.id, item.fd, rd, wr ); + let mut changed = false; let mut event = Event::new(0, false, false).with_interrupt(); if rd { if item.flags.contains(Flags::RD) { event.readable = true; } else { - let res = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let result = task::ready!(syscall!( - break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()) - )); - if let Ok(size) = result { - log::debug!( - "{}: read {:?}, s: {:?}", - item.tag(), - item.fd, - size - ); - unsafe { buf.advance_mut(size) }; - } - Poll::Ready(result) - }); - + let res = item.read(); if res.is_pending() && item.context.is_read_ready() { + changed = true; event.readable = true; item.flags.insert(Flags::RD); } } + } else if item.flags.contains(Flags::RD) { + changed = true; + item.flags.remove(Flags::RD); } if wr { @@ -320,18 +363,27 @@ impl StreamCtl { }); if result.is_pending() { + changed = true; event.writable = true; item.flags.insert(Flags::WR); } } + } else if item.flags.contains(Flags::WR) { + changed = true; + item.flags.remove(Flags::WR); } - self.inner.api.modify(item.fd, self.id, event); + if changed && !item.flags.contains(Flags::CLOSED | Flags::FAILED) { + self.inner + .api + .modify(item.fd, self.id, event, PollMode::Oneshot); + } + true }) } } -impl Clone for StreamCtl { +impl Clone for StreamCtl { fn clone(&self) -> Self { self.inner.with(|streams| { streams[self.id as usize].ref_count += 1; @@ -343,7 +395,7 @@ impl Clone for StreamCtl { } } -impl Drop for StreamCtl { +impl Drop for StreamCtl { fn drop(&mut self) { if let Some(mut streams) = self.inner.streams.take() { let id = self.id as usize; @@ -351,13 +403,12 @@ impl Drop for StreamCtl { if streams[id].ref_count == 0 { let mut item = streams.remove(id); log::debug!( - "{}: Drop io ({}), {:?}, has-io: {}", + "{}: Drop io ({:?}), flags: {:?}", item.tag(), - self.id, item.fd, - item.io.is_some() + item.flags ); - close(self.id, &mut item, &self.inner.api, None, true); + item.close(self.id, &self.inner.api, None, true); } self.inner.streams.set(Some(streams)); } else { diff --git a/ntex-net/src/rt_polling/io.rs b/ntex-net/src/rt_polling/io.rs index 990dae8f..2cb57323 100644 --- a/ntex-net/src/rt_polling/io.rs +++ b/ntex-net/src/rt_polling/io.rs @@ -1,4 +1,4 @@ -use std::{any, future::poll_fn, task::Poll}; +use std::{any, future::poll_fn, mem, os::fd::AsRawFd, task::Poll}; use ntex_io::{ types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, @@ -12,11 +12,10 @@ impl IoStream for super::TcpStream { fn start(self, read: ReadContext, _: WriteContext) -> Option> { let io = self.0; let context = read.context(); - let ctl = StreamOps::current().register(io, context.clone()); - let ctl2 = ctl.clone(); + let ctl = StreamOps::current().register(io.as_raw_fd(), context.clone()); spawn(async move { run(ctl, context).await }); - Some(Box::new(HandleWrapper(ctl2))) + Some(Box::new(HandleWrapper(Some(io)))) } } @@ -24,19 +23,20 @@ impl IoStream for super::UnixStream { fn start(self, read: ReadContext, _: WriteContext) -> Option> { let io = self.0; let context = read.context(); - let ctl = StreamOps::current().register(io, context.clone()); + let ctl = StreamOps::current().register(io.as_raw_fd(), context.clone()); spawn(async move { run(ctl, context).await }); + mem::forget(io); None } } -struct HandleWrapper(StreamCtl); +struct HandleWrapper(Option); impl Handle for HandleWrapper { fn query(&self, id: any::TypeId) -> Option> { if id == any::TypeId::of::() { - let addr = self.0.with_io(|io| io.and_then(|io| io.peer_addr().ok())); + let addr = self.0.as_ref().unwrap().peer_addr().ok(); if let Some(addr) = addr.and_then(|addr| addr.as_socket()) { return Some(Box::new(types::PeerAddr(addr))); } @@ -45,13 +45,19 @@ impl Handle for HandleWrapper { } } +impl Drop for HandleWrapper { + fn drop(&mut self) { + mem::forget(self.0.take()); + } +} + #[derive(Copy, Clone, Debug, PartialEq, Eq)] enum Status { Shutdown, Terminate, } -async fn run(ctl: StreamCtl, context: IoContext) { +async fn run(ctl: StreamCtl, context: IoContext) { // Handle io read readiness let st = poll_fn(|cx| { let mut modify = false; @@ -82,7 +88,9 @@ async fn run(ctl: StreamCtl, context: IoContext) { }; if modify { - ctl.modify(readable, writable); + if !ctl.modify(readable, writable) { + return Poll::Ready(Status::Terminate); + } } if read.is_pending() && write.is_pending() { @@ -95,7 +103,10 @@ async fn run(ctl: StreamCtl, context: IoContext) { }) .await; - ctl.modify(false, true); - context.shutdown(st == Status::Shutdown).await; + if st != Status::Terminate { + if ctl.modify(false, true) { + context.shutdown(st == Status::Shutdown).await; + } + } context.stopped(ctl.close().await.err()); } diff --git a/ntex-net/src/rt_polling/mod.rs b/ntex-net/src/rt_polling/mod.rs index b4fb928b..95f312b1 100644 --- a/ntex-net/src/rt_polling/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -67,3 +67,68 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { Socket::from(stream), )?))) } + +#[doc(hidden)] +/// Get number of active Io objects +pub fn active_stream_ops() -> usize { + self::driver::StreamOps::active_ops() +} + +#[cfg(all(target_os = "linux", feature = "neon"))] +#[cfg(test)] +mod tests { + use ntex::{io::Io, time::sleep, time::Millis, util::PoolId}; + use std::sync::{Arc, Mutex}; + + use crate::connect::Connect; + + const DATA: &[u8] = b"Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World \ + Hello World Hello World Hello World Hello World Hello World"; + + #[ntex::test] + async fn idle_disconnect() { + PoolId::P5.set_read_params(24, 12); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); + + let server = ntex::server::test_server(move || { + let tx = tx.clone(); + ntex_service::fn_service(move |io: Io<_>| { + tx.lock().unwrap().take().unwrap().send(()).unwrap(); + + async move { + io.write(DATA).unwrap(); + sleep(Millis(250)).await; + io.close(); + Ok::<_, ()>(()) + } + }) + }); + + let msg = Connect::new(server.addr()); + let io = crate::connect::connect(msg).await.unwrap(); + io.set_memory_pool(PoolId::P5.into()); + rx.await.unwrap(); + + io.on_disconnect().await; + } +} diff --git a/ntex-net/src/rt_uring/connect.rs b/ntex-net/src/rt_uring/connect.rs index ea9be3e1..2e86eb29 100644 --- a/ntex-net/src/rt_uring/connect.rs +++ b/ntex-net/src/rt_uring/connect.rs @@ -31,7 +31,7 @@ impl ConnectOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; - rt.driver().register(|api| { + rt.register_handler(|api| { if !api.is_supported(opcode::Connect::CODE) { panic!("opcode::Connect is required for io-uring support"); } diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs index d39d69e8..2f76509c 100644 --- a/ntex-net/src/rt_uring/driver.rs +++ b/ntex-net/src/rt_uring/driver.rs @@ -77,7 +77,7 @@ impl StreamOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { let mut inner = None; - rt.driver().register(|api| { + rt.register_handler(|api| { if !api.is_supported(opcode::Recv::CODE) { panic!("opcode::Recv is required for io-uring support"); } @@ -124,6 +124,10 @@ impl StreamOps { } } + pub(crate) fn active_ops() -> usize { + Self::current().with(|st| st.streams.len()) + } + fn with(&self, f: F) -> R where F: FnOnce(&mut StreamOpsStorage) -> R, diff --git a/ntex-net/src/rt_uring/mod.rs b/ntex-net/src/rt_uring/mod.rs index 41016d09..6ae53b99 100644 --- a/ntex-net/src/rt_uring/mod.rs +++ b/ntex-net/src/rt_uring/mod.rs @@ -64,3 +64,9 @@ pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { Socket::from(stream), )?))) } + +#[doc(hidden)] +/// Get number of active Io objects +pub fn active_stream_ops() -> usize { + self::driver::StreamOps::::active_ops() +} diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index a5966d76..4e09c6da 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -42,4 +42,4 @@ tok-io = { version = "1", package = "tokio", default-features = false, features "net", ], optional = true } -ntex-neon = { version = "0.1.14", optional = true } +ntex-neon = { version = "0.1.16", optional = true } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index d5d85546..692e7904 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -265,7 +265,7 @@ mod neon { let rt = Runtime::new().unwrap(); log::info!( "Starting neon runtime, driver {:?}", - rt.driver().tp().name() + rt.driver_type().name() ); rt.block_on(fut); diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 0e4a6559..b3ecee4d 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -252,8 +252,6 @@ where Ok(()) }) }); - thread::sleep(std::time::Duration::from_millis(150)); - let (system, server, addr) = rx.recv().unwrap(); TestServer { diff --git a/ntex/tests/connect.rs b/ntex/tests/connect.rs index 5ecd51b7..523232a8 100644 --- a/ntex/tests/connect.rs +++ b/ntex/tests/connect.rs @@ -1,9 +1,8 @@ use std::{io, rc::Rc}; -use ntex::codec::BytesCodec; -use ntex::connect::Connect; use ntex::io::{types::PeerAddr, Io}; use ntex::service::{chain_factory, fn_service, Pipeline, ServiceFactory}; +use ntex::{codec::BytesCodec, connect::Connect}; use ntex::{server::build_test_server, server::test_server, time, util::Bytes}; #[cfg(feature = "rustls")] diff --git a/ntex/tests/http_awc_client.rs b/ntex/tests/http_awc_client.rs index bd4c7e0a..e80644e9 100644 --- a/ntex/tests/http_awc_client.rs +++ b/ntex/tests/http_awc_client.rs @@ -682,15 +682,18 @@ async fn client_read_until_eof() { for stream in lst.incoming() { if let Ok(mut stream) = stream { let mut b = [0; 1000]; - let _ = stream.read(&mut b).unwrap(); - let _ = stream + log::debug!("Reading request"); + let res = stream.read(&mut b).unwrap(); + log::debug!("Read {:?}", res); + let res = stream .write_all(b"HTTP/1.0 200 OK\r\nconnection: close\r\n\r\nwelcome!"); + log::debug!("Sent {:?}", res); } else { break; } } }); - sleep(Millis(300)).await; + sleep(Millis(500)).await; // client request let req = Client::build()