Switch to ntex-h2 for http/2 support (#120)

* switch server to ntex-h2 for http/2 support
This commit is contained in:
Nikolay Kim 2022-06-27 15:28:44 +06:00 committed by GitHub
parent 08a577f730
commit 88c7fd3116
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 879 additions and 747 deletions

View file

@ -15,6 +15,8 @@ pub mod error {
pub use http::header::{InvalidHeaderName, InvalidHeaderValue};
pub use http::method::InvalidMethod;
pub use http::status::InvalidStatusCode;
pub use http::uri::InvalidUri;
pub use http::Error;
}
/// Convert http::HeaderMap to a HeaderMap

View file

@ -102,8 +102,7 @@ impl Extend<HeaderValue> for Value {
where
T: IntoIterator<Item = HeaderValue>,
{
let mut iter = iter.into_iter();
while let Some(h) = iter.next() {
for h in iter.into_iter() {
self.append(h);
}
}
@ -363,6 +362,7 @@ where
Value: TryFrom<V>,
{
#[inline]
#[allow(clippy::mutable_key_type)]
fn from_iter<T: IntoIterator<Item = (N, V)>>(iter: T) -> Self {
let map = iter
.into_iter()
@ -400,7 +400,7 @@ where
impl FromIterator<HeaderValue> for Value {
fn from_iter<T: IntoIterator<Item = HeaderValue>>(iter: T) -> Self {
let mut iter = iter.into_iter();
let value = iter.next().map(|h| Value::One(h));
let value = iter.next().map(Value::One);
let mut value = match value {
Some(v) => v,
_ => Value::One(HeaderValue::from_static("")),

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.18] - 2022-xx-xx
* Add fmt::Debug impl to channel::Pool
## [0.1.17] - 2022-05-25
* Allow to reset time::Deadline

View file

@ -1,6 +1,6 @@
//! A one-shot pool, futures-aware channel.
use slab::Slab;
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use std::{fmt, future::Future, pin::Pin, task::Context, task::Poll};
use super::{cell::Cell, Canceled};
use crate::task::LocalWaker;
@ -17,6 +17,14 @@ pub type OneshotsPool<T> = Pool<T>;
/// Futures-aware, pool of one-shot's.
pub struct Pool<T>(Cell<Slab<Inner<T>>>);
impl<T> fmt::Debug for Pool<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Pool")
.field("size", &self.0.get_ref().len())
.finish()
}
}
bitflags::bitflags! {
struct Flags: u8 {
const SENDER = 0b0000_0001;

View file

@ -1,5 +1,9 @@
# Changes
## [0.5.20] - 2022-06-27
* http: replace h2 crate with ntex-h2
## [0.5.19] - 2022-06-23
* connect: move to separate crate

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.5.19"
version = "0.5.20"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -39,7 +39,7 @@ cookie = ["coo-kie", "coo-kie/percent-encode"]
url = ["url-pkg"]
# tokio runtime
tokio = ["ntex-rt/tokio", "ntex-connect/tokio"]
tokio = ["ntex-rt/tokio", "ntex-tokio", "ntex-connect/tokio"]
# glommio runtime
glommio = ["ntex-rt/glommio", "ntex-glommio", "ntex-connect/glommio"]
@ -56,10 +56,11 @@ ntex-service = "0.3.1"
ntex-macros = "0.1.3"
ntex-util = "0.1.17"
ntex-bytes = "0.1.14"
ntex-h2 = "0.1.0"
ntex-rt = "0.4.4"
ntex-io = "0.1.8"
ntex-tls = "0.1.5"
ntex-tokio = "0.1.3"
ntex-tokio = { version = "0.1.3", optional = true }
ntex-glommio = { version = "0.1.2", optional = true }
ntex-async-std = { version = "0.1.1", optional = true }
@ -79,7 +80,6 @@ socket2 = "0.4"
thiserror = "1.0"
# http/web framework
h2 = "0.3.9"
http = "0.2"
httparse = "1.6.0"
httpdate = "1.0"

View file

@ -19,12 +19,12 @@ async fn handle_request(mut req: Request) -> Result<Response, io::Error> {
#[ntex::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "echo=info");
env::set_var("RUST_LOG", "trace");
env_logger::init();
Server::build()
.bind("echo", "127.0.0.1:8080", |_| {
HttpService::build().finish(handle_request)
HttpService::build().h2(handle_request)
})?
.run()
.await

View file

@ -1,5 +1,7 @@
use std::{error::Error, fmt, marker::PhantomData};
use ntex_h2::{self as h2};
use crate::http::body::MessageBody;
use crate::http::config::{KeepAlive, OnRequest, ServiceConfig};
use crate::http::error::ResponseError;
@ -24,6 +26,7 @@ pub struct HttpServiceBuilder<F, S, X = ExpectHandler, U = UpgradeHandler<F>> {
expect: X,
upgrade: Option<U>,
on_request: Option<OnRequest>,
h2config: h2::Config,
_t: PhantomData<(F, S)>,
}
@ -38,6 +41,7 @@ impl<F, S> HttpServiceBuilder<F, S, ExpectHandler, UpgradeHandler<F>> {
expect: ExpectHandler,
upgrade: None,
on_request: None,
h2config: h2::Config::server(),
_t: PhantomData,
}
}
@ -75,6 +79,7 @@ where
/// By default client timeout is set to 3 seconds.
pub fn client_timeout(mut self, timeout: Seconds) -> Self {
self.client_timeout = timeout.into();
self.h2config.client_timeout(timeout);
self
}
@ -88,6 +93,7 @@ where
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self {
self.client_disconnect = timeout;
self.h2config.disconnect_timeout(timeout);
self
}
@ -99,6 +105,17 @@ where
/// By default handshake timeout is set to 5 seconds.
pub fn ssl_handshake_timeout(mut self, timeout: Seconds) -> Self {
self.handshake_timeout = timeout.into();
self.h2config.handshake_timeout(timeout);
self
}
#[doc(hidden)]
/// Configure http2 connection settings
pub fn configure_http2<O, R>(self, f: O) -> Self
where
O: FnOnce(&h2::Config) -> R,
{
let _ = f(&self.h2config);
self
}
@ -121,6 +138,7 @@ where
expect: expect.into_factory(),
upgrade: self.upgrade,
on_request: self.on_request,
h2config: self.h2config,
_t: PhantomData,
}
}
@ -144,6 +162,7 @@ where
expect: self.expect,
upgrade: Some(upgrade.into_factory()),
on_request: self.on_request,
h2config: self.h2config,
_t: PhantomData,
}
}
@ -174,6 +193,7 @@ where
self.client_timeout,
self.client_disconnect,
self.handshake_timeout,
self.h2config,
);
H1Service::with_config(cfg, service.into_factory())
.expect(self.expect)
@ -196,6 +216,7 @@ where
self.client_timeout,
self.client_disconnect,
self.handshake_timeout,
self.h2config,
);
H2Service::with_config(cfg, service.into_factory())
@ -217,6 +238,7 @@ where
self.client_timeout,
self.client_disconnect,
self.handshake_timeout,
self.h2config,
);
HttpService::with_config(cfg, service.into_factory())
.expect(self.expect)

View file

