Refactor ntex-connect (#314)

This commit is contained in:
Nikolay Kim 2024-03-24 15:33:08 +01:00 committed by GitHub
parent 5414e2096a
commit baabcff4a6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
35 changed files with 446 additions and 437 deletions

View file

@ -1,5 +1,5 @@
# Changes
## [1.0.0] - 2024-03-23
## [1.0.0] - 2024-03-25
* Move to separate crate

View file

@ -28,10 +28,20 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"]
async-std = ["ntex-rt/async-std", "ntex-async-std"]
[dependencies]
ntex-service = "2.0"
ntex-bytes = "0.1.24"
ntex-io = "1.0.0"
ntex-rt = "0.4.7"
ntex-http = "0.1"
ntex-io = "1.0"
ntex-rt = "0.4.11"
ntex-util = "1.0"
ntex-tokio = { version = "0.4.0", optional = true }
ntex-glommio = { version = "0.4.0", optional = true }
ntex-async-std = { version = "0.4.0", optional = true }
log = "0.4"
thiserror = "1.0"
[dev-dependencies]
env_logger = "0.11"
ntex = { version = "1", features = ["tokio"] }

113
ntex-net/src/compat.rs Normal file
View file

@ -0,0 +1,113 @@
//! Utility for async runtime abstraction
#[cfg(feature = "tokio")]
pub use ntex_tokio::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(unix, feature = "tokio"))]
pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
feature = "async-std",
not(feature = "tokio"),
not(feature = "glommio")
))]
pub use ntex_async_std::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(
unix,
feature = "async-std",
not(feature = "tokio"),
not(feature = "glommio")
))]
pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
feature = "glommio",
not(feature = "tokio"),
not(feature = "async-std")
))]
pub use ntex_glommio::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(
unix,
feature = "glommio",
not(feature = "tokio"),
not(feature = "async-std")
))]
pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "glommio")
))]
mod no_rt {
use ntex_io::Io;
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(_: std::net::SocketAddr) -> std::io::Result<Io> {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(
_: std::net::SocketAddr,
_: ntex_bytes::PoolRef,
) -> std::io::Result<Io> {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
#[cfg(unix)]
/// Opens a unix stream connection.
pub async fn unix_connect<'a, P>(_: P) -> std::io::Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
#[cfg(unix)]
/// Opens a unix stream connection and specified memory pool.
pub async fn unix_connect_in<'a, P>(_: P, _: ntex_bytes::PoolRef) -> std::io::Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
/// Convert std TcpStream to tokio's TcpStream
pub fn from_tcp_stream(_: std::net::TcpStream) -> std::io::Result<Io> {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
#[cfg(unix)]
/// Convert std UnixStream to tokio's UnixStream
pub fn from_unix_stream(_: std::os::unix::net::UnixStream) -> std::io::Result<Io> {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
}
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "glommio")
))]
pub use no_rt::*;

View file

