This commit is contained in:
Nikolay Kim 2024-10-31 23:08:45 +05:00
parent 595c7332e6
commit 5697be0f81
15 changed files with 222 additions and 254 deletions

View file

@ -18,6 +18,7 @@ path = "src/lib.rs"
[dependencies]
slab = "0.4"
bitflags = "2"
pin-project-lite = "0.2"
[dev-dependencies]
ntex = "2"

View file

@ -1,3 +1,5 @@
use std::future::Future;
use super::{util, Service, ServiceCtx, ServiceFactory};
#[derive(Clone, Debug)]
@ -26,13 +28,8 @@ where
type Error = A::Error;
#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
util::ready(&self.svc1, &self.svc2, ctx).await
}
#[inline]
async fn unready(&self) -> Result<(), Self::Error> {
util::unready(&self.svc1, &self.svc2).await
async fn ready(&self) -> Option<impl Future<Output = Result<(), Self::Error>>> {
util::ready(&self.svc1, &self.svc2).await
}
#[inline]

View file

@ -100,13 +100,11 @@ where
type Error = Err;
#[inline]
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Err> {
self.service.ready().await.map_err(From::from)
}
#[inline]
async fn unready(&self) -> Result<(), Err> {
self.service.unready().await.map_err(From::from)
async fn ready(&self) -> Option<impl Future<Output = Result<(), Err>>> {
self.service
.ready()
.await
.map(|fut| async move { fut.await.map_err(From::from) })
}
#[inline]

View file

@ -3,6 +3,7 @@ use std::{fmt, future::Future, pin::Pin};
use crate::ctx::{ServiceCtx, WaitersRef};
type BoxFuture<'a, I, E> = Pin<Box<dyn Future<Output = Result<I, E>> + 'a>>;
type BoxFutureOne<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
pub struct BoxService<Req, Res, Err>(Box<dyn ServiceObj<Req, Response = Res, Error = Err>>);
pub struct BoxServiceFactory<Cfg, Req, Res, Err, InitErr>(
Box<dyn ServiceFactoryObj<Req, Cfg, Response = Res, Error = Err, InitError = InitErr>>,
@ -48,14 +49,7 @@ trait ServiceObj<Req> {
type Response;
type Error;
fn ready<'a>(
&'a self,
idx: u32,
bound: bool,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, (), Self::Error>;
fn unready(&self) -> BoxFuture<'_, (), Self::Error>;
fn ready<'a>(&'a self) -> BoxFutureOne<'a, Option<BoxFuture<'a, (), Self::Error>>>;
fn call<'a>(
&'a self,
@ -76,24 +70,17 @@ where
type Error = S::Error;
#[inline]
fn ready<'a>(
&'a self,
idx: u32,
bound: bool,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, (), Self::Error> {
fn ready<'a>(&'a self) -> BoxFutureOne<'a, Option<BoxFuture<'a, (), Self::Error>>> {
Box::pin(async move {
ServiceCtx::<'a, S>::from_ref(idx, bound, waiters)
.ready(self)
.await
if let Some(fut) = self.ready().await {
let r: BoxFuture<'a, (), Self::Error> = Box::pin(fut);
Some(r)
} else {
None
}
})
}
#[inline]
fn unready(&self) -> BoxFuture<'_, (), Self::Error> {
Box::pin(crate::Service::unready(self))
}
#[inline]
fn shutdown<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
Box::pin(crate::Service::shutdown(self))
@ -107,7 +94,7 @@ where
waiters: &'a WaitersRef,
) -> BoxFuture<'a, Self::Response, Self::Error> {
Box::pin(async move {
ServiceCtx::<'a, S>::from_ref(idx, false, waiters)
ServiceCtx::<'a, S>::from_ref(idx, waiters)
.call_nowait(self, req)
.await
})
@ -159,14 +146,8 @@ where
type Error = Err;
#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let (idx, bound, waiters) = ctx.inner();
self.0.ready(idx, bound, waiters).await
}
#[inline]
async fn unready(&self) -> Result<(), Self::Error> {
self.0.unready().await
async fn ready(&self) -> Option<impl Future<Output = Result<(), Self::Error>>> {
self.0.ready().await
}
#[inline]
@ -176,7 +157,7 @@ where
#[inline]
async fn call(&self, req: Req, ctx: ServiceCtx<'_, Self>) -> Result<Res, Err> {
let (idx, _, waiters) = ctx.inner();
let (idx, waiters) = ctx.inner();
self.0.call(req, idx, waiters).await
}
}