@ -1,20 +1,15 @@
use std::{cell::RefCell, fmt, rc::Rc, time};
use h2::client::SendRequest;
use std::{fmt, time};
use crate::http::body::MessageBody;
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::Payload;
use crate::io::{types::HttpProtocol, IoBoxed};
use crate::util::Bytes;
use super::error::SendRequestError;
use super::pool::Acquired;
use super::{h1proto, h2proto};
use super::{error::SendRequestError, h1proto, h2proto, pool::Acquired};
pub(super) enum ConnectionType {
H1(IoBoxed),
H2(H2Sender),
H2(h2proto::H2Client),
}
impl fmt::Debug for ConnectionType {
@ -26,32 +21,6 @@ impl fmt::Debug for ConnectionType {
}
}
#[derive(Clone)]
pub(super) struct H2Sender(Rc<RefCell<H2SenderInner>>);
struct H2SenderInner {
io: SendRequest<Bytes>,
closed: bool,
}
impl H2Sender {
pub(super) fn new(io: SendRequest<Bytes>) -> Self {
Self(Rc::new(RefCell::new(H2SenderInner { io, closed: false })))
}
pub(super) fn is_closed(&self) -> bool {
self.0.borrow().closed
}
pub(super) fn close(&self) {
self.0.borrow_mut().closed = true;
}
pub(super) fn get_sender(&self) -> SendRequest<Bytes> {
self.0.borrow().io.clone()
}
}
#[doc(hidden)]
/// HTTP client connection
pub struct Connection {

View file

@ -1,12 +1,12 @@
use std::{rc::Rc, task::Context, task::Poll, time::Duration};
use ntex_h2::{self as h2};
use crate::connect::{Connect as TcpConnect, Connector as TcpConnector};
use crate::http::Uri;
use crate::io::IoBoxed;
use crate::service::{apply_fn, boxed, Service};
use crate::time::{Millis, Seconds};
use crate::util::timeout::{TimeoutError, TimeoutService};
use crate::util::{Either, Ready};
use crate::util::{timeout::TimeoutError, timeout::TimeoutService, Either, Ready};
use crate::{http::Uri, io::IoBoxed};
use super::connection::Connection;
use super::error::ConnectError;
@ -40,6 +40,7 @@ pub struct Connector {
conn_keep_alive: Duration,
disconnect_timeout: Millis,
limit: usize,
h2config: h2::Config,
connector: BoxedConnector,
ssl_connector: Option<BoxedConnector>,
}
@ -64,6 +65,7 @@ impl Connector {
conn_keep_alive: Duration::from_secs(15),
disconnect_timeout: Millis(3_000),
limit: 100,
h2config: h2::Config::client(),
};
#[cfg(feature = "openssl")]
@ -74,6 +76,9 @@ impl Connector {
let _ = ssl
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| error!("Cannot set ALPN protocol: {:?}", e));
ssl.set_verify(tls_openssl::ssl::SslVerifyMode::NONE);
conn.openssl(ssl.build())
}
#[cfg(all(not(feature = "openssl"), feature = "rustls"))]
@ -174,6 +179,16 @@ impl Connector {
self
}
#[doc(hidden)]
/// Configure http2 connection settings
pub fn configure_http2<O, R>(self, f: O) -> Self
where
O: FnOnce(&h2::Config) -> R,
{
let _ = f(&self.h2config);
self
}
/// Use custom connector to open un-secured connections.
pub fn connector<T>(mut self, connector: T) -> Self
where
@ -213,6 +228,7 @@ impl Connector {
self.conn_keep_alive,
self.disconnect_timeout,
self.limit,
self.h2config.clone(),
))
} else {
None
@ -225,6 +241,7 @@ impl Connector {
self.conn_keep_alive,
self.disconnect_timeout,
self.limit,
self.h2config.clone(),
),
ssl_pool,
})

View file

