From 3b58f5a1112d5927e3f050caa50c4381b53fcc68 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 18 Mar 2025 05:50:28 +0100 Subject: [PATCH 01/20] Add delay for test server availability, could cause connect race (#533) --- ntex-server/src/net/test.rs | 12 +++++++----- ntex/CHANGES.md | 4 ++++ ntex/src/http/test.rs | 8 ++++++-- ntex/src/web/test.rs | 5 ++++- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/ntex-server/src/net/test.rs b/ntex-server/src/net/test.rs index 2ddae445..1c78f5c5 100644 --- a/ntex-server/src/net/test.rs +++ b/ntex-server/src/net/test.rs @@ -59,17 +59,19 @@ where .workers(1) .disable_signals() .run(); - tx.send((system, local_addr, server)) - .expect("Failed to send Server to TestServer"); + + ntex_rt::spawn(async move { + ntex_util::time::sleep(ntex_util::time::Millis(75)).await; + tx.send((system, local_addr, server)) + .expect("Failed to send Server to TestServer"); + }); + Ok(()) }) }); let (system, addr, server) = rx.recv().unwrap(); - // wait for server - thread::sleep(std::time::Duration::from_millis(50)); - TestServer { addr, server, diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 4a665b57..2e6b077e 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.12.3] - 2025-03-xx + +* http: Add delay for test server availability, could cause connect race + ## [2.12.2] - 2025-03-15 * http: Allow to run publish future to completion in case error diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index cff51af4..60dfbf17 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -11,7 +11,7 @@ use crate::server::Server; use crate::service::ServiceFactory; #[cfg(feature = "ws")] use crate::ws::{error::WsClientError, WsClient, WsConnection}; -use crate::{rt::System, time::Millis, time::Seconds, util::Bytes}; +use crate::{rt::System, time::sleep, time::Millis, time::Seconds, util::Bytes}; use super::client::{Client, ClientRequest, ClientResponse, Connector}; use super::error::{HttpError, PayloadError}; @@ -244,7 +244,11 @@ where .workers(1) .disable_signals() .run(); - tx.send((system, srv, local_addr)).unwrap(); + + crate::rt::spawn(async move { + sleep(Millis(75)).await; + tx.send((system, srv, local_addr)).unwrap(); + }); Ok(()) }) }); diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 6a0bcabc..e977c3b9 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -697,7 +697,10 @@ where .set_tag("test", "WEB-SRV") .run(); - tx.send((System::current(), srv, local_addr)).unwrap(); + crate::rt::spawn(async move { + sleep(Millis(75)).await; + tx.send((System::current(), srv, local_addr)).unwrap(); + }); Ok(()) }) }); From e904cf85f1bb45868de7fa63e788d1441183e480 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 18 Mar 2025 06:10:52 +0100 Subject: [PATCH 02/20] Fix tls examples --- ntex-tls/examples/rustls-server.rs | 5 ++--- ntex-tls/examples/webserver.rs | 8 ++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/ntex-tls/examples/rustls-server.rs b/ntex-tls/examples/rustls-server.rs index 445cffec..a80b25e2 100644 --- a/ntex-tls/examples/rustls-server.rs +++ b/ntex-tls/examples/rustls-server.rs @@ -13,9 +13,8 @@ async fn main() -> io::Result<()> { println!("Started openssl echp server: 127.0.0.1:8443"); // load ssl keys - let cert_file = - &mut BufReader::new(File::open("../ntex-tls/examples/cert.pem").unwrap()); - let key_file = &mut BufReader::new(File::open("../ntex-tls/examples/key.pem").unwrap()); + let cert_file = &mut BufReader::new(File::open("../examples/cert.pem").unwrap()); + let key_file = &mut BufReader::new(File::open("../examples/key.pem").unwrap()); let keys = rustls_pemfile::private_key(key_file).unwrap().unwrap(); let cert_chain = rustls_pemfile::certs(cert_file) .collect::, _>>() diff --git a/ntex-tls/examples/webserver.rs b/ntex-tls/examples/webserver.rs index 52867a6b..9398708e 100644 --- a/ntex-tls/examples/webserver.rs +++ b/ntex-tls/examples/webserver.rs @@ -8,18 +8,18 @@ use tls_openssl::ssl::{self, SslFiletype, SslMethod}; #[ntex::main] async fn main() -> io::Result<()> { - //std::env::set_var("RUST_LOG", "trace"); - //env_logger::init(); + std::env::set_var("RUST_LOG", "trace"); + let _ = env_logger::try_init(); println!("Started openssl web server: 127.0.0.1:8443"); // load ssl keys let mut builder = ssl::SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); builder - .set_private_key_file("../tests/key.pem", SslFiletype::PEM) + .set_private_key_file("./examples/key.pem", SslFiletype::PEM) .unwrap(); builder - .set_certificate_chain_file("../tests/cert.pem") + .set_certificate_chain_file("./examples/cert.pem") .unwrap(); // h2 alpn config From e3f58cce277bc33c2567f6e87c2076f8e63a92a8 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 19 Mar 2025 21:13:39 +0100 Subject: [PATCH 03/20] Redesign neon poll support (#535) --- .github/workflows/cov.yml | 5 - .github/workflows/linux.yml | 5 - ntex-io/CHANGES.md | 4 + ntex-io/Cargo.toml | 2 +- ntex-io/src/tasks.rs | 67 +++----- ntex-net/CHANGES.md | 4 + ntex-net/Cargo.toml | 6 +- ntex-net/src/rt_polling/connect.rs | 57 +++---- ntex-net/src/rt_polling/driver.rs | 265 ++++++++++++++--------------- ntex-net/src/rt_polling/io.rs | 17 +- ntex/src/http/test.rs | 2 +- ntex/src/web/test.rs | 2 +- 12 files changed, 205 insertions(+), 231 deletions(-) diff --git a/.github/workflows/cov.yml b/.github/workflows/cov.yml index 10a7e518..c9f7a345 100644 --- a/.github/workflows/cov.yml +++ b/.github/workflows/cov.yml @@ -8,11 +8,6 @@ jobs: env: CARGO_TERM_COLOR: always steps: - - name: Free Disk Space - uses: jlumbroso/free-disk-space@main - with: - tool-cache: true - - uses: actions/checkout@v4 - name: Install Rust run: rustup update nightly diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 5b8692f8..5297364c 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -16,11 +16,6 @@ jobs: runs-on: ubuntu-latest steps: - - name: Free Disk Space - uses: jlumbroso/free-disk-space@main - with: - tool-cache: true - - uses: actions/checkout@v4 - name: Install ${{ matrix.version }} diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index ff7201c9..c109a752 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.11.1] - 2025-03-20 + +* Add readiness check support + ## [2.11.0] - 2025-03-10 * Add single io context diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 40d4ed20..6a1e881d 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.11.0" +version = "2.11.1" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 9a4d6f94..4a04196b 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -722,28 +722,36 @@ impl IoContext { } /// Get read buffer - pub fn with_read_buf(&self, f: F) -> Poll<()> - where - F: FnOnce(&mut BytesVec) -> Poll>, - { - let result = self.with_read_buf_inner(f); - + pub fn is_read_ready(&self) -> bool { // check read readiness - if result.is_pending() { - if let Some(waker) = self.0 .0.read_task.take() { - let mut cx = Context::from_waker(&waker); + if let Some(waker) = self.0 .0.read_task.take() { + let mut cx = Context::from_waker(&waker); - if let Poll::Ready(ReadStatus::Ready) = - self.0.filter().poll_read_ready(&mut cx) - { - return Poll::Pending; - } + if let Poll::Ready(ReadStatus::Ready) = self.0.filter().poll_read_ready(&mut cx) + { + return true; } } - result + false } - fn with_read_buf_inner(&self, f: F) -> Poll<()> + pub fn is_write_ready(&self) -> bool { + if let Some(waker) = self.0 .0.write_task.take() { + let ready = self + .0 + .filter() + .poll_write_ready(&mut Context::from_waker(&waker)); + if !matches!( + ready, + Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) + ) { + return true; + } + } + false + } + + pub fn with_read_buf(&self, f: F) -> Poll<()> where F: FnOnce(&mut BytesVec) -> Poll>, { @@ -838,33 +846,8 @@ impl IoContext { } } - pub fn with_write_buf(&self, f: F) -> Poll<()> - where - F: FnOnce(&BytesVec) -> Poll>, - { - let result = self.with_write_buf_inner(f); - - // check write readiness - if result.is_pending() { - let inner = &self.0 .0; - if let Some(waker) = inner.write_task.take() { - let ready = self - .0 - .filter() - .poll_write_ready(&mut Context::from_waker(&waker)); - if !matches!( - ready, - Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) - ) { - return Poll::Ready(()); - } - } - } - result - } - /// Get write buffer - fn with_write_buf_inner(&self, f: F) -> Poll<()> + pub fn with_write_buf(&self, f: F) -> Poll<()> where F: FnOnce(&BytesVec) -> Poll>, { diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index a082b258..b4851692 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.6] - 2025-03-20 + +* Redesign neon poll support + ## [2.5.5] - 2025-03-17 * Add check for required io-uring opcodes diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 8f75d422..11d7f3bf 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.5" +version = "2.5.6" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -34,13 +34,13 @@ io-uring = ["ntex-neon/io-uring", "dep:io-uring"] ntex-service = "3.3" ntex-bytes = "0.1" ntex-http = "0.1" -ntex-io = "2.11" +ntex-io = "2.11.1" ntex-rt = "0.4.25" ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.5", optional = true } +ntex-neon = { version = "0.1.6", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_polling/connect.rs b/ntex-net/src/rt_polling/connect.rs index 0208fc90..529c55b1 100644 --- a/ntex-net/src/rt_polling/connect.rs +++ b/ntex-net/src/rt_polling/connect.rs @@ -1,7 +1,7 @@ use std::os::fd::{AsRawFd, RawFd}; use std::{cell::RefCell, collections::VecDeque, io, rc::Rc, task::Poll}; -use ntex_neon::driver::{DriverApi, Handler, Interest}; +use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; use ntex_util::channel::oneshot::Sender; use slab::Slab; @@ -12,8 +12,7 @@ pub(crate) struct ConnectOps(Rc); #[derive(Debug)] enum Change { - Readable, - Writable, + Event(Event), Error(io::Error), } @@ -67,20 +66,15 @@ impl ConnectOps { let item = Item { fd, sender }; let id = self.0.connects.borrow_mut().insert(item); - self.0.api.register(fd, id, Interest::Writable); + self.0.api.attach(fd, id as u32, Some(Event::writable(0))); Ok(id) } } impl Handler for ConnectOpsBatcher { - fn readable(&mut self, id: usize) { + fn event(&mut self, id: usize, event: Event) { log::debug!("connect-fd is readable {:?}", id); - self.feed.push_back((id, Change::Readable)); - } - - fn writable(&mut self, id: usize) { - log::debug!("connect-fd is writable {:?}", id); - self.feed.push_back((id, Change::Writable)); + self.feed.push_back((id, Change::Event(event))); } fn error(&mut self, id: usize, err: io::Error) { @@ -99,32 +93,33 @@ impl Handler for ConnectOpsBatcher { if connects.contains(id) { let item = connects.remove(id); match change { - Change::Readable => unreachable!(), - Change::Writable => { - let mut err: libc::c_int = 0; - let mut err_len = - std::mem::size_of::() as libc::socklen_t; + Change::Event(event) => { + if event.writable { + let mut err: libc::c_int = 0; + let mut err_len = + std::mem::size_of::() as libc::socklen_t; - let res = syscall!(libc::getsockopt( - item.fd.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_ERROR, - &mut err as *mut _ as *mut _, - &mut err_len - )); + let res = syscall!(libc::getsockopt( + item.fd.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_ERROR, + &mut err as *mut _ as *mut _, + &mut err_len + )); - let res = if err == 0 { - res.map(|_| ()) - } else { - Err(io::Error::from_raw_os_error(err)) - }; + let res = if err == 0 { + res.map(|_| ()) + } else { + Err(io::Error::from_raw_os_error(err)) + }; - self.inner.api.unregister_all(item.fd); - let _ = item.sender.send(res); + self.inner.api.detach(item.fd, id as u32); + let _ = item.sender.send(res); + } } Change::Error(err) => { let _ = item.sender.send(Err(err)); - self.inner.api.unregister_all(item.fd); + self.inner.api.detach(item.fd, id as u32); } } } diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 559e8023..6739a088 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,7 +1,7 @@ use std::os::fd::{AsRawFd, RawFd}; use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task}; -use ntex_neon::driver::{DriverApi, Handler, Interest}; +use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; use slab::Slab; @@ -9,7 +9,7 @@ use ntex_bytes::BufMut; use ntex_io::IoContext; pub(crate) struct StreamCtl { - id: usize, + id: u32, inner: Rc>, } @@ -24,8 +24,7 @@ pub(crate) struct StreamOps(Rc>); #[derive(Debug)] enum Change { - Readable, - Writable, + Event(Event), Error(io::Error), } @@ -36,7 +35,7 @@ struct StreamOpsHandler { struct StreamOpsInner { api: DriverApi, - feed: Cell>>, + feed: Cell>>, streams: Cell>>>>, } @@ -62,19 +61,23 @@ impl StreamOps { } pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { + let fd = io.as_raw_fd(); let item = StreamItem { + fd, context, - fd: io.as_raw_fd(), io: Some(io), ref_count: 1, }; - self.with(|streams| { - let id = streams.insert(item); + let stream = self.with(move |streams| { + let id = streams.insert(item) as u32; StreamCtl { id, inner: self.0.clone(), } - }) + }); + + self.0.api.attach(fd, stream.id, None); + stream } fn with(&self, f: F) -> R @@ -95,14 +98,9 @@ impl Clone for StreamOps { } impl Handler for StreamOpsHandler { - fn readable(&mut self, id: usize) { + fn event(&mut self, id: usize, event: Event) { log::debug!("FD is readable {:?}", id); - self.feed.push_back((id, Change::Readable)); - } - - fn writable(&mut self, id: usize) { - log::debug!("FD is writable {:?}", id); - self.feed.push_back((id, Change::Writable)); + self.feed.push_back((id, Change::Event(event))); } fn error(&mut self, id: usize, err: io::Error) { @@ -120,56 +118,75 @@ impl Handler for StreamOpsHandler { for (id, change) in self.feed.drain(..) { match change { - Change::Readable => { + Change::Event(ev) => { let item = &mut streams[id]; - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - task::Poll::Ready( - task::ready!(syscall!( - break libc::read(item.fd, b as _, chunk.len()) - )) - .inspect(|size| { - unsafe { buf.advance_mut(*size) }; - log::debug!( - "{}: {:?}, SIZE: {:?}", - item.context.tag(), - item.fd, - size - ); - }), - ) - }); + let mut renew_ev = Event::new(0, false, false).with_interrupt(); + if ev.readable { + let result = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let b = chunk.as_mut_ptr(); + task::Poll::Ready( + task::ready!(syscall!( + break libc::read(item.fd, b as _, chunk.len()) + )) + .inspect(|size| { + unsafe { buf.advance_mut(*size) }; + log::debug!( + "{}: {:?}, SIZE: {:?}", + item.context.tag(), + item.fd, + size + ); + }), + ) + }); - if item.io.is_some() && result.is_pending() { - self.inner.api.register(item.fd, id, Interest::Readable); + if item.io.is_some() && result.is_pending() { + if item.context.is_read_ready() { + renew_ev.readable = true; + } + } } - } - Change::Writable => { - let item = &mut streams[id]; - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: writing {:?} SIZE: {:?}", - item.context.tag(), - item.fd, - buf.len() - ); - let slice = &buf[..]; - syscall!( - break libc::write(item.fd, slice.as_ptr() as _, slice.len()) - ) - }); + if ev.writable { + let result = item.context.with_write_buf(|buf| { + log::debug!( + "{}: writing {:?} SIZE: {:?}", + item.context.tag(), + item.fd, + buf.len() + ); + let slice = &buf[..]; + syscall!( + break libc::write( + item.fd, + slice.as_ptr() as _, + slice.len() + ) + ) + }); - if item.io.is_some() && result.is_pending() { - log::debug!("{}: want write {:?}", item.context.tag(), item.fd,); - self.inner.api.register(item.fd, id, Interest::Writable); + if item.io.is_some() && result.is_pending() { + if item.context.is_write_ready() { + renew_ev.writable = true; + } + } + } + + if ev.is_interrupt() { + item.context.stopped(None); + if let Some(_) = item.io.take() { + close(id as u32, item.fd, &self.inner.api); + } + continue; + } else { + self.inner.api.modify(item.fd, id as u32, renew_ev); } } Change::Error(err) => { if let Some(item) = streams.get_mut(id) { item.context.stopped(Some(err)); if let Some(_) = item.io.take() { - close(id, item.fd, &self.inner.api); + close(id as u32, item.fd, &self.inner.api); } } } @@ -179,10 +196,10 @@ impl Handler for StreamOpsHandler { // extra let mut feed = self.inner.feed.take().unwrap(); for id in feed.drain(..) { - let item = &mut streams[id]; + let item = &mut streams[id as usize]; item.ref_count -= 1; if item.ref_count == 0 { - let item = streams.remove(id); + let item = streams.remove(id as usize); log::debug!( "{}: Drop io ({}), {:?}, has-io: {}", item.context.tag(), @@ -201,8 +218,8 @@ impl Handler for StreamOpsHandler { } } -fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle> { - api.unregister_all(fd); +fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle> { + api.detach(fd, id); ntex_rt::spawn_blocking(move || { syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; syscall!(libc::close(fd)) @@ -211,10 +228,10 @@ fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle StreamCtl { pub(crate) fn close(self) -> impl Future> { - let (io, fd) = - self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd)); + let id = self.id as usize; + let (io, fd) = self.with(|streams| (streams[id].io.take(), streams[id].fd)); let fut = if let Some(io) = io { - log::debug!("Closing ({}), {:?}", self.id, fd); + log::debug!("Closing ({}), {:?}", id, fd); std::mem::forget(io); Some(close(self.id, fd, &self.inner.api)) } else { @@ -234,53 +251,32 @@ impl StreamCtl { where F: FnOnce(Option<&T>) -> R, { - self.with(|streams| f(streams[self.id].io.as_ref())) + self.with(|streams| f(streams[self.id as usize].io.as_ref())) } - pub(crate) fn pause_all(&self) { + pub(crate) fn modify(&self, readable: bool, writable: bool) { self.with(|streams| { - let item = &mut streams[self.id]; + let item = &mut streams[self.id as usize]; log::debug!( - "{}: Pause all io ({}), {:?}", + "{}: Modify interest ({}), {:?} read: {:?}, write: {:?}", item.context.tag(), self.id, - item.fd - ); - self.inner.api.unregister_all(item.fd); - }) - } - - pub(crate) fn pause_read(&self) { - self.with(|streams| { - let item = &mut streams[self.id]; - - log::debug!( - "{}: Pause io read ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); - self.inner.api.unregister(item.fd, Interest::Readable); - }) - } - - pub(crate) fn resume_read(&self) { - self.with(|streams| { - let item = &mut streams[self.id]; - - log::debug!( - "{}: Resume io read ({}), {:?}", - item.context.tag(), - self.id, - item.fd + item.fd, + readable, + writable ); - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - task::Poll::Ready( - task::ready!(syscall!(break libc::read(item.fd, b as _, chunk.len()))) + let mut event = Event::new(0, false, false).with_interrupt(); + + if readable { + let result = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let b = chunk.as_mut_ptr(); + task::Poll::Ready( + task::ready!(syscall!( + break libc::read(item.fd, b as _, chunk.len()) + )) .inspect(|size| { unsafe { buf.advance_mut(*size) }; log::debug!( @@ -290,45 +286,37 @@ impl StreamCtl { size ); }), - ) - }); + ) + }); - if item.io.is_some() && result.is_pending() { - self.inner - .api - .register(item.fd, self.id, Interest::Readable); + if item.io.is_some() && result.is_pending() { + if item.context.is_read_ready() { + event.readable = true; + } + } } - }) - } - pub(crate) fn resume_write(&self) { - self.with(|streams| { - let item = &mut streams[self.id]; + if writable { + let result = item.context.with_write_buf(|buf| { + log::debug!( + "{}: Writing io ({}), buf: {:?}", + item.context.tag(), + self.id, + buf.len() + ); - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: Writing io ({}), buf: {:?}", - item.context.tag(), - self.id, - buf.len() - ); + let slice = &buf[..]; + syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) + }); - let slice = &buf[..]; - syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) - }); - - if item.io.is_some() && result.is_pending() { - log::debug!( - "{}: Write is pending ({}), {:?}", - item.context.tag(), - self.id, - item.context.flags() - ); - - self.inner - .api - .register(item.fd, self.id, Interest::Writable); + if item.io.is_some() && result.is_pending() { + if item.context.is_write_ready() { + event.writable = true; + } + } } + + self.inner.api.modify(item.fd, self.id as u32, event); }) } @@ -346,7 +334,7 @@ impl StreamCtl { impl Clone for StreamCtl { fn clone(&self) -> Self { self.with(|streams| { - streams[self.id].ref_count += 1; + streams[self.id as usize].ref_count += 1; Self { id: self.id, inner: self.inner.clone(), @@ -358,9 +346,10 @@ impl Clone for StreamCtl { impl Drop for StreamCtl { fn drop(&mut self) { if let Some(mut streams) = self.inner.streams.take() { - streams[self.id].ref_count -= 1; - if streams[self.id].ref_count == 0 { - let item = streams.remove(self.id); + let id = self.id as usize; + streams[id].ref_count -= 1; + if streams[id].ref_count == 0 { + let item = streams.remove(id); log::debug!( "{}: Drop io ({}), {:?}, has-io: {}", item.context.tag(), diff --git a/ntex-net/src/rt_polling/io.rs b/ntex-net/src/rt_polling/io.rs index d6e3b4d9..990dae8f 100644 --- a/ntex-net/src/rt_polling/io.rs +++ b/ntex-net/src/rt_polling/io.rs @@ -54,21 +54,26 @@ enum Status { async fn run(ctl: StreamCtl, context: IoContext) { // Handle io read readiness let st = poll_fn(|cx| { + let mut modify = false; + let mut readable = false; + let mut writable = false; let read = match context.poll_read_ready(cx) { Poll::Ready(ReadStatus::Ready) => { - ctl.resume_read(); + modify = true; + readable = true; Poll::Pending } Poll::Ready(ReadStatus::Terminate) => Poll::Ready(()), Poll::Pending => { - ctl.pause_read(); + modify = true; Poll::Pending } }; let write = match context.poll_write_ready(cx) { Poll::Ready(WriteStatus::Ready) => { - ctl.resume_write(); + modify = true; + writable = true; Poll::Pending } Poll::Ready(WriteStatus::Shutdown) => Poll::Ready(Status::Shutdown), @@ -76,6 +81,10 @@ async fn run(ctl: StreamCtl, context: IoContext) { Poll::Pending => Poll::Pending, }; + if modify { + ctl.modify(readable, writable); + } + if read.is_pending() && write.is_pending() { Poll::Pending } else if write.is_ready() { @@ -86,7 +95,7 @@ async fn run(ctl: StreamCtl, context: IoContext) { }) .await; - ctl.resume_write(); + ctl.modify(false, true); context.shutdown(st == Status::Shutdown).await; context.stopped(ctl.close().await.err()); } diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 60dfbf17..c9ecdb0b 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -246,7 +246,7 @@ where .run(); crate::rt::spawn(async move { - sleep(Millis(75)).await; + sleep(Millis(125)).await; tx.send((system, srv, local_addr)).unwrap(); }); Ok(()) diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index e977c3b9..1307ad9f 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -698,7 +698,7 @@ where .run(); crate::rt::spawn(async move { - sleep(Millis(75)).await; + sleep(Millis(125)).await; tx.send((System::current(), srv, local_addr)).unwrap(); }); Ok(()) From bf6b1d6c7909beacb5d60da1473704ed00af60f6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 20 Mar 2025 08:56:31 +0100 Subject: [PATCH 04/20] Maintain interest info for poll driver (#536) --- ntex-net/src/rt_polling/driver.rs | 30 +++++++++++++++++++++++------- ntex-net/src/rt_polling/mod.rs | 3 +++ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 6739a088..385f15fa 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -13,11 +13,20 @@ pub(crate) struct StreamCtl { inner: Rc>, } +bitflags::bitflags! { + #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + struct Flags: u8 { + const RD = 0b0000_0001; + const WR = 0b0000_0010; + } +} + struct StreamItem { io: Option, fd: RawFd, + flags: Flags, + ref_count: u16, context: IoContext, - ref_count: usize, } pub(crate) struct StreamOps(Rc>); @@ -67,6 +76,7 @@ impl StreamOps { context, io: Some(io), ref_count: 1, + flags: Flags::empty(), }; let stream = self.with(move |streams| { let id = streams.insert(item) as u32; @@ -146,7 +156,10 @@ impl Handler for StreamOpsHandler { renew_ev.readable = true; } } + } else if item.flags.contains(Flags::RD) { + renew_ev.readable = true; } + if ev.writable { let result = item.context.with_write_buf(|buf| { log::debug!( @@ -166,10 +179,10 @@ impl Handler for StreamOpsHandler { }); if item.io.is_some() && result.is_pending() { - if item.context.is_write_ready() { - renew_ev.writable = true; - } + renew_ev.writable = true; } + } else if item.flags.contains(Flags::WR) { + renew_ev.writable = true; } if ev.is_interrupt() { @@ -179,6 +192,8 @@ impl Handler for StreamOpsHandler { } continue; } else { + item.flags.set(Flags::RD, renew_ev.readable); + item.flags.set(Flags::WR, renew_ev.writable); self.inner.api.modify(item.fd, id as u32, renew_ev); } } @@ -257,6 +272,7 @@ impl StreamCtl { pub(crate) fn modify(&self, readable: bool, writable: bool) { self.with(|streams| { let item = &mut streams[self.id as usize]; + item.flags = Flags::empty(); log::debug!( "{}: Modify interest ({}), {:?} read: {:?}, write: {:?}", @@ -292,6 +308,7 @@ impl StreamCtl { if item.io.is_some() && result.is_pending() { if item.context.is_read_ready() { event.readable = true; + item.flags.insert(Flags::RD); } } } @@ -310,9 +327,8 @@ impl StreamCtl { }); if item.io.is_some() && result.is_pending() { - if item.context.is_write_ready() { - event.writable = true; - } + event.writable = true; + item.flags.insert(Flags::WR); } } diff --git a/ntex-net/src/rt_polling/mod.rs b/ntex-net/src/rt_polling/mod.rs index 671b8493..b4fb928b 100644 --- a/ntex-net/src/rt_polling/mod.rs +++ b/ntex-net/src/rt_polling/mod.rs @@ -8,6 +8,9 @@ pub(crate) mod connect; mod driver; mod io; +#[cfg(not(target_pointer_width = "64"))] +compile_error!("Only 64bit platforms are supported"); + /// Tcp stream wrapper for neon TcpStream struct TcpStream(socket2::Socket); From 5484009c9240cf9a405ab160505d63134c1585d1 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 21 Mar 2025 08:21:45 +0100 Subject: [PATCH 05/20] Simplify neon poll impl (#537) --- ntex-net/CHANGES.md | 4 + ntex-net/Cargo.toml | 4 +- ntex-net/src/rt_polling/connect.rs | 79 +++--- ntex-net/src/rt_polling/driver.rs | 390 +++++++++++++---------------- 4 files changed, 217 insertions(+), 260 deletions(-) diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index b4851692..18d70d3a 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.7] - 2025-03-21 + +* Simplify neon poll impl + ## [2.5.6] - 2025-03-20 * Redesign neon poll support diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 11d7f3bf..6dc6af0f 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.6" +version = "2.5.7" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.6", optional = true } +ntex-neon = { version = "0.1.7", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_polling/connect.rs b/ntex-net/src/rt_polling/connect.rs index 529c55b1..8f0f1dc9 100644 --- a/ntex-net/src/rt_polling/connect.rs +++ b/ntex-net/src/rt_polling/connect.rs @@ -1,5 +1,5 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::RefCell, collections::VecDeque, io, rc::Rc, task::Poll}; +use std::{cell::RefCell, io, rc::Rc, task::Poll}; use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; @@ -17,7 +17,6 @@ enum Change { } struct ConnectOpsBatcher { - feed: VecDeque<(usize, Change)>, inner: Rc, } @@ -41,10 +40,7 @@ impl ConnectOps { connects: RefCell::new(Slab::new()), }); inner = Some(ops.clone()); - Box::new(ConnectOpsBatcher { - inner: ops, - feed: VecDeque::new(), - }) + Box::new(ConnectOpsBatcher { inner: ops }) }); ConnectOps(inner.unwrap()) @@ -74,55 +70,42 @@ impl ConnectOps { impl Handler for ConnectOpsBatcher { fn event(&mut self, id: usize, event: Event) { log::debug!("connect-fd is readable {:?}", id); - self.feed.push_back((id, Change::Event(event))); - } - - fn error(&mut self, id: usize, err: io::Error) { - self.feed.push_back((id, Change::Error(err))); - } - - fn commit(&mut self) { - if self.feed.is_empty() { - return; - } - log::debug!("Commit connect driver changes, num: {:?}", self.feed.len()); let mut connects = self.inner.connects.borrow_mut(); - for (id, change) in self.feed.drain(..) { - if connects.contains(id) { - let item = connects.remove(id); - match change { - Change::Event(event) => { - if event.writable { - let mut err: libc::c_int = 0; - let mut err_len = - std::mem::size_of::() as libc::socklen_t; + if connects.contains(id) { + let item = connects.remove(id); + if event.writable { + let mut err: libc::c_int = 0; + let mut err_len = std::mem::size_of::() as libc::socklen_t; - let res = syscall!(libc::getsockopt( - item.fd.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_ERROR, - &mut err as *mut _ as *mut _, - &mut err_len - )); + let res = syscall!(libc::getsockopt( + item.fd.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_ERROR, + &mut err as *mut _ as *mut _, + &mut err_len + )); - let res = if err == 0 { - res.map(|_| ()) - } else { - Err(io::Error::from_raw_os_error(err)) - }; + let res = if err == 0 { + res.map(|_| ()) + } else { + Err(io::Error::from_raw_os_error(err)) + }; - self.inner.api.detach(item.fd, id as u32); - let _ = item.sender.send(res); - } - } - Change::Error(err) => { - let _ = item.sender.send(Err(err)); - self.inner.api.detach(item.fd, id as u32); - } - } + self.inner.api.detach(item.fd, id as u32); + let _ = item.sender.send(res); } } } + + fn error(&mut self, id: usize, err: io::Error) { + let mut connects = self.inner.connects.borrow_mut(); + + if connects.contains(id) { + let item = connects.remove(id); + let _ = item.sender.send(Err(err)); + self.inner.api.detach(item.fd, id as u32); + } + } } diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 385f15fa..eaa6d589 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,5 +1,5 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, collections::VecDeque, future::Future, io, rc::Rc, task}; +use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task, task::Poll}; use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; @@ -14,7 +14,7 @@ pub(crate) struct StreamCtl { } bitflags::bitflags! { - #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + #[derive(Copy, Clone, Debug)] struct Flags: u8 { const RD = 0b0000_0001; const WR = 0b0000_0010; @@ -31,23 +31,23 @@ struct StreamItem { pub(crate) struct StreamOps(Rc>); -#[derive(Debug)] -enum Change { - Event(Event), - Error(io::Error), -} - struct StreamOpsHandler { - feed: VecDeque<(usize, Change)>, inner: Rc>, } struct StreamOpsInner { api: DriverApi, - feed: Cell>>, + delayd_drop: Cell, + feed: RefCell>, streams: Cell>>>>, } +impl StreamItem { + fn tag(&self) -> &'static str { + self.context.tag() + } +} + impl StreamOps { pub(crate) fn current() -> Self { Runtime::value(|rt| { @@ -55,14 +55,12 @@ impl StreamOps { rt.driver().register(|api| { let ops = Rc::new(StreamOpsInner { api, - feed: Cell::new(Some(VecDeque::new())), + feed: RefCell::new(Vec::new()), + delayd_drop: Cell::new(false), streams: Cell::new(Some(Box::new(Slab::new()))), }); inner = Some(ops.clone()); - Box::new(StreamOpsHandler { - inner: ops, - feed: VecDeque::new(), - }) + Box::new(StreamOpsHandler { inner: ops }) }); StreamOps(inner.unwrap()) @@ -71,34 +69,27 @@ impl StreamOps { pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { let fd = io.as_raw_fd(); - let item = StreamItem { - fd, - context, - io: Some(io), - ref_count: 1, - flags: Flags::empty(), - }; - let stream = self.with(move |streams| { - let id = streams.insert(item) as u32; + let stream = self.0.with(move |streams| { + let item = StreamItem { + fd, + context, + io: Some(io), + ref_count: 1, + flags: Flags::empty(), + }; StreamCtl { - id, + id: streams.insert(item) as u32, inner: self.0.clone(), } }); - self.0.api.attach(fd, stream.id, None); + self.0.api.attach( + fd, + stream.id, + Some(Event::new(0, false, false).with_interrupt()), + ); stream } - - fn with(&self, f: F) -> R - where - F: FnOnce(&mut Slab>) -> R, - { - let mut inner = self.0.streams.take().unwrap(); - let result = f(&mut inner); - self.0.streams.set(Some(inner)); - result - } } impl Clone for StreamOps { @@ -108,128 +99,112 @@ impl Clone for StreamOps { } impl Handler for StreamOpsHandler { - fn event(&mut self, id: usize, event: Event) { - log::debug!("FD is readable {:?}", id); - self.feed.push_back((id, Change::Event(event))); + fn event(&mut self, id: usize, ev: Event) { + log::debug!("FD event {:?} event: {:?}", id, ev); + + self.inner.with(|streams| { + if !streams.contains(id) { + return; + } + let item = &mut streams[id]; + + // handle HUP + if ev.is_interrupt() { + item.context.stopped(None); + if item.io.take().is_some() { + close(id as u32, item.fd, &self.inner.api); + } + return; + } + + let mut renew_ev = Event::new(0, false, false).with_interrupt(); + + if ev.readable { + let res = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let result = task::ready!(syscall!( + break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()) + )); + if let Ok(size) = result { + log::debug!("{}: data {:?}, s: {:?}", item.tag(), item.fd, size); + unsafe { buf.advance_mut(size) }; + } + Poll::Ready(result) + }); + + if res.is_pending() && item.context.is_read_ready() { + renew_ev.readable = true; + item.flags.insert(Flags::RD); + } else { + item.flags.remove(Flags::RD); + } + } else if item.flags.contains(Flags::RD) { + renew_ev.readable = true; + } + + if ev.writable { + let result = item.context.with_write_buf(|buf| { + log::debug!("{}: write {:?} s: {:?}", item.tag(), item.fd, buf.len()); + syscall!(break libc::write(item.fd, buf[..].as_ptr() as _, buf.len())) + }); + if result.is_pending() { + renew_ev.writable = true; + item.flags.insert(Flags::WR); + } else { + item.flags.remove(Flags::WR); + } + } else if item.flags.contains(Flags::WR) { + renew_ev.writable = true; + } + + self.inner.api.modify(item.fd, id as u32, renew_ev); + + // delayed drops + if self.inner.delayd_drop.get() { + for id in self.inner.feed.borrow_mut().drain(..) { + let item = &mut streams[id as usize]; + item.ref_count -= 1; + if item.ref_count == 0 { + let item = streams.remove(id as usize); + log::debug!( + "{}: Drop ({}), {:?}, has-io: {}", + item.tag(), + id, + item.fd, + item.io.is_some() + ); + if item.io.is_some() { + close(id, item.fd, &self.inner.api); + } + } + } + self.inner.delayd_drop.set(false); + } + }); } fn error(&mut self, id: usize, err: io::Error) { - log::debug!("FD is failed {:?}, err: {:?}", id, err); - self.feed.push_back((id, Change::Error(err))); + self.inner.with(|streams| { + if let Some(item) = streams.get_mut(id) { + log::debug!("FD is failed ({}) {:?}, err: {:?}", id, item.fd, err); + item.context.stopped(Some(err)); + if item.io.take().is_some() { + close(id as u32, item.fd, &self.inner.api); + } + } + }) } +} - fn commit(&mut self) { - if self.feed.is_empty() { - return; - } - log::debug!("Commit changes, num: {:?}", self.feed.len()); - - let mut streams = self.inner.streams.take().unwrap(); - - for (id, change) in self.feed.drain(..) { - match change { - Change::Event(ev) => { - let item = &mut streams[id]; - let mut renew_ev = Event::new(0, false, false).with_interrupt(); - if ev.readable { - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - task::Poll::Ready( - task::ready!(syscall!( - break libc::read(item.fd, b as _, chunk.len()) - )) - .inspect(|size| { - unsafe { buf.advance_mut(*size) }; - log::debug!( - "{}: {:?}, SIZE: {:?}", - item.context.tag(), - item.fd, - size - ); - }), - ) - }); - - if item.io.is_some() && result.is_pending() { - if item.context.is_read_ready() { - renew_ev.readable = true; - } - } - } else if item.flags.contains(Flags::RD) { - renew_ev.readable = true; - } - - if ev.writable { - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: writing {:?} SIZE: {:?}", - item.context.tag(), - item.fd, - buf.len() - ); - let slice = &buf[..]; - syscall!( - break libc::write( - item.fd, - slice.as_ptr() as _, - slice.len() - ) - ) - }); - - if item.io.is_some() && result.is_pending() { - renew_ev.writable = true; - } - } else if item.flags.contains(Flags::WR) { - renew_ev.writable = true; - } - - if ev.is_interrupt() { - item.context.stopped(None); - if let Some(_) = item.io.take() { - close(id as u32, item.fd, &self.inner.api); - } - continue; - } else { - item.flags.set(Flags::RD, renew_ev.readable); - item.flags.set(Flags::WR, renew_ev.writable); - self.inner.api.modify(item.fd, id as u32, renew_ev); - } - } - Change::Error(err) => { - if let Some(item) = streams.get_mut(id) { - item.context.stopped(Some(err)); - if let Some(_) = item.io.take() { - close(id as u32, item.fd, &self.inner.api); - } - } - } - } - } - - // extra - let mut feed = self.inner.feed.take().unwrap(); - for id in feed.drain(..) { - let item = &mut streams[id as usize]; - item.ref_count -= 1; - if item.ref_count == 0 { - let item = streams.remove(id as usize); - log::debug!( - "{}: Drop io ({}), {:?}, has-io: {}", - item.context.tag(), - id, - item.fd, - item.io.is_some() - ); - if item.io.is_some() { - close(id, item.fd, &self.inner.api); - } - } - } - - self.inner.feed.set(Some(feed)); - self.inner.streams.set(Some(streams)); +impl StreamOpsInner { + fn with(&self, f: F) -> R + where + F: FnOnce(&mut Slab>) -> R, + { + let mut streams = self.streams.take().unwrap(); + let result = f(&mut streams); + self.streams.set(Some(streams)); + result } } @@ -244,7 +219,9 @@ fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle StreamCtl { pub(crate) fn close(self) -> impl Future> { let id = self.id as usize; - let (io, fd) = self.with(|streams| (streams[id].io.take(), streams[id].fd)); + let (io, fd) = self + .inner + .with(|streams| (streams[id].io.take(), streams[id].fd)); let fut = if let Some(io) = io { log::debug!("Closing ({}), {:?}", id, fd); std::mem::forget(io); @@ -266,90 +243,84 @@ impl StreamCtl { where F: FnOnce(Option<&T>) -> R, { - self.with(|streams| f(streams[self.id as usize].io.as_ref())) + self.inner + .with(|streams| f(streams[self.id as usize].io.as_ref())) } - pub(crate) fn modify(&self, readable: bool, writable: bool) { - self.with(|streams| { + pub(crate) fn modify(&self, rd: bool, wr: bool) { + self.inner.with(|streams| { let item = &mut streams[self.id as usize]; - item.flags = Flags::empty(); log::debug!( - "{}: Modify interest ({}), {:?} read: {:?}, write: {:?}", - item.context.tag(), + "{}: Modify interest ({}), {:?} rd: {:?}, wr: {:?}", + item.tag(), self.id, item.fd, - readable, - writable + rd, + wr ); let mut event = Event::new(0, false, false).with_interrupt(); - if readable { - let result = item.context.with_read_buf(|buf| { - let chunk = buf.chunk_mut(); - let b = chunk.as_mut_ptr(); - task::Poll::Ready( - task::ready!(syscall!( - break libc::read(item.fd, b as _, chunk.len()) - )) - .inspect(|size| { - unsafe { buf.advance_mut(*size) }; + if rd { + if item.flags.contains(Flags::RD) { + event.readable = true; + } else { + let res = item.context.with_read_buf(|buf| { + let chunk = buf.chunk_mut(); + let result = task::ready!(syscall!( + break libc::read(item.fd, chunk.as_mut_ptr() as _, chunk.len()) + )); + if let Ok(size) = result { log::debug!( - "{}: {:?}, SIZE: {:?}", - item.context.tag(), + "{}: read {:?}, s: {:?}", + item.tag(), item.fd, size ); - }), - ) - }); + unsafe { buf.advance_mut(size) }; + } + Poll::Ready(result) + }); - if item.io.is_some() && result.is_pending() { - if item.context.is_read_ready() { + if res.is_pending() && item.context.is_read_ready() { event.readable = true; item.flags.insert(Flags::RD); } } } - if writable { - let result = item.context.with_write_buf(|buf| { - log::debug!( - "{}: Writing io ({}), buf: {:?}", - item.context.tag(), - self.id, - buf.len() - ); - - let slice = &buf[..]; - syscall!(break libc::write(item.fd, slice.as_ptr() as _, slice.len())) - }); - - if item.io.is_some() && result.is_pending() { + if wr { + if item.flags.contains(Flags::WR) { event.writable = true; - item.flags.insert(Flags::WR); + } else { + let result = item.context.with_write_buf(|buf| { + log::debug!( + "{}: Writing ({}), buf: {:?}", + item.tag(), + self.id, + buf.len() + ); + syscall!( + break libc::write(item.fd, buf[..].as_ptr() as _, buf.len()) + ) + }); + + if result.is_pending() { + event.writable = true; + item.flags.insert(Flags::WR); + } } } - self.inner.api.modify(item.fd, self.id as u32, event); + self.inner.api.modify(item.fd, self.id, event); }) } - - fn with(&self, f: F) -> R - where - F: FnOnce(&mut Slab>) -> R, - { - let mut inner = self.inner.streams.take().unwrap(); - let result = f(&mut inner); - self.inner.streams.set(Some(inner)); - result - } } impl Clone for StreamCtl { fn clone(&self) -> Self { - self.with(|streams| { + self.inner.with(|streams| { streams[self.id as usize].ref_count += 1; Self { id: self.id, @@ -368,7 +339,7 @@ impl Drop for StreamCtl { let item = streams.remove(id); log::debug!( "{}: Drop io ({}), {:?}, has-io: {}", - item.context.tag(), + item.tag(), self.id, item.fd, item.io.is_some() @@ -379,9 +350,8 @@ impl Drop for StreamCtl { } self.inner.streams.set(Some(streams)); } else { - let mut feed = self.inner.feed.take().unwrap(); - feed.push_back(self.id); - self.inner.feed.set(Some(feed)); + self.inner.delayd_drop.set(true); + self.inner.feed.borrow_mut().push(self.id); } } } From b32df88500a03d5372288e9e9813282c8a2184f9 Mon Sep 17 00:00:00 2001 From: Drew Pirrone-Brusse Date: Sat, 22 Mar 2025 17:16:00 -0400 Subject: [PATCH 06/20] Publicize web::app_service::AppService (#534) --- ntex/src/web/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ntex/src/web/mod.rs b/ntex/src/web/mod.rs index cf1686e0..6c3d37b1 100644 --- a/ntex/src/web/mod.rs +++ b/ntex/src/web/mod.rs @@ -128,6 +128,7 @@ pub mod dev { //! The purpose of this module is to alleviate imports of many common //! traits by adding a glob import to the top of ntex::web heavy modules: + pub use crate::web::app_service::AppService; pub use crate::web::config::AppConfig; pub use crate::web::info::ConnectionInfo; pub use crate::web::rmap::ResourceMap; From eaec50d8a26016184fcc1c3956a0f1b602431d05 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 22 Mar 2025 22:17:59 +0100 Subject: [PATCH 07/20] Prepare release (#538) --- ntex/CHANGES.md | 4 +++- ntex/Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 2e6b077e..cb75aabc 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,6 +1,8 @@ # Changes -## [2.12.3] - 2025-03-xx +## [2.12.3] - 2025-03-22 + +* web: Export web::app_service::AppService #534 * http: Add delay for test server availability, could cause connect race diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index d8cbdd26..5cd86a12 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "2.12.2" +version = "2.12.3" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" From e903e65e279a4d4281ce2b27f811f6ba257feb48 Mon Sep 17 00:00:00 2001 From: Ruangyot Nanchiang Date: Tue, 25 Mar 2025 18:31:09 +0700 Subject: [PATCH 08/20] add public ServiceConfig::register constructor to support external configuration (#250) (#539) * add public ServiceConfig::register constructor to support external configuration (#250) * fix: doctest ServiceConfig::register() error (#250) * add unit testing for ServiceConfig::register() * replace pub(crate) to pub in ServiceConfig::new() (#250) --------- Co-authored-by: RuangyotN --- ntex/src/web/config.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/ntex/src/web/config.rs b/ntex/src/web/config.rs index 91c5034c..c7edceb6 100644 --- a/ntex/src/web/config.rs +++ b/ntex/src/web/config.rs @@ -68,7 +68,7 @@ pub struct ServiceConfig { } impl ServiceConfig { - pub(crate) fn new() -> Self { + pub fn new() -> Self { Self { services: Vec::new(), state: Extensions::new(), @@ -132,7 +132,7 @@ mod tests { use crate::http::{Method, StatusCode}; use crate::util::Bytes; use crate::web::test::{call_service, init_service, read_body, TestRequest}; - use crate::web::{self, App, HttpRequest, HttpResponse}; + use crate::web::{self, App, DefaultError, HttpRequest, HttpResponse}; #[crate::rt_test] async fn test_configure_state() { @@ -205,4 +205,11 @@ mod tests { let resp = call_service(&srv, req).await; assert_eq!(resp.status(), StatusCode::OK); } + + #[test] + fn test_new_service_config() { + let cfg: ServiceConfig = ServiceConfig::new(); + assert!(cfg.services.is_empty()); + assert!(cfg.external.is_empty()); + } } From 0d3f1293c9d5bbe5dd32f5be52207678414947ef Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 25 Mar 2025 12:40:42 +0100 Subject: [PATCH 09/20] Update neon runtime (#540) --- ntex-net/CHANGES.md | 4 ++++ ntex-net/Cargo.toml | 10 +++++----- ntex/Cargo.toml | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 18d70d3a..800ff027 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.8] - 2025-03-25 + +* Update neon runtime + ## [2.5.7] - 2025-03-21 * Simplify neon poll impl diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 6dc6af0f..a10b84be 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.7" +version = "2.5.8" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -27,8 +27,8 @@ compio = ["ntex-rt/compio", "ntex-compio"] # neon runtime neon = ["ntex-rt/neon", "ntex-neon", "slab", "socket2"] -polling = ["ntex-neon/polling", "dep:polling"] -io-uring = ["ntex-neon/io-uring", "dep:io-uring"] +polling = ["ntex-neon/polling", "dep:polling", "socket2"] +io-uring = ["ntex-neon/io-uring", "dep:io-uring", "socket2"] [dependencies] ntex-service = "3.3" @@ -40,14 +40,14 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.7", optional = true } +ntex-neon = { version = "0.1.10", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } log = { workspace = true } libc = { workspace = true } slab = { workspace = true, optional = true } -socket2 = { workspace = true, optional = true } +socket2 = { workspace = true, optional = true, features = ["all"] } thiserror = { workspace = true } # Linux specific dependencies diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 5cd86a12..301d239e 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -72,7 +72,7 @@ ntex-server = "2.7" ntex-h2 = "1.8.6" ntex-rt = "0.4.27" ntex-io = "2.11" -ntex-net = "2.5" +ntex-net = "2.5.8" ntex-tls = "2.3" base64 = "0.22" From eb4ec4b3e1502b68265b1291229ced7ed66fceef Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 26 Mar 2025 14:40:05 +0100 Subject: [PATCH 10/20] Add Arbiter::get_value() helper method (#541) --- ntex-net/Cargo.toml | 2 +- ntex-rt/CHANGES.md | 4 ++++ ntex-rt/Cargo.toml | 6 +++--- ntex-rt/src/arbiter.rs | 20 ++++++++++++++++++++ 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index a10b84be..71791e06 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.10", optional = true } +ntex-neon = { version = "0.1.11", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index f2ab4736..2afd5bd6 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.29] - 2025-03-26 + +* Add Arbiter::get_value() helper method + ## [0.4.27] - 2025-03-14 * Add srbiters pings ttl diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 0526e450..e133ceb4 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.4.28" +version = "0.4.29" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -32,8 +32,8 @@ neon = ["ntex-neon"] [dependencies] async-channel = "2" futures-timer = "3.0" -log = "0.4" oneshot = "0.1" +log = "0.4" compio-driver = { version = "0.6", optional = true } compio-runtime = { version = "0.6", optional = true } @@ -42,7 +42,7 @@ tok-io = { version = "1", package = "tokio", default-features = false, features "net", ], optional = true } -ntex-neon = { version = "0.1.1", optional = true } +ntex-neon = { version = "0.1.11", optional = true } [dev-dependencies] env_logger = "0.11" diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index 48a673ca..e20ab282 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -286,6 +286,25 @@ impl Arbiter { }) } + /// Get a type previously inserted to this runtime or create new one. + pub fn get_value(f: F) -> T + where + T: Clone + 'static, + F: FnOnce() -> T, + { + STORAGE.with(move |cell| { + let mut st = cell.borrow_mut(); + if let Some(boxed) = st.get(&TypeId::of::()) { + if let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::() { + return val.clone(); + } + } + let val = f(); + st.insert(TypeId::of::(), Box::new(val.clone())); + val + }) + } + /// Wait for the event loop to stop by joining the underlying thread (if have Some). pub fn join(&mut self) -> thread::Result<()> { if let Some(thread_handle) = self.thread_handle.take() { @@ -355,6 +374,7 @@ mod tests { assert!(Arbiter::get_item::<&'static str, _, _>(|s| *s == "test")); assert!(Arbiter::get_mut_item::<&'static str, _, _>(|s| *s == "test")); assert!(Arbiter::contains_item::<&'static str>()); + assert!(Arbiter::get_value(|| 64u64) == 64); assert!(format!("{:?}", Arbiter::current()).contains("Arbiter")); } } From b2915f48681b9a42630047ee20b80871fb2fd0f4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 27 Mar 2025 20:45:43 +0100 Subject: [PATCH 11/20] Improve tests (#543) --- ntex-net/CHANGES.md | 4 +++ ntex-net/Cargo.toml | 2 +- ntex-net/src/rt_polling/driver.rs | 60 +++++++++++++++++++------------ ntex-net/src/rt_uring/driver.rs | 26 ++++++-------- ntex-server/CHANGES.md | 4 +++ ntex-server/Cargo.toml | 2 +- ntex-server/src/manager.rs | 4 +-- ntex-server/src/net/accept.rs | 28 +++++++++++---- ntex-server/src/wrk.rs | 44 +++++++++++++++-------- ntex/Cargo.toml | 1 + ntex/tests/http_awc_client.rs | 22 ++++++------ ntex/tests/http_openssl.rs | 15 +++++--- ntex/tests/http_server.rs | 25 ++++++++----- 13 files changed, 151 insertions(+), 86 deletions(-) diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 800ff027..a16145fc 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.9] - 2025-03-27 + +* Handle closed sockets + ## [2.5.8] - 2025-03-25 * Update neon runtime diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 71791e06..8e7335c6 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.8" +version = "2.5.9" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index eaa6d589..c179a77a 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -16,8 +16,9 @@ pub(crate) struct StreamCtl { bitflags::bitflags! { #[derive(Copy, Clone, Debug)] struct Flags: u8 { - const RD = 0b0000_0001; - const WR = 0b0000_0010; + const RD = 0b0000_0001; + const WR = 0b0000_0010; + const CLOSED = 0b0000_0100; } } @@ -100,19 +101,22 @@ impl Clone for StreamOps { impl Handler for StreamOpsHandler { fn event(&mut self, id: usize, ev: Event) { - log::debug!("FD event {:?} event: {:?}", id, ev); - self.inner.with(|streams| { if !streams.contains(id) { return; } let item = &mut streams[id]; + log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); + + if item.flags.contains(Flags::CLOSED) { + return; + } // handle HUP if ev.is_interrupt() { item.context.stopped(None); if item.io.take().is_some() { - close(id as u32, item.fd, &self.inner.api); + close(id as u32, item, &self.inner.api); } return; } @@ -165,7 +169,7 @@ impl Handler for StreamOpsHandler { let item = &mut streams[id as usize]; item.ref_count -= 1; if item.ref_count == 0 { - let item = streams.remove(id as usize); + let mut item = streams.remove(id as usize); log::debug!( "{}: Drop ({}), {:?}, has-io: {}", item.tag(), @@ -174,7 +178,7 @@ impl Handler for StreamOpsHandler { item.io.is_some() ); if item.io.is_some() { - close(id, item.fd, &self.inner.api); + close(id, &mut item, &self.inner.api); } } } @@ -186,10 +190,16 @@ impl Handler for StreamOpsHandler { fn error(&mut self, id: usize, err: io::Error) { self.inner.with(|streams| { if let Some(item) = streams.get_mut(id) { - log::debug!("FD is failed ({}) {:?}, err: {:?}", id, item.fd, err); + log::debug!( + "{}: FD is failed ({}) {:?}, err: {:?}", + item.tag(), + id, + item.fd, + err + ); item.context.stopped(Some(err)); if item.io.take().is_some() { - close(id as u32, item.fd, &self.inner.api); + close(id as u32, item, &self.inner.api); } } }) @@ -208,7 +218,13 @@ impl StreamOpsInner { } } -fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle> { +fn close( + id: u32, + item: &mut StreamItem, + api: &DriverApi, +) -> ntex_rt::JoinHandle> { + let fd = item.fd; + item.flags.insert(Flags::CLOSED); api.detach(fd, id); ntex_rt::spawn_blocking(move || { syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; @@ -219,16 +235,16 @@ fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle StreamCtl { pub(crate) fn close(self) -> impl Future> { let id = self.id as usize; - let (io, fd) = self - .inner - .with(|streams| (streams[id].io.take(), streams[id].fd)); - let fut = if let Some(io) = io { - log::debug!("Closing ({}), {:?}", id, fd); - std::mem::forget(io); - Some(close(self.id, fd, &self.inner.api)) - } else { - None - }; + let fut = self.inner.with(|streams| { + let item = &mut streams[id]; + if let Some(io) = item.io.take() { + log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); + std::mem::forget(io); + Some(close(self.id, item, &self.inner.api)) + } else { + None + } + }); async move { if let Some(fut) = fut { fut.await @@ -336,7 +352,7 @@ impl Drop for StreamCtl { let id = self.id as usize; streams[id].ref_count -= 1; if streams[id].ref_count == 0 { - let item = streams.remove(id); + let mut item = streams.remove(id); log::debug!( "{}: Drop io ({}), {:?}, has-io: {}", item.tag(), @@ -345,7 +361,7 @@ impl Drop for StreamCtl { item.io.is_some() ); if item.io.is_some() { - close(self.id, item.fd, &self.inner.api); + close(self.id, &mut item, &self.inner.api); } } self.inner.streams.set(Some(streams)); diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs index f2a88d11..d39d69e8 100644 --- a/ntex-net/src/rt_uring/driver.rs +++ b/ntex-net/src/rt_uring/driver.rs @@ -33,6 +33,12 @@ struct StreamItem { wr_op: Option, } +impl StreamItem { + fn tag(&self) -> &'static str { + self.context.tag() + } +} + enum Operation { Recv { id: usize, @@ -249,7 +255,7 @@ impl Handler for StreamOpsHandler { if storage.streams[id].ref_count == 0 { let mut item = storage.streams.remove(id); - log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd); + log::debug!("{}: Drop io ({}), {:?}", item.tag(), id, item.fd); if let Some(io) = item.io.take() { mem::forget(io); @@ -273,7 +279,7 @@ impl StreamOpsStorage { if let Poll::Ready(mut buf) = item.context.get_read_buf() { log::debug!( "{}: Recv resume ({}), {:?} rem: {:?}", - item.context.tag(), + item.tag(), id, item.fd, buf.remaining_mut() @@ -306,7 +312,7 @@ impl StreamOpsStorage { if let Poll::Ready(buf) = item.context.get_write_buf() { log::debug!( "{}: Send resume ({}), {:?} len: {:?}", - item.context.tag(), + item.tag(), id, item.fd, buf.len() @@ -396,12 +402,7 @@ impl StreamCtl { if let Some(rd_op) = item.rd_op { if !item.flags.contains(Flags::RD_CANCELING) { - log::debug!( - "{}: Recv to pause ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); + log::debug!("{}: Recv to pause ({}), {:?}", item.tag(), self.id, item.fd); item.flags.insert(Flags::RD_CANCELING); self.inner.api.cancel(rd_op.get()); } @@ -426,12 +427,7 @@ impl Drop for StreamCtl { if storage.streams[self.id].ref_count == 0 { let mut item = storage.streams.remove(self.id); if let Some(io) = item.io.take() { - log::debug!( - "{}: Close io ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); + log::debug!("{}: Close io ({}), {:?}", item.tag(), self.id, item.fd); mem::forget(io); let id = storage.ops.insert(Operation::Close { tx: None }); diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index 7f1d8302..0d8dabc5 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.7.2] - 2025-03-27 + +* Handle paused state + ## [2.7.1] - 2025-02-28 * Fix set core affinity out of worker start #508 diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index ed962fc0..bca5f8b2 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.7.1" +version = "2.7.2" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs index f0be9c40..ca558a54 100644 --- a/ntex-server/src/manager.rs +++ b/ntex-server/src/manager.rs @@ -55,7 +55,7 @@ impl ServerManager { let no_signals = cfg.no_signals; let shared = Arc::new(ServerShared { - paused: AtomicBool::new(true), + paused: AtomicBool::new(false), }); let mgr = ServerManager(Rc::new(Inner { cfg, @@ -212,7 +212,7 @@ impl HandleCmdState { match upd { Update::Available(worker) => { self.workers.push(worker); - if self.workers.len() == 1 { + if !self.workers.is_empty() { self.mgr.resume(); } else { self.workers.sort(); diff --git a/ntex-server/src/net/accept.rs b/ntex-server/src/net/accept.rs index 332fc846..31793d82 100644 --- a/ntex-server/src/net/accept.rs +++ b/ntex-server/src/net/accept.rs @@ -92,12 +92,14 @@ impl AcceptLoop { /// Start accept loop pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) { + let (tx, rx_start) = oneshot::channel(); let (rx, poll) = self .inner .take() .expect("AcceptLoop cannot be used multiple times"); Accept::start( + tx, rx, poll, socks, @@ -105,6 +107,8 @@ impl AcceptLoop { self.notify.clone(), self.status_handler.take(), ); + + let _ = rx_start.recv(); } } @@ -121,6 +125,7 @@ impl fmt::Debug for AcceptLoop { struct Accept { poller: Arc, rx: mpsc::Receiver, + tx: Option>, sockets: Vec, srv: Server, notify: AcceptNotify, @@ -131,6 +136,7 @@ struct Accept { impl Accept { fn start( + tx: oneshot::Sender<()>, rx: mpsc::Receiver, poller: Arc, socks: Vec<(Token, Listener)>, @@ -145,11 +151,12 @@ impl Accept { .name("ntex-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - Accept::new(rx, poller, socks, srv, notify, status_handler).poll() + Accept::new(tx, rx, poller, socks, srv, notify, status_handler).poll() }); } fn new( + tx: oneshot::Sender<()>, rx: mpsc::Receiver, poller: Arc, socks: Vec<(Token, Listener)>, @@ -175,6 +182,7 @@ impl Accept { notify, srv, status_handler, + tx: Some(tx), backpressure: true, backlog: VecDeque::new(), } @@ -192,8 +200,9 @@ impl Accept { // Create storage for events let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap()); + let mut timeout = Some(Duration::ZERO); loop { - if let Err(e) = self.poller.wait(&mut events, None) { + if let Err(e) = self.poller.wait(&mut events, timeout) { if e.kind() == io::ErrorKind::Interrupted { continue; } else { @@ -201,10 +210,17 @@ impl Accept { } } - for event in events.iter() { - let readd = self.accept(event.key); - if readd { - self.add_source(event.key); + if timeout.is_some() { + timeout = None; + let _ = self.tx.take().unwrap().send(()); + } + + for idx in 0..self.sockets.len() { + if self.sockets[idx].registered.get() { + let readd = self.accept(idx); + if readd { + self.add_source(idx); + } } } diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index a61f7731..b9092d0f 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -99,10 +99,10 @@ impl Worker { log::debug!("Creating server instance in {:?}", id); let factory = cfg.create().await; - log::debug!("Server instance has been created in {:?}", id); match create(id, rx1, rx2, factory, avail_tx).await { Ok((svc, wrk)) => { + log::debug!("Server instance has been created in {:?}", id); run_worker(svc, wrk).await; } Err(e) => { @@ -241,7 +241,7 @@ impl WorkerAvailabilityTx { /// Worker accepts message via unbounded channel and starts processing. struct WorkerSt> { id: WorkerId, - rx: Pin>>, + rx: Receiver, stop: Pin>>, factory: F, availability: WorkerAvailabilityTx, @@ -253,20 +253,36 @@ where F: ServiceFactory + 'static, { loop { + let mut recv = std::pin::pin!(wrk.rx.recv()); let fut = poll_fn(|cx| { - ready!(svc.poll_ready(cx)?); - - if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) { - let fut = svc.call(item); - let _ = spawn(async move { - let _ = fut.await; - }); + match svc.poll_ready(cx) { + Poll::Ready(res) => { + res?; + wrk.availability.set(true); + } + Poll::Pending => { + wrk.availability.set(false); + return Poll::Pending; + } + } + + match ready!(recv.as_mut().poll(cx)) { + Ok(item) => { + let fut = svc.call(item); + let _ = spawn(async move { + let _ = fut.await; + }); + Poll::Ready(Ok::<_, F::Error>(true)) + } + Err(_) => { + log::error!("Server is gone"); + Poll::Ready(Ok(false)) + } } - Poll::Ready(Ok::<(), F::Error>(())) }); match select(fut, stream_recv(&mut wrk.stop)).await { - Either::Left(Ok(())) => continue, + Either::Left(Ok(true)) => continue, Either::Left(Err(_)) => { let _ = ntex_rt::spawn(async move { svc.shutdown().await; @@ -285,7 +301,7 @@ where stop_svc(wrk.id, svc, timeout, Some(result)).await; return; } - Either::Right(None) => { + Either::Left(Ok(false)) | Either::Right(None) => { stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await; return; } @@ -336,8 +352,6 @@ where { availability.set(false); let factory = factory?; - - let rx = Box::pin(rx); let mut stop = Box::pin(stop); let svc = match select(factory.create(()), stream_recv(&mut stop)).await { @@ -356,9 +370,9 @@ where svc, WorkerSt { id, + rx, factory, availability, - rx: Box::pin(rx), stop: Box::pin(stop), }, )) diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 301d239e..09655c7a 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -114,6 +114,7 @@ flate2 = { version = "1.0", optional = true } [dev-dependencies] rand = "0.8" time = "0.3" +oneshot = "0.1" futures-util = "0.3" tls-openssl = { version = "0.10", package = "openssl" } tls-rustls = { version = "0.23", package = "rustls", features = ["ring", "std"], default-features = false } diff --git a/ntex/tests/http_awc_client.rs b/ntex/tests/http_awc_client.rs index 00f304b9..bd4c7e0a 100644 --- a/ntex/tests/http_awc_client.rs +++ b/ntex/tests/http_awc_client.rs @@ -508,19 +508,21 @@ async fn test_client_gzip_encoding_large() { async fn test_client_gzip_encoding_large_random() { let data = rand::thread_rng() .sample_iter(&rand::distributions::Alphanumeric) - .take(100_000) + .take(1_048_500) .map(char::from) .collect::(); let srv = test::server(|| { - App::new().service(web::resource("/").route(web::to(|data: Bytes| async move { - let mut e = GzEncoder::new(Vec::new(), Compression::default()); - e.write_all(&data).unwrap(); - let data = e.finish().unwrap(); - HttpResponse::Ok() - .header("content-encoding", "gzip") - .body(data) - }))) + App::new() + .state(web::types::PayloadConfig::default().limit(1_048_576)) + .service(web::resource("/").route(web::to(|data: Bytes| async move { + let mut e = GzEncoder::new(Vec::new(), Compression::default()); + e.write_all(&data).unwrap(); + let data = e.finish().unwrap(); + HttpResponse::Ok() + .header("content-encoding", "gzip") + .body(data) + }))) }); // client request @@ -528,7 +530,7 @@ async fn test_client_gzip_encoding_large_random() { assert!(response.status().is_success()); // read response - let bytes = response.body().await.unwrap(); + let bytes = response.body().limit(1_048_576).await.unwrap(); assert_eq!(bytes, Bytes::from(data)); } diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index c91de0b8..75227c2c 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -1,5 +1,6 @@ #![cfg(feature = "openssl")] -use std::{io, sync::atomic::AtomicUsize, sync::atomic::Ordering, sync::Arc}; +use std::io; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc, Mutex}; use futures_util::stream::{once, Stream, StreamExt}; use tls_openssl::ssl::{AlpnError, SslAcceptor, SslFiletype, SslMethod}; @@ -456,7 +457,7 @@ async fn test_h2_client_drop() -> io::Result<()> { let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(150)).await; + sleep(Millis(250)).await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) } @@ -539,13 +540,19 @@ async fn test_ws_transport() { async fn test_h2_graceful_shutdown() -> io::Result<()> { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build() .h2(move |_| { let count = count.clone(); count.fetch_add(1, Ordering::Relaxed); + if count.load(Ordering::Relaxed) == 2 { + let _ = tx.lock().unwrap().take().unwrap().send(()); + } async move { sleep(Millis(1000)).await; count.fetch_sub(1, Ordering::Relaxed); @@ -566,7 +573,7 @@ async fn test_h2_graceful_shutdown() -> io::Result<()> { let _ = req.send().await.unwrap(); sleep(Millis(100000)).await; }); - sleep(Millis(150)).await; + let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 2); let (tx, rx) = oneshot::channel(); @@ -574,8 +581,6 @@ async fn test_h2_graceful_shutdown() -> io::Result<()> { srv.stop().await; let _ = tx.send(()); }); - sleep(Millis(150)).await; - assert_eq!(count.load(Ordering::Relaxed), 2); let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 0); diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index cea9e667..a4c1d05f 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -1,4 +1,4 @@ -use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc, Mutex}; use std::{io, io::Read, io::Write, net}; use futures_util::future::{self, FutureExt}; @@ -761,12 +761,18 @@ async fn test_h1_client_drop() -> io::Result<()> { async fn test_h1_gracefull_shutdown() { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build().h1(move |_: Request| { let count = count.clone(); count.fetch_add(1, Ordering::Relaxed); + if count.load(Ordering::Relaxed) == 2 { + let _ = tx.lock().unwrap().take().unwrap().send(()); + } async move { sleep(Millis(1000)).await; count.fetch_sub(1, Ordering::Relaxed); @@ -781,7 +787,7 @@ async fn test_h1_gracefull_shutdown() { let mut stream2 = net::TcpStream::connect(srv.addr()).unwrap(); let _ = stream2.write_all(b"GET /index.html HTTP/1.1\r\n\r\n"); - sleep(Millis(150)).await; + let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 2); let (tx, rx) = oneshot::channel(); @@ -789,8 +795,6 @@ async fn test_h1_gracefull_shutdown() { srv.stop().await; let _ = tx.send(()); }); - sleep(Millis(150)).await; - assert_eq!(count.load(Ordering::Relaxed), 2); let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 0); @@ -800,12 +804,18 @@ async fn test_h1_gracefull_shutdown() { async fn test_h1_gracefull_shutdown_2() { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build().finish(move |_: Request| { let count = count.clone(); count.fetch_add(1, Ordering::Relaxed); + if count.load(Ordering::Relaxed) == 2 { + let _ = tx.lock().unwrap().take().unwrap().send(()); + } async move { sleep(Millis(1000)).await; count.fetch_sub(1, Ordering::Relaxed); @@ -820,17 +830,14 @@ async fn test_h1_gracefull_shutdown_2() { let mut stream2 = net::TcpStream::connect(srv.addr()).unwrap(); let _ = stream2.write_all(b"GET /index.html HTTP/1.1\r\n\r\n"); - sleep(Millis(150)).await; - assert_eq!(count.load(Ordering::Relaxed), 2); + let _ = rx.await; + assert_eq!(count.load(Ordering::Acquire), 2); let (tx, rx) = oneshot::channel(); rt::spawn(async move { srv.stop().await; let _ = tx.send(()); }); - sleep(Millis(150)).await; - assert_eq!(count.load(Ordering::Relaxed), 2); - let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 0); } From 728ab919a35c7873b2e2d9446a38c6fbc4f15f66 Mon Sep 17 00:00:00 2001 From: Ruangyot Nanchiang Date: Fri, 28 Mar 2025 04:12:34 +0700 Subject: [PATCH 12/20] Expose WebStack for external wrapper support in downstream crates (#542) * add public ServiceConfig::register constructor to support external configuration (#250) * fix: doctest ServiceConfig::register() error (#250) * add unit testing for ServiceConfig::register() * replace pub(crate) to pub in ServiceConfig::new() (#250) * replace pub to pub(crate) for ServiceConfig::new() and add pub for mod ntex::web::stack instead * remove unsed DefaultError import in config.rs tests --------- Co-authored-by: RuangyotN --- ntex/src/web/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ntex/src/web/mod.rs b/ntex/src/web/mod.rs index 6c3d37b1..8d9adf4d 100644 --- a/ntex/src/web/mod.rs +++ b/ntex/src/web/mod.rs @@ -82,7 +82,7 @@ mod route; mod scope; mod server; mod service; -mod stack; +pub mod stack; pub mod test; pub mod types; mod util; From f647ad2eac60b4c9b70461498a3a881a4ecd72c6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 27 Mar 2025 22:16:51 +0100 Subject: [PATCH 13/20] Update tests (#544) --- ntex-io/Cargo.toml | 1 - ntex-macros/Cargo.toml | 1 - ntex-net/Cargo.toml | 3 +-- ntex-rt/Cargo.toml | 3 --- ntex/src/http/test.rs | 1 + 5 files changed, 2 insertions(+), 7 deletions(-) diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 6a1e881d..f55aa5d0 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -28,4 +28,3 @@ pin-project-lite = "0.2" [dev-dependencies] ntex = "2" rand = "0.8" -env_logger = "0.11" diff --git a/ntex-macros/Cargo.toml b/ntex-macros/Cargo.toml index f6cad0e2..a5bcf67d 100644 --- a/ntex-macros/Cargo.toml +++ b/ntex-macros/Cargo.toml @@ -18,4 +18,3 @@ proc-macro2 = "^1" [dev-dependencies] ntex = "2" futures = "0.3" -env_logger = "0.11" diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 8e7335c6..c800c00b 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.11", optional = true } +ntex-neon = { version = "0.1.13", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } @@ -57,4 +57,3 @@ polling = { workspace = true, optional = true } [dev-dependencies] ntex = "2" -env_logger = "0.11" diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index e133ceb4..2b5aa5d0 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -43,6 +43,3 @@ tok-io = { version = "1", package = "tokio", default-features = false, features ], optional = true } ntex-neon = { version = "0.1.11", optional = true } - -[dev-dependencies] -env_logger = "0.11" diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index c9ecdb0b..0e4a6559 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -252,6 +252,7 @@ where Ok(()) }) }); + thread::sleep(std::time::Duration::from_millis(150)); let (system, server, addr) = rx.recv().unwrap(); From 8f2d5056c9a8aab11217273b8b51fb55522ef9df Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Mar 2025 02:10:25 +0100 Subject: [PATCH 14/20] Return PayloadError::Incomplete on server disconnect (#545) --- ntex-io/src/tasks.rs | 24 +++------- ntex-rt/src/lib.rs | 8 +++- ntex/CHANGES.md | 6 +++ ntex/Cargo.toml | 2 +- ntex/src/http/client/h1proto.rs | 82 ++++++++++++++++++-------------- ntex/src/http/client/response.rs | 5 +- 6 files changed, 70 insertions(+), 57 deletions(-) diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 4a04196b..55f99416 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -537,7 +537,9 @@ impl IoContext { self.0.tag(), nbytes ); - inner.dispatch_task.wake(); + if !inner.dispatch_task.wake_checked() { + log::error!("Dispatcher waker is not registered"); + } } else { if nbytes >= hw { // read task is paused because of read back-pressure @@ -735,22 +737,6 @@ impl IoContext { false } - pub fn is_write_ready(&self) -> bool { - if let Some(waker) = self.0 .0.write_task.take() { - let ready = self - .0 - .filter() - .poll_write_ready(&mut Context::from_waker(&waker)); - if !matches!( - ready, - Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) - ) { - return true; - } - } - false - } - pub fn with_read_buf(&self, f: F) -> Poll<()> where F: FnOnce(&mut BytesVec) -> Poll>, @@ -803,7 +789,9 @@ impl IoContext { self.0.tag(), nbytes ); - inner.dispatch_task.wake(); + if !inner.dispatch_task.wake_checked() { + log::error!("Dispatcher waker is not registered"); + } } else { if nbytes >= hw { // read task is paused because of read back-pressure diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 1ffd7fe7..d5d85546 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -112,6 +112,8 @@ mod tokio { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> tok_io::task::JoinHandle where F: FnOnce() -> R + 'static, @@ -196,6 +198,8 @@ mod compio { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> JoinHandle where F: FnOnce() -> R + 'static, @@ -323,6 +327,8 @@ mod neon { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> Task where F: FnOnce() -> R + 'static, @@ -377,7 +383,7 @@ mod neon { impl JoinHandle { pub fn is_finished(&self) -> bool { - false + self.fut.is_none() } } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index cb75aabc..6ef4b5ef 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [2.12.4] - 2025-03-28 + +* http: Return PayloadError::Incomplete on server disconnect + +* web: Expose WebStack for external wrapper support in downstream crates #542 + ## [2.12.3] - 2025-03-22 * web: Export web::app_service::AppService #534 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 09655c7a..da30a2ba 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "2.12.3" +version = "2.12.4" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" diff --git a/ntex/src/http/client/h1proto.rs b/ntex/src/http/client/h1proto.rs index 06572418..28871225 100644 --- a/ntex/src/http/client/h1proto.rs +++ b/ntex/src/http/client/h1proto.rs @@ -1,13 +1,11 @@ -use std::{ - future::poll_fn, io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant, -}; +use std::{future::poll_fn, io, io::Write, pin::Pin, task, task::Poll, time::Instant}; use crate::http::body::{BodySize, MessageBody}; use crate::http::error::PayloadError; -use crate::http::h1; use crate::http::header::{HeaderMap, HeaderValue, HOST}; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::payload::{Payload, PayloadStream}; +use crate::http::{h1, Version}; use crate::io::{IoBoxed, RecvError}; use crate::time::{timeout_checked, Millis}; use crate::util::{ready, BufMut, Bytes, BytesMut, Stream}; @@ -101,7 +99,13 @@ where Ok((head, Payload::None)) } _ => { - let pl: PayloadStream = Box::pin(PlStream::new(io, codec, created, pool)); + let pl: PayloadStream = Box::pin(PlStream::new( + io, + codec, + created, + pool, + head.version == Version::HTTP_10, + )); Ok((head, pl.into())) } } @@ -137,6 +141,7 @@ pub(super) struct PlStream { io: Option, codec: h1::ClientPayloadCodec, created: Instant, + http_10: bool, pool: Option, } @@ -146,12 +151,14 @@ impl PlStream { codec: h1::ClientCodec, created: Instant, pool: Option, + http_10: bool, ) -> Self { PlStream { io: Some(io), codec: codec.into_payload_codec(), created, pool, + http_10, } } } @@ -161,41 +168,46 @@ impl Stream for PlStream { fn poll_next( mut self: Pin<&mut Self>, - cx: &mut Context<'_>, + cx: &mut task::Context<'_>, ) -> Poll> { let mut this = self.as_mut(); loop { - return Poll::Ready(Some( - match ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx)) { - Ok(chunk) => { - if let Some(chunk) = chunk { - Ok(chunk) - } else { - release_connection( - this.io.take().unwrap(), - !this.codec.keepalive(), - this.created, - this.pool.take(), - ); - return Poll::Ready(None); - } + let item = ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx)); + return Poll::Ready(Some(match item { + Ok(chunk) => { + if let Some(chunk) = chunk { + Ok(chunk) + } else { + release_connection( + this.io.take().unwrap(), + !this.codec.keepalive(), + this.created, + this.pool.take(), + ); + return Poll::Ready(None); } - Err(RecvError::KeepAlive) => { - Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into()) + } + Err(RecvError::KeepAlive) => { + Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into()) + } + Err(RecvError::Stop) => { + Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped").into()) + } + Err(RecvError::WriteBackpressure) => { + ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?; + continue; + } + Err(RecvError::Decoder(err)) => Err(err), + Err(RecvError::PeerGone(Some(err))) => { + Err(PayloadError::Incomplete(Some(err))) + } + Err(RecvError::PeerGone(None)) => { + if this.http_10 { + return Poll::Ready(None); } - Err(RecvError::Stop) => { - Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped") - .into()) - } - Err(RecvError::WriteBackpressure) => { - ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?; - continue; - } - Err(RecvError::Decoder(err)) => Err(err), - Err(RecvError::PeerGone(Some(err))) => Err(err.into()), - Err(RecvError::PeerGone(None)) => return Poll::Ready(None), - }, - )); + Err(PayloadError::Incomplete(None)) + } + })); } } } diff --git a/ntex/src/http/client/response.rs b/ntex/src/http/client/response.rs index c68b6e73..9a450687 100644 --- a/ntex/src/http/client/response.rs +++ b/ntex/src/http/client/response.rs @@ -387,8 +387,8 @@ impl Future for ReadBody { let this = self.get_mut(); loop { - return match Pin::new(&mut this.stream).poll_next(cx)? { - Poll::Ready(Some(chunk)) => { + return match Pin::new(&mut this.stream).poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { if this.limit > 0 && (this.buf.len() + chunk.len()) > this.limit { Poll::Ready(Err(PayloadError::Overflow)) } else { @@ -397,6 +397,7 @@ impl Future for ReadBody { } } Poll::Ready(None) => Poll::Ready(Ok(this.buf.split().freeze())), + Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)), Poll::Pending => { if this.timeout.poll_elapsed(cx).is_ready() { Poll::Ready(Err(PayloadError::Incomplete(Some( From e9a12841511609b105058a984546f41bfd55f681 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Mar 2025 08:51:44 +0100 Subject: [PATCH 15/20] Better worker availability handling (#546) --- Cargo.toml | 6 ++- ntex-net/Cargo.toml | 2 +- ntex-rt/Cargo.toml | 2 +- ntex-server/CHANGES.md | 4 ++ ntex-server/Cargo.toml | 16 +++---- ntex-server/src/manager.rs | 11 ++--- ntex-server/src/net/accept.rs | 8 +--- ntex-server/src/wrk.rs | 83 ++++++++++++++++++++++++----------- ntex/Cargo.toml | 2 +- 9 files changed, 86 insertions(+), 48 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 871d9de2..d9e97ef4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,10 @@ ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } [workspace.dependencies] +async-channel = "2" async-task = "4.5.0" +atomic-waker = "1.1" +core_affinity = "0.8" bitflags = "2" cfg_aliases = "0.2.1" cfg-if = "1.0.0" @@ -57,7 +60,8 @@ fxhash = "0.2" libc = "0.2.164" log = "0.4" io-uring = "0.7.4" -polling = "3.3.0" +oneshot = "0.1" +polling = "3.7.4" nohash-hasher = "0.2.0" scoped-tls = "1.0.1" slab = "0.4.9" diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index c800c00b..46cf5cc4 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.13", optional = true } +ntex-neon = { version = "0.1.14", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 2b5aa5d0..a5966d76 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -42,4 +42,4 @@ tok-io = { version = "1", package = "tokio", default-features = false, features "net", ], optional = true } -ntex-neon = { version = "0.1.11", optional = true } +ntex-neon = { version = "0.1.14", optional = true } diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index 0d8dabc5..546a92ff 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.7.3] - 2025-03-28 + +* Better worker availability handling + ## [2.7.2] - 2025-03-27 * Handle paused state diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index bca5f8b2..dcfa8332 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.7.2" +version = "2.7.3" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -22,13 +22,13 @@ ntex-service = "3.4" ntex-rt = "0.4" ntex-util = "2.8" -async-channel = "2" -async-broadcast = "0.7" -core_affinity = "0.8" -polling = "3.3" -log = "0.4" -socket2 = "0.5" -oneshot = { version = "0.1", default-features = false, features = ["async"] } +async-channel = { workspace = true } +atomic-waker = { workspace = true } +core_affinity = { workspace = true } +oneshot = { workspace = true } +polling = { workspace = true } +log = { workspace = true } +socket2 = { workspace = true } [dev-dependencies] ntex = "2" diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs index ca558a54..f0719750 100644 --- a/ntex-server/src/manager.rs +++ b/ntex-server/src/manager.rs @@ -55,7 +55,7 @@ impl ServerManager { let no_signals = cfg.no_signals; let shared = Arc::new(ServerShared { - paused: AtomicBool::new(false), + paused: AtomicBool::new(true), }); let mgr = ServerManager(Rc::new(Inner { cfg, @@ -139,7 +139,6 @@ impl ServerManager { fn start_worker(mgr: ServerManager, cid: Option) { let _ = ntex_rt::spawn(async move { let id = mgr.next_id(); - let mut wrk = Worker::start(id, mgr.factory(), cid); loop { @@ -212,10 +211,9 @@ impl HandleCmdState { match upd { Update::Available(worker) => { self.workers.push(worker); - if !self.workers.is_empty() { + self.workers.sort(); + if self.workers.len() == 1 { self.mgr.resume(); - } else { - self.workers.sort(); } } Update::Unavailable(worker) => { @@ -234,6 +232,9 @@ impl HandleCmdState { if let Err(item) = self.workers[0].send(item) { self.backlog.push_back(item); self.workers.remove(0); + if self.workers.is_empty() { + self.mgr.pause(); + } break; } } diff --git a/ntex-server/src/net/accept.rs b/ntex-server/src/net/accept.rs index 31793d82..7694d286 100644 --- a/ntex-server/src/net/accept.rs +++ b/ntex-server/src/net/accept.rs @@ -203,14 +203,10 @@ impl Accept { let mut timeout = Some(Duration::ZERO); loop { if let Err(e) = self.poller.wait(&mut events, timeout) { - if e.kind() == io::ErrorKind::Interrupted { - continue; - } else { + if e.kind() != io::ErrorKind::Interrupted { panic!("Cannot wait for events in poller: {}", e) } - } - - if timeout.is_some() { + } else if timeout.is_some() { timeout = None; let _ = self.tx.take().unwrap().send(()); } diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index b9092d0f..b791817d 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -2,8 +2,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{ready, Context, Poll}; use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc}; -use async_broadcast::{self as bus, broadcast}; use async_channel::{unbounded, Receiver, Sender}; +use atomic_waker::AtomicWaker; use core_affinity::CoreId; use ntex_rt::{spawn, Arbiter}; @@ -151,10 +151,8 @@ impl Worker { if self.failed.load(Ordering::Acquire) { WorkerStatus::Failed } else { - // cleanup updates - while self.avail.notify.try_recv().is_ok() {} - - if self.avail.notify.recv_direct().await.is_err() { + self.avail.wait_for_update().await; + if self.avail.failed() { self.failed.store(true, Ordering::Release); } self.status() @@ -196,46 +194,79 @@ impl Future for WorkerStop { #[derive(Debug, Clone)] struct WorkerAvailability { - notify: bus::Receiver<()>, - available: Arc, + inner: Arc, } #[derive(Debug, Clone)] struct WorkerAvailabilityTx { - notify: bus::Sender<()>, - available: Arc, + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + waker: AtomicWaker, + updated: AtomicBool, + available: AtomicBool, + failed: AtomicBool, } impl WorkerAvailability { fn create() -> (Self, WorkerAvailabilityTx) { - let (mut tx, rx) = broadcast(16); - tx.set_overflow(true); + let inner = Arc::new(Inner { + waker: AtomicWaker::new(), + updated: AtomicBool::new(false), + available: AtomicBool::new(false), + failed: AtomicBool::new(false), + }); let avail = WorkerAvailability { - notify: rx, - available: Arc::new(AtomicBool::new(false)), - }; - let avail_tx = WorkerAvailabilityTx { - notify: tx, - available: avail.available.clone(), + inner: inner.clone(), }; + let avail_tx = WorkerAvailabilityTx { inner }; (avail, avail_tx) } + fn failed(&self) -> bool { + self.inner.failed.load(Ordering::Acquire) + } + fn available(&self) -> bool { - self.available.load(Ordering::Acquire) + self.inner.available.load(Ordering::Acquire) + } + + async fn wait_for_update(&self) { + poll_fn(|cx| { + if self.inner.updated.load(Ordering::Acquire) { + self.inner.updated.store(false, Ordering::Release); + Poll::Ready(()) + } else { + self.inner.waker.register(cx.waker()); + Poll::Pending + } + }) + .await; } } impl WorkerAvailabilityTx { fn set(&self, val: bool) { - let old = self.available.swap(val, Ordering::Release); - if !old && val { - let _ = self.notify.try_broadcast(()); + let old = self.inner.available.swap(val, Ordering::Release); + if old != val { + self.inner.updated.store(true, Ordering::Release); + self.inner.waker.wake(); } } } +impl Drop for WorkerAvailabilityTx { + fn drop(&mut self) { + self.inner.failed.store(true, Ordering::Release); + self.inner.updated.store(true, Ordering::Release); + self.inner.available.store(false, Ordering::Release); + self.inner.waker.wake(); + } +} + /// Service worker /// /// Worker accepts message via unbounded channel and starts processing. @@ -256,10 +287,13 @@ where let mut recv = std::pin::pin!(wrk.rx.recv()); let fut = poll_fn(|cx| { match svc.poll_ready(cx) { - Poll::Ready(res) => { - res?; + Poll::Ready(Ok(())) => { wrk.availability.set(true); } + Poll::Ready(Err(err)) => { + wrk.availability.set(false); + return Poll::Ready(Err(err)); + } Poll::Pending => { wrk.availability.set(false); return Poll::Pending; @@ -287,7 +321,6 @@ where let _ = ntex_rt::spawn(async move { svc.shutdown().await; }); - wrk.availability.set(false); } Either::Right(Some(Shutdown { timeout, result })) => { wrk.availability.set(false); @@ -302,6 +335,7 @@ where return; } Either::Left(Ok(false)) | Either::Right(None) => { + wrk.availability.set(false); stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await; return; } @@ -311,7 +345,6 @@ where loop { match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await { Either::Left(Ok(service)) => { - wrk.availability.set(true); svc = Pipeline::new(service).bind(); break; } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index da30a2ba..1a947b47 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -68,7 +68,7 @@ ntex-service = "3.4" ntex-macros = "0.1" ntex-util = "2.8" ntex-bytes = "0.1.27" -ntex-server = "2.7" +ntex-server = "2.7.3" ntex-h2 = "1.8.6" ntex-rt = "0.4.27" ntex-io = "2.11" From 30928d019ce044b18fe5624b855fbbcee6cd02ae Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Mar 2025 09:11:59 +0100 Subject: [PATCH 16/20] Improve tests (#547) --- ntex/tests/http_openssl.rs | 11 ++++++++--- ntex/tests/http_server.rs | 11 ++++++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index 75227c2c..e46765af 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -425,11 +425,12 @@ async fn test_h2_service_error() { assert_eq!(bytes, Bytes::from_static(b"error")); } -struct SetOnDrop(Arc); +struct SetOnDrop(Arc, Arc>>>); impl Drop for SetOnDrop { fn drop(&mut self) { self.0.fetch_add(1, Ordering::Relaxed); + let _ = self.1.lock().unwrap().take().unwrap().send(()); } } @@ -437,14 +438,18 @@ impl Drop for SetOnDrop { async fn test_h2_client_drop() -> io::Result<()> { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build() .h2(move |req: Request| { + let tx = tx.clone(); let count = count.clone(); async move { - let _st = SetOnDrop(count); + let _st = SetOnDrop(count, tx); assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_2); sleep(Seconds(100)).await; @@ -457,7 +462,7 @@ async fn test_h2_client_drop() -> io::Result<()> { let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(250)).await; + let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) } diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index a4c1d05f..64ab4ede 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -723,11 +723,12 @@ async fn test_h1_service_error() { assert_eq!(bytes, Bytes::from_static(b"error")); } -struct SetOnDrop(Arc); +struct SetOnDrop(Arc, Option<::oneshot::Sender<()>>); impl Drop for SetOnDrop { fn drop(&mut self) { self.0.fetch_add(1, Ordering::Relaxed); + let _ = self.1.take().unwrap().send(()); } } @@ -735,13 +736,17 @@ impl Drop for SetOnDrop { async fn test_h1_client_drop() -> io::Result<()> { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build().h1(move |req: Request| { + let tx = tx.clone(); let count = count.clone(); async move { - let _st = SetOnDrop(count); + let _st = SetOnDrop(count, tx.lock().unwrap().take()); assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_11); sleep(Millis(500)).await; @@ -752,7 +757,7 @@ async fn test_h1_client_drop() -> io::Result<()> { let result = timeout(Millis(100), srv.request(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(1000)).await; + let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) } From f6fe9c3e10d7eb182a52b8d6c6aebef4ee8c7910 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Mar 2025 10:07:10 +0100 Subject: [PATCH 17/20] Improve tests (#548) --- ntex/tests/http_openssl.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index e46765af..2d2c98f9 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -446,13 +446,12 @@ async fn test_h2_client_drop() -> io::Result<()> { let count = count2.clone(); HttpService::build() .h2(move |req: Request| { - let tx = tx.clone(); - let count = count.clone(); + let st = SetOnDrop(count.clone(), tx.clone()); async move { - let _st = SetOnDrop(count, tx); assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_2); - sleep(Seconds(100)).await; + sleep(Seconds(30)).await; + drop(st); Ok::<_, io::Error>(Response::Ok().finish()) } }) @@ -460,9 +459,9 @@ async fn test_h2_client_drop() -> io::Result<()> { .map_err(|_| ()) }); - let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await; + let result = timeout(Millis(150), srv.srequest(Method::GET, "/").send()).await; assert!(result.is_err()); - let _ = rx.await; + let _ = timeout(Millis(1500), rx).await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) } From e4f24ee41f2fec056c782c63ba2e3bea1f041a1b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Mar 2025 11:39:24 +0100 Subject: [PATCH 18/20] Handle flaky tests --- ntex/tests/http_openssl.rs | 2 +- ntex/tests/http_server.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index 2d2c98f9..921310a8 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -459,7 +459,7 @@ async fn test_h2_client_drop() -> io::Result<()> { .map_err(|_| ()) }); - let result = timeout(Millis(150), srv.srequest(Method::GET, "/").send()).await; + let result = timeout(Millis(1500), srv.srequest(Method::GET, "/").send()).await; assert!(result.is_err()); let _ = timeout(Millis(1500), rx).await; assert_eq!(count.load(Ordering::Relaxed), 1); diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index 64ab4ede..0227573b 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -749,13 +749,13 @@ async fn test_h1_client_drop() -> io::Result<()> { let _st = SetOnDrop(count, tx.lock().unwrap().take()); assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_11); - sleep(Millis(500)).await; + sleep(Millis(50000)).await; Ok::<_, io::Error>(Response::Ok().finish()) } }) }); - let result = timeout(Millis(100), srv.request(Method::GET, "/").send()).await; + let result = timeout(Millis(1500), srv.request(Method::GET, "/").send()).await; assert!(result.is_err()); let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 1); From f5ee55d598810f56379f789b2fbcd163b8730f03 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Mar 2025 21:06:11 +0100 Subject: [PATCH 19/20] Handle socket close for poll driver (#549) --- ntex-net/CHANGES.md | 4 ++ ntex-net/Cargo.toml | 2 +- ntex-net/src/rt_polling/driver.rs | 61 ++++++++++++++----------------- ntex-server/Cargo.toml | 2 +- ntex-server/src/manager.rs | 2 +- 5 files changed, 35 insertions(+), 36 deletions(-) diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index a16145fc..e60744ef 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.10] - 2025-03-28 + +* Better closed sockets handling + ## [2.5.9] - 2025-03-27 * Handle closed sockets diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 46cf5cc4..0142a911 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.14", optional = true } +ntex-neon = { version = "0.1.15", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index c179a77a..24db553d 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,5 +1,5 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task, task::Poll}; +use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task, task::Poll}; use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; @@ -18,7 +18,6 @@ bitflags::bitflags! { struct Flags: u8 { const RD = 0b0000_0001; const WR = 0b0000_0010; - const CLOSED = 0b0000_0100; } } @@ -106,18 +105,15 @@ impl Handler for StreamOpsHandler { return; } let item = &mut streams[id]; - log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); - - if item.flags.contains(Flags::CLOSED) { + if item.io.is_none() { return; } + log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); // handle HUP if ev.is_interrupt() { item.context.stopped(None); - if item.io.take().is_some() { - close(id as u32, item, &self.inner.api); - } + close(id as u32, item, &self.inner.api, None, true); return; } @@ -177,9 +173,7 @@ impl Handler for StreamOpsHandler { item.fd, item.io.is_some() ); - if item.io.is_some() { - close(id, &mut item, &self.inner.api); - } + close(id, &mut item, &self.inner.api, None, true); } } self.inner.delayd_drop.set(false); @@ -197,10 +191,7 @@ impl Handler for StreamOpsHandler { item.fd, err ); - item.context.stopped(Some(err)); - if item.io.take().is_some() { - close(id as u32, item, &self.inner.api); - } + close(id as u32, item, &self.inner.api, Some(err), false); } }) } @@ -222,14 +213,26 @@ fn close( id: u32, item: &mut StreamItem, api: &DriverApi, -) -> ntex_rt::JoinHandle> { - let fd = item.fd; - item.flags.insert(Flags::CLOSED); - api.detach(fd, id); - ntex_rt::spawn_blocking(move || { - syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; - syscall!(libc::close(fd)) - }) + error: Option, + shutdown: bool, +) -> Option>> { + if let Some(io) = item.io.take() { + log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); + mem::forget(io); + if let Some(err) = error { + item.context.stopped(Some(err)); + } + let fd = item.fd; + api.detach(fd, id); + Some(ntex_rt::spawn_blocking(move || { + if shutdown { + let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR)); + } + syscall!(libc::close(fd)) + })) + } else { + None + } } impl StreamCtl { @@ -237,13 +240,7 @@ impl StreamCtl { let id = self.id as usize; let fut = self.inner.with(|streams| { let item = &mut streams[id]; - if let Some(io) = item.io.take() { - log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); - std::mem::forget(io); - Some(close(self.id, item, &self.inner.api)) - } else { - None - } + close(self.id, item, &self.inner.api, None, false) }); async move { if let Some(fut) = fut { @@ -360,9 +357,7 @@ impl Drop for StreamCtl { item.fd, item.io.is_some() ); - if item.io.is_some() { - close(self.id, &mut item, &self.inner.api); - } + close(self.id, &mut item, &self.inner.api, None, true); } self.inner.streams.set(Some(streams)); } else { diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index dcfa8332..a88be635 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.7.3" +version = "2.7.4" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs index f0719750..9d0bfe8d 100644 --- a/ntex-server/src/manager.rs +++ b/ntex-server/src/manager.rs @@ -180,7 +180,7 @@ impl HandleCmdState { fn process(&mut self, mut item: F::Item) { loop { if !self.workers.is_empty() { - if self.next > self.workers.len() { + if self.next >= self.workers.len() { self.next = self.workers.len() - 1; } match self.workers[self.next].send(item) { From 01d3a2440b074e0f77a089bc4d5e1496b4acdf76 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Mar 2025 21:26:07 +0100 Subject: [PATCH 20/20] Prepare net release (#550) --- ntex-net/Cargo.toml | 2 +- ntex/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 0142a911..5a72d3eb 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.9" +version = "2.5.10" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 1a947b47..0ea37469 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -68,11 +68,11 @@ ntex-service = "3.4" ntex-macros = "0.1" ntex-util = "2.8" ntex-bytes = "0.1.27" -ntex-server = "2.7.3" +ntex-server = "2.7.4" ntex-h2 = "1.8.6" ntex-rt = "0.4.27" ntex-io = "2.11" -ntex-net = "2.5.8" +ntex-net = "2.5.10" ntex-tls = "2.3" base64 = "0.22"