diff --git a/Cargo.toml b/Cargo.toml index abc4bf39..76c34f0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ ntex-io = { path = "ntex-io" } ntex-http = { path = "ntex-http" } ntex-router = { path = "ntex-router" } ntex-rt = { path = "ntex-rt" } -ntex-service = { path = "ntex-service" } +#ntex-service = { path = "ntex-service" } ntex-tls = { path = "ntex-tls" } ntex-macros = { path = "ntex-macros" } ntex-util = { path = "ntex-util" } diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index d628a523..3e73a7d8 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [1.2.0] - 2023-06-xx + +* Enforce service readiness during call + +* Introduce service sharable readiness + ## [1.0.2] - 2023-04-14 * Remove Rc where S: Service as it brakes readiness check validity diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index c93db6ed..9189a081 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "1.0.2" +version = "1.2.0" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] @@ -17,6 +17,7 @@ path = "src/lib.rs" [dependencies] pin-project-lite = "0.2.6" +slab = "0.4" [dev-dependencies] ntex = { version = "0.6.0", features = ["tokio"] } diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index b4392914..78709c36 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -1,6 +1,6 @@ use std::{future::Future, pin::Pin, task::Context, task::Poll}; -use super::{Service, ServiceFactory}; +use super::{Ctx, Service, ServiceCall, ServiceFactory}; /// Service for the `and_then` combinator, chaining a computation onto the end /// of another service which completes successfully. @@ -59,17 +59,22 @@ where } #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a, + { AndThenServiceResponse { slf: self, state: State::A { - fut: self.svc1.call(req), + fut: ctx.clone().call(&self.svc1, req), + ctx: Some(ctx), }, } } } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct AndThenServiceResponse<'f, A, B, Req> where A: Service, @@ -91,8 +96,8 @@ pin_project_lite::pin_project! { B: Service, B: 'f, { - A { #[pin] fut: A::Future<'f> }, - B { #[pin] fut: B::Future<'f> }, + A { #[pin] fut: ServiceCall<'f, A, Req>, ctx: Option>> }, + B { #[pin] fut: ServiceCall<'f, B, A::Response> }, Empty, } } @@ -108,9 +113,9 @@ where let mut this = self.as_mut().project(); match this.state.as_mut().project() { - StateProject::A { fut } => match fut.poll(cx)? { + StateProject::A { fut, ctx } => match fut.poll(cx)? { Poll::Ready(res) => { - let fut = this.slf.svc2.call(res); + let fut = ctx.take().unwrap().call(&this.slf.svc2, res); this.state.set(State::B { fut }); self.poll(cx) } @@ -178,6 +183,7 @@ where } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct AndThenFactoryResponse<'f, A, B, Req, Cfg> where A: ServiceFactory, @@ -231,7 +237,7 @@ where mod tests { use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; - use crate::{fn_factory, pipeline, pipeline_factory, Service, ServiceFactory}; + use crate::{fn_factory, pipeline, pipeline_factory, Ctx, Service, ServiceFactory}; use ntex_util::future::{lazy, Ready}; #[derive(Clone)] @@ -247,7 +253,10 @@ mod tests { Poll::Ready(Ok(())) } - fn call(&self, req: &'static str) -> Self::Future<'_> { + fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> + where + &'static str: 'a, + { Ready::Ok(req) } } @@ -265,7 +274,10 @@ mod tests { Poll::Ready(Ok(())) } - fn call(&self, req: &'static str) -> Self::Future<'_> { + fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> + where + &'static str: 'a, + { Ready::Ok((req, "srv2")) } } @@ -286,7 +298,7 @@ mod tests { #[ntex::test] async fn test_call() { let cnt = Rc::new(Cell::new(0)); - let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt)); + let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt)).container(); let res = srv.call("srv1").await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "srv2")); @@ -302,7 +314,7 @@ mod tests { .and_then(move || Ready::from(Ok(Srv2(cnt.clone())))) .clone(); - let srv = new_srv.create(&()).await.unwrap(); + let srv = new_srv.container(&()).await.unwrap(); let res = srv.call("srv1").await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "srv2")); diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 8fa41ad4..cdb02605 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -1,8 +1,7 @@ -use std::{ - future::Future, marker::PhantomData, pin::Pin, rc::Rc, task::Context, task::Poll, -}; +#![allow(clippy::type_complexity)] +use std::{cell::RefCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll}; -use super::{IntoService, IntoServiceFactory, Service, ServiceFactory}; +use super::{Ctx, IntoService, IntoServiceFactory, Service, ServiceCall, ServiceFactory}; /// Apply transform function to a service. pub fn apply_fn( @@ -11,7 +10,7 @@ pub fn apply_fn( ) -> Apply where T: Service, - F: Fn(In, Rc) -> R, + F: Fn(In, ApplyService) -> R, R: Future>, U: IntoService, { @@ -25,7 +24,7 @@ pub fn apply_fn_factory( ) -> ApplyFactory where T: ServiceFactory, - F: Fn(In, Rc) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, U: IntoServiceFactory, { @@ -39,13 +38,13 @@ where { service: Rc, f: F, - r: PhantomData (In, Out, R)>, + r: marker::PhantomData (In, Out, R)>, } impl Apply where T: Service, - F: Fn(In, Rc) -> R, + F: Fn(In, ApplyService) -> R, R: Future>, { /// Create new `Apply` combinator @@ -53,7 +52,7 @@ where Self { f, service: Rc::new(service), - r: PhantomData, + r: marker::PhantomData, } } } @@ -61,22 +60,37 @@ where impl Clone for Apply where T: Service + Clone, - F: Fn(In, Rc) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { fn clone(&self) -> Self { Apply { service: self.service.clone(), f: self.f.clone(), - r: PhantomData, + r: marker::PhantomData, } } } +pub struct ApplyService { + svc: Rc, + index: usize, + waiters: Rc>>>, +} + +impl ApplyService { + pub fn call(&self, req: R) -> ServiceCall<'_, S, R> + where + S: Service, + { + Ctx::::new(self.index, &self.waiters).call(&self.svc, req) + } +} + impl Service for Apply where T: Service, - F: Fn(In, Rc) -> R, + F: Fn(In, ApplyService) -> R, R: Future>, { type Response = Out; @@ -87,8 +101,17 @@ where crate::forward_poll_shutdown!(service); #[inline] - fn call(&self, req: In) -> Self::Future<'_> { - (self.f)(req, self.service.clone()) + fn call<'a>(&'a self, req: In, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + In: 'a, + { + let (index, waiters) = ctx.into_inner(); + let svc = ApplyService { + index, + waiters: waiters.clone(), + svc: self.service.clone(), + }; + (self.f)(req, svc) } } @@ -96,18 +119,18 @@ where pub struct ApplyFactory where T: ServiceFactory, - F: Fn(In, Rc) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { service: T, f: F, - r: PhantomData (R, In, Out)>, + r: marker::PhantomData (R, In, Out)>, } impl ApplyFactory where T: ServiceFactory, - F: Fn(In, Rc) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { /// Create new `ApplyNewService` new service instance @@ -115,7 +138,7 @@ where Self { f, service, - r: PhantomData, + r: marker::PhantomData, } } } @@ -124,14 +147,14 @@ impl Clone for ApplyFactory where T: ServiceFactory + Clone, - F: Fn(In, Rc) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { fn clone(&self) -> Self { Self { service: self.service.clone(), f: self.f.clone(), - r: PhantomData, + r: marker::PhantomData, } } } @@ -140,7 +163,7 @@ impl ServiceFactory for ApplyFactory where T: ServiceFactory, - F: Fn(In, Rc) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { type Response = Out; @@ -155,7 +178,7 @@ where ApplyFactoryResponse { fut: self.service.create(cfg), f: Some(self.f.clone()), - _t: PhantomData, + _t: marker::PhantomData, } } } @@ -165,14 +188,14 @@ pin_project_lite::pin_project! { where T: ServiceFactory, T: 'f, - F: Fn(In, Rc) -> R, + F: Fn(In, ApplyService) -> R, R: Future>, Cfg: 'f, { #[pin] fut: T::Future<'f>, f: Option, - _t: PhantomData<(In, Out)>, + _t: marker::PhantomData<(In, Out)>, } } @@ -180,12 +203,12 @@ impl<'f, T, Req, Cfg, F, R, In, Out, Err> Future for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err> where T: ServiceFactory, - F: Fn(In, Rc) -> R, + F: Fn(In, ApplyService) -> R, R: Future>, { type Output = Result, T::InitError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let this = self.project(); if let Poll::Ready(svc) = this.fut.poll(cx)? { @@ -202,7 +225,7 @@ mod tests { use std::task::Poll; use super::*; - use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; + use crate::{pipeline, pipeline_factory, Ctx, Service, ServiceFactory}; #[derive(Clone)] struct Srv; @@ -212,7 +235,10 @@ mod tests { type Error = (); type Future<'f> = Ready<(), ()>; - fn call(&self, _: ()) -> Self::Future<'_> { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> + where + (): 'a, + { Ready::Ok(()) } } @@ -220,15 +246,13 @@ mod tests { #[ntex::test] async fn test_call() { let srv = pipeline( - apply_fn(Srv, |req: &'static str, srv| { - let fut = srv.call(()); - async move { - fut.await.unwrap(); - Ok((req, ())) - } + apply_fn(Srv, |req: &'static str, srv| async move { + srv.call(()).await.unwrap(); + Ok((req, ())) }) .clone(), - ); + ) + .container(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let res = lazy(|cx| srv.poll_shutdown(cx)).await; @@ -244,18 +268,15 @@ mod tests { let new_srv = pipeline_factory( apply_fn_factory( || Ready::<_, ()>::Ok(Srv), - |req: &'static str, srv| { - let fut = srv.call(()); - async move { - fut.await.unwrap(); - Ok((req, ())) - } + |req: &'static str, srv| async move { + srv.call(()).await.unwrap(); + Ok((req, ())) }, ) .clone(), ); - let srv = new_srv.create(&()).await.unwrap(); + let srv = new_srv.container(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index 9b3a10d9..809061cf 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -1,4 +1,7 @@ -use std::{future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; +use std::task::{Context, Poll, Waker}; +use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc}; + +use crate::Ctx; pub type BoxFuture<'a, I, E> = Pin> + 'a>>; @@ -54,7 +57,12 @@ pub trait ServiceObj { fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()>; - fn call<'a>(&'a self, req: Req) -> BoxFuture<'a, Self::Response, Self::Error> + fn call<'a>( + &'a self, + req: Req, + idx: usize, + waiters: &'a Rc>>>, + ) -> BoxFuture<'a, Self::Response, Self::Error> where Req: 'a; } @@ -78,11 +86,16 @@ where } #[inline] - fn call<'a>(&'a self, req: Req) -> BoxFuture<'a, Self::Response, Self::Error> + fn call<'a>( + &'a self, + req: Req, + idx: usize, + waiters: &'a Rc>>>, + ) -> BoxFuture<'a, Self::Response, Self::Error> where Req: 'a, { - Box::pin(crate::Service::call(self, req)) + Box::pin(Ctx::<'a, S>::new(idx, waiters).call_nowait(self, req)) } } @@ -181,8 +194,12 @@ where } #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { - self.0.call(req) + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a, + { + let (index, waiters) = ctx.into_inner(); + self.0.call(req, index, waiters) } } @@ -213,8 +230,12 @@ where } #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { - self.0.call(req) + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a, + { + let (index, waiters) = ctx.into_inner(); + self.0.call(req, index, waiters) } } diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs new file mode 100644 index 00000000..5c6a7ca6 --- /dev/null +++ b/ntex-service/src/ctx.rs @@ -0,0 +1,370 @@ +use std::{cell::RefCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll}; + +use crate::{Service, ServiceFactory}; + +pub struct Container { + svc: Rc, + index: usize, + waiters: Rc>>>, + _t: marker::PhantomData, +} + +impl Container +where + S: Service, +{ + #[inline] + pub fn new(svc: S) -> Self { + let mut waiters = slab::Slab::new(); + let index = waiters.insert(None); + Container { + index, + svc: Rc::new(svc), + waiters: Rc::new(RefCell::new(waiters)), + _t: marker::PhantomData, + } + } + + #[inline] + /// Returns `Ready` when the service is able to process requests. + pub fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll> { + let res = self.svc.poll_ready(cx); + + if res.is_pending() { + self.waiters.borrow_mut()[self.index] = Some(cx.waker().clone()); + } + res + } + + #[inline] + /// Shutdown enclosed service. + pub fn poll_shutdown(&self, cx: &mut task::Context<'_>) -> Poll<()> { + self.svc.poll_shutdown(cx) + } + + #[inline] + /// Process the request and return the response asynchronously. + pub fn call<'a>(&'a self, req: R) -> ServiceCall<'a, S, R> { + let ctx = Ctx::<'a, S> { + index: self.index, + waiters: &self.waiters, + _t: marker::PhantomData, + }; + ctx.call(self.svc.as_ref(), req) + } + + pub(crate) fn create, C>( + f: &F, + cfg: C, + ) -> ContainerFactory<'_, F, R, C> { + ContainerFactory { + fut: f.create(cfg), + _t: marker::PhantomData, + } + } +} + +impl Clone for Container { + fn clone(&self) -> Self { + let index = self.waiters.borrow_mut().insert(None); + + Self { + index, + svc: self.svc.clone(), + waiters: self.waiters.clone(), + _t: marker::PhantomData, + } + } +} + +impl From for Container +where + S: Service, +{ + fn from(svc: S) -> Self { + Container::new(svc) + } +} + +impl Drop for Container { + fn drop(&mut self) { + let mut waiters = self.waiters.borrow_mut(); + + waiters.remove(self.index); + for (_, waker) in &mut *waiters { + if let Some(waker) = waker.take() { + waker.wake(); + } + } + } +} + +pub struct Ctx<'b, S: ?Sized> { + index: usize, + waiters: &'b Rc>>>, + _t: marker::PhantomData>, +} + +impl<'b, S: ?Sized> Ctx<'b, S> { + pub(crate) fn new( + index: usize, + waiters: &'b Rc>>>, + ) -> Self { + Self { + index, + waiters, + _t: marker::PhantomData, + } + } + + pub(crate) fn into_inner( + self, + ) -> (usize, &'b Rc>>>) { + (self.index, self.waiters) + } + + /// Call service, do not check service readiness + pub(crate) fn call_nowait(&self, svc: &'b T, req: R) -> T::Future<'b> + where + T: Service + ?Sized, + R: 'b, + { + svc.call( + req, + Ctx { + index: self.index, + waiters: self.waiters, + _t: marker::PhantomData, + }, + ) + } + + #[inline] + /// Wait for service readiness and then call service + pub fn call(&self, svc: &'b T, req: R) -> ServiceCall<'b, T, R> + where + T: Service + ?Sized, + R: 'b, + { + ServiceCall { + state: ServiceCallState::Ready { + svc, + req: Some(req), + index: self.index, + waiters: self.waiters, + }, + } + } +} + +impl<'b, S: ?Sized> Clone for Ctx<'b, S> { + fn clone(&self) -> Self { + Self { + index: self.index, + waiters: self.waiters, + _t: marker::PhantomData, + } + } +} + +pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] + pub struct ServiceCall<'a, T, Req> + where + T: Service, + T: 'a, + T: ?Sized, + Req: 'a, + { + #[pin] + state: ServiceCallState<'a, T, Req>, + } +} + +pin_project_lite::pin_project! { + #[project = ServiceCallStateProject] + enum ServiceCallState<'a, T, Req> + where + T: Service, + T: 'a, + T: ?Sized, + Req: 'a, + { + Ready { req: Option, + svc: &'a T, + index: usize, + waiters: &'a Rc>>>, + }, + Call { #[pin] fut: T::Future<'a> }, + Empty, + } +} + +impl<'a, T, Req> Future for ServiceCall<'a, T, Req> +where + T: Service + ?Sized, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut this = self.as_mut().project(); + + match this.state.as_mut().project() { + ServiceCallStateProject::Ready { + req, + svc, + index, + waiters, + } => match svc.poll_ready(cx)? { + Poll::Ready(()) => { + for (_, waker) in &mut *waiters.borrow_mut() { + if let Some(waker) = waker.take() { + waker.wake(); + } + } + + let fut = svc.call( + req.take().unwrap(), + Ctx { + waiters, + index: *index, + _t: marker::PhantomData, + }, + ); + this.state.set(ServiceCallState::Call { fut }); + self.poll(cx) + } + Poll::Pending => { + waiters.borrow_mut()[*index] = Some(cx.waker().clone()); + Poll::Pending + } + }, + ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| { + this.state.set(ServiceCallState::Empty); + r + }), + ServiceCallStateProject::Empty => { + panic!("future must not be polled after it returned `Poll::Ready`") + } + } + } +} + +pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] + pub struct ContainerFactory<'f, F, R, C> + where F: ServiceFactory, + F: ?Sized, + F: 'f, + C: 'f, + { + #[pin] + fut: F::Future<'f>, + _t: marker::PhantomData<(R, C)>, + } +} + +impl<'f, F, R, C> Future for ContainerFactory<'f, F, R, C> +where + F: ServiceFactory + 'f, +{ + type Output = Result, F::InitError>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + Poll::Ready(Ok(Container::new(task::ready!(self + .project() + .fut + .poll(cx))?))) + } +} + +#[cfg(test)] +mod tests { + use ntex_util::{channel::condition, future::lazy, future::Ready, time}; + use std::{cell::Cell, cell::RefCell, rc::Rc, task::Context, task::Poll}; + + use super::*; + + struct Srv(Rc>, condition::Waiter); + + impl Service<&'static str> for Srv { + type Response = &'static str; + type Error = (); + type Future<'f> = Ready; + + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.0.set(self.0.get() + 1); + self.1.poll_ready(cx).map(|_| Ok(())) + } + + fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> + where + &'static str: 'a, + { + Ready::Ok(req) + } + } + + #[ntex::test] + async fn test_poll_ready() { + let cnt = Rc::new(Cell::new(0)); + let con = condition::Condition::new(); + + let srv1 = Container::from(Srv(cnt.clone(), con.wait())); + let srv2 = srv1.clone(); + + let res = lazy(|cx| srv1.poll_ready(cx)).await; + assert_eq!(res, Poll::Pending); + assert_eq!(cnt.get(), 1); + + let res = lazy(|cx| srv2.poll_ready(cx)).await; + assert_eq!(res, Poll::Pending); + assert_eq!(cnt.get(), 2); + + con.notify(); + + let res = lazy(|cx| srv1.poll_ready(cx)).await; + assert_eq!(res, Poll::Ready(Ok(()))); + assert_eq!(cnt.get(), 3); + + let res = lazy(|cx| srv2.poll_ready(cx)).await; + assert_eq!(res, Poll::Pending); + assert_eq!(cnt.get(), 4); + } + + #[ntex::test] + async fn test_shared_call() { + let data = Rc::new(RefCell::new(Vec::new())); + + let cnt = Rc::new(Cell::new(0)); + let con = condition::Condition::new(); + + let srv1 = Container::from(Srv(cnt.clone(), con.wait())); + let srv2 = srv1.clone(); + + let data1 = data.clone(); + ntex::rt::spawn(async move { + let i = srv1.call("srv1").await.unwrap(); + data1.borrow_mut().push(i); + }); + + let data2 = data.clone(); + ntex::rt::spawn(async move { + let i = srv2.call("srv2").await.unwrap(); + data2.borrow_mut().push(i); + }); + time::sleep(time::Millis(50)).await; + + con.notify(); + time::sleep(time::Millis(150)).await; + + assert_eq!(cnt.get(), 4); + assert_eq!(&*data.borrow(), &["srv2"]); + + con.notify(); + time::sleep(time::Millis(150)).await; + + assert_eq!(cnt.get(), 5); + assert_eq!(&*data.borrow(), &["srv2", "srv1"]); + } +} diff --git a/ntex-service/src/fn_service.rs b/ntex-service/src/fn_service.rs index b3112b9f..607a65b1 100644 --- a/ntex-service/src/fn_service.rs +++ b/ntex-service/src/fn_service.rs @@ -1,6 +1,6 @@ use std::{future::ready, future::Future, future::Ready, marker::PhantomData}; -use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; +use crate::{Ctx, IntoService, IntoServiceFactory, Service, ServiceFactory}; #[inline] /// Create `ServiceFactory` for function that can act as a `Service` @@ -40,7 +40,7 @@ where /// }); /// /// // construct new service -/// let srv = factory.create(&()).await?; +/// let srv = factory.container(&()).await?; /// /// // now we can use `div` service /// let result = srv.call((10, 20)).await?; @@ -81,7 +81,7 @@ where /// }); /// /// // construct new service with config argument -/// let srv = factory.create(&10).await?; +/// let srv = factory.container(&10).await?; /// /// let result = srv.call(10).await?; /// assert_eq!(result, 100); @@ -128,7 +128,10 @@ where type Future<'f> = Fut where Self: 'f, Req: 'f; #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a, + { (self.f)(req) } } @@ -190,7 +193,10 @@ where type Future<'f> = Fut where Self: 'f; #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a, + { (self.f)(req) } } @@ -348,19 +354,19 @@ mod tests { use std::task::Poll; use super::*; - use crate::{Service, ServiceFactory}; + use crate::{Container, ServiceFactory}; #[ntex::test] async fn test_fn_service() { let new_srv = fn_service(|()| async { Ok::<_, ()>("srv") }).clone(); - let srv = new_srv.create(()).await.unwrap(); + let srv = Container::new(new_srv.create(()).await.unwrap()); let res = srv.call(()).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(res.is_ok()); assert_eq!(res.unwrap(), "srv"); - let srv2 = new_srv.clone(); + let srv2 = Container::new(new_srv.clone()); let res = srv2.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), "srv"); @@ -370,12 +376,14 @@ mod tests { #[ntex::test] async fn test_fn_service_service() { - let srv = fn_service(|()| async { Ok::<_, ()>("srv") }) - .clone() - .create(&()) - .await - .unwrap() - .clone(); + let srv = Container::new( + fn_service(|()| async { Ok::<_, ()>("srv") }) + .clone() + .create(&()) + .await + .unwrap() + .clone(), + ); let res = srv.call(()).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); @@ -396,7 +404,7 @@ mod tests { }) .clone(); - let srv = new_srv.create(&1).await.unwrap(); + let srv = Container::new(new_srv.create(&1).await.unwrap()); let res = srv.call(()).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(res.is_ok()); diff --git a/ntex-service/src/fn_shutdown.rs b/ntex-service/src/fn_shutdown.rs index 53d7f23e..06043dda 100644 --- a/ntex-service/src/fn_shutdown.rs +++ b/ntex-service/src/fn_shutdown.rs @@ -3,7 +3,7 @@ use std::future::{ready, Ready}; use std::marker::PhantomData; use std::task::{Context, Poll}; -use crate::Service; +use crate::{Ctx, Service}; #[inline] /// Create `FnShutdown` for function that can act as a `on_shutdown` callback. @@ -60,7 +60,10 @@ where } #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a, + { ready(Ok(req)) } } @@ -70,7 +73,7 @@ mod tests { use ntex_util::future::lazy; use std::{rc::Rc, task::Poll}; - use crate::{fn_service, pipeline}; + use crate::{fn_service, pipeline, Container}; use super::*; @@ -83,7 +86,7 @@ mod tests { is_called2.set(true); }); - let pipe = pipeline(srv).and_then(on_shutdown).clone(); + let pipe = Container::new(pipeline(srv).and_then(on_shutdown).clone()); let res = pipe.call(()).await; assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(()))); diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 4a330b72..addc4cc0 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -1,7 +1,6 @@ //! See [`Service`] docs for information on this crate's foundational trait. #![deny(rust_2018_idioms, warnings)] -#![allow(clippy::type_complexity)] use std::future::Future; use std::rc::Rc; @@ -10,6 +9,7 @@ use std::task::{self, Context, Poll}; mod and_then; mod apply; pub mod boxed; +mod ctx; mod fn_service; mod fn_shutdown; mod macros; @@ -22,6 +22,7 @@ mod pipeline; mod then; pub use self::apply::{apply_fn, apply_fn_factory}; +pub use self::ctx::{Container, ContainerFactory, Ctx, ServiceCall}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::fn_shutdown::fn_shutdown; pub use self::map_config::{map_config, unit_config}; @@ -53,13 +54,12 @@ pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; /// simple API surfaces. This leads to simpler design of each service, improves test-ability and /// makes composition easier. /// -/// ```rust +/// ```rust,ignore /// # use std::convert::Infallible; /// # use std::future::Future; /// # use std::pin::Pin; -/// # use std::task::{Context, Poll}; /// # -/// # use ntex_service::Service; +/// # use ntex_service::{Ctx, Service}; /// /// struct MyService; /// @@ -68,7 +68,7 @@ pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; /// type Error = Infallible; /// type Future<'f> = Pin>>>; /// -/// fn call(&self, req: u8) -> Self::Future<'_> { +/// fn call<'a>(&'a self, req: u8, _: Ctx<'a, Self>) -> Self::Future<'a> { /// Box::pin(std::future::ready(Ok(req as u64))) /// } /// } @@ -101,7 +101,9 @@ pub trait Service { /// /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be /// resilient to this fact. - fn call(&self, req: Req) -> Self::Future<'_>; + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a; #[inline] /// Returns `Ready` when the service is able to process requests. @@ -154,7 +156,7 @@ pub trait Service { /// error type. /// /// Note that this function consumes the receiving service and returns a wrapped version of it. - fn map_err(self, f: F) -> crate::dev::MapErr + fn map_err(self, f: F) -> crate::dev::MapErr where Self: Sized, F: Fn(Self::Error) -> E, @@ -196,6 +198,14 @@ pub trait ServiceFactory { /// Create and return a new service value asynchronously. fn create(&self, cfg: Cfg) -> Self::Future<'_>; + /// Create and return a new service value asynchronously and wrap into a container + fn container(&self, cfg: Cfg) -> ContainerFactory<'_, Self, Req, Cfg> + where + Self: Sized, + { + Container::::create(self, cfg) + } + #[inline] /// Map this service's output to a different type, returning a new service /// of the resulting type. @@ -250,8 +260,11 @@ where } #[inline] - fn call(&self, request: Req) -> S::Future<'_> { - (**self).call(request) + fn call<'s>(&'s self, request: Req, ctx: Ctx<'s, Self>) -> S::Future<'s> + where + Req: 's, + { + ctx.call_nowait(&**self, request) } } @@ -274,8 +287,11 @@ where } #[inline] - fn call(&self, request: Req) -> S::Future<'_> { - (**self).call(request) + fn call<'a>(&'a self, request: Req, ctx: Ctx<'a, Self>) -> S::Future<'a> + where + Req: 'a, + { + ctx.call_nowait(&**self, request) } } diff --git a/ntex-service/src/map.rs b/ntex-service/src/map.rs index d502acc0..4888b0cf 100644 --- a/ntex-service/src/map.rs +++ b/ntex-service/src/map.rs @@ -1,6 +1,6 @@ use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; -use super::{Service, ServiceFactory}; +use super::{Ctx, Service, ServiceCall, ServiceFactory}; /// Service for the `map` combinator, changing the type of a service's response. /// @@ -54,15 +54,19 @@ where crate::forward_poll_shutdown!(service); #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a, + { MapFuture { - fut: self.service.call(req), + fut: ctx.call(&self.service, req), slf: self, } } } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct MapFuture<'f, A, F, Req, Res> where A: Service, @@ -72,7 +76,7 @@ pin_project_lite::pin_project! { { slf: &'f Map, #[pin] - fut: A::Future<'f>, + fut: ServiceCall<'f, A, Req>, } } @@ -154,6 +158,7 @@ where } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct MapFactoryFuture<'f, A, F, Req, Res, Cfg> where A: ServiceFactory, @@ -190,7 +195,7 @@ mod tests { use ntex_util::future::{lazy, Ready}; use super::*; - use crate::{fn_factory, Service, ServiceFactory}; + use crate::{fn_factory, Container, Ctx, Service, ServiceFactory}; #[derive(Clone)] struct Srv; @@ -204,14 +209,17 @@ mod tests { Poll::Ready(Ok(())) } - fn call(&self, _: ()) -> Self::Future<'_> { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> + where + (): 'a, + { Ready::Ok(()) } } #[ntex::test] async fn test_service() { - let srv = Srv.map(|_| "ok").clone(); + let srv = Container::new(Srv.map(|_| "ok").clone()); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), "ok"); @@ -225,7 +233,7 @@ mod tests { #[ntex::test] async fn test_pipeline() { - let srv = crate::pipeline(Srv).map(|_| "ok").clone(); + let srv = Container::new(crate::pipeline(Srv).map(|_| "ok").clone()); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), "ok"); @@ -242,7 +250,7 @@ mod tests { let new_srv = fn_factory(|| async { Ok::<_, ()>(Srv) }) .map(|_| "ok") .clone(); - let srv = new_srv.create(&()).await.unwrap(); + let srv = Container::new(new_srv.create(&()).await.unwrap()); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("ok")); @@ -253,7 +261,7 @@ mod tests { let new_srv = crate::pipeline_factory(fn_factory(|| async { Ok::<_, ()>(Srv) })) .map(|_| "ok") .clone(); - let srv = new_srv.create(&()).await.unwrap(); + let srv = Container::new(new_srv.create(&()).await.unwrap()); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("ok")); diff --git a/ntex-service/src/map_config.rs b/ntex-service/src/map_config.rs index 1b6cea4d..4e4e5cd1 100644 --- a/ntex-service/src/map_config.rs +++ b/ntex-service/src/map_config.rs @@ -115,6 +115,7 @@ where } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct UnitConfigFuture<'f, A, R, C> where A: ServiceFactory, A: 'f, diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index fa2867e2..487981d1 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -1,20 +1,20 @@ use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; -use super::{Service, ServiceFactory}; +use super::{Ctx, Service, ServiceCall, ServiceFactory}; /// Service for the `map_err` combinator, changing the type of a service's /// error. /// /// This is created by the `ServiceExt::map_err` method. -pub struct MapErr { +pub struct MapErr { service: A, f: F, - _t: PhantomData E>, + _t: PhantomData, } -impl MapErr { +impl MapErr { /// Create new `MapErr` combinator - pub(crate) fn new(service: A, f: F) -> Self + pub(crate) fn new(service: A, f: F) -> Self where A: Service, F: Fn(A::Error) -> E, @@ -27,7 +27,7 @@ impl MapErr { } } -impl Clone for MapErr +impl Clone for MapErr where A: Clone, F: Clone, @@ -42,7 +42,7 @@ where } } -impl Service for MapErr +impl Service for MapErr where A: Service, F: Fn(A::Error) -> E, @@ -57,10 +57,13 @@ where } #[inline] - fn call(&self, req: R) -> Self::Future<'_> { + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + R: 'a, + { MapErrFuture { slf: self, - fut: self.service.call(req), + fut: ctx.call(&self.service, req), } } @@ -68,6 +71,7 @@ where } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct MapErrFuture<'f, A, R, F, E> where A: Service, @@ -75,9 +79,9 @@ pin_project_lite::pin_project! { R: 'f, F: Fn(A::Error) -> E, { - slf: &'f MapErr, + slf: &'f MapErr, #[pin] - fut: A::Future<'f>, + fut: ServiceCall<'f, A, R>, } } @@ -145,7 +149,7 @@ where type Response = A::Response; type Error = E; - type Service = MapErr; + type Service = MapErr; type InitError = A::InitError; type Future<'f> = MapErrFactoryFuture<'f, A, R, C, F, E> where Self: 'f, C: 'f; @@ -159,6 +163,7 @@ where } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct MapErrFactoryFuture<'f, A, R, C, F, E> where A: ServiceFactory, @@ -177,7 +182,7 @@ where A: ServiceFactory, F: Fn(A::Error) -> E + Clone, { - type Output = Result, A::InitError>; + type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -191,12 +196,13 @@ where #[cfg(test)] mod tests { - use super::*; - use crate::{fn_factory, Service, ServiceFactory}; use ntex_util::future::{lazy, Ready}; + use super::*; + use crate::{fn_factory, Container, Ctx, Service, ServiceFactory}; + #[derive(Clone)] - struct Srv; + struct Srv(bool); impl Service<()> for Srv { type Response = (); @@ -204,17 +210,24 @@ mod tests { type Future<'f> = Ready<(), ()>; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Err(())) + if self.0 { + Poll::Ready(Err(())) + } else { + Poll::Ready(Ok(())) + } } - fn call(&self, _: ()) -> Self::Future<'_> { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> + where + (): 'a, + { Ready::Err(()) } } #[ntex::test] async fn test_poll_ready() { - let srv = Srv.map_err(|_| "error"); + let srv = Srv(true).map_err(|_| "error"); let res = lazy(|cx| srv.poll_ready(cx)).await; assert_eq!(res, Poll::Ready(Err("error"))); @@ -224,7 +237,7 @@ mod tests { #[ntex::test] async fn test_service() { - let srv = Srv.map_err(|_| "error").clone(); + let srv = Container::new(Srv(false).map_err(|_| "error").clone()); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); @@ -232,7 +245,7 @@ mod tests { #[ntex::test] async fn test_pipeline() { - let srv = crate::pipeline(Srv).map_err(|_| "error").clone(); + let srv = Container::new(crate::pipeline(Srv(false)).map_err(|_| "error").clone()); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); @@ -240,10 +253,10 @@ mod tests { #[ntex::test] async fn test_factory() { - let new_srv = fn_factory(|| Ready::<_, ()>::Ok(Srv)) + let new_srv = fn_factory(|| Ready::<_, ()>::Ok(Srv(false))) .map_err(|_| "error") .clone(); - let srv = new_srv.create(&()).await.unwrap(); + let srv = Container::new(new_srv.create(&()).await.unwrap()); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); @@ -251,10 +264,11 @@ mod tests { #[ntex::test] async fn test_pipeline_factory() { - let new_srv = crate::pipeline_factory(fn_factory(|| async { Ok::(Srv) })) - .map_err(|_| "error") - .clone(); - let srv = new_srv.create(&()).await.unwrap(); + let new_srv = + crate::pipeline_factory(fn_factory(|| async { Ok::(Srv(false)) })) + .map_err(|_| "error") + .clone(); + let srv = Container::new(new_srv.create(&()).await.unwrap()); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); diff --git a/ntex-service/src/map_init_err.rs b/ntex-service/src/map_init_err.rs index d8dd6e46..5d699471 100644 --- a/ntex-service/src/map_init_err.rs +++ b/ntex-service/src/map_init_err.rs @@ -60,6 +60,7 @@ where } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct MapInitErrFuture<'f, A, R, C, F, E> where A: ServiceFactory, diff --git a/ntex-service/src/middleware.rs b/ntex-service/src/middleware.rs index 7f955b83..1f19b0d7 100644 --- a/ntex-service/src/middleware.rs +++ b/ntex-service/src/middleware.rs @@ -136,6 +136,7 @@ where } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct ApplyMiddlewareFuture<'f, T, S, R, C> where S: ServiceFactory, @@ -213,7 +214,7 @@ mod tests { use std::marker; use super::*; - use crate::{fn_service, Service, ServiceFactory}; + use crate::{fn_service, Container, Ctx, Service, ServiceCall, ServiceFactory}; #[derive(Clone)] struct Tr(marker::PhantomData); @@ -232,14 +233,17 @@ mod tests { impl, R> Service for Srv { type Response = S::Response; type Error = S::Error; - type Future<'f> = S::Future<'f> where Self: 'f, R: 'f; + type Future<'f> = ServiceCall<'f, S, R> where Self: 'f, R: 'f; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } - fn call(&self, req: R) -> Self::Future<'_> { - self.0.call(req) + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + R: 'a, + { + ctx.call(&self.0, req) } } @@ -251,7 +255,7 @@ mod tests { ) .clone(); - let srv = factory.create(&()).await.unwrap().clone(); + let srv = Container::new(factory.create(&()).await.unwrap().clone()); let res = srv.call(10).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), 20); @@ -267,7 +271,7 @@ mod tests { .apply(Rc::new(Tr(marker::PhantomData).clone())) .clone(); - let srv = factory.create(&()).await.unwrap().clone(); + let srv = Container::new(factory.create(&()).await.unwrap().clone()); let res = srv.call(10).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), 20); diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 7cbb46fb..2cf90e16 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -1,6 +1,7 @@ use std::marker::PhantomData; use crate::and_then::{AndThen, AndThenFactory}; +use crate::ctx::{Container, Ctx, ServiceCall}; use crate::map::{Map, MapFactory}; use crate::map_err::{MapErr, MapErrFactory}; use crate::map_init_err::MapInitErr; @@ -105,7 +106,7 @@ impl> Pipeline { /// /// Note that this function consumes the receiving service and returns a /// wrapped version of it. - pub fn map_err(self, f: F) -> Pipeline> + pub fn map_err(self, f: F) -> Pipeline> where Self: Sized, F: Fn(Svc::Error) -> Err, @@ -115,6 +116,11 @@ impl> Pipeline { _t: PhantomData, } } + + /// Create service container + pub fn container(self) -> Container { + Container::new(self.service) + } } impl Clone for Pipeline @@ -132,14 +138,17 @@ where impl> Service for Pipeline { type Response = Svc::Response; type Error = Svc::Error; - type Future<'f> = Svc::Future<'f> where Self: 'f, Req: 'f; + type Future<'f> = ServiceCall<'f, Svc, Req> where Self: 'f, Req: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { - self.service.call(req) + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + Req: 'a, + { + ctx.call(&self.service, req) } } diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 2949dc49..3e6c26e7 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -1,6 +1,6 @@ use std::{future::Future, pin::Pin, task::Context, task::Poll}; -use super::{Service, ServiceFactory}; +use super::{Ctx, Service, ServiceCall, ServiceFactory}; /// Service for the `then` combinator, chaining a computation onto the end of /// another service. @@ -60,17 +60,22 @@ where } #[inline] - fn call(&self, req: R) -> Self::Future<'_> { + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> + where + R: 'a, + { ThenServiceResponse { slf: self, state: State::A { - fut: self.svc1.call(req), + fut: ctx.call(&self.svc1, req), + ctx, }, } } } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct ThenServiceResponse<'f, A, B, R> where A: Service, @@ -93,8 +98,8 @@ pin_project_lite::pin_project! { B: 'f, R: 'f, { - A { #[pin] fut: A::Future<'f> }, - B { #[pin] fut: B::Future<'f> }, + A { #[pin] fut: ServiceCall<'f, A, R>, ctx: Ctx<'f, Then> }, + B { #[pin] fut: ServiceCall<'f, B, Result> }, Empty, } } @@ -110,9 +115,9 @@ where let mut this = self.as_mut().project(); match this.state.as_mut().project() { - StateProject::A { fut } => match fut.poll(cx) { + StateProject::A { fut, ctx } => match fut.poll(cx) { Poll::Ready(res) => { - let fut = this.slf.svc2.call(res); + let fut = ctx.call(&this.slf.svc2, res); this.state.set(State::B { fut }); self.poll(cx) } @@ -184,6 +189,7 @@ where } pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] pub struct ThenFactoryResponse<'f, A, B, R, C> where A: ServiceFactory, @@ -245,7 +251,7 @@ mod tests { use ntex_util::future::{lazy, Ready}; use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; - use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; + use crate::{pipeline, pipeline_factory, Ctx, Service, ServiceFactory}; #[derive(Clone)] struct Srv1(Rc>); @@ -260,7 +266,14 @@ mod tests { Poll::Ready(Ok(())) } - fn call(&self, req: Result<&'static str, &'static str>) -> Self::Future<'_> { + fn call<'a>( + &'a self, + req: Result<&'static str, &'static str>, + _: Ctx<'a, Self>, + ) -> Self::Future<'a> + where + Result<&'static str, &'static str>: 'a, + { match req { Ok(msg) => Ready::Ok(msg), Err(_) => Ready::Err(()), @@ -278,10 +291,17 @@ mod tests { fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { self.0.set(self.0.get() + 1); - Poll::Ready(Err(())) + Poll::Ready(Ok(())) } - fn call(&self, req: Result<&'static str, ()>) -> Self::Future<'_> { + fn call<'a>( + &'a self, + req: Result<&'static str, ()>, + _: Ctx<'a, Self>, + ) -> Self::Future<'a> + where + Result<&'static str, ()>: 'a, + { match req { Ok(msg) => Ready::Ok((msg, "ok")), Err(()) => Ready::Ok(("srv2", "err")), @@ -294,7 +314,7 @@ mod tests { let cnt = Rc::new(Cell::new(0)); let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone())); let res = lazy(|cx| srv.poll_ready(cx)).await; - assert_eq!(res, Poll::Ready(Err(()))); + assert_eq!(res, Poll::Ready(Ok(()))); assert_eq!(cnt.get(), 2); let res = lazy(|cx| srv.poll_shutdown(cx)).await; assert_eq!(res, Poll::Ready(())); @@ -303,9 +323,13 @@ mod tests { #[ntex::test] async fn test_call() { let cnt = Rc::new(Cell::new(0)); - let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt)).clone(); + let srv = pipeline(Srv1(cnt.clone())) + .then(Srv2(cnt)) + .clone() + .container(); let res = srv.call(Ok("srv1")).await; + println!("=========== {:?}", res); assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "ok")); @@ -322,7 +346,7 @@ mod tests { let factory = pipeline_factory(blank) .then(move || Ready::Ok(Srv2(cnt.clone()))) .clone(); - let srv = factory.create(&()).await.unwrap(); + let srv = factory.container(&()).await.unwrap(); let res = srv.call(Ok("srv1")).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "ok"));