@ -0,0 +1,56 @@
use std::io;
#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
/// Failed to resolve the hostname
#[error("Failed resolving hostname: {0}")]
Resolver(io::Error),
/// No dns records
#[error("No dns records found for the input")]
NoRecords,
/// Invalid input
#[error("Invalid input")]
InvalidInput,
/// Unresolved host name
#[error("Connector received `Connect` method with unresolved host")]
Unresolved,
/// Connection io error
#[error("{0}")]
Io(#[from] io::Error),
}
impl Clone for ConnectError {
fn clone(&self) -> Self {
match self {
ConnectError::Resolver(err) => {
ConnectError::Resolver(io::Error::new(err.kind(), format!("{}", err)))
}
ConnectError::NoRecords => ConnectError::NoRecords,
ConnectError::InvalidInput => ConnectError::InvalidInput,
ConnectError::Unresolved => ConnectError::Unresolved,
ConnectError::Io(err) => {
ConnectError::Io(io::Error::new(err.kind(), format!("{}", err)))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
#[allow(clippy::redundant_clone)]
fn connect_error_clone() {
let _ =
ConnectError::Resolver(io::Error::new(io::ErrorKind::Other, "test")).clone();
let _ = ConnectError::NoRecords.clone();
let _ = ConnectError::InvalidInput.clone();
let _ = ConnectError::Unresolved.clone();
let _ = ConnectError::Io(io::Error::new(io::ErrorKind::Other, "test")).clone();
}
}

View file

@ -0,0 +1,324 @@
use std::collections::{vec_deque, VecDeque};
use std::{fmt, iter::FusedIterator, net::SocketAddr};
use ntex_util::future::Either;
/// Connect request
pub trait Address: Unpin + 'static {
/// Host name of the request
fn host(&self) -> &str;
/// Port of the request
fn port(&self) -> Option<u16>;
/// SocketAddr of the address
fn addr(&self) -> Option<SocketAddr> {
None
}
}
impl Address for String {
fn host(&self) -> &str {
self
}
fn port(&self) -> Option<u16> {
None
}
}
impl Address for &'static str {
fn host(&self) -> &str {
self
}
fn port(&self) -> Option<u16> {
None
}
}
impl Address for SocketAddr {
fn host(&self) -> &str {
""
}
fn port(&self) -> Option<u16> {
None
}
fn addr(&self) -> Option<SocketAddr> {
Some(*self)
}
}
/// Connect request
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct Connect<T> {
pub(super) req: T,
pub(super) port: u16,
pub(super) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
}
impl<T: Address> Connect<T> {
/// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
pub fn new(req: T) -> Connect<T> {
let (_, port) = parse(req.host());
Connect {
req,
port: port.unwrap_or(0),
addr: None,
}
}
/// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages.
pub fn with(req: T, addr: SocketAddr) -> Connect<T> {
Connect {
req,
port: 0,
addr: Some(Either::Left(addr)),
}
}
/// Use port if address does not provide one.
///
/// By default it set to 0
pub fn set_port(mut self, port: u16) -> Self {
self.port = port;
self
}
/// Use address.
pub fn set_addr(mut self, addr: Option<SocketAddr>) -> Self {
if let Some(addr) = addr {
self.addr = Some(Either::Left(addr));
}
self
}
/// Use addresses.
pub fn set_addrs<I>(mut self, addrs: I) -> Self
where
I: IntoIterator<Item = SocketAddr>,
{
let mut addrs = VecDeque::from_iter(addrs);
self.addr = if addrs.len() < 2 {
addrs.pop_front().map(Either::Left)
} else {
Some(Either::Right(addrs))
};
self
}
/// Host name
pub fn host(&self) -> &str {
self.req.host()
}
/// Port of the request
pub fn port(&self) -> u16 {
self.req.port().unwrap_or(self.port)
}
/// Preresolved addresses of the request.
pub fn addrs(&self) -> ConnectAddrsIter<'_> {
if let Some(addr) = self.req.addr() {
ConnectAddrsIter {
inner: Either::Left(Some(addr)),
}
} else {
let inner = match self.addr {
None => Either::Left(None),
Some(Either::Left(addr)) => Either::Left(Some(addr)),
Some(Either::Right(ref addrs)) => Either::Right(addrs.iter()),
};
ConnectAddrsIter { inner }
}
}
/// Takes preresolved addresses of the request.
pub fn take_addrs(&mut self) -> ConnectTakeAddrsIter {
if let Some(addr) = self.req.addr() {
ConnectTakeAddrsIter {
inner: Either::Left(Some(addr)),
}
} else {
let inner = match self.addr.take() {
None => Either::Left(None),
Some(Either::Left(addr)) => Either::Left(Some(addr)),
Some(Either::Right(addrs)) => Either::Right(addrs.into_iter()),
};
ConnectTakeAddrsIter { inner }
}
}
/// Return reference to inner type
pub fn get_ref(&self) -> &T {
&self.req
}
}
impl<T: Clone> Clone for Connect<T> {
fn clone(&self) -> Self {
Connect {
req: self.req.clone(),
port: self.port,
addr: self.addr.clone(),
}
}
}
impl<T: Address> From<T> for Connect<T> {
fn from(addr: T) -> Self {
Connect::new(addr)
}
}
impl<T: Address> fmt::Display for Connect<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.host(), self.port())
}
}
/// Iterator over addresses in a [`Connect`](struct.Connect.html) request.
#[derive(Clone)]
pub struct ConnectAddrsIter<'a> {
inner: Either<Option<SocketAddr>, vec_deque::Iter<'a, SocketAddr>>,
}
impl Iterator for ConnectAddrsIter<'_> {
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> {
match self.inner {
Either::Left(ref mut opt) => opt.take(),
Either::Right(ref mut iter) => iter.next().copied(),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self.inner {
Either::Left(Some(_)) => (1, Some(1)),
Either::Left(None) => (0, Some(0)),
Either::Right(ref iter) => iter.size_hint(),
}
}
}
impl fmt::Debug for ConnectAddrsIter<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.clone()).finish()
}
}
impl ExactSizeIterator for ConnectAddrsIter<'_> {}
impl FusedIterator for ConnectAddrsIter<'_> {}
/// Owned iterator over addresses in a [`Connect`](struct.Connect.html) request.
#[derive(Debug)]
pub struct ConnectTakeAddrsIter {
inner: Either<Option<SocketAddr>, vec_deque::IntoIter<SocketAddr>>,
}
impl Iterator for ConnectTakeAddrsIter {
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> {
match self.inner {
Either::Left(ref mut opt) => opt.take(),
Either::Right(ref mut iter) => iter.next(),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self.inner {
Either::Left(Some(_)) => (1, Some(1)),
Either::Left(None) => (0, Some(0)),
Either::Right(ref iter) => iter.size_hint(),
}
}
}
impl ExactSizeIterator for ConnectTakeAddrsIter {}
impl FusedIterator for ConnectTakeAddrsIter {}
fn parse(host: &str) -> (&str, Option<u16>) {
let mut parts_iter = host.splitn(2, ':');
if let Some(host) = parts_iter.next() {
let port_str = parts_iter.next().unwrap_or("");
if let Ok(port) = port_str.parse::<u16>() {
(host, Some(port))
} else {
(host, None)
}
} else {
(host, None)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn address() {
assert_eq!("test".host(), "test");
assert_eq!("test".port(), None);
let s = "test".to_string();
assert_eq!(s.host(), "test");
assert_eq!(s.port(), None);
}
#[test]
fn connect() {
let mut connect = Connect::new("www.rust-lang.org");
assert_eq!(connect.host(), "www.rust-lang.org");
assert_eq!(connect.port(), 0);
assert_eq!(*connect.get_ref(), "www.rust-lang.org");
connect = connect.set_port(80);
assert_eq!(connect.port(), 80);
let addrs = connect.addrs().clone();
assert_eq!(format!("{:?}", addrs), "[]");
assert!(connect.addrs().next().is_none());
assert!(format!("{:?}", connect.clone()).contains("Connect"));
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
connect = connect.set_addrs(vec![addr]);
let addrs = connect.addrs().clone();
assert_eq!(format!("{:?}", addrs), "[127.0.0.1:8080]");
let addrs: Vec<_> = connect.take_addrs().collect();
assert_eq!(addrs.len(), 1);
assert!(addrs.contains(&addr));
let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();
connect = connect.set_addrs(vec![addr, addr2]);
let addrs: Vec<_> = connect.addrs().collect();
assert_eq!(addrs.len(), 2);
assert!(addrs.contains(&addr));
assert!(addrs.contains(&addr2));
let addrs: Vec<_> = connect.take_addrs().collect();
assert_eq!(addrs.len(), 2);
assert!(addrs.contains(&addr));
assert!(addrs.contains(&addr2));
assert!(connect.addrs().next().is_none());
connect = connect.set_addrs(vec![addr]);
assert_eq!(format!("{}", connect), "www.rust-lang.org:80");
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let mut connect = Connect::new(addr);
assert_eq!(connect.host(), "");
assert_eq!(connect.port(), 0);
let addrs: Vec<_> = connect.addrs().collect();
assert_eq!(addrs.len(), 1);
assert!(addrs.contains(&addr));
let addrs: Vec<_> = connect.take_addrs().collect();
assert_eq!(addrs.len(), 1);
assert!(addrs.contains(&addr));
}
}

View file

@ -0,0 +1,22 @@
//! Tcp connector service
mod error;
mod message;
mod resolve;
mod service;
mod uri;
pub use self::error::ConnectError;
pub use self::message::{Address, Connect};
pub use self::resolve::Resolver;
pub use self::service::Connector;
use ntex_io::Io;
/// Resolve and connect to remote host
pub async fn connect<T, U>(message: U) -> Result<Io, ConnectError>
where
T: Address,
Connect<T>: From<U>,
{
Connector::new().connect(message).await
}

View file

@ -0,0 +1,165 @@
use std::{fmt, io, marker, net};
use ntex_rt::spawn_blocking;
use ntex_service::{Service, ServiceCtx, ServiceFactory};
use ntex_util::future::Either;
use super::{Address, Connect, ConnectError};
#[derive(Copy)]
/// DNS Resolver Service
pub struct Resolver<T>(marker::PhantomData<T>);
impl<T> Resolver<T> {
/// Create new resolver instance with custom configuration and options.
pub fn new() -> Self {
Resolver(marker::PhantomData)
}
}
impl<T: Address> Resolver<T> {
/// Lookup ip addresses for provided host
pub async fn lookup(&self, req: Connect<T>) -> Result<Connect<T>, ConnectError> {
self.lookup_with_tag(req, "TCP-CLIENT").await
}
#[doc(hidden)]
/// Lookup ip addresses for provided host
pub async fn lookup_with_tag(
&self,
mut req: Connect<T>,
tag: &'static str,
) -> Result<Connect<T>, ConnectError> {
if req.addr.is_some() || req.req.addr().is_some() {
Ok(req)
} else if let Ok(ip) = req.host().parse() {
req.addr = Some(Either::Left(net::SocketAddr::new(ip, req.port())));
Ok(req)
} else {
log::trace!("{}: DNS Resolver - resolving host {:?}", tag, req.host());
let host = if req.host().contains(':') {
req.host().to_string()
} else {
format!("{}:{}", req.host(), req.port())
};
let fut = spawn_blocking(move || net::ToSocketAddrs::to_socket_addrs(&host));
match fut.await {
Ok(Ok(ips)) => {
let port = req.port();
let req = req.set_addrs(ips.map(|mut ip| {
ip.set_port(port);
ip
}));
log::trace!(
"{}: DNS Resolver - host {:?} resolved to {:?}",
tag,
req.host(),
req.addrs()
);
if req.addr.is_none() {
Err(ConnectError::NoRecords)
} else {
Ok(req)
}
}
Ok(Err(e)) => {
log::trace!(
"{}: DNS Resolver - failed to resolve host {:?} err: {}",
tag,
req.host(),
e
);
Err(ConnectError::Resolver(e))
}
Err(e) => {
log::trace!(
"{}: DNS Resolver - failed to resolve host {:?} err: {}",
tag,
req.host(),
e
);
Err(ConnectError::Resolver(io::Error::new(
io::ErrorKind::Other,
e,
)))
}
}
}
}
}
impl<T> Default for Resolver<T> {
fn default() -> Resolver<T> {
Resolver::new()
}
}
impl<T> Clone for Resolver<T> {
fn clone(&self) -> Self {
Resolver(marker::PhantomData)
}
}
impl<T> fmt::Debug for Resolver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Resolver").finish()
}
}
impl<T: Address, C> ServiceFactory<Connect<T>, C> for Resolver<T> {
type Response = Connect<T>;
type Error = ConnectError;
type Service = Resolver<T>;
type InitError = ();
async fn create(&self, _: C) -> Result<Self::Service, Self::InitError> {
Ok(self.clone())
}
}
impl<T: Address> Service<Connect<T>> for Resolver<T> {
type Response = Connect<T>;
type Error = ConnectError;
async fn call(
&self,
req: Connect<T>,
_: ServiceCtx<'_, Self>,
) -> Result<Connect<T>, Self::Error> {
self.lookup(req).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use ntex_util::future::lazy;
#[allow(clippy::clone_on_copy)]
#[ntex::test]
async fn resolver() {
let resolver = Resolver::default().clone();
assert!(format!("{:?}", resolver).contains("Resolver"));
let srv = resolver.pipeline(()).await.unwrap();
assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready());
let res = srv.call(Connect::new("www.rust-lang.org")).await;
assert!(res.is_ok());
let res = srv.call(Connect::new("---11213")).await;
assert!(res.is_err());
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
let res = srv
.call(Connect::new("www.rust-lang.org").set_addrs(vec![addr]))
.await
.unwrap();
let addrs: Vec<_> = res.addrs().collect();
assert_eq!(addrs.len(), 1);
assert!(addrs.contains(&addr));
}
}

View file

@ -0,0 +1,260 @@
use std::task::{Context, Poll};
use std::{collections::VecDeque, fmt, future::Future, io, net::SocketAddr, pin::Pin};
use ntex_bytes::{PoolId, PoolRef};
use ntex_io::{types, Io};
use ntex_service::{Service, ServiceCtx, ServiceFactory};
use ntex_util::future::{BoxFuture, Either};
use super::{Address, Connect, ConnectError, Resolver};
use crate::tcp_connect_in;
#[derive(Copy)]
pub struct Connector<T> {
resolver: Resolver<T>,
pool: PoolRef,
tag: &'static str,
}
impl<T> Connector<T> {
/// Construct new connect service with default dns resolver
pub fn new() -> Self {
Connector {
resolver: Resolver::new(),
pool: PoolId::P0.pool_ref(),
tag: "TCP-CLIENT",
}
}
/// Set memory pool
///
/// Use specified memory pool for memory allocations. By default P0
/// memory pool is used.
pub fn memory_pool(mut self, id: PoolId) -> Self {
self.pool = id.pool_ref();
self
}
/// Set io tag
///
/// Set tag to opened io object.
pub fn tag(mut self, tag: &'static str) -> Self {
self.tag = tag;
self
}
}
impl<T: Address> Connector<T> {
/// Resolve and connect to remote host
pub async fn connect<U>(&self, message: U) -> Result<Io, ConnectError>
where
Connect<T>: From<U>,
{
// resolve first
let address = self
.resolver
.lookup_with_tag(message.into(), self.tag)
.await?;
let port = address.port();
let Connect { req, addr, .. } = address;
if let Some(addr) = addr {
TcpConnectorResponse::new(req, port, addr, self.tag, self.pool).await
} else if let Some(addr) = req.addr() {
TcpConnectorResponse::new(
req,
addr.port(),
Either::Left(addr),
self.tag,
self.pool,
)
.await
} else {
log::error!("{}: TCP connector: got unresolved address", self.tag);
Err(ConnectError::Unresolved)
}
}
}
impl<T> Default for Connector<T> {
fn default() -> Self {
Connector::new()
}
}
impl<T> Clone for Connector<T> {
fn clone(&self) -> Self {
Connector {
resolver: self.resolver.clone(),
tag: self.tag,
pool: self.pool,
}
}
}
impl<T> fmt::Debug for Connector<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connector")
.field("tag", &self.tag)
.field("resolver", &self.resolver)
.field("memory_pool", &self.pool)
.finish()
}
}
impl<T: Address, C> ServiceFactory<Connect<T>, C> for Connector<T> {
type Response = Io;
type Error = ConnectError;
type Service = Connector<T>;
type InitError = ();
async fn create(&self, _: C) -> Result<Self::Service, Self::InitError> {
Ok(self.clone())
}
}
impl<T: Address> Service<Connect<T>> for Connector<T> {
type Response = Io;
type Error = ConnectError;
async fn call(
&self,
req: Connect<T>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
self.connect(req).await
}
}
/// Tcp stream connector response future
struct TcpConnectorResponse<T> {
req: Option<T>,
port: u16,
addrs: Option<VecDeque<SocketAddr>>,
#[allow(clippy::type_complexity)]
stream: Option<BoxFuture<'static, Result<Io, io::Error>>>,
tag: &'static str,
pool: PoolRef,
}
impl<T: Address> TcpConnectorResponse<T> {
fn new(
req: T,
port: u16,
addr: Either<SocketAddr, VecDeque<SocketAddr>>,
tag: &'static str,
pool: PoolRef,
) -> TcpConnectorResponse<T> {
log::trace!(
"{}: TCP connector - connecting to {:?} addr:{:?} port:{}",
tag,
req.host(),
addr,
port
);
match addr {
Either::Left(addr) => TcpConnectorResponse {
req: Some(req),
addrs: None,
stream: Some(Box::pin(tcp_connect_in(addr, pool))),
tag,
pool,
port,
},
Either::Right(addrs) => TcpConnectorResponse {
tag,
port,
pool,
req: Some(req),
addrs: Some(addrs),
stream: None,
},
}
}
fn can_continue(&self, err: &io::Error) -> bool {
log::trace!(
"{}: TCP connector - failed to connect to {:?} port: {} err: {:?}",
self.tag,
self.req.as_ref().unwrap().host(),
self.port,
err
);
!(self.addrs.is_none() || self.addrs.as_ref().unwrap().is_empty())
}
}
impl<T: Address> Future for TcpConnectorResponse<T> {
type Output = Result<Io, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// connect
loop {
if let Some(new) = this.stream.as_mut() {
match new.as_mut().poll(cx) {
Poll::Ready(Ok(sock)) => {
let req = this.req.take().unwrap();
log::trace!(
"{}: TCP connector - successfully connected to connecting to {:?} - {:?}",
this.tag,
req.host(),
sock.query::<types::PeerAddr>().get()
);
sock.set_tag(this.tag);
return Poll::Ready(Ok(sock));
}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => {
if !this.can_continue(&err) {
return Poll::Ready(Err(err.into()));
}
}
}
}
// try to connect
let addr = this.addrs.as_mut().unwrap().pop_front().unwrap();
this.stream = Some(Box::pin(tcp_connect_in(addr, this.pool)));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[ntex::test]
async fn test_connect() {
let server = ntex::server::test_server(|| {
ntex_service::fn_service(|_| async { Ok::<_, ()>(()) })
});
let srv = Connector::default().tag("T").memory_pool(PoolId::P5);
let result = srv.connect("").await;
assert!(result.is_err());
let result = srv.connect("localhost:99999").await;
assert!(result.is_err());
assert!(format!("{:?}", srv).contains("Connector"));
let srv = Connector::default();
let result = srv.connect(format!("{}", server.addr())).await;
assert!(result.is_ok());
let msg = Connect::new(format!("{}", server.addr())).set_addrs(vec![
format!("127.0.0.1:{}", server.addr().port() - 1)
.parse()
.unwrap(),
server.addr(),
]);
let result = crate::connect::connect(msg).await;
assert!(result.is_ok());
let msg = Connect::new(server.addr());
let result = crate::connect::connect(msg).await;
assert!(result.is_ok());
}
}

View file

@ -0,0 +1,61 @@
use ntex_http::Uri;
use super::Address;
impl Address for Uri {
fn host(&self) -> &str {
self.host().unwrap_or("")
}
fn port(&self) -> Option<u16> {
if let Some(port) = self.port_u16() {
Some(port)
} else {
port(self.scheme_str())
}
}
}
// TODO: load data from file
fn port(scheme: Option<&str>) -> Option<u16> {
if let Some(scheme) = scheme {
match scheme {
"http" => Some(80),
"https" => Some(443),
"ws" => Some(80),
"wss" => Some(443),
"amqp" => Some(5672),
"amqps" => Some(5671),
"sb" => Some(5671),
"mqtt" => Some(1883),
"mqtts" => Some(8883),
_ => None,
}
} else {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn port_tests() {
for (s, p) in [
("http", 80),
("https", 443),
("ws", 80),
("wss", 443),
("amqp", 5672),
("amqps", 5671),
("sb", 5671),
("mqtt", 1883),
("mqtts", 8883),
] {
assert_eq!(port(Some(s)), Some(p))
}
assert_eq!(port(Some("unknowns")), None);
assert_eq!(port(None), None);
}
}

View file

@ -1,117 +1,10 @@
//! Utility for async runtime abstraction
#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
mod compat;
pub mod connect;
pub use ntex_io::Io;
pub use ntex_rt::spawn;
pub use ntex_rt::{spawn, spawn_blocking};
#[cfg(feature = "tokio")]
pub use ntex_tokio::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(unix, feature = "tokio"))]
pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
feature = "async-std",
not(feature = "tokio"),
not(feature = "glommio")
))]
pub use ntex_async_std::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(
unix,
feature = "async-std",
not(feature = "tokio"),
not(feature = "glommio")
))]
pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
feature = "glommio",
not(feature = "tokio"),
not(feature = "async-std")
))]
pub use ntex_glommio::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(
unix,
feature = "glommio",
not(feature = "tokio"),
not(feature = "async-std")
))]
pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "glommio")
))]
mod no_rt {
use ntex_io::Io;
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(_: std::net::SocketAddr) -> std::io::Result<Io> {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(
_: std::net::SocketAddr,
_: ntex_bytes::PoolRef,
) -> std::io::Result<Io> {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
#[cfg(unix)]
/// Opens a unix stream connection.
pub async fn unix_connect<'a, P>(_: P) -> std::io::Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
#[cfg(unix)]
/// Opens a unix stream connection and specified memory pool.
pub async fn unix_connect_in<'a, P>(_: P, _: ntex_bytes::PoolRef) -> std::io::Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
/// Convert std TcpStream to tokio's TcpStream
pub fn from_tcp_stream(_: std::net::TcpStream) -> std::io::Result<Io> {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
#[cfg(unix)]
/// Convert std UnixStream to tokio's UnixStream
pub fn from_unix_stream(_: std::os::unix::net::UnixStream) -> std::io::Result<Io> {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"runtime is not configure",
))
}
}
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "glommio")
))]
pub use no_rt::*;
pub use self::compat::*;