diff --git a/ntex/examples/uds.rs b/ntex/examples/uds.rs index d577a450..cbd6ee26 100644 --- a/ntex/examples/uds.rs +++ b/ntex/examples/uds.rs @@ -30,7 +30,7 @@ async fn main() -> std::io::Result<()> { .wrap(middleware::Compress::default()) .wrap(middleware::Logger::default()) .service(web::resource("/resource1/{name}/index.html").to(index)) - .service(web::resource("/").route(web::get().to(index))) + .service(web::resource("/").route(web::get().to(no_params))) // .service(index) // .service(no_params) .service( diff --git a/ntex/src/http/client/connection.rs b/ntex/src/http/client/connection.rs index bf4015e4..81cf343e 100644 --- a/ntex/src/http/client/connection.rs +++ b/ntex/src/http/client/connection.rs @@ -1,12 +1,9 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{fmt, io, mem, time}; +use std::{fmt, time}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use futures::future::{err, Either, Future, FutureExt, LocalBoxFuture, Ready}; use h2::client::SendRequest; -use pin_project::{pin_project, project}; use crate::http::body::MessageBody; use crate::http::h1::ClientCodec; @@ -151,149 +148,3 @@ where } } } - -#[allow(dead_code)] -pub(crate) enum EitherConnection { - A(IoConnection), - B(IoConnection), -} - -impl Connection for EitherConnection -where - A: AsyncRead + AsyncWrite + Unpin + 'static, - B: AsyncRead + AsyncWrite + Unpin + 'static, -{ - type Io = EitherIo; - type Future = - LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>; - - fn protocol(&self) -> Protocol { - match self { - EitherConnection::A(con) => con.protocol(), - EitherConnection::B(con) => con.protocol(), - } - } - - fn send_request>( - self, - head: H, - body: RB, - ) -> Self::Future { - match self { - EitherConnection::A(con) => con.send_request(head, body), - EitherConnection::B(con) => con.send_request(head, body), - } - } - - type TunnelFuture = LocalBoxFuture< - 'static, - Result<(ResponseHead, Framed), SendRequestError>, - >; - - /// Send request, returns Response and Framed - fn open_tunnel>(self, head: H) -> Self::TunnelFuture { - match self { - EitherConnection::A(con) => con - .open_tunnel(head) - .map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A)))) - .boxed_local(), - EitherConnection::B(con) => con - .open_tunnel(head) - .map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B)))) - .boxed_local(), - } - } -} - -#[pin_project] -pub(crate) enum EitherIo { - A(#[pin] A), - B(#[pin] B), -} - -impl AsyncRead for EitherIo -where - A: AsyncRead, - B: AsyncRead, -{ - #[inline] - #[project] - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - #[project] - match self.project() { - EitherIo::A(val) => val.poll_read(cx, buf), - EitherIo::B(val) => val.poll_read(cx, buf), - } - } - - #[inline] - unsafe fn prepare_uninitialized_buffer( - &self, - buf: &mut [mem::MaybeUninit], - ) -> bool { - match self { - EitherIo::A(ref val) => val.prepare_uninitialized_buffer(buf), - EitherIo::B(ref val) => val.prepare_uninitialized_buffer(buf), - } - } -} - -impl AsyncWrite for EitherIo -where - A: AsyncWrite, - B: AsyncWrite, -{ - #[project] - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - #[project] - match self.project() { - EitherIo::A(val) => val.poll_write(cx, buf), - EitherIo::B(val) => val.poll_write(cx, buf), - } - } - - #[project] - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - #[project] - match self.project() { - EitherIo::A(val) => val.poll_flush(cx), - EitherIo::B(val) => val.poll_flush(cx), - } - } - - #[project] - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - #[project] - match self.project() { - EitherIo::A(val) => val.poll_shutdown(cx), - EitherIo::B(val) => val.poll_shutdown(cx), - } - } - - #[project] - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut U, - ) -> Poll> - where - Self: Sized, - { - #[project] - match self.project() { - EitherIo::A(val) => val.poll_write_buf(cx, buf), - EitherIo::B(val) => val.poll_write_buf(cx, buf), - } - } -} diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index 22beb33c..61f187f3 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -1,14 +1,15 @@ -use std::fmt; -use std::marker::PhantomData; +use std::cell::RefCell; +use std::rc::Rc; +use std::task::{Context, Poll}; use std::time::Duration; use actix_codec::{AsyncRead, AsyncWrite}; +use futures::future::{err, Either, Ready}; use crate::connect::{Connect as TcpConnect, Connector as TcpConnector}; use crate::http::{Protocol, Uri}; -use crate::rt::net::TcpStream; +use crate::service::{apply_fn, boxed, Service}; use crate::util::timeout::{TimeoutError, TimeoutService}; -use crate::{apply_fn, Service}; use super::connection::Connection; use super::error::ConnectError; @@ -23,15 +24,8 @@ use crate::connect::rustls::ClientConfig; #[cfg(feature = "rustls")] use std::sync::Arc; -#[cfg(any(feature = "openssl", feature = "rustls"))] -enum SslConnector { - #[cfg(feature = "openssl")] - Openssl(OpensslConnector), - #[cfg(feature = "rustls")] - Rustls(Arc), -} -#[cfg(not(any(feature = "openssl", feature = "rustls")))] -type SslConnector = (); +type BoxedConnector = + boxed::BoxService, (Box, Protocol), ConnectError>; /// Manages http client network connectivity. /// @@ -46,103 +40,64 @@ type SslConnector = (); /// .timeout(Duration::from_secs(5)) /// .finish(); /// ``` -pub struct Connector { - connector: T, +pub struct Connector { timeout: Duration, conn_lifetime: Duration, conn_keep_alive: Duration, disconnect_timeout: Duration, limit: usize, - #[allow(dead_code)] - ssl: SslConnector, - _t: PhantomData, + connector: BoxedConnector, + ssl_connector: Option, } trait Io: AsyncRead + AsyncWrite + Unpin {} impl Io for T {} -impl Connector<(), ()> { +impl Connector { #[allow(clippy::new_ret_no_self, clippy::let_unit_value)] - pub fn new() -> Connector< - impl Service< - Request = TcpConnect, - Response = TcpStream, - Error = crate::connect::ConnectError, - > + Clone, - TcpStream, - > { - let ssl = { - #[cfg(feature = "openssl")] - { - use crate::connect::openssl::SslMethod; - - let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap(); - let _ = ssl - .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| error!("Can not set alpn protocol: {:?}", e)); - SslConnector::Openssl(ssl.build()) - } - #[cfg(all(not(feature = "openssl"), feature = "rustls"))] - { - let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - let mut config = ClientConfig::new(); - config.set_protocols(&protos); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - SslConnector::Rustls(Arc::new(config)) - } - #[cfg(not(any(feature = "openssl", feature = "rustls")))] - {} - }; - - Connector { - ssl, - connector: TcpConnector::default(), + pub fn new() -> Connector { + let conn = Connector { + connector: boxed::service( + TcpConnector::default() + .map(|io| (Box::new(io) as Box, Protocol::Http1)) + .map_err(ConnectError::from), + ), + ssl_connector: None, timeout: Duration::from_secs(1), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), disconnect_timeout: Duration::from_millis(3000), limit: 100, - _t: PhantomData, + }; + + #[cfg(feature = "openssl")] + { + use crate::connect::openssl::SslMethod; + + let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap(); + let _ = ssl + .set_alpn_protos(b"\x02h2\x08http/1.1") + .map_err(|e| error!("Can not set alpn protocol: {:?}", e)); + conn.ssl(ssl.build()) + } + #[cfg(all(not(feature = "openssl"), feature = "rustls"))] + { + let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + let mut config = ClientConfig::new(); + config.set_protocols(&protos); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + conn.rustls(Arc::new(config)) + } + #[cfg(not(any(feature = "openssl", feature = "rustls")))] + { + conn } } } -impl Connector { - /// Use custom connector. - pub fn connector(self, connector: T1) -> Connector - where - U1: AsyncRead + AsyncWrite + Unpin + fmt::Debug, - T1: Service< - Request = TcpConnect, - Response = U1, - Error = crate::connect::ConnectError, - > + Clone, - { - Connector { - connector, - timeout: self.timeout, - conn_lifetime: self.conn_lifetime, - conn_keep_alive: self.conn_keep_alive, - disconnect_timeout: self.disconnect_timeout, - limit: self.limit, - ssl: self.ssl, - _t: PhantomData, - } - } -} - -impl Connector -where - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, - T: Service< - Request = TcpConnect, - Response = U, - Error = crate::connect::ConnectError, - > + Clone - + 'static, -{ +impl Connector { /// Connection timeout, i.e. max time to connect to remote host including dns name resolution. /// Set to 1 second by default. pub fn timeout(mut self, timeout: Duration) -> Self { @@ -152,15 +107,42 @@ where #[cfg(feature = "openssl")] /// Use custom `SslConnector` instance. - pub fn ssl(mut self, connector: OpensslConnector) -> Self { - self.ssl = SslConnector::Openssl(connector); - self + pub fn ssl(self, connector: OpensslConnector) -> Self { + use crate::connect::openssl::OpensslConnector; + + const H2: &[u8] = b"h2"; + self.secure_connector(OpensslConnector::new(connector).map(|sock| { + let h2 = sock + .ssl() + .selected_alpn_protocol() + .map(|protos| protos.windows(2).any(|w| w == H2)) + .unwrap_or(false); + if h2 { + (sock, Protocol::Http2) + } else { + (sock, Protocol::Http1) + } + })) } #[cfg(feature = "rustls")] - pub fn rustls(mut self, connector: Arc) -> Self { - self.ssl = SslConnector::Rustls(connector); - self + pub fn rustls(self, connector: Arc) -> Self { + use crate::connect::rustls::{RustlsConnector, Session}; + + const H2: &[u8] = b"h2"; + self.secure_connector(RustlsConnector::new(connector).map(|sock| { + let h2 = sock + .get_ref() + .1 + .get_alpn_protocol() + .map(|protos| protos.windows(2).any(|w| w == H2)) + .unwrap_or(false); + if h2 { + (Box::new(sock) as Box, Protocol::Http2) + } else { + (Box::new(sock) as Box, Protocol::Http1) + } + })) } /// Set total number of simultaneous connections per type of scheme. @@ -206,6 +188,44 @@ where self } + /// Use custom connector to open un-secured connections. + pub fn connector(mut self, connector: T) -> Self + where + U: AsyncRead + AsyncWrite + Unpin + 'static, + T: Service< + Request = TcpConnect, + Response = (U, Protocol), + Error = crate::connect::ConnectError, + > + Clone + + 'static, + { + self.connector = boxed::service( + connector + .map(|(io, proto)| (Box::new(io) as Box, proto)) + .map_err(ConnectError::from), + ); + self + } + + /// Use custom connector to open secure connections. + pub fn secure_connector(mut self, connector: T) -> Self + where + U: AsyncRead + AsyncWrite + Unpin + 'static, + T: Service< + Request = TcpConnect, + Response = (U, Protocol), + Error = crate::connect::ConnectError, + > + Clone + + 'static, + { + self.ssl_connector = Some(boxed::service( + connector + .map(|(io, proto)| (Box::new(io) as Box, proto)) + .map_err(ConnectError::from), + )); + self + } + /// Finish configuration process and create connector service. /// The Connector builder always concludes by calling `finish()` last in /// its combinator chain. @@ -213,314 +233,87 @@ where self, ) -> impl Service + Clone { - #[cfg(not(any(feature = "openssl", feature = "rustls")))] - { - let connector = TimeoutService::new( - self.timeout, - apply_fn(self.connector, |msg: Connect, srv| { - srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) - }) - .map_err(ConnectError::from) - .map(|stream| (stream, Protocol::Http1)), - ) - .map_err(|e| match e { - TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectError::Timeout, - }); + let tcp_service = connector(self.connector, self.timeout); - connect_impl::InnerConnector { - tcp_pool: ConnectionPool::new( - connector, - self.conn_lifetime, - self.conn_keep_alive, - None, - self.limit, - ), - } - } - #[cfg(any(feature = "openssl", feature = "rustls"))] - { - const H2: &[u8] = b"h2"; - #[cfg(feature = "openssl")] - use crate::connect::openssl::OpensslConnector; - #[cfg(feature = "rustls")] - use crate::connect::rustls::{RustlsConnector, Session}; - use crate::{boxed::service, pipeline}; + let ssl_pool = if let Some(ssl_connector) = self.ssl_connector { + let srv = connector(ssl_connector, self.timeout); + Some(ConnectionPool::new( + srv, + self.conn_lifetime, + self.conn_keep_alive, + Some(self.disconnect_timeout), + self.limit, + )) + } else { + None + }; - let ssl_service = TimeoutService::new( - self.timeout, - pipeline(apply_fn( - match self.ssl { - #[cfg(feature = "openssl")] - SslConnector::Openssl(ssl) => service( - OpensslConnector::new(ssl) - .map(|sock| { - let h2 = sock - .ssl() - .selected_alpn_protocol() - .map(|protos| protos.windows(2).any(|w| w == H2)) - .unwrap_or(false); - if h2 { - (Box::new(sock) as Box, Protocol::Http2) - } else { - (Box::new(sock) as Box, Protocol::Http1) - } - }) - .map_err(ConnectError::from), - ), - #[cfg(feature = "rustls")] - SslConnector::Rustls(ssl) => service( - RustlsConnector::new(ssl).map_err(ConnectError::from).map( - |sock| { - let h2 = sock - .get_ref() - .1 - .get_alpn_protocol() - .map(|protos| protos.windows(2).any(|w| w == H2)) - .unwrap_or(false); - if h2 { - (Box::new(sock) as Box, Protocol::Http2) - } else { - (Box::new(sock) as Box, Protocol::Http1) - } - }, - ), - ), - }, - |msg: Connect, srv| { - srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) - }, - )), - ) - .map_err(|e| match e { - TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectError::Timeout, - }); - - let tcp_service = TimeoutService::new( - self.timeout, - apply_fn(self.connector, |msg: Connect, srv| { - srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) - }) - .map_err(ConnectError::from) - .map(|stream| (stream, Protocol::Http1)), - ) - .map_err(|e| match e { - TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectError::Timeout, - }); - - connect_impl::InnerConnector { - tcp_pool: ConnectionPool::new( - tcp_service, - self.conn_lifetime, - self.conn_keep_alive, - None, - self.limit, - ), - ssl_pool: ConnectionPool::new( - ssl_service, - self.conn_lifetime, - self.conn_keep_alive, - Some(self.disconnect_timeout), - self.limit, - ), - } - } + Rc::new(RefCell::new(InnerConnector { + tcp_pool: ConnectionPool::new( + tcp_service, + self.conn_lifetime, + self.conn_keep_alive, + None, + self.limit, + ), + ssl_pool, + })) } } -#[cfg(not(any(feature = "openssl", feature = "rustls")))] -mod connect_impl { - use std::task::{Context, Poll}; +fn connector( + connector: BoxedConnector, + timeout: Duration, +) -> impl Service, Protocol), Error = ConnectError> +{ + TimeoutService::new( + timeout, + apply_fn(connector, |msg: Connect, srv| { + srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) + }) + .map_err(ConnectError::from), + ) + .map_err(|e| match e { + TimeoutError::Service(e) => e, + TimeoutError::Timeout => ConnectError::Timeout, + }) +} - use futures::future::{err, Either, Ready}; +type Pool = ConnectionPool>; - use super::*; - use crate::http::client::connection::IoConnection; +struct InnerConnector { + tcp_pool: Pool, + ssl_pool: Option>, +} - pub(crate) struct InnerConnector - where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service - + 'static, - { - pub(crate) tcp_pool: ConnectionPool, +impl Service for InnerConnector +where + T: Service< + Request = Connect, + Response = (Box, Protocol), + Error = ConnectError, + > + 'static, +{ + type Request = Connect; + type Response = as Service>::Response; + type Error = ConnectError; + type Future = + Either< as Service>::Future, Ready>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.tcp_pool.poll_ready(cx) } - impl Clone for InnerConnector - where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service - + 'static, - { - fn clone(&self) -> Self { - InnerConnector { - tcp_pool: self.tcp_pool.clone(), - } - } - } - - impl Service for InnerConnector - where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service - + 'static, - { - type Request = Connect; - type Response = IoConnection; - type Error = ConnectError; - type Future = Either< - as Service>::Future, - Ready, ConnectError>>, - >; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.tcp_pool.poll_ready(cx) - } - - fn call(&mut self, req: Connect) -> Self::Future { - match req.uri.scheme_str() { - Some("https") | Some("wss") => { + fn call(&mut self, req: Connect) -> Self::Future { + match req.uri.scheme_str() { + Some("https") | Some("wss") => { + if let Some(ref mut conn) = self.ssl_pool { + Either::Left(conn.call(req)) + } else { Either::Right(err(ConnectError::SslIsNotSupported)) } - _ => Either::Left(self.tcp_pool.call(req)), } - } - } -} - -#[cfg(any(feature = "openssl", feature = "rustls"))] -mod connect_impl { - use std::future::Future; - use std::marker::PhantomData; - use std::pin::Pin; - use std::task::{Context, Poll}; - - use futures::future::Either; - use futures::ready; - - use super::*; - use crate::http::client::connection::EitherConnection; - - pub(crate) struct InnerConnector - where - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T1: Service, - T2: Service, - { - pub(crate) tcp_pool: ConnectionPool, - pub(crate) ssl_pool: ConnectionPool, - } - - impl Clone for InnerConnector - where - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T1: Service - + 'static, - T2: Service - + 'static, - { - fn clone(&self) -> Self { - InnerConnector { - tcp_pool: self.tcp_pool.clone(), - ssl_pool: self.ssl_pool.clone(), - } - } - } - - impl Service for InnerConnector - where - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T1: Service - + 'static, - T2: Service - + 'static, - { - type Request = Connect; - type Response = EitherConnection; - type Error = ConnectError; - type Future = Either< - InnerConnectorResponseA, - InnerConnectorResponseB, - >; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.tcp_pool.poll_ready(cx) - } - - fn call(&mut self, req: Connect) -> Self::Future { - match req.uri.scheme_str() { - Some("https") | Some("wss") => Either::Right(InnerConnectorResponseB { - fut: self.ssl_pool.call(req), - _t: PhantomData, - }), - _ => Either::Left(InnerConnectorResponseA { - fut: self.tcp_pool.call(req), - _t: PhantomData, - }), - } - } - } - - #[pin_project::pin_project] - pub(crate) struct InnerConnectorResponseA - where - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service - + 'static, - { - #[pin] - fut: as Service>::Future, - _t: PhantomData, - } - - impl Future for InnerConnectorResponseA - where - T: Service - + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - { - type Output = Result, ConnectError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready( - ready!(Pin::new(&mut self.get_mut().fut).poll(cx)) - .map(EitherConnection::A), - ) - } - } - - #[pin_project::pin_project] - pub(crate) struct InnerConnectorResponseB - where - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service - + 'static, - { - #[pin] - fut: as Service>::Future, - _t: PhantomData, - } - - impl Future for InnerConnectorResponseB - where - T: Service - + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - { - type Output = Result, ConnectError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready( - ready!(Pin::new(&mut self.get_mut().fut).poll(cx)) - .map(EitherConnection::B), - ) + _ => Either::Left(self.tcp_pool.call(req)), } } } diff --git a/ntex/src/http/client/h2proto.rs b/ntex/src/http/client/h2proto.rs index 3299c03e..5b6793d2 100644 --- a/ntex/src/http/client/h2proto.rs +++ b/ntex/src/http/client/h2proto.rs @@ -17,7 +17,7 @@ use super::connection::{ConnectionType, IoConnection}; use super::error::SendRequestError; use super::pool::Acquired; -pub(crate) async fn send_request( +pub(super) async fn send_request( mut io: SendRequest, head: RequestHeadType, body: B, diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index c6a27993..12c5f173 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -26,7 +26,7 @@ use super::error::ConnectError; use super::Connect; #[derive(Hash, Eq, PartialEq, Clone, Debug)] -pub(crate) struct Key { +pub(super) struct Key { authority: Authority, } @@ -37,13 +37,12 @@ impl From for Key { } /// Connections pool -pub(crate) struct ConnectionPool(Rc>, Rc>>); +pub(super) struct ConnectionPool(Rc>, Rc>>); impl ConnectionPool where Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service - + 'static, + T: Service, { pub(crate) fn new( connector: T, @@ -249,7 +248,7 @@ struct AvailableConnection { created: Instant, } -pub(crate) struct Inner { +pub(super) struct Inner { conn_lifetime: Duration, conn_keep_alive: Duration, disconnect_timeout: Option, @@ -594,7 +593,7 @@ where } } -pub(crate) struct Acquired(Key, Option>>>); +pub(super) struct Acquired(Key, Option>>>); impl Acquired where diff --git a/ntex/tests/http_awc_client.rs b/ntex/tests/http_awc_client.rs index 2df10297..93575cb7 100644 --- a/ntex/tests/http_awc_client.rs +++ b/ntex/tests/http_awc_client.rs @@ -7,16 +7,14 @@ use std::time::Duration; use brotli2::write::BrotliEncoder; use bytes::Bytes; use coo_kie::Cookie; -use flate2::read::GzDecoder; -use flate2::write::GzEncoder; -use flate2::Compression; +use flate2::{read::GzDecoder, write::GzEncoder, Compression}; use futures::future::ok; use rand::Rng; use ntex::http::client::{error::SendRequestError, Client, Connector}; use ntex::http::test::server as test_server; use ntex::http::{header, HttpMessage, HttpService}; -use ntex::service::{map_config, pipeline_factory}; +use ntex::service::{map_config, pipeline_factory, Service}; use ntex::web::dev::{AppConfig, BodyEncoding}; use ntex::web::middleware::Compress; use ntex::web::{self, test, App, Error, HttpRequest, HttpResponse}; @@ -113,9 +111,10 @@ async fn test_timeout() { }); let connector = Connector::new() - .connector(ntex::connect::Connector::new( - ntex::connect::start_default_resolver(), - )) + .connector( + ntex::connect::Connector::new(ntex::connect::start_default_resolver()) + .map(|sock| (sock, ntex::http::Protocol::Http1)), + ) .timeout(Duration::from_secs(15)) .finish();