@ -49,10 +49,6 @@ pub enum ConnectError {
#[error("No dns records found for the input")]
NoRecords,
/// Http2 error
#[error("{0}")]
H2(#[from] h2::Error),
/// Connecting took too long
#[error("Timeout out while establishing connection")]
Timeout,
@ -117,7 +113,7 @@ pub enum SendRequestError {
Http(#[from] HttpError),
/// Http2 error
#[error("Http2 error {0}")]
H2(#[from] h2::Error),
H2(#[from] ntex_h2::OperationError),
/// Response took too long
#[error("Timeout out while waiting for response")]
Timeout,

View file

@ -1,20 +1,18 @@
use std::convert::TryFrom;
use std::{cell::RefCell, convert::TryFrom, io, rc::Rc, task::Context, task::Poll};
use h2::SendStream;
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, Method, Version};
use ntex_h2::{self as h2, client::Client, frame};
use crate::http::body::{BodySize, MessageBody};
use crate::http::header::HeaderMap;
use crate::http::header::{self, HeaderMap, HeaderValue};
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::Payload;
use crate::util::{poll_fn, Bytes};
use crate::http::{h2::payload, payload::Payload, Method, Version};
use crate::util::{poll_fn, ByteString, Bytes, HashMap, Ready};
use crate::{channel::oneshot, service::Service};
use super::connection::H2Sender;
use super::error::SendRequestError;
pub(super) async fn send_request<B>(
io: H2Sender,
client: H2Client,
head: RequestHeadType,
body: B,
) -> Result<(ResponseHead, Payload), SendRequestError>
@ -22,34 +20,14 @@ where
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.size());
let head_req = head.as_ref().method == Method::HEAD;
let length = body.size();
let eof = matches!(
length,
BodySize::None | BodySize::Empty | BodySize::Sized(0)
);
let mut req = Request::new(());
*req.uri_mut() = head.as_ref().uri.clone();
*req.method_mut() = head.as_ref().method.clone();
*req.version_mut() = Version::HTTP_2;
let mut skip_len = true;
// Content length
let _ = match length {
BodySize::None => None,
BodySize::Stream => {
skip_len = false;
None
}
BodySize::Empty => req
.headers_mut()
.insert(CONTENT_LENGTH, HeaderValue::from_static("0")),
BodySize::Sized(len) => req.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
let eof = if head.as_ref().method == Method::HEAD {
true
} else {
matches!(
length,
BodySize::None | BodySize::Empty | BodySize::Sized(0)
)
};
// Extracting extra headers from RequestHeadType. HeaderMap::new() does not allocate.
@ -70,85 +48,257 @@ where
.chain(extra_headers.iter());
// copy headers
let mut hdrs = HeaderMap::new();
for (key, value) in headers {
match *key {
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
CONTENT_LENGTH if skip_len => continue,
header::CONNECTION | header::TRANSFER_ENCODING => continue, // http2 specific
_ => (),
}
req.headers_mut().append(key, value.clone());
hdrs.append(key.clone(), value.clone());
}
let mut sender = io.get_sender();
let res = poll_fn(|cx| sender.poll_ready(cx)).await;
if let Err(e) = res {
log::trace!("SendRequest readiness failed: {:?}", e);
return Err(SendRequestError::from(e));
}
let resp = match sender.send_request(req, eof) {
Ok((fut, send)) => {
if !eof {
send_body(body, send).await?;
}
fut.await.map_err(SendRequestError::from)?
}
Err(e) => {
return Err(e.into());
// Content length
let _ = match length {
BodySize::None | BodySize::Stream => (),
BodySize::Empty => {
hdrs.insert(header::CONTENT_LENGTH, HeaderValue::from_static("0"))
}
BodySize::Sized(len) => hdrs.insert(
header::CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
};
let (parts, body) = resp.into_parts();
let payload = if head_req { Payload::None } else { body.into() };
// send request
let stream = client
.0
.client
.send_request(
head.as_ref().method.clone(),
ByteString::from(format!("{}", head.as_ref().uri)),
hdrs,
eof,
)
.await?;
let mut head = ResponseHead::new(parts.status);
head.version = parts.version;
head.headers = parts.headers.into();
Ok((head, payload))
// send body
let id = stream.id();
if eof {
let result = client.wait_response(id).await;
client.set_stream(stream);
result
} else {
let c = client.clone();
crate::rt::spawn(async move {
if let Err(e) = send_body(body, &stream).await {
c.set_error(stream.id(), e);
} else {
c.set_stream(stream);
}
});
client.wait_response(id).await
}
}
async fn send_body<B: MessageBody>(
mut body: B,
mut send: SendStream<Bytes>,
stream: &h2::Stream,
) -> Result<(), SendRequestError> {
let mut buf = None;
loop {
if buf.is_none() {
match poll_fn(|cx| body.poll_next_chunk(cx)).await {
Some(Ok(b)) => {
send.reserve_capacity(b.len());
buf = Some(b);
}
Some(Err(e)) => return Err(e.into()),
None => {
if let Err(e) = send.send_data(Bytes::new(), true) {
return Err(e.into());
}
send.reserve_capacity(0);
return Ok(());
}
}
}
match poll_fn(|cx| send.poll_capacity(cx)).await {
None => return Ok(()),
Some(Ok(cap)) => {
let b = buf.as_mut().unwrap();
let len = b.len();
let bytes = b.split_to(std::cmp::min(cap, len));
if let Err(e) = send.send_data(bytes, false) {
return Err(e.into());
} else {
if !b.is_empty() {
send.reserve_capacity(b.len());
} else {
buf = None;
}
continue;
}
match poll_fn(|cx| body.poll_next_chunk(cx)).await {
Some(Ok(b)) => {
log::debug!("{:?} sending chunk, {} bytes", stream.id(), b.len());
stream.send_payload(b, false).await?
}
Some(Err(e)) => return Err(e.into()),
None => {
log::debug!("{:?} eof of send stream ", stream.id());
stream.send_payload(Bytes::new(), true).await?;
return Ok(());
}
}
}
}
#[derive(Clone)]
pub(super) struct H2Client(Rc<H2ClientInner>);
impl H2Client {
pub(super) fn new(client: Client) -> Self {
Self(Rc::new(H2ClientInner {
client,
streams: RefCell::new(HashMap::default()),
}))
}
pub(super) fn close(&self) {
self.0.client.close()
}
pub(super) fn is_closed(&self) -> bool {
self.0.client.is_closed()
}
fn set_error(&self, id: frame::StreamId, err: SendRequestError) {
if let Some(mut info) = self.0.streams.borrow_mut().remove(&id) {
if let Some(tx) = info.tx.take() {
let _ = tx.send(Err(err));
}
}
}
fn set_stream(&self, stream: h2::Stream) {
if let Some(info) = self.0.streams.borrow_mut().get_mut(&stream.id()) {
// response is not received yet
if info.tx.is_some() {
info.stream = Some(stream);
} else if let Some(ref mut sender) = info.payload {
sender.set_stream(Some(stream));
}
}
}
async fn wait_response(
&self,
id: frame::StreamId,
) -> Result<(ResponseHead, Payload), SendRequestError> {
let (tx, rx) = oneshot::channel();
let info = StreamInfo {
tx: Some(tx),
stream: None,
payload: None,
};
self.0.streams.borrow_mut().insert(id, info);
match rx.await {
Ok(item) => item,
Err(_) => Err(SendRequestError::Error(Box::new(io::Error::new(
io::ErrorKind::Other,
"disconnected",
)))),
}
}
}
struct H2ClientInner {
client: Client,
streams: RefCell<HashMap<frame::StreamId, StreamInfo>>,
}
struct StreamInfo {
tx: Option<oneshot::Sender<Result<(ResponseHead, Payload), SendRequestError>>>,
stream: Option<h2::Stream>,
payload: Option<payload::PayloadSender>,
}
pub(super) struct H2PublishService(Rc<H2ClientInner>);
impl H2PublishService {
pub(super) fn new(client: H2Client) -> Self {
Self(client.0)
}
}
impl Service<h2::Message> for H2PublishService {
type Response = ();
type Error = &'static str;
type Future = Ready<Self::Response, Self::Error>;
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[inline]
fn poll_shutdown(&self, _: &mut Context<'_>, _: bool) -> Poll<()> {
Poll::Ready(())
}
fn call(&self, mut msg: h2::Message) -> Self::Future {
match msg.kind().take() {
h2::MessageKind::Headers {
pseudo,
headers,
eof,
} => {
log::trace!(
"{:?} got response (eof: {}): {:#?}\nheaders: {:#?}",
msg.id(),
eof,
pseudo,
headers
);
let status = match pseudo.status {
Some(status) => status,
None => {
if let Some(mut info) =
self.0.streams.borrow_mut().remove(&msg.id())
{
let _ = info.tx.take().unwrap().send(Err(
SendRequestError::H2(h2::OperationError::Connection(
h2::ConnectionError::MissingPseudo("Status"),
)),
));
}
return Ready::Err("Missing status header");
}
};
let mut head = ResponseHead::new(status);
head.headers = headers;
head.version = Version::HTTP_2;
if let Some(info) = self.0.streams.borrow_mut().get_mut(&msg.id()) {
let stream = info.stream.take();
let payload = if !eof {
log::debug!("Creating local payload stream for {:?}", msg.id());
let (sender, payload) =
payload::Payload::create(msg.stream().empty_capacity());
sender.set_stream(stream);
info.payload = Some(sender);
Payload::H2(payload)
} else {
Payload::None
};
let _ = info.tx.take().unwrap().send(Ok((head, payload)));
Ready::Ok(())
} else {
Ready::Err("Cannot find Stream info")
}
}
h2::MessageKind::Data(data, cap) => {
log::debug!("Got data chunk for {:?}: {:?}", msg.id(), data.len());
if let Some(info) = self.0.streams.borrow_mut().get_mut(&msg.id()) {
if let Some(ref mut pl) = info.payload {
pl.feed_data(data, cap);
}
Ready::Ok(())
} else {
log::error!("Payload stream does not exists for {:?}", msg.id());
Ready::Err("Cannot find Stream info")
}
}
h2::MessageKind::Eof(item) => {
log::debug!("Got payload eof for {:?}: {:?}", msg.id(), item);
if let Some(mut info) = self.0.streams.borrow_mut().remove(&msg.id()) {
if let Some(ref mut pl) = info.payload {
match item {
h2::StreamEof::Data(data) => {
pl.feed_eof(data);
}
h2::StreamEof::Trailers(_) => {
pl.feed_eof(Bytes::new());
}
h2::StreamEof::Error(err) => pl.set_error(err.into()),
}
}
Ready::Ok(())
} else {
Ready::Err("Cannot find Stream info")
}
}
_ => Ready::Ok(()),
}
}
}

View file

@ -2,15 +2,16 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc};
use h2::client::{Builder, Connection as H2Connection, SendRequest};
use http::uri::Authority;
use ntex_h2::{self as h2};
use crate::io::{types::HttpProtocol, IoBoxed, TokioIoBoxed};
use crate::http::uri::Authority;
use crate::io::{types::HttpProtocol, IoBoxed};
use crate::time::{now, Millis};
use crate::util::{ready, Bytes, HashMap, HashSet};
use crate::util::{ready, HashMap, HashSet};
use crate::{channel::pool, rt::spawn, service::Service, task::LocalWaker};
use super::connection::{Connection, ConnectionType, H2Sender};
use super::connection::{Connection, ConnectionType};
use super::h2proto::{H2Client, H2PublishService};
use super::{error::ConnectError, Connect};
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
@ -58,6 +59,7 @@ where
conn_keep_alive: Duration,
disconnect_timeout: Millis,
limit: usize,
h2config: h2::Config,
) -> Self {
let connector = Rc::new(connector);
let waiters = Rc::new(RefCell::new(Waiters {
@ -69,6 +71,7 @@ where
conn_keep_alive,
disconnect_timeout,
limit,
h2config,
acquired: 0,
available: HashMap::default(),
connecting: HashSet::default(),
@ -185,6 +188,7 @@ pub(super) struct Inner {
conn_keep_alive: Duration,
disconnect_timeout: Millis,
limit: usize,
h2config: h2::Config,
acquired: usize,
available: HashMap<Key, VecDeque<AvailableConnection>>,
connecting: HashSet<Key>,
@ -387,16 +391,9 @@ where
}
}
type H2Future = Box<
dyn Future<
Output = Result<(SendRequest<Bytes>, H2Connection<TokioIoBoxed, Bytes>), h2::Error>,
>,
>;
struct OpenConnection<F> {
key: Key,
fut: F,
h2: Option<Pin<H2Future>>,
tx: Option<Waiter>,
guard: Option<OpenGuard>,
disconnect_timeout: Millis,
@ -413,7 +410,6 @@ where
spawn(OpenConnection {
fut,
disconnect_timeout,
h2: None,
tx: Some(tx),
key: key.clone(),
inner: inner.clone(),
@ -431,57 +427,6 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// handle http2 connection
if let Some(ref mut h2) = this.h2 {
return match ready!(Pin::new(h2).poll(cx)) {
Ok((snd, connection)) => {
// h2 connection is ready
let h2 = H2Sender::new(snd);
let guard = this.guard.take().unwrap().consume();
let conn = Connection::new(
ConnectionType::H2(h2.clone()),
now(),
Some(guard.clone()),
);
if this.tx.take().unwrap().send(Ok(conn)).is_err() {
// waiter is gone, return connection to pool
log::trace!(
"Waiter for {:?} is gone while connecting to host",
&this.key.authority
);
}
// put h2 connection to list of available connections
Connection::new(ConnectionType::H2(h2.clone()), now(), Some(guard))
.release(false);
let key = this.key.clone();
spawn(async move {
let res = connection.await;
h2.close();
log::trace!(
"Http/2 connection is closed for {:?} with {:?}",
key.authority,
res
);
});
Poll::Ready(())
}
Err(err) => {
trace!(
"Failed to negotiate h2 connection for {:?} with error {:?}",
&this.key.authority,
err
);
let _ = this.guard.take();
if let Some(rx) = this.tx.take() {
let _ = rx.send(Err(ConnectError::H2(err)));
}
Poll::Ready(())
}
};
}
// open tcp connection
match ready!(Pin::new(&mut this.fut).poll(cx)) {
Err(err) => {
@ -506,9 +451,42 @@ where
"Connection for {:?} is established, start http2 handshake",
&this.key.authority
);
this.h2 =
Some(Box::pin(Builder::new().handshake(TokioIoBoxed::from(io))));
self.poll(cx)
let connection = h2::client::ClientConnection::new(
io,
this.inner.borrow().h2config.clone(),
);
let client = H2Client::new(connection.client());
let key = this.key.clone();
let publish = H2PublishService::new(client.clone());
crate::rt::spawn(async move {
let res = connection.start(publish).await;
log::trace!(
"Http/2 connection is closed for {:?} with {:?}",
key.authority,
res
);
});
let guard = this.guard.take().unwrap().consume();
let conn = Connection::new(
ConnectionType::H2(client.clone()),
now(),
Some(guard.clone()),
);
if this.tx.take().unwrap().send(Ok(conn)).is_err() {
// waiter is gone, return connection to pool
log::trace!(
"Waiter for {:?} is gone while connecting to host",
&this.key.authority
);
}
// put h2 connection to list of available connections
Connection::new(ConnectionType::H2(client), now(), Some(guard))
.release(false);
Poll::Ready(())
} else {
log::trace!(
"Connection for {:?} is established, init http1 connection",
@ -644,6 +622,7 @@ mod tests {
Duration::from_secs(10),
Millis::ZERO,
1,
h2::Config::client(),
)
.clone();

View file

@ -1,7 +1,9 @@
use std::{cell::Cell, ptr::copy_nonoverlapping, rc::Rc, time, time::Duration};
use ntex_h2::{self as h2};
use crate::http::{Request, Response};
use crate::time::{now, sleep, Millis, Seconds, Sleep};
use crate::time::{sleep, Millis, Seconds};
use crate::{io::IoRef, service::boxed::BoxService, util::BytesMut};
#[derive(Debug, PartialEq, Clone, Copy)]
@ -47,6 +49,7 @@ pub(super) struct Inner {
pub(super) ka_enabled: bool,
pub(super) timer: DateService,
pub(super) ssl_handshake_timeout: Millis,
pub(super) h2config: h2::Config,
}
impl Clone for ServiceConfig {
@ -62,6 +65,7 @@ impl Default for ServiceConfig {
Millis(1_000),
Seconds::ONE,
Millis(5_000),
h2::Config::server(),
)
}
}
@ -73,6 +77,7 @@ impl ServiceConfig {
client_timeout: Millis,
client_disconnect: Seconds,
ssl_handshake_timeout: Millis,
h2config: h2::Config,
) -> ServiceConfig {
let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (Millis::from(val), true),
@ -87,6 +92,7 @@ impl ServiceConfig {
client_timeout,
client_disconnect,
ssl_handshake_timeout,
h2config,
timer: DateService::new(),
}))
}
@ -131,24 +137,6 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
pub(super) fn keep_alive_enabled(&self) -> bool {
self.ka_enabled
}
/// Return keep-alive timer Sleep is configured.
pub(super) fn keep_alive_timer(&self) -> Option<Sleep> {
if self.keep_alive != Duration::ZERO {
Some(sleep(self.keep_alive))
} else {
None
}
}
/// Keep-alive expire time
pub(super) fn keep_alive_expire(&self) -> Option<time::Instant> {
if self.keep_alive != Duration::ZERO {
Some(now() + self.keep_alive)
} else {
None
}
}
}
const DATE_VALUE_LENGTH_HDR: usize = 39;
@ -211,11 +199,6 @@ impl DateService {
}
}
pub(super) fn now(&self) -> time::Instant {
self.check_date();
self.0.current_time.get()
}
pub(super) fn set_date<F: FnMut(&[u8])>(&self, mut f: F) {
self.check_date();
let date = self.0.current_date.get();

View file

@ -1,7 +1,8 @@
//! Http related errors
use std::{fmt, io, io::Write, str::Utf8Error, string::FromUtf8Error};
use std::{error, fmt, io, io::Write, str::Utf8Error, string::FromUtf8Error};
use http::{header, uri::InvalidUri, StatusCode};
use ntex_h2::{self as h2};
// re-export for convinience
pub use crate::channel::Canceled;
@ -129,7 +130,7 @@ pub enum PayloadError {
UnknownLength,
/// Http2 payload error
#[error("{0}")]
Http2Payload(#[from] h2::Error),
Http2Payload(#[from] h2::StreamError),
/// Parse error
#[error("Parse error: {0}")]
Parse(#[from] ParseError),
@ -172,7 +173,7 @@ pub enum DispatchError {
/// Http/2 error
#[error("{0}")]
H2(#[from] h2::Error),
H2(#[from] H2Error),
/// The first request did not complete within the specified timeout.
#[error("The first request did not complete within the specified timeout")]
@ -209,6 +210,23 @@ impl From<io::Error> for DispatchError {
}
}
#[derive(thiserror::Error, Debug)]
/// A set of errors that can occur during dispatching http2 requests
pub enum H2Error {
/// Operation error
#[error("Operation error: {0}")]
Operation(#[from] h2::OperationError),
/// Pseudo headers error
#[error("Missing pseudo header: {0}")]
MissingPseudo(&'static str),
/// Uri parsing error
#[error("Uri: {0}")]
Uri(#[from] InvalidUri),
/// Body stream error
#[error("{0}")]
Stream(#[from] Box<dyn error::Error>),
}
/// A set of error that can occure during parsing content type
#[derive(thiserror::Error, PartialEq, Debug)]
pub enum ContentTypeError {

View file

@ -713,6 +713,7 @@ mod tests {
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cell::Cell, io, sync::Arc};
use ntex_h2::Config;
use rand::Rng;
use super::*;
@ -743,6 +744,7 @@ mod tests {
Millis(1_000),
Seconds::ZERO,
Millis(5_000),
Config::server(),
);
Dispatcher::new(
nio::Io::new(stream),
@ -796,6 +798,7 @@ mod tests {
Millis(1_000),
Seconds::ZERO,
Millis(5_000),
Config::server(),
);
let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<Base>>::new(
nio::Io::new(server),

View file

@ -1,328 +0,0 @@
use std::task::{Context, Poll};
use std::{convert::TryFrom, future::Future, marker::PhantomData, pin::Pin, rc::Rc, time};
use h2::server::{Connection, SendResponse};
use h2::SendStream;
use log::{error, trace};
use crate::http::body::{BodySize, MessageBody, ResponseBody};
use crate::http::config::{DateService, DispatcherConfig};
use crate::http::error::{DispatchError, ResponseError};
use crate::http::header::{
HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING,
};
use crate::http::message::{CurrentIo, ResponseHead};
use crate::http::{payload::Payload, request::Request, response::Response};
use crate::io::{IoRef, TokioIoBoxed};
use crate::service::Service;
use crate::time::{now, Sleep};
use crate::util::{Bytes, BytesMut};
const CHUNK_SIZE: usize = 16_384;
pin_project_lite::pin_project! {
/// Dispatcher for HTTP/2 protocol
pub struct Dispatcher<S: Service<Request>, B: MessageBody, X, U> {
io: IoRef,
config: Rc<DispatcherConfig<S, X, U>>,
connection: Connection<TokioIoBoxed, Bytes>,
ka_expire: time::Instant,
ka_timer: Option<Sleep>,
_t: PhantomData<B>,
}
}
impl<S, B, X, U> Dispatcher<S, B, X, U>
where
S: Service<Request> + 'static,
S::Error: ResponseError,
S::Response: Into<Response<B>>,
B: MessageBody,
{
pub(in crate::http) fn new(
io: IoRef,
config: Rc<DispatcherConfig<S, X, U>>,
connection: Connection<TokioIoBoxed, Bytes>,
timeout: Option<Sleep>,
) -> Self {
// keep-alive timer
let (ka_expire, ka_timer) = if let Some(delay) = timeout {
let expire = config.timer.now() + config.keep_alive;
(expire, Some(delay))
} else if let Some(delay) = config.keep_alive_timer() {
let expire = config.timer.now() + config.keep_alive;
(expire, Some(delay))
} else {
(now(), None)
};
Dispatcher {
io,
config,
connection,
ka_expire,
ka_timer,
_t: PhantomData,
}
}
}
impl<S, B, X, U> Future for Dispatcher<S, B, X, U>
where
S: Service<Request> + 'static,
S::Error: ResponseError,
S::Response: Into<Response<B>>,
B: MessageBody,
{
type Output = Result<(), DispatchError>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match Pin::new(&mut this.connection).poll_accept(cx) {
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
Poll::Ready(Some(Ok((req, res)))) => {
trace!("h2 message is received: {:?}", req);
// update keep-alive expire
if this.ka_timer.is_some() {
if let Some(expire) = this.config.keep_alive_expire() {
this.ka_expire = expire;
}
}
let (parts, body) = req.into_parts();
let mut req = Request::with_payload(Payload::H2(
crate::http::h2::Payload::new(body),
));
let head = &mut req.head_mut();
head.uri = parts.uri;
head.method = parts.method;
head.version = parts.version;
head.headers = parts.headers.into();
head.io = CurrentIo::Ref(this.io.clone());
crate::rt::spawn(ServiceResponse {
state: ServiceResponseState::ServiceCall {
call: this.config.service.call(req),
send: Some(res),
},
timer: this.config.timer.clone(),
buffer: None,
_t: PhantomData,
});
}
Poll::Pending => return Poll::Pending,
}
}
}
}
pin_project_lite::pin_project! {
struct ServiceResponse<F, I, E, B> {
#[pin]
state: ServiceResponseState<F, B>,
timer: DateService,
buffer: Option<Bytes>,
_t: PhantomData<(I, E)>,
}
}
pin_project_lite::pin_project! {
#[project = ServiceResponseStateProject]
enum ServiceResponseState<F, B> {
ServiceCall { #[pin] call: F, send: Option<SendResponse<Bytes>> },
SendPayload { stream: SendStream<Bytes>, body: ResponseBody<B> },
}
}
impl<F, I, E, B> ServiceResponse<F, I, E, B>
where
F: Future<Output = Result<I, E>>,
E: ResponseError + 'static,
I: Into<Response<B>>,
B: MessageBody,
{
fn prepare_response(
&self,
head: &ResponseHead,
size: &mut BodySize,
) -> http::Response<()> {
let mut has_date = false;
let mut skip_len = size != &BodySize::Stream;
let mut res = http::Response::new(());
*res.status_mut() = head.status;
*res.version_mut() = http::Version::HTTP_2;
// Content length
match head.status {
http::StatusCode::NO_CONTENT
| http::StatusCode::CONTINUE
| http::StatusCode::PROCESSING => *size = BodySize::None,
http::StatusCode::SWITCHING_PROTOCOLS => {
skip_len = true;
*size = BodySize::Stream;
}
_ => (),
}
let _ = match size {
BodySize::None | BodySize::Stream => None,
BodySize::Empty => res
.headers_mut()
.insert(CONTENT_LENGTH, HeaderValue::from_static("0")),
BodySize::Sized(len) => res.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
};
// copy headers
for (key, value) in head.headers.iter() {
match *key {
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
CONTENT_LENGTH if skip_len => continue,
DATE => has_date = true,
_ => (),
}
res.headers_mut().append(key, value.clone());
}
// set date header
if !has_date {
let mut bytes = BytesMut::with_capacity(29);
self.timer.set_date(|date| bytes.extend_from_slice(date));
res.headers_mut().insert(DATE, unsafe {
HeaderValue::from_maybe_shared_unchecked(bytes.freeze())
});
}
res
}
}
impl<F, I, E, B> Future for ServiceResponse<F, I, E, B>
where
F: Future<Output = Result<I, E>>,
E: ResponseError + 'static,
I: Into<Response<B>>,
B: MessageBody,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.project() {
ServiceResponseStateProject::ServiceCall { call, send } => {
match call.poll(cx) {
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
let mut send = send.take().unwrap();
let mut size = body.size();
let h2_res = self.as_mut().prepare_response(res.head(), &mut size);
this = self.as_mut().project();
let stream = match send.send_response(h2_res, size.is_eof()) {
Err(e) => {
trace!("Error sending h2 response: {:?}", e);
return Poll::Ready(());
}
Ok(stream) => stream,
};
if size.is_eof() {
Poll::Ready(())
} else {
this.state
.set(ServiceResponseState::SendPayload { stream, body });
self.poll(cx)
}
}
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
let res: Response = e.into();
let (res, body) = res.replace_body(());
let mut send = send.take().unwrap();
let mut size = body.size();
let h2_res = self.as_mut().prepare_response(res.head(), &mut size);
this = self.as_mut().project();
let stream = match send.send_response(h2_res, size.is_eof()) {
Err(e) => {
trace!("Error sending h2 response: {:?}", e);
return Poll::Ready(());
}
Ok(stream) => stream,
};
if size.is_eof() {
Poll::Ready(())
} else {
this.state.set(ServiceResponseState::SendPayload {
stream,
body: body.into_body(),
});
self.poll(cx)
}
}
}
}
ServiceResponseStateProject::SendPayload { stream, body } => loop {
loop {
if let Some(buffer) = this.buffer {
match stream.poll_capacity(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(Ok(cap))) => {
let len = buffer.len();
let bytes = buffer.split_to(std::cmp::min(cap, len));
if let Err(e) = stream.send_data(bytes, false) {
warn!("{:?}", e);
return Poll::Ready(());
} else if !buffer.is_empty() {
let cap = std::cmp::min(buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
this.buffer.take();
}
}
Poll::Ready(Some(Err(e))) => {
warn!("{:?}", e);
return Poll::Ready(());
}
}
} else {
match body.poll_next_chunk(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
if let Err(e) = stream.send_data(Bytes::new(), true) {
warn!("{:?}", e);
}
return Poll::Ready(());
}
Poll::Ready(Some(Ok(chunk))) => {
stream.reserve_capacity(std::cmp::min(
chunk.len(),
CHUNK_SIZE,
));
*this.buffer = Some(chunk);
}
Poll::Ready(Some(Err(e))) => {
error!("Response payload stream error: {:?}", e);
return Poll::Ready(());
}
}
}
}
},
}
}
}

View file

@ -1,46 +1,8 @@
//! HTTP/2 implementation
use std::pin::Pin;
use std::task::{Context, Poll};
use h2::RecvStream;
mod dispatcher;
pub(super) mod payload;
mod service;
pub use self::dispatcher::Dispatcher;
pub use self::payload::Payload;
pub use self::service::H2Service;
use crate::{http::error::PayloadError, util::Bytes, util::Stream};
/// H2 receive stream
#[derive(Debug)]
pub struct Payload {
pl: RecvStream,
}
impl Payload {
pub(crate) fn new(pl: RecvStream) -> Self {
Self { pl }
}
}
impl Stream for Payload {
type Item = Result<Bytes, PayloadError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.pl).poll_data(cx) {
Poll::Ready(Some(Ok(chunk))) => {
let len = chunk.len();
if let Err(err) = this.pl.flow_control().release_capacity(len) {
Poll::Ready(Some(Err(err.into())))
} else {
Poll::Ready(Some(Ok(Bytes::copy_from_slice(&chunk[..]))))
}
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
}
}
}
pub(in crate::http) use self::service::handle;

172
ntex/src/http/h2/payload.rs Normal file
View file

@ -0,0 +1,172 @@
//! Payload stream
use std::task::{Context, Poll};
use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, rc::Weak};
use ntex_h2::{self as h2};
use crate::util::{poll_fn, Bytes, Stream};
use crate::{http::error::PayloadError, task::LocalWaker};
/// Buffered stream of byte chunks
///
/// Payload stores chunks in a vector. First chunk can be received with
/// `.readany()` method. Payload stream is not thread safe. Payload does not
/// notify current task when new data is available.
///
/// Payload stream can be used as `Response` body stream.
#[derive(Debug)]
pub struct Payload {
inner: Rc<RefCell<Inner>>,
}
impl Payload {
/// Create payload stream.
///
/// This method construct two objects responsible for bytes stream
/// generation.
///
/// * `PayloadSender` - *Sender* side of the stream
///
/// * `Payload` - *Receiver* side of the stream
pub fn create(cap: h2::Capacity) -> (PayloadSender, Payload) {
let shared = Rc::new(RefCell::new(Inner::new(cap)));
(
PayloadSender {
inner: Rc::downgrade(&shared),
},
Payload { inner: shared },
)
}
#[inline]
pub async fn read(&self) -> Option<Result<Bytes, PayloadError>> {
poll_fn(|cx| self.poll_read(cx)).await
}
#[inline]
pub fn poll_read(
&self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
self.inner.borrow_mut().readany(cx)
}
}
impl Stream for Payload {
type Item = Result<Bytes, PayloadError>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
self.inner.borrow_mut().readany(cx)
}
}
/// Sender part of the payload stream
pub struct PayloadSender {
inner: Weak<RefCell<Inner>>,
}
impl Drop for PayloadSender {
fn drop(&mut self) {
self.set_error(PayloadError::Incomplete(None))
}
}
impl PayloadSender {
pub fn set_error(&mut self, err: PayloadError) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().set_error(err);
self.inner = Weak::new();
}
}
pub fn feed_eof(&mut self, data: Bytes) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof(data);
self.inner = Weak::new();
}
}
pub fn feed_data(&mut self, data: Bytes, cap: h2::Capacity) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_data(data, cap)
}
}
pub fn set_stream(&self, stream: Option<h2::Stream>) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().stream = stream;
}
}
}
#[derive(Debug)]
struct Inner {
eof: bool,
cap: h2::Capacity,
err: Option<PayloadError>,
items: VecDeque<Bytes>,
task: LocalWaker,
io_task: LocalWaker,
stream: Option<h2::Stream>,
}
impl Inner {
fn new(cap: h2::Capacity) -> Self {
Inner {
cap,
eof: false,
err: None,
stream: None,
items: VecDeque::new(),
task: LocalWaker::new(),
io_task: LocalWaker::new(),
}
}
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
self.task.wake()
}
fn feed_eof(&mut self, data: Bytes) {
self.eof = true;
if !data.is_empty() {
self.items.push_back(data);
}
self.task.wake()
}
fn feed_data(&mut self, data: Bytes, cap: h2::Capacity) {
self.cap += cap;
self.items.push_back(data);
self.task.wake();
}
fn readany(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
if let Some(data) = self.items.pop_front() {
if !self.eof {
self.cap.consume(data.len() as u32);
if self.cap.size() == 0 {
self.task.register(cx.waker());
}
}
Poll::Ready(Some(Ok(data)))
} else if let Some(err) = self.err.take() {
Poll::Ready(Some(Err(err)))
} else if self.eof {
Poll::Ready(None)
} else {
self.task.register(cx.waker());
self.io_task.wake();
Poll::Pending
}
}
}

View file

@ -1,26 +1,25 @@
use std::task::{Context, Poll};
use std::{future::Future, marker::PhantomData, pin::Pin, rc::Rc};
use std::{cell::RefCell, task::Context, task::Poll};
use std::{convert::TryFrom, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc};
use h2::server::{self, Handshake};
use ntex_h2::{self as h2, frame::StreamId, server};
use crate::http::body::MessageBody;
use crate::http::body::{BodySize, MessageBody};
use crate::http::config::{DispatcherConfig, ServiceConfig};
use crate::http::error::{DispatchError, ResponseError};
use crate::http::request::Request;
use crate::http::response::Response;
use crate::io::{types, Filter, Io, IoRef, TokioIoBoxed};
use crate::http::error::{DispatchError, H2Error, ResponseError};
use crate::http::header::{self, HeaderMap, HeaderValue};
use crate::http::message::{CurrentIo, ResponseHead};
use crate::http::{DateService, Method, Request, Response, StatusCode, Uri, Version};
use crate::io::{types, Filter, Io, IoBoxed, IoRef};
use crate::service::{IntoServiceFactory, Service, ServiceFactory};
use crate::time::Millis;
use crate::util::Bytes;
use crate::util::{poll_fn, Bytes, BytesMut, Either, HashMap, Ready};
use super::dispatcher::Dispatcher;
use super::payload::{Payload, PayloadSender};
/// `ServiceFactory` implementation for HTTP2 transport
pub struct H2Service<F, S, B> {
srv: S,
cfg: ServiceConfig,
#[allow(dead_code)]
handshake_timeout: Millis,
h2config: h2::Config,
_t: PhantomData<(F, B)>,
}
@ -37,10 +36,10 @@ where
service: U,
) -> Self {
H2Service {
srv: service.into_factory(),
handshake_timeout: cfg.0.ssl_handshake_timeout,
_t: PhantomData,
cfg,
srv: service.into_factory(),
h2config: h2::Config::server(),
_t: PhantomData,
}
}
}
@ -116,7 +115,7 @@ mod rustls {
pipeline_factory(
Acceptor::from(config)
.timeout(self.handshake_timeout)
.timeout(self.cfg.0.ssl_handshake_timeout)
.map_err(|e| SslError::Ssl(Box::new(e)))
.map_init_err(|_| panic!()),
)
@ -142,6 +141,7 @@ where
fn new_service(&self, _: ()) -> Self::Future {
let fut = self.srv.new_service(());
let cfg = self.cfg.clone();
let h2config = self.h2config.clone();
Box::pin(async move {
let service = fut.await?;
@ -149,6 +149,7 @@ where
Ok(H2ServiceHandler {
config,
h2config,
_t: PhantomData,
})
})
@ -158,6 +159,7 @@ where
/// `Service` implementation for http/2 transport
pub struct H2ServiceHandler<F, S: Service<Request>, B> {
config: Rc<DispatcherConfig<S, (), ()>>,
h2config: h2::Config,
_t: PhantomData<(F, B)>,
}
@ -171,7 +173,7 @@ where
{
type Response = ();
type Error = DispatchError;
type Future = H2ServiceHandlerResponse<S, B>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -191,71 +193,286 @@ where
"New http2 connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
);
io.set_disconnect_timeout(self.config.client_disconnect.into());
H2ServiceHandlerResponse {
state: State::Handshake(
io.get_ref(),
self.config.clone(),
server::Builder::new().handshake(TokioIoBoxed::from(io)),
),
Box::pin(handle(
io.into(),
self.config.clone(),
self.h2config.clone(),
))
}
}
pub(in crate::http) async fn handle<S, B, X, U>(
io: IoBoxed,
config: Rc<DispatcherConfig<S, X, U>>,
h2config: h2::Config,
) -> Result<(), DispatchError>
where
S: Service<Request> + 'static,
S::Error: ResponseError,
S::Response: Into<Response<B>>,
B: MessageBody,
X: 'static,
U: 'static,
{
io.set_disconnect_timeout(config.client_disconnect.into());
let ioref = io.get_ref();
let _ = server::handle_one(
io,
h2config,
ControlService::new(),
PublishService::new(ioref, config),
)
.await;
Ok(())
}
struct ControlService {}
impl ControlService {
fn new() -> Self {
Self {}
}
}
impl Service<h2::ControlMessage<H2Error>> for ControlService {
type Response = h2::ControlResult;
type Error = ();
type Future = Ready<Self::Response, Self::Error>;
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[inline]
fn poll_shutdown(&self, _: &mut Context<'_>, _: bool) -> Poll<()> {
Poll::Ready(())
}
fn call(&self, msg: h2::ControlMessage<H2Error>) -> Self::Future {
log::trace!("Control message: {:?}", msg);
Ready::Ok::<_, ()>(msg.ack())
}
}
struct PublishService<S: Service<Request>, B, X, U> {
io: IoRef,
config: Rc<DispatcherConfig<S, X, U>>,
streams: RefCell<HashMap<StreamId, PayloadSender>>,
_t: PhantomData<B>,
}
impl<S, B, X, U> PublishService<S, B, X, U>
where
S: Service<Request> + 'static,
S::Error: ResponseError,
S::Response: Into<Response<B>>,
B: MessageBody,
{
fn new(io: IoRef, config: Rc<DispatcherConfig<S, X, U>>) -> Self {
Self {
io,
config,
streams: RefCell::new(HashMap::default()),
_t: PhantomData,
}
}
}
enum State<S: Service<Request>, B: MessageBody>
where
S: 'static,
{
Incoming(Dispatcher<S, B, (), ()>),
Handshake(
IoRef,
Rc<DispatcherConfig<S, (), ()>>,
Handshake<TokioIoBoxed, Bytes>,
),
}
pub struct H2ServiceHandlerResponse<S, B>
impl<S, B, X, U> Service<h2::Message> for PublishService<S, B, X, U>
where
S: Service<Request> + 'static,
S::Error: ResponseError,
S::Response: Into<Response<B>>,
B: MessageBody,
X: 'static,
U: 'static,
{
state: State<S, B>,
}
type Response = ();
type Error = H2Error;
type Future = Either<
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>,
Ready<Self::Response, Self::Error>,
>;
impl<S, B> Future for H2ServiceHandlerResponse<S, B>
where
S: Service<Request> + 'static,
S::Error: ResponseError,
S::Response: Into<Response<B>>,
B: MessageBody,
{
type Output = Result<(), DispatchError>;
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.state {
State::Incoming(ref mut disp) => Pin::new(disp).poll(cx),
State::Handshake(ref io, ref config, ref mut handshake) => {
match Pin::new(handshake).poll(cx) {
Poll::Ready(Ok(conn)) => {
trace!("H2 handshake completed");
self.state = State::Incoming(Dispatcher::new(
io.clone(),
config.clone(),
conn,
None,
));
self.poll(cx)
#[inline]
fn poll_shutdown(&self, _: &mut Context<'_>, _: bool) -> Poll<()> {
Poll::Ready(())
}
fn call(&self, mut msg: h2::Message) -> Self::Future {
let (io, pseudo, headers, eof, payload) = match msg.kind().take() {
h2::MessageKind::Headers {
pseudo,
headers,
eof,
} => {
let pl = if !eof {
log::debug!("Creating local payload stream for {:?}", msg.id());
let (sender, payload) = Payload::create(msg.stream().empty_capacity());
self.streams.borrow_mut().insert(msg.id(), sender);
Some(payload)
} else {
None
};
(self.io.clone(), pseudo, headers, eof, pl)
}
h2::MessageKind::Data(data, cap) => {
log::debug!("Got data chunk for {:?}: {:?}", msg.id(), data.len());
if let Some(sender) = self.streams.borrow_mut().get_mut(&msg.id()) {
sender.feed_data(data, cap)
} else {
log::error!("Payload stream does not exists for {:?}", msg.id());
};
return Either::Right(Ready::Ok(()));
}
h2::MessageKind::Eof(item) => {
log::debug!("Got payload eof for {:?}: {:?}", msg.id(), item);
if let Some(mut sender) = self.streams.borrow_mut().remove(&msg.id()) {
match item {
h2::StreamEof::Data(data) => {
sender.feed_eof(data);
}
h2::StreamEof::Trailers(_) => {
sender.feed_eof(Bytes::new());
}
h2::StreamEof::Error(err) => sender.set_error(err.into()),
}
Poll::Ready(Err(err)) => {
trace!("H2 handshake error: {}", err);
Poll::Ready(Err(err.into()))
}
return Either::Right(Ready::Ok(()));
}
_ => return Either::Right(Ready::Ok(())),
};
let cfg = self.config.clone();
Either::Left(Box::pin(async move {
log::trace!(
"{:?} got request (eof: {}): {:#?}\nheaders: {:#?}",
msg.id(),
eof,
pseudo,
headers
);
let mut req = if let Some(pl) = payload {
Request::with_payload(crate::http::Payload::H2(pl))
} else {
Request::new()
};
let path = pseudo.path.ok_or(H2Error::MissingPseudo("Path"))?;
let method = pseudo.method.ok_or(H2Error::MissingPseudo("Method"))?;
let head = req.head_mut();
head.uri = if let Some(ref authority) = pseudo.authority {
let scheme = pseudo.scheme.ok_or(H2Error::MissingPseudo("Scheme"))?;
Uri::try_from(format!("{}://{}{}", scheme, authority, path))?
} else {
Uri::try_from(path.as_str())?
};
let is_head_req = method == Method::HEAD;
head.version = Version::HTTP_2;
head.method = method;
head.headers = headers;
head.io = CurrentIo::Ref(io);
let (mut res, mut body) = match cfg.service.call(req).await {
Ok(res) => res.into().into_parts(),
Err(err) => {
let (res, body) = Response::from(&err).into_parts();
(res, body.into_body())
}
};
let head = res.head_mut();
let mut size = body.size();
prepare_response(&cfg.timer, head, &mut size);
log::debug!("Received service response: {:?} payload: {:?}", head, size);
let hdrs = mem::replace(&mut head.headers, HeaderMap::new());
if size.is_eof() || is_head_req {
msg.stream().send_response(head.status, hdrs, true)?;
} else {
msg.stream().send_response(head.status, hdrs, false)?;
loop {
match poll_fn(|cx| body.poll_next_chunk(cx)).await {
None => {
log::debug!("{:?} closing sending payload", msg.id());
msg.stream().send_payload(Bytes::new(), true).await?;
break;
}
Some(Ok(chunk)) => {
log::debug!(
"{:?} sending data chunk {:?} bytes",
msg.id(),
chunk.len()
);
if !chunk.is_empty() {
msg.stream().send_payload(chunk, false).await?;
}
}
Some(Err(e)) => {
error!("Response payload stream error: {:?}", e);
return Err(e.into());
}
}
Poll::Pending => Poll::Pending,
}
}
}
Ok(())
}))
}
}
fn prepare_response(timer: &DateService, head: &mut ResponseHead, size: &mut BodySize) {
let mut skip_len = size == &BodySize::Stream;
// Content length
match head.status {
StatusCode::NO_CONTENT | StatusCode::CONTINUE | StatusCode::PROCESSING => {
*size = BodySize::None
}
StatusCode::SWITCHING_PROTOCOLS => {
skip_len = true;
*size = BodySize::Stream;
head.headers.remove(header::CONTENT_LENGTH);
}
_ => (),
}
let _ = match size {
BodySize::None | BodySize::Stream => (),
BodySize::Empty => head
.headers
.insert(header::CONTENT_LENGTH, HeaderValue::from_static("0")),
BodySize::Sized(len) => {
if !skip_len {
head.headers.insert(
header::CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
);
}
}
};
// http2 specific1
head.headers.remove(header::CONNECTION);
head.headers.remove(header::TRANSFER_ENCODING);
// set date header
if !head.headers.contains_key(header::DATE) {
let mut bytes = BytesMut::with_capacity(29);
timer.set_date(|date| bytes.extend_from_slice(date));
head.headers.insert(header::DATE, unsafe {
HeaderValue::from_maybe_shared_unchecked(bytes.freeze())
});
}
}

View file

@ -1,8 +1,6 @@
use std::{fmt, mem, pin::Pin, task::Context, task::Poll};
use h2::RecvStream;
use super::{error::PayloadError, h1, h2 as h2d};
use super::{error::PayloadError, h1, h2};
use crate::util::{poll_fn, Bytes, Stream};
/// Type represent boxed payload
@ -12,7 +10,7 @@ pub type PayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>
pub enum Payload {
None,
H1(h1::Payload),
H2(h2d::Payload),
H2(h2::Payload),
Stream(PayloadStream),
}
@ -28,18 +26,12 @@ impl From<h1::Payload> for Payload {
}
}
impl From<h2d::Payload> for Payload {
fn from(v: h2d::Payload) -> Self {
impl From<h2::Payload> for Payload {
fn from(v: h2::Payload) -> Self {
Payload::H2(v)
}
}
impl From<RecvStream> for Payload {
fn from(v: RecvStream) -> Self {
Payload::H2(h2d::Payload::new(v))
}
}
impl From<PayloadStream> for Payload {
fn from(pl: PayloadStream) -> Self {
Payload::Stream(pl)
@ -88,7 +80,7 @@ impl Payload {
match self {
Payload::None => Poll::Ready(None),
Payload::H1(ref mut pl) => pl.readany(cx),
Payload::H2(ref mut pl) => Pin::new(pl).poll_next(cx),
Payload::H2(ref mut pl) => pl.poll_read(cx),
Payload::Stream(ref mut pl) => Pin::new(pl).poll_next(cx),
}
}

View file

@ -204,17 +204,6 @@ impl<B> Response<B> {
}
}
/// Set a body and return previous body value
pub(crate) fn replace_body<B2>(self, body: B2) -> (Response<B2>, ResponseBody<B>) {
(
Response {
head: self.head,
body: ResponseBody::Body(body),
},
self.body,
)
}
/// Set a body and return previous body value
pub fn map_body<F, B2>(mut self, f: F) -> Response<B2>
where

View file

@ -1,12 +1,9 @@
use std::task::{Context, Poll};
use std::{cell, error, fmt, future, marker, pin::Pin, rc::Rc};
use std::{cell, error, fmt, future, future::Future, marker, pin::Pin, rc::Rc};
use h2::server::{self, Handshake};
use crate::io::{types, Filter, Io, IoRef, TokioIoBoxed};
use crate::io::{types, Filter, Io};
use crate::service::{IntoServiceFactory, Service, ServiceFactory};
use crate::time::{Millis, Seconds};
use crate::util::Bytes;
use super::body::MessageBody;
use super::builder::HttpServiceBuilder;
@ -14,7 +11,7 @@ use super::config::{DispatcherConfig, KeepAlive, OnRequest, ServiceConfig};
use super::error::{DispatchError, ResponseError};
use super::request::Request;
use super::response::Response;
use super::{h1, h2::Dispatcher};
use super::{h1, h2};
/// `ServiceFactory` HTTP1.1/HTTP2 transport implementation
pub struct HttpService<F, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler<F>> {
@ -56,6 +53,7 @@ where
Millis(5_000),
Seconds::ONE,
Millis(5_000),
ntex_h2::Config::server(),
);
HttpService {
@ -280,9 +278,11 @@ where
None
};
let h2config = cfg.0.h2config.clone();
let config = DispatcherConfig::new(cfg, service, expect, upgrade, on_request);
Ok(HttpServiceHandler {
h2config,
config: Rc::new(config),
_t: marker::PhantomData,
})
@ -293,6 +293,7 @@ where
/// `Service` implementation for http transport
pub struct HttpServiceHandler<F, S, B, X, U> {
config: Rc<DispatcherConfig<S, X, U>>,
h2config: ntex_h2::Config,
_t: marker::PhantomData<(F, B)>,
}
@ -376,13 +377,12 @@ where
);
if io.query::<types::HttpProtocol>().get() == Some(types::HttpProtocol::Http2) {
io.set_disconnect_timeout(self.config.client_disconnect.into());
HttpServiceHandlerResponse {
state: ResponseState::H2Handshake {
data: Some((
io.get_ref(),
server::Builder::new().handshake(TokioIoBoxed::from(io)),
state: ResponseState::H2 {
fut: Box::pin(h2::handle(
io.into(),
self.config.clone(),
self.h2config.clone(),
)),
},
}
@ -436,14 +436,7 @@ pin_project_lite::pin_project! {
U: 'static,
{
H1 { #[pin] fut: h1::Dispatcher<F, S, B, X, U> },
H2 { fut: Dispatcher<S, B, X, U> },
H2Handshake { data:
Option<(
IoRef,
Handshake<TokioIoBoxed, Bytes>,
Rc<DispatcherConfig<S, X, U>>,
)>,
},
H2 { fut: Pin<Box<dyn Future<Output = Result<(), DispatchError>>>> },
}
}
@ -467,25 +460,6 @@ where
match this.state.project() {
StateProject::H1 { fut } => fut.poll(cx),
StateProject::H2 { ref mut fut } => Pin::new(fut).poll(cx),
StateProject::H2Handshake { data } => {
let conn = if let Some(ref mut item) = data {
match Pin::new(&mut item.1).poll(cx) {
Poll::Ready(Ok(conn)) => conn,
Poll::Ready(Err(err)) => {
trace!("H2 handshake error: {}", err);
return Poll::Ready(Err(err.into()));
}
Poll::Pending => return Poll::Pending,
}
} else {
panic!()
};
let (io, _, cfg) = data.take().unwrap();
self.as_mut().project().state.set(ResponseState::H2 {
fut: Dispatcher::new(io, cfg, conn, None),
});
self.poll(cx)
}
}
}
}

View file

@ -92,8 +92,6 @@ pub mod time {
pub mod io {
//! IO streaming utilities.
pub use ntex_io::*;
pub use ntex_tokio::TokioIoBoxed;
}
pub mod testing {

View file

@ -163,7 +163,8 @@ impl InternalServiceFactory for ConfiguredService {
let mut services = mem::take(&mut rt.0.borrow_mut().services);
// TODO: Proper error handling here
for f in mem::take(&mut rt.0.borrow_mut().onstart).into_iter() {
let onstart = mem::take(&mut rt.0.borrow_mut().onstart);
for f in onstart.into_iter() {
f.await;
}
let mut res = vec![];

View file

@ -1,6 +1,5 @@
#![cfg(feature = "openssl")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use tls_openssl::ssl::{
AlpnError, SslAcceptor, SslConnector, SslFiletype, SslMethod, SslVerifyMode,

View file

@ -308,6 +308,7 @@ async fn test_h2_head_binary() {
assert!(bytes.is_empty());
}
/// Server must send content-length, but no payload
#[ntex::test]
async fn test_h2_head_binary2() {
let srv = test_server(move || {