diff --git a/Cargo.toml b/Cargo.toml index 871d9de2..d9e97ef4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,10 @@ ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } [workspace.dependencies] +async-channel = "2" async-task = "4.5.0" +atomic-waker = "1.1" +core_affinity = "0.8" bitflags = "2" cfg_aliases = "0.2.1" cfg-if = "1.0.0" @@ -57,7 +60,8 @@ fxhash = "0.2" libc = "0.2.164" log = "0.4" io-uring = "0.7.4" -polling = "3.3.0" +oneshot = "0.1" +polling = "3.7.4" nohash-hasher = "0.2.0" scoped-tls = "1.0.1" slab = "0.4.9" diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 6a1e881d..f55aa5d0 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -28,4 +28,3 @@ pin-project-lite = "0.2" [dev-dependencies] ntex = "2" rand = "0.8" -env_logger = "0.11" diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 4a04196b..55f99416 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -537,7 +537,9 @@ impl IoContext { self.0.tag(), nbytes ); - inner.dispatch_task.wake(); + if !inner.dispatch_task.wake_checked() { + log::error!("Dispatcher waker is not registered"); + } } else { if nbytes >= hw { // read task is paused because of read back-pressure @@ -735,22 +737,6 @@ impl IoContext { false } - pub fn is_write_ready(&self) -> bool { - if let Some(waker) = self.0 .0.write_task.take() { - let ready = self - .0 - .filter() - .poll_write_ready(&mut Context::from_waker(&waker)); - if !matches!( - ready, - Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) - ) { - return true; - } - } - false - } - pub fn with_read_buf(&self, f: F) -> Poll<()> where F: FnOnce(&mut BytesVec) -> Poll>, @@ -803,7 +789,9 @@ impl IoContext { self.0.tag(), nbytes ); - inner.dispatch_task.wake(); + if !inner.dispatch_task.wake_checked() { + log::error!("Dispatcher waker is not registered"); + } } else { if nbytes >= hw { // read task is paused because of read back-pressure diff --git a/ntex-macros/Cargo.toml b/ntex-macros/Cargo.toml index f6cad0e2..a5bcf67d 100644 --- a/ntex-macros/Cargo.toml +++ b/ntex-macros/Cargo.toml @@ -18,4 +18,3 @@ proc-macro2 = "^1" [dev-dependencies] ntex = "2" futures = "0.3" -env_logger = "0.11" diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index a16145fc..e60744ef 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.10] - 2025-03-28 + +* Better closed sockets handling + ## [2.5.9] - 2025-03-27 * Handle closed sockets diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 8e7335c6..5a72d3eb 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.9" +version = "2.5.10" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -40,7 +40,7 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-neon = { version = "0.1.11", optional = true } +ntex-neon = { version = "0.1.15", optional = true } bitflags = { workspace = true } cfg-if = { workspace = true } @@ -57,4 +57,3 @@ polling = { workspace = true, optional = true } [dev-dependencies] ntex = "2" -env_logger = "0.11" diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index c179a77a..24db553d 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -1,5 +1,5 @@ use std::os::fd::{AsRawFd, RawFd}; -use std::{cell::Cell, cell::RefCell, future::Future, io, rc::Rc, task, task::Poll}; +use std::{cell::Cell, cell::RefCell, future::Future, io, mem, rc::Rc, task, task::Poll}; use ntex_neon::driver::{DriverApi, Event, Handler}; use ntex_neon::{syscall, Runtime}; @@ -18,7 +18,6 @@ bitflags::bitflags! { struct Flags: u8 { const RD = 0b0000_0001; const WR = 0b0000_0010; - const CLOSED = 0b0000_0100; } } @@ -106,18 +105,15 @@ impl Handler for StreamOpsHandler { return; } let item = &mut streams[id]; - log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); - - if item.flags.contains(Flags::CLOSED) { + if item.io.is_none() { return; } + log::debug!("{}: FD event {:?} event: {:?}", item.tag(), id, ev); // handle HUP if ev.is_interrupt() { item.context.stopped(None); - if item.io.take().is_some() { - close(id as u32, item, &self.inner.api); - } + close(id as u32, item, &self.inner.api, None, true); return; } @@ -177,9 +173,7 @@ impl Handler for StreamOpsHandler { item.fd, item.io.is_some() ); - if item.io.is_some() { - close(id, &mut item, &self.inner.api); - } + close(id, &mut item, &self.inner.api, None, true); } } self.inner.delayd_drop.set(false); @@ -197,10 +191,7 @@ impl Handler for StreamOpsHandler { item.fd, err ); - item.context.stopped(Some(err)); - if item.io.take().is_some() { - close(id as u32, item, &self.inner.api); - } + close(id as u32, item, &self.inner.api, Some(err), false); } }) } @@ -222,14 +213,26 @@ fn close( id: u32, item: &mut StreamItem, api: &DriverApi, -) -> ntex_rt::JoinHandle> { - let fd = item.fd; - item.flags.insert(Flags::CLOSED); - api.detach(fd, id); - ntex_rt::spawn_blocking(move || { - syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; - syscall!(libc::close(fd)) - }) + error: Option, + shutdown: bool, +) -> Option>> { + if let Some(io) = item.io.take() { + log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); + mem::forget(io); + if let Some(err) = error { + item.context.stopped(Some(err)); + } + let fd = item.fd; + api.detach(fd, id); + Some(ntex_rt::spawn_blocking(move || { + if shutdown { + let _ = syscall!(libc::shutdown(fd, libc::SHUT_RDWR)); + } + syscall!(libc::close(fd)) + })) + } else { + None + } } impl StreamCtl { @@ -237,13 +240,7 @@ impl StreamCtl { let id = self.id as usize; let fut = self.inner.with(|streams| { let item = &mut streams[id]; - if let Some(io) = item.io.take() { - log::debug!("{}: Closing ({}), {:?}", item.tag(), id, item.fd); - std::mem::forget(io); - Some(close(self.id, item, &self.inner.api)) - } else { - None - } + close(self.id, item, &self.inner.api, None, false) }); async move { if let Some(fut) = fut { @@ -360,9 +357,7 @@ impl Drop for StreamCtl { item.fd, item.io.is_some() ); - if item.io.is_some() { - close(self.id, &mut item, &self.inner.api); - } + close(self.id, &mut item, &self.inner.api, None, true); } self.inner.streams.set(Some(streams)); } else { diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index e133ceb4..a5966d76 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -42,7 +42,4 @@ tok-io = { version = "1", package = "tokio", default-features = false, features "net", ], optional = true } -ntex-neon = { version = "0.1.11", optional = true } - -[dev-dependencies] -env_logger = "0.11" +ntex-neon = { version = "0.1.14", optional = true } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 1ffd7fe7..d5d85546 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -112,6 +112,8 @@ mod tokio { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> tok_io::task::JoinHandle where F: FnOnce() -> R + 'static, @@ -196,6 +198,8 @@ mod compio { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> JoinHandle where F: FnOnce() -> R + 'static, @@ -323,6 +327,8 @@ mod neon { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> Task where F: FnOnce() -> R + 'static, @@ -377,7 +383,7 @@ mod neon { impl JoinHandle { pub fn is_finished(&self) -> bool { - false + self.fut.is_none() } } diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index 0d8dabc5..546a92ff 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.7.3] - 2025-03-28 + +* Better worker availability handling + ## [2.7.2] - 2025-03-27 * Handle paused state diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index bca5f8b2..a88be635 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.7.2" +version = "2.7.4" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -22,13 +22,13 @@ ntex-service = "3.4" ntex-rt = "0.4" ntex-util = "2.8" -async-channel = "2" -async-broadcast = "0.7" -core_affinity = "0.8" -polling = "3.3" -log = "0.4" -socket2 = "0.5" -oneshot = { version = "0.1", default-features = false, features = ["async"] } +async-channel = { workspace = true } +atomic-waker = { workspace = true } +core_affinity = { workspace = true } +oneshot = { workspace = true } +polling = { workspace = true } +log = { workspace = true } +socket2 = { workspace = true } [dev-dependencies] ntex = "2" diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs index ca558a54..9d0bfe8d 100644 --- a/ntex-server/src/manager.rs +++ b/ntex-server/src/manager.rs @@ -55,7 +55,7 @@ impl ServerManager { let no_signals = cfg.no_signals; let shared = Arc::new(ServerShared { - paused: AtomicBool::new(false), + paused: AtomicBool::new(true), }); let mgr = ServerManager(Rc::new(Inner { cfg, @@ -139,7 +139,6 @@ impl ServerManager { fn start_worker(mgr: ServerManager, cid: Option) { let _ = ntex_rt::spawn(async move { let id = mgr.next_id(); - let mut wrk = Worker::start(id, mgr.factory(), cid); loop { @@ -181,7 +180,7 @@ impl HandleCmdState { fn process(&mut self, mut item: F::Item) { loop { if !self.workers.is_empty() { - if self.next > self.workers.len() { + if self.next >= self.workers.len() { self.next = self.workers.len() - 1; } match self.workers[self.next].send(item) { @@ -212,10 +211,9 @@ impl HandleCmdState { match upd { Update::Available(worker) => { self.workers.push(worker); - if !self.workers.is_empty() { + self.workers.sort(); + if self.workers.len() == 1 { self.mgr.resume(); - } else { - self.workers.sort(); } } Update::Unavailable(worker) => { @@ -234,6 +232,9 @@ impl HandleCmdState { if let Err(item) = self.workers[0].send(item) { self.backlog.push_back(item); self.workers.remove(0); + if self.workers.is_empty() { + self.mgr.pause(); + } break; } } diff --git a/ntex-server/src/net/accept.rs b/ntex-server/src/net/accept.rs index 31793d82..7694d286 100644 --- a/ntex-server/src/net/accept.rs +++ b/ntex-server/src/net/accept.rs @@ -203,14 +203,10 @@ impl Accept { let mut timeout = Some(Duration::ZERO); loop { if let Err(e) = self.poller.wait(&mut events, timeout) { - if e.kind() == io::ErrorKind::Interrupted { - continue; - } else { + if e.kind() != io::ErrorKind::Interrupted { panic!("Cannot wait for events in poller: {}", e) } - } - - if timeout.is_some() { + } else if timeout.is_some() { timeout = None; let _ = self.tx.take().unwrap().send(()); } diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index b9092d0f..b791817d 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -2,8 +2,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{ready, Context, Poll}; use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc}; -use async_broadcast::{self as bus, broadcast}; use async_channel::{unbounded, Receiver, Sender}; +use atomic_waker::AtomicWaker; use core_affinity::CoreId; use ntex_rt::{spawn, Arbiter}; @@ -151,10 +151,8 @@ impl Worker { if self.failed.load(Ordering::Acquire) { WorkerStatus::Failed } else { - // cleanup updates - while self.avail.notify.try_recv().is_ok() {} - - if self.avail.notify.recv_direct().await.is_err() { + self.avail.wait_for_update().await; + if self.avail.failed() { self.failed.store(true, Ordering::Release); } self.status() @@ -196,46 +194,79 @@ impl Future for WorkerStop { #[derive(Debug, Clone)] struct WorkerAvailability { - notify: bus::Receiver<()>, - available: Arc, + inner: Arc, } #[derive(Debug, Clone)] struct WorkerAvailabilityTx { - notify: bus::Sender<()>, - available: Arc, + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + waker: AtomicWaker, + updated: AtomicBool, + available: AtomicBool, + failed: AtomicBool, } impl WorkerAvailability { fn create() -> (Self, WorkerAvailabilityTx) { - let (mut tx, rx) = broadcast(16); - tx.set_overflow(true); + let inner = Arc::new(Inner { + waker: AtomicWaker::new(), + updated: AtomicBool::new(false), + available: AtomicBool::new(false), + failed: AtomicBool::new(false), + }); let avail = WorkerAvailability { - notify: rx, - available: Arc::new(AtomicBool::new(false)), - }; - let avail_tx = WorkerAvailabilityTx { - notify: tx, - available: avail.available.clone(), + inner: inner.clone(), }; + let avail_tx = WorkerAvailabilityTx { inner }; (avail, avail_tx) } + fn failed(&self) -> bool { + self.inner.failed.load(Ordering::Acquire) + } + fn available(&self) -> bool { - self.available.load(Ordering::Acquire) + self.inner.available.load(Ordering::Acquire) + } + + async fn wait_for_update(&self) { + poll_fn(|cx| { + if self.inner.updated.load(Ordering::Acquire) { + self.inner.updated.store(false, Ordering::Release); + Poll::Ready(()) + } else { + self.inner.waker.register(cx.waker()); + Poll::Pending + } + }) + .await; } } impl WorkerAvailabilityTx { fn set(&self, val: bool) { - let old = self.available.swap(val, Ordering::Release); - if !old && val { - let _ = self.notify.try_broadcast(()); + let old = self.inner.available.swap(val, Ordering::Release); + if old != val { + self.inner.updated.store(true, Ordering::Release); + self.inner.waker.wake(); } } } +impl Drop for WorkerAvailabilityTx { + fn drop(&mut self) { + self.inner.failed.store(true, Ordering::Release); + self.inner.updated.store(true, Ordering::Release); + self.inner.available.store(false, Ordering::Release); + self.inner.waker.wake(); + } +} + /// Service worker /// /// Worker accepts message via unbounded channel and starts processing. @@ -256,10 +287,13 @@ where let mut recv = std::pin::pin!(wrk.rx.recv()); let fut = poll_fn(|cx| { match svc.poll_ready(cx) { - Poll::Ready(res) => { - res?; + Poll::Ready(Ok(())) => { wrk.availability.set(true); } + Poll::Ready(Err(err)) => { + wrk.availability.set(false); + return Poll::Ready(Err(err)); + } Poll::Pending => { wrk.availability.set(false); return Poll::Pending; @@ -287,7 +321,6 @@ where let _ = ntex_rt::spawn(async move { svc.shutdown().await; }); - wrk.availability.set(false); } Either::Right(Some(Shutdown { timeout, result })) => { wrk.availability.set(false); @@ -302,6 +335,7 @@ where return; } Either::Left(Ok(false)) | Either::Right(None) => { + wrk.availability.set(false); stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await; return; } @@ -311,7 +345,6 @@ where loop { match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await { Either::Left(Ok(service)) => { - wrk.availability.set(true); svc = Pipeline::new(service).bind(); break; } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index cb75aabc..6ef4b5ef 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [2.12.4] - 2025-03-28 + +* http: Return PayloadError::Incomplete on server disconnect + +* web: Expose WebStack for external wrapper support in downstream crates #542 + ## [2.12.3] - 2025-03-22 * web: Export web::app_service::AppService #534 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 09655c7a..0ea37469 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "2.12.3" +version = "2.12.4" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -68,11 +68,11 @@ ntex-service = "3.4" ntex-macros = "0.1" ntex-util = "2.8" ntex-bytes = "0.1.27" -ntex-server = "2.7" +ntex-server = "2.7.4" ntex-h2 = "1.8.6" ntex-rt = "0.4.27" ntex-io = "2.11" -ntex-net = "2.5.8" +ntex-net = "2.5.10" ntex-tls = "2.3" base64 = "0.22" diff --git a/ntex/src/http/client/h1proto.rs b/ntex/src/http/client/h1proto.rs index 06572418..28871225 100644 --- a/ntex/src/http/client/h1proto.rs +++ b/ntex/src/http/client/h1proto.rs @@ -1,13 +1,11 @@ -use std::{ - future::poll_fn, io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant, -}; +use std::{future::poll_fn, io, io::Write, pin::Pin, task, task::Poll, time::Instant}; use crate::http::body::{BodySize, MessageBody}; use crate::http::error::PayloadError; -use crate::http::h1; use crate::http::header::{HeaderMap, HeaderValue, HOST}; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::payload::{Payload, PayloadStream}; +use crate::http::{h1, Version}; use crate::io::{IoBoxed, RecvError}; use crate::time::{timeout_checked, Millis}; use crate::util::{ready, BufMut, Bytes, BytesMut, Stream}; @@ -101,7 +99,13 @@ where Ok((head, Payload::None)) } _ => { - let pl: PayloadStream = Box::pin(PlStream::new(io, codec, created, pool)); + let pl: PayloadStream = Box::pin(PlStream::new( + io, + codec, + created, + pool, + head.version == Version::HTTP_10, + )); Ok((head, pl.into())) } } @@ -137,6 +141,7 @@ pub(super) struct PlStream { io: Option, codec: h1::ClientPayloadCodec, created: Instant, + http_10: bool, pool: Option, } @@ -146,12 +151,14 @@ impl PlStream { codec: h1::ClientCodec, created: Instant, pool: Option, + http_10: bool, ) -> Self { PlStream { io: Some(io), codec: codec.into_payload_codec(), created, pool, + http_10, } } } @@ -161,41 +168,46 @@ impl Stream for PlStream { fn poll_next( mut self: Pin<&mut Self>, - cx: &mut Context<'_>, + cx: &mut task::Context<'_>, ) -> Poll> { let mut this = self.as_mut(); loop { - return Poll::Ready(Some( - match ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx)) { - Ok(chunk) => { - if let Some(chunk) = chunk { - Ok(chunk) - } else { - release_connection( - this.io.take().unwrap(), - !this.codec.keepalive(), - this.created, - this.pool.take(), - ); - return Poll::Ready(None); - } + let item = ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx)); + return Poll::Ready(Some(match item { + Ok(chunk) => { + if let Some(chunk) = chunk { + Ok(chunk) + } else { + release_connection( + this.io.take().unwrap(), + !this.codec.keepalive(), + this.created, + this.pool.take(), + ); + return Poll::Ready(None); } - Err(RecvError::KeepAlive) => { - Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into()) + } + Err(RecvError::KeepAlive) => { + Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into()) + } + Err(RecvError::Stop) => { + Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped").into()) + } + Err(RecvError::WriteBackpressure) => { + ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?; + continue; + } + Err(RecvError::Decoder(err)) => Err(err), + Err(RecvError::PeerGone(Some(err))) => { + Err(PayloadError::Incomplete(Some(err))) + } + Err(RecvError::PeerGone(None)) => { + if this.http_10 { + return Poll::Ready(None); } - Err(RecvError::Stop) => { - Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped") - .into()) - } - Err(RecvError::WriteBackpressure) => { - ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?; - continue; - } - Err(RecvError::Decoder(err)) => Err(err), - Err(RecvError::PeerGone(Some(err))) => Err(err.into()), - Err(RecvError::PeerGone(None)) => return Poll::Ready(None), - }, - )); + Err(PayloadError::Incomplete(None)) + } + })); } } } diff --git a/ntex/src/http/client/response.rs b/ntex/src/http/client/response.rs index c68b6e73..9a450687 100644 --- a/ntex/src/http/client/response.rs +++ b/ntex/src/http/client/response.rs @@ -387,8 +387,8 @@ impl Future for ReadBody { let this = self.get_mut(); loop { - return match Pin::new(&mut this.stream).poll_next(cx)? { - Poll::Ready(Some(chunk)) => { + return match Pin::new(&mut this.stream).poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { if this.limit > 0 && (this.buf.len() + chunk.len()) > this.limit { Poll::Ready(Err(PayloadError::Overflow)) } else { @@ -397,6 +397,7 @@ impl Future for ReadBody { } } Poll::Ready(None) => Poll::Ready(Ok(this.buf.split().freeze())), + Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)), Poll::Pending => { if this.timeout.poll_elapsed(cx).is_ready() { Poll::Ready(Err(PayloadError::Incomplete(Some( diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index c9ecdb0b..0e4a6559 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -252,6 +252,7 @@ where Ok(()) }) }); + thread::sleep(std::time::Duration::from_millis(150)); let (system, server, addr) = rx.recv().unwrap(); diff --git a/ntex/src/web/mod.rs b/ntex/src/web/mod.rs index 6c3d37b1..8d9adf4d 100644 --- a/ntex/src/web/mod.rs +++ b/ntex/src/web/mod.rs @@ -82,7 +82,7 @@ mod route; mod scope; mod server; mod service; -mod stack; +pub mod stack; pub mod test; pub mod types; mod util; diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index 75227c2c..921310a8 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -425,11 +425,12 @@ async fn test_h2_service_error() { assert_eq!(bytes, Bytes::from_static(b"error")); } -struct SetOnDrop(Arc); +struct SetOnDrop(Arc, Arc>>>); impl Drop for SetOnDrop { fn drop(&mut self) { self.0.fetch_add(1, Ordering::Relaxed); + let _ = self.1.lock().unwrap().take().unwrap().send(()); } } @@ -437,17 +438,20 @@ impl Drop for SetOnDrop { async fn test_h2_client_drop() -> io::Result<()> { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build() .h2(move |req: Request| { - let count = count.clone(); + let st = SetOnDrop(count.clone(), tx.clone()); async move { - let _st = SetOnDrop(count); assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_2); - sleep(Seconds(100)).await; + sleep(Seconds(30)).await; + drop(st); Ok::<_, io::Error>(Response::Ok().finish()) } }) @@ -455,9 +459,9 @@ async fn test_h2_client_drop() -> io::Result<()> { .map_err(|_| ()) }); - let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await; + let result = timeout(Millis(1500), srv.srequest(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(250)).await; + let _ = timeout(Millis(1500), rx).await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) } diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index a4c1d05f..0227573b 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -723,11 +723,12 @@ async fn test_h1_service_error() { assert_eq!(bytes, Bytes::from_static(b"error")); } -struct SetOnDrop(Arc); +struct SetOnDrop(Arc, Option<::oneshot::Sender<()>>); impl Drop for SetOnDrop { fn drop(&mut self) { self.0.fetch_add(1, Ordering::Relaxed); + let _ = self.1.take().unwrap().send(()); } } @@ -735,24 +736,28 @@ impl Drop for SetOnDrop { async fn test_h1_client_drop() -> io::Result<()> { let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone(); + let (tx, rx) = ::oneshot::channel(); + let tx = Arc::new(Mutex::new(Some(tx))); let srv = test_server(move || { + let tx = tx.clone(); let count = count2.clone(); HttpService::build().h1(move |req: Request| { + let tx = tx.clone(); let count = count.clone(); async move { - let _st = SetOnDrop(count); + let _st = SetOnDrop(count, tx.lock().unwrap().take()); assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_11); - sleep(Millis(500)).await; + sleep(Millis(50000)).await; Ok::<_, io::Error>(Response::Ok().finish()) } }) }); - let result = timeout(Millis(100), srv.request(Method::GET, "/").send()).await; + let result = timeout(Millis(1500), srv.request(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(1000)).await; + let _ = rx.await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) }