mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-06 06:17:40 +03:00
refactor http client connector
This commit is contained in:
parent
ce48c620fd
commit
703f911f6e
6 changed files with 199 additions and 557 deletions
|
@ -30,7 +30,7 @@ async fn main() -> std::io::Result<()> {
|
|||
.wrap(middleware::Compress::default())
|
||||
.wrap(middleware::Logger::default())
|
||||
.service(web::resource("/resource1/{name}/index.html").to(index))
|
||||
.service(web::resource("/").route(web::get().to(index)))
|
||||
.service(web::resource("/").route(web::get().to(no_params)))
|
||||
// .service(index)
|
||||
// .service(no_params)
|
||||
.service(
|
||||
|
|
|
@ -1,12 +1,9 @@
|
|||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt, io, mem, time};
|
||||
use std::{fmt, time};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||
use bytes::{Buf, Bytes};
|
||||
use bytes::Bytes;
|
||||
use futures::future::{err, Either, Future, FutureExt, LocalBoxFuture, Ready};
|
||||
use h2::client::SendRequest;
|
||||
use pin_project::{pin_project, project};
|
||||
|
||||
use crate::http::body::MessageBody;
|
||||
use crate::http::h1::ClientCodec;
|
||||
|
@ -151,149 +148,3 @@ where
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum EitherConnection<A, B> {
|
||||
A(IoConnection<A>),
|
||||
B(IoConnection<B>),
|
||||
}
|
||||
|
||||
impl<A, B> Connection for EitherConnection<A, B>
|
||||
where
|
||||
A: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
B: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
type Io = EitherIo<A, B>;
|
||||
type Future =
|
||||
LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>;
|
||||
|
||||
fn protocol(&self) -> Protocol {
|
||||
match self {
|
||||
EitherConnection::A(con) => con.protocol(),
|
||||
EitherConnection::B(con) => con.protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
fn send_request<RB: MessageBody + 'static, H: Into<RequestHeadType>>(
|
||||
self,
|
||||
head: H,
|
||||
body: RB,
|
||||
) -> Self::Future {
|
||||
match self {
|
||||
EitherConnection::A(con) => con.send_request(head, body),
|
||||
EitherConnection::B(con) => con.send_request(head, body),
|
||||
}
|
||||
}
|
||||
|
||||
type TunnelFuture = LocalBoxFuture<
|
||||
'static,
|
||||
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
|
||||
>;
|
||||
|
||||
/// Send request, returns Response and Framed
|
||||
fn open_tunnel<H: Into<RequestHeadType>>(self, head: H) -> Self::TunnelFuture {
|
||||
match self {
|
||||
EitherConnection::A(con) => con
|
||||
.open_tunnel(head)
|
||||
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A))))
|
||||
.boxed_local(),
|
||||
EitherConnection::B(con) => con
|
||||
.open_tunnel(head)
|
||||
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B))))
|
||||
.boxed_local(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
pub(crate) enum EitherIo<A, B> {
|
||||
A(#[pin] A),
|
||||
B(#[pin] B),
|
||||
}
|
||||
|
||||
impl<A, B> AsyncRead for EitherIo<A, B>
|
||||
where
|
||||
A: AsyncRead,
|
||||
B: AsyncRead,
|
||||
{
|
||||
#[inline]
|
||||
#[project]
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
#[project]
|
||||
match self.project() {
|
||||
EitherIo::A(val) => val.poll_read(cx, buf),
|
||||
EitherIo::B(val) => val.poll_read(cx, buf),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
unsafe fn prepare_uninitialized_buffer(
|
||||
&self,
|
||||
buf: &mut [mem::MaybeUninit<u8>],
|
||||
) -> bool {
|
||||
match self {
|
||||
EitherIo::A(ref val) => val.prepare_uninitialized_buffer(buf),
|
||||
EitherIo::B(ref val) => val.prepare_uninitialized_buffer(buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> AsyncWrite for EitherIo<A, B>
|
||||
where
|
||||
A: AsyncWrite,
|
||||
B: AsyncWrite,
|
||||
{
|
||||
#[project]
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
#[project]
|
||||
match self.project() {
|
||||
EitherIo::A(val) => val.poll_write(cx, buf),
|
||||
EitherIo::B(val) => val.poll_write(cx, buf),
|
||||
}
|
||||
}
|
||||
|
||||
#[project]
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
#[project]
|
||||
match self.project() {
|
||||
EitherIo::A(val) => val.poll_flush(cx),
|
||||
EitherIo::B(val) => val.poll_flush(cx),
|
||||
}
|
||||
}
|
||||
|
||||
#[project]
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
#[project]
|
||||
match self.project() {
|
||||
EitherIo::A(val) => val.poll_shutdown(cx),
|
||||
EitherIo::B(val) => val.poll_shutdown(cx),
|
||||
}
|
||||
}
|
||||
|
||||
#[project]
|
||||
fn poll_write_buf<U: Buf>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut U,
|
||||
) -> Poll<Result<usize, io::Error>>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
#[project]
|
||||
match self.project() {
|
||||
EitherIo::A(val) => val.poll_write_buf(cx, buf),
|
||||
EitherIo::B(val) => val.poll_write_buf(cx, buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,15 @@
|
|||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use futures::future::{err, Either, Ready};
|
||||
|
||||
use crate::connect::{Connect as TcpConnect, Connector as TcpConnector};
|
||||
use crate::http::{Protocol, Uri};
|
||||
use crate::rt::net::TcpStream;
|
||||
use crate::service::{apply_fn, boxed, Service};
|
||||
use crate::util::timeout::{TimeoutError, TimeoutService};
|
||||
use crate::{apply_fn, Service};
|
||||
|
||||
use super::connection::Connection;
|
||||
use super::error::ConnectError;
|
||||
|
@ -23,15 +24,8 @@ use crate::connect::rustls::ClientConfig;
|
|||
#[cfg(feature = "rustls")]
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(any(feature = "openssl", feature = "rustls"))]
|
||||
enum SslConnector {
|
||||
#[cfg(feature = "openssl")]
|
||||
Openssl(OpensslConnector),
|
||||
#[cfg(feature = "rustls")]
|
||||
Rustls(Arc<ClientConfig>),
|
||||
}
|
||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||
type SslConnector = ();
|
||||
type BoxedConnector =
|
||||
boxed::BoxService<TcpConnect<Uri>, (Box<dyn Io>, Protocol), ConnectError>;
|
||||
|
||||
/// Manages http client network connectivity.
|
||||
///
|
||||
|
@ -46,103 +40,64 @@ type SslConnector = ();
|
|||
/// .timeout(Duration::from_secs(5))
|
||||
/// .finish();
|
||||
/// ```
|
||||
pub struct Connector<T, U> {
|
||||
connector: T,
|
||||
pub struct Connector {
|
||||
timeout: Duration,
|
||||
conn_lifetime: Duration,
|
||||
conn_keep_alive: Duration,
|
||||
disconnect_timeout: Duration,
|
||||
limit: usize,
|
||||
#[allow(dead_code)]
|
||||
ssl: SslConnector,
|
||||
_t: PhantomData<U>,
|
||||
connector: BoxedConnector,
|
||||
ssl_connector: Option<BoxedConnector>,
|
||||
}
|
||||
|
||||
trait Io: AsyncRead + AsyncWrite + Unpin {}
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin> Io for T {}
|
||||
|
||||
impl Connector<(), ()> {
|
||||
impl Connector {
|
||||
#[allow(clippy::new_ret_no_self, clippy::let_unit_value)]
|
||||
pub fn new() -> Connector<
|
||||
impl Service<
|
||||
Request = TcpConnect<Uri>,
|
||||
Response = TcpStream,
|
||||
Error = crate::connect::ConnectError,
|
||||
> + Clone,
|
||||
TcpStream,
|
||||
> {
|
||||
let ssl = {
|
||||
#[cfg(feature = "openssl")]
|
||||
{
|
||||
use crate::connect::openssl::SslMethod;
|
||||
|
||||
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
|
||||
let _ = ssl
|
||||
.set_alpn_protos(b"\x02h2\x08http/1.1")
|
||||
.map_err(|e| error!("Can not set alpn protocol: {:?}", e));
|
||||
SslConnector::Openssl(ssl.build())
|
||||
}
|
||||
#[cfg(all(not(feature = "openssl"), feature = "rustls"))]
|
||||
{
|
||||
let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
let mut config = ClientConfig::new();
|
||||
config.set_protocols(&protos);
|
||||
config
|
||||
.root_store
|
||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||
SslConnector::Rustls(Arc::new(config))
|
||||
}
|
||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||
{}
|
||||
};
|
||||
|
||||
Connector {
|
||||
ssl,
|
||||
connector: TcpConnector::default(),
|
||||
pub fn new() -> Connector {
|
||||
let conn = Connector {
|
||||
connector: boxed::service(
|
||||
TcpConnector::default()
|
||||
.map(|io| (Box::new(io) as Box<dyn Io>, Protocol::Http1))
|
||||
.map_err(ConnectError::from),
|
||||
),
|
||||
ssl_connector: None,
|
||||
timeout: Duration::from_secs(1),
|
||||
conn_lifetime: Duration::from_secs(75),
|
||||
conn_keep_alive: Duration::from_secs(15),
|
||||
disconnect_timeout: Duration::from_millis(3000),
|
||||
limit: 100,
|
||||
_t: PhantomData,
|
||||
};
|
||||
|
||||
#[cfg(feature = "openssl")]
|
||||
{
|
||||
use crate::connect::openssl::SslMethod;
|
||||
|
||||
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
|
||||
let _ = ssl
|
||||
.set_alpn_protos(b"\x02h2\x08http/1.1")
|
||||
.map_err(|e| error!("Can not set alpn protocol: {:?}", e));
|
||||
conn.ssl(ssl.build())
|
||||
}
|
||||
#[cfg(all(not(feature = "openssl"), feature = "rustls"))]
|
||||
{
|
||||
let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
let mut config = ClientConfig::new();
|
||||
config.set_protocols(&protos);
|
||||
config
|
||||
.root_store
|
||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||
conn.rustls(Arc::new(config))
|
||||
}
|
||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||
{
|
||||
conn
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Connector<T, U> {
|
||||
/// Use custom connector.
|
||||
pub fn connector<T1, U1>(self, connector: T1) -> Connector<T1, U1>
|
||||
where
|
||||
U1: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
|
||||
T1: Service<
|
||||
Request = TcpConnect<Uri>,
|
||||
Response = U1,
|
||||
Error = crate::connect::ConnectError,
|
||||
> + Clone,
|
||||
{
|
||||
Connector {
|
||||
connector,
|
||||
timeout: self.timeout,
|
||||
conn_lifetime: self.conn_lifetime,
|
||||
conn_keep_alive: self.conn_keep_alive,
|
||||
disconnect_timeout: self.disconnect_timeout,
|
||||
limit: self.limit,
|
||||
ssl: self.ssl,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Connector<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
|
||||
T: Service<
|
||||
Request = TcpConnect<Uri>,
|
||||
Response = U,
|
||||
Error = crate::connect::ConnectError,
|
||||
> + Clone
|
||||
+ 'static,
|
||||
{
|
||||
impl Connector {
|
||||
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
|
||||
/// Set to 1 second by default.
|
||||
pub fn timeout(mut self, timeout: Duration) -> Self {
|
||||
|
@ -152,15 +107,42 @@ where
|
|||
|
||||
#[cfg(feature = "openssl")]
|
||||
/// Use custom `SslConnector` instance.
|
||||
pub fn ssl(mut self, connector: OpensslConnector) -> Self {
|
||||
self.ssl = SslConnector::Openssl(connector);
|
||||
self
|
||||
pub fn ssl(self, connector: OpensslConnector) -> Self {
|
||||
use crate::connect::openssl::OpensslConnector;
|
||||
|
||||
const H2: &[u8] = b"h2";
|
||||
self.secure_connector(OpensslConnector::new(connector).map(|sock| {
|
||||
let h2 = sock
|
||||
.ssl()
|
||||
.selected_alpn_protocol()
|
||||
.map(|protos| protos.windows(2).any(|w| w == H2))
|
||||
.unwrap_or(false);
|
||||
if h2 {
|
||||
(sock, Protocol::Http2)
|
||||
} else {
|
||||
(sock, Protocol::Http1)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(feature = "rustls")]
|
||||
pub fn rustls(mut self, connector: Arc<ClientConfig>) -> Self {
|
||||
self.ssl = SslConnector::Rustls(connector);
|
||||
self
|
||||
pub fn rustls(self, connector: Arc<ClientConfig>) -> Self {
|
||||
use crate::connect::rustls::{RustlsConnector, Session};
|
||||
|
||||
const H2: &[u8] = b"h2";
|
||||
self.secure_connector(RustlsConnector::new(connector).map(|sock| {
|
||||
let h2 = sock
|
||||
.get_ref()
|
||||
.1
|
||||
.get_alpn_protocol()
|
||||
.map(|protos| protos.windows(2).any(|w| w == H2))
|
||||
.unwrap_or(false);
|
||||
if h2 {
|
||||
(Box::new(sock) as Box<dyn Io>, Protocol::Http2)
|
||||
} else {
|
||||
(Box::new(sock) as Box<dyn Io>, Protocol::Http1)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
/// Set total number of simultaneous connections per type of scheme.
|
||||
|
@ -206,6 +188,44 @@ where
|
|||
self
|
||||
}
|
||||
|
||||
/// Use custom connector to open un-secured connections.
|
||||
pub fn connector<T, U>(mut self, connector: T) -> Self
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<
|
||||
Request = TcpConnect<Uri>,
|
||||
Response = (U, Protocol),
|
||||
Error = crate::connect::ConnectError,
|
||||
> + Clone
|
||||
+ 'static,
|
||||
{
|
||||
self.connector = boxed::service(
|
||||
connector
|
||||
.map(|(io, proto)| (Box::new(io) as Box<dyn Io>, proto))
|
||||
.map_err(ConnectError::from),
|
||||
);
|
||||
self
|
||||
}
|
||||
|
||||
/// Use custom connector to open secure connections.
|
||||
pub fn secure_connector<T, U>(mut self, connector: T) -> Self
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<
|
||||
Request = TcpConnect<Uri>,
|
||||
Response = (U, Protocol),
|
||||
Error = crate::connect::ConnectError,
|
||||
> + Clone
|
||||
+ 'static,
|
||||
{
|
||||
self.ssl_connector = Some(boxed::service(
|
||||
connector
|
||||
.map(|(io, proto)| (Box::new(io) as Box<dyn Io>, proto))
|
||||
.map_err(ConnectError::from),
|
||||
));
|
||||
self
|
||||
}
|
||||
|
||||
/// Finish configuration process and create connector service.
|
||||
/// The Connector builder always concludes by calling `finish()` last in
|
||||
/// its combinator chain.
|
||||
|
@ -213,314 +233,87 @@ where
|
|||
self,
|
||||
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
|
||||
+ Clone {
|
||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||
{
|
||||
let connector = TimeoutService::new(
|
||||
self.timeout,
|
||||
apply_fn(self.connector, |msg: Connect, srv| {
|
||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||
})
|
||||
.map_err(ConnectError::from)
|
||||
.map(|stream| (stream, Protocol::Http1)),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
let tcp_service = connector(self.connector, self.timeout);
|
||||
|
||||
connect_impl::InnerConnector {
|
||||
tcp_pool: ConnectionPool::new(
|
||||
connector,
|
||||
self.conn_lifetime,
|
||||
self.conn_keep_alive,
|
||||
None,
|
||||
self.limit,
|
||||
),
|
||||
}
|
||||
}
|
||||
#[cfg(any(feature = "openssl", feature = "rustls"))]
|
||||
{
|
||||
const H2: &[u8] = b"h2";
|
||||
#[cfg(feature = "openssl")]
|
||||
use crate::connect::openssl::OpensslConnector;
|
||||
#[cfg(feature = "rustls")]
|
||||
use crate::connect::rustls::{RustlsConnector, Session};
|
||||
use crate::{boxed::service, pipeline};
|
||||
let ssl_pool = if let Some(ssl_connector) = self.ssl_connector {
|
||||
let srv = connector(ssl_connector, self.timeout);
|
||||
Some(ConnectionPool::new(
|
||||
srv,
|
||||
self.conn_lifetime,
|
||||
self.conn_keep_alive,
|
||||
Some(self.disconnect_timeout),
|
||||
self.limit,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let ssl_service = TimeoutService::new(
|
||||
self.timeout,
|
||||
pipeline(apply_fn(
|
||||
match self.ssl {
|
||||
#[cfg(feature = "openssl")]
|
||||
SslConnector::Openssl(ssl) => service(
|
||||
OpensslConnector::new(ssl)
|
||||
.map(|sock| {
|
||||
let h2 = sock
|
||||
.ssl()
|
||||
.selected_alpn_protocol()
|
||||
.map(|protos| protos.windows(2).any(|w| w == H2))
|
||||
.unwrap_or(false);
|
||||
if h2 {
|
||||
(Box::new(sock) as Box<dyn Io>, Protocol::Http2)
|
||||
} else {
|
||||
(Box::new(sock) as Box<dyn Io>, Protocol::Http1)
|
||||
}
|
||||
})
|
||||
.map_err(ConnectError::from),
|
||||
),
|
||||
#[cfg(feature = "rustls")]
|
||||
SslConnector::Rustls(ssl) => service(
|
||||
RustlsConnector::new(ssl).map_err(ConnectError::from).map(
|
||||
|sock| {
|
||||
let h2 = sock
|
||||
.get_ref()
|
||||
.1
|
||||
.get_alpn_protocol()
|
||||
.map(|protos| protos.windows(2).any(|w| w == H2))
|
||||
.unwrap_or(false);
|
||||
if h2 {
|
||||
(Box::new(sock) as Box<dyn Io>, Protocol::Http2)
|
||||
} else {
|
||||
(Box::new(sock) as Box<dyn Io>, Protocol::Http1)
|
||||
}
|
||||
},
|
||||
),
|
||||
),
|
||||
},
|
||||
|msg: Connect, srv| {
|
||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||
},
|
||||
)),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
let tcp_service = TimeoutService::new(
|
||||
self.timeout,
|
||||
apply_fn(self.connector, |msg: Connect, srv| {
|
||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||
})
|
||||
.map_err(ConnectError::from)
|
||||
.map(|stream| (stream, Protocol::Http1)),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
connect_impl::InnerConnector {
|
||||
tcp_pool: ConnectionPool::new(
|
||||
tcp_service,
|
||||
self.conn_lifetime,
|
||||
self.conn_keep_alive,
|
||||
None,
|
||||
self.limit,
|
||||
),
|
||||
ssl_pool: ConnectionPool::new(
|
||||
ssl_service,
|
||||
self.conn_lifetime,
|
||||
self.conn_keep_alive,
|
||||
Some(self.disconnect_timeout),
|
||||
self.limit,
|
||||
),
|
||||
}
|
||||
}
|
||||
Rc::new(RefCell::new(InnerConnector {
|
||||
tcp_pool: ConnectionPool::new(
|
||||
tcp_service,
|
||||
self.conn_lifetime,
|
||||
self.conn_keep_alive,
|
||||
None,
|
||||
self.limit,
|
||||
),
|
||||
ssl_pool,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||
mod connect_impl {
|
||||
use std::task::{Context, Poll};
|
||||
fn connector(
|
||||
connector: BoxedConnector,
|
||||
timeout: Duration,
|
||||
) -> impl Service<Request = Connect, Response = (Box<dyn Io>, Protocol), Error = ConnectError>
|
||||
{
|
||||
TimeoutService::new(
|
||||
timeout,
|
||||
apply_fn(connector, |msg: Connect, srv| {
|
||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||
})
|
||||
.map_err(ConnectError::from),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
})
|
||||
}
|
||||
|
||||
use futures::future::{err, Either, Ready};
|
||||
type Pool<T> = ConnectionPool<T, Box<dyn Io>>;
|
||||
|
||||
use super::*;
|
||||
use crate::http::client::connection::IoConnection;
|
||||
struct InnerConnector<T> {
|
||||
tcp_pool: Pool<T>,
|
||||
ssl_pool: Option<Pool<T>>,
|
||||
}
|
||||
|
||||
pub(crate) struct InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
{
|
||||
pub(crate) tcp_pool: ConnectionPool<T, Io>,
|
||||
impl<T> Service for InnerConnector<T>
|
||||
where
|
||||
T: Service<
|
||||
Request = Connect,
|
||||
Response = (Box<dyn Io>, Protocol),
|
||||
Error = ConnectError,
|
||||
> + 'static,
|
||||
{
|
||||
type Request = Connect;
|
||||
type Response = <Pool<T> as Service>::Response;
|
||||
type Error = ConnectError;
|
||||
type Future =
|
||||
Either<<Pool<T> as Service>::Future, Ready<Result<Self::Response, Self::Error>>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.tcp_pool.poll_ready(cx)
|
||||
}
|
||||
|
||||
impl<T, Io> Clone for InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
InnerConnector {
|
||||
tcp_pool: self.tcp_pool.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Io> Service for InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
{
|
||||
type Request = Connect;
|
||||
type Response = IoConnection<Io>;
|
||||
type Error = ConnectError;
|
||||
type Future = Either<
|
||||
<ConnectionPool<T, Io> as Service>::Future,
|
||||
Ready<Result<IoConnection<Io>, ConnectError>>,
|
||||
>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.tcp_pool.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Connect) -> Self::Future {
|
||||
match req.uri.scheme_str() {
|
||||
Some("https") | Some("wss") => {
|
||||
fn call(&mut self, req: Connect) -> Self::Future {
|
||||
match req.uri.scheme_str() {
|
||||
Some("https") | Some("wss") => {
|
||||
if let Some(ref mut conn) = self.ssl_pool {
|
||||
Either::Left(conn.call(req))
|
||||
} else {
|
||||
Either::Right(err(ConnectError::SslIsNotSupported))
|
||||
}
|
||||
_ => Either::Left(self.tcp_pool.call(req)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "openssl", feature = "rustls"))]
|
||||
mod connect_impl {
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::future::Either;
|
||||
use futures::ready;
|
||||
|
||||
use super::*;
|
||||
use crate::http::client::connection::EitherConnection;
|
||||
|
||||
pub(crate) struct InnerConnector<T1, T2, Io1, Io2>
|
||||
where
|
||||
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
|
||||
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
|
||||
{
|
||||
pub(crate) tcp_pool: ConnectionPool<T1, Io1>,
|
||||
pub(crate) ssl_pool: ConnectionPool<T2, Io2>,
|
||||
}
|
||||
|
||||
impl<T1, T2, Io1, Io2> Clone for InnerConnector<T1, T2, Io1, Io2>
|
||||
where
|
||||
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
InnerConnector {
|
||||
tcp_pool: self.tcp_pool.clone(),
|
||||
ssl_pool: self.ssl_pool.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T1, T2, Io1, Io2> Service for InnerConnector<T1, T2, Io1, Io2>
|
||||
where
|
||||
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
{
|
||||
type Request = Connect;
|
||||
type Response = EitherConnection<Io1, Io2>;
|
||||
type Error = ConnectError;
|
||||
type Future = Either<
|
||||
InnerConnectorResponseA<T1, Io1, Io2>,
|
||||
InnerConnectorResponseB<T2, Io1, Io2>,
|
||||
>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.tcp_pool.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Connect) -> Self::Future {
|
||||
match req.uri.scheme_str() {
|
||||
Some("https") | Some("wss") => Either::Right(InnerConnectorResponseB {
|
||||
fut: self.ssl_pool.call(req),
|
||||
_t: PhantomData,
|
||||
}),
|
||||
_ => Either::Left(InnerConnectorResponseA {
|
||||
fut: self.tcp_pool.call(req),
|
||||
_t: PhantomData,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
|
||||
where
|
||||
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
{
|
||||
#[pin]
|
||||
fut: <ConnectionPool<T, Io1> as Service>::Future,
|
||||
_t: PhantomData<Io2>,
|
||||
}
|
||||
|
||||
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
|
||||
where
|
||||
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
type Output = Result<EitherConnection<Io1, Io2>, ConnectError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Poll::Ready(
|
||||
ready!(Pin::new(&mut self.get_mut().fut).poll(cx))
|
||||
.map(EitherConnection::A),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
|
||||
where
|
||||
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
{
|
||||
#[pin]
|
||||
fut: <ConnectionPool<T, Io2> as Service>::Future,
|
||||
_t: PhantomData<Io1>,
|
||||
}
|
||||
|
||||
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
|
||||
where
|
||||
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
type Output = Result<EitherConnection<Io1, Io2>, ConnectError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Poll::Ready(
|
||||
ready!(Pin::new(&mut self.get_mut().fut).poll(cx))
|
||||
.map(EitherConnection::B),
|
||||
)
|
||||
_ => Either::Left(self.tcp_pool.call(req)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ use super::connection::{ConnectionType, IoConnection};
|
|||
use super::error::SendRequestError;
|
||||
use super::pool::Acquired;
|
||||
|
||||
pub(crate) async fn send_request<T, B>(
|
||||
pub(super) async fn send_request<T, B>(
|
||||
mut io: SendRequest<Bytes>,
|
||||
head: RequestHeadType,
|
||||
body: B,
|
||||
|
|
|
@ -26,7 +26,7 @@ use super::error::ConnectError;
|
|||
use super::Connect;
|
||||
|
||||
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
|
||||
pub(crate) struct Key {
|
||||
pub(super) struct Key {
|
||||
authority: Authority,
|
||||
}
|
||||
|
||||
|
@ -37,13 +37,12 @@ impl From<Authority> for Key {
|
|||
}
|
||||
|
||||
/// Connections pool
|
||||
pub(crate) struct ConnectionPool<T, Io: 'static>(Rc<RefCell<T>>, Rc<RefCell<Inner<Io>>>);
|
||||
pub(super) struct ConnectionPool<T, Io: 'static>(Rc<RefCell<T>>, Rc<RefCell<Inner<Io>>>);
|
||||
|
||||
impl<T, Io> ConnectionPool<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||
+ 'static,
|
||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
|
||||
{
|
||||
pub(crate) fn new(
|
||||
connector: T,
|
||||
|
@ -249,7 +248,7 @@ struct AvailableConnection<Io> {
|
|||
created: Instant,
|
||||
}
|
||||
|
||||
pub(crate) struct Inner<Io> {
|
||||
pub(super) struct Inner<Io> {
|
||||
conn_lifetime: Duration,
|
||||
conn_keep_alive: Duration,
|
||||
disconnect_timeout: Option<Duration>,
|
||||
|
@ -594,7 +593,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
|
||||
pub(super) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
|
||||
|
||||
impl<T> Acquired<T>
|
||||
where
|
||||
|
|
|
@ -7,16 +7,14 @@ use std::time::Duration;
|
|||
use brotli2::write::BrotliEncoder;
|
||||
use bytes::Bytes;
|
||||
use coo_kie::Cookie;
|
||||
use flate2::read::GzDecoder;
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
|
||||
use futures::future::ok;
|
||||
use rand::Rng;
|
||||
|
||||
use ntex::http::client::{error::SendRequestError, Client, Connector};
|
||||
use ntex::http::test::server as test_server;
|
||||
use ntex::http::{header, HttpMessage, HttpService};
|
||||
use ntex::service::{map_config, pipeline_factory};
|
||||
use ntex::service::{map_config, pipeline_factory, Service};
|
||||
use ntex::web::dev::{AppConfig, BodyEncoding};
|
||||
use ntex::web::middleware::Compress;
|
||||
use ntex::web::{self, test, App, Error, HttpRequest, HttpResponse};
|
||||
|
@ -113,9 +111,10 @@ async fn test_timeout() {
|
|||
});
|
||||
|
||||
let connector = Connector::new()
|
||||
.connector(ntex::connect::Connector::new(
|
||||
ntex::connect::start_default_resolver(),
|
||||
))
|
||||
.connector(
|
||||
ntex::connect::Connector::new(ntex::connect::start_default_resolver())
|
||||
.map(|sock| (sock, ntex::http::Protocol::Http1)),
|
||||
)
|
||||
.timeout(Duration::from_secs(15))
|
||||
.finish();
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue