From e3f58cce277bc33c2567f6e87c2076f8e63a92a8 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 19 Mar 2025 21:13:39 +0100 Subject: [PATCH] Redesign neon poll support (#535) --- .github/workflows/cov.yml | 5 - .github/workflows/linux.yml | 5 - ntex-io/CHANGES.md | 4 + ntex-io/Cargo.toml | 2 +- ntex-io/src/tasks.rs | 67 +++----- ntex-net/CHANGES.md | 4 + ntex-net/Cargo.toml | 6 +- ntex-net/src/rt_polling/connect.rs | 57 +++---- ntex-net/src/rt_polling/driver.rs | 265 ++++++++++++++--------------- ntex-net/src/rt_polling/io.rs | 17 +- ntex/src/http/test.rs | 2 +- ntex/src/web/test.rs | 2 +- 12 files changed, 205 insertions(+), 231 deletions(-) diff --git a/.github/workflows/cov.yml b/.github/workflows/cov.yml index 10a7e518..c9f7a345 100644 --- a/.github/workflows/cov.yml +++ b/.github/workflows/cov.yml @@ -8,11 +8,6 @@ jobs: env: CARGO_TERM_COLOR: always steps: - - name: Free Disk Space - uses: jlumbroso/free-disk-space@main - with: - tool-cache: true - - uses: actions/checkout@v4 - name: Install Rust run: rustup update nightly diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 5b8692f8..5297364c 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -16,11 +16,6 @@ jobs: runs-on: ubuntu-latest steps: - - name: Free Disk Space - uses: jlumbroso/free-disk-space@main - with: - tool-cache: true - - uses: actions/checkout@v4 - name: Install ${{ matrix.version }} diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index ff7201c9..c109a752 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.11.1] - 2025-03-20 + +* Add readiness check support + ## [2.11.0] - 2025-03-10 * Add single io context diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 40d4ed20..6a1e881d 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.11.0" +version = "2.11.1" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 9a4d6f94..4a04196b 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -722,28 +722,36 @@ impl IoContext { } /// Get read buffer - pub fn with_read_buf(&self, f: F) -> Poll<()> - where - F: FnOnce(&mut BytesVec) -> Poll>, - { - let result = self.with_read_buf_inner(f); - + pub fn is_read_ready(&self) -> bool { // check read readiness - if result.is_pending() { - if let Some(waker) = self.0 .0.read_task.take() { - let mut cx = Context::from_waker(&waker); + if let Some(waker) = self.0 .0.read_task.take() { + let mut cx = Context::from_waker(&waker); - if let Poll::Ready(ReadStatus::Ready) = - self.0.filter().poll_read_ready(&mut cx) - { - return Poll::Pending; - } + if let Poll::Ready(ReadStatus::Ready) = self.0.filter().poll_read_ready(&mut cx) + { + return true; } } - result + false } - fn with_read_buf_inner(&self, f: F) -> Poll<()> + pub fn is_write_ready(&self) -> bool { + if let Some(waker) = self.0 .0.write_task.take() { + let ready = self + .0 + .filter() + .poll_write_ready(&mut Context::from_waker(&waker)); + if !matches!( + ready, + Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) + ) { + return true; + } + } + false + } + + pub fn with_read_buf(&self, f: F) -> Poll<()> where F: FnOnce(&mut BytesVec) -> Poll>, { @@ -838,33 +846,8 @@ impl IoContext { } } - pub fn with_write_buf(&self, f: F) -> Poll<()> - where - F: FnOnce(&BytesVec) -> Poll>, - { - let result = self.with_write_buf_inner(f); - - // check write readiness - if result.is_pending() { - let inner = &self.0 .0; - if let Some(waker) = inner.write_task.take() { - let ready = self - .0 - .filter() - .poll_write_ready(&mut Context::from_waker(&waker)); - if !matches!( - ready, - Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) - ) { - return Poll::Ready(()); - } - } - } - result - } - /// Get write buffer - fn with_write_buf_inner(&self, f: F) -> Poll<()> + pub fn with_write_buf(&self, f: F) -> Poll<()> where F: FnOnce(&BytesVec) -> Poll>, { diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index a082b258..b4851692 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.6] - 2025-03-20 + +* Redesign neon poll support + ## [2.5.5] - 2025-03-17 * Add check for required io-uring opcodes diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 8f75d422..11d7f3bf 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.5" +version = "2.5.6" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -34,13 +34,13 @@ io-uring = ["ntex-neon/io-uring", "dep:io-uring"] ntex-service = "3.3" ntex-bytes = "0.1" ntex-http = "0.1" -ntex-io = "2.11" +ntex-io = "2.11.1" ntex-rt = "0.4.25" ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.5", optional = true } +ntex-neon = { version = "0.1.6", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_polling/connect.rs b/ntex-net/src/rt_polling/connect.rs index 0208fc90..529c55b1 100644 --- a/ntex-net/src/rt_polling/connect.rs +++ b/ntex-net/src/rt_polling/connect.rs @@ -1,7 +1,7 @@ use std::os::fd::{AsRawFd, RawFd}; use std::{cell::RefCell, collections::VecDeque, io, rc::Rc, task::Poll}; -use ntex_neon::driver::{DriverApi, Handler, Interest}; +use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; use ntex_util::channel::oneshot::Sender; use slab::Slab; @@ -12,8 +12,7 @@ pub(crate) struct ConnectOps(Rc); #[derive(Debug)] enum Change { - Readable, - Writable, + Event(Event), Error(io::Error), } @@ -67,20 +66,15 @@ impl ConnectOps { let item = Item { fd, sender }; let id = self.0.connects.borrow_mut().insert(item); - self.0.api.register(fd, id, Interest::Writable); + self.0.api.attach(fd, id as u32, Some(Event::writable(0))); Ok(id) } } impl Handler for ConnectOpsBatcher { - fn readable(&mut self, id: usize) { + fn event(&mut self, id: usize, event: Event) { log::debug!("connect-fd is readable {:?}", id); - self.feed.push_back((id, Change::Readable)); - } - - fn writable(&mut self, id: usize) { - log::debug!("connect-fd is writable {:?}", id); - self.feed.push_back((id, Change::Writable)); + self.feed.push_back((id, Change::Event(event))); } fn error(&mut self, id: usize, err: io::Error) { @@ -99,32 +93,33 @@ impl Handler for ConnectOpsBatcher { if connects.contains(id) { let item = connects.remove(id); match change { - Change::Readable => unreachable!(), - Change::Writable => { - let mut err: libc::c_int = 0; - let mut err_len = - std::mem::size_of::() as libc::socklen_t; + Change::Event(event) => { + if event.writable { + let mut err: libc::c_int = 0; + let mut err_len = + std::mem::size_of::() as libc::socklen_t; - let res = syscall!(libc::getsockopt( - item.fd.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_ERROR, - &mut err as *mut _ as *mut _, - &mut err_len - )); + let res = syscall!(libc::getsockopt( + item.fd.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_ERROR, + &mut err as *mut _ as *mut _, + &mut err_len + )); - let res = if err == 0 { - res.map(|_| ()) - } else { - Err(io::Error::from_raw_os_error(err)) - }; + let res = if err == 0 { + res.map(|_| ()) + } else { + Err(io::Error::from_raw_os_error(err)) + }; - self.inner.api.unregister_all(item.fd); - let _ = item.sender.send(res); + self.inner.api.detach(item.fd, id as u32); + let _ = item.sender.send(res); + } } Change::Error(err) => { let _ = item.sender.send(Err(err)); - self.inner.api.unregister_all(item.fd); + self.inner.api.detach(item.fd, id as u32); } } } diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 559e8023..6739a088 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,7 +1,7 @@ use std::os::fd::{AsRawFd, RawFd}; use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task}; -use ntex_neon::driver::{DriverApi, Handler, Interest}; +use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; use slab::Slab; @@ -9,7 +9,7 @@ use ntex_bytes::BufMut; use ntex_io::IoContext; pub(crate) struct StreamCtl { - id: usize, + id: u32, inner: Rc>, } @@ -24,8 +24,7 @@ pub(crate) struct StreamOps(Rc>); #[derive(Debug)] enum Change { - Readable, - Writable, + Event(Event), Error(io::Error), } @@ -36,7 +35,7 @@ struct StreamOpsHandler { struct StreamOpsInner { api: DriverApi, - feed: Cell>>, + feed: Cell>>, streams: Cell>>>>, } @@ -62,19 +61,23 @@ impl StreamOps { } pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { + let fd = io.as_raw_fd(); let item = StreamItem { + fd, context, - fd: io.as_raw_fd(), io: Some(io), ref_count: 1, }; - self.with(|streams| { - let id = streams.insert(item); + let stream = self.with(move |streams| { + let id = streams.insert(item) as u32; StreamCtl { id, inner: self.0.clone(), } - }) + }); + + self.0.api.attach(fd, stream.id, None); + stream } fn with(&self, f: F) -> R @@ -95,14 +98,9 @@ impl Clone for StreamOps { } impl Handler for StreamOpsHandler { - fn readable(&mut self, id: usize) { + fn event(&mut self, id: usize, event: Event) { log::debug!("FD is readable {:?}", id); - self.feed.push_back((id, Change::Readable)); - } - - fn writable(&mut self, id: usize) { - log::debug!("FD is writable {:?}", id); - self.feed.push_back((id, Change::Writable)); + self.feed.push_back((id, Change::Event(event))); } fn error(&mut self, id: usize, err: io::Error) { @@ -120,56 +118,75 @@ impl Handler for StreamOpsHandler { for (id, change) in self.feed.drain(..) { match change { - Change::Readable => { + Change::Event(ev) => { let item = &mut streams[id]; - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - task::Poll::Ready( - task::ready!(syscall!( - break libc::read(item.fd, b as _, chunk.len()) - )) - .inspect(|size| { - unsafe { buf.advance_mut(*size) }; - log::debug!( - "{}: {:?}, SIZE: {:?}", - item.context.tag(), - item.fd, - size - ); - }), - ) - }); + let mut renew_ev = Event::new(0, false, false).with_interrupt(); + if ev.readable { + let result = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let b = chunk.as_mut_ptr(); + task::Poll::Ready( + task::ready!(syscall!( + break libc::read(item.fd, b as _, chunk.len()) + )) + .inspect(|size| { + unsafe { buf.advance_mut(*size) }; + log::debug!( + "{}: {:?}, SIZE: {:?}", + item.context.tag(), + item.fd, + size + ); + }), + ) + }); - if item.io.is_some() && result.is_pending() { - self.inner.api.register(item.fd, id, Interest::Readable); + if item.io.is_some() && result.is_pending() { + if item.context.is_read_ready() { + renew_ev.readable = true; + } + } } - } - Change::Writable => { - let item = &mut streams[id]; - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: writing {:?} SIZE: {:?}", - item.context.tag(), - item.fd, - buf.len() - ); - let slice = &buf[..]; - syscall!( - break libc::write(item.fd, slice.as_ptr() as _, slice.len()) - ) - }); + if ev.writable { + let result = item.context.with_write_buf(|buf| { + log::debug!( + "{}: writing {:?} SIZE: {:?}", + item.context.tag(), + item.fd, + buf.len() + ); + let slice = &buf[..]; + syscall!( + break libc::write( + item.fd, + slice.as_ptr() as _, + slice.len() + ) + ) + }); - if item.io.is_some() && result.is_pending() { - log::debug!("{}: want write {:?}", item.context.tag(), item.fd,); - self.inner.api.register(item.fd, id, Interest::Writable); + if item.io.is_some() && result.is_pending() { + if item.context.is_write_ready() { + renew_ev.writable = true; + } + } + } + + if ev.is_interrupt() { + item.context.stopped(None); + if let Some(_) = item.io.take() { + close(id as u32, item.fd, &self.inner.api); + } + continue; + } else { + self.inner.api.modify(item.fd, id as u32, renew_ev); } } Change::Error(err) => { if let Some(item) = streams.get_mut(id) { item.context.stopped(Some(err)); if let Some(_) = item.io.take() { - close(id, item.fd, &self.inner.api); + close(id as u32, item.fd, &self.inner.api); } } } @@ -179,10 +196,10 @@ impl Handler for StreamOpsHandler { // extra let mut feed = self.inner.feed.take().unwrap(); for id in feed.drain(..) { - let item = &mut streams[id]; + let item = &mut streams[id as usize]; item.ref_count -= 1; if item.ref_count == 0 { - let item = streams.remove(id); + let item = streams.remove(id as usize); log::debug!( "{}: Drop io ({}), {:?}, has-io: {}", item.context.tag(), @@ -201,8 +218,8 @@ impl Handler for StreamOpsHandler { } } -fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle> { - api.unregister_all(fd); +fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle> { + api.detach(fd, id); ntex_rt::spawn_blocking(move || { syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; syscall!(libc::close(fd)) @@ -211,10 +228,10 @@ fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle StreamCtl { pub(crate) fn close(self) -> impl Future> { - let (io, fd) = - self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd)); + let id = self.id as usize; + let (io, fd) = self.with(|streams| (streams[id].io.take(), streams[id].fd)); let fut = if let Some(io) = io { - log::debug!("Closing ({}), {:?}", self.id, fd); + log::debug!("Closing ({}), {:?}", id, fd); std::mem::forget(io); Some(close(self.id, fd, &self.inner.api)) } else { @@ -234,53 +251,32 @@ impl StreamCtl { where F: FnOnce(Option<&T>) -> R, { - self.with(|streams| f(streams[self.id].io.as_ref())) + self.with(|streams| f(streams[self.id as usize].io.as_ref())) } - pub(crate) fn pause_all(&self) { + pub(crate) fn modify(&self, readable: bool, writable: bool) { self.with(|streams| { - let item = &mut streams[self.id]; + let item = &mut streams[self.id as usize]; log::debug!( - "{}: Pause all io ({}), {:?}", + "{}: Modify interest ({}), {:?} read: {:?}, write: {:?}", item.context.tag(), self.id, - item.fd - ); - self.inner.api.unregister_all(item.fd); - }) - } - - pub(crate) fn pause_read(&self) { - self.with(|streams| { - let item = &mut streams[self.id]; - - log::debug!( - "{}: Pause io read ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); - self.inner.api.unregister(item.fd, Interest::Readable); - }) - } - - pub(crate) fn resume_read(&self) { - self.with(|streams| { - let item = &mut streams[self.id]; - - log::debug!( - "{}: Resume io read ({}), {:?}", - item.context.tag(), - self.id, - item.fd + item.fd, + readable, + writable ); - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - task::Poll::Ready( - task::ready!(syscall!(break libc::read(item.fd, b as _, chunk.len()))) + let mut event = Event::new(0, false, false).with_interrupt(); + + if readable { + let result = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let b = chunk.as_mut_ptr(); + task::Poll::Ready( + task::ready!(syscall!( + break libc::read(item.fd, b as _, chunk.len()) + )) .inspect(|size| { unsafe { buf.advance_mut(*size) }; log::debug!( @@ -290,45 +286,37 @@ impl StreamCtl { size ); }), - ) - }); + ) + }); - if item.io.is_some() && result.is_pending() { - self.inner - .api - .register(item.fd, self.id, Interest::Readable); + if item.io.is_some() && result.is_pending() { + if item.context.is_read_ready() { + event.readable = true; + } + } } - }) - } - pub(crate) fn resume_write(&self) { - self.with(|streams| { - let item = &mut streams[self.id]; + if writable { + let result = item.context.with_write_buf(|buf| { + log::debug!( + "{}: Writing io ({}), buf: {:?}", + item.context.tag(), + self.id, + buf.len() + ); - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: Writing io ({}), buf: {:?}", - item.context.tag(), - self.id, - buf.len() - ); + let slice = &buf[..]; + syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) + }); - let slice = &buf[..]; - syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) - }); - - if item.io.is_some() && result.is_pending() { - log::debug!( - "{}: Write is pending ({}), {:?}", - item.context.tag(), - self.id, - item.context.flags() - ); - - self.inner - .api - .register(item.fd, self.id, Interest::Writable); + if item.io.is_some() && result.is_pending() { + if item.context.is_write_ready() { + event.writable = true; + } + } } + + self.inner.api.modify(item.fd, self.id as u32, event); }) } @@ -346,7 +334,7 @@ impl StreamCtl { impl Clone for StreamCtl { fn clone(&self) -> Self { self.with(|streams| { - streams[self.id].ref_count += 1; + streams[self.id as usize].ref_count += 1; Self { id: self.id, inner: self.inner.clone(), @@ -358,9 +346,10 @@ impl Clone for StreamCtl { impl Drop for StreamCtl { fn drop(&mut self) { if let Some(mut streams) = self.inner.streams.take() { - streams[self.id].ref_count -= 1; - if streams[self.id].ref_count == 0 { - let item = streams.remove(self.id); + let id = self.id as usize; + streams[id].ref_count -= 1; + if streams[id].ref_count == 0 { + let item = streams.remove(id); log::debug!( "{}: Drop io ({}), {:?}, has-io: {}", item.context.tag(), diff --git a/ntex-net/src/rt_polling/io.rs b/ntex-net/src/rt_polling/io.rs index d6e3b4d9..990dae8f 100644 --- a/ntex-net/src/rt_polling/io.rs +++ b/ntex-net/src/rt_polling/io.rs @@ -54,21 +54,26 @@ enum Status { async fn run(ctl: StreamCtl, context: IoContext) { // Handle io read readiness let st = poll_fn(|cx| { + let mut modify = false; + let mut readable = false; + let mut writable = false; let read = match context.poll_read_ready(cx) { Poll::Ready(ReadStatus::Ready) => { - ctl.resume_read(); + modify = true; + readable = true; Poll::Pending } Poll::Ready(ReadStatus::Terminate) => Poll::Ready(()), Poll::Pending => { - ctl.pause_read(); + modify = true; Poll::Pending } }; let write = match context.poll_write_ready(cx) { Poll::Ready(WriteStatus::Ready) => { - ctl.resume_write(); + modify = true; + writable = true; Poll::Pending } Poll::Ready(WriteStatus::Shutdown) => Poll::Ready(Status::Shutdown), @@ -76,6 +81,10 @@ async fn run(ctl: StreamCtl, context: IoContext) { Poll::Pending => Poll::Pending, }; + if modify { + ctl.modify(readable, writable); + } + if read.is_pending() && write.is_pending() { Poll::Pending } else if write.is_ready() { @@ -86,7 +95,7 @@ async fn run(ctl: StreamCtl, context: IoContext) { }) .await; - ctl.resume_write(); + ctl.modify(false, true); context.shutdown(st == Status::Shutdown).await; context.stopped(ctl.close().await.err()); } diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 60dfbf17..c9ecdb0b 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -246,7 +246,7 @@ where .run(); crate::rt::spawn(async move { - sleep(Millis(75)).await; + sleep(Millis(125)).await; tx.send((system, srv, local_addr)).unwrap(); }); Ok(()) diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index e977c3b9..1307ad9f 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -698,7 +698,7 @@ where .run(); crate::rt::spawn(async move { - sleep(Millis(75)).await; + sleep(Millis(125)).await; tx.send((System::current(), srv, local_addr)).unwrap(); }); Ok(())