Service container (#208)

* Enforce service readiness during call
* Introduce service sharable readiness
This commit is contained in:
Nikolay Kim 2023-06-14 19:09:47 +06:00
parent a35535da32
commit 0e1f0b5f73
17 changed files with 674 additions and 155 deletions

View file

@ -26,7 +26,7 @@ ntex-io = { path = "ntex-io" }
ntex-http = { path = "ntex-http" } ntex-http = { path = "ntex-http" }
ntex-router = { path = "ntex-router" } ntex-router = { path = "ntex-router" }
ntex-rt = { path = "ntex-rt" } ntex-rt = { path = "ntex-rt" }
ntex-service = { path = "ntex-service" } #ntex-service = { path = "ntex-service" }
ntex-tls = { path = "ntex-tls" } ntex-tls = { path = "ntex-tls" }
ntex-macros = { path = "ntex-macros" } ntex-macros = { path = "ntex-macros" }
ntex-util = { path = "ntex-util" } ntex-util = { path = "ntex-util" }

View file

@ -1,5 +1,11 @@
# Changes # Changes
## [1.2.0] - 2023-06-xx
* Enforce service readiness during call
* Introduce service sharable readiness
## [1.0.2] - 2023-04-14 ## [1.0.2] - 2023-04-14
* Remove Rc<S> where S: Service as it brakes readiness check validity * Remove Rc<S> where S: Service as it brakes readiness check validity

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-service" name = "ntex-service"
version = "1.0.2" version = "1.2.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service" description = "ntex service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -17,6 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
pin-project-lite = "0.2.6" pin-project-lite = "0.2.6"
slab = "0.4"
[dev-dependencies] [dev-dependencies]
ntex = { version = "0.6.0", features = ["tokio"] } ntex = { version = "0.6.0", features = ["tokio"] }

View file

@ -1,6 +1,6 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll}; use std::{future::Future, pin::Pin, task::Context, task::Poll};
use super::{Service, ServiceFactory}; use super::{Ctx, Service, ServiceCall, ServiceFactory};
/// Service for the `and_then` combinator, chaining a computation onto the end /// Service for the `and_then` combinator, chaining a computation onto the end
/// of another service which completes successfully. /// of another service which completes successfully.
@ -59,17 +59,22 @@ where
} }
#[inline] #[inline]
fn call(&self, req: Req) -> Self::Future<'_> { fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>
where
Req: 'a,
{
AndThenServiceResponse { AndThenServiceResponse {
slf: self, slf: self,
state: State::A { state: State::A {
fut: self.svc1.call(req), fut: ctx.clone().call(&self.svc1, req),
ctx: Some(ctx),
}, },
} }
} }
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct AndThenServiceResponse<'f, A, B, Req> pub struct AndThenServiceResponse<'f, A, B, Req>
where where
A: Service<Req>, A: Service<Req>,
@ -91,8 +96,8 @@ pin_project_lite::pin_project! {
B: Service<A::Response, Error = A::Error>, B: Service<A::Response, Error = A::Error>,
B: 'f, B: 'f,
{ {
A { #[pin] fut: A::Future<'f> }, A { #[pin] fut: ServiceCall<'f, A, Req>, ctx: Option<Ctx<'f, AndThen<A, B>>> },
B { #[pin] fut: B::Future<'f> }, B { #[pin] fut: ServiceCall<'f, B, A::Response> },
Empty, Empty,
} }
} }
@ -108,9 +113,9 @@ where
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
match this.state.as_mut().project() { match this.state.as_mut().project() {
StateProject::A { fut } => match fut.poll(cx)? { StateProject::A { fut, ctx } => match fut.poll(cx)? {
Poll::Ready(res) => { Poll::Ready(res) => {
let fut = this.slf.svc2.call(res); let fut = ctx.take().unwrap().call(&this.slf.svc2, res);
this.state.set(State::B { fut }); this.state.set(State::B { fut });
self.poll(cx) self.poll(cx)
} }
@ -178,6 +183,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct AndThenFactoryResponse<'f, A, B, Req, Cfg> pub struct AndThenFactoryResponse<'f, A, B, Req, Cfg>
where where
A: ServiceFactory<Req, Cfg>, A: ServiceFactory<Req, Cfg>,
@ -231,7 +237,7 @@ where
mod tests { mod tests {
use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use crate::{fn_factory, pipeline, pipeline_factory, Service, ServiceFactory}; use crate::{fn_factory, pipeline, pipeline_factory, Ctx, Service, ServiceFactory};
use ntex_util::future::{lazy, Ready}; use ntex_util::future::{lazy, Ready};
#[derive(Clone)] #[derive(Clone)]
@ -247,7 +253,10 @@ mod tests {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn call(&self, req: &'static str) -> Self::Future<'_> { fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a>
where
&'static str: 'a,
{
Ready::Ok(req) Ready::Ok(req)
} }
} }
@ -265,7 +274,10 @@ mod tests {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn call(&self, req: &'static str) -> Self::Future<'_> { fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a>
where
&'static str: 'a,
{
Ready::Ok((req, "srv2")) Ready::Ok((req, "srv2"))
} }
} }
@ -286,7 +298,7 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_call() { async fn test_call() {
let cnt = Rc::new(Cell::new(0)); let cnt = Rc::new(Cell::new(0));
let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt)); let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt)).container();
let res = srv.call("srv1").await; let res = srv.call("srv1").await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "srv2")); assert_eq!(res.unwrap(), ("srv1", "srv2"));
@ -302,7 +314,7 @@ mod tests {
.and_then(move || Ready::from(Ok(Srv2(cnt.clone())))) .and_then(move || Ready::from(Ok(Srv2(cnt.clone()))))
.clone(); .clone();
let srv = new_srv.create(&()).await.unwrap(); let srv = new_srv.container(&()).await.unwrap();
let res = srv.call("srv1").await; let res = srv.call("srv1").await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "srv2")); assert_eq!(res.unwrap(), ("srv1", "srv2"));

View file

@ -1,8 +1,7 @@
use std::{ #![allow(clippy::type_complexity)]
future::Future, marker::PhantomData, pin::Pin, rc::Rc, task::Context, task::Poll, use std::{cell::RefCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll};
};
use super::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use super::{Ctx, IntoService, IntoServiceFactory, Service, ServiceCall, ServiceFactory};
/// Apply transform function to a service. /// Apply transform function to a service.
pub fn apply_fn<T, Req, F, R, In, Out, Err, U>( pub fn apply_fn<T, Req, F, R, In, Out, Err, U>(
@ -11,7 +10,7 @@ pub fn apply_fn<T, Req, F, R, In, Out, Err, U>(
) -> Apply<T, Req, F, R, In, Out, Err> ) -> Apply<T, Req, F, R, In, Out, Err>
where where
T: Service<Req, Error = Err>, T: Service<Req, Error = Err>,
F: Fn(In, Rc<T>) -> R, F: Fn(In, ApplyService<T>) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
U: IntoService<T, Req>, U: IntoService<T, Req>,
{ {
@ -25,7 +24,7 @@ pub fn apply_fn_factory<T, Req, Cfg, F, R, In, Out, Err, U>(
) -> ApplyFactory<T, Req, Cfg, F, R, In, Out, Err> ) -> ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where where
T: ServiceFactory<Req, Cfg, Error = Err>, T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Rc<T::Service>) -> R + Clone, F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
U: IntoServiceFactory<T, Req, Cfg>, U: IntoServiceFactory<T, Req, Cfg>,
{ {
@ -39,13 +38,13 @@ where
{ {
service: Rc<T>, service: Rc<T>,
f: F, f: F,
r: PhantomData<fn(Req) -> (In, Out, R)>, r: marker::PhantomData<fn(Req) -> (In, Out, R)>,
} }
impl<T, Req, F, R, In, Out, Err> Apply<T, Req, F, R, In, Out, Err> impl<T, Req, F, R, In, Out, Err> Apply<T, Req, F, R, In, Out, Err>
where where
T: Service<Req, Error = Err>, T: Service<Req, Error = Err>,
F: Fn(In, Rc<T>) -> R, F: Fn(In, ApplyService<T>) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
/// Create new `Apply` combinator /// Create new `Apply` combinator
@ -53,7 +52,7 @@ where
Self { Self {
f, f,
service: Rc::new(service), service: Rc::new(service),
r: PhantomData, r: marker::PhantomData,
} }
} }
} }
@ -61,22 +60,37 @@ where
impl<T, Req, F, R, In, Out, Err> Clone for Apply<T, Req, F, R, In, Out, Err> impl<T, Req, F, R, In, Out, Err> Clone for Apply<T, Req, F, R, In, Out, Err>
where where
T: Service<Req, Error = Err> + Clone, T: Service<Req, Error = Err> + Clone,
F: Fn(In, Rc<T>) -> R + Clone, F: Fn(In, ApplyService<T>) -> R + Clone,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Apply { Apply {
service: self.service.clone(), service: self.service.clone(),
f: self.f.clone(), f: self.f.clone(),
r: PhantomData, r: marker::PhantomData,
} }
} }
} }
pub struct ApplyService<S> {
svc: Rc<S>,
index: usize,
waiters: Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
}
impl<S> ApplyService<S> {
pub fn call<R>(&self, req: R) -> ServiceCall<'_, S, R>
where
S: Service<R>,
{
Ctx::<S>::new(self.index, &self.waiters).call(&self.svc, req)
}
}
impl<T, Req, F, R, In, Out, Err> Service<In> for Apply<T, Req, F, R, In, Out, Err> impl<T, Req, F, R, In, Out, Err> Service<In> for Apply<T, Req, F, R, In, Out, Err>
where where
T: Service<Req, Error = Err>, T: Service<Req, Error = Err>,
F: Fn(In, Rc<T>) -> R, F: Fn(In, ApplyService<T>) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
type Response = Out; type Response = Out;
@ -87,8 +101,17 @@ where
crate::forward_poll_shutdown!(service); crate::forward_poll_shutdown!(service);
#[inline] #[inline]
fn call(&self, req: In) -> Self::Future<'_> { fn call<'a>(&'a self, req: In, ctx: Ctx<'a, Self>) -> Self::Future<'a>
(self.f)(req, self.service.clone()) where
In: 'a,
{
let (index, waiters) = ctx.into_inner();
let svc = ApplyService {
index,
waiters: waiters.clone(),
svc: self.service.clone(),
};
(self.f)(req, svc)
} }
} }
@ -96,18 +119,18 @@ where
pub struct ApplyFactory<T, Req, Cfg, F, R, In, Out, Err> pub struct ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where where
T: ServiceFactory<Req, Cfg, Error = Err>, T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Rc<T::Service>) -> R + Clone, F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
service: T, service: T,
f: F, f: F,
r: PhantomData<fn(Req, Cfg) -> (R, In, Out)>, r: marker::PhantomData<fn(Req, Cfg) -> (R, In, Out)>,
} }
impl<T, Req, Cfg, F, R, In, Out, Err> ApplyFactory<T, Req, Cfg, F, R, In, Out, Err> impl<T, Req, Cfg, F, R, In, Out, Err> ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where where
T: ServiceFactory<Req, Cfg, Error = Err>, T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Rc<T::Service>) -> R + Clone, F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
/// Create new `ApplyNewService` new service instance /// Create new `ApplyNewService` new service instance
@ -115,7 +138,7 @@ where
Self { Self {
f, f,
service, service,
r: PhantomData, r: marker::PhantomData,
} }
} }
} }
@ -124,14 +147,14 @@ impl<T, Req, Cfg, F, R, In, Out, Err> Clone
for ApplyFactory<T, Req, Cfg, F, R, In, Out, Err> for ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where where
T: ServiceFactory<Req, Cfg, Error = Err> + Clone, T: ServiceFactory<Req, Cfg, Error = Err> + Clone,
F: Fn(In, Rc<T::Service>) -> R + Clone, F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
service: self.service.clone(), service: self.service.clone(),
f: self.f.clone(), f: self.f.clone(),
r: PhantomData, r: marker::PhantomData,
} }
} }
} }
@ -140,7 +163,7 @@ impl<T, Req, Cfg, F, R, In, Out, Err> ServiceFactory<In, Cfg>
for ApplyFactory<T, Req, Cfg, F, R, In, Out, Err> for ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where where
T: ServiceFactory<Req, Cfg, Error = Err>, T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Rc<T::Service>) -> R + Clone, F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
type Response = Out; type Response = Out;
@ -155,7 +178,7 @@ where
ApplyFactoryResponse { ApplyFactoryResponse {
fut: self.service.create(cfg), fut: self.service.create(cfg),
f: Some(self.f.clone()), f: Some(self.f.clone()),
_t: PhantomData, _t: marker::PhantomData,
} }
} }
} }
@ -165,14 +188,14 @@ pin_project_lite::pin_project! {
where where
T: ServiceFactory<Req, Cfg, Error = Err>, T: ServiceFactory<Req, Cfg, Error = Err>,
T: 'f, T: 'f,
F: Fn(In, Rc<T::Service>) -> R, F: Fn(In, ApplyService<T::Service>) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
Cfg: 'f, Cfg: 'f,
{ {
#[pin] #[pin]
fut: T::Future<'f>, fut: T::Future<'f>,
f: Option<F>, f: Option<F>,
_t: PhantomData<(In, Out)>, _t: marker::PhantomData<(In, Out)>,
} }
} }
@ -180,12 +203,12 @@ impl<'f, T, Req, Cfg, F, R, In, Out, Err> Future
for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err> for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err>
where where
T: ServiceFactory<Req, Cfg, Error = Err>, T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Rc<T::Service>) -> R, F: Fn(In, ApplyService<T::Service>) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
type Output = Result<Apply<T::Service, Req, F, R, In, Out, Err>, T::InitError>; type Output = Result<Apply<T::Service, Req, F, R, In, Out, Err>, T::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? { if let Poll::Ready(svc) = this.fut.poll(cx)? {
@ -202,7 +225,7 @@ mod tests {
use std::task::Poll; use std::task::Poll;
use super::*; use super::*;
use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; use crate::{pipeline, pipeline_factory, Ctx, Service, ServiceFactory};
#[derive(Clone)] #[derive(Clone)]
struct Srv; struct Srv;
@ -212,7 +235,10 @@ mod tests {
type Error = (); type Error = ();
type Future<'f> = Ready<(), ()>; type Future<'f> = Ready<(), ()>;
fn call(&self, _: ()) -> Self::Future<'_> { fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a>
where
(): 'a,
{
Ready::Ok(()) Ready::Ok(())
} }
} }
@ -220,15 +246,13 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_call() { async fn test_call() {
let srv = pipeline( let srv = pipeline(
apply_fn(Srv, |req: &'static str, srv| { apply_fn(Srv, |req: &'static str, srv| async move {
let fut = srv.call(()); srv.call(()).await.unwrap();
async move {
fut.await.unwrap();
Ok((req, ())) Ok((req, ()))
}
}) })
.clone(), .clone(),
); )
.container();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = lazy(|cx| srv.poll_shutdown(cx)).await; let res = lazy(|cx| srv.poll_shutdown(cx)).await;
@ -244,18 +268,15 @@ mod tests {
let new_srv = pipeline_factory( let new_srv = pipeline_factory(
apply_fn_factory( apply_fn_factory(
|| Ready::<_, ()>::Ok(Srv), || Ready::<_, ()>::Ok(Srv),
|req: &'static str, srv| { |req: &'static str, srv| async move {
let fut = srv.call(()); srv.call(()).await.unwrap();
async move {
fut.await.unwrap();
Ok((req, ())) Ok((req, ()))
}
}, },
) )
.clone(), .clone(),
); );
let srv = new_srv.create(&()).await.unwrap(); let srv = new_srv.container(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

View file

@ -1,4 +1,7 @@
use std::{future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; use std::task::{Context, Poll, Waker};
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc};
use crate::Ctx;
pub type BoxFuture<'a, I, E> = Pin<Box<dyn Future<Output = Result<I, E>> + 'a>>; pub type BoxFuture<'a, I, E> = Pin<Box<dyn Future<Output = Result<I, E>> + 'a>>;
@ -54,7 +57,12 @@ pub trait ServiceObj<Req> {
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()>; fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()>;
fn call<'a>(&'a self, req: Req) -> BoxFuture<'a, Self::Response, Self::Error> fn call<'a>(
&'a self,
req: Req,
idx: usize,
waiters: &'a Rc<RefCell<slab::Slab<Option<Waker>>>>,
) -> BoxFuture<'a, Self::Response, Self::Error>
where where
Req: 'a; Req: 'a;
} }
@ -78,11 +86,16 @@ where
} }
#[inline] #[inline]
fn call<'a>(&'a self, req: Req) -> BoxFuture<'a, Self::Response, Self::Error> fn call<'a>(
&'a self,
req: Req,
idx: usize,
waiters: &'a Rc<RefCell<slab::Slab<Option<Waker>>>>,
) -> BoxFuture<'a, Self::Response, Self::Error>
where where
Req: 'a, Req: 'a,
{ {
Box::pin(crate::Service::call(self, req)) Box::pin(Ctx::<'a, S>::new(idx, waiters).call_nowait(self, req))
} }
} }
@ -181,8 +194,12 @@ where
} }
#[inline] #[inline]
fn call(&self, req: Req) -> Self::Future<'_> { fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>
self.0.call(req) where
Req: 'a,
{
let (index, waiters) = ctx.into_inner();
self.0.call(req, index, waiters)
} }
} }
@ -213,8 +230,12 @@ where
} }
#[inline] #[inline]
fn call(&self, req: Req) -> Self::Future<'_> { fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>
self.0.call(req) where
Req: 'a,
{
let (index, waiters) = ctx.into_inner();
self.0.call(req, index, waiters)
} }
} }

370
ntex-service/src/ctx.rs Normal file
View file

@ -0,0 +1,370 @@
use std::{cell::RefCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll};
use crate::{Service, ServiceFactory};
pub struct Container<S, R> {
svc: Rc<S>,
index: usize,
waiters: Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
_t: marker::PhantomData<R>,
}
impl<S, R> Container<S, R>
where
S: Service<R>,
{
#[inline]
pub fn new(svc: S) -> Self {
let mut waiters = slab::Slab::new();
let index = waiters.insert(None);
Container {
index,
svc: Rc::new(svc),
waiters: Rc::new(RefCell::new(waiters)),
_t: marker::PhantomData,
}
}
#[inline]
/// Returns `Ready` when the service is able to process requests.
pub fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), S::Error>> {
let res = self.svc.poll_ready(cx);
if res.is_pending() {
self.waiters.borrow_mut()[self.index] = Some(cx.waker().clone());
}
res
}
#[inline]
/// Shutdown enclosed service.
pub fn poll_shutdown(&self, cx: &mut task::Context<'_>) -> Poll<()> {
self.svc.poll_shutdown(cx)
}
#[inline]
/// Process the request and return the response asynchronously.
pub fn call<'a>(&'a self, req: R) -> ServiceCall<'a, S, R> {
let ctx = Ctx::<'a, S> {
index: self.index,
waiters: &self.waiters,
_t: marker::PhantomData,
};
ctx.call(self.svc.as_ref(), req)
}
pub(crate) fn create<F: ServiceFactory<R, C>, C>(
f: &F,
cfg: C,
) -> ContainerFactory<'_, F, R, C> {
ContainerFactory {
fut: f.create(cfg),
_t: marker::PhantomData,
}
}
}
impl<S, R> Clone for Container<S, R> {
fn clone(&self) -> Self {
let index = self.waiters.borrow_mut().insert(None);
Self {
index,
svc: self.svc.clone(),
waiters: self.waiters.clone(),
_t: marker::PhantomData,
}
}
}
impl<S, R> From<S> for Container<S, R>
where
S: Service<R>,
{
fn from(svc: S) -> Self {
Container::new(svc)
}
}
impl<S, R> Drop for Container<S, R> {
fn drop(&mut self) {
let mut waiters = self.waiters.borrow_mut();
waiters.remove(self.index);
for (_, waker) in &mut *waiters {
if let Some(waker) = waker.take() {
waker.wake();
}
}
}
}
pub struct Ctx<'b, S: ?Sized> {
index: usize,
waiters: &'b Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
_t: marker::PhantomData<Rc<S>>,
}
impl<'b, S: ?Sized> Ctx<'b, S> {
pub(crate) fn new(
index: usize,
waiters: &'b Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
) -> Self {
Self {
index,
waiters,
_t: marker::PhantomData,
}
}
pub(crate) fn into_inner(
self,
) -> (usize, &'b Rc<RefCell<slab::Slab<Option<task::Waker>>>>) {
(self.index, self.waiters)
}
/// Call service, do not check service readiness
pub(crate) fn call_nowait<T, R>(&self, svc: &'b T, req: R) -> T::Future<'b>
where
T: Service<R> + ?Sized,
R: 'b,
{
svc.call(
req,
Ctx {
index: self.index,
waiters: self.waiters,
_t: marker::PhantomData,
},
)
}
#[inline]
/// Wait for service readiness and then call service
pub fn call<T, R>(&self, svc: &'b T, req: R) -> ServiceCall<'b, T, R>
where
T: Service<R> + ?Sized,
R: 'b,
{
ServiceCall {
state: ServiceCallState::Ready {
svc,
req: Some(req),
index: self.index,
waiters: self.waiters,
},
}
}
}
impl<'b, S: ?Sized> Clone for Ctx<'b, S> {
fn clone(&self) -> Self {
Self {
index: self.index,
waiters: self.waiters,
_t: marker::PhantomData,
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ServiceCall<'a, T, Req>
where
T: Service<Req>,
T: 'a,
T: ?Sized,
Req: 'a,
{
#[pin]
state: ServiceCallState<'a, T, Req>,
}
}
pin_project_lite::pin_project! {
#[project = ServiceCallStateProject]
enum ServiceCallState<'a, T, Req>
where
T: Service<Req>,
T: 'a,
T: ?Sized,
Req: 'a,
{
Ready { req: Option<Req>,
svc: &'a T,
index: usize,
waiters: &'a Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
},
Call { #[pin] fut: T::Future<'a> },
Empty,
}
}
impl<'a, T, Req> Future for ServiceCall<'a, T, Req>
where
T: Service<Req> + ?Sized,
{
type Output = Result<T::Response, T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
ServiceCallStateProject::Ready {
req,
svc,
index,
waiters,
} => match svc.poll_ready(cx)? {
Poll::Ready(()) => {
for (_, waker) in &mut *waiters.borrow_mut() {
if let Some(waker) = waker.take() {
waker.wake();
}
}
let fut = svc.call(
req.take().unwrap(),
Ctx {
waiters,
index: *index,
_t: marker::PhantomData,
},
);
this.state.set(ServiceCallState::Call { fut });
self.poll(cx)
}
Poll::Pending => {
waiters.borrow_mut()[*index] = Some(cx.waker().clone());
Poll::Pending
}
},
ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| {
this.state.set(ServiceCallState::Empty);
r
}),
ServiceCallStateProject::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ContainerFactory<'f, F, R, C>
where F: ServiceFactory<R, C>,
F: ?Sized,
F: 'f,
C: 'f,
{
#[pin]
fut: F::Future<'f>,
_t: marker::PhantomData<(R, C)>,
}
}
impl<'f, F, R, C> Future for ContainerFactory<'f, F, R, C>
where
F: ServiceFactory<R, C> + 'f,
{
type Output = Result<Container<F::Service, R>, F::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(Container::new(task::ready!(self
.project()
.fut
.poll(cx))?)))
}
}
#[cfg(test)]
mod tests {
use ntex_util::{channel::condition, future::lazy, future::Ready, time};
use std::{cell::Cell, cell::RefCell, rc::Rc, task::Context, task::Poll};
use super::*;
struct Srv(Rc<Cell<usize>>, condition::Waiter);
impl Service<&'static str> for Srv {
type Response = &'static str;
type Error = ();
type Future<'f> = Ready<Self::Response, ()>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
self.1.poll_ready(cx).map(|_| Ok(()))
}
fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a>
where
&'static str: 'a,
{
Ready::Ok(req)
}
}
#[ntex::test]
async fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv1 = Container::from(Srv(cnt.clone(), con.wait()));
let srv2 = srv1.clone();
let res = lazy(|cx| srv1.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
assert_eq!(cnt.get(), 1);
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
assert_eq!(cnt.get(), 2);
con.notify();
let res = lazy(|cx| srv1.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
assert_eq!(cnt.get(), 3);
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
assert_eq!(cnt.get(), 4);
}
#[ntex::test]
async fn test_shared_call() {
let data = Rc::new(RefCell::new(Vec::new()));
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv1 = Container::from(Srv(cnt.clone(), con.wait()));
let srv2 = srv1.clone();
let data1 = data.clone();
ntex::rt::spawn(async move {
let i = srv1.call("srv1").await.unwrap();
data1.borrow_mut().push(i);
});
let data2 = data.clone();
ntex::rt::spawn(async move {
let i = srv2.call("srv2").await.unwrap();
data2.borrow_mut().push(i);
});
time::sleep(time::Millis(50)).await;
con.notify();
time::sleep(time::Millis(150)).await;
assert_eq!(cnt.get(), 4);
assert_eq!(&*data.borrow(), &["srv2"]);
con.notify();
time::sleep(time::Millis(150)).await;
assert_eq!(cnt.get(), 5);
assert_eq!(&*data.borrow(), &["srv2", "srv1"]);
}
}

View file

@ -1,6 +1,6 @@
use std::{future::ready, future::Future, future::Ready, marker::PhantomData}; use std::{future::ready, future::Future, future::Ready, marker::PhantomData};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use crate::{Ctx, IntoService, IntoServiceFactory, Service, ServiceFactory};
#[inline] #[inline]
/// Create `ServiceFactory` for function that can act as a `Service` /// Create `ServiceFactory` for function that can act as a `Service`
@ -40,7 +40,7 @@ where
/// }); /// });
/// ///
/// // construct new service /// // construct new service
/// let srv = factory.create(&()).await?; /// let srv = factory.container(&()).await?;
/// ///
/// // now we can use `div` service /// // now we can use `div` service
/// let result = srv.call((10, 20)).await?; /// let result = srv.call((10, 20)).await?;
@ -81,7 +81,7 @@ where
/// }); /// });
/// ///
/// // construct new service with config argument /// // construct new service with config argument
/// let srv = factory.create(&10).await?; /// let srv = factory.container(&10).await?;
/// ///
/// let result = srv.call(10).await?; /// let result = srv.call(10).await?;
/// assert_eq!(result, 100); /// assert_eq!(result, 100);
@ -128,7 +128,10 @@ where
type Future<'f> = Fut where Self: 'f, Req: 'f; type Future<'f> = Fut where Self: 'f, Req: 'f;
#[inline] #[inline]
fn call(&self, req: Req) -> Self::Future<'_> { fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a>
where
Req: 'a,
{
(self.f)(req) (self.f)(req)
} }
} }
@ -190,7 +193,10 @@ where
type Future<'f> = Fut where Self: 'f; type Future<'f> = Fut where Self: 'f;
#[inline] #[inline]
fn call(&self, req: Req) -> Self::Future<'_> { fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a>
where
Req: 'a,
{
(self.f)(req) (self.f)(req)
} }
} }
@ -348,19 +354,19 @@ mod tests {
use std::task::Poll; use std::task::Poll;
use super::*; use super::*;
use crate::{Service, ServiceFactory}; use crate::{Container, ServiceFactory};
#[ntex::test] #[ntex::test]
async fn test_fn_service() { async fn test_fn_service() {
let new_srv = fn_service(|()| async { Ok::<_, ()>("srv") }).clone(); let new_srv = fn_service(|()| async { Ok::<_, ()>("srv") }).clone();
let srv = new_srv.create(()).await.unwrap(); let srv = Container::new(new_srv.create(()).await.unwrap());
let res = srv.call(()).await; let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), "srv"); assert_eq!(res.unwrap(), "srv");
let srv2 = new_srv.clone(); let srv2 = Container::new(new_srv.clone());
let res = srv2.call(()).await; let res = srv2.call(()).await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), "srv"); assert_eq!(res.unwrap(), "srv");
@ -370,12 +376,14 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_fn_service_service() { async fn test_fn_service_service() {
let srv = fn_service(|()| async { Ok::<_, ()>("srv") }) let srv = Container::new(
fn_service(|()| async { Ok::<_, ()>("srv") })
.clone() .clone()
.create(&()) .create(&())
.await .await
.unwrap() .unwrap()
.clone(); .clone(),
);
let res = srv.call(()).await; let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
@ -396,7 +404,7 @@ mod tests {
}) })
.clone(); .clone();
let srv = new_srv.create(&1).await.unwrap(); let srv = Container::new(new_srv.create(&1).await.unwrap());
let res = srv.call(()).await; let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok()); assert!(res.is_ok());

View file

@ -3,7 +3,7 @@ use std::future::{ready, Ready};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::Service; use crate::{Ctx, Service};
#[inline] #[inline]
/// Create `FnShutdown` for function that can act as a `on_shutdown` callback. /// Create `FnShutdown` for function that can act as a `on_shutdown` callback.
@ -60,7 +60,10 @@ where
} }
#[inline] #[inline]
fn call(&self, req: Req) -> Self::Future<'_> { fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a>
where
Req: 'a,
{
ready(Ok(req)) ready(Ok(req))
} }
} }
@ -70,7 +73,7 @@ mod tests {
use ntex_util::future::lazy; use ntex_util::future::lazy;
use std::{rc::Rc, task::Poll}; use std::{rc::Rc, task::Poll};
use crate::{fn_service, pipeline}; use crate::{fn_service, pipeline, Container};
use super::*; use super::*;
@ -83,7 +86,7 @@ mod tests {
is_called2.set(true); is_called2.set(true);
}); });
let pipe = pipeline(srv).and_then(on_shutdown).clone(); let pipe = Container::new(pipeline(srv).and_then(on_shutdown).clone());
let res = pipe.call(()).await; let res = pipe.call(()).await;
assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(())));

View file

@ -1,7 +1,6 @@
//! See [`Service`] docs for information on this crate's foundational trait. //! See [`Service`] docs for information on this crate's foundational trait.
#![deny(rust_2018_idioms, warnings)] #![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity)]
use std::future::Future; use std::future::Future;
use std::rc::Rc; use std::rc::Rc;
@ -10,6 +9,7 @@ use std::task::{self, Context, Poll};
mod and_then; mod and_then;
mod apply; mod apply;
pub mod boxed; pub mod boxed;
mod ctx;
mod fn_service; mod fn_service;
mod fn_shutdown; mod fn_shutdown;
mod macros; mod macros;
@ -22,6 +22,7 @@ mod pipeline;
mod then; mod then;
pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::ctx::{Container, ContainerFactory, Ctx, ServiceCall};
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::fn_shutdown::fn_shutdown; pub use self::fn_shutdown::fn_shutdown;
pub use self::map_config::{map_config, unit_config}; pub use self::map_config::{map_config, unit_config};
@ -53,13 +54,12 @@ pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
/// simple API surfaces. This leads to simpler design of each service, improves test-ability and /// simple API surfaces. This leads to simpler design of each service, improves test-ability and
/// makes composition easier. /// makes composition easier.
/// ///
/// ```rust /// ```rust,ignore
/// # use std::convert::Infallible; /// # use std::convert::Infallible;
/// # use std::future::Future; /// # use std::future::Future;
/// # use std::pin::Pin; /// # use std::pin::Pin;
/// # use std::task::{Context, Poll};
/// # /// #
/// # use ntex_service::Service; /// # use ntex_service::{Ctx, Service};
/// ///
/// struct MyService; /// struct MyService;
/// ///
@ -68,7 +68,7 @@ pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
/// type Error = Infallible; /// type Error = Infallible;
/// type Future<'f> = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; /// type Future<'f> = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
/// ///
/// fn call(&self, req: u8) -> Self::Future<'_> { /// fn call<'a>(&'a self, req: u8, _: Ctx<'a, Self>) -> Self::Future<'a> {
/// Box::pin(std::future::ready(Ok(req as u64))) /// Box::pin(std::future::ready(Ok(req as u64)))
/// } /// }
/// } /// }
@ -101,7 +101,9 @@ pub trait Service<Req> {
/// ///
/// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be
/// resilient to this fact. /// resilient to this fact.
fn call(&self, req: Req) -> Self::Future<'_>; fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>
where
Req: 'a;
#[inline] #[inline]
/// Returns `Ready` when the service is able to process requests. /// Returns `Ready` when the service is able to process requests.
@ -154,7 +156,7 @@ pub trait Service<Req> {
/// error type. /// error type.
/// ///
/// Note that this function consumes the receiving service and returns a wrapped version of it. /// Note that this function consumes the receiving service and returns a wrapped version of it.
fn map_err<F, E>(self, f: F) -> crate::dev::MapErr<Self, Req, F, E> fn map_err<F, E>(self, f: F) -> crate::dev::MapErr<Self, F, E>
where where
Self: Sized, Self: Sized,
F: Fn(Self::Error) -> E, F: Fn(Self::Error) -> E,
@ -196,6 +198,14 @@ pub trait ServiceFactory<Req, Cfg = ()> {
/// Create and return a new service value asynchronously. /// Create and return a new service value asynchronously.
fn create(&self, cfg: Cfg) -> Self::Future<'_>; fn create(&self, cfg: Cfg) -> Self::Future<'_>;
/// Create and return a new service value asynchronously and wrap into a container
fn container(&self, cfg: Cfg) -> ContainerFactory<'_, Self, Req, Cfg>
where
Self: Sized,
{
Container::<Self::Service, Req>::create(self, cfg)
}
#[inline] #[inline]
/// Map this service's output to a different type, returning a new service /// Map this service's output to a different type, returning a new service
/// of the resulting type. /// of the resulting type.
@ -250,8 +260,11 @@ where
} }
#[inline] #[inline]
fn call(&self, request: Req) -> S::Future<'_> { fn call<'s>(&'s self, request: Req, ctx: Ctx<'s, Self>) -> S::Future<'s>
(**self).call(request) where
Req: 's,
{
ctx.call_nowait(&**self, request)
} }
} }
@ -274,8 +287,11 @@ where
} }
#[inline] #[inline]
fn call(&self, request: Req) -> S::Future<'_> { fn call<'a>(&'a self, request: Req, ctx: Ctx<'a, Self>) -> S::Future<'a>
(**self).call(request) where
Req: 'a,
{
ctx.call_nowait(&**self, request)
} }
} }

View file

@ -1,6 +1,6 @@
use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use super::{Service, ServiceFactory}; use super::{Ctx, Service, ServiceCall, ServiceFactory};
/// Service for the `map` combinator, changing the type of a service's response. /// Service for the `map` combinator, changing the type of a service's response.
/// ///
@ -54,15 +54,19 @@ where
crate::forward_poll_shutdown!(service); crate::forward_poll_shutdown!(service);
#[inline] #[inline]
fn call(&self, req: Req) -> Self::Future<'_> { fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>
where
Req: 'a,
{
MapFuture { MapFuture {
fut: self.service.call(req), fut: ctx.call(&self.service, req),
slf: self, slf: self,
} }
} }
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapFuture<'f, A, F, Req, Res> pub struct MapFuture<'f, A, F, Req, Res>
where where
A: Service<Req>, A: Service<Req>,
@ -72,7 +76,7 @@ pin_project_lite::pin_project! {
{ {
slf: &'f Map<A, F, Req, Res>, slf: &'f Map<A, F, Req, Res>,
#[pin] #[pin]
fut: A::Future<'f>, fut: ServiceCall<'f, A, Req>,
} }
} }
@ -154,6 +158,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapFactoryFuture<'f, A, F, Req, Res, Cfg> pub struct MapFactoryFuture<'f, A, F, Req, Res, Cfg>
where where
A: ServiceFactory<Req, Cfg>, A: ServiceFactory<Req, Cfg>,
@ -190,7 +195,7 @@ mod tests {
use ntex_util::future::{lazy, Ready}; use ntex_util::future::{lazy, Ready};
use super::*; use super::*;
use crate::{fn_factory, Service, ServiceFactory}; use crate::{fn_factory, Container, Ctx, Service, ServiceFactory};
#[derive(Clone)] #[derive(Clone)]
struct Srv; struct Srv;
@ -204,14 +209,17 @@ mod tests {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn call(&self, _: ()) -> Self::Future<'_> { fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a>
where
(): 'a,
{
Ready::Ok(()) Ready::Ok(())
} }
} }
#[ntex::test] #[ntex::test]
async fn test_service() { async fn test_service() {
let srv = Srv.map(|_| "ok").clone(); let srv = Container::new(Srv.map(|_| "ok").clone());
let res = srv.call(()).await; let res = srv.call(()).await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), "ok"); assert_eq!(res.unwrap(), "ok");
@ -225,7 +233,7 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_pipeline() { async fn test_pipeline() {
let srv = crate::pipeline(Srv).map(|_| "ok").clone(); let srv = Container::new(crate::pipeline(Srv).map(|_| "ok").clone());
let res = srv.call(()).await; let res = srv.call(()).await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), "ok"); assert_eq!(res.unwrap(), "ok");
@ -242,7 +250,7 @@ mod tests {
let new_srv = fn_factory(|| async { Ok::<_, ()>(Srv) }) let new_srv = fn_factory(|| async { Ok::<_, ()>(Srv) })
.map(|_| "ok") .map(|_| "ok")
.clone(); .clone();
let srv = new_srv.create(&()).await.unwrap(); let srv = Container::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await; let res = srv.call(()).await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), ("ok")); assert_eq!(res.unwrap(), ("ok"));
@ -253,7 +261,7 @@ mod tests {
let new_srv = crate::pipeline_factory(fn_factory(|| async { Ok::<_, ()>(Srv) })) let new_srv = crate::pipeline_factory(fn_factory(|| async { Ok::<_, ()>(Srv) }))
.map(|_| "ok") .map(|_| "ok")
.clone(); .clone();
let srv = new_srv.create(&()).await.unwrap(); let srv = Container::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await; let res = srv.call(()).await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), ("ok")); assert_eq!(res.unwrap(), ("ok"));

View file

@ -115,6 +115,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct UnitConfigFuture<'f, A, R, C> pub struct UnitConfigFuture<'f, A, R, C>
where A: ServiceFactory<R>, where A: ServiceFactory<R>,
A: 'f, A: 'f,

View file

@ -1,20 +1,20 @@
use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use super::{Service, ServiceFactory}; use super::{Ctx, Service, ServiceCall, ServiceFactory};
/// Service for the `map_err` combinator, changing the type of a service's /// Service for the `map_err` combinator, changing the type of a service's
/// error. /// error.
/// ///
/// This is created by the `ServiceExt::map_err` method. /// This is created by the `ServiceExt::map_err` method.
pub struct MapErr<A, R, F, E> { pub struct MapErr<A, F, E> {
service: A, service: A,
f: F, f: F,
_t: PhantomData<fn(R) -> E>, _t: PhantomData<E>,
} }
impl<A, R, F, E> MapErr<A, R, F, E> { impl<A, F, E> MapErr<A, F, E> {
/// Create new `MapErr` combinator /// Create new `MapErr` combinator
pub(crate) fn new(service: A, f: F) -> Self pub(crate) fn new<R>(service: A, f: F) -> Self
where where
A: Service<R>, A: Service<R>,
F: Fn(A::Error) -> E, F: Fn(A::Error) -> E,
@ -27,7 +27,7 @@ impl<A, R, F, E> MapErr<A, R, F, E> {
} }
} }
impl<A, R, F, E> Clone for MapErr<A, R, F, E> impl<A, F, E> Clone for MapErr<A, F, E>
where where
A: Clone, A: Clone,
F: Clone, F: Clone,
@ -42,7 +42,7 @@ where
} }
} }
impl<A, R, F, E> Service<R> for MapErr<A, R, F, E> impl<A, R, F, E> Service<R> for MapErr<A, F, E>
where where
A: Service<R>, A: Service<R>,
F: Fn(A::Error) -> E, F: Fn(A::Error) -> E,
@ -57,10 +57,13 @@ where
} }
#[inline] #[inline]
fn call(&self, req: R) -> Self::Future<'_> { fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a>
where
R: 'a,
{
MapErrFuture { MapErrFuture {
slf: self, slf: self,
fut: self.service.call(req), fut: ctx.call(&self.service, req),
} }
} }
@ -68,6 +71,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapErrFuture<'f, A, R, F, E> pub struct MapErrFuture<'f, A, R, F, E>
where where
A: Service<R>, A: Service<R>,
@ -75,9 +79,9 @@ pin_project_lite::pin_project! {
R: 'f, R: 'f,
F: Fn(A::Error) -> E, F: Fn(A::Error) -> E,
{ {
slf: &'f MapErr<A, R, F, E>, slf: &'f MapErr<A, F, E>,
#[pin] #[pin]
fut: A::Future<'f>, fut: ServiceCall<'f, A, R>,
} }
} }
@ -145,7 +149,7 @@ where
type Response = A::Response; type Response = A::Response;
type Error = E; type Error = E;
type Service = MapErr<A::Service, R, F, E>; type Service = MapErr<A::Service, F, E>;
type InitError = A::InitError; type InitError = A::InitError;
type Future<'f> = MapErrFactoryFuture<'f, A, R, C, F, E> where Self: 'f, C: 'f; type Future<'f> = MapErrFactoryFuture<'f, A, R, C, F, E> where Self: 'f, C: 'f;
@ -159,6 +163,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapErrFactoryFuture<'f, A, R, C, F, E> pub struct MapErrFactoryFuture<'f, A, R, C, F, E>
where where
A: ServiceFactory<R, C>, A: ServiceFactory<R, C>,
@ -177,7 +182,7 @@ where
A: ServiceFactory<R, C>, A: ServiceFactory<R, C>,
F: Fn(A::Error) -> E + Clone, F: Fn(A::Error) -> E + Clone,
{ {
type Output = Result<MapErr<A::Service, R, F, E>, A::InitError>; type Output = Result<MapErr<A::Service, F, E>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
@ -191,12 +196,13 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
use crate::{fn_factory, Service, ServiceFactory};
use ntex_util::future::{lazy, Ready}; use ntex_util::future::{lazy, Ready};
use super::*;
use crate::{fn_factory, Container, Ctx, Service, ServiceFactory};
#[derive(Clone)] #[derive(Clone)]
struct Srv; struct Srv(bool);
impl Service<()> for Srv { impl Service<()> for Srv {
type Response = (); type Response = ();
@ -204,17 +210,24 @@ mod tests {
type Future<'f> = Ready<(), ()>; type Future<'f> = Ready<(), ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.0 {
Poll::Ready(Err(())) Poll::Ready(Err(()))
} else {
Poll::Ready(Ok(()))
}
} }
fn call(&self, _: ()) -> Self::Future<'_> { fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a>
where
(): 'a,
{
Ready::Err(()) Ready::Err(())
} }
} }
#[ntex::test] #[ntex::test]
async fn test_poll_ready() { async fn test_poll_ready() {
let srv = Srv.map_err(|_| "error"); let srv = Srv(true).map_err(|_| "error");
let res = lazy(|cx| srv.poll_ready(cx)).await; let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Err("error"))); assert_eq!(res, Poll::Ready(Err("error")));
@ -224,7 +237,7 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_service() { async fn test_service() {
let srv = Srv.map_err(|_| "error").clone(); let srv = Container::new(Srv(false).map_err(|_| "error").clone());
let res = srv.call(()).await; let res = srv.call(()).await;
assert!(res.is_err()); assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error"); assert_eq!(res.err().unwrap(), "error");
@ -232,7 +245,7 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_pipeline() { async fn test_pipeline() {
let srv = crate::pipeline(Srv).map_err(|_| "error").clone(); let srv = Container::new(crate::pipeline(Srv(false)).map_err(|_| "error").clone());
let res = srv.call(()).await; let res = srv.call(()).await;
assert!(res.is_err()); assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error"); assert_eq!(res.err().unwrap(), "error");
@ -240,10 +253,10 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_factory() { async fn test_factory() {
let new_srv = fn_factory(|| Ready::<_, ()>::Ok(Srv)) let new_srv = fn_factory(|| Ready::<_, ()>::Ok(Srv(false)))
.map_err(|_| "error") .map_err(|_| "error")
.clone(); .clone();
let srv = new_srv.create(&()).await.unwrap(); let srv = Container::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await; let res = srv.call(()).await;
assert!(res.is_err()); assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error"); assert_eq!(res.err().unwrap(), "error");
@ -251,10 +264,11 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_pipeline_factory() { async fn test_pipeline_factory() {
let new_srv = crate::pipeline_factory(fn_factory(|| async { Ok::<Srv, ()>(Srv) })) let new_srv =
crate::pipeline_factory(fn_factory(|| async { Ok::<Srv, ()>(Srv(false)) }))
.map_err(|_| "error") .map_err(|_| "error")
.clone(); .clone();
let srv = new_srv.create(&()).await.unwrap(); let srv = Container::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await; let res = srv.call(()).await;
assert!(res.is_err()); assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error"); assert_eq!(res.err().unwrap(), "error");

View file

@ -60,6 +60,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct MapInitErrFuture<'f, A, R, C, F, E> pub struct MapInitErrFuture<'f, A, R, C, F, E>
where where
A: ServiceFactory<R, C>, A: ServiceFactory<R, C>,

View file

@ -136,6 +136,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ApplyMiddlewareFuture<'f, T, S, R, C> pub struct ApplyMiddlewareFuture<'f, T, S, R, C>
where where
S: ServiceFactory<R, C>, S: ServiceFactory<R, C>,
@ -213,7 +214,7 @@ mod tests {
use std::marker; use std::marker;
use super::*; use super::*;
use crate::{fn_service, Service, ServiceFactory}; use crate::{fn_service, Container, Ctx, Service, ServiceCall, ServiceFactory};
#[derive(Clone)] #[derive(Clone)]
struct Tr<R>(marker::PhantomData<R>); struct Tr<R>(marker::PhantomData<R>);
@ -232,14 +233,17 @@ mod tests {
impl<S: Service<R>, R> Service<R> for Srv<S, R> { impl<S: Service<R>, R> Service<R> for Srv<S, R> {
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
type Future<'f> = S::Future<'f> where Self: 'f, R: 'f; type Future<'f> = ServiceCall<'f, S, R> where Self: 'f, R: 'f;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx) self.0.poll_ready(cx)
} }
fn call(&self, req: R) -> Self::Future<'_> { fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a>
self.0.call(req) where
R: 'a,
{
ctx.call(&self.0, req)
} }
} }
@ -251,7 +255,7 @@ mod tests {
) )
.clone(); .clone();
let srv = factory.create(&()).await.unwrap().clone(); let srv = Container::new(factory.create(&()).await.unwrap().clone());
let res = srv.call(10).await; let res = srv.call(10).await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 20); assert_eq!(res.unwrap(), 20);
@ -267,7 +271,7 @@ mod tests {
.apply(Rc::new(Tr(marker::PhantomData).clone())) .apply(Rc::new(Tr(marker::PhantomData).clone()))
.clone(); .clone();
let srv = factory.create(&()).await.unwrap().clone(); let srv = Container::new(factory.create(&()).await.unwrap().clone());
let res = srv.call(10).await; let res = srv.call(10).await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 20); assert_eq!(res.unwrap(), 20);

View file

@ -1,6 +1,7 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use crate::and_then::{AndThen, AndThenFactory}; use crate::and_then::{AndThen, AndThenFactory};
use crate::ctx::{Container, Ctx, ServiceCall};
use crate::map::{Map, MapFactory}; use crate::map::{Map, MapFactory};
use crate::map_err::{MapErr, MapErrFactory}; use crate::map_err::{MapErr, MapErrFactory};
use crate::map_init_err::MapInitErr; use crate::map_init_err::MapInitErr;
@ -105,7 +106,7 @@ impl<Req, Svc: Service<Req>> Pipeline<Req, Svc> {
/// ///
/// Note that this function consumes the receiving service and returns a /// Note that this function consumes the receiving service and returns a
/// wrapped version of it. /// wrapped version of it.
pub fn map_err<F, Err>(self, f: F) -> Pipeline<Req, MapErr<Svc, Req, F, Err>> pub fn map_err<F, Err>(self, f: F) -> Pipeline<Req, MapErr<Svc, F, Err>>
where where
Self: Sized, Self: Sized,
F: Fn(Svc::Error) -> Err, F: Fn(Svc::Error) -> Err,
@ -115,6 +116,11 @@ impl<Req, Svc: Service<Req>> Pipeline<Req, Svc> {
_t: PhantomData, _t: PhantomData,
} }
} }
/// Create service container
pub fn container(self) -> Container<Svc, Req> {
Container::new(self.service)
}
} }
impl<Req, Svc> Clone for Pipeline<Req, Svc> impl<Req, Svc> Clone for Pipeline<Req, Svc>
@ -132,14 +138,17 @@ where
impl<Req, Svc: Service<Req>> Service<Req> for Pipeline<Req, Svc> { impl<Req, Svc: Service<Req>> Service<Req> for Pipeline<Req, Svc> {
type Response = Svc::Response; type Response = Svc::Response;
type Error = Svc::Error; type Error = Svc::Error;
type Future<'f> = Svc::Future<'f> where Self: 'f, Req: 'f; type Future<'f> = ServiceCall<'f, Svc, Req> where Self: 'f, Req: 'f;
crate::forward_poll_ready!(service); crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service); crate::forward_poll_shutdown!(service);
#[inline] #[inline]
fn call(&self, req: Req) -> Self::Future<'_> { fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>
self.service.call(req) where
Req: 'a,
{
ctx.call(&self.service, req)
} }
} }

