mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 13:27:39 +03:00
Fix handling not consumed requests payload
This commit is contained in:
parent
9c29de14cf
commit
5804165d9a
4 changed files with 35 additions and 11 deletions
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
* http: Fix handling payload timer after payload got consumed
|
* http: Fix handling payload timer after payload got consumed
|
||||||
|
|
||||||
|
* http: Fix handling not consumed request's payload
|
||||||
|
|
||||||
## [2.0.0] - 2024-05-28
|
## [2.0.0] - 2024-05-28
|
||||||
|
|
||||||
* Use "async fn" for Service::ready() and Service::shutdown()
|
* Use "async fn" for Service::ready() and Service::shutdown()
|
||||||
|
|
|
@ -208,7 +208,12 @@ where
|
||||||
State::ReadRequest => ready!(inner.poll_read_request(cx)),
|
State::ReadRequest => ready!(inner.poll_read_request(cx)),
|
||||||
// consume request's payload
|
// consume request's payload
|
||||||
State::ReadPayload => {
|
State::ReadPayload => {
|
||||||
ready!(inner.poll_request_payload(cx)).unwrap_or(State::ReadRequest)
|
let result = inner.poll_request_payload(cx);
|
||||||
|
if inner.flags.contains(Flags::SENDPAYLOAD_AND_STOP) {
|
||||||
|
inner.stop()
|
||||||
|
} else {
|
||||||
|
ready!(result).unwrap_or(State::ReadRequest)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// send response body
|
// send response body
|
||||||
State::SendPayload { body } => {
|
State::SendPayload { body } => {
|
||||||
|
@ -224,6 +229,7 @@ where
|
||||||
let _ = ready!(Pin::new(f).poll(cx));
|
let _ = ready!(Pin::new(f).poll(cx));
|
||||||
fut.take();
|
fut.take();
|
||||||
}
|
}
|
||||||
|
log::debug!("{}: Dispatcher is stopped", inner.io.tag());
|
||||||
|
|
||||||
return Poll::Ready(
|
return Poll::Ready(
|
||||||
if let Some(io) = io {
|
if let Some(io) = io {
|
||||||
|
@ -654,7 +660,8 @@ where
|
||||||
// wait until future completes and then close
|
// wait until future completes and then close
|
||||||
// connection
|
// connection
|
||||||
self.payload = None;
|
self.payload = None;
|
||||||
Poll::Ready(Err(Either::Left(ProtocolError::PayloadIsNotConsumed)))
|
self.flags.insert(Flags::SENDPAYLOAD_AND_STOP);
|
||||||
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,10 +76,6 @@ pub enum ProtocolError {
|
||||||
#[error("Payload did not complete within the specified timeout")]
|
#[error("Payload did not complete within the specified timeout")]
|
||||||
SlowPayloadTimeout,
|
SlowPayloadTimeout,
|
||||||
|
|
||||||
/// Payload is not consumed
|
|
||||||
#[error("Task is completed but request's payload is not consumed")]
|
|
||||||
PayloadIsNotConsumed,
|
|
||||||
|
|
||||||
/// Response body processing error
|
/// Response body processing error
|
||||||
#[error("Response body processing error: {0}")]
|
#[error("Response body processing error: {0}")]
|
||||||
ResponsePayload(Box<dyn std::error::Error>),
|
ResponsePayload(Box<dyn std::error::Error>),
|
||||||
|
@ -89,14 +85,10 @@ impl super::ResponseError for ProtocolError {
|
||||||
fn error_response(&self) -> super::Response {
|
fn error_response(&self) -> super::Response {
|
||||||
match self {
|
match self {
|
||||||
ProtocolError::Decode(_) => super::Response::BadRequest().into(),
|
ProtocolError::Decode(_) => super::Response::BadRequest().into(),
|
||||||
|
|
||||||
ProtocolError::SlowRequestTimeout | ProtocolError::SlowPayloadTimeout => {
|
ProtocolError::SlowRequestTimeout | ProtocolError::SlowPayloadTimeout => {
|
||||||
super::Response::RequestTimeout().into()
|
super::Response::RequestTimeout().into()
|
||||||
}
|
}
|
||||||
|
ProtocolError::Encode(_) | ProtocolError::ResponsePayload(_) => {
|
||||||
ProtocolError::Encode(_)
|
|
||||||
| ProtocolError::PayloadIsNotConsumed
|
|
||||||
| ProtocolError::ResponsePayload(_) => {
|
|
||||||
super::Response::InternalServerError().into()
|
super::Response::InternalServerError().into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -380,6 +380,29 @@ async fn test_http1_disable_payload_timer_after_whole_pl_has_been_read() {
|
||||||
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle not consumed payload
|
||||||
|
#[ntex::test]
|
||||||
|
async fn test_http1_handle_not_consumed_payload() {
|
||||||
|
let srv = test_server(|| {
|
||||||
|
HttpService::build()
|
||||||
|
.h1_control(fn_service(move |msg: Control<_, _>| {
|
||||||
|
if matches!(msg, Control::ProtocolError(_)) {
|
||||||
|
panic!()
|
||||||
|
}
|
||||||
|
async move { Ok::<_, io::Error>(msg.ack()) }
|
||||||
|
}))
|
||||||
|
.h1(|_| async move { Ok::<_, io::Error>(Response::Ok().finish()) })
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
|
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\ncontent-length: 4\r\n\r\n");
|
||||||
|
sleep(Millis(250)).await;
|
||||||
|
let _ = stream.write_all(b"1234");
|
||||||
|
let mut data = vec![0; 1024];
|
||||||
|
let _ = stream.read(&mut data);
|
||||||
|
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
||||||
|
}
|
||||||
|
|
||||||
#[ntex::test]
|
#[ntex::test]
|
||||||
async fn test_content_length() {
|
async fn test_content_length() {
|
||||||
let srv = test_server(|| {
|
let srv = test_server(|| {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue