Better handling for h2 remote payload (#439)

This commit is contained in:
Nikolay Kim 2024-10-16 20:46:40 +05:00 committed by GitHub
parent 4f7d951f40
commit dedb7de64c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 94 additions and 35 deletions

View file

@ -24,8 +24,8 @@
## Usage ## Usage
Starting ntex v0.5 async runtime must be selected as a feature. Available options are `glommio`, ntex supports multiple async runtimes, runtime must be selected as a feature. Available options are `compio`, `tokio`,
`tokio` or `async-std`. `glommio` or `async-std`.
```toml ```toml
[dependencies] [dependencies]

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.7.0] - 2024-10-16
* Better handling for h2 remote payload
## [2.6.0] - 2024-09-25 ## [2.6.0] - 2024-09-25
* Disable default features for rustls * Disable default features for rustls

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "2.6.0" version = "2.7.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -18,7 +18,7 @@ edition = "2021"
rust-version = "1.75" rust-version = "1.75"
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["compio", "tokio", "openssl", "rustls", "compress", "cookie", "ws", "brotli", "ntex-tls/rustls-ring"] features = ["tokio", "openssl", "rustls", "compress", "cookie", "ws", "brotli", "ntex-tls/rustls-ring"]
[lib] [lib]
name = "ntex" name = "ntex"
@ -69,7 +69,7 @@ ntex-macros = "0.1.3"
ntex-util = "2" ntex-util = "2"
ntex-bytes = "0.1.27" ntex-bytes = "0.1.27"
ntex-server = "2.4" ntex-server = "2.4"
ntex-h2 = "1.1" ntex-h2 = "1.2"
ntex-rt = "0.4.19" ntex-rt = "0.4.19"
ntex-io = "2.7" ntex-io = "2.7"
ntex-net = "2.4" ntex-net = "2.4"

View file

@ -1,14 +1,13 @@
use std::{future::poll_fn, io}; use std::{future::poll_fn, io};
use ntex_h2::client::{RecvStream, SimpleClient}; use ntex_h2::{self as h2, client::RecvStream, client::SimpleClient, frame};
use ntex_h2::{self as h2, frame};
use crate::http::body::{BodySize, MessageBody}; use crate::http::body::{BodySize, MessageBody};
use crate::http::header::{self, HeaderMap, HeaderValue}; use crate::http::header::{self, HeaderMap, HeaderValue};
use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::{h2::payload, payload::Payload, Method, Version}; use crate::http::{h2::payload, payload::Payload, Method, Version};
use crate::time::{timeout_checked, Millis}; use crate::time::{timeout_checked, Millis};
use crate::util::{ByteString, Bytes}; use crate::util::{select, ByteString, Bytes, Either};
use super::error::{ConnectError, SendRequestError}; use super::error::{ConnectError, SendRequestError};
@ -21,7 +20,12 @@ pub(super) async fn send_request<B>(
where where
B: MessageBody, B: MessageBody,
{ {
log::trace!("Sending client request: {:?} {:?}", head, body.size()); log::trace!(
"{}: Sending client request: {:?} {:?}",
client.client.tag(),
head,
body.size()
);
let length = body.size(); let length = body.size();
let eof = if head.as_ref().method == Method::HEAD { let eof = if head.as_ref().method == Method::HEAD {
true true
@ -82,7 +86,7 @@ where
// at the same time // at the same time
let _ = crate::rt::spawn(async move { let _ = crate::rt::spawn(async move {
if let Err(e) = send_body(body, &snd_stream).await { if let Err(e) = send_body(body, &snd_stream).await {
log::error!("Cannot send body: {:?}", e); log::error!("{}: Cannot send body: {:?}", snd_stream.tag(), e);
snd_stream.reset(frame::Reason::INTERNAL_ERROR); snd_stream.reset(frame::Reason::INTERNAL_ERROR);
} }
}); });
@ -108,7 +112,8 @@ async fn get_response(
eof, eof,
} => { } => {
log::trace!( log::trace!(
"{:?} got response (eof: {}): {:#?}\nheaders: {:#?}", "{}: {:?} got response (eof: {}): {:#?}\nheaders: {:#?}",
stream.tag(),
stream.id(), stream.id(),
eof, eof,
pseudo, pseudo,
@ -122,31 +127,44 @@ async fn get_response(
head.version = Version::HTTP_2; head.version = Version::HTTP_2;
let payload = if !eof { let payload = if !eof {
log::debug!("Creating local payload stream for {:?}", stream.id()); log::debug!(
"{}: Creating local payload stream for {:?}",
stream.tag(),
stream.id()
);
let (mut pl, payload) = let (mut pl, payload) =
payload::Payload::create(stream.empty_capacity()); payload::Payload::create(stream.empty_capacity());
let _ = crate::rt::spawn(async move { let _ = crate::rt::spawn(async move {
loop { loop {
let h2::Message { stream, kind } = let h2::Message { stream, kind } = match select(
match rcv_stream.recv().await { rcv_stream.recv(),
Some(msg) => msg, poll_fn(|cx| pl.on_cancel(cx.waker())),
None => { )
pl.feed_eof(Bytes::new()); .await
break; {
} Either::Left(Some(msg)) => msg,
}; Either::Left(None) => {
pl.feed_eof(Bytes::new());
break;
}
Either::Right(_) => break,
};
match kind { match kind {
h2::MessageKind::Data(data, cap) => { h2::MessageKind::Data(data, cap) => {
log::debug!( log::trace!(
"Got data chunk for {:?}: {:?}", "{}: Got data chunk for {:?}: {:?}",
stream.tag(),
stream.id(), stream.id(),
data.len() data.len()
); );
pl.feed_data(data, cap); pl.feed_data(data, cap);
} }
h2::MessageKind::Eof(item) => { h2::MessageKind::Eof(item) => {
log::debug!( log::trace!(
"Got payload eof for {:?}: {:?}", "{}: Got payload eof for {:?}: {:?}",
stream.tag(),
stream.id(), stream.id(),
item item
); );
@ -163,7 +181,11 @@ async fn get_response(
} }
} }
h2::MessageKind::Disconnect(err) => { h2::MessageKind::Disconnect(err) => {
log::debug!("Connection is disconnected {:?}", err); log::trace!(
"{}: Connection is disconnected {:?}",
stream.tag(),
err
);
pl.set_error( pl.set_error(
io::Error::new(io::ErrorKind::Other, err) io::Error::new(io::ErrorKind::Other, err)
.into(), .into(),
@ -207,12 +229,17 @@ async fn send_body<B: MessageBody>(
loop { loop {
match poll_fn(|cx| body.poll_next_chunk(cx)).await { match poll_fn(|cx| body.poll_next_chunk(cx)).await {
Some(Ok(b)) => { Some(Ok(b)) => {
log::debug!("{:?} sending chunk, {} bytes", stream.id(), b.len()); log::trace!(
"{}: {:?} sending chunk, {} bytes",
stream.tag(),
stream.id(),
b.len()
);
stream.send_payload(b, false).await? stream.send_payload(b, false).await?
} }
Some(Err(e)) => return Err(e.into()), Some(Err(e)) => return Err(e.into()),
None => { None => {
log::debug!("{:?} eof of send stream ", stream.id()); log::trace!("{}: {:?} eof of send stream ", stream.tag(), stream.id());
stream.send_payload(Bytes::new(), true).await?; stream.send_payload(Bytes::new(), true).await?;
return Ok(()); return Ok(());
} }

View file

@ -41,8 +41,7 @@ pub use self::response::{ClientResponse, JsonBody, MessageBody};
pub use self::sender::SendClientRequest; pub use self::sender::SendClientRequest;
pub use self::test::TestResponse; pub use self::test::TestResponse;
use crate::http::error::HttpError; use crate::http::{error::HttpError, HeaderMap, Method, RequestHead, Uri};
use crate::http::{HeaderMap, Method, RequestHead, Uri};
use crate::time::Millis; use crate::time::Millis;
use self::connect::{Connect as HttpConnect, ConnectorWrapper}; use self::connect::{Connect as HttpConnect, ConnectorWrapper};

View file

@ -1,6 +1,6 @@
//! Payload stream //! Payload stream
use std::collections::VecDeque; use std::collections::VecDeque;
use std::task::{Context, Poll}; use std::task::{Context, Poll, Waker};
use std::{cell::RefCell, future::poll_fn, pin::Pin, rc::Rc, rc::Weak}; use std::{cell::RefCell, future::poll_fn, pin::Pin, rc::Rc, rc::Weak};
use ntex_h2::{self as h2}; use ntex_h2::{self as h2};
@ -8,6 +8,14 @@ use ntex_h2::{self as h2};
use crate::util::{Bytes, Stream}; use crate::util::{Bytes, Stream};
use crate::{http::error::PayloadError, task::LocalWaker}; use crate::{http::error::PayloadError, task::LocalWaker};
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const EOF = 0b0000_0001;
const DROPPED = 0b0000_0010;
}
}
/// Buffered stream of byte chunks /// Buffered stream of byte chunks
/// ///
/// Payload stores chunks in a vector. First chunk can be received with /// Payload stores chunks in a vector. First chunk can be received with
@ -54,6 +62,14 @@ impl Payload {
} }
} }
impl Drop for Payload {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();
inner.io_task.wake();
inner.flags.insert(Flags::DROPPED);
}
}
impl Stream for Payload { impl Stream for Payload {
type Item = Result<Bytes, PayloadError>; type Item = Result<Bytes, PayloadError>;
@ -103,11 +119,24 @@ impl PayloadSender {
shared.borrow_mut().stream = stream; shared.borrow_mut().stream = stream;
} }
} }
pub(crate) fn on_cancel(&self, w: &Waker) -> Poll<()> {
if let Some(shared) = self.inner.upgrade() {
if shared.borrow_mut().flags.contains(Flags::DROPPED) {
Poll::Ready(())
} else {
shared.borrow_mut().io_task.register(w);
Poll::Pending
}
} else {
Poll::Ready(())
}
}
} }
#[derive(Debug)] #[derive(Debug)]
struct Inner { struct Inner {
eof: bool, flags: Flags,
cap: h2::Capacity, cap: h2::Capacity,
err: Option<PayloadError>, err: Option<PayloadError>,
items: VecDeque<Bytes>, items: VecDeque<Bytes>,
@ -120,7 +149,7 @@ impl Inner {
fn new(cap: h2::Capacity) -> Self { fn new(cap: h2::Capacity) -> Self {
Inner { Inner {
cap, cap,
eof: false, flags: Flags::empty(),
err: None, err: None,
stream: None, stream: None,
items: VecDeque::new(), items: VecDeque::new(),
@ -135,7 +164,7 @@ impl Inner {
} }
fn feed_eof(&mut self, data: Bytes) { fn feed_eof(&mut self, data: Bytes) {
self.eof = true; self.flags.insert(Flags::EOF);
if !data.is_empty() { if !data.is_empty() {
self.items.push_back(data); self.items.push_back(data);
} }
@ -153,7 +182,7 @@ impl Inner {
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, PayloadError>>> { ) -> Poll<Option<Result<Bytes, PayloadError>>> {
if let Some(data) = self.items.pop_front() { if let Some(data) = self.items.pop_front() {
if !self.eof { if !self.flags.contains(Flags::EOF) {
self.cap.consume(data.len() as u32); self.cap.consume(data.len() as u32);
if self.cap.size() == 0 { if self.cap.size() == 0 {
@ -163,7 +192,7 @@ impl Inner {
Poll::Ready(Some(Ok(data))) Poll::Ready(Some(Ok(data)))
} else if let Some(err) = self.err.take() { } else if let Some(err) = self.err.take() {
Poll::Ready(Some(Err(err))) Poll::Ready(Some(Err(err)))
} else if self.eof { } else if self.flags.contains(Flags::EOF) {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
self.task.register(cx.waker()); self.task.register(cx.waker());