migrate connect, server, util

This commit is contained in:
Nikolay Kim 2021-12-24 01:38:21 +06:00
parent ea7d86e905
commit 75b54e6b04
15 changed files with 232 additions and 231 deletions

View file

@ -5,7 +5,7 @@ pub use tls_openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslM
use ntex_tls::openssl::SslConnector as IoSslConnector;
use crate::io::{Base, Io};
use crate::io::{Base, Io, SealedService};
use crate::service::{Service, ServiceFactory};
use crate::util::{PoolId, Ready};
@ -76,6 +76,11 @@ impl<T: Address + 'static> Connector<T> {
}
}
}
/// Produce sealed io stream (IoBoxed)
pub fn seal(self) -> SealedService<Connector<T>> {
SealedService::new(self)
}
}
impl<T> Clone for Connector<T> {

View file

@ -102,8 +102,7 @@ impl<T> Clone for Resolver<T> {
}
}
impl<T: Address> ServiceFactory for Resolver<T> {
type Request = Connect<T>;
impl<T: Address> ServiceFactory<Connect<T>> for Resolver<T> {
type Response = Connect<T>;
type Error = ConnectError;
type Config = ();
@ -116,8 +115,7 @@ impl<T: Address> ServiceFactory for Resolver<T> {
}
}
impl<T: Address> Service for Resolver<T> {
type Request = Connect<T>;
impl<T: Address> Service<Connect<T>> for Resolver<T> {
type Response = Connect<T>;
type Error = ConnectError;
type Future = Pin<Box<dyn Future<Output = Result<Connect<T>, Self::Error>>>>;

View file

@ -5,7 +5,7 @@ pub use tls_rustls::{ClientConfig, ServerName};
use ntex_tls::rustls::TlsConnector;
use crate::io::{Base, Io};
use crate::io::{Base, Io, SealedService};
use crate::service::{Service, ServiceFactory};
use crate::util::{PoolId, Ready};
@ -80,6 +80,11 @@ impl<T: Address + 'static> Connector<T> {
}
}
}
/// Produce sealed io stream (IoBoxed)
pub fn seal(self) -> SealedService<Connector<T>> {
SealedService::new(self)
}
}
impl<T> Clone for Connector<T> {

View file

@ -1,7 +1,7 @@
use std::task::{Context, Poll};
use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin};
use crate::io::{types, Io};
use crate::io::{types, Io, SealedService};
use crate::rt::tcp_connect_in;
use crate::service::{Service, ServiceFactory};
use crate::util::{Either, PoolId, PoolRef, Ready};
@ -46,6 +46,11 @@ impl<T: Address> Connector<T> {
pool: self.pool,
}
}
/// Produce sealed io stream (IoBoxed)
pub fn seal(self) -> SealedService<Connector<T>, Io> {
SealedService::new(self)
}
}
impl<T> Default for Connector<T> {
@ -63,8 +68,7 @@ impl<T> Clone for Connector<T> {
}
}
impl<T: Address> ServiceFactory for Connector<T> {
type Request = Connect<T>;
impl<T: Address> ServiceFactory<Connect<T>> for Connector<T> {
type Response = Io;
type Error = ConnectError;
type Config = ();
@ -78,8 +82,7 @@ impl<T: Address> ServiceFactory for Connector<T> {
}
}
impl<T: Address> Service for Connector<T> {
type Request = Connect<T>;
impl<T: Address> Service<Connect<T>> for Connector<T> {
type Response = Io;
type Error = ConnectError;
type Future = ConnectServiceResponse<T>;
@ -96,7 +99,7 @@ impl<T: Address> Service for Connector<T> {
}
enum ConnectState<T: Address> {
Resolve(<Resolver<T> as Service>::Future),
Resolve(<Resolver<T> as Service<Connect<T>>>::Future),
Connect(TcpConnectorResponse<T>),
}
@ -107,7 +110,7 @@ pub struct ConnectServiceResponse<T: Address> {
}
impl<T: Address> ConnectServiceResponse<T> {
pub(super) fn new(fut: <Resolver<T> as Service>::Future) -> Self {
pub(super) fn new(fut: <Resolver<T> as Service<Connect<T>>>::Future) -> Self {
Self {
state: ConnectState::Resolve(fut),
pool: PoolId::P0.pool_ref(),

View file

@ -198,7 +198,7 @@ impl ServerBuilder {
where
U: net::ToSocketAddrs,
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
R: ServiceFactory<Io, Config = ()>,
{
let sockets = bind_addr(addr, self.backlog)?;
@ -226,7 +226,7 @@ impl ServerBuilder {
N: AsRef<str>,
U: AsRef<std::path::Path>,
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
R: ServiceFactory<Io, Config = ()>,
{
use std::os::unix::net::UnixListener;
@ -255,7 +255,7 @@ impl ServerBuilder {
) -> io::Result<Self>
where
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
R: ServiceFactory<Io, Config = ()>,
{
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let token = self.token.next();
@ -280,7 +280,7 @@ impl ServerBuilder {
) -> io::Result<Self>
where
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
R: ServiceFactory<Io, Config = ()>,
{
let token = self.token.next();
self.services.push(Factory::create(

View file

@ -282,8 +282,8 @@ impl ServiceRuntime {
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
pub fn service<T, F>(&self, name: &str, service: F)
where
F: service::IntoServiceFactory<T>,
T: service::ServiceFactory<Config = (), Request = Io> + 'static,
F: service::IntoServiceFactory<T, Io>,
T: service::ServiceFactory<Io, Config = ()> + 'static,
T::Future: 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
@ -297,8 +297,8 @@ impl ServiceRuntime {
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
pub fn service_in<T, F>(&self, name: &str, pool: PoolId, service: F)
where
F: service::IntoServiceFactory<T>,
T: service::ServiceFactory<Config = (), Request = Io> + 'static,
F: service::IntoServiceFactory<T, Io>,
T: service::ServiceFactory<Io, Config = ()> + 'static,
T::Future: 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
@ -329,7 +329,7 @@ impl ServiceRuntime {
type BoxedNewService = Box<
dyn service::ServiceFactory<
Request = (Option<CounterGuard>, ServerMessage),
(Option<CounterGuard>, ServerMessage),
Response = (),
Error = (),
InitError = (),
@ -344,15 +344,14 @@ struct ServiceFactory<T> {
pool: PoolId,
}
impl<T> service::ServiceFactory for ServiceFactory<T>
impl<T> service::ServiceFactory<(Option<CounterGuard>, ServerMessage)> for ServiceFactory<T>
where
T: service::ServiceFactory<Config = (), Request = Io>,
T: service::ServiceFactory<Io, Config = ()>,
T::Future: 'static,
T::Service: 'static,
T::Error: 'static,
T::InitError: fmt::Debug + 'static,
{
type Request = (Option<CounterGuard>, ServerMessage);
type Response = ();
type Error = ();
type InitError = ();

View file

@ -21,7 +21,7 @@ pub(super) enum ServerMessage {
}
pub(super) trait StreamServiceFactory: Send + Clone + 'static {
type Factory: ServiceFactory<Config = (), Request = Io>;
type Factory: ServiceFactory<Io, Config = ()>;
fn create(&self, _: Config) -> Self::Factory;
}
@ -38,7 +38,7 @@ pub(super) trait InternalServiceFactory: Send {
pub(super) type BoxedServerService = Box<
dyn Service<
Request = (Option<CounterGuard>, ServerMessage),
(Option<CounterGuard>, ServerMessage),
Response = (),
Error = (),
Future = Ready<(), ()>,
@ -59,13 +59,12 @@ impl<T> StreamService<T> {
}
}
impl<T> Service for StreamService<T>
impl<T> Service<(Option<CounterGuard>, ServerMessage)> for StreamService<T>
where
T: Service<Request = Io>,
T: Service<Io>,
T::Future: 'static,
T::Error: 'static,
{
type Request = (Option<CounterGuard>, ServerMessage);
type Response = ();
type Error = ();
type Future = Ready<(), ()>;
@ -195,7 +194,7 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
impl<F, T> StreamServiceFactory for F
where
F: Fn(Config) -> T + Send + Clone + 'static,
T: ServiceFactory<Config = (), Request = Io>,
T: ServiceFactory<Io, Config = ()>,
{
type Factory = T;

View file

@ -41,7 +41,7 @@ use crate::{io::Io, service::ServiceFactory};
pub fn test_server<F, R>(factory: F) -> TestServer
where
F: Fn() -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
R: ServiceFactory<Io, Config = ()>,
{
let (tx, rx) = mpsc::channel();

View file

@ -1,7 +1,9 @@
//! Service that buffers incomming requests.
use std::cell::{Cell, RefCell};
use std::task::{Context, Poll};
use std::{collections::VecDeque, future::Future, pin::Pin, rc::Rc};
use std::{
collections::VecDeque, future::Future, marker::PhantomData, pin::Pin, rc::Rc,
};
use crate::channel::oneshot;
use crate::service::{IntoService, Service, Transform};
@ -11,12 +13,13 @@ use crate::util::Either;
/// Buffer - service factory for service that can buffer incoming request.
///
/// Default number of buffered requests is 16
pub struct Buffer<E> {
pub struct Buffer<R, E> {
buf_size: usize,
err: Rc<dyn Fn() -> E>,
_t: PhantomData<R>,
}
impl<E> Buffer<E> {
impl<R, E> Buffer<R, E> {
pub fn new<F>(f: F) -> Self
where
F: Fn() -> E + 'static,
@ -24,6 +27,7 @@ impl<E> Buffer<E> {
Self {
buf_size: 16,
err: Rc::new(f),
_t: PhantomData,
}
}
@ -33,20 +37,21 @@ impl<E> Buffer<E> {
}
}
impl<E> Clone for Buffer<E> {
impl<R, E> Clone for Buffer<R, E> {
fn clone(&self) -> Self {
Self {
buf_size: self.buf_size,
err: self.err.clone(),
_t: PhantomData,
}
}
}
impl<S, E> Transform<S> for Buffer<E>
impl<R, S, E> Transform<S> for Buffer<R, E>
where
S: Service<Error = E>,
S: Service<R, Error = E>,
{
type Service = BufferService<S, E>;
type Service = BufferService<R, S, E>;
fn new_transform(&self, service: S) -> Self::Service {
BufferService {
@ -65,26 +70,26 @@ where
/// Buffer service - service that can buffer incoming requests.
///
/// Default number of buffered requests is 16
pub struct BufferService<S: Service<Error = E>, E> {
pub struct BufferService<R, S: Service<R, Error = E>, E> {
size: usize,
inner: Rc<Inner<S, E>>,
inner: Rc<Inner<R, S, E>>,
}
struct Inner<S: Service<Error = E>, E> {
struct Inner<R, S: Service<R, Error = E>, E> {
ready: Cell<bool>,
service: S,
waker: LocalWaker,
err: Rc<dyn Fn() -> E>,
buf: RefCell<VecDeque<(oneshot::Sender<S::Request>, S::Request)>>,
buf: RefCell<VecDeque<(oneshot::Sender<R>, R)>>,
}
impl<S, E> BufferService<S, E>
impl<R, S, E> BufferService<R, S, E>
where
S: Service<Error = E>,
S: Service<R, Error = E>,
{
pub fn new<U, F>(size: usize, err: F, service: U) -> Self
where
U: IntoService<S>,
U: IntoService<S, R>,
F: Fn() -> E + 'static,
{
Self {
@ -100,9 +105,9 @@ where
}
}
impl<S, E> Clone for BufferService<S, E>
impl<R, S, E> Clone for BufferService<R, S, E>
where
S: Service<Error = E> + Clone,
S: Service<R, Error = E> + Clone,
{
fn clone(&self) -> Self {
Self {
@ -118,14 +123,13 @@ where
}
}
impl<S, E> Service for BufferService<S, E>
impl<R, S, E> Service<R> for BufferService<R, S, E>
where
S: Service<Error = E>,
S: Service<R, Error = E>,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = Either<S::Future, BufferServiceResponse<S, E>>;
type Future = Either<S::Future, BufferServiceResponse<R, S, E>>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -158,7 +162,7 @@ where
}
#[inline]
fn call(&self, req: S::Request) -> Self::Future {
fn call(&self, req: R) -> Self::Future {
if self.inner.ready.get() {
self.inner.ready.set(false);
Either::Left(self.inner.service.call(req))
@ -178,21 +182,21 @@ where
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct BufferServiceResponse<S: Service<Error = E>, E> {
pub struct BufferServiceResponse<R, S: Service<R, Error = E>, E> {
#[pin]
state: State<S, E>,
state: State<R, S, E>,
}
}
pin_project_lite::pin_project! {
#[project = StateProject]
enum State<S: Service<Error = E>, E> {
Tx { rx: oneshot::Receiver<S::Request>, inner: Rc<Inner<S, E>> },
Srv { #[pin] fut: S::Future, inner: Rc<Inner<S, E>> },
enum State<R, S: Service<R, Error = E>, E> {
Tx { rx: oneshot::Receiver<R>, inner: Rc<Inner<R, S, E>> },
Srv { #[pin] fut: S::Future, inner: Rc<Inner<R, S, E>> },
}
}
impl<S: Service<Error = E>, E> Future for BufferServiceResponse<S, E> {
impl<R, S: Service<R, Error = E>, E> Future for BufferServiceResponse<R, S, E> {
type Output = Result<S::Response, S::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,5 +1,5 @@
//! Service that limits number of in-flight async requests.
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use super::counter::{Counter, CounterGuard};
use crate::{IntoService, Service, Transform};
@ -24,14 +24,14 @@ impl Default for InFlight {
}
}
impl<S> Transform<S> for InFlight
where
S: Service,
{
impl<S> Transform<S> for InFlight {
type Service = InFlightService<S>;
fn new_transform(&self, service: S) -> Self::Service {
InFlightService::new(self.max_inflight, service)
InFlightService {
service,
count: Counter::new(self.max_inflight),
}
}
}
@ -40,13 +40,11 @@ pub struct InFlightService<S> {
service: S,
}
impl<S> InFlightService<S>
where
S: Service,
{
pub fn new<U>(max: usize, service: U) -> Self
impl<S> InFlightService<S> {
pub fn new<U, R>(max: usize, service: U) -> Self
where
U: IntoService<S>,
S: Service<R>,
U: IntoService<S, R>,
{
Self {
count: Counter::new(max),
@ -55,14 +53,13 @@ where
}
}
impl<T> Service for InFlightService<T>
impl<T, R> Service<R> for InFlightService<T>
where
T: Service,
T: Service<R>,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = InFlightServiceResponse<T>;
type Future = InFlightServiceResponse<T, R>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -82,24 +79,26 @@ where
}
#[inline]
fn call(&self, req: T::Request) -> Self::Future {
fn call(&self, req: R) -> Self::Future {
InFlightServiceResponse {
fut: self.service.call(req),
_guard: self.count.get(),
_t: PhantomData,
}
}
}
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct InFlightServiceResponse<T: Service> {
pub struct InFlightServiceResponse<T: Service<R>, R> {
#[pin]
fut: T::Future,
_guard: CounterGuard,
_t: PhantomData<R>
}
}
impl<T: Service> Future for InFlightServiceResponse<T> {
impl<T: Service<R>, R> Future for InFlightServiceResponse<T, R> {
type Output = Result<T::Response, T::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -7,13 +7,13 @@ use crate::{util::Ready, Service, ServiceFactory};
/// KeepAlive service factory
///
/// Controls min time between requests.
pub struct KeepAlive<R, E, F> {
pub struct KeepAlive<E, F> {
f: F,
ka: Millis,
_t: marker::PhantomData<(R, E)>,
_t: marker::PhantomData<E>,
}
impl<R, E, F> KeepAlive<R, E, F>
impl<E, F> KeepAlive<E, F>
where
F: Fn() -> E + Clone,
{
@ -30,7 +30,7 @@ where
}
}
impl<R, E, F> Clone for KeepAlive<R, E, F>
impl<E, F> Clone for KeepAlive<E, F>
where
F: Clone,
{
@ -43,16 +43,15 @@ where
}
}
impl<R, E, F> ServiceFactory for KeepAlive<R, E, F>
impl<R, E, F> ServiceFactory<R> for KeepAlive<E, F>
where
F: Fn() -> E + Clone,
{
type Request = R;
type Response = R;
type Error = E;
type InitError = Infallible;
type Config = ();
type Service = KeepAliveService<R, E, F>;
type Service = KeepAliveService<E, F>;
type Future = Ready<Self::Service, Self::InitError>;
fn new_service(&self, _: ()) -> Self::Future {
@ -60,15 +59,15 @@ where
}
}
pub struct KeepAliveService<R, E, F> {
pub struct KeepAliveService<E, F> {
f: F,
dur: Millis,
sleep: Sleep,
expire: Cell<Instant>,
_t: marker::PhantomData<(R, E)>,
_t: marker::PhantomData<E>,
}
impl<R, E, F> KeepAliveService<R, E, F>
impl<E, F> KeepAliveService<E, F>
where
F: Fn() -> E,
{
@ -85,11 +84,10 @@ where
}
}
impl<R, E, F> Service for KeepAliveService<R, E, F>
impl<R, E, F> Service<R> for KeepAliveService<E, F>
where
F: Fn() -> E,
{
type Request = R;
type Response = R;
type Error = E;
type Future = Ready<R, E>;

View file

@ -40,11 +40,10 @@ where
}
}
impl<S, I> Service for SinkService<S, I>
impl<S, I> Service<I> for SinkService<S, I>
where
S: Sink<I> + Unpin,
{
type Request = I;
type Response = ();
type Error = S::Error;
type Future = Ready<(), S::Error>;

View file

@ -5,12 +5,12 @@ use crate::service::{IntoService, Service};
use crate::{util::poll_fn, Sink, Stream};
pin_project_lite::pin_project! {
pub struct Dispatcher<R, S, T, U>
pub struct Dispatcher<Req, R, S, T, U>
where
R: 'static,
S: Service<Response = Option<R>>,
S: Service<Req, Response = Option<R>>,
S: 'static,
T: Stream<Item = Result<S::Request, S::Error>>,
T: Stream<Item = Result<Req, S::Error>>,
T: Unpin,
U: Sink<Result<R, S::Error>>,
U: Unpin,
@ -24,18 +24,18 @@ pin_project_lite::pin_project! {
}
}
impl<R, S, T, U> Dispatcher<R, S, T, U>
impl<Req, R, S, T, U> Dispatcher<Req, R, S, T, U>
where
R: 'static,
S: Service<Response = Option<R>> + 'static,
S: Service<Req, Response = Option<R>> + 'static,
S::Error: fmt::Debug,
T: Stream<Item = Result<S::Request, S::Error>> + Unpin,
T: Stream<Item = Result<Req, S::Error>> + Unpin,
U: Sink<Result<R, S::Error>> + Unpin + 'static,
U::Error: fmt::Debug,
{
pub fn new<F>(stream: T, sink: U, service: F) -> Self
where
F: IntoService<S>,
F: IntoService<S, Req>,
{
Dispatcher {
stream,
@ -47,12 +47,13 @@ where
}
}
impl<R, S, T, U> Future for Dispatcher<R, S, T, U>
impl<Req, R, S, T, U> Future for Dispatcher<Req, R, S, T, U>
where
R: 'static,
S: Service<Response = Option<R>> + 'static,
S::Error: fmt::Debug,
T: Stream<Item = Result<S::Request, S::Error>> + Unpin,
S: Service<Req, Response = Option<R>> + 'static,
S::Future: 'static,
S::Error: fmt::Debug + 'static,
T: Stream<Item = Result<Req, S::Error>> + Unpin,
U: Sink<Result<R, S::Error>> + Unpin + 'static,
U::Error: fmt::Debug,
{

View file

@ -2,7 +2,10 @@
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll};
use std::{
fmt, future::Future, marker, marker::PhantomData, pin::Pin, task::Context,
task::Poll,
};
use crate::service::{IntoService, Service, Transform};
use crate::time::{sleep, Millis, Sleep};
@ -82,10 +85,7 @@ impl Clone for Timeout {
}
}
impl<S> Transform<S> for Timeout
where
S: Service,
{
impl<S> Transform<S> for Timeout {
type Service = TimeoutService<S>;
fn new_transform(&self, service: S) -> Self::Service {
@ -103,14 +103,12 @@ pub struct TimeoutService<S> {
timeout: Millis,
}
impl<S> TimeoutService<S>
where
S: Service,
{
pub fn new<T, U>(timeout: T, service: U) -> Self
impl<S> TimeoutService<S> {
pub fn new<T, U, R>(timeout: T, service: U) -> Self
where
T: Into<Millis>,
U: IntoService<S>,
S: Service<R>,
U: IntoService<S, R>,
{
TimeoutService {
timeout: timeout.into(),
@ -119,14 +117,13 @@ where
}
}
impl<S> Service for TimeoutService<S>
impl<S, R> Service<R> for TimeoutService<S>
where
S: Service,
S: Service<R>,
{
type Request = S::Request;
type Response = S::Response;
type Error = TimeoutError<S::Error>;
type Future = Either<TimeoutServiceResponse<S>, TimeoutServiceResponse2<S>>;
type Future = Either<TimeoutServiceResponse<S, R>, TimeoutServiceResponse2<S, R>>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -138,15 +135,17 @@ where
self.service.poll_shutdown(cx, is_error)
}
fn call(&self, request: S::Request) -> Self::Future {
fn call(&self, request: R) -> Self::Future {
if self.timeout.is_zero() {
Either::Right(TimeoutServiceResponse2 {
fut: self.service.call(request),
_t: PhantomData,
})
} else {
Either::Left(TimeoutServiceResponse {
fut: self.service.call(request),
sleep: sleep(self.timeout),
_t: PhantomData,
})
}
}
@ -156,16 +155,17 @@ pin_project_lite::pin_project! {
/// `TimeoutService` response future
#[doc(hidden)]
#[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> {
pub struct TimeoutServiceResponse<T: Service<R>, R> {
#[pin]
fut: T::Future,
sleep: Sleep,
_t: PhantomData<R>
}
}
impl<T> Future for TimeoutServiceResponse<T>
impl<T, R> Future for TimeoutServiceResponse<T, R>
where
T: Service,
T: Service<R>,
{
type Output = Result<T::Response, TimeoutError<T::Error>>;
@ -191,15 +191,16 @@ pin_project_lite::pin_project! {
/// `TimeoutService` response future
#[doc(hidden)]
#[derive(Debug)]
pub struct TimeoutServiceResponse2<T: Service> {
pub struct TimeoutServiceResponse2<T: Service<R>, R> {
#[pin]
fut: T::Future,
_t: PhantomData<R>,
}
}
impl<T> Future for TimeoutServiceResponse2<T>
impl<T, R> Future for TimeoutServiceResponse2<T, R>
where
T: Service,
T: Service<R>,
{
type Output = Result<T::Response, TimeoutError<T::Error>>;

View file

@ -1,121 +1,112 @@
//! Contains `Variant` service and related types and functions.
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use crate::service::{IntoServiceFactory, Service, ServiceFactory};
/// Construct `Variant` service factory.
///
/// Variant service allow to combine multiple different services into a single service.
pub fn variant<A: ServiceFactory>(factory: A) -> Variant<A> {
Variant { factory }
pub fn variant<V1: ServiceFactory<V1R>, V1R>(factory: V1) -> Variant<V1, V1R> {
Variant {
factory,
_t: PhantomData,
}
}
/// Combine multiple different service types into a single service.
pub struct Variant<A> {
pub struct Variant<A, AR> {
factory: A,
_t: PhantomData<AR>,
}
impl<A> Variant<A>
impl<A, AR> Variant<A, AR>
where
A: ServiceFactory,
A: ServiceFactory<AR>,
A::Config: Clone,
{
/// Convert to a Variant with two request types
pub fn and<B, F>(self, factory: F) -> VariantFactory2<A, B>
pub fn v2<B, BR, F>(self, factory: F) -> VariantFactory2<A, B, AR, BR>
where
B: ServiceFactory<
BR,
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
F: IntoServiceFactory<B>,
F: IntoServiceFactory<B, BR>,
{
VariantFactory2 {
A: self.factory,
V2: factory.into_factory(),
}
}
/// Convert to a Variant with two request types
pub fn v2<B, F>(self, factory: F) -> VariantFactory2<A, B>
where
B: ServiceFactory<
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
F: IntoServiceFactory<B>,
{
VariantFactory2 {
A: self.factory,
V1: self.factory,
V2: factory.into_factory(),
_t: PhantomData,
}
}
}
macro_rules! variant_impl_and ({$fac1_type:ident, $fac2_type:ident, $name:ident, $m_name:ident, ($($T:ident),+)} => {
macro_rules! variant_impl_and ({$fac1_type:ident, $fac2_type:ident, $name:ident, $r_name:ident, $m_name:ident, ($($T:ident),+), ($($R:ident),+)} => {
impl<V1, $($T),+> $fac1_type<V1, $($T),+>
#[allow(non_snake_case)]
impl<V1, $($T,)+ V1R, $($R,)+> $fac1_type<V1, $($T,)+ V1R, $($R,)+>
where
V1: ServiceFactory,
V1: ServiceFactory<V1R>,
V1::Config: Clone,
{
/// Convert to a Variant with more request types
pub fn $m_name<$name, F>(self, factory: F) -> $fac2_type<V1, $($T,)+ $name>
where $name: ServiceFactory<
pub fn $m_name<$name, $r_name, F>(self, factory: F) -> $fac2_type<V1, $($T,)+ $name, V1R, $($R,)+ $r_name>
where $name: ServiceFactory<$r_name,
Config = V1::Config,
Response = V1::Response,
Error = V1::Error,
InitError = V1::InitError>,
F: IntoServiceFactory<$name>,
F: IntoServiceFactory<$name, $r_name>,
{
$fac2_type {
A: self.A,
V1: self.V1,
$($T: self.$T,)+
$name: factory.into_factory(),
_t: PhantomData
}
}
}
});
macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, $fac_type:ident, $(($n:tt, $T:ident)),+} => {
macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, $fac_type:ident, $(($n:tt, $T:ident, $R:ident)),+} => {
#[allow(non_snake_case)]
pub enum $enum_type<V1, $($T),+> {
V1(V1),
$($T($T),)+
pub enum $enum_type<V1R, $($R),+> {
V1(V1R),
$($T($R),)+
}
#[allow(non_snake_case)]
pub struct $srv_type<V1, $($T),+> {
a: V1,
pub struct $srv_type<V1, $($T,)+ V1R, $($R,)+> {
V1: V1,
$($T: $T,)+
_t: PhantomData<(V1R, $($R),+)>,
}
impl<V1: Clone, $($T: Clone),+> Clone for $srv_type<V1, $($T),+> {
impl<V1: Clone, $($T: Clone,)+ V1R, $($R,)+> Clone for $srv_type<V1, $($T,)+ V1R, $($R,)+> {
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
_t: PhantomData,
V1: self.V1.clone(),
$($T: self.$T.clone(),)+
}
}
}
impl<V1, $($T),+> Service for $srv_type<V1, $($T),+>
impl<V1, $($T,)+ V1R, $($R,)+> Service<$enum_type<V1R, $($R,)+>> for $srv_type<V1, $($T,)+ V1R, $($R,)+>
where
V1: Service,
$($T: Service<Response = V1::Response, Error = V1::Error>),+
V1: Service<V1R>,
$($T: Service<$R, Response = V1::Response, Error = V1::Error>),+
{
type Request = $enum_type<V1::Request, $($T::Request),+>;
type Response = V1::Response;
type Error = V1::Error;
type Future = $mod_name::ServiceResponse<V1::Future, $($T::Future),+>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut ready = self.a.poll_ready(cx)?.is_ready();
let mut ready = self.V1.poll_ready(cx)?.is_ready();
$(ready = self.$T.poll_ready(cx)?.is_ready() && ready;)+
if ready {
@ -125,62 +116,61 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
}
}
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let mut ready = self.a.poll_shutdown(cx, is_error).is_ready();
$(ready = self.$T.poll_shutdown(cx, is_error).is_ready() && ready;)+
let mut ready = self.V1.poll_shutdown(cx, is_error).is_ready();
$(ready = self.$T.poll_shutdown(cx, is_error).is_ready() && ready;)+
if ready {
Poll::Ready(())
} else {
Poll::Pending
if ready {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
#[inline]
fn call(&self, req: Self::Request) -> Self::Future {
fn call(&self, req: $enum_type<V1R, $($R,)+>) -> Self::Future {
match req {
$enum_type::V1(req) => $mod_name::ServiceResponse::V1 { fut: self.a.call(req) },
$enum_type::V1(req) => $mod_name::ServiceResponse::V1 { fut: self.V1.call(req) },
$($enum_type::$T(req) => $mod_name::ServiceResponse::$T { fut: self.$T.call(req) },)+
}
}
}
#[allow(non_snake_case)]
pub struct $fac_type<V1, $($T),+> {
A: V1,
pub struct $fac_type<V1, $($T,)+ V1R, $($R,)+> {
V1: V1,
$($T: $T,)+
_t: PhantomData<(V1R, $($R,)+)>,
}
impl<V1: Clone, $($T: Clone),+> Clone for $fac_type<V1, $($T),+> {
impl<V1: Clone, $($T: Clone,)+ V1R, $($R,)+> Clone for $fac_type<V1, $($T,)+ V1R, $($R,)+> {
fn clone(&self) -> Self {
Self {
A: self.A.clone(),
_t: PhantomData,
V1: self.V1.clone(),
$($T: self.$T.clone(),)+
}
}
}
impl<V1, $($T),+> ServiceFactory for $fac_type<V1, $($T),+>
impl<V1, $($T,)+ V1R, $($R,)+> ServiceFactory<$enum_type<V1R, $($R),+>> for $fac_type<V1, $($T,)+ V1R, $($R,)+>
where
V1: ServiceFactory,
V1: ServiceFactory<V1R>,
V1::Config: Clone,
$($T: ServiceFactory<Config = V1::Config, Response = V1::Response, Error = V1::Error, InitError = V1::InitError>),+
$($T: ServiceFactory<$R, Config = V1::Config, Response = V1::Response, Error = V1::Error, InitError = V1::InitError>),+
{
type Request = $enum_type<V1::Request, $($T::Request),+>;
type Response = V1::Response;
type Error = V1::Error;
type Config = V1::Config;
type InitError = V1::InitError;
type Service = $srv_type<V1::Service, $($T::Service,)+>;
type Future = $mod_name::ServiceFactoryResponse<V1, $($T,)+>;
type Service = $srv_type<V1::Service, $($T::Service,)+ V1R, $($R,)+>;
type Future = $mod_name::ServiceFactoryResponse<V1, $($T,)+ V1R, $($R,)+>;
fn new_service(&self, cfg: Self::Config) -> Self::Future {
$mod_name::ServiceFactoryResponse {
a: None,
V1: None,
items: Default::default(),
$($T: self.$T.new_service(cfg.clone()),)+
a_fut: self.A.new_service(cfg),
V1_fut: self.V1.new_service(cfg),
}
}
}
@ -192,18 +182,18 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
pin_project_lite::pin_project! {
#[project = ServiceResponseProject]
pub enum ServiceResponse<A: Future, $($T: Future),+> {
V1{ #[pin] fut: A },
pub enum ServiceResponse<V1: Future, $($T: Future),+> {
V1{ #[pin] fut: V1 },
$($T{ #[pin] fut: $T },)+
}
}
impl<A, $($T),+> Future for ServiceResponse<A, $($T),+>
impl<V1, $($T),+> Future for ServiceResponse<V1, $($T),+>
where
A: Future,
$($T: Future<Output = A::Output>),+
V1: Future,
$($T: Future<Output = V1::Output>),+
{
type Output = A::Output;
type Output = V1::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
@ -216,29 +206,29 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct ServiceFactoryResponse<A: ServiceFactory, $($T: ServiceFactory),+> {
pub(super) a: Option<A::Service>,
pub struct ServiceFactoryResponse<V1: ServiceFactory<V1R>, $($T: ServiceFactory<$R>,)+ V1R, $($R,)+> {
pub(super) V1: Option<V1::Service>,
pub(super) items: ($(Option<$T::Service>,)+),
#[pin] pub(super) a_fut: A::Future,
#[pin] pub(super) V1_fut: V1::Future,
$(#[pin] pub(super) $T: $T::Future),+
}
}
impl<A, $($T),+> Future for ServiceFactoryResponse<A, $($T),+>
impl<V1, $($T,)+ V1R, $($R,)+> Future for ServiceFactoryResponse<V1, $($T,)+ V1R, $($R,)+>
where
A: ServiceFactory,
$($T: ServiceFactory<Response = A::Response, Error = A::Error, InitError = A::InitError,>),+
V1: ServiceFactory<V1R>,
$($T: ServiceFactory<$R, Response = V1::Response, Error = V1::Error, InitError = V1::InitError,>),+
{
type Output = Result<$srv_type<A::Service, $($T::Service),+>, A::InitError>;
type Output = Result<$srv_type<V1::Service, $($T::Service,)+ V1R, $($R),+>, V1::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut ready = true;
if this.a.is_none() {
match this.a_fut.poll(cx) {
if this.V1.is_none() {
match this.V1_fut.poll(cx) {
Poll::Ready(Ok(item)) => {
*this.a = Some(item);
*this.V1 = Some(item);
}
Poll::Pending => ready = false,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
@ -259,8 +249,9 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
if ready {
Poll::Ready(Ok($srv_type {
a: this.a.take().unwrap(),
V1: this.V1.take().unwrap(),
$($T: this.items.$n.take().unwrap(),)+
_t: PhantomData
}))
} else {
Poll::Pending
@ -272,33 +263,32 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
});
#[rustfmt::skip]
variant_impl!(v2, Variant2, VariantService2, VariantFactory2, (0, V2));
variant_impl!(v2, Variant2, VariantService2, VariantFactory2, (0, V2, V2R));
#[rustfmt::skip]
variant_impl!(v3, Variant3, VariantService3, VariantFactory3, (0, V2), (1, V3));
variant_impl!(v3, Variant3, VariantService3, VariantFactory3, (0, V2, V2R), (1, V3, V3R));
#[rustfmt::skip]
variant_impl!(v4, Variant4, VariantService4, VariantFactory4, (0, V2), (1, V3), (2, V4));
variant_impl!(v4, Variant4, VariantService4, VariantFactory4, (0, V2, V2R), (1, V3, V3R), (2, V4, V4R));
#[rustfmt::skip]
variant_impl!(v5, Variant5, VariantService5, VariantFactory5, (0, V2), (1, V3), (2, V4), (3, V5));
variant_impl!(v5, Variant5, VariantService5, VariantFactory5, (0, V2, V2R), (1, V3, V3R), (2, V4, V4R), (3, V5, V5R));
#[rustfmt::skip]
variant_impl!(v6, Variant6, VariantService6, VariantFactory6, (0, V2), (1, V3), (2, V4), (3, V5), (4, V6));
variant_impl!(v6, Variant6, VariantService6, VariantFactory6, (0, V2, V2R), (1, V3, V3R), (2, V4, V4R), (3, V5, V5R), (4, V6, V6R));
#[rustfmt::skip]
variant_impl!(v7, Variant7, VariantService7, VariantFactory7, (0, V2), (1, V3), (2, V4), (3, V5), (4, V6), (5, V7));
variant_impl!(v7, Variant7, VariantService7, VariantFactory7, (0, V2, V2R), (1, V3, V3R), (2, V4, V4R), (3, V5, V5R), (4, V6, V6R), (5, V7, V7R));
#[rustfmt::skip]
variant_impl!(v8, Variant8, VariantService8, VariantFactory8, (0, V2), (1, V3), (2, V4), (3, V5), (4, V6), (5, V7), (6, V8));
variant_impl!(v8, Variant8, VariantService8, VariantFactory8, (0, V2, V2R), (1, V3, V3R), (2, V4, V4R), (3, V5, V5R), (4, V6, V6R), (5, V7, V7R), (6, V8, V8R));
variant_impl_and!(VariantFactory2, VariantFactory3, V3, v3, (V2));
variant_impl_and!(VariantFactory3, VariantFactory4, V4, v4, (V2, V3));
variant_impl_and!(VariantFactory4, VariantFactory5, V5, v5, (V2, V3, V4));
variant_impl_and!(VariantFactory5, VariantFactory6, V6, v6, (V2, V3, V4, V5));
variant_impl_and!(
VariantFactory6,
VariantFactory7,
V7,
v7,
(V2, V3, V4, V5, V6)
);
#[rustfmt::skip]
variant_impl_and!(VariantFactory7, VariantFactory8, V8, v8, (V2, V3, V4, V5, V6, V7));
variant_impl_and!(VariantFactory2, VariantFactory3, V3, V3R, v3, (V2), (V2R));
#[rustfmt::skip]
variant_impl_and!(VariantFactory3, VariantFactory4, V4, V4R, v4, (V2, V3), (V2R, V3R));
#[rustfmt::skip]
variant_impl_and!(VariantFactory4, VariantFactory5, V5, V5R, v5, (V2, V3, V4), (V2R, V3R, V4R));
#[rustfmt::skip]
variant_impl_and!(VariantFactory5, VariantFactory6, V6, V6R, v6, (V2, V3, V4, V5), (V2R, V3R, V4R, V5R));
#[rustfmt::skip]
variant_impl_and!(VariantFactory6, VariantFactory7, V7, V7R, v7, (V2, V3, V4, V5, V6), (V2R, V3R, V4R, V5R, V6R));
#[rustfmt::skip]
variant_impl_and!(VariantFactory7, VariantFactory8, V8, V8R, v8, (V2, V3, V4, V5, V6, V7), (V2R, V3R, V4R, V5R, V6R, V7R));
#[cfg(test)]
mod tests {