Refactor service pipelines (#214)

This commit is contained in:
Nikolay Kim 2023-06-22 18:39:09 +06:00 committed by GitHub
parent a02009d7be
commit 6382ef6b40
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
61 changed files with 848 additions and 792 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "1.2.0-beta.4"
version = "1.2.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]
@ -20,5 +20,5 @@ pin-project-lite = "0.2.6"
slab = "0.4"
[dev-dependencies]
ntex = { version = "0.7.0-beta.1", features = ["tokio"] }
ntex-util = "0.3.0-beta.1"
ntex = { version = "0.7.0", features = ["tokio"] }
ntex-util = "0.3.0"

View file

@ -234,9 +234,7 @@ where
mod tests {
use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use crate::{
fn_factory, pipeline, pipeline_factory, Service, ServiceCtx, ServiceFactory,
};
use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
use ntex_util::future::{lazy, Ready};
#[derive(Clone)]
@ -286,9 +284,7 @@ mod tests {
#[ntex::test]
async fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let srv = pipeline(Srv1(cnt.clone()))
.and_then(Srv2(cnt.clone()))
.clone();
let srv = chain(Srv1(cnt.clone())).and_then(Srv2(cnt.clone())).clone();
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
assert_eq!(cnt.get(), 2);
@ -299,7 +295,7 @@ mod tests {
#[ntex::test]
async fn test_call() {
let cnt = Rc::new(Cell::new(0));
let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt)).container();
let srv = chain(Srv1(cnt.clone())).and_then(Srv2(cnt)).pipeline();
let res = srv.call("srv1").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "srv2"));
@ -309,13 +305,13 @@ mod tests {
async fn test_factory() {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let new_srv = pipeline_factory(fn_factory(move || {
let new_srv = chain_factory(fn_factory(move || {
Ready::from(Ok::<_, ()>(Srv1(cnt2.clone())))
}))
.and_then(move || Ready::from(Ok(Srv2(cnt.clone()))))
.clone();
let srv = new_srv.container(&()).await.unwrap();
let srv = new_srv.pipeline(&()).await.unwrap();
let res = srv.call("srv1").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "srv2"));

View file

@ -1,8 +1,8 @@
#![allow(clippy::type_complexity)]
use std::{future::Future, marker, pin::Pin, task, task::Poll};
use super::ctx::{Container, ServiceCall, ServiceCtx};
use super::{IntoService, IntoServiceFactory, Service, ServiceFactory};
use super::ctx::{ServiceCall, ServiceCtx};
use super::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory};
/// Apply transform function to a service.
pub fn apply_fn<T, Req, F, R, In, Out, Err, U>(
@ -17,7 +17,7 @@ where
{
Apply {
f,
service: Container::new(service.into_service()),
service: Pipeline::new(service.into_service()),
r: marker::PhantomData,
}
}
@ -41,13 +41,13 @@ pub struct Apply<T, Req, F, R, In, Out, Err>
where
T: Service<Req, Error = Err>,
{
service: Container<T>,
service: Pipeline<T>,
f: F,
r: marker::PhantomData<fn(Req) -> (In, Out, R)>,
}
pub struct ApplyService<S> {
service: Container<S>,
service: Pipeline<S>,
}
impl<S> ApplyService<S> {
@ -56,7 +56,7 @@ impl<S> ApplyService<S> {
where
S: Service<R>,
{
self.service.call(req)
self.service.service_call(req)
}
}
@ -212,7 +212,7 @@ mod tests {
use std::task::Poll;
use super::*;
use crate::{pipeline, pipeline_factory, Service, ServiceCtx, ServiceFactory};
use crate::{chain, chain_factory, Service, ServiceCtx};
#[derive(Clone)]
struct Srv;
@ -229,14 +229,14 @@ mod tests {
#[ntex::test]
async fn test_call() {
let srv = pipeline(
let srv = chain(
apply_fn(Srv, |req: &'static str, svc| async move {
svc.call(()).await.unwrap();
Ok((req, ()))
})
.clone(),
)
.container();
.pipeline();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
@ -249,7 +249,7 @@ mod tests {
#[ntex::test]
async fn test_create() {
let new_srv = pipeline_factory(
let new_srv = chain_factory(
apply_fn_factory(
|| Ready::<_, ()>::Ok(Srv),
|req: &'static str, srv| async move {
@ -260,7 +260,7 @@ mod tests {
.clone(),
);
let srv = new_srv.container(&()).await.unwrap();
let srv = new_srv.pipeline(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

View file

@ -73,7 +73,7 @@ where
idx: usize,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, Self::Response, Self::Error> {
Box::pin(ServiceCtx::<'a, S>::new(idx, waiters).call_nowait(self, req))
Box::pin(ServiceCtx::<'a, S>::from_ref(idx, waiters).call_nowait(self, req))
}
}

291
ntex-service/src/chain.rs Normal file
View file

@ -0,0 +1,291 @@
use std::marker::PhantomData;
use crate::and_then::{AndThen, AndThenFactory};
use crate::ctx::{ServiceCall, ServiceCtx};
use crate::map::{Map, MapFactory};
use crate::map_err::{MapErr, MapErrFactory};
use crate::map_init_err::MapInitErr;
use crate::middleware::{ApplyMiddleware, Middleware};
use crate::pipeline::CreatePipeline;
use crate::then::{Then, ThenFactory};
use crate::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory};
/// Constructs new pipeline with one service in pipeline chain.
pub fn chain<Svc, Req, F>(service: F) -> ServiceChain<Svc, Req>
where
Svc: Service<Req>,
F: IntoService<Svc, Req>,
{
ServiceChain {
service: service.into_service(),
_t: PhantomData,
}
}
/// Constructs new pipeline factory with one service factory.
pub fn chain_factory<T, R, C, F>(factory: F) -> ServiceChainFactory<T, R, C>
where
T: ServiceFactory<R, C>,
F: IntoServiceFactory<T, R, C>,
{
ServiceChainFactory {
factory: factory.into_factory(),
_t: PhantomData,
}
}
/// Pipeline builder - pipeline allows to compose multiple service into one service.
pub struct ServiceChain<Svc, Req> {
service: Svc,
_t: PhantomData<Req>,
}
impl<Svc: Service<Req>, Req> ServiceChain<Svc, Req> {
/// Call another service after call to this one has resolved successfully.
///
/// This function can be used to chain two services together and ensure that
/// the second service isn't called until call to the fist service have
/// finished. Result of the call to the first service is used as an
/// input parameter for the second service's call.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
pub fn and_then<Next, F>(self, service: F) -> ServiceChain<AndThen<Svc, Next>, Req>
where
Self: Sized,
F: IntoService<Next, Svc::Response>,
Next: Service<Svc::Response, Error = Svc::Error>,
{
ServiceChain {
service: AndThen::new(self.service, service.into_service()),
_t: PhantomData,
}
}
/// Chain on a computation for when a call to the service finished,
/// passing the result of the call to the next service `U`.
///
/// Note that this function consumes the receiving pipeline and returns a
/// wrapped version of it.
pub fn then<Next, F>(self, service: F) -> ServiceChain<Then<Svc, Next>, Req>
where
Self: Sized,
F: IntoService<Next, Result<Svc::Response, Svc::Error>>,
Next: Service<Result<Svc::Response, Svc::Error>, Error = Svc::Error>,
{
ServiceChain {
service: Then::new(self.service, service.into_service()),
_t: PhantomData,
}
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
///
/// This function is similar to the `Option::map` or `Iterator::map` where
/// it will change the type of the underlying service.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library.
pub fn map<F, Res>(self, f: F) -> ServiceChain<Map<Svc, F, Req, Res>, Req>
where
Self: Sized,
F: Fn(Svc::Response) -> Res,
{
ServiceChain {
service: Map::new(self.service, f),
_t: PhantomData,
}
}
/// Map this service's error to a different error, returning a new service.
///
/// This function is similar to the `Result::map_err` where it will change
/// the error type of the underlying service. This is useful for example to
/// ensure that services have the same error type.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
pub fn map_err<F, Err>(self, f: F) -> ServiceChain<MapErr<Svc, F, Err>, Req>
where
Self: Sized,
F: Fn(Svc::Error) -> Err,
{
ServiceChain {
service: MapErr::new(self.service, f),
_t: PhantomData,
}
}
/// Create service pipeline
pub fn pipeline(self) -> Pipeline<Svc> {
Pipeline::new(self.service)
}
}
impl<Svc, Req> Clone for ServiceChain<Svc, Req>
where
Svc: Clone,
{
fn clone(&self) -> Self {
ServiceChain {
service: self.service.clone(),
_t: PhantomData,
}
}
}
impl<Svc: Service<Req>, Req> Service<Req> for ServiceChain<Svc, Req> {
type Response = Svc::Response;
type Error = Svc::Error;
type Future<'f> = ServiceCall<'f, Svc, Req> where Self: 'f, Req: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
#[inline]
fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
ctx.call(&self.service, req)
}
}
/// Service factory builder
pub struct ServiceChainFactory<T, Req, C = ()> {
factory: T,
_t: PhantomData<(Req, C)>,
}
impl<T: ServiceFactory<Req, C>, Req, C> ServiceChainFactory<T, Req, C> {
/// Call another service after call to this one has resolved successfully.
pub fn and_then<F, U>(
self,
factory: F,
) -> ServiceChainFactory<AndThenFactory<T, U>, Req, C>
where
Self: Sized,
F: IntoServiceFactory<U, T::Response, C>,
U: ServiceFactory<T::Response, C, Error = T::Error, InitError = T::InitError>,
{
ServiceChainFactory {
factory: AndThenFactory::new(self.factory, factory.into_factory()),
_t: PhantomData,
}
}
/// Apply middleware to current service factory.
///
/// Short version of `apply(middleware, pipeline_factory(...))`
pub fn apply<U>(self, tr: U) -> ServiceChainFactory<ApplyMiddleware<U, T, C>, Req, C>
where
U: Middleware<T::Service>,
{
ServiceChainFactory {
factory: ApplyMiddleware::new(tr, self.factory),
_t: PhantomData,
}
}
/// Create `NewService` to chain on a computation for when a call to the
/// service finished, passing the result of the call to the next
/// service `U`.
///
/// Note that this function consumes the receiving pipeline and returns a
/// wrapped version of it.
pub fn then<F, U>(self, factory: F) -> ServiceChainFactory<ThenFactory<T, U>, Req, C>
where
Self: Sized,
C: Clone,
F: IntoServiceFactory<U, Result<T::Response, T::Error>, C>,
U: ServiceFactory<
Result<T::Response, T::Error>,
C,
Error = T::Error,
InitError = T::InitError,
>,
{
ServiceChainFactory {
factory: ThenFactory::new(self.factory, factory.into_factory()),
_t: PhantomData,
}
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
pub fn map<F, Res>(
self,
f: F,
) -> ServiceChainFactory<MapFactory<T, F, Req, Res, C>, Req, C>
where
Self: Sized,
F: Fn(T::Response) -> Res + Clone,
{
ServiceChainFactory {
factory: MapFactory::new(self.factory, f),
_t: PhantomData,
}
}
/// Map this service's error to a different error, returning a new service.
pub fn map_err<F, E>(
self,
f: F,
) -> ServiceChainFactory<MapErrFactory<T, Req, C, F, E>, Req, C>
where
Self: Sized,
F: Fn(T::Error) -> E + Clone,
{
ServiceChainFactory {
factory: MapErrFactory::new(self.factory, f),
_t: PhantomData,
}
}
/// Map this factory's init error to a different error, returning a new service.
pub fn map_init_err<F, E>(
self,
f: F,
) -> ServiceChainFactory<MapInitErr<T, Req, C, F, E>, Req, C>
where
Self: Sized,
F: Fn(T::InitError) -> E + Clone,
{
ServiceChainFactory {
factory: MapInitErr::new(self.factory, f),
_t: PhantomData,
}
}
/// Create and return a new service value asynchronously and wrap into a container
pub fn pipeline(&self, cfg: C) -> CreatePipeline<'_, T, Req, C>
where
Self: Sized,
{
CreatePipeline::new(self.factory.create(cfg))
}
}
impl<T, R, C> Clone for ServiceChainFactory<T, R, C>
where
T: Clone,
{
fn clone(&self) -> Self {
ServiceChainFactory {
factory: self.factory.clone(),
_t: PhantomData,
}
}
}
impl<T: ServiceFactory<R, C>, R, C> ServiceFactory<R, C> for ServiceChainFactory<T, R, C> {
type Response = T::Response;
type Error = T::Error;
type Service = T::Service;
type InitError = T::InitError;
type Future<'f> = T::Future<'f> where Self: 'f;
#[inline]
fn create(&self, cfg: C) -> Self::Future<'_> {
self.factory.create(cfg)
}
}

View file

@ -1,15 +1,6 @@
use std::{cell::Cell, cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task};
use std::{cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task};
use crate::{Service, ServiceFactory};
/// Container for a service.
///
/// Container allows to call enclosed service and adds support of shared readiness.
pub struct Container<S> {
svc: Rc<S>,
waiters: Waiters,
pending: Cell<bool>,
}
use crate::Service;
pub struct ServiceCtx<'a, S: ?Sized> {
idx: usize,
@ -53,11 +44,24 @@ impl WaitersRef {
}
impl Waiters {
fn register(&self, cx: &mut task::Context<'_>) {
pub(crate) fn new() -> Self {
let mut waiters = slab::Slab::new();
let index = waiters.insert(None);
Waiters {
index,
waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))),
}
}
pub(crate) fn get_ref(&self) -> &WaitersRef {
self.waiters.as_ref()
}
pub(crate) fn register(&self, cx: &mut task::Context<'_>) {
self.waiters.register(self.index, cx)
}
fn notify(&self) {
pub(crate) fn notify(&self) {
self.waiters.notify()
}
}
@ -78,132 +82,16 @@ impl Drop for Waiters {
}
}
impl<S> Container<S> {
#[inline]
/// Construct new container instance.
pub fn new(svc: S) -> Self {
let mut waiters = slab::Slab::new();
let index = waiters.insert(None);
Container {
svc: Rc::new(svc),
pending: Cell::new(false),
waiters: Waiters {
index,
waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))),
},
}
}
#[inline]
/// Return reference to enclosed service
pub fn get_ref(&self) -> &S {
self.svc.as_ref()
}
#[inline]
/// Returns `Ready` when the service is able to process requests.
pub fn poll_ready<R>(
&self,
cx: &mut task::Context<'_>,
) -> task::Poll<Result<(), S::Error>>
where
S: Service<R>,
{
let res = self.svc.poll_ready(cx);
if res.is_pending() {
self.pending.set(true);
self.waiters.register(cx)
} else if self.pending.get() {
self.pending.set(false);
self.waiters.notify()
}
res
}
#[inline]
/// Shutdown enclosed service.
pub fn poll_shutdown<R>(&self, cx: &mut task::Context<'_>) -> task::Poll<()>
where
S: Service<R>,
{
self.svc.poll_shutdown(cx)
}
#[inline]
/// Wait for service readiness and then create future object
/// that resolves to service result.
pub fn call<'a, R>(&'a self, req: R) -> ServiceCall<'a, S, R>
where
S: Service<R>,
{
let ctx = ServiceCtx::<'a, S> {
idx: self.waiters.index,
waiters: self.waiters.waiters.as_ref(),
_t: marker::PhantomData,
};
ctx.call(self.svc.as_ref(), req)
}
#[inline]
/// Call service and create future object that resolves to service result.
///
/// Note, this call does not check service readiness.
pub fn container_call<R>(&self, req: R) -> ContainerCall<'_, S, R>
where
S: Service<R>,
{
let container = self.clone();
let svc_call = container.svc.call(
req,
ServiceCtx {
idx: container.waiters.index,
waiters: container.waiters.waiters.as_ref(),
_t: marker::PhantomData,
},
);
// SAFETY: `svc_call` has same lifetime same as lifetime of `container.svc`
// Container::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };
ContainerCall { fut, container }
}
pub(crate) fn create<F: ServiceFactory<R, C>, R, C>(
f: &F,
cfg: C,
) -> ContainerFactory<'_, F, R, C> {
ContainerFactory { fut: f.create(cfg) }
}
/// Extract service if container hadnt been cloned before.
pub fn into_service(self) -> Option<S> {
let svc = self.svc.clone();
drop(self);
Rc::try_unwrap(svc).ok()
}
}
impl<S> From<S> for Container<S> {
#[inline]
fn from(svc: S) -> Self {
Container::new(svc)
}
}
impl<S> Clone for Container<S> {
#[inline]
fn clone(&self) -> Self {
Self {
svc: self.svc.clone(),
pending: Cell::new(false),
waiters: self.waiters.clone(),
}
}
}
impl<'a, S: ?Sized> ServiceCtx<'a, S> {
pub(crate) fn new(idx: usize, waiters: &'a WaitersRef) -> Self {
pub(crate) fn new(waiters: &'a Waiters) -> Self {
Self {
idx: waiters.index,
waiters: waiters.get_ref(),
_t: marker::PhantomData,
}
}
pub(crate) fn from_ref(idx: usize, waiters: &'a WaitersRef) -> Self {
Self {
idx,
waiters,
@ -264,54 +152,6 @@ impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> {
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ContainerCall<'f, S, R>
where
S: Service<R>,
S: 'f,
R: 'f,
{
#[pin]
fut: S::Future<'f>,
container: Container<S>,
}
}
impl<'f, S, R> ContainerCall<'f, S, R>
where
S: Service<R> + 'f,
R: 'f,
{
#[inline]
/// Convert future object to static version.
///
/// Returned future is suitable for spawning into a async runtime.
/// Note, this call does not check service readiness.
pub fn into_static(self) -> ContainerCall<'static, S, R> {
let svc_call = self.fut;
let container = self.container;
// SAFETY: `svc_call` has same lifetime same as lifetime of `container.svc`
// Container::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };
ContainerCall { fut, container }
}
}
impl<'f, S, R> Future for ContainerCall<'f, S, R>
where
S: Service<R>,
{
type Output = Result<S::Response, S::Error>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ServiceCall<'a, S, Req>
@ -394,33 +234,6 @@ where
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ContainerFactory<'f, F, R, C>
where F: ServiceFactory<R, C>,
F: ?Sized,
F: 'f,
C: 'f,
{
#[pin]
fut: F::Future<'f>,
}
}
impl<'f, F, R, C> Future for ContainerFactory<'f, F, R, C>
where
F: ServiceFactory<R, C> + 'f,
{
type Output = Result<Container<F::Service>, F::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
task::Poll::Ready(Ok(Container::new(task::ready!(self
.project()
.fut
.poll(cx))?)))
}
}
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, poll_fn, Ready};
@ -428,6 +241,7 @@ mod tests {
use std::{cell::Cell, cell::RefCell, rc::Rc, task::Context, task::Poll};
use super::*;
use crate::Pipeline;
struct Srv(Rc<Cell<usize>>, condition::Waiter);
@ -455,7 +269,7 @@ mod tests {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv1 = Container::from(Srv(cnt.clone(), con.wait()));
let srv1 = Pipeline::from(Srv(cnt.clone(), con.wait()));
let srv2 = srv1.clone();
let res = lazy(|cx| srv1.poll_ready(cx)).await;
@ -484,19 +298,19 @@ mod tests {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv1 = Container::from(Srv(cnt.clone(), con.wait()));
let srv1 = Pipeline::from(Srv(cnt.clone(), con.wait()));
let srv2 = srv1.clone();
let data1 = data.clone();
ntex::rt::spawn(async move {
let _ = poll_fn(|cx| srv1.poll_ready(cx)).await;
let i = srv1.container_call("srv1").await.unwrap();
let i = srv1.call("srv1").await.unwrap();
data1.borrow_mut().push(i);
});
let data2 = data.clone();
ntex::rt::spawn(async move {
let i = srv2.call("srv2").await.unwrap();
let i = srv2.service_call("srv2").await.unwrap();
data2.borrow_mut().push(i);
});
time::sleep(time::Millis(50)).await;

View file

@ -40,7 +40,7 @@ where
/// });
///
/// // construct new service
/// let srv = factory.container(&()).await?;
/// let srv = factory.pipeline(&()).await?;
///
/// // now we can use `div` service
/// let result = srv.call((10, 20)).await?;
@ -81,7 +81,7 @@ where
/// });
///
/// // construct new service with config argument
/// let srv = factory.container(&10).await?;
/// let srv = factory.pipeline(&10).await?;
///
/// let result = srv.call(10).await?;
/// assert_eq!(result, 100);
@ -348,19 +348,19 @@ mod tests {
use std::task::Poll;
use super::*;
use crate::{Container, ServiceFactory};
use crate::{Pipeline, ServiceFactory};
#[ntex::test]
async fn test_fn_service() {
let new_srv = fn_service(|()| async { Ok::<_, ()>("srv") }).clone();
let srv = Container::new(new_srv.create(()).await.unwrap());
let srv = Pipeline::new(new_srv.create(()).await.unwrap());
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
assert_eq!(res.unwrap(), "srv");
let srv2 = Container::new(new_srv.clone());
let srv2 = Pipeline::new(new_srv.clone());
let res = srv2.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), "srv");
@ -370,7 +370,7 @@ mod tests {
#[ntex::test]
async fn test_fn_service_service() {
let srv = Container::new(
let srv = Pipeline::new(
fn_service(|()| async { Ok::<_, ()>("srv") })
.clone()
.create(&())
@ -398,7 +398,7 @@ mod tests {
})
.clone();
let srv = Container::new(new_srv.create(&1).await.unwrap());
let srv = Pipeline::new(new_srv.create(&1).await.unwrap());
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());

View file

@ -70,7 +70,7 @@ mod tests {
use ntex_util::future::lazy;
use std::{rc::Rc, task::Poll};
use crate::{fn_service, pipeline, Container};
use crate::{chain, fn_service, Pipeline};
use super::*;
@ -83,7 +83,7 @@ mod tests {
is_called2.set(true);
});
let pipe = Container::new(pipeline(srv).and_then(on_shutdown).clone());
let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone());
let res = pipe.call(()).await;
assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(())));

View file

@ -9,6 +9,7 @@ use std::task::{self, Context, Poll};
mod and_then;
mod apply;
pub mod boxed;
mod chain;
mod ctx;
mod fn_service;
mod fn_shutdown;
@ -22,12 +23,13 @@ mod pipeline;
mod then;
pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::ctx::{Container, ContainerCall, ContainerFactory, ServiceCall, ServiceCtx};
pub use self::chain::{chain, chain_factory};
pub use self::ctx::{ServiceCall, ServiceCtx};
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::fn_shutdown::fn_shutdown;
pub use self::map_config::{map_config, unit_config};
pub use self::middleware::{apply, Identity, Middleware, Stack};
pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
pub use self::pipeline::{Pipeline, PipelineCall};
#[allow(unused_variables)]
/// An asynchronous function of `Request` to a `Response`.
@ -141,12 +143,12 @@ pub trait Service<Req> {
///
/// Note that this function consumes the receiving service and returns a wrapped version of it,
/// similar to the existing `map` methods in the standard library.
fn map<F, Res>(self, f: F) -> crate::dev::Map<Self, F, Req, Res>
fn map<F, Res>(self, f: F) -> dev::ServiceChain<dev::Map<Self, F, Req, Res>, Req>
where
Self: Sized,
F: Fn(Self::Response) -> Res,
{
crate::dev::Map::new(self, f)
chain(dev::Map::new(self, f))
}
#[inline]
@ -157,12 +159,21 @@ pub trait Service<Req> {
/// error type.
///
/// Note that this function consumes the receiving service and returns a wrapped version of it.
fn map_err<F, E>(self, f: F) -> crate::dev::MapErr<Self, F, E>
fn map_err<F, E>(self, f: F) -> dev::ServiceChain<dev::MapErr<Self, F, E>, Req>
where
Self: Sized,
F: Fn(Self::Error) -> E,
{
crate::dev::MapErr::new(self, f)
chain(dev::MapErr::new(self, f))
}
#[inline]
/// Convert `Self` to a `ServiceChain`
fn chain(self) -> dev::ServiceChain<Self, Req>
where
Self: Sized,
{
chain(self)
}
}
@ -200,32 +211,38 @@ pub trait ServiceFactory<Req, Cfg = ()> {
fn create(&self, cfg: Cfg) -> Self::Future<'_>;
/// Create and return a new service value asynchronously and wrap into a container
fn container(&self, cfg: Cfg) -> ContainerFactory<'_, Self, Req, Cfg>
fn pipeline(&self, cfg: Cfg) -> dev::CreatePipeline<'_, Self, Req, Cfg>
where
Self: Sized,
{
Container::<Self::Service>::create(self, cfg)
dev::CreatePipeline::new(self.create(cfg))
}
#[inline]
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
fn map<F, Res>(self, f: F) -> crate::map::MapFactory<Self, F, Req, Res, Cfg>
fn map<F, Res>(
self,
f: F,
) -> dev::ServiceChainFactory<dev::MapFactory<Self, F, Req, Res, Cfg>, Req, Cfg>
where
Self: Sized,
F: Fn(Self::Response) -> Res + Clone,
{
crate::map::MapFactory::new(self, f)
chain_factory(dev::MapFactory::new(self, f))
}
#[inline]
/// Map this service's error to a different error, returning a new service.
fn map_err<F, E>(self, f: F) -> crate::map_err::MapErrFactory<Self, Req, Cfg, F, E>
fn map_err<F, E>(
self,
f: F,
) -> dev::ServiceChainFactory<dev::MapErrFactory<Self, Req, Cfg, F, E>, Req, Cfg>
where
Self: Sized,
F: Fn(Self::Error) -> E + Clone,
{
crate::map_err::MapErrFactory::new(self, f)
chain_factory(dev::MapErrFactory::new(self, f))
}
#[inline]
@ -233,12 +250,12 @@ pub trait ServiceFactory<Req, Cfg = ()> {
fn map_init_err<F, E>(
self,
f: F,
) -> crate::map_init_err::MapInitErr<Self, Req, Cfg, F, E>
) -> dev::ServiceChainFactory<dev::MapInitErr<Self, Req, Cfg, F, E>, Req, Cfg>
where
Self: Sized,
F: Fn(Self::InitError) -> E + Clone,
{
crate::map_init_err::MapInitErr::new(self, f)
chain_factory(dev::MapInitErr::new(self, f))
}
}
@ -312,6 +329,15 @@ where
{
/// Convert to a `Service`
fn into_service(self) -> Svc;
#[inline]
/// Convert `Self` to a `ServiceChain`
fn into_chain(self) -> dev::ServiceChain<Svc, Req>
where
Self: Sized,
{
chain(self)
}
}
/// Trait for types that can be converted to a `ServiceFactory`
@ -321,12 +347,22 @@ where
{
/// Convert `Self` to a `ServiceFactory`
fn into_factory(self) -> T;
#[inline]
/// Convert `Self` to a `ServiceChainFactory`
fn chain(self) -> dev::ServiceChainFactory<T, Req, Cfg>
where
Self: Sized,
{
chain_factory(self)
}
}
impl<Svc, Req> IntoService<Svc, Req> for Svc
where
Svc: Service<Req>,
{
#[inline]
fn into_service(self) -> Svc {
self
}
@ -336,6 +372,7 @@ impl<T, Req, Cfg> IntoServiceFactory<T, Req, Cfg> for T
where
T: ServiceFactory<Req, Cfg>,
{
#[inline]
fn into_factory(self) -> T {
self
}
@ -353,6 +390,7 @@ where
pub mod dev {
pub use crate::and_then::{AndThen, AndThenFactory};
pub use crate::apply::{Apply, ApplyFactory, ApplyService};
pub use crate::chain::{ServiceChain, ServiceChainFactory};
pub use crate::fn_service::{
FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig,
};
@ -362,5 +400,6 @@ pub mod dev {
pub use crate::map_err::{MapErr, MapErrFactory};
pub use crate::map_init_err::MapInitErr;
pub use crate::middleware::ApplyMiddleware;
pub use crate::pipeline::CreatePipeline;
pub use crate::then::{Then, ThenFactory};
}

View file

@ -192,7 +192,7 @@ mod tests {
use ntex_util::future::{lazy, Ready};
use super::*;
use crate::{fn_factory, Container, Service, ServiceCtx, ServiceFactory};
use crate::{fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory};
#[derive(Clone)]
struct Srv;
@ -213,7 +213,7 @@ mod tests {
#[ntex::test]
async fn test_service() {
let srv = Container::new(Srv.map(|_| "ok").clone());
let srv = Pipeline::new(Srv.map(|_| "ok").clone());
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), "ok");
@ -227,7 +227,7 @@ mod tests {
#[ntex::test]
async fn test_pipeline() {
let srv = Container::new(crate::pipeline(Srv).map(|_| "ok").clone());
let srv = Pipeline::new(crate::chain(Srv).map(|_| "ok").clone());
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), "ok");
@ -244,7 +244,7 @@ mod tests {
let new_srv = fn_factory(|| async { Ok::<_, ()>(Srv) })
.map(|_| "ok")
.clone();
let srv = Container::new(new_srv.create(&()).await.unwrap());
let srv = Pipeline::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("ok"));
@ -252,10 +252,10 @@ mod tests {
#[ntex::test]
async fn test_pipeline_factory() {
let new_srv = crate::pipeline_factory(fn_factory(|| async { Ok::<_, ()>(Srv) }))
let new_srv = crate::chain_factory(fn_factory(|| async { Ok::<_, ()>(Srv) }))
.map(|_| "ok")
.clone();
let srv = Container::new(new_srv.create(&()).await.unwrap());
let srv = Pipeline::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("ok"));

View file

@ -196,7 +196,7 @@ mod tests {
use ntex_util::future::{lazy, Ready};
use super::*;
use crate::{fn_factory, Container, Service, ServiceCtx, ServiceFactory};
use crate::{fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory};
#[derive(Clone)]
struct Srv(bool);
@ -231,7 +231,7 @@ mod tests {
#[ntex::test]
async fn test_service() {
let srv = Container::new(Srv(false).map_err(|_| "error").clone());
let srv = Pipeline::new(Srv(false).map_err(|_| "error").clone());
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
@ -239,7 +239,7 @@ mod tests {
#[ntex::test]
async fn test_pipeline() {
let srv = Container::new(crate::pipeline(Srv(false)).map_err(|_| "error").clone());
let srv = Pipeline::new(crate::chain(Srv(false)).map_err(|_| "error").clone());
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
@ -250,7 +250,7 @@ mod tests {
let new_srv = fn_factory(|| Ready::<_, ()>::Ok(Srv(false)))
.map_err(|_| "error")
.clone();
let srv = Container::new(new_srv.create(&()).await.unwrap());
let srv = Pipeline::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
@ -259,10 +259,10 @@ mod tests {
#[ntex::test]
async fn test_pipeline_factory() {
let new_srv =
crate::pipeline_factory(fn_factory(|| async { Ok::<Srv, ()>(Srv(false)) }))
crate::chain_factory(fn_factory(|| async { Ok::<Srv, ()>(Srv(false)) }))
.map_err(|_| "error")
.clone();
let srv = Container::new(new_srv.create(&()).await.unwrap());
let srv = Pipeline::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");

View file

@ -89,11 +89,11 @@ where
#[cfg(test)]
mod tests {
use crate::{fn_factory_with_config, into_service, pipeline_factory, ServiceFactory};
use crate::{chain_factory, fn_factory_with_config, into_service, ServiceFactory};
#[ntex::test]
async fn map_init_err() {
let factory = pipeline_factory(fn_factory_with_config(|err: &bool| {
let factory = chain_factory(fn_factory_with_config(|err: &bool| {
let err = *err;
async move {
if err {

View file

@ -214,7 +214,7 @@ mod tests {
use std::marker;
use super::*;
use crate::{fn_service, Container, Service, ServiceCall, ServiceCtx, ServiceFactory};
use crate::{fn_service, Pipeline, Service, ServiceCall, ServiceCtx, ServiceFactory};
#[derive(Clone)]
struct Tr<R>(marker::PhantomData<R>);
@ -252,7 +252,7 @@ mod tests {
)
.clone();
let srv = Container::new(factory.create(&()).await.unwrap().clone());
let srv = Pipeline::new(factory.create(&()).await.unwrap().clone());
let res = srv.call(10).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), 20);
@ -264,11 +264,11 @@ mod tests {
assert_eq!(res, Poll::Ready(()));
let factory =
crate::pipeline_factory(fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2)))
crate::chain_factory(fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2)))
.apply(Rc::new(Tr(marker::PhantomData).clone()))
.clone();
let srv = Container::new(factory.create(&()).await.unwrap().clone());
let srv = Pipeline::new(factory.create(&()).await.unwrap().clone());
let res = srv.call(10).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), 20);

View file

@ -1,278 +1,194 @@
use std::marker::PhantomData;
use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll};
use crate::and_then::{AndThen, AndThenFactory};
use crate::ctx::{Container, ServiceCall, ServiceCtx};
use crate::map::{Map, MapFactory};
use crate::map_err::{MapErr, MapErrFactory};
use crate::map_init_err::MapInitErr;
use crate::middleware::{ApplyMiddleware, Middleware};
use crate::then::{Then, ThenFactory};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
use crate::ctx::{ServiceCall, ServiceCtx, Waiters};
use crate::{Service, ServiceFactory};
/// Constructs new pipeline with one service in pipeline chain.
pub fn pipeline<Svc, Req, F>(service: F) -> Pipeline<Req, Svc>
where
Svc: Service<Req>,
F: IntoService<Svc, Req>,
{
Pipeline {
service: service.into_service(),
_t: PhantomData,
}
/// Container for a service.
///
/// Container allows to call enclosed service and adds support of shared readiness.
pub struct Pipeline<S> {
svc: Rc<S>,
waiters: Waiters,
pending: Cell<bool>,
}
/// Constructs new pipeline factory with one service factory.
pub fn pipeline_factory<T, R, C, F>(factory: F) -> PipelineFactory<R, T, C>
where
T: ServiceFactory<R, C>,
F: IntoServiceFactory<T, R, C>,
{
PipelineFactory {
factory: factory.into_factory(),
_t: PhantomData,
}
}
/// Pipeline service - pipeline allows to compose multiple service into one service.
pub struct Pipeline<Req, Svc> {
service: Svc,
_t: PhantomData<Req>,
}
impl<Req, Svc: Service<Req>> Pipeline<Req, Svc> {
/// Call another service after call to this one has resolved successfully.
///
/// This function can be used to chain two services together and ensure that
/// the second service isn't called until call to the fist service have
/// finished. Result of the call to the first service is used as an
/// input parameter for the second service's call.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
pub fn and_then<Next, F>(self, service: F) -> Pipeline<Req, AndThen<Svc, Next>>
where
Self: Sized,
F: IntoService<Next, Svc::Response>,
Next: Service<Svc::Response, Error = Svc::Error>,
{
impl<S> Pipeline<S> {
#[inline]
/// Construct new container instance.
pub fn new(svc: S) -> Self {
Pipeline {
service: AndThen::new(self.service, service.into_service()),
_t: PhantomData,
svc: Rc::new(svc),
pending: Cell::new(false),
waiters: Waiters::new(),
}
}
/// Chain on a computation for when a call to the service finished,
/// passing the result of the call to the next service `U`.
///
/// Note that this function consumes the receiving pipeline and returns a
/// wrapped version of it.
pub fn then<Next, F>(self, service: F) -> Pipeline<Req, Then<Svc, Next>>
where
Self: Sized,
F: IntoService<Next, Result<Svc::Response, Svc::Error>>,
Next: Service<Result<Svc::Response, Svc::Error>, Error = Svc::Error>,
{
Pipeline {
service: Then::new(self.service, service.into_service()),
_t: PhantomData,
}
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
///
/// This function is similar to the `Option::map` or `Iterator::map` where
/// it will change the type of the underlying service.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library.
pub fn map<F, Res>(self, f: F) -> Pipeline<Req, Map<Svc, F, Req, Res>>
where
Self: Sized,
F: Fn(Svc::Response) -> Res,
{
Pipeline {
service: Map::new(self.service, f),
_t: PhantomData,
}
}
/// Map this service's error to a different error, returning a new service.
///
/// This function is similar to the `Result::map_err` where it will change
/// the error type of the underlying service. This is useful for example to
/// ensure that services have the same error type.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
pub fn map_err<F, Err>(self, f: F) -> Pipeline<Req, MapErr<Svc, F, Err>>
where
Self: Sized,
F: Fn(Svc::Error) -> Err,
{
Pipeline {
service: MapErr::new(self.service, f),
_t: PhantomData,
}
}
/// Create service container
pub fn container(self) -> Container<Svc> {
Container::new(self.service)
}
}
impl<Req, Svc> Clone for Pipeline<Req, Svc>
where
Svc: Clone,
{
fn clone(&self) -> Self {
Pipeline {
service: self.service.clone(),
_t: PhantomData,
}
}
}
impl<Req, Svc: Service<Req>> Service<Req> for Pipeline<Req, Svc> {
type Response = Svc::Response;
type Error = Svc::Error;
type Future<'f> = ServiceCall<'f, Svc, Req> where Self: 'f, Req: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
#[inline]
fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
ctx.call(&self.service, req)
/// Return reference to enclosed service
pub fn get_ref(&self) -> &S {
self.svc.as_ref()
}
}
/// Pipeline factory
pub struct PipelineFactory<Req, T, C = ()> {
factory: T,
_t: PhantomData<(Req, C)>,
}
impl<Req, T: ServiceFactory<Req, C>, C> PipelineFactory<Req, T, C> {
/// Call another service after call to this one has resolved successfully.
pub fn and_then<F, U>(self, factory: F) -> PipelineFactory<Req, AndThenFactory<T, U>, C>
where
Self: Sized,
F: IntoServiceFactory<U, T::Response, C>,
U: ServiceFactory<T::Response, C, Error = T::Error, InitError = T::InitError>,
{
PipelineFactory {
factory: AndThenFactory::new(self.factory, factory.into_factory()),
_t: PhantomData,
}
}
/// Apply middleware to current service factory.
///
/// Short version of `apply(middleware, pipeline_factory(...))`
pub fn apply<U>(self, tr: U) -> PipelineFactory<Req, ApplyMiddleware<U, T, C>, C>
where
U: Middleware<T::Service>,
{
PipelineFactory {
factory: ApplyMiddleware::new(tr, self.factory),
_t: PhantomData,
}
}
/// Create `NewService` to chain on a computation for when a call to the
/// service finished, passing the result of the call to the next
/// service `U`.
///
/// Note that this function consumes the receiving pipeline and returns a
/// wrapped version of it.
pub fn then<F, U>(self, factory: F) -> PipelineFactory<Req, ThenFactory<T, U>, C>
where
Self: Sized,
C: Clone,
F: IntoServiceFactory<U, Result<T::Response, T::Error>, C>,
U: ServiceFactory<
Result<T::Response, T::Error>,
C,
Error = T::Error,
InitError = T::InitError,
>,
{
PipelineFactory {
factory: ThenFactory::new(self.factory, factory.into_factory()),
_t: PhantomData,
}
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
pub fn map<F, Res>(self, f: F) -> PipelineFactory<Req, MapFactory<T, F, Req, Res, C>, C>
where
Self: Sized,
F: Fn(T::Response) -> Res + Clone,
{
PipelineFactory {
factory: MapFactory::new(self.factory, f),
_t: PhantomData,
}
}
/// Map this service's error to a different error, returning a new service.
pub fn map_err<F, E>(
self,
f: F,
) -> PipelineFactory<Req, MapErrFactory<T, Req, C, F, E>, C>
where
Self: Sized,
F: Fn(T::Error) -> E + Clone,
{
PipelineFactory {
factory: MapErrFactory::new(self.factory, f),
_t: PhantomData,
}
}
/// Map this factory's init error to a different error, returning a new service.
pub fn map_init_err<F, E>(
self,
f: F,
) -> PipelineFactory<Req, MapInitErr<T, Req, C, F, E>, C>
where
Self: Sized,
F: Fn(T::InitError) -> E + Clone,
{
PipelineFactory {
factory: MapInitErr::new(self.factory, f),
_t: PhantomData,
}
}
}
impl<Req, T, C> Clone for PipelineFactory<Req, T, C>
where
T: Clone,
{
fn clone(&self) -> Self {
PipelineFactory {
factory: self.factory.clone(),
_t: PhantomData,
}
}
}
impl<Req, T: ServiceFactory<Req, C>, C> ServiceFactory<Req, C>
for PipelineFactory<Req, T, C>
{
type Response = T::Response;
type Error = T::Error;
type Service = T::Service;
type InitError = T::InitError;
type Future<'f> = T::Future<'f> where Self: 'f;
#[inline]
fn create(&self, cfg: C) -> Self::Future<'_> {
self.factory.create(cfg)
/// Returns `Ready` when the service is able to process requests.
pub fn poll_ready<R>(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>
where
S: Service<R>,
{
let res = self.svc.poll_ready(cx);
if res.is_pending() {
self.pending.set(true);
self.waiters.register(cx)
} else if self.pending.get() {
self.pending.set(false);
self.waiters.notify()
}
res
}
#[inline]
/// Shutdown enclosed service.
pub fn poll_shutdown<R>(&self, cx: &mut Context<'_>) -> Poll<()>
where
S: Service<R>,
{
self.svc.poll_shutdown(cx)
}
#[inline]
/// Wait for service readiness and then create future object
/// that resolves to service result.
pub fn service_call<'a, R>(&'a self, req: R) -> ServiceCall<'a, S, R>
where
S: Service<R>,
{
ServiceCtx::<'a, S>::new(&self.waiters).call(self.svc.as_ref(), req)
}
#[inline]
/// Call service and create future object that resolves to service result.
///
/// Note, this call does not check service readiness.
pub fn call<R>(&self, req: R) -> PipelineCall<'_, S, R>
where
S: Service<R>,
{
let pipeline = self.clone();
let svc_call = pipeline.svc.call(req, ServiceCtx::new(&pipeline.waiters));
// SAFETY: `svc_call` has same lifetime same as lifetime of `pipeline.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };
PipelineCall { fut, pipeline }
}
/// Extract service if container hadnt been cloned before.
pub fn into_service(self) -> Option<S> {
let svc = self.svc.clone();
drop(self);
Rc::try_unwrap(svc).ok()
}
}
impl<S> From<S> for Pipeline<S> {
#[inline]
fn from(svc: S) -> Self {
Pipeline::new(svc)
}
}
impl<S> Clone for Pipeline<S> {
#[inline]
fn clone(&self) -> Self {
Self {
svc: self.svc.clone(),
pending: Cell::new(false),
waiters: self.waiters.clone(),
}
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct PipelineCall<'f, S, R>
where
S: Service<R>,
S: 'f,
R: 'f,
{
#[pin]
fut: S::Future<'f>,
pipeline: Pipeline<S>,
}
}
impl<'f, S, R> PipelineCall<'f, S, R>
where
S: Service<R> + 'f,
R: 'f,
{
#[inline]
/// Convert future object to static version.
///
/// Returned future is suitable for spawning into a async runtime.
/// Note, this call does not check service readiness.
pub fn into_static(self) -> PipelineCall<'static, S, R> {
let svc_call = self.fut;
let pipeline = self.pipeline;
// SAFETY: `svc_call` has same lifetime same as lifetime of `pipeline.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };
PipelineCall { fut, pipeline }
}
}
impl<'f, S, R> future::Future for PipelineCall<'f, S, R>
where
S: Service<R>,
{
type Output = Result<S::Response, S::Error>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct CreatePipeline<'f, F, R, C>
where F: ServiceFactory<R, C>,
F: ?Sized,
F: 'f,
C: 'f,
{
#[pin]
fut: F::Future<'f>,
}
}
impl<'f, F, R, C> CreatePipeline<'f, F, R, C>
where
F: ServiceFactory<R, C> + 'f,
{
pub(crate) fn new(fut: F::Future<'f>) -> Self {
Self { fut }
}
}
impl<'f, F, R, C> future::Future for CreatePipeline<'f, F, R, C>
where
F: ServiceFactory<R, C> + 'f,
{
type Output = Result<Pipeline<F::Service>, F::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(Pipeline::new(std::task::ready!(self
.project()
.fut
.poll(cx))?)))
}
}

View file

@ -248,7 +248,7 @@ mod tests {
use ntex_util::future::{lazy, Ready};
use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use crate::{pipeline, pipeline_factory, Service, ServiceCtx, ServiceFactory};
use crate::{chain, chain_factory, Service, ServiceCtx};
#[derive(Clone)]
struct Srv1(Rc<Cell<usize>>);
@ -303,7 +303,7 @@ mod tests {
#[ntex::test]
async fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone()));
let srv = chain(Srv1(cnt.clone())).then(Srv2(cnt.clone()));
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
assert_eq!(cnt.get(), 2);
@ -314,10 +314,7 @@ mod tests {
#[ntex::test]
async fn test_call() {
let cnt = Rc::new(Cell::new(0));
let srv = pipeline(Srv1(cnt.clone()))
.then(Srv2(cnt))
.clone()
.container();
let srv = chain(Srv1(cnt.clone())).then(Srv2(cnt)).clone().pipeline();
let res = srv.call(Ok("srv1")).await;
assert!(res.is_ok());
@ -333,10 +330,10 @@ mod tests {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let blank = move || Ready::<_, ()>::Ok(Srv1(cnt2.clone()));
let factory = pipeline_factory(blank)
let factory = chain_factory(blank)
.then(move || Ready::Ok(Srv2(cnt.clone())))
.clone();
let srv = factory.container(&()).await.unwrap();
let srv = factory.pipeline(&()).await.unwrap();
let res = srv.call(Ok("srv1")).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "ok"));