Use async fn in trait for Service definition

This commit is contained in:
Nikolay Kim 2024-01-07 03:42:45 +06:00
parent e3971e2d59
commit a9d5845005
18 changed files with 278 additions and 1039 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [2.0.0] - 2024-01-xx
* Use "async fn" in trait for Service definition
## [1.2.7] - 2023-09-19
* Use From<T::Error> for apply_fn util

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "1.2.7"
version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]

View file

@ -1,6 +1,6 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use std::{task::Context, task::Poll};
use super::{Service, ServiceCall, ServiceCtx, ServiceFactory};
use super::{Service, ServiceCtx, ServiceFactory};
#[derive(Clone, Debug)]
/// Service for the `and_then` combinator, chaining a computation onto the end
@ -26,7 +26,6 @@ where
{
type Response = B::Response;
type Error = A::Error;
type Future<'f> = AndThenServiceResponse<'f, A, B, Req> where Self: 'f, Req: 'f;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = !self.svc1.poll_ready(cx)?.is_ready();
@ -47,73 +46,13 @@ where
}
#[inline]
fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
AndThenServiceResponse {
slf: self,
state: State::A {
fut: ctx.call(&self.svc1, req),
ctx: Some(ctx),
},
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct AndThenServiceResponse<'f, A, B, Req>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
slf: &'f AndThen<A, B>,
#[pin]
state: State<'f, A, B, Req>,
}
}
pin_project_lite::pin_project! {
#[project = StateProject]
enum State<'f, A, B, Req>
where
A: Service<Req>,
A: 'f,
Req: 'f,
B: Service<A::Response, Error = A::Error>,
B: 'f,
{
A { #[pin] fut: ServiceCall<'f, A, Req>, ctx: Option<ServiceCtx<'f, AndThen<A, B>>> },
B { #[pin] fut: ServiceCall<'f, B, A::Response> },
Empty,
}
}
impl<'f, A, B, Req> Future for AndThenServiceResponse<'f, A, B, Req>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
type Output = Result<B::Response, A::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateProject::A { fut, ctx } => match fut.poll(cx)? {
Poll::Ready(res) => {
let fut = ctx.take().unwrap().call(&this.slf.svc2, res);
this.state.set(State::B { fut });
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateProject::B { fut } => fut.poll(cx).map(|r| {
this.state.set(State::Empty);
r
}),
StateProject::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
async fn call(
&self,
req: Req,
ctx: ServiceCtx<'_, Self>,
) -> Result<B::Response, A::Error> {
let res = ctx.call(&self.svc1, req).await?;
ctx.call(&self.svc2, res).await
}
}
@ -142,67 +81,13 @@ where
type Service = AndThen<A::Service, B::Service>;
type InitError = A::InitError;
type Future<'f> = AndThenFactoryResponse<'f, A, B, Req, Cfg> where Self: 'f, Cfg: 'f;
#[inline]
fn create(&self, cfg: Cfg) -> Self::Future<'_> {
AndThenFactoryResponse {
fut1: self.svc1.create(cfg.clone()),
fut2: self.svc2.create(cfg),
svc1: None,
svc2: None,
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct AndThenFactoryResponse<'f, A, B, Req, Cfg>
where
A: ServiceFactory<Req, Cfg>,
A: 'f,
B: ServiceFactory<A::Response, Cfg>,
B: 'f,
Cfg: 'f
{
#[pin]
fut1: A::Future<'f>,
#[pin]
fut2: B::Future<'f>,
svc1: Option<A::Service>,
svc2: Option<B::Service>,
}
}
impl<'f, A, B, Req, Cfg> Future for AndThenFactoryResponse<'f, A, B, Req, Cfg>
where
A: ServiceFactory<Req, Cfg>,
B: ServiceFactory<A::Response, Cfg, Error = A::Error, InitError = A::InitError>,
{
type Output = Result<AndThen<A::Service, B::Service>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.svc1.is_none() {
if let Poll::Ready(service) = this.fut1.poll(cx)? {
*this.svc1 = Some(service);
}
}
if this.svc2.is_none() {
if let Poll::Ready(service) = this.fut2.poll(cx)? {
*this.svc2 = Some(service);
}
}
if this.svc1.is_some() && this.svc2.is_some() {
Poll::Ready(Ok(AndThen::new(
this.svc1.take().unwrap(),
this.svc2.take().unwrap(),
)))
} else {
Poll::Pending
}
async fn create(&self, cfg: Cfg) -> Result<Self::Service, Self::InitError> {
Ok(AndThen {
svc1: self.svc1.create(cfg.clone()).await?,
svc2: self.svc2.create(cfg).await?,
})
}
}
@ -219,19 +104,18 @@ mod tests {
impl Service<&'static str> for Srv1 {
type Response = &'static str;
type Error = ();
type Future<'f> = Ready<Self::Response, ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call<'a>(
async fn call<'a>(
&'a self,
req: &'static str,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
Ready::Ok(req)
) -> Result<Self::Response, ()> {
Ok(req)
}
}
@ -241,19 +125,18 @@ mod tests {
impl Service<&'static str> for Srv2 {
type Response = (&'static str, &'static str);
type Error = ();
type Future<'f> = Ready<Self::Response, ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call<'a>(
async fn call<'a>(
&'a self,
req: &'static str,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
Ready::Ok((req, "srv2"))
) -> Result<Self::Response, ()> {
Ok((req, "srv2"))
}
}

View file

@ -1,5 +1,5 @@
#![allow(clippy::type_complexity)]
use std::{fmt, future::Future, marker, pin::Pin, task, task::Poll};
use std::{fmt, future::Future, marker};
use super::ctx::ServiceCtx;
use super::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory};
@ -97,14 +97,17 @@ where
{
type Response = Out;
type Error = Err;
type Future<'f> = R where Self: 'f, In: 'f, R: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
#[inline]
fn call<'a>(&'a self, req: In, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
(self.f)(req, self.service.clone())
async fn call(
&self,
req: In,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
(self.f)(req, self.service.clone()).await
}
}
@ -183,58 +186,14 @@ where
type Service = Apply<T::Service, Req, F, R, In, Out, Err>;
type InitError = T::InitError;
type Future<'f> = ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err> where Self: 'f, Cfg: 'f;
#[inline]
fn create(&self, cfg: Cfg) -> Self::Future<'_> {
ApplyFactoryResponse {
fut: self.service.create(cfg),
f: Some(self.f.clone()),
_t: marker::PhantomData,
}
}
}
pin_project_lite::pin_project! {
pub struct ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err>
where
T: ServiceFactory<Req, Cfg>,
T: 'f,
F: Fn(In, Pipeline<T::Service>) -> R,
T::Service: 'f,
R: Future<Output = Result<Out, Err>>,
Cfg: 'f,
Err: From<T::Error>,
{
#[pin]
fut: T::Future<'f>,
f: Option<F>,
_t: marker::PhantomData<(In, Out)>,
}
}
impl<'f, T, Req, Cfg, F, R, In, Out, Err> Future
for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err>
where
T: ServiceFactory<Req, Cfg>,
F: Fn(In, Pipeline<T::Service>) -> R,
R: Future<Output = Result<Out, Err>>,
Err: From<T::Error>,
{
type Output = Result<Apply<T::Service, Req, F, R, In, Out, Err>, T::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(Apply {
service: svc.into(),
f: this.f.take().unwrap(),
r: marker::PhantomData,
}))
} else {
Poll::Pending
}
async fn create(&self, cfg: Cfg) -> Result<Self::Service, Self::InitError> {
self.service.create(cfg).await.map(|svc| Apply {
service: svc.into(),
f: self.f.clone(),
r: marker::PhantomData,
})
}
}
@ -252,10 +211,9 @@ mod tests {
impl Service<()> for Srv {
type Response = ();
type Error = ();
type Future<'f> = Ready<(), ()>;
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
Ready::Ok(())
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
Ok(())
}
}

View file

@ -87,7 +87,11 @@ where
idx: usize,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, Self::Response, Self::Error> {
Box::pin(ServiceCtx::<'a, S>::from_ref(idx, waiters).call_nowait(self, req))
Box::pin(async move {
ServiceCtx::<'a, S>::from_ref(idx, waiters)
.call_nowait(self, req)
.await
})
}
}
@ -134,7 +138,6 @@ where
{
type Response = Res;
type Error = Err;
type Future<'f> = BoxFuture<'f, Res, Err> where Self: 'f, Req: 'f;
#[inline]
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -147,9 +150,9 @@ where
}
#[inline]
fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
async fn call(&self, req: Req, ctx: ServiceCtx<'_, Self>) -> Result<Res, Err> {
let (idx, waiters) = ctx.inner();
self.0.call(req, idx, waiters)
self.0.call(req, idx, waiters).await
}
}
@ -163,10 +166,9 @@ where
type Service = BoxService<Req, Res, Err>;
type InitError = InitErr;
type Future<'f> = BoxFuture<'f, Self::Service, InitErr> where Self: 'f, C: 'f;
#[inline]
fn create(&self, cfg: C) -> Self::Future<'_> {
self.0.create(cfg)
async fn create(&self, cfg: C) -> Result<Self::Service, Self::InitError> {
self.0.create(cfg).await
}
}

View file

@ -3,12 +3,11 @@ use std::{fmt, future::Future, marker::PhantomData};
use crate::and_then::{AndThen, AndThenFactory};
use crate::apply::{Apply, ApplyFactory};
use crate::ctx::{ServiceCall, ServiceCtx};
use crate::ctx::ServiceCtx;
use crate::map::{Map, MapFactory};
use crate::map_err::{MapErr, MapErrFactory};
use crate::map_init_err::MapInitErr;
use crate::middleware::{ApplyMiddleware, Middleware};
use crate::pipeline::CreatePipeline;
use crate::then::{Then, ThenFactory};
use crate::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory};
@ -171,14 +170,17 @@ where
impl<Svc: Service<Req>, Req> Service<Req> for ServiceChain<Svc, Req> {
type Response = Svc::Response;
type Error = Svc::Error;
type Future<'f> = ServiceCall<'f, Svc, Req> where Self: 'f, Req: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
#[inline]
fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
ctx.call(&self.service, req)
async fn call(
&self,
req: Req,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
ctx.call(&self.service, req).await
}
}
@ -308,11 +310,11 @@ impl<T: ServiceFactory<Req, C>, Req, C> ServiceChainFactory<T, Req, C> {
}
/// Create and return a new service value asynchronously and wrap into a container
pub fn pipeline(&self, cfg: C) -> CreatePipeline<'_, T, Req, C>
pub async fn pipeline(&self, cfg: C) -> Result<Pipeline<T::Service>, T::InitError>
where
Self: Sized,
{
CreatePipeline::new(self.factory.create(cfg))
Ok(Pipeline::new(self.factory.create(cfg).await?))
}
}
@ -344,10 +346,9 @@ impl<T: ServiceFactory<R, C>, R, C> ServiceFactory<R, C> for ServiceChainFactory
type Error = T::Error;
type Service = T::Service;
type InitError = T::InitError;
type Future<'f> = T::Future<'f> where Self: 'f;
#[inline]
fn create(&self, cfg: C) -> Self::Future<'_> {
self.factory.create(cfg)
async fn create(&self, cfg: C) -> Result<Self::Service, Self::InitError> {
self.factory.create(cfg).await
}
}

View file

@ -1,6 +1,6 @@
use std::{cell::UnsafeCell, fmt, future::Future, marker, pin::Pin, rc::Rc, task};
use std::{cell::UnsafeCell, fmt, future::poll_fn, marker, rc::Rc, task, task::Poll};
use crate::{Pipeline, Service};
use crate::Service;
pub struct ServiceCtx<'a, S: ?Sized> {
idx: usize,
@ -112,27 +112,51 @@ impl<'a, S> ServiceCtx<'a, S> {
(self.idx, self.waiters)
}
/// Returns when the service is able to process requests.
pub async fn ready<T, R>(&self, svc: &'a T) -> Result<(), T::Error>
where
T: Service<R>,
{
// check readiness and notify waiters
poll_fn(move |cx| match svc.poll_ready(cx)? {
Poll::Ready(()) => {
self.waiters.notify();
Poll::Ready(Ok(()))
}
Poll::Pending => {
self.waiters.register(self.idx, cx);
Poll::Pending
}
})
.await
}
#[inline]
/// Wait for service readiness and then call service
pub fn call<T, R>(&self, svc: &'a T, req: R) -> ServiceCall<'a, T, R>
pub async fn call<T, R>(&self, svc: &'a T, req: R) -> Result<T::Response, T::Error>
where
T: Service<R>,
R: 'a,
{
ServiceCall {
state: ServiceCallState::Ready {
svc,
req: Some(req),
self.ready(svc).await?;
svc.call(
req,
ServiceCtx {
idx: self.idx,
waiters: self.waiters,
_t: marker::PhantomData,
},
}
)
.await
}
#[doc(hidden)]
#[inline]
/// Call service, do not check service readiness
pub fn call_nowait<T, R>(&self, svc: &'a T, req: R) -> T::Future<'a>
pub async fn call_nowait<T, R>(
&self,
svc: &'a T,
req: R,
) -> Result<T::Response, T::Error>
where
T: Service<R>,
R: 'a,
@ -145,6 +169,7 @@ impl<'a, S> ServiceCtx<'a, S> {
_t: marker::PhantomData,
},
)
.await
}
}
@ -166,201 +191,12 @@ impl<'a, S> fmt::Debug for ServiceCtx<'a, S> {
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ServiceCall<'a, S, Req>
where
S: Service<Req>,
Req: 'a,
{
#[pin]
state: ServiceCallState<'a, S, Req>,
}
}
pin_project_lite::pin_project! {
#[project = ServiceCallStateProject]
enum ServiceCallState<'a, S, Req>
where
S: Service<Req>,
Req: 'a,
{
Ready { req: Option<Req>,
svc: &'a S,
idx: usize,
waiters: &'a WaitersRef,
},
ReadyPl { req: Option<Req>,
svc: &'a Pipeline<S>,
pl: Pipeline<S>,
},
Call { #[pin] fut: S::Future<'a> },
Empty,
}
}
impl<'a, S, Req> ServiceCall<'a, S, Req>
where
S: Service<Req>,
Req: 'a,
{
pub(crate) fn call_pipeline(req: Req, svc: &'a Pipeline<S>) -> Self {
ServiceCall {
state: ServiceCallState::ReadyPl {
req: Some(req),
pl: svc.clone(),
svc,
},
}
}
pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> {
match self.state {
ServiceCallState::Ready { .. } | ServiceCallState::ReadyPl { .. } => {}
ServiceCallState::Call { .. } | ServiceCallState::Empty => {
panic!(
"`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`"
)
}
}
ServiceCallToCall { state: self.state }
}
}
impl<'a, S, Req> Future for ServiceCall<'a, S, Req>
where
S: Service<Req>,
{
type Output = Result<S::Response, S::Error>;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
ServiceCallStateProject::Ready {
req,
svc,
idx,
waiters,
} => match svc.poll_ready(cx)? {
task::Poll::Ready(()) => {
waiters.notify();
let fut = svc.call(
req.take().unwrap(),
ServiceCtx {
waiters,
idx: *idx,
_t: marker::PhantomData,
},
);
this.state.set(ServiceCallState::Call { fut });
self.poll(cx)
}
task::Poll::Pending => {
waiters.register(*idx, cx);
task::Poll::Pending
}
},
ServiceCallStateProject::ReadyPl { req, svc, pl } => {
task::ready!(pl.poll_ready(cx))?;
let ctx = ServiceCtx::new(&svc.waiters);
let svc_call = svc.get_ref().call(req.take().unwrap(), ctx);
// SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };
this.state.set(ServiceCallState::Call { fut });
self.poll(cx)
}
ServiceCallStateProject::Call { fut, .. } => fut.poll(cx).map(|r| {
this.state.set(ServiceCallState::Empty);
r
}),
ServiceCallStateProject::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ServiceCallToCall<'a, S, Req>
where
S: Service<Req>,
Req: 'a,
{
#[pin]
state: ServiceCallState<'a, S, Req>,
}
}
impl<'a, S, Req> Future for ServiceCallToCall<'a, S, Req>
where
S: Service<Req>,
{
type Output = Result<S::Future<'a>, S::Error>;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
ServiceCallStateProject::Ready {
req,
svc,
idx,
waiters,
} => match svc.poll_ready(cx)? {
task::Poll::Ready(()) => {
waiters.notify();
let fut = svc.call(
req.take().unwrap(),
ServiceCtx {
waiters,
idx: *idx,
_t: marker::PhantomData,
},
);
this.state.set(ServiceCallState::Empty);
task::Poll::Ready(Ok(fut))
}
task::Poll::Pending => {
waiters.register(*idx, cx);
task::Poll::Pending
}
},
ServiceCallStateProject::ReadyPl { req, svc, pl } => {
task::ready!(pl.poll_ready(cx))?;
let ctx = ServiceCtx::new(&svc.waiters);
task::Poll::Ready(Ok(svc.get_ref().call(req.take().unwrap(), ctx)))
}
ServiceCallStateProject::Call { .. } => {
unreachable!("`ServiceCallToCall` can only be constructed in `Ready` state")
}
ServiceCallStateProject::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, poll_fn, Ready};
use ntex_util::future::lazy;
use ntex_util::{channel::condition, time};
use std::{cell::Cell, cell::RefCell, rc::Rc, task::Context, task::Poll};
use std::task::{Context, Poll};
use std::{cell::Cell, cell::RefCell, future::poll_fn, rc::Rc};
use super::*;
use crate::Pipeline;
@ -370,20 +206,19 @@ mod tests {
impl Service<&'static str> for Srv {
type Response = &'static str;
type Error = ();
type Future<'f> = Ready<Self::Response, ()>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
self.1.poll_ready(cx).map(|_| Ok(()))
}
fn call<'a>(
async fn call<'a>(
&'a self,
req: &'static str,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
) -> Result<Self::Response, Self::Error> {
let _ = ctx.clone();
Ready::Ok(req)
Ok(req)
}
}
@ -451,32 +286,32 @@ mod tests {
assert_eq!(&*data.borrow(), &["srv2", "srv1"]);
}
#[ntex::test]
async fn test_advance_to_call() {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));
// #[ntex::test]
// async fn test_advance_to_call() {
// let cnt = Rc::new(Cell::new(0));
// let con = condition::Condition::new();
// let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));
let mut fut = srv.call("test").advance_to_call();
let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
con.notify();
// let mut fut = srv.call("test").advance_to_call();
// let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
// con.notify();
let res = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
assert!(res.is_ready());
}
// let res = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
// assert!(res.is_ready());
// }
#[ntex::test]
#[should_panic]
async fn test_advance_to_call_panic() {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));
// #[ntex::test]
// #[should_panic]
// async fn test_advance_to_call_panic() {
// let cnt = Rc::new(Cell::new(0));
// let con = condition::Condition::new();
// let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));
let mut fut = srv.call("test");
let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
con.notify();
// let mut fut = srv.call("test");
// let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
// con.notify();
let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
let _f = fut.advance_to_call();
}
// let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
// let _f = fut.advance_to_call();
// }
}

View file

@ -1,4 +1,4 @@
use std::{fmt, future::ready, future::Future, future::Ready, marker::PhantomData};
use std::{fmt, future::Future, marker::PhantomData};
use crate::{IntoService, IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
@ -133,11 +133,10 @@ where
{
type Response = Res;
type Error = Err;
type Future<'f> = Fut where Self: 'f, Req: 'f;
#[inline]
fn call<'a>(&'a self, req: Req, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
(self.f)(req)
async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result<Res, Err> {
(self.f)(req).await
}
}
@ -207,11 +206,10 @@ where
{
type Response = Res;
type Error = Err;
type Future<'f> = Fut where Self: 'f;
#[inline]
fn call<'a>(&'a self, req: Req, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
(self.f)(req)
async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result<Res, Err> {
(self.f)(req).await
}
}
@ -226,14 +224,13 @@ where
type Service = FnService<F, Req>;
type InitError = ();
type Future<'f> = Ready<Result<Self::Service, Self::InitError>> where Self: 'f;
#[inline]
fn create(&self, _: Cfg) -> Self::Future<'_> {
ready(Ok(FnService {
async fn create(&self, _: Cfg) -> Result<Self::Service, Self::InitError> {
Ok(FnService {
f: self.f.clone(),
_t: PhantomData,
}))
})
}
}
@ -300,11 +297,10 @@ where
type Service = Srv;
type InitError = Err;
type Future<'f> = Fut where Self: 'f, Fut: 'f;
#[inline]
fn create(&self, cfg: Cfg) -> Self::Future<'_> {
(self.f)(cfg)
async fn create(&self, cfg: Cfg) -> Result<Self::Service, Self::InitError> {
(self.f)(cfg).await
}
}
@ -341,11 +337,10 @@ where
type Error = S::Error;
type Service = S;
type InitError = E;
type Future<'f> = R where Self: 'f, R: 'f;
#[inline]
fn create(&self, _: C) -> Self::Future<'_> {
(self.f)()
async fn create(&self, _: C) -> Result<S, E> {
(self.f)().await
}
}

View file

@ -1,5 +1,4 @@
use std::task::{Context, Poll};
use std::{cell::Cell, fmt, future::ready, future::Ready, marker::PhantomData};
use std::{cell::Cell, fmt, marker::PhantomData, task::Context, task::Poll};
use crate::{Service, ServiceCtx};
@ -55,7 +54,6 @@ where
{
type Response = Req;
type Error = Err;
type Future<'f> = Ready<Result<Req, Err>> where Self: 'f, Req: 'f;
#[inline]
fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> {
@ -66,8 +64,8 @@ where
}
#[inline]
fn call<'a>(&'a self, req: Req, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
ready(Ok(req))
async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result<Req, Err> {
Ok(req)
}
}

View file

@ -6,9 +6,7 @@
missing_debug_implementations
)]
use std::future::Future;
use std::rc::Rc;
use std::task::{self, Context, Poll};
use std::{future::Future, rc::Rc, task, task::Context, task::Poll};
mod and_then;
mod apply;
@ -28,7 +26,7 @@ mod then;
pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::chain::{chain, chain_factory};
pub use self::ctx::{ServiceCall, ServiceCallToCall, ServiceCtx};
pub use self::ctx::ServiceCtx;
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::fn_shutdown::fn_shutdown;
pub use self::map_config::{map_config, unit_config};
@ -62,8 +60,6 @@ pub use self::pipeline::{Pipeline, PipelineCall};
///
/// ```rust
/// # use std::convert::Infallible;
/// # use std::future::Future;
/// # use std::pin::Pin;
/// #
/// # use ntex_service::{Service, ServiceCtx};
///
@ -72,10 +68,9 @@ pub use self::pipeline::{Pipeline, PipelineCall};
/// impl Service<u8> for MyService {
/// type Response = u64;
/// type Error = Infallible;
/// type Future<'f> = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
/// fn call<'a>(&'a self, req: u8, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
/// Box::pin(std::future::ready(Ok(req as u64)))
/// async fn call<'a>(&'a self, req: u8, _: ServiceCtx<'a, Self>) -> Result<Self::Response, Self::Error> {
/// Ok(req as u64)
/// }
/// }
/// ```
@ -97,19 +92,17 @@ pub trait Service<Req> {
/// Errors produced by the service when polling readiness or executing call.
type Error;
/// The future response value.
type Future<'f>: Future<Output = Result<Self::Response, Self::Error>>
where
Req: 'f,
Self: 'f;
/// Process the request and return the response asynchronously.
///
/// This function is expected to be callable off-task. As such, implementations of `call`
/// should take care to not call `poll_ready`. Caller of the service verifies readiness,
/// Only way to make a `call` is to use `ctx` argument, it enforces readiness before calling
/// service.
fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a>;
fn call(
&self,
req: Req,
ctx: ServiceCtx<'_, Self>,
) -> impl Future<Output = Result<Self::Response, Self::Error>>;
#[inline]
/// Returns `Ready` when the service is able to process requests.
@ -170,15 +163,6 @@ pub trait Service<Req> {
{
chain(dev::MapErr::new(self, f))
}
#[inline]
/// Convert `Self` to a `ServiceChain`
fn chain(self) -> dev::ServiceChain<Self, Req>
where
Self: Sized,
{
chain(self)
}
}
/// Factory for creating `Service`s.
@ -205,21 +189,19 @@ pub trait ServiceFactory<Req, Cfg = ()> {
/// Errors potentially raised while building a service.
type InitError;
/// The future of the `ServiceFactory` instance.
type Future<'f>: Future<Output = Result<Self::Service, Self::InitError>>
where
Cfg: 'f,
Self: 'f;
/// Create and return a new service value asynchronously.
fn create(&self, cfg: Cfg) -> Self::Future<'_>;
fn create(
&self,
cfg: Cfg,
) -> impl Future<Output = Result<Self::Service, Self::InitError>>;
#[allow(async_fn_in_trait)]
/// Create and return a new service value asynchronously and wrap into a container
fn pipeline(&self, cfg: Cfg) -> dev::CreatePipeline<'_, Self, Req, Cfg>
async fn pipeline(&self, cfg: Cfg) -> Result<Pipeline<Self::Service>, Self::InitError>
where
Self: Sized,
{
dev::CreatePipeline::new(self.create(cfg))
Ok(Pipeline::new(self.create(cfg).await?))
}
#[inline]
@ -269,7 +251,6 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future<'f> = S::Future<'f> where 'a: 'f, Req: 'f;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
@ -282,8 +263,12 @@ where
}
#[inline]
fn call<'s>(&'s self, request: Req, ctx: ServiceCtx<'s, Self>) -> S::Future<'s> {
ctx.call_nowait(&**self, request)
async fn call(
&self,
request: Req,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
ctx.call_nowait(&**self, request).await
}
}
@ -293,7 +278,6 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future<'f> = S::Future<'f> where S: 'f, Req: 'f;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
@ -306,8 +290,12 @@ where
}
#[inline]
fn call<'a>(&'a self, request: Req, ctx: ServiceCtx<'a, Self>) -> S::Future<'a> {
ctx.call_nowait(&**self, request)
async fn call(
&self,
request: Req,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
ctx.call_nowait(&**self, request).await
}
}
@ -319,10 +307,9 @@ where
type Error = S::Error;
type Service = S::Service;
type InitError = S::InitError;
type Future<'f> = S::Future<'f> where S: 'f, Cfg: 'f;
fn create(&self, cfg: Cfg) -> S::Future<'_> {
self.as_ref().create(cfg)
async fn create(&self, cfg: Cfg) -> Result<Self::Service, Self::InitError> {
self.as_ref().create(cfg).await
}
}
@ -333,15 +320,6 @@ where
{
/// Convert to a `Service`
fn into_service(self) -> Svc;
#[inline]
/// Convert `Self` to a `ServiceChain`
fn into_chain(self) -> dev::ServiceChain<Svc, Req>
where
Self: Sized,
{
chain(self)
}
}
/// Trait for types that can be converted to a `ServiceFactory`
@ -351,15 +329,6 @@ where
{
/// Convert `Self` to a `ServiceFactory`
fn into_factory(self) -> T;
#[inline]
/// Convert `Self` to a `ServiceChainFactory`
fn chain(self) -> dev::ServiceChainFactory<T, Req, Cfg>
where
Self: Sized,
{
chain_factory(self)
}
}
impl<Svc, Req> IntoService<Svc, Req> for Svc
@ -404,9 +373,5 @@ pub mod dev {
pub use crate::map_err::{MapErr, MapErrFactory};
pub use crate::map_init_err::MapInitErr;
pub use crate::middleware::ApplyMiddleware;
pub use crate::pipeline::CreatePipeline;
pub use crate::then::{Then, ThenFactory};
#[doc(hidden)]
pub type ApplyService<T> = crate::Pipeline<T>;
}

View file

@ -1,6 +1,6 @@
use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use std::{fmt, marker::PhantomData};
use super::{Service, ServiceCall, ServiceCtx, ServiceFactory};
use super::{Service, ServiceCtx, ServiceFactory};
/// Service for the `map` combinator, changing the type of a service's response.
///
@ -60,51 +60,17 @@ where
{
type Response = Res;
type Error = A::Error;
type Future<'f> = MapFuture<'f, A, F, Req, Res> where Self: 'f, Req: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
#[inline]
fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
MapFuture {
fut: ctx.call(&self.service, req),
slf: self,
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapFuture<'f, A, F, Req, Res>
where
A: Service<Req>,
A: 'f,
Req: 'f,
F: Fn(A::Response) -> Res,
{
slf: &'f Map<A, F, Req, Res>,
#[pin]
fut: ServiceCall<'f, A, Req>,
}
}
impl<'f, A, F, Req, Res> Future for MapFuture<'f, A, F, Req, Res>
where
A: Service<Req> + 'f,
Req: 'f,
F: Fn(A::Response) -> Res,
{
type Output = Result<Res, A::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
match this.fut.poll(cx) {
Poll::Ready(Ok(resp)) => Poll::Ready(Ok((self.project().slf.f)(resp))),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
async fn call(
&self,
req: Req,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
ctx.call(&self.service, req).await.map(|r| (self.f)(r))
}
}
@ -167,55 +133,22 @@ where
type Service = Map<A::Service, F, Req, Res>;
type InitError = A::InitError;
type Future<'f> = MapFactoryFuture<'f, A, F, Req, Res, Cfg> where Self: 'f, Cfg: 'f;
#[inline]
fn create(&self, cfg: Cfg) -> Self::Future<'_> {
MapFactoryFuture {
fut: self.a.create(cfg),
f: Some(self.f.clone()),
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapFactoryFuture<'f, A, F, Req, Res, Cfg>
where
A: ServiceFactory<Req, Cfg>,
A: 'f,
F: Fn(A::Response) -> Res,
Cfg: 'f,
{
#[pin]
fut: A::Future<'f>,
f: Option<F>,
}
}
impl<'f, A, F, Req, Res, Cfg> Future for MapFactoryFuture<'f, A, F, Req, Res, Cfg>
where
A: ServiceFactory<Req, Cfg>,
F: Fn(A::Response) -> Res,
{
type Output = Result<Map<A::Service, F, Req, Res>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap())))
} else {
Poll::Pending
}
async fn create(&self, cfg: Cfg) -> Result<Self::Service, Self::InitError> {
Ok(Map {
service: self.a.create(cfg).await?,
f: self.f.clone(),
_t: PhantomData,
})
}
}
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, Ready};
use ntex_util::future::lazy;
use std::task::{Context, Poll};
use super::*;
use crate::{fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory};
#[derive(Debug, Clone)]
@ -224,14 +157,13 @@ mod tests {
impl Service<()> for Srv {
type Response = ();
type Error = ();
type Future<'f> = Ready<(), ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
Ready::Ok(())
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
Ok(())
}
}

View file

@ -1,4 +1,4 @@
use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use std::{fmt, marker::PhantomData};
use super::{IntoServiceFactory, ServiceFactory};
@ -78,11 +78,9 @@ where
type Service = A::Service;
type InitError = A::InitError;
type Future<'f> = A::Future<'f> where Self: 'f;
fn create(&self, cfg: C) -> Self::Future<'_> {
let cfg = (self.f)(cfg);
self.a.create(cfg)
async fn create(&self, cfg: C) -> Result<Self::Service, Self::InitError> {
self.a.create((self.f)(cfg)).await
}
}
@ -108,37 +106,9 @@ where
type Service = A::Service;
type InitError = A::InitError;
type Future<'f> = UnitConfigFuture<'f, A, R, C> where Self: 'f, C: 'f;
fn create(&self, _: C) -> Self::Future<'_> {
UnitConfigFuture {
fut: self.factory.create(()),
_t: PhantomData,
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct UnitConfigFuture<'f, A, R, C>
where A: ServiceFactory<R>,
A: 'f,
C: 'f,
{
#[pin]
fut: A::Future<'f>,
_t: PhantomData<C>,
}
}
impl<'f, A, R, C> Future for UnitConfigFuture<'f, A, R, C>
where
A: ServiceFactory<R>,
{
type Output = Result<A::Service, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
async fn create(&self, _: C) -> Result<Self::Service, Self::InitError> {
self.factory.create(()).await
}
}
@ -175,28 +145,4 @@ mod tests {
.create(&10)
.await;
}
// #[ntex::test]
// async fn test_map_config_service() {
// let item = Rc::new(Cell::new(10usize));
// let item2 = item.clone();
// let srv = map_config_service(
// fn_factory_with_config(move |next: usize| {
// let item = item2.clone();
// async move {
// item.set(next);
// Ok::<_, ()>(fn_service(|id: usize| Ready::<_, ()>::Ok(id * 2)))
// }
// }),
// fn_service(move |item: usize| Ready::<_, ()>::Ok(item + 1)),
// )
// .clone()
// .create(10)
// .await
// .unwrap();
// assert_eq!(srv.call(10usize).await.unwrap(), 20);
// assert_eq!(item.get(), 11);
// }
}

View file

@ -1,6 +1,6 @@
use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use std::{fmt, marker::PhantomData, task::Context, task::Poll};
use super::{Service, ServiceCall, ServiceCtx, ServiceFactory};
use super::{Service, ServiceCtx, ServiceFactory};
/// Service for the `map_err` combinator, changing the type of a service's
/// error.
@ -61,7 +61,6 @@ where
{
type Response = A::Response;
type Error = E;
type Future<'f> = MapErrFuture<'f, A, R, F, E> where A: 'f, R: 'f, F: 'f, E: 'f;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -69,44 +68,17 @@ where
}
#[inline]
fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
MapErrFuture {
slf: self,
fut: ctx.call(&self.service, req),
}
async fn call(
&self,
req: R,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
ctx.call(&self.service, req).await.map_err(|e| (self.f)(e))
}
crate::forward_poll_shutdown!(service);
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapErrFuture<'f, A, R, F, E>
where
A: Service<R>,
A: 'f,
R: 'f,
F: Fn(A::Error) -> E,
{
slf: &'f MapErr<A, F, E>,
#[pin]
fut: ServiceCall<'f, A, R>,
}
}
impl<'f, A, R, F, E> Future for MapErrFuture<'f, A, R, F, E>
where
A: Service<R> + 'f,
F: Fn(A::Error) -> E,
{
type Output = Result<A::Response, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
this.fut.poll(cx).map_err(|e| (self.project().slf.f)(e))
}
}
/// Factory for the `map_err` combinator, changing the type of a new
/// service's error.
///
@ -173,46 +145,14 @@ where
type Service = MapErr<A::Service, F, E>;
type InitError = A::InitError;
type Future<'f> = MapErrFactoryFuture<'f, A, R, C, F, E> where Self: 'f, C: 'f;
#[inline]
fn create(&self, cfg: C) -> Self::Future<'_> {
MapErrFactoryFuture {
async fn create(&self, cfg: C) -> Result<Self::Service, Self::InitError> {
self.a.create(cfg).await.map(|service| MapErr {
service,
f: self.f.clone(),
fut: self.a.create(cfg),
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapErrFactoryFuture<'f, A, R, C, F, E>
where
A: ServiceFactory<R, C>,
A: 'f,
F: Fn(A::Error) -> E,
C: 'f,
{
f: F,
#[pin]
fut: A::Future<'f>,
}
}
impl<'f, A, R, C, F, E> Future for MapErrFactoryFuture<'f, A, R, C, F, E>
where
A: ServiceFactory<R, C>,
F: Fn(A::Error) -> E + Clone,
{
type Output = Result<MapErr<A::Service, F, E>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(MapErr::new(svc, this.f.clone())))
} else {
Poll::Pending
}
_t: PhantomData,
})
}
}
@ -229,7 +169,6 @@ mod tests {
impl Service<()> for Srv {
type Response = ();
type Error = ();
type Future<'f> = Ready<(), ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.0 {
@ -239,8 +178,8 @@ mod tests {
}
}
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
Ready::Err(())
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
Err(())
}
}

View file

@ -1,4 +1,4 @@
use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use std::{fmt, marker::PhantomData};
use super::ServiceFactory;
@ -60,42 +60,10 @@ where
type Service = A::Service;
type InitError = E;
type Future<'f> = MapInitErrFuture<'f, A, R, C, F, E> where Self: 'f, C: 'f;
#[inline]
fn create(&self, cfg: C) -> Self::Future<'_> {
MapInitErrFuture {
f: self.f.clone(),
fut: self.a.create(cfg),
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapInitErrFuture<'f, A, R, C, F, E>
where
A: ServiceFactory<R, C>,
A: 'f,
F: Fn(A::InitError) -> E,
C: 'f,
{
f: F,
#[pin]
fut: A::Future<'f>,
}
}
impl<'f, A, R, C, F, E> Future for MapInitErrFuture<'f, A, R, C, F, E>
where
A: ServiceFactory<R, C>,
F: Fn(A::InitError) -> E,
{
type Output = Result<A::Service, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
this.fut.poll(cx).map_err(|e| (self.project().f)(e))
async fn create(&self, cfg: C) -> Result<Self::Service, Self::InitError> {
self.a.create(cfg).await.map_err(|e| (self.f)(e))
}
}

View file

@ -1,4 +1,4 @@
use std::{fmt, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll};
use std::{fmt, marker::PhantomData, rc::Rc};
use crate::{IntoServiceFactory, Service, ServiceFactory};
@ -98,18 +98,18 @@ where
}
/// `Apply` middleware to a service factory.
pub struct ApplyMiddleware<T, S, C>(Rc<(T, S)>, marker::PhantomData<C>);
pub struct ApplyMiddleware<T, S, C>(Rc<(T, S)>, PhantomData<C>);
impl<T, S, C> ApplyMiddleware<T, S, C> {
/// Create new `ApplyMiddleware` service factory instance
pub(crate) fn new(mw: T, svc: S) -> Self {
Self(Rc::new((mw, svc)), marker::PhantomData)
Self(Rc::new((mw, svc)), PhantomData)
}
}
impl<T, S, C> Clone for ApplyMiddleware<T, S, C> {
fn clone(&self) -> Self {
Self(self.0.clone(), marker::PhantomData)
Self(self.0.clone(), PhantomData)
}
}
@ -137,46 +137,10 @@ where
type Service = T::Service;
type InitError = S::InitError;
type Future<'f> = ApplyMiddlewareFuture<'f, T, S, R, C> where Self: 'f, C: 'f;
#[inline]
fn create(&self, cfg: C) -> Self::Future<'_> {
ApplyMiddlewareFuture {
slf: self.0.clone(),
fut: self.0 .1.create(cfg),
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ApplyMiddlewareFuture<'f, T, S, R, C>
where
S: ServiceFactory<R, C>,
S: 'f,
T: Middleware<S::Service>,
C: 'f,
{
slf: Rc<(T, S)>,
#[pin]
fut: S::Future<'f>,
}
}
impl<'f, T, S, R, C> Future for ApplyMiddlewareFuture<'f, T, S, R, C>
where
S: ServiceFactory<R, C>,
T: Middleware<S::Service>,
{
type Output = Result<T::Service, S::InitError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
match this.fut.poll(cx)? {
Poll::Ready(srv) => Poll::Ready(Ok(this.slf.0.create(srv))),
Poll::Pending => Poll::Pending,
}
async fn create(&self, cfg: C) -> Result<Self::Service, Self::InitError> {
Ok(self.0 .0.create(self.0 .1.create(cfg).await?))
}
}
@ -224,43 +188,46 @@ where
#[allow(clippy::redundant_clone)]
mod tests {
use ntex_util::future::{lazy, Ready};
use std::marker;
use std::task::{Context, Poll};
use super::*;
use crate::{fn_service, Pipeline, Service, ServiceCall, ServiceCtx, ServiceFactory};
use crate::{fn_service, Pipeline, Service, ServiceCtx, ServiceFactory};
#[derive(Debug, Clone)]
struct Tr<R>(marker::PhantomData<R>);
struct Tr<R>(PhantomData<R>);
impl<S, R> Middleware<S> for Tr<R> {
type Service = Srv<S, R>;
fn create(&self, service: S) -> Self::Service {
Srv(service, marker::PhantomData)
Srv(service, PhantomData)
}
}
#[derive(Debug, Clone)]
struct Srv<S, R>(S, marker::PhantomData<R>);
struct Srv<S, R>(S, PhantomData<R>);
impl<S: Service<R>, R> Service<R> for Srv<S, R> {
type Response = S::Response;
type Error = S::Error;
type Future<'f> = ServiceCall<'f, S, R> where Self: 'f, R: 'f;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
ctx.call(&self.0, req)
async fn call<'a>(
&'a self,
req: R,
ctx: ServiceCtx<'a, Self>,
) -> Result<S::Response, S::Error> {
ctx.call(&self.0, req).await
}
}
#[ntex::test]
async fn middleware() {
let factory = apply(
Rc::new(Tr(marker::PhantomData).clone()),
Rc::new(Tr(PhantomData).clone()),
fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2)),
)
.clone();
@ -279,7 +246,7 @@ mod tests {
let factory =
crate::chain_factory(fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2)))
.apply(Rc::new(Tr(marker::PhantomData).clone()))
.apply(Rc::new(Tr(PhantomData).clone()))
.clone();
let srv = Pipeline::new(factory.create(&()).await.unwrap().clone());

View file

@ -1,6 +1,7 @@
use std::{cell::Cell, future, pin::Pin, rc::Rc, task, task::Context, task::Poll};
use std::future::{poll_fn, Future};
use std::{cell::Cell, pin::Pin, rc::Rc, task, task::Context, task::Poll};
use crate::{ctx::ServiceCall, ctx::Waiters, Service, ServiceCtx, ServiceFactory};
use crate::{ctx::Waiters, Service, ServiceCtx};
#[derive(Debug)]
/// Container for a service.
@ -29,6 +30,15 @@ impl<S> Pipeline<S> {
self.svc.as_ref()
}
#[inline]
/// Returns when the service is able to process requests.
pub async fn ready<R>(&self) -> Result<(), S::Error>
where
S: Service<R>,
{
poll_fn(move |cx| self.poll_ready(cx)).await
}
#[inline]
/// Returns `Ready` when the service is able to process requests.
pub fn poll_ready<R>(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>
@ -55,26 +65,21 @@ impl<S> Pipeline<S> {
self.svc.poll_shutdown(cx)
}
#[deprecated(since = "1.2.3", note = "Use Pipeline::call() instead")]
#[doc(hidden)]
#[inline]
/// Wait for service readiness and then create future object
/// that resolves to service result.
pub fn service_call<R>(&self, req: R) -> ServiceCall<'_, S, R>
pub async fn call<R>(&self, req: R) -> Result<S::Response, S::Error>
where
S: Service<R>,
{
ServiceCall::call_pipeline(req, self)
}
// check service readiness
self.ready().await?;
#[inline]
/// Wait for service readiness and then create future object
/// that resolves to service result.
pub fn call<R>(&self, req: R) -> ServiceCall<'_, S, R>
where
S: Service<R>,
{
ServiceCall::call_pipeline(req, self)
// call service
self.svc
.as_ref()
.call(req, ServiceCtx::new(&self.waiters))
.await
}
#[inline]
@ -130,6 +135,8 @@ impl<S> Clone for Pipeline<S> {
}
}
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct PipelineCall<S, R>
@ -153,7 +160,7 @@ pin_project_lite::pin_project! {
Req: 'static,
{
Ready { req: Option<Req> },
Call { #[pin] fut: S::Future<'static> },
Call { #[pin] fut: BoxFuture<'static, Result<S::Response, S::Error>> },
Empty,
}
}
@ -163,9 +170,10 @@ where
S: Service<R> + 'static,
R: 'static,
{
fn new_call(pl: &Pipeline<S>, req: R) -> Self {
fn new_call<'a>(pl: &'a Pipeline<S>, req: R) -> Self {
let ctx = ServiceCtx::new(&pl.waiters);
let svc_call = pl.get_ref().call(req, ctx);
let svc_call: BoxFuture<'a, Result<S::Response, S::Error>> =
Box::pin(pl.get_ref().call(req, ctx));
// SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until
@ -176,7 +184,7 @@ where
}
}
impl<S, R> future::Future for PipelineCall<S, R>
impl<S, R> Future for PipelineCall<S, R>
where
S: Service<R>,
{
@ -204,39 +212,3 @@ where
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct CreatePipeline<'f, F, R, C>
where F: ServiceFactory<R, C>,
F: ?Sized,
F: 'f,
C: 'f,
{
#[pin]
fut: F::Future<'f>,
}
}
impl<'f, F, R, C> CreatePipeline<'f, F, R, C>
where
F: ServiceFactory<R, C> + 'f,
{
pub(crate) fn new(fut: F::Future<'f>) -> Self {
Self { fut }
}
}
impl<'f, F, R, C> future::Future for CreatePipeline<'f, F, R, C>
where
F: ServiceFactory<R, C> + 'f,
{
type Output = Result<Pipeline<F::Service>, F::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(Pipeline::new(std::task::ready!(self
.project()
.fut
.poll(cx))?)))
}
}

