From b2915f48681b9a42630047ee20b80871fb2fd0f4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 27 Mar 2025 20:45:43 +0100 Subject: [PATCH] Improve tests (#543) --- ntex-net/CHANGES.md | 4 +++ ntex-net/Cargo.toml | 2 +- ntex-net/src/rt_polling/driver.rs | 60 +++++++++++++++++++------------ ntex-net/src/rt_uring/driver.rs | 26 ++++++-------- ntex-server/CHANGES.md | 4 +++ ntex-server/Cargo.toml | 2 +- ntex-server/src/manager.rs | 4 +-- ntex-server/src/net/accept.rs | 28 +++++++++++---- ntex-server/src/wrk.rs | 44 +++++++++++++++-------- ntex/Cargo.toml | 1 + ntex/tests/http_awc_client.rs | 22 ++++++------ ntex/tests/http_openssl.rs | 15 +++++--- ntex/tests/http_server.rs | 25 ++++++++----- 13 files changed, 151 insertions(+), 86 deletions(-) diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 800ff027..a16145fc 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.9] - 2025-03-27 + +* Handle closed sockets + ## [2.5.8] - 2025-03-25 * Update neon runtime diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 71791e06..8e7335c6 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.8" +version = "2.5.9" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index eaa6d589..c179a77a 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -16,8 +16,9 @@ pub(crate) struct StreamCtl { bitflags::bitflags! { #[derive(Copy, Clone, Debug)] struct Flags: u8 { - const RD = 0b0000_0001; - const WR = 0b0000_0010; + const RD = 0b0000_0001; + const WR = 0b0000_0010; + const CLOSED = 0b0000_0100; } } @@ -100,19 +101,22 @@ impl Clone for StreamOps { impl Handler for StreamOpsHandler { fn event(&mut self, id: usize, ev: Event) { - log::debug!("FD event {:?} event: {:?}", id, ev); - self.inner.with(|streams| { if !streams.contains(id) { return; } let item = &mut streams[id]; + log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); + + if item.flags.contains(Flags::CLOSED) { + return; + } // handle HUP if ev.is_interrupt() { item.context.stopped(None); if item.io.take().is_some() { - close(id as u32, item.fd, &self.inner.api); + close(id as u32, item, &self.inner.api); } return; } @@ -165,7 +169,7 @@ impl Handler for StreamOpsHandler { let item = &mut streams[id as usize]; item.ref_count -= 1; if item.ref_count == 0 { - let item = streams.remove(id as usize); + let mut item = streams.remove(id as usize); log::debug!( "{}: Drop ({}), {:?}, has-io: {}", item.tag(), @@ -174,7 +178,7 @@ impl Handler for StreamOpsHandler { item.io.is_some() ); if item.io.is_some() { - close(id, item.fd, &self.inner.api); + close(id, &mut item, &self.inner.api); } } } @@ -186,10 +190,16 @@ impl Handler for StreamOpsHandler { fn error(&mut self, id: usize, err: io::Error) { self.inner.with(|streams| { if let Some(item) = streams.get_mut(id) { - log::debug!("FD is failed ({}) {:?}, err: {:?}", id, item.fd, err); + log::debug!( + "{}: FD is failed ({}) {:?}, err: {:?}", + item.tag(), + id, + item.fd, + err + ); item.context.stopped(Some(err)); if item.io.take().is_some() { - close(id as u32, item.fd, &self.inner.api); + close(id as u32, item, &self.inner.api); } } }) @@ -208,7 +218,13 @@ impl StreamOpsInner { } } -fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle> { +fn close( + id: u32, + item: &mut StreamItem, + api: &DriverApi, +) -> ntex_rt::JoinHandle> { + let fd = item.fd; + item.flags.insert(Flags::CLOSED); api.detach(fd, id); ntex_rt::spawn_blocking(move || { syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; @@ -219,16 +235,16 @@ fn close(id: u32, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle StreamCtl { pub(crate) fn close(self) -> impl Future> { let id = self.id as usize; - let (io, fd) = self - .inner - .with(|streams| (streams[id].io.take(), streams[id].fd)); - let fut = if let Some(io) = io { - log::debug!("Closing ({}), {:?}", id, fd); - std::mem::forget(io); - Some(close(self.id, fd, &self.inner.api)) - } else { - None - }; + let fut = self.inner.with(|streams| { + let item = &mut streams[id]; + if let Some(io) = item.io.take() { + log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); + std::mem::forget(io); + Some(close(self.id, item, &self.inner.api)) + } else { + None + } + }); async move { if let Some(fut) = fut { fut.await @@ -336,7 +352,7 @@ impl Drop for StreamCtl { let id = self.id as usize; streams[id].ref_count -= 1; if streams[id].ref_count == 0 { - let item = streams.remove(id); + let mut item = streams.remove(id); log::debug!( "{}: Drop io ({}), {:?}, has-io: {}", item.tag(), @@ -345,7 +361,7 @@ impl Drop for StreamCtl { item.io.is_some() ); if item.io.is_some() { - close(self.id, item.fd, &self.inner.api); + close(self.id, &mut item, &self.inner.api); } } self.inner.streams.set(Some(streams)); diff --git a/ntex-net/src/rt_uring/driver.rs b/ntex-net/src/rt_uring/driver.rs index f2a88d11..d39d69e8 100644 --- a/ntex-net/src/rt_uring/driver.rs +++ b/ntex-net/src/rt_uring/driver.rs @@ -33,6 +33,12 @@ struct StreamItem { wr_op: Option, } +impl StreamItem { + fn tag(&self) -> &'static str { + self.context.tag() + } +} + enum Operation { Recv { id: usize, @@ -249,7 +255,7 @@ impl Handler for StreamOpsHandler { if storage.streams[id].ref_count == 0 { let mut item = storage.streams.remove(id); - log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd); + log::debug!("{}: Drop io ({}), {:?}", item.tag(), id, item.fd); if let Some(io) = item.io.take() { mem::forget(io); @@ -273,7 +279,7 @@ impl StreamOpsStorage { if let Poll::Ready(mut buf) = item.context.get_read_buf() { log::debug!( "{}: Recv resume ({}), {:?} rem: {:?}", - item.context.tag(), + item.tag(), id, item.fd, buf.remaining_mut() @@ -306,7 +312,7 @@ impl StreamOpsStorage { if let Poll::Ready(buf) = item.context.get_write_buf() { log::debug!( "{}: Send resume ({}), {:?} len: {:?}", - item.context.tag(), + item.tag(), id, item.fd, buf.len() @@ -396,12 +402,7 @@ impl StreamCtl { if let Some(rd_op) = item.rd_op { if !item.flags.contains(Flags::RD_CANCELING) { - log::debug!( - "{}: Recv to pause ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); + log::debug!("{}: Recv to pause ({}), {:?}", item.tag(), self.id, item.fd); item.flags.insert(Flags::RD_CANCELING); self.inner.api.cancel(rd_op.get()); } @@ -426,12 +427,7 @@ impl Drop for StreamCtl { if storage.streams[self.id].ref_count == 0 { let mut item = storage.streams.remove(self.id); if let Some(io) = item.io.take() { - log::debug!( - "{}: Close io ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); + log::debug!("{}: Close io ({}), {:?}", item.tag(), self.id, item.fd); mem::forget(io); let id = storage.ops.insert(Operation::Close { tx: None }); diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index 7f1d8302..0d8dabc5 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.7.2] - 2025-03-27 + +* Handle paused state + ## [2.7.1] - 2025-02-28 * Fix set core affinity out of worker start #508 diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index ed962fc0..bca5f8b2 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.7.1" +version = "2.7.2" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs index f0be9c40..ca558a54 100644 --- a/ntex-server/src/manager.rs +++ b/ntex-server/src/manager.rs @@ -55,7 +55,7 @@ impl ServerManager { let no_signals = cfg.no_signals; let shared = Arc::new(ServerShared { - paused: AtomicBool::new(true), + paused: AtomicBool::new(false), }); let mgr = ServerManager(Rc::new(Inner { cfg, @@ -212,7 +212,7 @@ impl HandleCmdState { match upd { Update::Available(worker) => { self.workers.push(worker); - if self.workers.len() == 1 { + if !self.workers.is_empty() { self.mgr.resume(); } else { self.workers.sort(); diff --git a/ntex-server/src/net/accept.rs b/ntex-server/src/net/accept.rs index 332fc846..31793d82 100644 --- a/ntex-server/src/net/accept.rs +++ b/ntex-server/src/net/accept.rs @@ -92,12 +92,14 @@ impl AcceptLoop { /// Start accept loop pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) { + let (tx, rx_start) = oneshot::channel(); let (rx, poll) = self .inner .take() .expect("AcceptLoop cannot be used multiple times"); Accept::start( + tx, rx, poll, socks, @@ -105,6 +107,8 @@ impl AcceptLoop { self.notify.clone(), self.status_handler.take(), ); + + let _ = rx_start.recv(); } } @@ -121,6 +125,7 @@ impl fmt::Debug for AcceptLoop { struct Accept { poller: Arc, rx: mpsc::Receiver, + tx: Option>, sockets: Vec, srv: Server, notify: AcceptNotify, @@ -131,6 +136,7 @@ struct Accept { impl Accept { fn start( + tx: oneshot::Sender<()>, rx: mpsc::Receiver, poller: Arc, socks: Vec<(Token, Listener)>, @@ -145,11 +151,12 @@ impl Accept { .name("ntex-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - Accept::new(rx, poller, socks, srv, notify, status_handler).poll() + Accept::new(tx, rx, poller, socks, srv, notify, status_handler).poll() }); } fn new( + tx: oneshot::Sender<()>, rx: mpsc::Receiver, poller: Arc, socks: Vec<(Token, Listener)>, @@ -175,6 +182,7 @@ impl Accept { notify, srv, status_handler, + tx: Some(tx), backpressure: true, backlog: VecDeque::new(), } @@ -192,8 +200,9 @@ impl Accept { // Create storage for events let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap()); + let mut timeout = Some(Duration::ZERO); loop { - if let Err(e) = self.poller.wait(&mut events, None) { + if let Err(e) = self.poller.wait(&mut events, timeout) { if e.kind() == io::ErrorKind::Interrupted { continue; } else { @@ -201,10 +210,17 @@ impl Accept { } } - for event in events.iter() { - let readd = self.accept(event.key); - if readd { - self.add_source(event.key); + if timeout.is_some() { + timeout = None; + let _ = self.tx.take().unwrap().send(()); + } + + for idx in 0..self.sockets.len() { + if self.sockets[idx].registered.get() { + let readd = self.accept(idx); + if readd { + self.add_source(idx); + } } } diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index a61f7731..b9092d0f 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -99,10 +99,10 @@ impl Worker { log::debug!("Creating server instance in {:?}", id); let factory = cfg.create().await; - log::debug!("Server instance has been created in {:?}", id); match create(id, rx1, rx2, factory, avail_tx).await { Ok((svc, wrk)) => { + log::debug!("Server instance has been created in {:?}", id); run_worker(svc, wrk).await; } Err(e) => { @@ -241,7 +241,7 @@ impl WorkerAvailabilityTx { /// Worker accepts message via unbounded channel and starts processing. struct WorkerSt> { id: WorkerId, - rx: Pin>>, + rx: Receiver, stop: Pin>>, factory: F, availability: WorkerAvailabilityTx, @@ -253,20 +253,36 @@ where F: ServiceFactory + 'static, { loop { + let mut recv = std::pin::pin!(wrk.rx.recv()); let fut = poll_fn(|cx| { - ready!(svc.poll_ready(cx)?); - - if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) { - let fut = svc.call(item); - let _ = spawn(async move { - let _ = fut.await; - }); + match svc.poll_ready(cx) { + Poll::Ready(res) => { + res?; + wrk.availability.set(true); + } + Poll::Pending => { + wrk.availability.set(false); + return Poll::Pending; + } + } + + match ready!(recv.as_mut().poll(cx)) { + Ok(item) => { + let fut = svc.call(item); + let _ = spawn(async move { + let _ = fut.await; + }); + Poll::Ready(Ok::<_, F::Error>(true)) + } + Err(_) => { + log::error!("Server is gone"); + Poll::Ready(Ok(false)) + } } - Poll::Ready(Ok::<(), F::Error>(())) }); match select(fut, stream_recv(&mut wrk.stop)).await { - Either::Left(Ok(())) => continue, + Either::Left(Ok(true)) => continue, Either::Left(Err(_)) => { let _ = ntex_rt::spawn(async move { svc.shutdown().await; @@ -285,7 +301,7 @@ where stop_svc(wrk.id, svc, timeout, Some(result)).await; return; } - Either::Right(None) => { + Either::Left(Ok(false)) | Either::Right(None) => { stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await; return; } @@ -336,8 +352,6 @@ where { availability.set(false); let factory = factory?; - - let rx = Box::pin(rx); let mut stop = Box::pin(stop); let svc = match select(factory.create(()), stream_recv(&mut stop)).await { @@ -356,9 +370,9 @@ where svc, WorkerSt { id, + rx, factory, availability, - rx: Box::pin(rx), stop: Box::pin(stop), }, )) diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 301d239e..09655c7a 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -114,6 +114,7 @@ flate2 = { version = "1.0", optional = true } [dev-dependencies] rand = "0.8" time = "0.3" +oneshot = "0.1" futures-util = "0.3" tls-openssl = { version = "0.10", package = "openssl" } tls-rustls = { version = "0.23", package = "rustls", features = ["ring", "std"], default-features = false } diff --git a/ntex/tests/http_awc_client.rs b/ntex/tests/http_awc_client.rs index 00f304b9..bd4c7e0a 100644 --- a/ntex/tests/http_awc_client.rs +++ b/ntex/tests/http_awc_client.rs @@ -508,19 +508,21 @@ async fn test_client_gzip_encoding_large() { async fn test_client_gzip_encoding_large_random() { let data = rand::thread_rng() .sample_iter(&rand::distributions::Alphanumeric) - .take(100_000) + .take(1_048_500) .map(char::from) .collect::(); let srv = test::server(|| { - App::new().service(web::resource("/").route(web::to(|data: Bytes| async move { - let mut e = GzEncoder::new(Vec::new(), Compression::default()); - e.write_all(&data).unwrap(); - let data = e.finish().unwrap(); - HttpResponse::Ok() - .header("content-encoding", "gzip") - .body(data) - }))) + App::new() + .state(web::types::PayloadConfig::default().limit(1_048_576)) + .service(web::resource("/").route(web::to(|data: Bytes| async move { + let mut e = GzEncoder::new(Vec::new(), Compression::default()); + e.write_all(&data).unwrap(); + let data = e.finish().unwrap(); + HttpResponse::Ok() + .header("content-encoding", "gzip") + .body(data) + }))) }); // client request @@ -528,7 +530,7 @@ async fn test_client_gzip_encoding_large_random() { assert!(response.status().is_success()); // read response - let bytes = response.body().await.unwrap(); + let bytes = response.body().limit(1_048_576).await.unwrap(); assert_eq!(bytes, Bytes::from(data)); } diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index c91de0b8..75227c2c 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -1,5 +1,6 @@ #![cfg(feature = "openssl")] -use std::{io, sync::atomic::AtomicUsize, sync::atomic::Ordering, sync::Arc}; +use std::io; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc, Mutex}; use futures_util::stream::{once, Stream, StreamExt}; use tls_openssl::ssl::{AlpnError, SslAcceptor, SslFiletype, SslMethod}; @@ -456,7 +457,7 @@ async fn test_h2_client_drop() -> io::Result<()> { let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(150)).await; + sleep(Millis(250)).await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) } @@ -539,13 +540,19 @@ async fn test_ws_transport() { async fn test_h2_graceful_shutdown() -> io::Result<()> { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build() .h2(move |_| { let count = count.clone(); count.fetch_add(1, Ordering::Relaxed); + if count.load(Ordering::Relaxed) == 2 { + let _ = tx.lock().unwrap().take().unwrap().send(()); + } async move { sleep(Millis(1000)).await; count.fetch_sub(1, Ordering::Relaxed); @@ -566,7 +573,7 @@ async fn test_h2_graceful_shutdown() -> io::Result<()> { let _ = req.send().await.unwrap(); sleep(Millis(100000)).await; }); - sleep(Millis(150)).await; + let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 2); let (tx, rx) = oneshot::channel(); @@ -574,8 +581,6 @@ async fn test_h2_graceful_shutdown() -> io::Result<()> { srv.stop().await; let _ = tx.send(()); }); - sleep(Millis(150)).await; - assert_eq!(count.load(Ordering::Relaxed), 2); let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 0); diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index cea9e667..a4c1d05f 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -1,4 +1,4 @@ -use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc, Mutex}; use std::{io, io::Read, io::Write, net}; use futures_util::future::{self, FutureExt}; @@ -761,12 +761,18 @@ async fn test_h1_client_drop() -> io::Result<()> { async fn test_h1_gracefull_shutdown() { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build().h1(move |_: Request| { let count = count.clone(); count.fetch_add(1, Ordering::Relaxed); + if count.load(Ordering::Relaxed) == 2 { + let _ = tx.lock().unwrap().take().unwrap().send(()); + } async move { sleep(Millis(1000)).await; count.fetch_sub(1, Ordering::Relaxed); @@ -781,7 +787,7 @@ async fn test_h1_gracefull_shutdown() { let mut stream2 = net::TcpStream::connect(srv.addr()).unwrap(); let _ = stream2.write_all(b"GET /index.html HTTP/1.1\r\n\r\n"); - sleep(Millis(150)).await; + let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 2); let (tx, rx) = oneshot::channel(); @@ -789,8 +795,6 @@ async fn test_h1_gracefull_shutdown() { srv.stop().await; let _ = tx.send(()); }); - sleep(Millis(150)).await; - assert_eq!(count.load(Ordering::Relaxed), 2); let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 0); @@ -800,12 +804,18 @@ async fn test_h1_gracefull_shutdown() { async fn test_h1_gracefull_shutdown_2() { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build().finish(move |_: Request| { let count = count.clone(); count.fetch_add(1, Ordering::Relaxed); + if count.load(Ordering::Relaxed) == 2 { + let _ = tx.lock().unwrap().take().unwrap().send(()); + } async move { sleep(Millis(1000)).await; count.fetch_sub(1, Ordering::Relaxed); @@ -820,17 +830,14 @@ async fn test_h1_gracefull_shutdown_2() { let mut stream2 = net::TcpStream::connect(srv.addr()).unwrap(); let _ = stream2.write_all(b"GET /index.html HTTP/1.1\r\n\r\n"); - sleep(Millis(150)).await; - assert_eq!(count.load(Ordering::Relaxed), 2); + let _ = rx.await; + assert_eq!(count.load(Ordering::Acquire), 2); let (tx, rx) = oneshot::channel(); rt::spawn(async move { srv.stop().await; let _ = tx.send(()); }); - sleep(Millis(150)).await; - assert_eq!(count.load(Ordering::Relaxed), 2); - let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 0); }