View file

@ -1,6 +1,6 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll}; use std::{future::Future, pin::Pin, task::Context, task::Poll};
use super::{Service, ServiceFactory}; use super::{Ctx, Service, ServiceCall, ServiceFactory};
/// Service for the `then` combinator, chaining a computation onto the end of /// Service for the `then` combinator, chaining a computation onto the end of
/// another service. /// another service.
@ -60,17 +60,22 @@ where
} }
#[inline] #[inline]
fn call(&self, req: R) -> Self::Future<'_> { fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a>
where
R: 'a,
{
ThenServiceResponse { ThenServiceResponse {
slf: self, slf: self,
state: State::A { state: State::A {
fut: self.svc1.call(req), fut: ctx.call(&self.svc1, req),
ctx,
}, },
} }
} }
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ThenServiceResponse<'f, A, B, R> pub struct ThenServiceResponse<'f, A, B, R>
where where
A: Service<R>, A: Service<R>,
@ -93,8 +98,8 @@ pin_project_lite::pin_project! {
B: 'f, B: 'f,
R: 'f, R: 'f,
{ {
A { #[pin] fut: A::Future<'f> }, A { #[pin] fut: ServiceCall<'f, A, R>, ctx: Ctx<'f, Then<A, B>> },
B { #[pin] fut: B::Future<'f> }, B { #[pin] fut: ServiceCall<'f, B, Result<A::Response, A::Error>> },
Empty, Empty,
} }
} }
@ -110,9 +115,9 @@ where
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
match this.state.as_mut().project() { match this.state.as_mut().project() {
StateProject::A { fut } => match fut.poll(cx) { StateProject::A { fut, ctx } => match fut.poll(cx) {
Poll::Ready(res) => { Poll::Ready(res) => {
let fut = this.slf.svc2.call(res); let fut = ctx.call(&this.slf.svc2, res);
this.state.set(State::B { fut }); this.state.set(State::B { fut });
self.poll(cx) self.poll(cx)
} }
@ -184,6 +189,7 @@ where
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ThenFactoryResponse<'f, A, B, R, C> pub struct ThenFactoryResponse<'f, A, B, R, C>
where where
A: ServiceFactory<R, C>, A: ServiceFactory<R, C>,
@ -245,7 +251,7 @@ mod tests {
use ntex_util::future::{lazy, Ready}; use ntex_util::future::{lazy, Ready};
use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; use crate::{pipeline, pipeline_factory, Ctx, Service, ServiceFactory};
#[derive(Clone)] #[derive(Clone)]
struct Srv1(Rc<Cell<usize>>); struct Srv1(Rc<Cell<usize>>);
@ -260,7 +266,14 @@ mod tests {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn call(&self, req: Result<&'static str, &'static str>) -> Self::Future<'_> { fn call<'a>(
&'a self,
req: Result<&'static str, &'static str>,
_: Ctx<'a, Self>,
) -> Self::Future<'a>
where
Result<&'static str, &'static str>: 'a,
{
match req { match req {
Ok(msg) => Ready::Ok(msg), Ok(msg) => Ready::Ok(msg),
Err(_) => Ready::Err(()), Err(_) => Ready::Err(()),
@ -278,10 +291,17 @@ mod tests {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1); self.0.set(self.0.get() + 1);
Poll::Ready(Err(())) Poll::Ready(Ok(()))
} }
fn call(&self, req: Result<&'static str, ()>) -> Self::Future<'_> { fn call<'a>(
&'a self,
req: Result<&'static str, ()>,
_: Ctx<'a, Self>,
) -> Self::Future<'a>
where
Result<&'static str, ()>: 'a,
{
match req { match req {
Ok(msg) => Ready::Ok((msg, "ok")), Ok(msg) => Ready::Ok((msg, "ok")),
Err(()) => Ready::Ok(("srv2", "err")), Err(()) => Ready::Ok(("srv2", "err")),
@ -294,7 +314,7 @@ mod tests {
let cnt = Rc::new(Cell::new(0)); let cnt = Rc::new(Cell::new(0));
let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone())); let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone()));
let res = lazy(|cx| srv.poll_ready(cx)).await; let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Err(()))); assert_eq!(res, Poll::Ready(Ok(())));
assert_eq!(cnt.get(), 2); assert_eq!(cnt.get(), 2);
let res = lazy(|cx| srv.poll_shutdown(cx)).await; let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(())); assert_eq!(res, Poll::Ready(()));
@ -303,9 +323,13 @@ mod tests {
#[ntex::test] #[ntex::test]
async fn test_call() { async fn test_call() {
let cnt = Rc::new(Cell::new(0)); let cnt = Rc::new(Cell::new(0));
let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt)).clone(); let srv = pipeline(Srv1(cnt.clone()))
.then(Srv2(cnt))
.clone()
.container();
let res = srv.call(Ok("srv1")).await; let res = srv.call(Ok("srv1")).await;
println!("=========== {:?}", res);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "ok")); assert_eq!(res.unwrap(), ("srv1", "ok"));
@ -322,7 +346,7 @@ mod tests {
let factory = pipeline_factory(blank) let factory = pipeline_factory(blank)
.then(move || Ready::Ok(Srv2(cnt.clone()))) .then(move || Ready::Ok(Srv2(cnt.clone())))
.clone(); .clone();
let srv = factory.create(&()).await.unwrap(); let srv = factory.container(&()).await.unwrap();
let res = srv.call(Ok("srv1")).await; let res = srv.call(Ok("srv1")).await;
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "ok")); assert_eq!(res.unwrap(), ("srv1", "ok"));