View file

@ -1,6 +1,6 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use std::{task::Context, task::Poll};
use super::{Service, ServiceCall, ServiceCtx, ServiceFactory};
use super::{Service, ServiceCtx, ServiceFactory};
#[derive(Debug, Clone)]
/// Service for the `then` combinator, chaining a computation onto the end of
@ -26,7 +26,6 @@ where
{
type Response = B::Response;
type Error = B::Error;
type Future<'f> = ThenServiceResponse<'f, A, B, R> where Self: 'f, R: 'f;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -48,74 +47,12 @@ where
}
#[inline]
fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
ThenServiceResponse {
slf: self,
state: State::A {
fut: ctx.call(&self.svc1, req),
ctx,
},
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ThenServiceResponse<'f, A, B, R>
where
A: Service<R>,
B: Service<Result<A::Response, A::Error>>,
{
slf: &'f Then<A, B>,
#[pin]
state: State<'f, A, B, R>,
}
}
pin_project_lite::pin_project! {
#[project = StateProject]
enum State<'f, A, B, R>
where
A: Service<R>,
A: 'f,
A::Response: 'f,
B: Service<Result<A::Response, A::Error>>,
B: 'f,
R: 'f,
{
A { #[pin] fut: ServiceCall<'f, A, R>, ctx: ServiceCtx<'f, Then<A, B>> },
B { #[pin] fut: ServiceCall<'f, B, Result<A::Response, A::Error>> },
Empty,
}
}
impl<'a, A, B, R> Future for ThenServiceResponse<'a, A, B, R>
where
A: Service<R>,
B: Service<Result<A::Response, A::Error>>,
{
type Output = Result<B::Response, B::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateProject::A { fut, ctx } => match fut.poll(cx) {
Poll::Ready(res) => {
let fut = ctx.call(&this.slf.svc2, res);
this.state.set(State::B { fut });
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateProject::B { fut } => fut.poll(cx).map(|r| {
this.state.set(State::Empty);
r
}),
StateProject::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
async fn call(
&self,
req: R,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
ctx.call(&self.svc2, ctx.call(&self.svc1, req).await).await
}
}
@ -149,73 +86,12 @@ where
type Service = Then<A::Service, B::Service>;
type InitError = A::InitError;
type Future<'f> = ThenFactoryResponse<'f, A, B, R, C> where Self: 'f, C: 'f;
fn create(&self, cfg: C) -> Self::Future<'_> {
ThenFactoryResponse {
fut_a: self.svc1.create(cfg.clone()),
fut_b: self.svc2.create(cfg),
a: None,
b: None,
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ThenFactoryResponse<'f, A, B, R, C>
where
A: ServiceFactory<R, C>,
B: ServiceFactory<Result<A::Response, A::Error>, C,
Error = A::Error,
InitError = A::InitError,
>,
A: 'f,
B: 'f,
C: 'f,
{
#[pin]
fut_b: B::Future<'f>,
#[pin]
fut_a: A::Future<'f>,
a: Option<A::Service>,
b: Option<B::Service>,
}
}
impl<'f, A, B, R, C> Future for ThenFactoryResponse<'f, A, B, R, C>
where
A: ServiceFactory<R, C>,
B: ServiceFactory<
Result<A::Response, A::Error>,
C,
Error = A::Error,
InitError = A::InitError,
>,
{
type Output = Result<Then<A::Service, B::Service>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.a.is_none() {
if let Poll::Ready(service) = this.fut_a.poll(cx)? {
*this.a = Some(service);
}
}
if this.b.is_none() {
if let Poll::Ready(service) = this.fut_b.poll(cx)? {
*this.b = Some(service);
}
}
if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(Then::new(
this.a.take().unwrap(),
this.b.take().unwrap(),
)))
} else {
Poll::Pending
}
async fn create(&self, cfg: C) -> Result<Self::Service, Self::InitError> {
Ok(Then {
svc1: self.svc1.create(cfg.clone()).await?,
svc2: self.svc2.create(cfg).await?,
})
}
}
@ -232,21 +108,20 @@ mod tests {
impl Service<Result<&'static str, &'static str>> for Srv1 {
type Response = &'static str;
type Error = ();
type Future<'f> = Ready<Self::Response, Self::Error>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call<'a>(
async fn call<'a>(
&'a self,
req: Result<&'static str, &'static str>,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
) -> Result<&'static str, ()> {
match req {
Ok(msg) => Ready::Ok(msg),
Err(_) => Ready::Err(()),
Ok(msg) => Ok(msg),
Err(_) => Err(()),
}
}
}
@ -257,21 +132,20 @@ mod tests {
impl Service<Result<&'static str, ()>> for Srv2 {
type Response = (&'static str, &'static str);
type Error = ();
type Future<'f> = Ready<Self::Response, ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call<'a>(
async fn call<'a>(
&'a self,
req: Result<&'static str, ()>,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
) -> Result<Self::Response, ()> {
match req {
Ok(msg) => Ready::Ok((msg, "ok")),
Err(()) => Ready::Ok(("srv2", "err")),
Ok(msg) => Ok((msg, "ok")),
Err(()) => Ok(("srv2", "err")),
}
}
}