From 34142e1ae24c05d1844e797ede20cd5ed8eb795d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 29 May 2024 18:29:49 +0500 Subject: [PATCH] Fix handling not consumed request's payload (#367) --- ntex-service/src/util.rs | 30 ++++++++---------------------- ntex/CHANGES.md | 2 ++ ntex/src/http/h1/dispatcher.rs | 11 +++++++++-- ntex/src/http/h1/mod.rs | 10 +--------- ntex/tests/http_server.rs | 23 +++++++++++++++++++++++ 5 files changed, 43 insertions(+), 33 deletions(-) diff --git a/ntex-service/src/util.rs b/ntex-service/src/util.rs index 092909a6..6015ab3b 100644 --- a/ntex-service/src/util.rs +++ b/ntex-service/src/util.rs @@ -14,17 +14,11 @@ where let mut ready2 = false; poll_fn(move |cx| { - if !ready1 { - match pin::Pin::new(&mut fut1).poll(cx) { - Poll::Ready(_) => ready1 = true, - Poll::Pending => (), - } + if !ready1 && pin::Pin::new(&mut fut1).poll(cx).is_ready() { + ready1 = true; } - if !ready2 { - match pin::Pin::new(&mut fut2).poll(cx) { - Poll::Ready(_) => ready2 = true, - Poll::Pending => (), - } + if !ready2 && pin::Pin::new(&mut fut2).poll(cx).is_ready() { + ready2 = true } if ready1 && ready2 { Poll::Ready(()) @@ -51,19 +45,11 @@ where let mut ready2 = false; poll_fn(move |cx| { - if !ready1 { - match pin::Pin::new(&mut fut1).poll(cx) { - Poll::Ready(Ok(())) => ready1 = true, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => (), - } + if !ready1 && pin::Pin::new(&mut fut1).poll(cx)?.is_ready() { + ready1 = true; } - if !ready2 { - match pin::Pin::new(&mut fut2).poll(cx) { - Poll::Ready(Ok(())) => ready2 = true, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => (), - }; + if !ready2 && pin::Pin::new(&mut fut2).poll(cx)?.is_ready() { + ready2 = true; } if ready1 && ready2 { Poll::Ready(Ok(())) diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 87253296..6390b098 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -4,6 +4,8 @@ * http: Fix handling payload timer after payload got consumed +* http: Fix handling not consumed request's payload + ## [2.0.0] - 2024-05-28 * Use "async fn" for Service::ready() and Service::shutdown() diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 3a338071..664b0de8 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -208,7 +208,12 @@ where State::ReadRequest => ready!(inner.poll_read_request(cx)), // consume request's payload State::ReadPayload => { - ready!(inner.poll_request_payload(cx)).unwrap_or(State::ReadRequest) + let result = inner.poll_request_payload(cx); + if inner.flags.contains(Flags::SENDPAYLOAD_AND_STOP) { + inner.stop() + } else { + ready!(result).unwrap_or(State::ReadRequest) + } } // send response body State::SendPayload { body } => { @@ -224,6 +229,7 @@ where let _ = ready!(Pin::new(f).poll(cx)); fut.take(); } + log::debug!("{}: Dispatcher is stopped", inner.io.tag()); return Poll::Ready( if let Some(io) = io { @@ -654,7 +660,8 @@ where // wait until future completes and then close // connection self.payload = None; - Poll::Ready(Err(Either::Left(ProtocolError::PayloadIsNotConsumed))) + self.flags.insert(Flags::SENDPAYLOAD_AND_STOP); + Poll::Pending } } } diff --git a/ntex/src/http/h1/mod.rs b/ntex/src/http/h1/mod.rs index ccdf1c93..9d7a6f68 100644 --- a/ntex/src/http/h1/mod.rs +++ b/ntex/src/http/h1/mod.rs @@ -76,10 +76,6 @@ pub enum ProtocolError { #[error("Payload did not complete within the specified timeout")] SlowPayloadTimeout, - /// Payload is not consumed - #[error("Task is completed but request's payload is not consumed")] - PayloadIsNotConsumed, - /// Response body processing error #[error("Response body processing error: {0}")] ResponsePayload(Box), @@ -89,14 +85,10 @@ impl super::ResponseError for ProtocolError { fn error_response(&self) -> super::Response { match self { ProtocolError::Decode(_) => super::Response::BadRequest().into(), - ProtocolError::SlowRequestTimeout | ProtocolError::SlowPayloadTimeout => { super::Response::RequestTimeout().into() } - - ProtocolError::Encode(_) - | ProtocolError::PayloadIsNotConsumed - | ProtocolError::ResponsePayload(_) => { + ProtocolError::Encode(_) | ProtocolError::ResponsePayload(_) => { super::Response::InternalServerError().into() } } diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index 50f724b8..16a9aabe 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -380,6 +380,29 @@ async fn test_http1_disable_payload_timer_after_whole_pl_has_been_read() { assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); } +/// Handle not consumed payload +#[ntex::test] +async fn test_http1_handle_not_consumed_payload() { + let srv = test_server(|| { + HttpService::build() + .h1_control(fn_service(move |msg: Control<_, _>| { + if matches!(msg, Control::ProtocolError(_)) { + panic!() + } + async move { Ok::<_, io::Error>(msg.ack()) } + })) + .h1(|_| async move { Ok::<_, io::Error>(Response::Ok().finish()) }) + }); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\ncontent-length: 4\r\n\r\n"); + sleep(Millis(250)).await; + let _ = stream.write_all(b"1234"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); +} + #[ntex::test] async fn test_content_length() { let srv = test_server(|| {