From 8f2d5056c9a8aab11217273b8b51fb55522ef9df Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Mar 2025 02:10:25 +0100 Subject: [PATCH] Return PayloadError::Incomplete on server disconnect (#545) --- ntex-io/src/tasks.rs | 24 +++------- ntex-rt/src/lib.rs | 8 +++- ntex/CHANGES.md | 6 +++ ntex/Cargo.toml | 2 +- ntex/src/http/client/h1proto.rs | 82 ++++++++++++++++++-------------- ntex/src/http/client/response.rs | 5 +- 6 files changed, 70 insertions(+), 57 deletions(-) diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 4a04196b..55f99416 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -537,7 +537,9 @@ impl IoContext { self.0.tag(), nbytes ); - inner.dispatch_task.wake(); + if !inner.dispatch_task.wake_checked() { + log::error!("Dispatcher waker is not registered"); + } } else { if nbytes >= hw { // read task is paused because of read back-pressure @@ -735,22 +737,6 @@ impl IoContext { false } - pub fn is_write_ready(&self) -> bool { - if let Some(waker) = self.0 .0.write_task.take() { - let ready = self - .0 - .filter() - .poll_write_ready(&mut Context::from_waker(&waker)); - if !matches!( - ready, - Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) - ) { - return true; - } - } - false - } - pub fn with_read_buf(&self, f: F) -> Poll<()> where F: FnOnce(&mut BytesVec) -> Poll>, @@ -803,7 +789,9 @@ impl IoContext { self.0.tag(), nbytes ); - inner.dispatch_task.wake(); + if !inner.dispatch_task.wake_checked() { + log::error!("Dispatcher waker is not registered"); + } } else { if nbytes >= hw { // read task is paused because of read back-pressure diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 1ffd7fe7..d5d85546 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -112,6 +112,8 @@ mod tokio { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> tok_io::task::JoinHandle where F: FnOnce() -> R + 'static, @@ -196,6 +198,8 @@ mod compio { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> JoinHandle where F: FnOnce() -> R + 'static, @@ -323,6 +327,8 @@ mod neon { /// /// This function panics if ntex system is not running. #[inline] + #[doc(hidden)] + #[deprecated] pub fn spawn_fn(f: F) -> Task where F: FnOnce() -> R + 'static, @@ -377,7 +383,7 @@ mod neon { impl JoinHandle { pub fn is_finished(&self) -> bool { - false + self.fut.is_none() } } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index cb75aabc..6ef4b5ef 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [2.12.4] - 2025-03-28 + +* http: Return PayloadError::Incomplete on server disconnect + +* web: Expose WebStack for external wrapper support in downstream crates #542 + ## [2.12.3] - 2025-03-22 * web: Export web::app_service::AppService #534 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 09655c7a..da30a2ba 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "2.12.3" +version = "2.12.4" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" diff --git a/ntex/src/http/client/h1proto.rs b/ntex/src/http/client/h1proto.rs index 06572418..28871225 100644 --- a/ntex/src/http/client/h1proto.rs +++ b/ntex/src/http/client/h1proto.rs @@ -1,13 +1,11 @@ -use std::{ - future::poll_fn, io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant, -}; +use std::{future::poll_fn, io, io::Write, pin::Pin, task, task::Poll, time::Instant}; use crate::http::body::{BodySize, MessageBody}; use crate::http::error::PayloadError; -use crate::http::h1; use crate::http::header::{HeaderMap, HeaderValue, HOST}; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::payload::{Payload, PayloadStream}; +use crate::http::{h1, Version}; use crate::io::{IoBoxed, RecvError}; use crate::time::{timeout_checked, Millis}; use crate::util::{ready, BufMut, Bytes, BytesMut, Stream}; @@ -101,7 +99,13 @@ where Ok((head, Payload::None)) } _ => { - let pl: PayloadStream = Box::pin(PlStream::new(io, codec, created, pool)); + let pl: PayloadStream = Box::pin(PlStream::new( + io, + codec, + created, + pool, + head.version == Version::HTTP_10, + )); Ok((head, pl.into())) } } @@ -137,6 +141,7 @@ pub(super) struct PlStream { io: Option, codec: h1::ClientPayloadCodec, created: Instant, + http_10: bool, pool: Option, } @@ -146,12 +151,14 @@ impl PlStream { codec: h1::ClientCodec, created: Instant, pool: Option, + http_10: bool, ) -> Self { PlStream { io: Some(io), codec: codec.into_payload_codec(), created, pool, + http_10, } } } @@ -161,41 +168,46 @@ impl Stream for PlStream { fn poll_next( mut self: Pin<&mut Self>, - cx: &mut Context<'_>, + cx: &mut task::Context<'_>, ) -> Poll> { let mut this = self.as_mut(); loop { - return Poll::Ready(Some( - match ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx)) { - Ok(chunk) => { - if let Some(chunk) = chunk { - Ok(chunk) - } else { - release_connection( - this.io.take().unwrap(), - !this.codec.keepalive(), - this.created, - this.pool.take(), - ); - return Poll::Ready(None); - } + let item = ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx)); + return Poll::Ready(Some(match item { + Ok(chunk) => { + if let Some(chunk) = chunk { + Ok(chunk) + } else { + release_connection( + this.io.take().unwrap(), + !this.codec.keepalive(), + this.created, + this.pool.take(), + ); + return Poll::Ready(None); } - Err(RecvError::KeepAlive) => { - Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into()) + } + Err(RecvError::KeepAlive) => { + Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into()) + } + Err(RecvError::Stop) => { + Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped").into()) + } + Err(RecvError::WriteBackpressure) => { + ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?; + continue; + } + Err(RecvError::Decoder(err)) => Err(err), + Err(RecvError::PeerGone(Some(err))) => { + Err(PayloadError::Incomplete(Some(err))) + } + Err(RecvError::PeerGone(None)) => { + if this.http_10 { + return Poll::Ready(None); } - Err(RecvError::Stop) => { - Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped") - .into()) - } - Err(RecvError::WriteBackpressure) => { - ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?; - continue; - } - Err(RecvError::Decoder(err)) => Err(err), - Err(RecvError::PeerGone(Some(err))) => Err(err.into()), - Err(RecvError::PeerGone(None)) => return Poll::Ready(None), - }, - )); + Err(PayloadError::Incomplete(None)) + } + })); } } } diff --git a/ntex/src/http/client/response.rs b/ntex/src/http/client/response.rs index c68b6e73..9a450687 100644 --- a/ntex/src/http/client/response.rs +++ b/ntex/src/http/client/response.rs @@ -387,8 +387,8 @@ impl Future for ReadBody { let this = self.get_mut(); loop { - return match Pin::new(&mut this.stream).poll_next(cx)? { - Poll::Ready(Some(chunk)) => { + return match Pin::new(&mut this.stream).poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { if this.limit > 0 && (this.buf.len() + chunk.len()) > this.limit { Poll::Ready(Err(PayloadError::Overflow)) } else { @@ -397,6 +397,7 @@ impl Future for ReadBody { } } Poll::Ready(None) => Poll::Ready(Ok(this.buf.split().freeze())), + Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)), Poll::Pending => { if this.timeout.poll_elapsed(cx).is_ready() { Poll::Ready(Err(PayloadError::Incomplete(Some(