Migrate ntex-connect to async fn in trait

This commit is contained in:
Nikolay Kim 2024-01-07 05:03:02 +06:00
parent 60620d4587
commit 2e12cc6edf
17 changed files with 134 additions and 231 deletions

View file

@ -4,7 +4,7 @@ use std::{collections::VecDeque, fmt, future::Future, io, net::SocketAddr, pin::
use ntex_bytes::{PoolId, PoolRef};
use ntex_io::{types, Io};
use ntex_service::{Service, ServiceCtx, ServiceFactory};
use ntex_util::future::{BoxFuture, Either, Ready};
use ntex_util::future::{BoxFuture, Either};
use crate::{net::tcp_connect_in, Address, Connect, ConnectError, Resolver};
@ -16,12 +16,12 @@ pub struct Connector<T> {
}
impl<T> Connector<T> {
/// Construct new connect service with custom dns resolver
/// Construct new connect service with default dns resolver
pub fn new() -> Self {
Connector {
resolver: Resolver::new(),
pool: PoolId::P0.pool_ref(),
tag: "",
tag: "TCP-CLIENT",
}
}
@ -49,12 +49,27 @@ impl<T: Address> Connector<T> {
where
Connect<T>: From<U>,
{
ConnectServiceResponse {
state: ConnectState::Resolve(Box::pin(self.resolver.lookup(message.into()))),
tag: self.tag,
pool: self.pool,
// resolve first
let address = self.resolver.lookup(message.into()).await?;
let port = address.port();
let Connect { req, addr, .. } = address;
if let Some(addr) = addr {
TcpConnectorResponse::new(req, port, addr, self.tag, self.pool).await
} else if let Some(addr) = req.addr() {
TcpConnectorResponse::new(
req,
addr.port(),
Either::Left(addr),
self.tag,
self.pool,
)
.await
} else {
log::error!("{}: TCP connector: got unresolved address", self.tag);
Err(ConnectError::Unresolved)
}
.await
}
}
@ -89,93 +104,22 @@ impl<T: Address, C: 'static> ServiceFactory<Connect<T>, C> for Connector<T> {
type Error = ConnectError;
type Service = Connector<T>;
type InitError = ();
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;
#[inline]
fn create(&self, _: C) -> Self::Future<'_> {
Ready::Ok(self.clone())
async fn create(&self, _: C) -> Result<Self::Service, Self::InitError> {
Ok(self.clone())
}
}
impl<T: Address> Service<Connect<T>> for Connector<T> {
type Response = Io;
type Error = ConnectError;
type Future<'f> = ConnectServiceResponse<'f, T>;
#[inline]
fn call<'a>(&'a self, req: Connect<T>, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
ConnectServiceResponse {
state: ConnectState::Resolve(Box::pin(self.resolver.lookup(req))),
pool: PoolId::P0.pool_ref(),
tag: self.tag,
}
}
}
enum ConnectState<'f, T: Address> {
Resolve(BoxFuture<'f, Result<Connect<T>, ConnectError>>),
Connect(TcpConnectorResponse<T>),
}
#[doc(hidden)]
pub struct ConnectServiceResponse<'f, T: Address> {
state: ConnectState<'f, T>,
pool: PoolRef,
tag: &'static str,
}
impl<'f, T: Address> ConnectServiceResponse<'f, T> {
pub(super) fn new(fut: BoxFuture<'f, Result<Connect<T>, ConnectError>>) -> Self {
Self {
state: ConnectState::Resolve(fut),
pool: PoolId::P0.pool_ref(),
tag: "",
}
}
}
impl<'f, T: Address> fmt::Debug for ConnectServiceResponse<'f, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnectServiceResponse")
.field("tag", &self.tag)
.field("pool", &self.pool)
.finish()
}
}
impl<'f, T: Address> Future for ConnectServiceResponse<'f, T> {
type Output = Result<Io, ConnectError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.state {
ConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx)? {
Poll::Pending => Poll::Pending,
Poll::Ready(address) => {
let port = address.port();
let Connect { req, addr, .. } = address;
if let Some(addr) = addr {
self.state = ConnectState::Connect(TcpConnectorResponse::new(
req, port, addr, self.tag, self.pool,
));
self.poll(cx)
} else if let Some(addr) = req.addr() {
self.state = ConnectState::Connect(TcpConnectorResponse::new(
req,
addr.port(),
Either::Left(addr),
self.tag,
self.pool,
));
self.poll(cx)
} else {
error!("{}: TCP connector: got unresolved address", self.tag);
Poll::Ready(Err(ConnectError::Unresolved))
}
}
},
ConnectState::Connect(ref mut fut) => Pin::new(fut).poll(cx),
}
async fn call(
&self,
req: Connect<T>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
self.connect(req).await
}
}
@ -198,8 +142,8 @@ impl<T: Address> TcpConnectorResponse<T> {
tag: &'static str,
pool: PoolRef,
) -> TcpConnectorResponse<T> {
trace!(
"{}TCP connector - connecting to {:?} addr:{:?} port:{}",
log::trace!(
"{}: TCP connector - connecting to {:?} addr:{:?} port:{}",
tag,
req.host(),
addr,
@ -227,8 +171,8 @@ impl<T: Address> TcpConnectorResponse<T> {
}
fn can_continue(&self, err: &io::Error) -> bool {
trace!(
"{}TCP connector - failed to connect to {:?} port: {} err: {:?}",
log::trace!(
"{}: TCP connector - failed to connect to {:?} port: {} err: {:?}",
self.tag,
self.req.as_ref().unwrap().host(),
self.port,
@ -250,8 +194,8 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
match new.as_mut().poll(cx) {
Poll::Ready(Ok(sock)) => {
let req = this.req.take().unwrap();
trace!(
"{}TCP connector - successfully connected to connecting to {:?} - {:?}",
log::trace!(
"{}: TCP connector - successfully connected to connecting to {:?} - {:?}",
this.tag,
req.host(),
sock.query::<types::PeerAddr>().get()