diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 479b183f..f5d60849 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -1,7 +1,8 @@ #![allow(clippy::type_complexity)] -use std::{cell::RefCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll}; +use std::{future::Future, marker, pin::Pin, rc::Rc, task, task::Poll}; -use super::{Ctx, IntoService, IntoServiceFactory, Service, ServiceCall, ServiceFactory}; +use super::ctx::{Ctx, ServiceCall, Waiters}; +use super::{IntoService, IntoServiceFactory, Service, ServiceFactory}; /// Apply transform function to a service. pub fn apply_fn( @@ -10,7 +11,7 @@ pub fn apply_fn( ) -> Apply where T: Service, - F: Fn(In, ApplyService) -> R, + for<'r> F: Fn(In, ApplyService) -> R, R: Future>, U: IntoService, { @@ -74,8 +75,7 @@ where pub struct ApplyService { svc: Rc, - index: usize, - waiters: Rc>>>, + waiters: Waiters, } impl ApplyService { @@ -83,7 +83,7 @@ impl ApplyService { where S: Service, { - Ctx::::new(self.index, &self.waiters).call(&self.svc, req) + Ctx::::new(&self.waiters).call(&self.svc, req) } } @@ -102,11 +102,9 @@ where #[inline] fn call<'a>(&'a self, req: In, ctx: Ctx<'a, Self>) -> Self::Future<'a> { - let (index, waiters) = ctx.into_inner(); let svc = ApplyService { - index, - waiters: waiters.clone(), svc: self.service.clone(), + waiters: ctx.waiters().clone(), }; (self.f)(req, svc) } @@ -161,7 +159,7 @@ impl ServiceFactory where T: ServiceFactory, F: Fn(In, ApplyService) -> R + Clone, - R: Future>, + for<'r> R: Future> + 'r, { type Response = Out; type Error = Err; @@ -186,6 +184,7 @@ pin_project_lite::pin_project! { T: ServiceFactory, T: 'f, F: Fn(In, ApplyService) -> R, + T::Service: 'f, R: Future>, Cfg: 'f, { diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index cb580830..b3ff4c97 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -1,7 +1,6 @@ -use std::task::{Context, Poll, Waker}; -use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc}; +use std::{future::Future, pin::Pin, task::Context, task::Poll}; -use crate::Ctx; +use crate::ctx::{Ctx, Waiters}; pub type BoxFuture<'a, I, E> = Pin> + 'a>>; @@ -44,8 +43,7 @@ trait ServiceObj { fn call<'a>( &'a self, req: Req, - idx: usize, - waiters: &'a Rc>>>, + waiters: &'a Waiters, ) -> BoxFuture<'a, Self::Response, Self::Error>; } @@ -71,10 +69,9 @@ where fn call<'a>( &'a self, req: Req, - idx: usize, - waiters: &'a Rc>>>, + waiters: &'a Waiters, ) -> BoxFuture<'a, Self::Response, Self::Error> { - Box::pin(Ctx::<'a, S>::new(idx, waiters).call_nowait(self, req)) + Box::pin(Ctx::<'a, S>::new(waiters).call_nowait(self, req)) } } @@ -135,8 +132,7 @@ where #[inline] fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> { - let (index, waiters) = ctx.into_inner(); - self.0.call(req, index, waiters) + self.0.call(req, ctx.waiters()) } } diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index 91461cd6..1397f237 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -1,22 +1,75 @@ -use std::{cell::RefCell, future::Future, marker, ops, pin::Pin, rc::Rc, task, task::Poll}; +use std::{ + cell::UnsafeCell, future::Future, marker, ops, pin::Pin, rc::Rc, task, task::Poll, +}; use crate::{Service, ServiceFactory}; +/// Container for a service. +/// +/// Container allows to call enclosed service and adds support of shared readiness. pub struct Container { svc: Rc, + waiters: Waiters, +} + +pub struct Ctx<'a, S: ?Sized> { + waiters: &'a Waiters, + _t: marker::PhantomData>, +} + +pub(crate) struct Waiters { index: usize, - waiters: Rc>>>, + waiters: Rc>>>, +} + +impl Waiters { + #[allow(clippy::mut_from_ref)] + fn get(&self) -> &mut slab::Slab> { + unsafe { &mut *self.waiters.as_ref().get() } + } + + fn notify(&self) { + for (_, waker) in self.get().iter_mut() { + if let Some(waker) = waker.take() { + waker.wake(); + } + } + } + + fn register(&self, cx: &mut task::Context<'_>) { + self.get()[self.index] = Some(cx.waker().clone()); + } +} + +impl Clone for Waiters { + fn clone(&self) -> Self { + Waiters { + index: self.get().insert(None), + waiters: self.waiters.clone(), + } + } +} + +impl Drop for Waiters { + #[inline] + fn drop(&mut self) { + self.get().remove(self.index); + self.notify(); + } } impl Container { #[inline] + /// Construct new container instance. pub fn new(svc: S) -> Self { let mut waiters = slab::Slab::new(); let index = waiters.insert(None); Container { - index, svc: Rc::new(svc), - waiters: Rc::new(RefCell::new(waiters)), + waiters: Waiters { + index, + waiters: Rc::new(UnsafeCell::new(waiters)), + }, } } @@ -27,9 +80,10 @@ impl Container { S: Service, { let res = self.svc.poll_ready(cx); - if res.is_pending() { - self.waiters.borrow_mut()[self.index] = Some(cx.waker().clone()); + self.waiters.register(cx) + } else { + self.waiters.notify() } res } @@ -50,7 +104,6 @@ impl Container { S: Service, { let ctx = Ctx::<'a, S> { - index: self.index, waiters: &self.waiters, _t: marker::PhantomData, }; @@ -61,12 +114,10 @@ impl Container { f: &F, cfg: C, ) -> ContainerFactory<'_, F, R, C> { - ContainerFactory { - fut: f.create(cfg), - _t: marker::PhantomData, - } + ContainerFactory { fut: f.create(cfg) } } + /// Extract service if container hadnt been cloned before. pub fn into_service(self) -> Option { let svc = self.svc.clone(); drop(self); @@ -75,11 +126,9 @@ impl Container { } impl Clone for Container { + #[inline] fn clone(&self) -> Self { - let index = self.waiters.borrow_mut().insert(None); - Self { - index, svc: self.svc.clone(), waiters: self.waiters.clone(), } @@ -87,6 +136,7 @@ impl Clone for Container { } impl From for Container { + #[inline] fn from(svc: S) -> Self { Container::new(svc) } @@ -101,41 +151,16 @@ impl ops::Deref for Container { } } -impl Drop for Container { - fn drop(&mut self) { - let mut waiters = self.waiters.borrow_mut(); - - waiters.remove(self.index); - for (_, waker) in &mut *waiters { - if let Some(waker) = waker.take() { - waker.wake(); - } - } - } -} - -pub struct Ctx<'a, S: ?Sized> { - index: usize, - waiters: &'a Rc>>>, - _t: marker::PhantomData>, -} - impl<'a, S: ?Sized> Ctx<'a, S> { - pub(crate) fn new( - index: usize, - waiters: &'a Rc>>>, - ) -> Self { + pub(crate) fn new(waiters: &'a Waiters) -> Self { Self { - index, waiters, _t: marker::PhantomData, } } - pub(crate) fn into_inner( - self, - ) -> (usize, &'a Rc>>>) { - (self.index, self.waiters) + pub(crate) fn waiters(self) -> &'a Waiters { + self.waiters } /// Call service, do not check service readiness @@ -147,7 +172,6 @@ impl<'a, S: ?Sized> Ctx<'a, S> { svc.call( req, Ctx { - index: self.index, waiters: self.waiters, _t: marker::PhantomData, }, @@ -165,7 +189,6 @@ impl<'a, S: ?Sized> Ctx<'a, S> { state: ServiceCallState::Ready { svc, req: Some(req), - index: self.index, waiters: self.waiters, }, } @@ -175,9 +198,9 @@ impl<'a, S: ?Sized> Ctx<'a, S> { impl<'a, S: ?Sized> Copy for Ctx<'a, S> {} impl<'a, S: ?Sized> Clone for Ctx<'a, S> { + #[inline] fn clone(&self) -> Self { Self { - index: self.index, waiters: self.waiters, _t: marker::PhantomData, } @@ -209,8 +232,7 @@ pin_project_lite::pin_project! { { Ready { req: Option, svc: &'a T, - index: usize, - waiters: &'a Rc>>>, + waiters: &'a Waiters, }, Call { #[pin] fut: T::Future<'a> }, Empty, @@ -227,35 +249,27 @@ where let mut this = self.as_mut().project(); match this.state.as_mut().project() { - ServiceCallStateProject::Ready { - req, - svc, - index, - waiters, - } => match svc.poll_ready(cx)? { - Poll::Ready(()) => { - for (_, waker) in &mut *waiters.borrow_mut() { - if let Some(waker) = waker.take() { - waker.wake(); - } - } + ServiceCallStateProject::Ready { req, svc, waiters } => { + match svc.poll_ready(cx)? { + Poll::Ready(()) => { + waiters.notify(); - let fut = svc.call( - req.take().unwrap(), - Ctx { - waiters, - index: *index, - _t: marker::PhantomData, - }, - ); - this.state.set(ServiceCallState::Call { fut }); - self.poll(cx) + let fut = svc.call( + req.take().unwrap(), + Ctx { + waiters, + _t: marker::PhantomData, + }, + ); + this.state.set(ServiceCallState::Call { fut }); + self.poll(cx) + } + Poll::Pending => { + waiters.register(cx); + Poll::Pending + } } - Poll::Pending => { - waiters.borrow_mut()[*index] = Some(cx.waker().clone()); - Poll::Pending - } - }, + } ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| { this.state.set(ServiceCallState::Empty); r @@ -277,7 +291,6 @@ pin_project_lite::pin_project! { { #[pin] fut: F::Future<'f>, - _t: marker::PhantomData<(R, C)>, } } diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 16333b9b..364ffbac 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -80,6 +80,10 @@ pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; /// ```rust,ignore /// async fn my_service(req: u8) -> Result; /// ``` +/// +/// Service cannot be called directly, it must be wrapped to an instance of [`Container`] or +/// by using `ctx` argument of the call method in case of chanined services. +/// pub trait Service { /// Responses given by the service. type Response; @@ -96,11 +100,9 @@ pub trait Service { /// Process the request and return the response asynchronously. /// /// This function is expected to be callable off-task. As such, implementations of `call` - /// should take care to not call `poll_ready`. If the service is at capacity and the request - /// is unable to be handled, the returned `Future` should resolve to an error. - /// - /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be - /// resilient to this fact. + /// should take care to not call `poll_ready`. Caller of the service verifies readiness, + /// Only way to make a `call` is to use `ctx` argument, it enforces readiness before calling + /// service. fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>; #[inline] @@ -116,7 +118,8 @@ pub trait Service { /// # Notes /// /// 1. `.poll_ready()` might be called on different task from actual service call. - /// 1. In case of chained services, `.poll_ready()` is called for all services at once. + /// 2. In case of chained services, `.poll_ready()` is called for all services at once. + /// 3. Every `.call()` in chained services enforces readiness. fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll> { Poll::Ready(Ok(())) }