* Rename Transform to Middleware

* Drop FnService's shutdown helper

* refactor Service trait to use GAT

* Migrate ntex to new service

* move Stack to service

* use BoxFuture

* simplify poll_shitdown method
This commit is contained in:
Nikolay Kim 2022-12-27 00:58:38 +06:00 committed by GitHub
parent de9738c9c0
commit 537d8dc18d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
81 changed files with 2069 additions and 2495 deletions

View file

@ -4,7 +4,7 @@ use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin};
use ntex_bytes::{PoolId, PoolRef};
use ntex_io::{types, Io};
use ntex_service::{Service, ServiceFactory};
use ntex_util::future::{Either, Ready};
use ntex_util::future::{BoxFuture, Either, Ready};
use crate::{net::tcp_connect_in, Address, Connect, ConnectError, Resolver};
@ -34,7 +34,7 @@ impl<T> Connector<T> {
impl<T: Address> Connector<T> {
/// Resolve and connect to remote host
pub fn connect<U>(&self, message: U) -> impl Future<Output = Result<Io, ConnectError>>
pub async fn connect<U>(&self, message: U) -> Result<Io, ConnectError>
where
Connect<T>: From<U>,
{
@ -42,6 +42,7 @@ impl<T: Address> Connector<T> {
state: ConnectState::Resolve(self.resolver.call(message.into())),
pool: self.pool,
}
.await
}
}
@ -60,15 +61,15 @@ impl<T> Clone for Connector<T> {
}
}
impl<T: Address, C> ServiceFactory<Connect<T>, C> for Connector<T> {
impl<T: Address, C: 'static> ServiceFactory<Connect<T>, C> for Connector<T> {
type Response = Io;
type Error = ConnectError;
type Service = Connector<T>;
type InitError = ();
type Future = Ready<Self::Service, Self::InitError>;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;
#[inline]
fn new_service(&self, _: C) -> Self::Future {
fn create(&self, _: C) -> Self::Future<'_> {
Ready::Ok(self.clone())
}
}
@ -76,32 +77,27 @@ impl<T: Address, C> ServiceFactory<Connect<T>, C> for Connector<T> {
impl<T: Address> Service<Connect<T>> for Connector<T> {
type Response = Io;
type Error = ConnectError;
type Future = ConnectServiceResponse<T>;
type Future<'f> = ConnectServiceResponse<'f, T>;
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[inline]
fn call(&self, req: Connect<T>) -> Self::Future {
fn call(&self, req: Connect<T>) -> Self::Future<'_> {
ConnectServiceResponse::new(self.resolver.call(req))
}
}
enum ConnectState<T: Address> {
Resolve(<Resolver<T> as Service<Connect<T>>>::Future),
enum ConnectState<'f, T: Address> {
Resolve(<Resolver<T> as Service<Connect<T>>>::Future<'f>),
Connect(TcpConnectorResponse<T>),
}
#[doc(hidden)]
pub struct ConnectServiceResponse<T: Address> {
state: ConnectState<T>,
pub struct ConnectServiceResponse<'f, T: Address> {
state: ConnectState<'f, T>,
pool: PoolRef,
}
impl<T: Address> ConnectServiceResponse<T> {
pub(super) fn new(fut: <Resolver<T> as Service<Connect<T>>>::Future) -> Self {
impl<'f, T: Address> ConnectServiceResponse<'f, T> {
pub(super) fn new(fut: <Resolver<T> as Service<Connect<T>>>::Future<'f>) -> Self {
Self {
state: ConnectState::Resolve(fut),
pool: PoolId::P0.pool_ref(),
@ -109,7 +105,7 @@ impl<T: Address> ConnectServiceResponse<T> {
}
}
impl<T: Address> Future for ConnectServiceResponse<T> {
impl<'f, T: Address> Future for ConnectServiceResponse<'f, T> {
type Output = Result<Io, ConnectError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -150,7 +146,7 @@ struct TcpConnectorResponse<T> {
port: u16,
addrs: Option<VecDeque<SocketAddr>>,
#[allow(clippy::type_complexity)]
stream: Option<Pin<Box<dyn Future<Output = Result<Io, io::Error>>>>>,
stream: Option<BoxFuture<'static, Result<Io, io::Error>>>,
pool: PoolRef,
}