Slight optimization and docs

This commit is contained in:
Nikolay Kim 2023-06-16 23:29:56 +06:00
parent 1b199712a9
commit b350280dba
4 changed files with 112 additions and 101 deletions

View file

@ -1,7 +1,8 @@
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
use std::{cell::RefCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll}; use std::{future::Future, marker, pin::Pin, rc::Rc, task, task::Poll};
use super::{Ctx, IntoService, IntoServiceFactory, Service, ServiceCall, ServiceFactory}; use super::ctx::{Ctx, ServiceCall, Waiters};
use super::{IntoService, IntoServiceFactory, Service, ServiceFactory};
/// Apply transform function to a service. /// Apply transform function to a service.
pub fn apply_fn<T, Req, F, R, In, Out, Err, U>( pub fn apply_fn<T, Req, F, R, In, Out, Err, U>(
@ -10,7 +11,7 @@ pub fn apply_fn<T, Req, F, R, In, Out, Err, U>(
) -> Apply<T, Req, F, R, In, Out, Err> ) -> Apply<T, Req, F, R, In, Out, Err>
where where
T: Service<Req, Error = Err>, T: Service<Req, Error = Err>,
F: Fn(In, ApplyService<T>) -> R, for<'r> F: Fn(In, ApplyService<T>) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
U: IntoService<T, Req>, U: IntoService<T, Req>,
{ {
@ -74,8 +75,7 @@ where
pub struct ApplyService<S> { pub struct ApplyService<S> {
svc: Rc<S>, svc: Rc<S>,
index: usize, waiters: Waiters,
waiters: Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
} }
impl<S> ApplyService<S> { impl<S> ApplyService<S> {
@ -83,7 +83,7 @@ impl<S> ApplyService<S> {
where where
S: Service<R>, S: Service<R>,
{ {
Ctx::<S>::new(self.index, &self.waiters).call(&self.svc, req) Ctx::<S>::new(&self.waiters).call(&self.svc, req)
} }
} }
@ -102,11 +102,9 @@ where
#[inline] #[inline]
fn call<'a>(&'a self, req: In, ctx: Ctx<'a, Self>) -> Self::Future<'a> { fn call<'a>(&'a self, req: In, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
let (index, waiters) = ctx.into_inner();
let svc = ApplyService { let svc = ApplyService {
index,
waiters: waiters.clone(),
svc: self.service.clone(), svc: self.service.clone(),
waiters: ctx.waiters().clone(),
}; };
(self.f)(req, svc) (self.f)(req, svc)
} }
@ -161,7 +159,7 @@ impl<T, Req, Cfg, F, R, In, Out, Err> ServiceFactory<In, Cfg>
where where
T: ServiceFactory<Req, Cfg, Error = Err>, T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, ApplyService<T::Service>) -> R + Clone, F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>, for<'r> R: Future<Output = Result<Out, Err>> + 'r,
{ {
type Response = Out; type Response = Out;
type Error = Err; type Error = Err;
@ -186,6 +184,7 @@ pin_project_lite::pin_project! {
T: ServiceFactory<Req, Cfg, Error = Err>, T: ServiceFactory<Req, Cfg, Error = Err>,
T: 'f, T: 'f,
F: Fn(In, ApplyService<T::Service>) -> R, F: Fn(In, ApplyService<T::Service>) -> R,
T::Service: 'f,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
Cfg: 'f, Cfg: 'f,
{ {

View file

@ -1,7 +1,6 @@
use std::task::{Context, Poll, Waker}; use std::{future::Future, pin::Pin, task::Context, task::Poll};
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc};
use crate::Ctx; use crate::ctx::{Ctx, Waiters};
pub type BoxFuture<'a, I, E> = Pin<Box<dyn Future<Output = Result<I, E>> + 'a>>; pub type BoxFuture<'a, I, E> = Pin<Box<dyn Future<Output = Result<I, E>> + 'a>>;
@ -44,8 +43,7 @@ trait ServiceObj<Req> {
fn call<'a>( fn call<'a>(
&'a self, &'a self,
req: Req, req: Req,
idx: usize, waiters: &'a Waiters,
waiters: &'a Rc<RefCell<slab::Slab<Option<Waker>>>>,
) -> BoxFuture<'a, Self::Response, Self::Error>; ) -> BoxFuture<'a, Self::Response, Self::Error>;
} }
@ -71,10 +69,9 @@ where
fn call<'a>( fn call<'a>(
&'a self, &'a self,
req: Req, req: Req,
idx: usize, waiters: &'a Waiters,
waiters: &'a Rc<RefCell<slab::Slab<Option<Waker>>>>,
) -> BoxFuture<'a, Self::Response, Self::Error> { ) -> BoxFuture<'a, Self::Response, Self::Error> {
Box::pin(Ctx::<'a, S>::new(idx, waiters).call_nowait(self, req)) Box::pin(Ctx::<'a, S>::new(waiters).call_nowait(self, req))
} }
} }
@ -135,8 +132,7 @@ where
#[inline] #[inline]
fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> { fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
let (index, waiters) = ctx.into_inner(); self.0.call(req, ctx.waiters())
self.0.call(req, index, waiters)
} }
} }

View file

@ -1,22 +1,75 @@
use std::{cell::RefCell, future::Future, marker, ops, pin::Pin, rc::Rc, task, task::Poll}; use std::{
cell::UnsafeCell, future::Future, marker, ops, pin::Pin, rc::Rc, task, task::Poll,
};
use crate::{Service, ServiceFactory}; use crate::{Service, ServiceFactory};
/// Container for a service.
///
/// Container allows to call enclosed service and adds support of shared readiness.
pub struct Container<S> { pub struct Container<S> {
svc: Rc<S>, svc: Rc<S>,
waiters: Waiters,
}
pub struct Ctx<'a, S: ?Sized> {
waiters: &'a Waiters,
_t: marker::PhantomData<Rc<S>>,
}
pub(crate) struct Waiters {
index: usize, index: usize,
waiters: Rc<RefCell<slab::Slab<Option<task::Waker>>>>, waiters: Rc<UnsafeCell<slab::Slab<Option<task::Waker>>>>,
}
impl Waiters {
#[allow(clippy::mut_from_ref)]
fn get(&self) -> &mut slab::Slab<Option<task::Waker>> {
unsafe { &mut *self.waiters.as_ref().get() }
}
fn notify(&self) {
for (_, waker) in self.get().iter_mut() {
if let Some(waker) = waker.take() {
waker.wake();
}
}
}
fn register(&self, cx: &mut task::Context<'_>) {
self.get()[self.index] = Some(cx.waker().clone());
}
}
impl Clone for Waiters {
fn clone(&self) -> Self {
Waiters {
index: self.get().insert(None),
waiters: self.waiters.clone(),
}
}
}
impl Drop for Waiters {
#[inline]
fn drop(&mut self) {
self.get().remove(self.index);
self.notify();
}
} }
impl<S> Container<S> { impl<S> Container<S> {
#[inline] #[inline]
/// Construct new container instance.
pub fn new(svc: S) -> Self { pub fn new(svc: S) -> Self {
let mut waiters = slab::Slab::new(); let mut waiters = slab::Slab::new();
let index = waiters.insert(None); let index = waiters.insert(None);
Container { Container {
index,
svc: Rc::new(svc), svc: Rc::new(svc),
waiters: Rc::new(RefCell::new(waiters)), waiters: Waiters {
index,
waiters: Rc::new(UnsafeCell::new(waiters)),
},
} }
} }
@ -27,9 +80,10 @@ impl<S> Container<S> {
S: Service<R>, S: Service<R>,
{ {
let res = self.svc.poll_ready(cx); let res = self.svc.poll_ready(cx);
if res.is_pending() { if res.is_pending() {
self.waiters.borrow_mut()[self.index] = Some(cx.waker().clone()); self.waiters.register(cx)
} else {
self.waiters.notify()
} }
res res
} }
@ -50,7 +104,6 @@ impl<S> Container<S> {
S: Service<R>, S: Service<R>,
{ {
let ctx = Ctx::<'a, S> { let ctx = Ctx::<'a, S> {
index: self.index,
waiters: &self.waiters, waiters: &self.waiters,
_t: marker::PhantomData, _t: marker::PhantomData,
}; };
@ -61,12 +114,10 @@ impl<S> Container<S> {
f: &F, f: &F,
cfg: C, cfg: C,
) -> ContainerFactory<'_, F, R, C> { ) -> ContainerFactory<'_, F, R, C> {
ContainerFactory { ContainerFactory { fut: f.create(cfg) }
fut: f.create(cfg),
_t: marker::PhantomData,
}
} }
/// Extract service if container hadnt been cloned before.
pub fn into_service(self) -> Option<S> { pub fn into_service(self) -> Option<S> {
let svc = self.svc.clone(); let svc = self.svc.clone();
drop(self); drop(self);
@ -75,11 +126,9 @@ impl<S> Container<S> {
} }
impl<S> Clone for Container<S> { impl<S> Clone for Container<S> {
#[inline]
fn clone(&self) -> Self { fn clone(&self) -> Self {
let index = self.waiters.borrow_mut().insert(None);
Self { Self {
index,
svc: self.svc.clone(), svc: self.svc.clone(),
waiters: self.waiters.clone(), waiters: self.waiters.clone(),
} }
@ -87,6 +136,7 @@ impl<S> Clone for Container<S> {
} }
impl<S> From<S> for Container<S> { impl<S> From<S> for Container<S> {
#[inline]
fn from(svc: S) -> Self { fn from(svc: S) -> Self {
Container::new(svc) Container::new(svc)
} }
@ -101,41 +151,16 @@ impl<S> ops::Deref for Container<S> {
} }
} }
impl<S> Drop for Container<S> {
fn drop(&mut self) {
let mut waiters = self.waiters.borrow_mut();
waiters.remove(self.index);
for (_, waker) in &mut *waiters {
if let Some(waker) = waker.take() {
waker.wake();
}
}
}
}
pub struct Ctx<'a, S: ?Sized> {
index: usize,
waiters: &'a Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
_t: marker::PhantomData<Rc<S>>,
}
impl<'a, S: ?Sized> Ctx<'a, S> { impl<'a, S: ?Sized> Ctx<'a, S> {
pub(crate) fn new( pub(crate) fn new(waiters: &'a Waiters) -> Self {
index: usize,
waiters: &'a Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
) -> Self {
Self { Self {
index,
waiters, waiters,
_t: marker::PhantomData, _t: marker::PhantomData,
} }
} }
pub(crate) fn into_inner( pub(crate) fn waiters(self) -> &'a Waiters {
self, self.waiters
) -> (usize, &'a Rc<RefCell<slab::Slab<Option<task::Waker>>>>) {
(self.index, self.waiters)
} }
/// Call service, do not check service readiness /// Call service, do not check service readiness
@ -147,7 +172,6 @@ impl<'a, S: ?Sized> Ctx<'a, S> {
svc.call( svc.call(
req, req,
Ctx { Ctx {
index: self.index,
waiters: self.waiters, waiters: self.waiters,
_t: marker::PhantomData, _t: marker::PhantomData,
}, },
@ -165,7 +189,6 @@ impl<'a, S: ?Sized> Ctx<'a, S> {
state: ServiceCallState::Ready { state: ServiceCallState::Ready {
svc, svc,
req: Some(req), req: Some(req),
index: self.index,
waiters: self.waiters, waiters: self.waiters,
}, },
} }
@ -175,9 +198,9 @@ impl<'a, S: ?Sized> Ctx<'a, S> {
impl<'a, S: ?Sized> Copy for Ctx<'a, S> {} impl<'a, S: ?Sized> Copy for Ctx<'a, S> {}
impl<'a, S: ?Sized> Clone for Ctx<'a, S> { impl<'a, S: ?Sized> Clone for Ctx<'a, S> {
#[inline]
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
index: self.index,
waiters: self.waiters, waiters: self.waiters,
_t: marker::PhantomData, _t: marker::PhantomData,
} }
@ -209,8 +232,7 @@ pin_project_lite::pin_project! {
{ {
Ready { req: Option<Req>, Ready { req: Option<Req>,
svc: &'a T, svc: &'a T,
index: usize, waiters: &'a Waiters,
waiters: &'a Rc<RefCell<slab::Slab<Option<task::Waker>>>>,
}, },
Call { #[pin] fut: T::Future<'a> }, Call { #[pin] fut: T::Future<'a> },
Empty, Empty,
@ -227,24 +249,15 @@ where
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
match this.state.as_mut().project() { match this.state.as_mut().project() {
ServiceCallStateProject::Ready { ServiceCallStateProject::Ready { req, svc, waiters } => {
req, match svc.poll_ready(cx)? {
svc,
index,
waiters,
} => match svc.poll_ready(cx)? {
Poll::Ready(()) => { Poll::Ready(()) => {
for (_, waker) in &mut *waiters.borrow_mut() { waiters.notify();
if let Some(waker) = waker.take() {
waker.wake();
}
}
let fut = svc.call( let fut = svc.call(
req.take().unwrap(), req.take().unwrap(),
Ctx { Ctx {
waiters, waiters,
index: *index,
_t: marker::PhantomData, _t: marker::PhantomData,
}, },
); );
@ -252,10 +265,11 @@ where
self.poll(cx) self.poll(cx)
} }
Poll::Pending => { Poll::Pending => {
waiters.borrow_mut()[*index] = Some(cx.waker().clone()); waiters.register(cx);
Poll::Pending Poll::Pending
} }
}, }
}
ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| { ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| {
this.state.set(ServiceCallState::Empty); this.state.set(ServiceCallState::Empty);
r r
@ -277,7 +291,6 @@ pin_project_lite::pin_project! {
{ {
#[pin] #[pin]
fut: F::Future<'f>, fut: F::Future<'f>,
_t: marker::PhantomData<(R, C)>,
} }
} }

View file

@ -80,6 +80,10 @@ pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
/// ```rust,ignore /// ```rust,ignore
/// async fn my_service(req: u8) -> Result<u64, Infallible>; /// async fn my_service(req: u8) -> Result<u64, Infallible>;
/// ``` /// ```
///
/// Service cannot be called directly, it must be wrapped to an instance of [`Container`] or
/// by using `ctx` argument of the call method in case of chanined services.
///
pub trait Service<Req> { pub trait Service<Req> {
/// Responses given by the service. /// Responses given by the service.
type Response; type Response;
@ -96,11 +100,9 @@ pub trait Service<Req> {
/// Process the request and return the response asynchronously. /// Process the request and return the response asynchronously.
/// ///
/// This function is expected to be callable off-task. As such, implementations of `call` /// This function is expected to be callable off-task. As such, implementations of `call`
/// should take care to not call `poll_ready`. If the service is at capacity and the request /// should take care to not call `poll_ready`. Caller of the service verifies readiness,
/// is unable to be handled, the returned `Future` should resolve to an error. /// Only way to make a `call` is to use `ctx` argument, it enforces readiness before calling
/// /// service.
/// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be
/// resilient to this fact.
fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>; fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>;
#[inline] #[inline]
@ -116,7 +118,8 @@ pub trait Service<Req> {
/// # Notes /// # Notes
/// ///
/// 1. `.poll_ready()` might be called on different task from actual service call. /// 1. `.poll_ready()` might be called on different task from actual service call.
/// 1. In case of chained services, `.poll_ready()` is called for all services at once. /// 2. In case of chained services, `.poll_ready()` is called for all services at once.
/// 3. Every `.call()` in chained services enforces readiness.
fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }