From 5804165d9a3f6d832f185f9cf1f3b45f41c16de9 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 29 May 2024 18:10:38 +0500 Subject: [PATCH] Fix handling not consumed requests payload --- ntex/CHANGES.md | 2 ++ ntex/src/http/h1/dispatcher.rs | 11 +++++++++-- ntex/src/http/h1/mod.rs | 10 +--------- ntex/tests/http_server.rs | 23 +++++++++++++++++++++++ 4 files changed, 35 insertions(+), 11 deletions(-) diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 87253296..6390b098 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -4,6 +4,8 @@ * http: Fix handling payload timer after payload got consumed +* http: Fix handling not consumed request's payload + ## [2.0.0] - 2024-05-28 * Use "async fn" for Service::ready() and Service::shutdown() diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 3a338071..664b0de8 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -208,7 +208,12 @@ where State::ReadRequest => ready!(inner.poll_read_request(cx)), // consume request's payload State::ReadPayload => { - ready!(inner.poll_request_payload(cx)).unwrap_or(State::ReadRequest) + let result = inner.poll_request_payload(cx); + if inner.flags.contains(Flags::SENDPAYLOAD_AND_STOP) { + inner.stop() + } else { + ready!(result).unwrap_or(State::ReadRequest) + } } // send response body State::SendPayload { body } => { @@ -224,6 +229,7 @@ where let _ = ready!(Pin::new(f).poll(cx)); fut.take(); } + log::debug!("{}: Dispatcher is stopped", inner.io.tag()); return Poll::Ready( if let Some(io) = io { @@ -654,7 +660,8 @@ where // wait until future completes and then close // connection self.payload = None; - Poll::Ready(Err(Either::Left(ProtocolError::PayloadIsNotConsumed))) + self.flags.insert(Flags::SENDPAYLOAD_AND_STOP); + Poll::Pending } } } diff --git a/ntex/src/http/h1/mod.rs b/ntex/src/http/h1/mod.rs index ccdf1c93..9d7a6f68 100644 --- a/ntex/src/http/h1/mod.rs +++ b/ntex/src/http/h1/mod.rs @@ -76,10 +76,6 @@ pub enum ProtocolError { #[error("Payload did not complete within the specified timeout")] SlowPayloadTimeout, - /// Payload is not consumed - #[error("Task is completed but request's payload is not consumed")] - PayloadIsNotConsumed, - /// Response body processing error #[error("Response body processing error: {0}")] ResponsePayload(Box), @@ -89,14 +85,10 @@ impl super::ResponseError for ProtocolError { fn error_response(&self) -> super::Response { match self { ProtocolError::Decode(_) => super::Response::BadRequest().into(), - ProtocolError::SlowRequestTimeout | ProtocolError::SlowPayloadTimeout => { super::Response::RequestTimeout().into() } - - ProtocolError::Encode(_) - | ProtocolError::PayloadIsNotConsumed - | ProtocolError::ResponsePayload(_) => { + ProtocolError::Encode(_) | ProtocolError::ResponsePayload(_) => { super::Response::InternalServerError().into() } } diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index 50f724b8..16a9aabe 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -380,6 +380,29 @@ async fn test_http1_disable_payload_timer_after_whole_pl_has_been_read() { assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); } +/// Handle not consumed payload +#[ntex::test] +async fn test_http1_handle_not_consumed_payload() { + let srv = test_server(|| { + HttpService::build() + .h1_control(fn_service(move |msg: Control<_, _>| { + if matches!(msg, Control::ProtocolError(_)) { + panic!() + } + async move { Ok::<_, io::Error>(msg.ack()) } + })) + .h1(|_| async move { Ok::<_, io::Error>(Response::Ok().finish()) }) + }); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\ncontent-length: 4\r\n\r\n"); + sleep(Millis(250)).await; + let _ = stream.write_all(b"1234"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); +} + #[ntex::test] async fn test_content_length() { let srv = test_server(|| {