Return PayloadError::Incomplete on server disconnect (#545)

This commit is contained in:
Nikolay Kim 2025-03-28 02:10:25 +01:00 committed by GitHub
parent f647ad2eac
commit 8f2d5056c9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 70 additions and 57 deletions

View file

@ -537,7 +537,9 @@ impl IoContext {
self.0.tag(), self.0.tag(),
nbytes nbytes
); );
inner.dispatch_task.wake(); if !inner.dispatch_task.wake_checked() {
log::error!("Dispatcher waker is not registered");
}
} else { } else {
if nbytes >= hw { if nbytes >= hw {
// read task is paused because of read back-pressure // read task is paused because of read back-pressure
@ -735,22 +737,6 @@ impl IoContext {
false false
} }
pub fn is_write_ready(&self) -> bool {
if let Some(waker) = self.0 .0.write_task.take() {
let ready = self
.0
.filter()
.poll_write_ready(&mut Context::from_waker(&waker));
if !matches!(
ready,
Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
) {
return true;
}
}
false
}
pub fn with_read_buf<F>(&self, f: F) -> Poll<()> pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
where where
F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>, F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
@ -803,7 +789,9 @@ impl IoContext {
self.0.tag(), self.0.tag(),
nbytes nbytes
); );
inner.dispatch_task.wake(); if !inner.dispatch_task.wake_checked() {
log::error!("Dispatcher waker is not registered");
}
} else { } else {
if nbytes >= hw { if nbytes >= hw {
// read task is paused because of read back-pressure // read task is paused because of read back-pressure

View file

@ -112,6 +112,8 @@ mod tokio {
/// ///
/// This function panics if ntex system is not running. /// This function panics if ntex system is not running.
#[inline] #[inline]
#[doc(hidden)]
#[deprecated]
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output> pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
where where
F: FnOnce() -> R + 'static, F: FnOnce() -> R + 'static,
@ -196,6 +198,8 @@ mod compio {
/// ///
/// This function panics if ntex system is not running. /// This function panics if ntex system is not running.
#[inline] #[inline]
#[doc(hidden)]
#[deprecated]
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output> pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
where where
F: FnOnce() -> R + 'static, F: FnOnce() -> R + 'static,
@ -323,6 +327,8 @@ mod neon {
/// ///
/// This function panics if ntex system is not running. /// This function panics if ntex system is not running.
#[inline] #[inline]
#[doc(hidden)]
#[deprecated]
pub fn spawn_fn<F, R>(f: F) -> Task<R::Output> pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
where where
F: FnOnce() -> R + 'static, F: FnOnce() -> R + 'static,
@ -377,7 +383,7 @@ mod neon {
impl<T> JoinHandle<T> { impl<T> JoinHandle<T> {
pub fn is_finished(&self) -> bool { pub fn is_finished(&self) -> bool {
false self.fut.is_none()
} }
} }

View file

@ -1,5 +1,11 @@
# Changes # Changes
## [2.12.4] - 2025-03-28
* http: Return PayloadError::Incomplete on server disconnect
* web: Expose WebStack for external wrapper support in downstream crates #542
## [2.12.3] - 2025-03-22 ## [2.12.3] - 2025-03-22
* web: Export web::app_service::AppService #534 * web: Export web::app_service::AppService #534

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "2.12.3" version = "2.12.4"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"

View file

@ -1,13 +1,11 @@
use std::{ use std::{future::poll_fn, io, io::Write, pin::Pin, task, task::Poll, time::Instant};
future::poll_fn, io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant,
};
use crate::http::body::{BodySize, MessageBody}; use crate::http::body::{BodySize, MessageBody};
use crate::http::error::PayloadError; use crate::http::error::PayloadError;
use crate::http::h1;
use crate::http::header::{HeaderMap, HeaderValue, HOST}; use crate::http::header::{HeaderMap, HeaderValue, HOST};
use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::{Payload, PayloadStream}; use crate::http::payload::{Payload, PayloadStream};
use crate::http::{h1, Version};
use crate::io::{IoBoxed, RecvError}; use crate::io::{IoBoxed, RecvError};
use crate::time::{timeout_checked, Millis}; use crate::time::{timeout_checked, Millis};
use crate::util::{ready, BufMut, Bytes, BytesMut, Stream}; use crate::util::{ready, BufMut, Bytes, BytesMut, Stream};
@ -101,7 +99,13 @@ where
Ok((head, Payload::None)) Ok((head, Payload::None))
} }
_ => { _ => {
let pl: PayloadStream = Box::pin(PlStream::new(io, codec, created, pool)); let pl: PayloadStream = Box::pin(PlStream::new(
io,
codec,
created,
pool,
head.version == Version::HTTP_10,
));
Ok((head, pl.into())) Ok((head, pl.into()))
} }
} }
@ -137,6 +141,7 @@ pub(super) struct PlStream {
io: Option<IoBoxed>, io: Option<IoBoxed>,
codec: h1::ClientPayloadCodec, codec: h1::ClientPayloadCodec,
created: Instant, created: Instant,
http_10: bool,
pool: Option<Acquired>, pool: Option<Acquired>,
} }
@ -146,12 +151,14 @@ impl PlStream {
codec: h1::ClientCodec, codec: h1::ClientCodec,
created: Instant, created: Instant,
pool: Option<Acquired>, pool: Option<Acquired>,
http_10: bool,
) -> Self { ) -> Self {
PlStream { PlStream {
io: Some(io), io: Some(io),
codec: codec.into_payload_codec(), codec: codec.into_payload_codec(),
created, created,
pool, pool,
http_10,
} }
} }
} }
@ -161,12 +168,12 @@ impl Stream for PlStream {
fn poll_next( fn poll_next(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut task::Context<'_>,
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut(); let mut this = self.as_mut();
loop { loop {
return Poll::Ready(Some( let item = ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx));
match ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx)) { return Poll::Ready(Some(match item {
Ok(chunk) => { Ok(chunk) => {
if let Some(chunk) = chunk { if let Some(chunk) = chunk {
Ok(chunk) Ok(chunk)
@ -184,18 +191,23 @@ impl Stream for PlStream {
Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into()) Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into())
} }
Err(RecvError::Stop) => { Err(RecvError::Stop) => {
Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped") Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped").into())
.into())
} }
Err(RecvError::WriteBackpressure) => { Err(RecvError::WriteBackpressure) => {
ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?; ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?;
continue; continue;
} }
Err(RecvError::Decoder(err)) => Err(err), Err(RecvError::Decoder(err)) => Err(err),
Err(RecvError::PeerGone(Some(err))) => Err(err.into()), Err(RecvError::PeerGone(Some(err))) => {
Err(RecvError::PeerGone(None)) => return Poll::Ready(None), Err(PayloadError::Incomplete(Some(err)))
}, }
)); Err(RecvError::PeerGone(None)) => {
if this.http_10 {
return Poll::Ready(None);
}
Err(PayloadError::Incomplete(None))
}
}));
} }
} }
} }

View file

@ -387,8 +387,8 @@ impl Future for ReadBody {
let this = self.get_mut(); let this = self.get_mut();
loop { loop {
return match Pin::new(&mut this.stream).poll_next(cx)? { return match Pin::new(&mut this.stream).poll_next(cx) {
Poll::Ready(Some(chunk)) => { Poll::Ready(Some(Ok(chunk))) => {
if this.limit > 0 && (this.buf.len() + chunk.len()) > this.limit { if this.limit > 0 && (this.buf.len() + chunk.len()) > this.limit {
Poll::Ready(Err(PayloadError::Overflow)) Poll::Ready(Err(PayloadError::Overflow))
} else { } else {
@ -397,6 +397,7 @@ impl Future for ReadBody {
} }
} }
Poll::Ready(None) => Poll::Ready(Ok(this.buf.split().freeze())), Poll::Ready(None) => Poll::Ready(Ok(this.buf.split().freeze())),
Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
Poll::Pending => { Poll::Pending => {
if this.timeout.poll_elapsed(cx).is_ready() { if this.timeout.poll_elapsed(cx).is_ready() {
Poll::Ready(Err(PayloadError::Incomplete(Some( Poll::Ready(Err(PayloadError::Incomplete(Some(