View file

@ -172,7 +172,6 @@ impl<Svc: Service<Req>, Req> Service<Req> for ServiceChain<Svc, Req> {
type Error = Svc::Error;
crate::forward_ready!(service);
crate::forward_unready!(service);
crate::forward_shutdown!(service);
#[inline]

View file

@ -5,7 +5,6 @@ use crate::Service;
pub struct ServiceCtx<'a, S: ?Sized> {
idx: u32,
bound: bool,
waiters: &'a WaitersRef,
_t: marker::PhantomData<Rc<S>>,
}
@ -78,6 +77,10 @@ impl WaitersRef {
self.state.get().contains(Flags::READY)
}
pub(crate) fn is_bound(&self) -> bool {
self.state.get().contains(Flags::BOUND)
}
pub(crate) fn bind(&self) {
self.state.set(Flags::BOUND);
self.notify();
@ -168,32 +171,21 @@ impl<'a, S> ServiceCtx<'a, S> {
pub(crate) fn new(waiters: &'a Waiters) -> Self {
Self {
idx: waiters.index,
bound: false,
waiters: waiters.get_ref(),
_t: marker::PhantomData,
}
}
pub(crate) fn new_bound(waiters: &'a Waiters) -> Self {
Self {
idx: waiters.index,
bound: true,
waiters: waiters.get_ref(),
_t: marker::PhantomData,
}
}
pub(crate) fn from_ref(idx: u32, bound: bool, waiters: &'a WaitersRef) -> Self {
pub(crate) fn from_ref(idx: u32, waiters: &'a WaitersRef) -> Self {
Self {
idx,
bound,
waiters,
_t: marker::PhantomData,
}
}
pub(crate) fn inner(self) -> (u32, bool, &'a WaitersRef) {
(self.idx, self.bound, self.waiters)
pub(crate) fn inner(self) -> (u32, &'a WaitersRef) {
(self.idx, self.waiters)
}
/// Mark service as un-ready, force readiness re-evaluation for pipeline
@ -205,36 +197,23 @@ impl<'a, S> ServiceCtx<'a, S> {
}
/// Returns when the service is able to process requests.
pub async fn ready<T, R>(&self, svc: &'a T) -> Result<(), T::Error>
pub async fn check_readiness<T, R>(&self, svc: &'a T) -> Result<(), T::Error>
where
T: Service<R>,
{
if self.bound {
svc.ready(ServiceCtx {
idx: self.idx,
bound: true,
waiters: self.waiters,
_t: marker::PhantomData,
})
.await
// active readiness
if self.waiters.is_ready() {
Ok(())
} else {
// active readiness
if self.waiters.is_ready() {
Ok(())
} else {
// check readiness and notify waiters
ReadyCall {
completed: false,
fut: svc.ready(ServiceCtx {
idx: self.idx,
bound: false,
waiters: self.waiters,
_t: marker::PhantomData,
}),
ctx: *self,
}
.await
// check readiness and notify waiters
ReadyCall {
fut: Some(svc.ready()),
fut1: None,
ctx: *self,
completed: false,
_t: marker::PhantomData,
}
.await
}
}
@ -245,13 +224,12 @@ impl<'a, S> ServiceCtx<'a, S> {
T: Service<R>,
R: 'a,
{
self.ready(svc).await?;
self.check_readiness(svc).await?;
svc.call(
req,
ServiceCtx {
idx: self.idx,
bound: false,
waiters: self.waiters,
_t: marker::PhantomData,
},
@ -274,7 +252,6 @@ impl<'a, S> ServiceCtx<'a, S> {
req,
ServiceCtx {
idx: self.idx,
bound: false,
waiters: self.waiters,
_t: marker::PhantomData,
},
@ -301,26 +278,35 @@ impl<'a, S> fmt::Debug for ServiceCtx<'a, S> {
}
}
struct ReadyCall<'a, S: ?Sized, F: Future<Output = Result<(), E>>, E> {
struct ReadyCall<'a, S: ?Sized, F: Future<Output = Option<R>>, E, R> {
completed: bool,
fut: F,
fut: Option<F>,
fut1: Option<R>,
ctx: ServiceCtx<'a, S>,
_t: marker::PhantomData<(R, E)>,
}
impl<'a, S: ?Sized, F: Future<Output = Result<(), E>>, E> Drop for ReadyCall<'a, S, F, E> {
impl<'a, S: ?Sized, F: Future<Output = Option<R>>, E, R> Drop
for ReadyCall<'a, S, F, E, R>
{
fn drop(&mut self) {
if !self.completed && !self.ctx.waiters.state.get().contains(Flags::BOUND) {
if !self.completed && !self.ctx.waiters.is_bound() {
self.ctx.waiters.notify();
}
}
}
impl<'a, S: ?Sized, F: Future<Output = Result<(), E>>, E> Unpin for ReadyCall<'a, S, F, E> {}
impl<'a, S: ?Sized, F: Future<Output = Result<(), E>>, E> Future
for ReadyCall<'a, S, F, E>
impl<'a, S: ?Sized, F: Future<Output = Option<R>>, E, R> Unpin
for ReadyCall<'a, S, F, E, R>
{
type Output = F::Output;
}
impl<'a, S: ?Sized, F: Future<Output = Option<R>>, E, R> Future
for ReadyCall<'a, S, F, E, R>
where
R: Future<Output = Result<(), E>>,
{
type Output = Result<(), E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let st = self.ctx.waiters.state.get();
@ -333,17 +319,36 @@ impl<'a, S: ?Sized, F: Future<Output = Result<(), E>>, E> Future
}
} else if self.ctx.waiters.can_check(self.ctx.idx, cx) {
// SAFETY: `fut` never moves
let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) };
match result {
Poll::Pending => {
self.ctx.waiters.register(self.ctx.idx, cx);
Poll::Pending
if let Some(ref mut fut) = self.as_mut().fut {
let result = unsafe { Pin::new_unchecked(fut).poll(cx) };
match result {
Poll::Pending => {
self.ctx.waiters.register(self.ctx.idx, cx);
return Poll::Pending;
}
Poll::Ready(res) => {
self.fut1 = res;
let _ = self.fut.take();
}
}
Poll::Ready(res) => {
self.completed = true;
self.ctx.waiters.notify();
Poll::Ready(res)
}
if let Some(ref mut fut) = self.as_mut().fut1 {
match unsafe { Pin::new_unchecked(fut).poll(cx) } {
Poll::Pending => {
self.ctx.waiters.register(self.ctx.idx, cx);
Poll::Pending
}
Poll::Ready(res) => {
self.completed = true;
self.ctx.waiters.notify();
Poll::Ready(res)
}
}
} else {
self.completed = true;
self.ctx.waiters.notify();
Poll::Ready(Ok(()))
}
} else {
Poll::Pending
@ -373,11 +378,6 @@ mod tests {
Ok(())
}
async fn unready(&self) -> Result<(), Self::Error> {
self.1.ready().await;
Ok(())
}
async fn call(
&self,
req: &'static str,

View file

@ -134,11 +134,6 @@ where
type Response = Res;
type Error = Err;
#[inline]
async fn unready(&self) -> Result<(), Self::Error> {
std::future::pending().await
}
#[inline]
async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result<Res, Err> {
(self.f)(req).await
@ -212,11 +207,6 @@ where
type Response = Res;
type Error = Err;
#[inline]
async fn unready(&self) -> Result<(), Self::Error> {
std::future::pending().await
}
#[inline]
async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result<Res, Err> {
(self.f)(req).await

View file

@ -62,11 +62,6 @@ where
}
}
#[inline]
async fn unready(&self) -> Result<(), Self::Error> {
std::future::pending().await
}
#[inline]
async fn call(&self, req: Req, _: ServiceCtx<'_, Self>) -> Result<Req, Err> {
Ok(req)

View file

@ -1,12 +1,12 @@
//! See [`Service`] docs for information on this crate's foundational trait.
#![allow(async_fn_in_trait)]
#![deny(
rust_2018_idioms,
warnings,
unreachable_pub,
missing_debug_implementations
)]
use std::rc::Rc;
// #![deny(
// rust_2018_idioms,
// warnings,
// unreachable_pub,
// missing_debug_implementations
// )]
use std::{future::Future, rc::Rc};
mod and_then;
mod apply;
@ -114,14 +114,8 @@ pub trait Service<Req> {
/// This is a **best effort** implementation. False positives are permitted. It is permitted for
/// the service to returns from a `ready` call and the next invocation of `call`
/// results in an error.
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
Ok(())
}
/// Returns when the service becomes un-ready and not able to process requests.
///
async fn unready(&self) -> Result<(), Self::Error> {
std::future::pending().await
async fn ready(&self) -> Option<impl Future<Output = Result<(), Self::Error>>> {
None::<util::Ready<Self::Error>>
}
#[inline]
@ -248,13 +242,8 @@ where
type Error = S::Error;
#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), S::Error> {
ctx.ready(&**self).await
}
#[inline]
async fn unready(&self) -> Result<(), S::Error> {
(**self).unready().await
async fn ready(&self) -> Option<impl Future<Output = Result<(), Self::Error>>> {
(&**self).ready().await
}
#[inline]
@ -280,13 +269,8 @@ where
type Error = S::Error;
#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), S::Error> {
ctx.ready(&**self).await
}
#[inline]
async fn unready(&self) -> Result<(), S::Error> {
(**self).unready().await
async fn ready(&self) -> Option<impl Future<Output = Result<(), Self::Error>>> {
(&**self).ready().await
}
#[inline]

View file

@ -5,34 +5,8 @@ macro_rules! forward_ready {
#[inline]
async fn ready(
&self,
ctx: $crate::ServiceCtx<'_, Self>,
) -> Result<(), Self::Error> {
ctx.ready(&self.$field)
.await
.map_err(::core::convert::Into::into)
}
};
($field:ident, $err:expr) => {
#[inline]
async fn ready(
&self,
ctx: $crate::ServiceCtx<'_, Self>,
) -> Result<(), Self::Error> {
ctx.ready(&self.$field).await.map_err($err)
}
};
}
/// An implementation of [`unready`] that forwards unready checks to a field.
#[macro_export]
macro_rules! forward_unready {
($field:ident) => {
#[inline]
async fn unready(&self) -> Result<(), Self::Error> {
self.$field
.unready()
.await
.map_err(::core::convert::Into::into)
) -> Option<impl ::std::future::Future<Output = Result<(), Self::Error>>> {
self.$field.ready().await
}
};
}

View file

@ -62,7 +62,6 @@ where
type Error = A::Error;
crate::forward_ready!(service);
crate::forward_unready!(service);
crate::forward_shutdown!(service);
#[inline]

View file

@ -1,4 +1,4 @@
use std::{fmt, marker::PhantomData};
use std::{fmt, future::Future, marker::PhantomData};
use super::{Service, ServiceCtx, ServiceFactory};
@ -63,13 +63,11 @@ where
type Error = E;
#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
ctx.ready(&self.service).await.map_err(&self.f)
}
#[inline]
async fn unready(&self) -> Result<(), Self::Error> {
self.service.unready().await.map_err(&self.f)
async fn ready(&self) -> Option<impl Future<Output = Result<(), E>>> {
self.service
.ready()
.await
.map(|fut| async move { fut.await.map_err(&self.f) })
}
#[inline]

View file

@ -30,25 +30,13 @@ impl<S> Pipeline<S> {
#[inline]
/// Returns when the service is able to process requests.
pub async fn ready<R>(&self) -> Result<(), S::Error>
pub async fn ready<R>(
&self,
) -> Option<impl Future<Output = Result<(), S::Error>> + use<'_, S, R>>
where
S: Service<R>,
{
ServiceCtx::<'_, S>::new(&self.waiters)
.ready(self.svc.as_ref())
.await
}
#[inline]
/// Returns when the service is able to process requests.
pub async fn unready<R>(&self) -> Result<(), S::Error>
where
S: Service<R>,
{
self.waiters.set_ready();
let result = self.svc.unready().await;
self.waiters.set_notready();
result
self.svc.as_ref().ready().await
}
#[inline]
@ -61,7 +49,7 @@ impl<S> Pipeline<S> {
let ctx = ServiceCtx::<'_, S>::new(&self.waiters);
// check service readiness
ctx.ready(self.svc.as_ref()).await?;
ctx.check_readiness(self.svc.as_ref()).await?;
// call service
self.svc.as_ref().call(req, ctx).await
@ -168,10 +156,15 @@ where
let fut = Box::pin(async move {
loop {
pl.svc
.ready(ServiceCtx::<'_, S>::new_bound(&pl.waiters))
.await?;
pl.unready().await?;
if let Some(fut) = pl.svc.ready().await {
pl.waiters.set_ready();
let result = fut.await;
pl.waiters.set_notready();
result?
} else {
pl.waiters.set_ready();
std::future::pending().await
}
}
});

View file

@ -1,3 +1,5 @@
use std::future::Future;
use super::{util, Service, ServiceCtx, ServiceFactory};
#[derive(Debug, Clone)]
@ -26,13 +28,8 @@ where
type Error = B::Error;
#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
util::ready(&self.svc1, &self.svc2, ctx).await
}
#[inline]
async fn unready(&self) -> Result<(), Self::Error> {
util::unready(&self.svc1, &self.svc2).await
async fn ready(&self) -> Option<impl Future<Output = Result<(), Self::Error>>> {
util::ready(&self.svc1, &self.svc2).await
}
#[inline]

View file

@ -1,6 +1,16 @@
use std::{future::poll_fn, future::Future, pin, task::Poll};
use std::{future::poll_fn, future::Future, marker, pin, task::Context, task::Poll};
use crate::{Service, ServiceCtx};
use crate::Service;
pub(crate) struct Ready<E>(marker::PhantomData<E>);
impl<E> Future for Ready<E> {
type Output = Result<(), E>;
fn poll(self: pin::Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
pub(crate) async fn shutdown<A, AR, B, BR>(svc1: &A, svc2: &B)
where
@ -29,53 +39,105 @@ where
.await
}
pub(crate) async fn ready<S, A, AR, B, BR>(
svc1: &A,
svc2: &B,
ctx: ServiceCtx<'_, S>,
) -> Result<(), A::Error>
pub(crate) async fn ready<'a, A, AR, B, BR>(
svc1: &'a A,
svc2: &'a B,
) -> Option<impl Future<Output = Result<(), A::Error>> + use<'a, A, AR, B, BR>>
where
A: Service<AR>,
B: Service<BR, Error = A::Error>,
{
let mut fut1 = pin::pin!(ctx.ready(svc1));
let mut fut2 = pin::pin!(ctx.ready(svc2));
let mut fut1 = pin::pin!(svc1.ready());
let mut fut2 = pin::pin!(svc2.ready());
let mut ready1 = false;
let mut ready2 = false;
poll_fn(move |cx| {
if !ready1 && pin::Pin::new(&mut fut1).poll(cx)?.is_ready() {
ready1 = true;
let mut r_fut1 = None;
let mut r_fut2 = None;
poll_fn(|cx| {
if !ready1 {
if let Poll::Ready(res) = pin::Pin::new(&mut fut1).poll(cx) {
r_fut1 = res;
ready1 = true;
}
}
if !ready2 && pin::Pin::new(&mut fut2).poll(cx)?.is_ready() {
ready2 = true;
if !ready2 {
if let Poll::Ready(res) = pin::Pin::new(&mut fut2).poll(cx) {
r_fut2 = res;
ready2 = true;
}
}
if ready1 && ready2 {
Poll::Ready(Ok(()))
Poll::Ready(())
} else {
Poll::Pending
}
})
.await
.await;
if r_fut1.is_none() && r_fut2.is_none() {
None
} else {
Some(async move {
match (r_fut1, r_fut2) {
(Some(fut), None) => fut.await?,
(None, Some(fut)) => fut.await?,
(Some(fut1), Some(fut2)) => match select(fut1, fut2).await {
Either::Left(res) => res?,
Either::Right(res) => res?,
},
(None, None) => (),
}
Ok(())
})
}
}
pub(crate) async fn unready<A, AR, B, BR>(svc1: &A, svc2: &B) -> Result<(), A::Error>
pub(crate) enum Either<A, B> {
/// First branch of the type
Left(A),
/// Second branch of the type
Right(B),
}
/// Waits for either one of two differently-typed futures to complete.
pub(crate) async fn select<A, B>(fut_a: A, fut_b: B) -> Either<A::Output, B::Output>
where
A: Service<AR>,
B: Service<BR, Error = A::Error>,
A: Future,
B: Future,
{
let mut fut1 = pin::pin!(svc1.unready());
let mut fut2 = pin::pin!(svc2.unready());
poll_fn(move |cx| {
if pin::Pin::new(&mut fut1).poll(cx)?.is_ready()
|| pin::Pin::new(&mut fut2).poll(cx)?.is_ready()
{
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
})
.await
Select { fut_a, fut_b }.await
}
pin_project_lite::pin_project! {
pub(crate) struct Select<A, B> {
#[pin]
fut_a: A,
#[pin]
fut_b: B,
}
}
impl<A, B> Future for Select<A, B>
where
A: Future,
B: Future,
{
type Output = Either<A::Output, B::Output>;
fn poll(self: pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(item) = this.fut_a.poll(cx) {
return Poll::Ready(Either::Left(item));
}
if let Poll::Ready(item) = this.fut_b.poll(cx) {
return Poll::Ready(Either::Right(item));
}
Poll::Pending
}
}