From a9d5845005c2a21a5e5fd7e7a0a4a2cf59d26d3d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 7 Jan 2024 03:42:45 +0600 Subject: [PATCH] Use async fn in trait for Service definition --- .github/workflows/linux.yml | 2 +- ntex-service/CHANGES.md | 4 + ntex-service/Cargo.toml | 2 +- ntex-service/src/and_then.rs | 157 +++-------------- ntex-service/src/apply.rs | 72 ++------ ntex-service/src/boxed.rs | 16 +- ntex-service/src/chain.rs | 21 +-- ntex-service/src/ctx.rs | 293 +++++++------------------------ ntex-service/src/fn_service.rs | 29 ++- ntex-service/src/fn_shutdown.rs | 8 +- ntex-service/src/lib.rs | 95 ++++------ ntex-service/src/map.rs | 104 ++--------- ntex-service/src/map_config.rs | 64 +------ ntex-service/src/map_err.rs | 91 ++-------- ntex-service/src/map_init_err.rs | 38 +--- ntex-service/src/middleware.rs | 71 ++------ ntex-service/src/pipeline.rs | 82 +++------ ntex-service/src/then.rs | 168 +++--------------- 18 files changed, 278 insertions(+), 1039 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 5f079475..2739e069 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -8,7 +8,7 @@ jobs: fail-fast: false matrix: version: - - 1.67.0 # MSRV + - 1.75.0 # MSRV - stable - nightly diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index eb9b10b4..e33c4c3c 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0] - 2024-01-xx + +* Use "async fn" in trait for Service definition + ## [1.2.7] - 2023-09-19 * Use From for apply_fn util diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 84b832e3..dddda587 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "1.2.7" +version = "2.0.0" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index 9c91066f..291cc445 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -1,6 +1,6 @@ -use std::{future::Future, pin::Pin, task::Context, task::Poll}; +use std::{task::Context, task::Poll}; -use super::{Service, ServiceCall, ServiceCtx, ServiceFactory}; +use super::{Service, ServiceCtx, ServiceFactory}; #[derive(Clone, Debug)] /// Service for the `and_then` combinator, chaining a computation onto the end @@ -26,7 +26,6 @@ where { type Response = B::Response; type Error = A::Error; - type Future<'f> = AndThenServiceResponse<'f, A, B, Req> where Self: 'f, Req: 'f; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let not_ready = !self.svc1.poll_ready(cx)?.is_ready(); @@ -47,73 +46,13 @@ where } #[inline] - fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - AndThenServiceResponse { - slf: self, - state: State::A { - fut: ctx.call(&self.svc1, req), - ctx: Some(ctx), - }, - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct AndThenServiceResponse<'f, A, B, Req> - where - A: Service, - B: Service, - { - slf: &'f AndThen, - #[pin] - state: State<'f, A, B, Req>, - } -} - -pin_project_lite::pin_project! { - #[project = StateProject] - enum State<'f, A, B, Req> - where - A: Service, - A: 'f, - Req: 'f, - B: Service, - B: 'f, - { - A { #[pin] fut: ServiceCall<'f, A, Req>, ctx: Option>> }, - B { #[pin] fut: ServiceCall<'f, B, A::Response> }, - Empty, - } -} - -impl<'f, A, B, Req> Future for AndThenServiceResponse<'f, A, B, Req> -where - A: Service, - B: Service, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - StateProject::A { fut, ctx } => match fut.poll(cx)? { - Poll::Ready(res) => { - let fut = ctx.take().unwrap().call(&this.slf.svc2, res); - this.state.set(State::B { fut }); - self.poll(cx) - } - Poll::Pending => Poll::Pending, - }, - StateProject::B { fut } => fut.poll(cx).map(|r| { - this.state.set(State::Empty); - r - }), - StateProject::Empty => { - panic!("future must not be polled after it returned `Poll::Ready`") - } - } + async fn call( + &self, + req: Req, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + let res = ctx.call(&self.svc1, req).await?; + ctx.call(&self.svc2, res).await } } @@ -142,67 +81,13 @@ where type Service = AndThen; type InitError = A::InitError; - type Future<'f> = AndThenFactoryResponse<'f, A, B, Req, Cfg> where Self: 'f, Cfg: 'f; #[inline] - fn create(&self, cfg: Cfg) -> Self::Future<'_> { - AndThenFactoryResponse { - fut1: self.svc1.create(cfg.clone()), - fut2: self.svc2.create(cfg), - svc1: None, - svc2: None, - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct AndThenFactoryResponse<'f, A, B, Req, Cfg> - where - A: ServiceFactory, - A: 'f, - B: ServiceFactory, - B: 'f, - Cfg: 'f - { - #[pin] - fut1: A::Future<'f>, - #[pin] - fut2: B::Future<'f>, - - svc1: Option, - svc2: Option, - } -} - -impl<'f, A, B, Req, Cfg> Future for AndThenFactoryResponse<'f, A, B, Req, Cfg> -where - A: ServiceFactory, - B: ServiceFactory, -{ - type Output = Result, A::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if this.svc1.is_none() { - if let Poll::Ready(service) = this.fut1.poll(cx)? { - *this.svc1 = Some(service); - } - } - if this.svc2.is_none() { - if let Poll::Ready(service) = this.fut2.poll(cx)? { - *this.svc2 = Some(service); - } - } - if this.svc1.is_some() && this.svc2.is_some() { - Poll::Ready(Ok(AndThen::new( - this.svc1.take().unwrap(), - this.svc2.take().unwrap(), - ))) - } else { - Poll::Pending - } + async fn create(&self, cfg: Cfg) -> Result { + Ok(AndThen { + svc1: self.svc1.create(cfg.clone()).await?, + svc2: self.svc2.create(cfg).await?, + }) } } @@ -219,19 +104,18 @@ mod tests { impl Service<&'static str> for Srv1 { type Response = &'static str; type Error = (); - type Future<'f> = Ready; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { self.0.set(self.0.get() + 1); Poll::Ready(Ok(())) } - fn call<'a>( + async fn call<'a>( &'a self, req: &'static str, _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - Ready::Ok(req) + ) -> Result { + Ok(req) } } @@ -241,19 +125,18 @@ mod tests { impl Service<&'static str> for Srv2 { type Response = (&'static str, &'static str); type Error = (); - type Future<'f> = Ready; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { self.0.set(self.0.get() + 1); Poll::Ready(Ok(())) } - fn call<'a>( + async fn call<'a>( &'a self, req: &'static str, _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { - Ready::Ok((req, "srv2")) + ) -> Result { + Ok((req, "srv2")) } } diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index fa8bdf17..f4fc4fb6 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -1,5 +1,5 @@ #![allow(clippy::type_complexity)] -use std::{fmt, future::Future, marker, pin::Pin, task, task::Poll}; +use std::{fmt, future::Future, marker}; use super::ctx::ServiceCtx; use super::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory}; @@ -97,14 +97,17 @@ where { type Response = Out; type Error = Err; - type Future<'f> = R where Self: 'f, In: 'f, R: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); #[inline] - fn call<'a>(&'a self, req: In, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - (self.f)(req, self.service.clone()) + async fn call( + &self, + req: In, + _: ServiceCtx<'_, Self>, + ) -> Result { + (self.f)(req, self.service.clone()).await } } @@ -183,58 +186,14 @@ where type Service = Apply; type InitError = T::InitError; - type Future<'f> = ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err> where Self: 'f, Cfg: 'f; #[inline] - fn create(&self, cfg: Cfg) -> Self::Future<'_> { - ApplyFactoryResponse { - fut: self.service.create(cfg), - f: Some(self.f.clone()), - _t: marker::PhantomData, - } - } -} - -pin_project_lite::pin_project! { - pub struct ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err> - where - T: ServiceFactory, - T: 'f, - F: Fn(In, Pipeline) -> R, - T::Service: 'f, - R: Future>, - Cfg: 'f, - Err: From, - { - #[pin] - fut: T::Future<'f>, - f: Option, - _t: marker::PhantomData<(In, Out)>, - } -} - -impl<'f, T, Req, Cfg, F, R, In, Out, Err> Future - for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err> -where - T: ServiceFactory, - F: Fn(In, Pipeline) -> R, - R: Future>, - Err: From, -{ - type Output = Result, T::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let this = self.project(); - - if let Poll::Ready(svc) = this.fut.poll(cx)? { - Poll::Ready(Ok(Apply { - service: svc.into(), - f: this.f.take().unwrap(), - r: marker::PhantomData, - })) - } else { - Poll::Pending - } + async fn create(&self, cfg: Cfg) -> Result { + self.service.create(cfg).await.map(|svc| Apply { + service: svc.into(), + f: self.f.clone(), + r: marker::PhantomData, + }) } } @@ -252,10 +211,9 @@ mod tests { impl Service<()> for Srv { type Response = (); type Error = (); - type Future<'f> = Ready<(), ()>; - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - Ready::Ok(()) + async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + Ok(()) } } diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index 241095f5..c74b9bba 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -87,7 +87,11 @@ where idx: usize, waiters: &'a WaitersRef, ) -> BoxFuture<'a, Self::Response, Self::Error> { - Box::pin(ServiceCtx::<'a, S>::from_ref(idx, waiters).call_nowait(self, req)) + Box::pin(async move { + ServiceCtx::<'a, S>::from_ref(idx, waiters) + .call_nowait(self, req) + .await + }) } } @@ -134,7 +138,6 @@ where { type Response = Res; type Error = Err; - type Future<'f> = BoxFuture<'f, Res, Err> where Self: 'f, Req: 'f; #[inline] fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll> { @@ -147,9 +150,9 @@ where } #[inline] - fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call(&self, req: Req, ctx: ServiceCtx<'_, Self>) -> Result { let (idx, waiters) = ctx.inner(); - self.0.call(req, idx, waiters) + self.0.call(req, idx, waiters).await } } @@ -163,10 +166,9 @@ where type Service = BoxService; type InitError = InitErr; - type Future<'f> = BoxFuture<'f, Self::Service, InitErr> where Self: 'f, C: 'f; #[inline] - fn create(&self, cfg: C) -> Self::Future<'_> { - self.0.create(cfg) + async fn create(&self, cfg: C) -> Result { + self.0.create(cfg).await } } diff --git a/ntex-service/src/chain.rs b/ntex-service/src/chain.rs index ba373d77..7f278a87 100644 --- a/ntex-service/src/chain.rs +++ b/ntex-service/src/chain.rs @@ -3,12 +3,11 @@ use std::{fmt, future::Future, marker::PhantomData}; use crate::and_then::{AndThen, AndThenFactory}; use crate::apply::{Apply, ApplyFactory}; -use crate::ctx::{ServiceCall, ServiceCtx}; +use crate::ctx::ServiceCtx; use crate::map::{Map, MapFactory}; use crate::map_err::{MapErr, MapErrFactory}; use crate::map_init_err::MapInitErr; use crate::middleware::{ApplyMiddleware, Middleware}; -use crate::pipeline::CreatePipeline; use crate::then::{Then, ThenFactory}; use crate::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory}; @@ -171,14 +170,17 @@ where impl, Req> Service for ServiceChain { type Response = Svc::Response; type Error = Svc::Error; - type Future<'f> = ServiceCall<'f, Svc, Req> where Self: 'f, Req: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); #[inline] - fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - ctx.call(&self.service, req) + async fn call( + &self, + req: Req, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + ctx.call(&self.service, req).await } } @@ -308,11 +310,11 @@ impl, Req, C> ServiceChainFactory { } /// Create and return a new service value asynchronously and wrap into a container - pub fn pipeline(&self, cfg: C) -> CreatePipeline<'_, T, Req, C> + pub async fn pipeline(&self, cfg: C) -> Result, T::InitError> where Self: Sized, { - CreatePipeline::new(self.factory.create(cfg)) + Ok(Pipeline::new(self.factory.create(cfg).await?)) } } @@ -344,10 +346,9 @@ impl, R, C> ServiceFactory for ServiceChainFactory type Error = T::Error; type Service = T::Service; type InitError = T::InitError; - type Future<'f> = T::Future<'f> where Self: 'f; #[inline] - fn create(&self, cfg: C) -> Self::Future<'_> { - self.factory.create(cfg) + async fn create(&self, cfg: C) -> Result { + self.factory.create(cfg).await } } diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index fee59698..97e3fd14 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -1,6 +1,6 @@ -use std::{cell::UnsafeCell, fmt, future::Future, marker, pin::Pin, rc::Rc, task}; +use std::{cell::UnsafeCell, fmt, future::poll_fn, marker, rc::Rc, task, task::Poll}; -use crate::{Pipeline, Service}; +use crate::Service; pub struct ServiceCtx<'a, S: ?Sized> { idx: usize, @@ -112,27 +112,51 @@ impl<'a, S> ServiceCtx<'a, S> { (self.idx, self.waiters) } + /// Returns when the service is able to process requests. + pub async fn ready(&self, svc: &'a T) -> Result<(), T::Error> + where + T: Service, + { + // check readiness and notify waiters + poll_fn(move |cx| match svc.poll_ready(cx)? { + Poll::Ready(()) => { + self.waiters.notify(); + Poll::Ready(Ok(())) + } + Poll::Pending => { + self.waiters.register(self.idx, cx); + Poll::Pending + } + }) + .await + } + #[inline] /// Wait for service readiness and then call service - pub fn call(&self, svc: &'a T, req: R) -> ServiceCall<'a, T, R> + pub async fn call(&self, svc: &'a T, req: R) -> Result where T: Service, R: 'a, { - ServiceCall { - state: ServiceCallState::Ready { - svc, - req: Some(req), + self.ready(svc).await?; + svc.call( + req, + ServiceCtx { idx: self.idx, waiters: self.waiters, + _t: marker::PhantomData, }, - } + ) + .await } - #[doc(hidden)] #[inline] /// Call service, do not check service readiness - pub fn call_nowait(&self, svc: &'a T, req: R) -> T::Future<'a> + pub async fn call_nowait( + &self, + svc: &'a T, + req: R, + ) -> Result where T: Service, R: 'a, @@ -145,6 +169,7 @@ impl<'a, S> ServiceCtx<'a, S> { _t: marker::PhantomData, }, ) + .await } } @@ -166,201 +191,12 @@ impl<'a, S> fmt::Debug for ServiceCtx<'a, S> { } } -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct ServiceCall<'a, S, Req> - where - S: Service, - Req: 'a, - { - #[pin] - state: ServiceCallState<'a, S, Req>, - } -} - -pin_project_lite::pin_project! { - #[project = ServiceCallStateProject] - enum ServiceCallState<'a, S, Req> - where - S: Service, - Req: 'a, - { - Ready { req: Option, - svc: &'a S, - idx: usize, - waiters: &'a WaitersRef, - }, - ReadyPl { req: Option, - svc: &'a Pipeline, - pl: Pipeline, - }, - Call { #[pin] fut: S::Future<'a> }, - Empty, - } -} - -impl<'a, S, Req> ServiceCall<'a, S, Req> -where - S: Service, - Req: 'a, -{ - pub(crate) fn call_pipeline(req: Req, svc: &'a Pipeline) -> Self { - ServiceCall { - state: ServiceCallState::ReadyPl { - req: Some(req), - pl: svc.clone(), - svc, - }, - } - } - - pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> { - match self.state { - ServiceCallState::Ready { .. } | ServiceCallState::ReadyPl { .. } => {} - ServiceCallState::Call { .. } | ServiceCallState::Empty => { - panic!( - "`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`" - ) - } - } - ServiceCallToCall { state: self.state } - } -} - -impl<'a, S, Req> Future for ServiceCall<'a, S, Req> -where - S: Service, -{ - type Output = Result; - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> task::Poll { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - ServiceCallStateProject::Ready { - req, - svc, - idx, - waiters, - } => match svc.poll_ready(cx)? { - task::Poll::Ready(()) => { - waiters.notify(); - - let fut = svc.call( - req.take().unwrap(), - ServiceCtx { - waiters, - idx: *idx, - _t: marker::PhantomData, - }, - ); - this.state.set(ServiceCallState::Call { fut }); - self.poll(cx) - } - task::Poll::Pending => { - waiters.register(*idx, cx); - task::Poll::Pending - } - }, - ServiceCallStateProject::ReadyPl { req, svc, pl } => { - task::ready!(pl.poll_ready(cx))?; - - let ctx = ServiceCtx::new(&svc.waiters); - let svc_call = svc.get_ref().call(req.take().unwrap(), ctx); - - // SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc` - // Pipeline::svc is heap allocated(Rc), we keep it alive until - // `svc_call` get resolved to result - let fut = unsafe { std::mem::transmute(svc_call) }; - - this.state.set(ServiceCallState::Call { fut }); - self.poll(cx) - } - ServiceCallStateProject::Call { fut, .. } => fut.poll(cx).map(|r| { - this.state.set(ServiceCallState::Empty); - r - }), - ServiceCallStateProject::Empty => { - panic!("future must not be polled after it returned `Poll::Ready`") - } - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct ServiceCallToCall<'a, S, Req> - where - S: Service, - Req: 'a, - { - #[pin] - state: ServiceCallState<'a, S, Req>, - } -} - -impl<'a, S, Req> Future for ServiceCallToCall<'a, S, Req> -where - S: Service, -{ - type Output = Result, S::Error>; - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> task::Poll { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - ServiceCallStateProject::Ready { - req, - svc, - idx, - waiters, - } => match svc.poll_ready(cx)? { - task::Poll::Ready(()) => { - waiters.notify(); - - let fut = svc.call( - req.take().unwrap(), - ServiceCtx { - waiters, - idx: *idx, - _t: marker::PhantomData, - }, - ); - this.state.set(ServiceCallState::Empty); - task::Poll::Ready(Ok(fut)) - } - task::Poll::Pending => { - waiters.register(*idx, cx); - task::Poll::Pending - } - }, - ServiceCallStateProject::ReadyPl { req, svc, pl } => { - task::ready!(pl.poll_ready(cx))?; - - let ctx = ServiceCtx::new(&svc.waiters); - task::Poll::Ready(Ok(svc.get_ref().call(req.take().unwrap(), ctx))) - } - ServiceCallStateProject::Call { .. } => { - unreachable!("`ServiceCallToCall` can only be constructed in `Ready` state") - } - ServiceCallStateProject::Empty => { - panic!("future must not be polled after it returned `Poll::Ready`") - } - } - } -} - #[cfg(test)] mod tests { - use ntex_util::future::{lazy, poll_fn, Ready}; + use ntex_util::future::lazy; use ntex_util::{channel::condition, time}; - use std::{cell::Cell, cell::RefCell, rc::Rc, task::Context, task::Poll}; + use std::task::{Context, Poll}; + use std::{cell::Cell, cell::RefCell, future::poll_fn, rc::Rc}; use super::*; use crate::Pipeline; @@ -370,20 +206,19 @@ mod tests { impl Service<&'static str> for Srv { type Response = &'static str; type Error = (); - type Future<'f> = Ready; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.0.set(self.0.get() + 1); self.1.poll_ready(cx).map(|_| Ok(())) } - fn call<'a>( + async fn call<'a>( &'a self, req: &'static str, ctx: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ) -> Result { let _ = ctx.clone(); - Ready::Ok(req) + Ok(req) } } @@ -451,32 +286,32 @@ mod tests { assert_eq!(&*data.borrow(), &["srv2", "srv1"]); } - #[ntex::test] - async fn test_advance_to_call() { - let cnt = Rc::new(Cell::new(0)); - let con = condition::Condition::new(); - let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); + // #[ntex::test] + // async fn test_advance_to_call() { + // let cnt = Rc::new(Cell::new(0)); + // let con = condition::Condition::new(); + // let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); - let mut fut = srv.call("test").advance_to_call(); - let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; - con.notify(); + // let mut fut = srv.call("test").advance_to_call(); + // let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; + // con.notify(); - let res = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; - assert!(res.is_ready()); - } + // let res = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; + // assert!(res.is_ready()); + // } - #[ntex::test] - #[should_panic] - async fn test_advance_to_call_panic() { - let cnt = Rc::new(Cell::new(0)); - let con = condition::Condition::new(); - let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); + // #[ntex::test] + // #[should_panic] + // async fn test_advance_to_call_panic() { + // let cnt = Rc::new(Cell::new(0)); + // let con = condition::Condition::new(); + // let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); - let mut fut = srv.call("test"); - let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; - con.notify(); + // let mut fut = srv.call("test"); + // let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; + // con.notify(); - let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; - let _f = fut.advance_to_call(); - } + // let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; + // let _f = fut.advance_to_call(); + // } } diff --git a/ntex-service/src/fn_service.rs b/ntex-service/src/fn_service.rs index d6b83baf..ed0d44e9 100644 --- a/ntex-service/src/fn_service.rs +++ b/ntex-service/src/fn_service.rs @@ -1,4 +1,4 @@ -use std::{fmt, future::ready, future::Future, future::Ready, marker::PhantomData}; +use std::{fmt, future::Future, marker::PhantomData}; use crate::{IntoService, IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; @@ -133,11 +133,10 @@ where { type Response = Res; type Error = Err; - type Future<'f> = Fut where Self: 'f, Req: 'f; #[inline] - fn call<'a>(&'a self, req: Req, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - (self.f)(req) + async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result { + (self.f)(req).await } } @@ -207,11 +206,10 @@ where { type Response = Res; type Error = Err; - type Future<'f> = Fut where Self: 'f; #[inline] - fn call<'a>(&'a self, req: Req, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - (self.f)(req) + async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result { + (self.f)(req).await } } @@ -226,14 +224,13 @@ where type Service = FnService; type InitError = (); - type Future<'f> = Ready> where Self: 'f; #[inline] - fn create(&self, _: Cfg) -> Self::Future<'_> { - ready(Ok(FnService { + async fn create(&self, _: Cfg) -> Result { + Ok(FnService { f: self.f.clone(), _t: PhantomData, - })) + }) } } @@ -300,11 +297,10 @@ where type Service = Srv; type InitError = Err; - type Future<'f> = Fut where Self: 'f, Fut: 'f; #[inline] - fn create(&self, cfg: Cfg) -> Self::Future<'_> { - (self.f)(cfg) + async fn create(&self, cfg: Cfg) -> Result { + (self.f)(cfg).await } } @@ -341,11 +337,10 @@ where type Error = S::Error; type Service = S; type InitError = E; - type Future<'f> = R where Self: 'f, R: 'f; #[inline] - fn create(&self, _: C) -> Self::Future<'_> { - (self.f)() + async fn create(&self, _: C) -> Result { + (self.f)().await } } diff --git a/ntex-service/src/fn_shutdown.rs b/ntex-service/src/fn_shutdown.rs index ffa3291b..a3945a32 100644 --- a/ntex-service/src/fn_shutdown.rs +++ b/ntex-service/src/fn_shutdown.rs @@ -1,5 +1,4 @@ -use std::task::{Context, Poll}; -use std::{cell::Cell, fmt, future::ready, future::Ready, marker::PhantomData}; +use std::{cell::Cell, fmt, marker::PhantomData, task::Context, task::Poll}; use crate::{Service, ServiceCtx}; @@ -55,7 +54,6 @@ where { type Response = Req; type Error = Err; - type Future<'f> = Ready> where Self: 'f, Req: 'f; #[inline] fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> { @@ -66,8 +64,8 @@ where } #[inline] - fn call<'a>(&'a self, req: Req, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - ready(Ok(req)) + async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result { + Ok(req) } } diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index f79ee5d4..9354a65d 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -6,9 +6,7 @@ missing_debug_implementations )] -use std::future::Future; -use std::rc::Rc; -use std::task::{self, Context, Poll}; +use std::{future::Future, rc::Rc, task, task::Context, task::Poll}; mod and_then; mod apply; @@ -28,7 +26,7 @@ mod then; pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::chain::{chain, chain_factory}; -pub use self::ctx::{ServiceCall, ServiceCallToCall, ServiceCtx}; +pub use self::ctx::ServiceCtx; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::fn_shutdown::fn_shutdown; pub use self::map_config::{map_config, unit_config}; @@ -62,8 +60,6 @@ pub use self::pipeline::{Pipeline, PipelineCall}; /// /// ```rust /// # use std::convert::Infallible; -/// # use std::future::Future; -/// # use std::pin::Pin; /// # /// # use ntex_service::{Service, ServiceCtx}; /// @@ -72,10 +68,9 @@ pub use self::pipeline::{Pipeline, PipelineCall}; /// impl Service for MyService { /// type Response = u64; /// type Error = Infallible; -/// type Future<'f> = Pin>>>; /// -/// fn call<'a>(&'a self, req: u8, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { -/// Box::pin(std::future::ready(Ok(req as u64))) +/// async fn call<'a>(&'a self, req: u8, _: ServiceCtx<'a, Self>) -> Result { +/// Ok(req as u64) /// } /// } /// ``` @@ -97,19 +92,17 @@ pub trait Service { /// Errors produced by the service when polling readiness or executing call. type Error; - /// The future response value. - type Future<'f>: Future> - where - Req: 'f, - Self: 'f; - /// Process the request and return the response asynchronously. /// /// This function is expected to be callable off-task. As such, implementations of `call` /// should take care to not call `poll_ready`. Caller of the service verifies readiness, /// Only way to make a `call` is to use `ctx` argument, it enforces readiness before calling /// service. - fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a>; + fn call( + &self, + req: Req, + ctx: ServiceCtx<'_, Self>, + ) -> impl Future>; #[inline] /// Returns `Ready` when the service is able to process requests. @@ -170,15 +163,6 @@ pub trait Service { { chain(dev::MapErr::new(self, f)) } - - #[inline] - /// Convert `Self` to a `ServiceChain` - fn chain(self) -> dev::ServiceChain - where - Self: Sized, - { - chain(self) - } } /// Factory for creating `Service`s. @@ -205,21 +189,19 @@ pub trait ServiceFactory { /// Errors potentially raised while building a service. type InitError; - /// The future of the `ServiceFactory` instance. - type Future<'f>: Future> - where - Cfg: 'f, - Self: 'f; - /// Create and return a new service value asynchronously. - fn create(&self, cfg: Cfg) -> Self::Future<'_>; + fn create( + &self, + cfg: Cfg, + ) -> impl Future>; + #[allow(async_fn_in_trait)] /// Create and return a new service value asynchronously and wrap into a container - fn pipeline(&self, cfg: Cfg) -> dev::CreatePipeline<'_, Self, Req, Cfg> + async fn pipeline(&self, cfg: Cfg) -> Result, Self::InitError> where Self: Sized, { - dev::CreatePipeline::new(self.create(cfg)) + Ok(Pipeline::new(self.create(cfg).await?)) } #[inline] @@ -269,7 +251,6 @@ where { type Response = S::Response; type Error = S::Error; - type Future<'f> = S::Future<'f> where 'a: 'f, Req: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -282,8 +263,12 @@ where } #[inline] - fn call<'s>(&'s self, request: Req, ctx: ServiceCtx<'s, Self>) -> S::Future<'s> { - ctx.call_nowait(&**self, request) + async fn call( + &self, + request: Req, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + ctx.call_nowait(&**self, request).await } } @@ -293,7 +278,6 @@ where { type Response = S::Response; type Error = S::Error; - type Future<'f> = S::Future<'f> where S: 'f, Req: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -306,8 +290,12 @@ where } #[inline] - fn call<'a>(&'a self, request: Req, ctx: ServiceCtx<'a, Self>) -> S::Future<'a> { - ctx.call_nowait(&**self, request) + async fn call( + &self, + request: Req, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + ctx.call_nowait(&**self, request).await } } @@ -319,10 +307,9 @@ where type Error = S::Error; type Service = S::Service; type InitError = S::InitError; - type Future<'f> = S::Future<'f> where S: 'f, Cfg: 'f; - fn create(&self, cfg: Cfg) -> S::Future<'_> { - self.as_ref().create(cfg) + async fn create(&self, cfg: Cfg) -> Result { + self.as_ref().create(cfg).await } } @@ -333,15 +320,6 @@ where { /// Convert to a `Service` fn into_service(self) -> Svc; - - #[inline] - /// Convert `Self` to a `ServiceChain` - fn into_chain(self) -> dev::ServiceChain - where - Self: Sized, - { - chain(self) - } } /// Trait for types that can be converted to a `ServiceFactory` @@ -351,15 +329,6 @@ where { /// Convert `Self` to a `ServiceFactory` fn into_factory(self) -> T; - - #[inline] - /// Convert `Self` to a `ServiceChainFactory` - fn chain(self) -> dev::ServiceChainFactory - where - Self: Sized, - { - chain_factory(self) - } } impl IntoService for Svc @@ -404,9 +373,5 @@ pub mod dev { pub use crate::map_err::{MapErr, MapErrFactory}; pub use crate::map_init_err::MapInitErr; pub use crate::middleware::ApplyMiddleware; - pub use crate::pipeline::CreatePipeline; pub use crate::then::{Then, ThenFactory}; - - #[doc(hidden)] - pub type ApplyService = crate::Pipeline; } diff --git a/ntex-service/src/map.rs b/ntex-service/src/map.rs index e5133a4a..40c5dee1 100644 --- a/ntex-service/src/map.rs +++ b/ntex-service/src/map.rs @@ -1,6 +1,6 @@ -use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; +use std::{fmt, marker::PhantomData}; -use super::{Service, ServiceCall, ServiceCtx, ServiceFactory}; +use super::{Service, ServiceCtx, ServiceFactory}; /// Service for the `map` combinator, changing the type of a service's response. /// @@ -60,51 +60,17 @@ where { type Response = Res; type Error = A::Error; - type Future<'f> = MapFuture<'f, A, F, Req, Res> where Self: 'f, Req: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); #[inline] - fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - MapFuture { - fut: ctx.call(&self.service, req), - slf: self, - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct MapFuture<'f, A, F, Req, Res> - where - A: Service, - A: 'f, - Req: 'f, - F: Fn(A::Response) -> Res, - { - slf: &'f Map, - #[pin] - fut: ServiceCall<'f, A, Req>, - } -} - -impl<'f, A, F, Req, Res> Future for MapFuture<'f, A, F, Req, Res> -where - A: Service + 'f, - Req: 'f, - F: Fn(A::Response) -> Res, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - match this.fut.poll(cx) { - Poll::Ready(Ok(resp)) => Poll::Ready(Ok((self.project().slf.f)(resp))), - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, - } + async fn call( + &self, + req: Req, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + ctx.call(&self.service, req).await.map(|r| (self.f)(r)) } } @@ -167,55 +133,22 @@ where type Service = Map; type InitError = A::InitError; - type Future<'f> = MapFactoryFuture<'f, A, F, Req, Res, Cfg> where Self: 'f, Cfg: 'f; #[inline] - fn create(&self, cfg: Cfg) -> Self::Future<'_> { - MapFactoryFuture { - fut: self.a.create(cfg), - f: Some(self.f.clone()), - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct MapFactoryFuture<'f, A, F, Req, Res, Cfg> - where - A: ServiceFactory, - A: 'f, - F: Fn(A::Response) -> Res, - Cfg: 'f, - { - #[pin] - fut: A::Future<'f>, - f: Option, - } -} - -impl<'f, A, F, Req, Res, Cfg> Future for MapFactoryFuture<'f, A, F, Req, Res, Cfg> -where - A: ServiceFactory, - F: Fn(A::Response) -> Res, -{ - type Output = Result, A::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if let Poll::Ready(svc) = this.fut.poll(cx)? { - Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap()))) - } else { - Poll::Pending - } + async fn create(&self, cfg: Cfg) -> Result { + Ok(Map { + service: self.a.create(cfg).await?, + f: self.f.clone(), + _t: PhantomData, + }) } } #[cfg(test)] mod tests { - use ntex_util::future::{lazy, Ready}; + use ntex_util::future::lazy; + use std::task::{Context, Poll}; - use super::*; use crate::{fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; #[derive(Debug, Clone)] @@ -224,14 +157,13 @@ mod tests { impl Service<()> for Srv { type Response = (); type Error = (); - type Future<'f> = Ready<(), ()>; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - Ready::Ok(()) + async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + Ok(()) } } diff --git a/ntex-service/src/map_config.rs b/ntex-service/src/map_config.rs index 42c98683..822f0014 100644 --- a/ntex-service/src/map_config.rs +++ b/ntex-service/src/map_config.rs @@ -1,4 +1,4 @@ -use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; +use std::{fmt, marker::PhantomData}; use super::{IntoServiceFactory, ServiceFactory}; @@ -78,11 +78,9 @@ where type Service = A::Service; type InitError = A::InitError; - type Future<'f> = A::Future<'f> where Self: 'f; - fn create(&self, cfg: C) -> Self::Future<'_> { - let cfg = (self.f)(cfg); - self.a.create(cfg) + async fn create(&self, cfg: C) -> Result { + self.a.create((self.f)(cfg)).await } } @@ -108,37 +106,9 @@ where type Service = A::Service; type InitError = A::InitError; - type Future<'f> = UnitConfigFuture<'f, A, R, C> where Self: 'f, C: 'f; - fn create(&self, _: C) -> Self::Future<'_> { - UnitConfigFuture { - fut: self.factory.create(()), - _t: PhantomData, - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct UnitConfigFuture<'f, A, R, C> - where A: ServiceFactory, - A: 'f, - C: 'f, - { - #[pin] - fut: A::Future<'f>, - _t: PhantomData, - } -} - -impl<'f, A, R, C> Future for UnitConfigFuture<'f, A, R, C> -where - A: ServiceFactory, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) + async fn create(&self, _: C) -> Result { + self.factory.create(()).await } } @@ -175,28 +145,4 @@ mod tests { .create(&10) .await; } - - // #[ntex::test] - // async fn test_map_config_service() { - // let item = Rc::new(Cell::new(10usize)); - // let item2 = item.clone(); - - // let srv = map_config_service( - // fn_factory_with_config(move |next: usize| { - // let item = item2.clone(); - // async move { - // item.set(next); - // Ok::<_, ()>(fn_service(|id: usize| Ready::<_, ()>::Ok(id * 2))) - // } - // }), - // fn_service(move |item: usize| Ready::<_, ()>::Ok(item + 1)), - // ) - // .clone() - // .create(10) - // .await - // .unwrap(); - - // assert_eq!(srv.call(10usize).await.unwrap(), 20); - // assert_eq!(item.get(), 11); - // } } diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index 8d532734..f82f4441 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -1,6 +1,6 @@ -use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; +use std::{fmt, marker::PhantomData, task::Context, task::Poll}; -use super::{Service, ServiceCall, ServiceCtx, ServiceFactory}; +use super::{Service, ServiceCtx, ServiceFactory}; /// Service for the `map_err` combinator, changing the type of a service's /// error. @@ -61,7 +61,6 @@ where { type Response = A::Response; type Error = E; - type Future<'f> = MapErrFuture<'f, A, R, F, E> where A: 'f, R: 'f, F: 'f, E: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -69,44 +68,17 @@ where } #[inline] - fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - MapErrFuture { - slf: self, - fut: ctx.call(&self.service, req), - } + async fn call( + &self, + req: R, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + ctx.call(&self.service, req).await.map_err(|e| (self.f)(e)) } crate::forward_poll_shutdown!(service); } -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct MapErrFuture<'f, A, R, F, E> - where - A: Service, - A: 'f, - R: 'f, - F: Fn(A::Error) -> E, - { - slf: &'f MapErr, - #[pin] - fut: ServiceCall<'f, A, R>, - } -} - -impl<'f, A, R, F, E> Future for MapErrFuture<'f, A, R, F, E> -where - A: Service + 'f, - F: Fn(A::Error) -> E, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - this.fut.poll(cx).map_err(|e| (self.project().slf.f)(e)) - } -} - /// Factory for the `map_err` combinator, changing the type of a new /// service's error. /// @@ -173,46 +145,14 @@ where type Service = MapErr; type InitError = A::InitError; - type Future<'f> = MapErrFactoryFuture<'f, A, R, C, F, E> where Self: 'f, C: 'f; #[inline] - fn create(&self, cfg: C) -> Self::Future<'_> { - MapErrFactoryFuture { + async fn create(&self, cfg: C) -> Result { + self.a.create(cfg).await.map(|service| MapErr { + service, f: self.f.clone(), - fut: self.a.create(cfg), - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct MapErrFactoryFuture<'f, A, R, C, F, E> - where - A: ServiceFactory, - A: 'f, - F: Fn(A::Error) -> E, - C: 'f, - { - f: F, - #[pin] - fut: A::Future<'f>, - } -} - -impl<'f, A, R, C, F, E> Future for MapErrFactoryFuture<'f, A, R, C, F, E> -where - A: ServiceFactory, - F: Fn(A::Error) -> E + Clone, -{ - type Output = Result, A::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - if let Poll::Ready(svc) = this.fut.poll(cx)? { - Poll::Ready(Ok(MapErr::new(svc, this.f.clone()))) - } else { - Poll::Pending - } + _t: PhantomData, + }) } } @@ -229,7 +169,6 @@ mod tests { impl Service<()> for Srv { type Response = (); type Error = (); - type Future<'f> = Ready<(), ()>; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { if self.0 { @@ -239,8 +178,8 @@ mod tests { } } - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - Ready::Err(()) + async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + Err(()) } } diff --git a/ntex-service/src/map_init_err.rs b/ntex-service/src/map_init_err.rs index e96411df..f9544e9d 100644 --- a/ntex-service/src/map_init_err.rs +++ b/ntex-service/src/map_init_err.rs @@ -1,4 +1,4 @@ -use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; +use std::{fmt, marker::PhantomData}; use super::ServiceFactory; @@ -60,42 +60,10 @@ where type Service = A::Service; type InitError = E; - type Future<'f> = MapInitErrFuture<'f, A, R, C, F, E> where Self: 'f, C: 'f; #[inline] - fn create(&self, cfg: C) -> Self::Future<'_> { - MapInitErrFuture { - f: self.f.clone(), - fut: self.a.create(cfg), - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct MapInitErrFuture<'f, A, R, C, F, E> - where - A: ServiceFactory, - A: 'f, - F: Fn(A::InitError) -> E, - C: 'f, - { - f: F, - #[pin] - fut: A::Future<'f>, - } -} - -impl<'f, A, R, C, F, E> Future for MapInitErrFuture<'f, A, R, C, F, E> -where - A: ServiceFactory, - F: Fn(A::InitError) -> E, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - this.fut.poll(cx).map_err(|e| (self.project().f)(e)) + async fn create(&self, cfg: C) -> Result { + self.a.create(cfg).await.map_err(|e| (self.f)(e)) } } diff --git a/ntex-service/src/middleware.rs b/ntex-service/src/middleware.rs index d673b3da..b87d6d5e 100644 --- a/ntex-service/src/middleware.rs +++ b/ntex-service/src/middleware.rs @@ -1,4 +1,4 @@ -use std::{fmt, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll}; +use std::{fmt, marker::PhantomData, rc::Rc}; use crate::{IntoServiceFactory, Service, ServiceFactory}; @@ -98,18 +98,18 @@ where } /// `Apply` middleware to a service factory. -pub struct ApplyMiddleware(Rc<(T, S)>, marker::PhantomData); +pub struct ApplyMiddleware(Rc<(T, S)>, PhantomData); impl ApplyMiddleware { /// Create new `ApplyMiddleware` service factory instance pub(crate) fn new(mw: T, svc: S) -> Self { - Self(Rc::new((mw, svc)), marker::PhantomData) + Self(Rc::new((mw, svc)), PhantomData) } } impl Clone for ApplyMiddleware { fn clone(&self) -> Self { - Self(self.0.clone(), marker::PhantomData) + Self(self.0.clone(), PhantomData) } } @@ -137,46 +137,10 @@ where type Service = T::Service; type InitError = S::InitError; - type Future<'f> = ApplyMiddlewareFuture<'f, T, S, R, C> where Self: 'f, C: 'f; #[inline] - fn create(&self, cfg: C) -> Self::Future<'_> { - ApplyMiddlewareFuture { - slf: self.0.clone(), - fut: self.0 .1.create(cfg), - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct ApplyMiddlewareFuture<'f, T, S, R, C> - where - S: ServiceFactory, - S: 'f, - T: Middleware, - C: 'f, - { - slf: Rc<(T, S)>, - #[pin] - fut: S::Future<'f>, - } -} - -impl<'f, T, S, R, C> Future for ApplyMiddlewareFuture<'f, T, S, R, C> -where - S: ServiceFactory, - T: Middleware, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - match this.fut.poll(cx)? { - Poll::Ready(srv) => Poll::Ready(Ok(this.slf.0.create(srv))), - Poll::Pending => Poll::Pending, - } + async fn create(&self, cfg: C) -> Result { + Ok(self.0 .0.create(self.0 .1.create(cfg).await?)) } } @@ -224,43 +188,46 @@ where #[allow(clippy::redundant_clone)] mod tests { use ntex_util::future::{lazy, Ready}; - use std::marker; + use std::task::{Context, Poll}; use super::*; - use crate::{fn_service, Pipeline, Service, ServiceCall, ServiceCtx, ServiceFactory}; + use crate::{fn_service, Pipeline, Service, ServiceCtx, ServiceFactory}; #[derive(Debug, Clone)] - struct Tr(marker::PhantomData); + struct Tr(PhantomData); impl Middleware for Tr { type Service = Srv; fn create(&self, service: S) -> Self::Service { - Srv(service, marker::PhantomData) + Srv(service, PhantomData) } } #[derive(Debug, Clone)] - struct Srv(S, marker::PhantomData); + struct Srv(S, PhantomData); impl, R> Service for Srv { type Response = S::Response; type Error = S::Error; - type Future<'f> = ServiceCall<'f, S, R> where Self: 'f, R: 'f; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } - fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - ctx.call(&self.0, req) + async fn call<'a>( + &'a self, + req: R, + ctx: ServiceCtx<'a, Self>, + ) -> Result { + ctx.call(&self.0, req).await } } #[ntex::test] async fn middleware() { let factory = apply( - Rc::new(Tr(marker::PhantomData).clone()), + Rc::new(Tr(PhantomData).clone()), fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2)), ) .clone(); @@ -279,7 +246,7 @@ mod tests { let factory = crate::chain_factory(fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2))) - .apply(Rc::new(Tr(marker::PhantomData).clone())) + .apply(Rc::new(Tr(PhantomData).clone())) .clone(); let srv = Pipeline::new(factory.create(&()).await.unwrap().clone()); diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 543acd2e..87210855 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -1,6 +1,7 @@ -use std::{cell::Cell, future, pin::Pin, rc::Rc, task, task::Context, task::Poll}; +use std::future::{poll_fn, Future}; +use std::{cell::Cell, pin::Pin, rc::Rc, task, task::Context, task::Poll}; -use crate::{ctx::ServiceCall, ctx::Waiters, Service, ServiceCtx, ServiceFactory}; +use crate::{ctx::Waiters, Service, ServiceCtx}; #[derive(Debug)] /// Container for a service. @@ -29,6 +30,15 @@ impl Pipeline { self.svc.as_ref() } + #[inline] + /// Returns when the service is able to process requests. + pub async fn ready(&self) -> Result<(), S::Error> + where + S: Service, + { + poll_fn(move |cx| self.poll_ready(cx)).await + } + #[inline] /// Returns `Ready` when the service is able to process requests. pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> @@ -55,26 +65,21 @@ impl Pipeline { self.svc.poll_shutdown(cx) } - #[deprecated(since = "1.2.3", note = "Use Pipeline::call() instead")] - #[doc(hidden)] #[inline] /// Wait for service readiness and then create future object /// that resolves to service result. - pub fn service_call(&self, req: R) -> ServiceCall<'_, S, R> + pub async fn call(&self, req: R) -> Result where S: Service, { - ServiceCall::call_pipeline(req, self) - } + // check service readiness + self.ready().await?; - #[inline] - /// Wait for service readiness and then create future object - /// that resolves to service result. - pub fn call(&self, req: R) -> ServiceCall<'_, S, R> - where - S: Service, - { - ServiceCall::call_pipeline(req, self) + // call service + self.svc + .as_ref() + .call(req, ServiceCtx::new(&self.waiters)) + .await } #[inline] @@ -130,6 +135,8 @@ impl Clone for Pipeline { } } +type BoxFuture<'a, T> = Pin + 'a>>; + pin_project_lite::pin_project! { #[must_use = "futures do nothing unless polled"] pub struct PipelineCall @@ -153,7 +160,7 @@ pin_project_lite::pin_project! { Req: 'static, { Ready { req: Option }, - Call { #[pin] fut: S::Future<'static> }, + Call { #[pin] fut: BoxFuture<'static, Result> }, Empty, } } @@ -163,9 +170,10 @@ where S: Service + 'static, R: 'static, { - fn new_call(pl: &Pipeline, req: R) -> Self { + fn new_call<'a>(pl: &'a Pipeline, req: R) -> Self { let ctx = ServiceCtx::new(&pl.waiters); - let svc_call = pl.get_ref().call(req, ctx); + let svc_call: BoxFuture<'a, Result> = + Box::pin(pl.get_ref().call(req, ctx)); // SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc` // Pipeline::svc is heap allocated(Rc), we keep it alive until @@ -176,7 +184,7 @@ where } } -impl future::Future for PipelineCall +impl Future for PipelineCall where S: Service, { @@ -204,39 +212,3 @@ where } } } - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct CreatePipeline<'f, F, R, C> - where F: ServiceFactory, - F: ?Sized, - F: 'f, - C: 'f, - { - #[pin] - fut: F::Future<'f>, - } -} - -impl<'f, F, R, C> CreatePipeline<'f, F, R, C> -where - F: ServiceFactory + 'f, -{ - pub(crate) fn new(fut: F::Future<'f>) -> Self { - Self { fut } - } -} - -impl<'f, F, R, C> future::Future for CreatePipeline<'f, F, R, C> -where - F: ServiceFactory + 'f, -{ - type Output = Result, F::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(Pipeline::new(std::task::ready!(self - .project() - .fut - .poll(cx))?))) - } -} diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 16ad2d59..107b2b77 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -1,6 +1,6 @@ -use std::{future::Future, pin::Pin, task::Context, task::Poll}; +use std::{task::Context, task::Poll}; -use super::{Service, ServiceCall, ServiceCtx, ServiceFactory}; +use super::{Service, ServiceCtx, ServiceFactory}; #[derive(Debug, Clone)] /// Service for the `then` combinator, chaining a computation onto the end of @@ -26,7 +26,6 @@ where { type Response = B::Response; type Error = B::Error; - type Future<'f> = ThenServiceResponse<'f, A, B, R> where Self: 'f, R: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -48,74 +47,12 @@ where } #[inline] - fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - ThenServiceResponse { - slf: self, - state: State::A { - fut: ctx.call(&self.svc1, req), - ctx, - }, - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct ThenServiceResponse<'f, A, B, R> - where - A: Service, - B: Service>, - { - slf: &'f Then, - #[pin] - state: State<'f, A, B, R>, - } -} - -pin_project_lite::pin_project! { - #[project = StateProject] - enum State<'f, A, B, R> - where - A: Service, - A: 'f, - A::Response: 'f, - B: Service>, - B: 'f, - R: 'f, - { - A { #[pin] fut: ServiceCall<'f, A, R>, ctx: ServiceCtx<'f, Then> }, - B { #[pin] fut: ServiceCall<'f, B, Result> }, - Empty, - } -} - -impl<'a, A, B, R> Future for ThenServiceResponse<'a, A, B, R> -where - A: Service, - B: Service>, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - StateProject::A { fut, ctx } => match fut.poll(cx) { - Poll::Ready(res) => { - let fut = ctx.call(&this.slf.svc2, res); - this.state.set(State::B { fut }); - self.poll(cx) - } - Poll::Pending => Poll::Pending, - }, - StateProject::B { fut } => fut.poll(cx).map(|r| { - this.state.set(State::Empty); - r - }), - StateProject::Empty => { - panic!("future must not be polled after it returned `Poll::Ready`") - } - } + async fn call( + &self, + req: R, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + ctx.call(&self.svc2, ctx.call(&self.svc1, req).await).await } } @@ -149,73 +86,12 @@ where type Service = Then; type InitError = A::InitError; - type Future<'f> = ThenFactoryResponse<'f, A, B, R, C> where Self: 'f, C: 'f; - fn create(&self, cfg: C) -> Self::Future<'_> { - ThenFactoryResponse { - fut_a: self.svc1.create(cfg.clone()), - fut_b: self.svc2.create(cfg), - a: None, - b: None, - } - } -} - -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct ThenFactoryResponse<'f, A, B, R, C> - where - A: ServiceFactory, - B: ServiceFactory, C, - Error = A::Error, - InitError = A::InitError, - >, - A: 'f, - B: 'f, - C: 'f, - { - #[pin] - fut_b: B::Future<'f>, - #[pin] - fut_a: A::Future<'f>, - a: Option, - b: Option, - } -} - -impl<'f, A, B, R, C> Future for ThenFactoryResponse<'f, A, B, R, C> -where - A: ServiceFactory, - B: ServiceFactory< - Result, - C, - Error = A::Error, - InitError = A::InitError, - >, -{ - type Output = Result, A::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if this.a.is_none() { - if let Poll::Ready(service) = this.fut_a.poll(cx)? { - *this.a = Some(service); - } - } - if this.b.is_none() { - if let Poll::Ready(service) = this.fut_b.poll(cx)? { - *this.b = Some(service); - } - } - if this.a.is_some() && this.b.is_some() { - Poll::Ready(Ok(Then::new( - this.a.take().unwrap(), - this.b.take().unwrap(), - ))) - } else { - Poll::Pending - } + async fn create(&self, cfg: C) -> Result { + Ok(Then { + svc1: self.svc1.create(cfg.clone()).await?, + svc2: self.svc2.create(cfg).await?, + }) } } @@ -232,21 +108,20 @@ mod tests { impl Service> for Srv1 { type Response = &'static str; type Error = (); - type Future<'f> = Ready; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { self.0.set(self.0.get() + 1); Poll::Ready(Ok(())) } - fn call<'a>( + async fn call<'a>( &'a self, req: Result<&'static str, &'static str>, _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ) -> Result<&'static str, ()> { match req { - Ok(msg) => Ready::Ok(msg), - Err(_) => Ready::Err(()), + Ok(msg) => Ok(msg), + Err(_) => Err(()), } } } @@ -257,21 +132,20 @@ mod tests { impl Service> for Srv2 { type Response = (&'static str, &'static str); type Error = (); - type Future<'f> = Ready; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { self.0.set(self.0.get() + 1); Poll::Ready(Ok(())) } - fn call<'a>( + async fn call<'a>( &'a self, req: Result<&'static str, ()>, _: ServiceCtx<'a, Self>, - ) -> Self::Future<'a> { + ) -> Result { match req { - Ok(msg) => Ready::Ok((msg, "ok")), - Err(()) => Ready::Ok(("srv2", "err")), + Ok(msg) => Ok((msg, "ok")), + Err(()) => Ok(("srv2", "err")), } } }