Use "async fn" for Service::ready() and Service::shutdown() methods (#361)

This commit is contained in:
Nikolay Kim 2024-05-28 16:37:16 +05:00 committed by GitHub
parent d8f55decb0
commit dec6ab083a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 704 additions and 410 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [3.0.0] - 2024-05-28
* Use "async fn" for Service::ready() and Service::shutdown() methods
## [2.0.2] - 2024-03-20
* Add boxed rc service factory

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "2.0.1"
version = "3.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]

View file

@ -1,6 +1,4 @@
use std::{task::Context, task::Poll};
use super::{Service, ServiceCtx, ServiceFactory};
use super::{util, Service, ServiceCtx, ServiceFactory};
#[derive(Clone, Debug)]
/// Service for the `and_then` combinator, chaining a computation onto the end
@ -27,22 +25,14 @@ where
type Response = B::Response;
type Error = A::Error;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = !self.svc1.poll_ready(cx)?.is_ready();
if !self.svc2.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
util::ready(&self.svc1, &self.svc2, ctx).await
}
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
if self.svc1.poll_shutdown(cx).is_ready() && self.svc2.poll_shutdown(cx).is_ready()
{
Poll::Ready(())
} else {
Poll::Pending
}
#[inline]
async fn shutdown(&self) {
util::shutdown(&self.svc1, &self.svc2).await
}
#[inline]
@ -93,21 +83,20 @@ where
#[cfg(test)]
mod tests {
use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use std::{cell::Cell, rc::Rc};
use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
use ntex_util::future::{lazy, Ready};
#[derive(Clone)]
struct Srv1(Rc<Cell<usize>>);
struct Srv1(Rc<Cell<usize>>, Rc<Cell<usize>>);
impl Service<&'static str> for Srv1 {
type Response = &'static str;
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
Ok(())
}
async fn call(
@ -117,18 +106,22 @@ mod tests {
) -> Result<Self::Response, ()> {
Ok(req)
}
async fn shutdown(&self) {
self.1.set(self.1.get() + 1);
}
}
#[derive(Clone)]
struct Srv2(Rc<Cell<usize>>);
struct Srv2(Rc<Cell<usize>>, Rc<Cell<usize>>);
impl Service<&'static str> for Srv2 {
type Response = (&'static str, &'static str);
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
Ok(())
}
async fn call(
@ -138,34 +131,46 @@ mod tests {
) -> Result<Self::Response, ()> {
Ok((req, "srv2"))
}
async fn shutdown(&self) {
self.1.set(self.1.get() + 1);
}
}
#[ntex::test]
async fn test_poll_ready() {
async fn test_ready() {
let cnt = Rc::new(Cell::new(0));
let srv = chain(Srv1(cnt.clone())).and_then(Srv2(cnt.clone())).clone();
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
let cnt_sht = Rc::new(Cell::new(0));
let srv = chain(Srv1(cnt.clone(), cnt_sht.clone()))
.and_then(Srv2(cnt.clone(), cnt_sht.clone()))
.clone()
.into_pipeline();
let res = srv.ready().await;
assert_eq!(res, Ok(()));
assert_eq!(cnt.get(), 2);
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
srv.shutdown().await;
assert_eq!(cnt_sht.get(), 2);
}
#[ntex::test]
async fn test_poll_ready2() {
async fn test_ready2() {
let cnt = Rc::new(Cell::new(0));
let srv = Box::new(chain(Srv1(cnt.clone())).and_then(Srv2(cnt.clone())));
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
let srv = Box::new(
chain(Srv1(cnt.clone(), Rc::new(Cell::new(0))))
.and_then(Srv2(cnt.clone(), Rc::new(Cell::new(0)))),
)
.into_pipeline();
let res = srv.ready().await;
assert_eq!(res, Ok(()));
assert_eq!(cnt.get(), 2);
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
}
#[ntex::test]
async fn test_call() {
let cnt = Rc::new(Cell::new(0));
let srv = chain(Srv1(cnt.clone())).and_then(Srv2(cnt)).into_pipeline();
let srv = chain(Srv1(cnt.clone(), Rc::new(Cell::new(0))))
.and_then(Srv2(cnt, Rc::new(Cell::new(0))))
.into_pipeline();
let res = srv.call("srv1").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "srv2"));
@ -176,9 +181,13 @@ mod tests {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let new_srv = chain_factory(fn_factory(move || {
Ready::from(Ok::<_, ()>(Srv1(cnt2.clone())))
let cnt = cnt2.clone();
async move { Ok::<_, ()>(Srv1(cnt, Rc::new(Cell::new(0)))) }
}))
.and_then(fn_factory(move || {
let cnt = cnt.clone();
async move { Ok(Srv2(cnt.clone(), Rc::new(Cell::new(0)))) }
}))
.and_then(move || Ready::from(Ok(Srv2(cnt.clone()))))
.clone();
let srv = new_srv.pipeline(&()).await.unwrap();

View file

@ -1,8 +1,9 @@
#![allow(clippy::type_complexity)]
use std::{fmt, future::Future, marker};
use super::ctx::ServiceCtx;
use super::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory};
use super::{
IntoService, IntoServiceFactory, Pipeline, Service, ServiceCtx, ServiceFactory,
};
/// Apply transform function to a service.
pub fn apply_fn<T, Req, F, R, In, Out, Err, U>(
@ -98,8 +99,15 @@ where
type Response = Out;
type Error = Err;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
#[inline]
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Err> {
self.service.ready().await.map_err(From::from)
}
#[inline]
async fn shutdown(&self) {
self.service.shutdown().await
}
#[inline]
async fn call(
@ -199,14 +207,13 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, Ready};
use std::task::Poll;
use std::{cell::Cell, rc::Rc};
use super::*;
use crate::{chain, chain_factory};
use crate::{chain, chain_factory, fn_factory};
#[derive(Clone, Debug)]
struct Srv;
#[derive(Debug, Default, Clone)]
struct Srv(Rc<Cell<usize>>);
impl Service<()> for Srv {
type Response = ();
@ -215,6 +222,10 @@ mod tests {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Ok(())
}
async fn shutdown(&self) {
self.0.set(self.0.get() + 1);
}
}
#[derive(Debug, PartialEq, Eq)]
@ -228,8 +239,9 @@ mod tests {
#[ntex::test]
async fn test_call() {
let cnt_sht = Rc::new(Cell::new(0));
let srv = chain(
apply_fn(Srv, |req: &'static str, svc| async move {
apply_fn(Srv(cnt_sht.clone()), |req: &'static str, svc| async move {
svc.call(()).await.unwrap();
Ok((req, ()))
})
@ -237,12 +249,10 @@ mod tests {
)
.into_pipeline();
assert_eq!(
lazy(|cx| srv.poll_ready(cx)).await,
Poll::Ready(Ok::<_, Err>(()))
);
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
assert_eq!(srv.ready().await, Ok::<_, Err>(()));
srv.shutdown().await;
assert_eq!(cnt_sht.get(), 1);
let res = srv.call("srv").await;
assert!(res.is_ok());
@ -251,7 +261,8 @@ mod tests {
#[ntex::test]
async fn test_call_chain() {
let srv = chain(Srv)
let cnt_sht = Rc::new(Cell::new(0));
let srv = chain(Srv(cnt_sht.clone()))
.apply_fn(|req: &'static str, svc| async move {
svc.call(()).await.unwrap();
Ok((req, ()))
@ -259,12 +270,10 @@ mod tests {
.clone()
.into_pipeline();
assert_eq!(
lazy(|cx| srv.poll_ready(cx)).await,
Poll::Ready(Ok::<_, Err>(()))
);
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
assert_eq!(srv.ready().await, Ok::<_, Err>(()));
srv.shutdown().await;
assert_eq!(cnt_sht.get(), 1);
let res = srv.call("srv").await;
assert!(res.is_ok());
@ -276,7 +285,7 @@ mod tests {
async fn test_create() {
let new_srv = chain_factory(
apply_fn_factory(
|| Ready::<_, ()>::Ok(Srv),
fn_factory(|| async { Ok::<_, ()>(Srv::default()) }),
|req: &'static str, srv| async move {
srv.call(()).await.unwrap();
Ok((req, ()))
@ -287,10 +296,7 @@ mod tests {
let srv = new_srv.pipeline(&()).await.unwrap();
assert_eq!(
lazy(|cx| srv.poll_ready(cx)).await,
Poll::Ready(Ok::<_, Err>(()))
);
assert_eq!(srv.ready().await, Ok::<_, Err>(()));
let res = srv.call("srv").await;
assert!(res.is_ok());
@ -302,7 +308,7 @@ mod tests {
#[ntex::test]
async fn test_create_chain() {
let new_srv = chain_factory(|| Ready::<_, ()>::Ok(Srv))
let new_srv = chain_factory(fn_factory(|| async { Ok::<_, ()>(Srv::default()) }))
.apply_fn(|req: &'static str, srv| async move {
srv.call(()).await.unwrap();
Ok((req, ()))
@ -311,10 +317,7 @@ mod tests {
let srv = new_srv.pipeline(&()).await.unwrap();
assert_eq!(
lazy(|cx| srv.poll_ready(cx)).await,
Poll::Ready(Ok::<_, Err>(()))
);
assert_eq!(srv.ready().await, Ok::<_, Err>(()));
let res = srv.call("srv").await;
assert!(res.is_ok());

View file

@ -1,8 +1,8 @@
use std::{fmt, future::Future, pin::Pin, task::Context, task::Poll};
use std::{fmt, future::Future, pin::Pin};
use crate::ctx::{ServiceCtx, WaitersRef};
pub type BoxFuture<'a, I, E> = Pin<Box<dyn Future<Output = Result<I, E>> + 'a>>;
type BoxFuture<'a, I, E> = Pin<Box<dyn Future<Output = Result<I, E>> + 'a>>;
pub struct BoxService<Req, Res, Err>(Box<dyn ServiceObj<Req, Response = Res, Error = Err>>);
pub struct BoxServiceFactory<Cfg, Req, Res, Err, InitErr>(
Box<dyn ServiceFactoryObj<Req, Cfg, Response = Res, Error = Err, InitError = InitErr>>,
@ -48,9 +48,11 @@ trait ServiceObj<Req> {
type Response;
type Error;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()>;
fn ready<'a>(
&'a self,
idx: usize,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, (), Self::Error>;
fn call<'a>(
&'a self,
@ -58,22 +60,34 @@ trait ServiceObj<Req> {
idx: usize,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, Self::Response, Self::Error>;
fn shutdown<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
}
impl<S, Req> ServiceObj<Req> for S
where
Req: 'static,
S: crate::Service<Req>,
Req: 'static,
{
type Response = S::Response;
type Error = S::Error;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
crate::Service::poll_ready(self, cx)
#[inline]
fn ready<'a>(
&'a self,
idx: usize,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, (), Self::Error> {
Box::pin(async move {
ServiceCtx::<'a, S>::from_ref(idx, waiters)
.ready(self)
.await
})
}
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
crate::Service::poll_shutdown(self, cx)
#[inline]
fn shutdown<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
Box::pin(crate::Service::shutdown(self))
}
#[inline]
@ -136,13 +150,14 @@ where
type Error = Err;
#[inline]
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(ctx)
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let (idx, waiters) = ctx.inner();
self.0.ready(idx, waiters).await
}
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
self.0.poll_shutdown(cx)
async fn shutdown(&self) {
self.0.shutdown().await
}
#[inline]

View file

@ -171,8 +171,8 @@ impl<Svc: Service<Req>, Req> Service<Req> for ServiceChain<Svc, Req> {
type Response = Svc::Response;
type Error = Svc::Error;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
crate::forward_ready!(service);
crate::forward_shutdown!(service);
#[inline]
async fn call(

View file

@ -1,4 +1,4 @@
use std::{cell::UnsafeCell, fmt, future::poll_fn, marker, rc::Rc, task, task::Poll};
use std::{cell, fmt, future::poll_fn, future::Future, marker, pin, rc::Rc, task};
use crate::Service;
@ -8,17 +8,20 @@ pub struct ServiceCtx<'a, S: ?Sized> {
_t: marker::PhantomData<Rc<S>>,
}
pub(crate) struct WaitersRef(UnsafeCell<slab::Slab<Option<task::Waker>>>);
pub(crate) struct Waiters {
index: usize,
waiters: Rc<WaitersRef>,
}
pub(crate) struct WaitersRef {
cur: cell::Cell<usize>,
indexes: cell::UnsafeCell<slab::Slab<Option<task::Waker>>>,
}
impl WaitersRef {
#[allow(clippy::mut_from_ref)]
fn get(&self) -> &mut slab::Slab<Option<task::Waker>> {
unsafe { &mut *self.0.get() }
unsafe { &mut *self.indexes.get() }
}
fn insert(&self) -> usize {
@ -40,6 +43,21 @@ impl WaitersRef {
waker.wake();
}
}
self.cur.set(usize::MAX);
}
pub(crate) fn can_check(&self, idx: usize, cx: &mut task::Context<'_>) -> bool {
let cur = self.cur.get();
if cur == idx {
true
} else if cur == usize::MAX {
self.cur.set(idx);
true
} else {
self.register(idx, cx);
false
}
}
}
@ -49,7 +67,10 @@ impl Waiters {
let index = waiters.insert(None);
Waiters {
index,
waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))),
waiters: Rc::new(WaitersRef {
cur: cell::Cell::new(usize::MAX),
indexes: cell::UnsafeCell::new(waiters),
}),
}
}
@ -57,12 +78,26 @@ impl Waiters {
self.waiters.as_ref()
}
pub(crate) fn can_check(&self, cx: &mut task::Context<'_>) -> bool {
self.waiters.can_check(self.index, cx)
}
pub(crate) fn register(&self, cx: &mut task::Context<'_>) {
self.waiters.register(self.index, cx)
self.waiters.register(self.index, cx);
}
pub(crate) fn notify(&self) {
self.waiters.notify()
if self.waiters.cur.get() == self.index {
self.waiters.notify();
}
}
}
impl Drop for Waiters {
#[inline]
fn drop(&mut self) {
self.waiters.remove(self.index);
self.waiters.notify();
}
}
@ -84,13 +119,6 @@ impl fmt::Debug for Waiters {
}
}
impl Drop for Waiters {
#[inline]
fn drop(&mut self) {
self.waiters.remove(self.index);
}
}
impl<'a, S> ServiceCtx<'a, S> {
pub(crate) fn new(waiters: &'a Waiters) -> Self {
Self {
@ -118,15 +146,25 @@ impl<'a, S> ServiceCtx<'a, S> {
T: Service<R>,
{
// check readiness and notify waiters
poll_fn(move |cx| match svc.poll_ready(cx)? {
Poll::Ready(()) => {
self.waiters.notify();
Poll::Ready(Ok(()))
}
Poll::Pending => {
self.waiters.register(self.idx, cx);
Poll::Pending
let mut fut = svc.ready(ServiceCtx {
idx: self.idx,
waiters: self.waiters,
_t: marker::PhantomData,
});
poll_fn(|cx| {
if self.waiters.can_check(self.idx, cx) {
// SAFETY: `fut` never moves
let p = unsafe { pin::Pin::new_unchecked(&mut fut) };
match p.poll(cx) {
task::Poll::Pending => self.waiters.register(self.idx, cx),
task::Poll::Ready(res) => {
self.waiters.notify();
return task::Poll::Ready(res);
}
}
}
task::Poll::Pending
})
.await
}
@ -139,6 +177,7 @@ impl<'a, S> ServiceCtx<'a, S> {
R: 'a,
{
self.ready(svc).await?;
svc.call(
req,
ServiceCtx {
@ -193,8 +232,7 @@ impl<'a, S> fmt::Debug for ServiceCtx<'a, S> {
#[cfg(test)]
mod tests {
use std::task::Context;
use std::{cell::Cell, cell::RefCell};
use std::{cell::Cell, cell::RefCell, future::poll_fn, task::Poll};
use ntex_util::{channel::condition, future::lazy, time};
@ -207,9 +245,10 @@ mod tests {
type Response = &'static str;
type Error = ();
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.0.set(self.0.get() + 1);
self.1.poll_ready(cx).map(|_| Ok(()))
self.1.ready().await;
Ok(())
}
async fn call(
@ -225,11 +264,11 @@ mod tests {
}
#[ntex::test]
async fn test_poll_ready() {
async fn test_ready() {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv1 = Pipeline::from(Srv(cnt.clone(), con.wait()));
let srv1 = Pipeline::from(Srv(cnt.clone(), con.wait())).bind();
let srv2 = srv1.clone();
let res = lazy(|cx| srv1.poll_ready(cx)).await;
@ -238,17 +277,26 @@ mod tests {
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
assert_eq!(cnt.get(), 2);
assert_eq!(cnt.get(), 1);
con.notify();
let res = lazy(|cx| srv1.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
assert_eq!(cnt.get(), 3);
assert_eq!(cnt.get(), 1);
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
assert_eq!(cnt.get(), 4);
assert_eq!(cnt.get(), 2);
con.notify();
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
assert_eq!(cnt.get(), 2);
let res = lazy(|cx| srv1.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
assert_eq!(cnt.get(), 3);
}
#[ntex::test]
@ -258,7 +306,7 @@ mod tests {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv1 = Pipeline::from(Srv(cnt.clone(), con.wait()));
let srv1 = Pipeline::from(Srv(cnt.clone(), con.wait())).bind();
let srv2 = srv1.clone();
let data1 = data.clone();
@ -270,7 +318,7 @@ mod tests {
let data2 = data.clone();
ntex::rt::spawn(async move {
let i = srv2.call_static("srv2").await.unwrap();
let i = srv2.call("srv2").await.unwrap();
data2.borrow_mut().push(i);
});
time::sleep(time::Millis(50)).await;
@ -278,13 +326,13 @@ mod tests {
con.notify();
time::sleep(time::Millis(150)).await;
assert_eq!(cnt.get(), 4);
assert_eq!(&*data.borrow(), &["srv2"]);
assert_eq!(cnt.get(), 2);
assert_eq!(&*data.borrow(), &["srv1"]);
con.notify();
time::sleep(time::Millis(150)).await;
assert_eq!(cnt.get(), 5);
assert_eq!(&*data.borrow(), &["srv2", "srv1"]);
assert_eq!(cnt.get(), 2);
assert_eq!(&*data.borrow(), &["srv1", "srv2"]);
}
}

View file

@ -396,14 +396,14 @@ mod tests {
let new_srv = fn_service(|()| async { Ok::<_, ()>("srv") }).clone();
format!("{:?}", new_srv);
let srv = Pipeline::new(new_srv.create(()).await.unwrap());
let srv = Pipeline::new(new_srv.create(()).await.unwrap()).bind();
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
assert_eq!(res.unwrap(), "srv");
format!("{:?}", srv);
let srv2 = Pipeline::new(new_srv.clone());
let srv2 = Pipeline::new(new_srv.clone()).bind();
let res = srv2.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), "srv");
@ -421,7 +421,8 @@ mod tests {
.await
.unwrap()
.clone(),
);
)
.bind();
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
@ -442,7 +443,7 @@ mod tests {
})
.clone();
let srv = Pipeline::new(new_srv.create(&1).await.unwrap());
let srv = Pipeline::new(new_srv.create(&1).await.unwrap()).bind();
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());

View file

@ -1,4 +1,4 @@
use std::{cell::Cell, fmt, marker::PhantomData, task::Context, task::Poll};
use std::{cell::Cell, fmt, marker::PhantomData};
use crate::{Service, ServiceCtx};
@ -56,11 +56,10 @@ where
type Error = Err;
#[inline]
fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> {
async fn shutdown(&self) {
if let Some(f) = self.f_shutdown.take() {
(f)()
}
Poll::Ready(())
}
#[inline]
@ -71,7 +70,6 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::lazy;
use std::rc::Rc;
use crate::{chain, fn_service, Pipeline};
@ -90,10 +88,10 @@ mod tests {
let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone());
let res = pipe.call(()).await;
assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert_eq!(pipe.ready().await, Ok(()));
assert!(res.is_ok());
assert_eq!(res.unwrap(), "pipe");
assert_eq!(lazy(|cx| pipe.poll_shutdown(cx)).await, Poll::Ready(()));
pipe.shutdown().await;
assert!(is_called.get());
format!("{:?}", pipe);

View file

@ -1,4 +1,5 @@
//! See [`Service`] docs for information on this crate's foundational trait.
#![allow(async_fn_in_trait)]
#![deny(
rust_2018_idioms,
warnings,
@ -6,7 +7,7 @@
missing_debug_implementations
)]
use std::{future::Future, rc::Rc, task, task::Context, task::Poll};
use std::{future::Future, rc::Rc};
mod and_then;
mod apply;
@ -23,6 +24,7 @@ mod map_init_err;
mod middleware;
mod pipeline;
mod then;
mod util;
pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::chain::{chain, chain_factory};
@ -31,7 +33,7 @@ pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::fn_shutdown::fn_shutdown;
pub use self::map_config::{map_config, unit_config};
pub use self::middleware::{apply, Identity, Middleware, Stack};
pub use self::pipeline::{Pipeline, PipelineCall};
pub use self::pipeline::{Pipeline, PipelineBinding, PipelineCall};
#[allow(unused_variables)]
/// An asynchronous function of `Request` to a `Response`.
@ -98,39 +100,30 @@ pub trait Service<Req> {
/// should take care to not call `poll_ready`. Caller of the service verifies readiness,
/// Only way to make a `call` is to use `ctx` argument, it enforces readiness before calling
/// service.
fn call(
async fn call(
&self,
req: Req,
ctx: ServiceCtx<'_, Self>,
) -> impl Future<Output = Result<Self::Response, Self::Error>>;
) -> Result<Self::Response, Self::Error>;
#[inline]
/// Returns `Ready` when the service is able to process requests.
/// Returns when the service is able to process requests.
///
/// If the service is at capacity, then `Pending` is returned and the task is notified when
/// If the service is at capacity, then `ready` does not returns and the task is notified when
/// the service becomes ready again. This function is expected to be called while on a task.
///
/// This is a **best effort** implementation. False positives are permitted. It is permitted for
/// the service to return `Ready` from a `poll_ready` call and the next invocation of `call`
/// the service to returns from a `ready` call and the next invocation of `call`
/// results in an error.
///
/// # Notes
///
/// 1. `.poll_ready()` might be called on different task from actual service call.
/// 2. In case of chained services, `.poll_ready()` is called for all services at once.
/// 3. Every `.call()` in chained services enforces readiness.
fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
Ok(())
}
#[inline]
/// Shutdown service.
///
/// Returns `Ready` when the service is properly shutdowns. This method might be called
/// after it returns `Ready`.
fn poll_shutdown(&self, cx: &mut task::Context<'_>) -> Poll<()> {
Poll::Ready(())
}
/// Returns when the service is properly shutdowns.
async fn shutdown(&self) {}
#[inline]
/// Map this service's output to a different type, returning a new service of the resulting type.
@ -195,7 +188,6 @@ pub trait ServiceFactory<Req, Cfg = ()> {
cfg: Cfg,
) -> impl Future<Output = Result<Self::Service, Self::InitError>>;
#[allow(async_fn_in_trait)]
/// Create and return a new service value asynchronously and wrap into a container
async fn pipeline(&self, cfg: Cfg) -> Result<Pipeline<Self::Service>, Self::InitError>
where
@ -253,13 +245,13 @@ where
type Error = S::Error;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
(**self).poll_ready(cx)
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), S::Error> {
ctx.ready(&**self).await
}
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
(**self).poll_shutdown(cx)
async fn shutdown(&self) {
(**self).shutdown().await
}
#[inline]
@ -280,13 +272,13 @@ where
type Error = S::Error;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
(**self).poll_ready(cx)
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), S::Error> {
ctx.ready(&**self).await
}
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
(**self).poll_shutdown(cx)
async fn shutdown(&self) {
(**self).shutdown().await
}
#[inline]
@ -351,15 +343,6 @@ where
}
}
/// Convert object of type `T` to a service `S`
pub fn into_service<Svc, Req, F>(tp: F) -> Svc
where
Svc: Service<Req>,
F: IntoService<Svc, Req>,
{
tp.into_service()
}
pub mod dev {
pub use crate::and_then::{AndThen, AndThenFactory};
pub use crate::apply::{Apply, ApplyFactory};

View file

@ -1,38 +1,35 @@
/// An implementation of [`poll_ready`] that forwards readiness checks to a field.
/// An implementation of [`ready`] that forwards readiness checks to a field.
#[macro_export]
macro_rules! forward_poll_ready {
macro_rules! forward_ready {
($field:ident) => {
#[inline]
fn poll_ready(
async fn ready(
&self,
cx: &mut ::core::task::Context<'_>,
) -> ::core::task::Poll<Result<(), Self::Error>> {
self.$field
.poll_ready(cx)
ctx: $crate::ServiceCtx<'_, Self>,
) -> Result<(), Self::Error> {
ctx.ready(&self.$field)
.await
.map_err(::core::convert::Into::into)
}
};
($field:ident, $err:expr) => {
#[inline]
fn poll_ready(
async fn ready(
&self,
cx: &mut ::core::task::Context<'_>,
) -> ::core::task::Poll<Result<(), Self::Error>> {
self.$field.poll_ready(cx).map_err($err)
ctx: $crate::ServiceCtx<'_, Self>,
) -> Result<(), Self::Error> {
ctx.ready(&self.$field).await.map_err($err)
}
};
}
/// An implementation of [`poll_shutdown`] that forwards readiness checks to a field.
/// An implementation of [`shutdown`] that forwards shutdown checks to a field.
#[macro_export]
macro_rules! forward_poll_shutdown {
macro_rules! forward_shutdown {
($field:ident) => {
#[inline]
fn poll_shutdown(
&self,
cx: &mut ::core::task::Context<'_>,
) -> ::core::task::Poll<()> {
self.$field.poll_shutdown(cx)
async fn shutdown(&self) {
self.$field.shutdown().await
}
};
}

View file

@ -61,8 +61,8 @@ where
type Response = Res;
type Error = A::Error;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
crate::forward_ready!(service);
crate::forward_shutdown!(service);
#[inline]
async fn call(
@ -146,60 +146,61 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::lazy;
use std::task::{Context, Poll};
use std::{cell::Cell, rc::Rc};
use crate::{fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory};
#[derive(Debug, Clone)]
struct Srv;
#[derive(Debug, Default, Clone)]
struct Srv(Rc<Cell<usize>>);
impl Service<()> for Srv {
type Response = ();
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
Ok(())
}
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Ok(())
}
async fn shutdown(&self) {
self.0.set(self.0.get() + 1);
}
}
#[ntex::test]
async fn test_service() {
let srv = Pipeline::new(Srv.map(|_| "ok").clone());
let cnt_sht = Rc::new(Cell::new(0));
let srv = Pipeline::new(Srv(cnt_sht.clone()).map(|_| "ok").clone());
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), "ok");
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
let res = srv.ready().await;
assert_eq!(res, Ok(()));
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
srv.shutdown().await;
assert_eq!(cnt_sht.get(), 1);
format!("{:?}", srv);
}
#[ntex::test]
async fn test_pipeline() {
let srv = Pipeline::new(crate::chain(Srv).map(|_| "ok").clone());
let srv = Pipeline::new(crate::chain(Srv::default()).map(|_| "ok").clone());
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), "ok");
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
let res = srv.ready().await;
assert_eq!(res, Ok(()));
}
#[ntex::test]
async fn test_factory() {
let new_srv = fn_factory(|| async { Ok::<_, ()>(Srv) })
let new_srv = fn_factory(|| async { Ok::<_, ()>(Srv::default()) })
.map(|_| "ok")
.clone();
let srv = Pipeline::new(new_srv.create(&()).await.unwrap());
@ -212,9 +213,10 @@ mod tests {
#[ntex::test]
async fn test_pipeline_factory() {
let new_srv = crate::chain_factory(fn_factory(|| async { Ok::<_, ()>(Srv) }))
.map(|_| "ok")
.clone();
let new_srv =
crate::chain_factory(fn_factory(|| async { Ok::<_, ()>(Srv::default()) }))
.map(|_| "ok")
.clone();
let srv = Pipeline::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await;
assert!(res.is_ok());

View file

@ -115,7 +115,6 @@ where
#[cfg(test)]
#[allow(clippy::redundant_closure)]
mod tests {
use ntex_util::future::Ready;
use std::{cell::Cell, rc::Rc};
use super::*;
@ -126,7 +125,7 @@ mod tests {
let item = Rc::new(Cell::new(1usize));
let factory = map_config(
fn_service(|item: usize| Ready::<_, ()>::Ok(item)),
fn_service(|item: usize| async move { Ok::<_, ()>(item) }),
|t: &usize| {
item.set(item.get() + *t);
},
@ -140,7 +139,7 @@ mod tests {
#[ntex::test]
async fn test_unit_config() {
let _ = unit_config(fn_service(|item: usize| Ready::<_, ()>::Ok(item)))
let _ = unit_config(fn_service(|item: usize| async move { Ok::<_, ()>(item) }))
.clone()
.create(&10)
.await;

View file

@ -1,4 +1,4 @@
use std::{fmt, marker::PhantomData, task::Context, task::Poll};
use std::{fmt, marker::PhantomData};
use super::{Service, ServiceCtx, ServiceFactory};
@ -63,8 +63,8 @@ where
type Error = E;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(&self.f)
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
ctx.ready(&self.service).await.map_err(&self.f)
}
#[inline]
@ -76,7 +76,7 @@ where
ctx.call(&self.service, req).await.map_err(|e| (self.f)(e))
}
crate::forward_poll_shutdown!(service);
crate::forward_shutdown!(service);
}
/// Factory for the `map_err` combinator, changing the type of a new
@ -158,44 +158,53 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, Ready};
use std::{cell::Cell, rc::Rc};
use super::*;
use crate::{fn_factory, Pipeline};
#[derive(Debug, Clone)]
struct Srv(bool);
struct Srv(bool, Rc<Cell<usize>>);
impl Service<()> for Srv {
type Response = ();
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
if self.0 {
Poll::Ready(Err(()))
Err(())
} else {
Poll::Ready(Ok(()))
Ok(())
}
}
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Err(())
}
async fn shutdown(&self) {
self.1.set(self.1.get() + 1);
}
}
#[ntex::test]
async fn test_poll_ready() {
let srv = Srv(true).map_err(|_| "error");
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Err("error")));
async fn test_ready() {
let cnt_sht = Rc::new(Cell::new(0));
let srv = Pipeline::new(Srv(true, cnt_sht.clone()).map_err(|_| "error"));
let res = srv.ready().await;
assert_eq!(res, Err("error"));
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
srv.shutdown().await;
assert_eq!(cnt_sht.get(), 1);
}
#[ntex::test]
async fn test_service() {
let srv = Pipeline::new(Srv(false).map_err(|_| "error").clone());
let srv = Pipeline::new(
Srv(false, Rc::new(Cell::new(0)))
.map_err(|_| "error")
.clone(),
);
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
@ -205,7 +214,11 @@ mod tests {
#[ntex::test]
async fn test_pipeline() {
let srv = Pipeline::new(crate::chain(Srv(false)).map_err(|_| "error").clone());
let srv = Pipeline::new(
crate::chain(Srv(false, Rc::new(Cell::new(0))))
.map_err(|_| "error")
.clone(),
);
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
@ -215,9 +228,10 @@ mod tests {
#[ntex::test]
async fn test_factory() {
let new_srv = fn_factory(|| Ready::<_, ()>::Ok(Srv(false)))
.map_err(|_| "error")
.clone();
let new_srv =
fn_factory(|| async { Ok::<_, ()>(Srv(false, Rc::new(Cell::new(0)))) })
.map_err(|_| "error")
.clone();
let srv = Pipeline::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await;
assert!(res.is_err());
@ -227,10 +241,11 @@ mod tests {
#[ntex::test]
async fn test_pipeline_factory() {
let new_srv =
crate::chain_factory(fn_factory(|| async { Ok::<Srv, ()>(Srv(false)) }))
.map_err(|_| "error")
.clone();
let new_srv = crate::chain_factory(fn_factory(|| async {
Ok::<Srv, ()>(Srv(false, Rc::new(Cell::new(0))))
}))
.map_err(|_| "error")
.clone();
let srv = Pipeline::new(new_srv.create(&()).await.unwrap());
let res = srv.call(()).await;
assert!(res.is_err());

View file

@ -69,7 +69,7 @@ where
#[cfg(test)]
mod tests {
use crate::{chain_factory, fn_factory_with_config, into_service, ServiceFactory};
use crate::{chain_factory, fn_factory_with_config, fn_service, ServiceFactory};
#[ntex::test]
async fn map_init_err() {
@ -79,7 +79,7 @@ mod tests {
if err {
Err(())
} else {
Ok(into_service(|i: usize| async move { Ok::<_, ()>(i * 2) }))
Ok(fn_service(|i: usize| async move { Ok::<_, ()>(i * 2) }))
}
}
}))
@ -99,7 +99,7 @@ mod tests {
if err {
Err(())
} else {
Ok(into_service(|i: usize| async move { Ok::<_, ()>(i * 2) }))
Ok(fn_service(|i: usize| async move { Ok::<_, ()>(i * 2) }))
}
}
})

View file

@ -21,10 +21,18 @@ where
///
/// For example, timeout middleware:
///
/// ```rust,ignore
/// ```rust
/// use ntex_service::{Service, ServiceCtx};
/// use ntex_util::{time::sleep, future::Either, future::select};
///
/// pub struct Timeout<S> {
/// service: S,
/// timeout: Duration,
/// timeout: std::time::Duration,
/// }
///
/// pub enum TimeoutError<E> {
/// Service(E),
/// Timeout,
/// }
///
/// impl<S, R> Service<R> for Timeout<S>
@ -34,11 +42,11 @@ where
/// type Response = S::Response;
/// type Error = TimeoutError<S::Error>;
///
/// fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// self.service.poll_ready(cx).map_err(TimeoutError::Service)
/// async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
/// ctx.ready(&self.service).await.map_err(TimeoutError::Service)
/// }
///
/// async fn call(&self, req: S::Request) -> Result<Self::Response, Self::Error> {
/// async fn call(&self, req: R, ctx: ServiceCtx<'_, Self>) -> Result<Self::Response, Self::Error> {
/// match select(sleep(self.timeout), ctx.call(&self.service, req)).await {
/// Either::Left(_) => Err(TimeoutError::Timeout),
/// Either::Right(res) => res.map_err(TimeoutError::Service),
@ -59,18 +67,18 @@ where
///
/// ```rust,ignore
/// pub struct TimeoutMiddleware {
/// timeout: Duration,
/// timeout: std::time::Duration,
/// }
///
/// impl<S> Middleware<S> for TimeoutMiddleware<E>
/// impl<S> Middleware<S> for TimeoutMiddleware
/// {
/// type Service = Timeout<S>;
///
/// fn create(&self, service: S) -> Self::Service {
/// ok(Timeout {
/// Timeout {
/// service,
/// timeout: self.timeout,
/// })
/// }
/// }
/// }
/// ```
@ -183,32 +191,31 @@ where
#[cfg(test)]
#[allow(clippy::redundant_clone)]
mod tests {
use ntex_util::future::{lazy, Ready};
use std::task::{Context, Poll};
use std::{cell::Cell, rc::Rc};
use super::*;
use crate::{fn_service, Pipeline, ServiceCtx};
#[derive(Debug, Clone)]
struct Tr<R>(PhantomData<R>);
struct Tr<R>(PhantomData<R>, Rc<Cell<usize>>);
impl<S, R> Middleware<S> for Tr<R> {
type Service = Srv<S, R>;
fn create(&self, service: S) -> Self::Service {
Srv(service, PhantomData)
Srv(service, PhantomData, self.1.clone())
}
}
#[derive(Debug, Clone)]
struct Srv<S, R>(S, PhantomData<R>);
struct Srv<S, R>(S, PhantomData<R>, Rc<Cell<usize>>);
impl<S: Service<R>, R> Service<R> for Srv<S, R> {
type Response = S::Response;
type Error = S::Error;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
ctx.ready(&self.0).await
}
async fn call(
@ -218,13 +225,18 @@ mod tests {
) -> Result<S::Response, S::Error> {
ctx.call(&self.0, req).await
}
async fn shutdown(&self) {
self.2.set(self.2.get() + 1);
}
}
#[ntex::test]
async fn middleware() {
let cnt_sht = Rc::new(Cell::new(0));
let factory = apply(
Rc::new(Tr(PhantomData).clone()),
fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2)),
Rc::new(Tr(PhantomData, cnt_sht.clone()).clone()),
fn_service(|i: usize| async move { Ok::<_, ()>(i * 2) }),
)
.clone();
@ -234,15 +246,13 @@ mod tests {
assert_eq!(res.unwrap(), 20);
format!("{:?} {:?}", factory, srv);
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
assert_eq!(srv.ready().await, Ok(()));
srv.shutdown().await;
assert_eq!(cnt_sht.get(), 1);
let factory =
crate::chain_factory(fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2)))
.apply(Rc::new(Tr(PhantomData).clone()))
crate::chain_factory(fn_service(|i: usize| async move { Ok::<_, ()>(i * 2) }))
.apply(Rc::new(Tr(PhantomData, Rc::new(Cell::new(0))).clone()))
.clone();
let srv = Pipeline::new(factory.create(&()).await.unwrap().clone());
@ -251,10 +261,6 @@ mod tests {
assert_eq!(res.unwrap(), 20);
format!("{:?} {:?}", factory, srv);
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
assert_eq!(srv.ready().await, Ok(()));
}
}

View file

@ -1,5 +1,4 @@
use std::future::{poll_fn, Future};
use std::{cell::Cell, fmt, pin::Pin, rc::Rc, task, task::Context, task::Poll};
use std::{cell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
use crate::{ctx::Waiters, Service, ServiceCtx};
@ -9,7 +8,6 @@ use crate::{ctx::Waiters, Service, ServiceCtx};
/// Container allows to call enclosed service and adds support of shared readiness.
pub struct Pipeline<S> {
svc: Rc<S>,
pending: Cell<bool>,
pub(crate) waiters: Waiters,
}
@ -19,7 +17,6 @@ impl<S> Pipeline<S> {
pub fn new(svc: S) -> Self {
Pipeline {
svc: Rc::new(svc),
pending: Cell::new(false),
waiters: Waiters::new(),
}
}
@ -36,33 +33,9 @@ impl<S> Pipeline<S> {
where
S: Service<R>,
{
poll_fn(move |cx| self.poll_ready(cx)).await
}
#[inline]
/// Returns `Ready` when the service is able to process requests.
pub fn poll_ready<R>(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>
where
S: Service<R>,
{
let res = self.svc.poll_ready(cx);
if res.is_pending() {
self.pending.set(true);
self.waiters.register(cx)
} else if self.pending.get() {
self.pending.set(false);
self.waiters.notify()
}
res
}
#[inline]
/// Shutdown enclosed service.
pub fn poll_shutdown<R>(&self, cx: &mut Context<'_>) -> Poll<()>
where
S: Service<R>,
{
self.svc.poll_shutdown(cx)
ServiceCtx::<'_, S>::new(&self.waiters)
.ready(self.svc.as_ref())
.await
}
#[inline]
@ -72,14 +45,13 @@ impl<S> Pipeline<S> {
where
S: Service<R>,
{
let ctx = ServiceCtx::<'_, S>::new(&self.waiters);
// check service readiness
self.ready().await?;
self.svc.as_ref().ready(ctx).await?;
// call service
self.svc
.as_ref()
.call(req, ServiceCtx::new(&self.waiters))
.await
self.svc.as_ref().call(req, ctx).await
}
#[inline]
@ -88,10 +60,16 @@ impl<S> Pipeline<S> {
pub fn call_static<R>(&self, req: R) -> PipelineCall<S, R>
where
S: Service<R> + 'static,
R: 'static,
{
let pl = self.clone();
PipelineCall {
state: PipelineCallState::Ready { req: Some(req) },
pipeline: self.clone(),
fut: Box::pin(async move {
ServiceCtx::<S>::new(&pl.waiters)
.call(pl.svc.as_ref(), req)
.await
}),
}
}
@ -102,18 +80,36 @@ impl<S> Pipeline<S> {
pub fn call_nowait<R>(&self, req: R) -> PipelineCall<S, R>
where
S: Service<R> + 'static,
R: 'static,
{
let pl = self.clone();
PipelineCall {
state: PipelineCallState::new_call(self, req),
pipeline: self.clone(),
fut: Box::pin(async move {
ServiceCtx::<S>::new(&pl.waiters)
.call_nowait(pl.svc.as_ref(), req)
.await
}),
}
}
/// Extract service if container hadnt been cloned before.
pub fn into_service(self) -> Option<S> {
let svc = self.svc.clone();
drop(self);
Rc::try_unwrap(svc).ok()
#[inline]
/// Shutdown enclosed service.
pub async fn shutdown<R>(&self)
where
S: Service<R>,
{
self.svc.as_ref().shutdown().await
}
#[inline]
/// Convert to lifetime object.
pub fn bind<R>(self) -> PipelineBinding<S, R>
where
S: Service<R> + 'static,
R: 'static,
{
PipelineBinding::new(self)
}
}
@ -129,58 +125,161 @@ impl<S> Clone for Pipeline<S> {
fn clone(&self) -> Self {
Self {
svc: self.svc.clone(),
pending: Cell::new(false),
waiters: self.waiters.clone(),
}
}
}
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
/// Bound container for a service.
pub struct PipelineBinding<S, R>
where
S: Service<R>,
{
pl: Pipeline<S>,
st: cell::UnsafeCell<State<S::Error>>,
}
enum State<E> {
New,
Readiness(Pin<Box<dyn Future<Output = Result<(), E>> + 'static>>),
Shutdown(Pin<Box<dyn Future<Output = ()> + 'static>>),
}
impl<S, R> PipelineBinding<S, R>
where
S: Service<R> + 'static,
R: 'static,
{
fn new(pl: Pipeline<S>) -> Self {
PipelineBinding {
pl,
st: cell::UnsafeCell::new(State::New),
}
}
#[inline]
/// Return reference to enclosed service
pub fn get_ref(&self) -> &S {
self.pl.svc.as_ref()
}
#[inline]
/// Returns `Ready` when the pipeline is able to process requests.
///
/// panics if .poll_shutdown() was called before.
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let st = unsafe { &mut *self.st.get() };
match st {
State::New => {
// SAFETY: `fut` has same lifetime same as lifetime of `self.pl`.
// Pipeline::svc is heap allocated(Rc<S>), and it is being kept alive until
// `self` is alive
let pl: &'static Pipeline<S> = unsafe { std::mem::transmute(&self.pl) };
let fut = Box::pin(CheckReadiness {
fut: None,
f: ready,
pl,
});
*st = State::Readiness(fut);
self.poll_ready(cx)
}
State::Readiness(ref mut fut) => Pin::new(fut).poll(cx),
State::Shutdown(_) => panic!("Pipeline is shutding down"),
}
}
#[inline]
/// Returns `Ready` when the service is properly shutdowns.
pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let st = unsafe { &mut *self.st.get() };
match st {
State::New | State::Readiness(_) => {
// SAFETY: `fut` has same lifetime same as lifetime of `self.pl`.
// Pipeline::svc is heap allocated(Rc<S>), and it is being kept alive until
// `self` is alive
let pl: &'static Pipeline<S> = unsafe { std::mem::transmute(&self.pl) };
*st = State::Shutdown(Box::pin(async move { pl.shutdown().await }));
self.poll_shutdown(cx)
}
State::Shutdown(ref mut fut) => Pin::new(fut).poll(cx),
}
}
#[inline]
/// Wait for service readiness and then create future object
/// that resolves to service result.
pub fn call(&self, req: R) -> PipelineCall<S, R> {
let pl = self.pl.clone();
PipelineCall {
fut: Box::pin(async move {
ServiceCtx::<S>::new(&pl.waiters)
.call(pl.svc.as_ref(), req)
.await
}),
}
}
#[inline]
/// Call service and create future object that resolves to service result.
///
/// Note, this call does not check service readiness.
pub fn call_nowait(&self, req: R) -> PipelineCall<S, R> {
let pl = self.pl.clone();
PipelineCall {
fut: Box::pin(async move {
ServiceCtx::<S>::new(&pl.waiters)
.call_nowait(pl.svc.as_ref(), req)
.await
}),
}
}
#[inline]
/// Shutdown enclosed service.
pub async fn shutdown(&self) {
self.pl.svc.as_ref().shutdown().await
}
}
impl<S, R> Clone for PipelineBinding<S, R>
where
S: Service<R>,
{
#[inline]
fn clone(&self) -> Self {
Self {
pl: self.pl.clone(),
st: cell::UnsafeCell::new(State::New),
}
}
}
impl<S, R> fmt::Debug for PipelineBinding<S, R>
where
S: Service<R> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PipelineBinding")
.field("pipeline", &self.pl)
.finish()
}
}
#[allow(missing_debug_implementations)]
#[must_use = "futures do nothing unless polled"]
/// Pipeline call future
pub struct PipelineCall<S, R>
where
S: Service<R>,
R: 'static,
{
state: PipelineCallState<S, R>,
pipeline: Pipeline<S>,
fut: Call<S::Response, S::Error>,
}
impl<S: Service<R>, R> Unpin for PipelineCall<S, R> {}
enum PipelineCallState<S, Req>
where
S: Service<Req>,
Req: 'static,
{
Ready {
req: Option<Req>,
},
Call {
fut: BoxFuture<'static, Result<S::Response, S::Error>>,
},
}
impl<S, R> PipelineCallState<S, R>
where
S: Service<R>,
R: 'static,
{
fn new_call<'a>(pl: &'a Pipeline<S>, req: R) -> Self {
let ctx = ServiceCtx::new(&pl.waiters);
let svc_call: BoxFuture<'a, Result<S::Response, S::Error>> =
Box::pin(pl.get_ref().call(req, ctx));
// SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc`
// Pipeline::svc is heap allocated(Rc<S>), and it is being kept alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };
PipelineCallState::Call { fut }
}
}
type Call<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + 'static>>;
impl<S, R> Future for PipelineCall<S, R>
where
@ -190,21 +289,7 @@ where
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut slf = self.as_mut();
if let PipelineCallState::Call { ref mut fut, .. } = slf.state {
return Pin::new(fut).poll(cx);
}
task::ready!(slf.pipeline.poll_ready(cx))?;
let req = if let PipelineCallState::Ready { ref mut req } = slf.state {
req.take().unwrap()
} else {
panic!("future must not be polled after it returned `Poll::Ready`")
};
slf.state = PipelineCallState::new_call(&slf.pipeline, req);
slf.poll(cx)
Pin::new(&mut self.as_mut().fut).poll(cx)
}
}
@ -216,3 +301,52 @@ where
f.debug_struct("PipelineCall").finish()
}
}
fn ready<S, R>(pl: &'static Pipeline<S>) -> impl Future<Output = Result<(), S::Error>>
where
S: Service<R>,
R: 'static,
{
pl.svc.ready(ServiceCtx::<'_, S>::new(&pl.waiters))
}
struct CheckReadiness<S: 'static, F, Fut> {
f: F,
fut: Option<Fut>,
pl: &'static Pipeline<S>,
}
impl<S, F, Fut> Unpin for CheckReadiness<S, F, Fut> {}
impl<T, S, F, Fut> Future for CheckReadiness<S, F, Fut>
where
F: Fn(&'static Pipeline<S>) -> Fut,
Fut: Future<Output = T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut slf = self.as_mut();
if slf.pl.waiters.can_check(cx) {
if let Some(ref mut fut) = slf.fut {
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Pending => {
slf.pl.waiters.register(cx);
Poll::Pending
}
Poll::Ready(res) => {
let _ = slf.fut.take();
slf.pl.waiters.notify();
Poll::Ready(res)
}
}
} else {
slf.fut = Some((slf.f)(slf.pl));
self.poll(cx)
}
} else {
Poll::Pending
}
}
}

View file

@ -1,6 +1,4 @@
use std::{task::Context, task::Poll};
use super::{Service, ServiceCtx, ServiceFactory};
use super::{util, Service, ServiceCtx, ServiceFactory};
#[derive(Debug, Clone)]
/// Service for the `then` combinator, chaining a computation onto the end of
@ -28,22 +26,13 @@ where
type Error = B::Error;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = !self.svc1.poll_ready(cx)?.is_ready();
if !self.svc2.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
util::ready(&self.svc1, &self.svc2, ctx).await
}
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
if self.svc1.poll_shutdown(cx).is_ready() && self.svc2.poll_shutdown(cx).is_ready()
{
Poll::Ready(())
} else {
Poll::Pending
}
#[inline]
async fn shutdown(&self) {
util::shutdown(&self.svc1, &self.svc2).await
}
#[inline]
@ -97,21 +86,20 @@ where
#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, Ready};
use std::{cell::Cell, rc::Rc, task::Context, task::Poll};
use std::{cell::Cell, rc::Rc};
use crate::{chain, chain_factory, Service, ServiceCtx};
use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
#[derive(Clone)]
struct Srv1(Rc<Cell<usize>>);
struct Srv1(Rc<Cell<usize>>, Rc<Cell<usize>>);
impl Service<Result<&'static str, &'static str>> for Srv1 {
type Response = &'static str;
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
Ok(())
}
async fn call(
@ -124,18 +112,22 @@ mod tests {
Err(_) => Err(()),
}
}
async fn shutdown(&self) {
self.1.set(self.1.get() + 1);
}
}
#[derive(Clone)]
struct Srv2(Rc<Cell<usize>>);
struct Srv2(Rc<Cell<usize>>, Rc<Cell<usize>>);
impl Service<Result<&'static str, ()>> for Srv2 {
type Response = (&'static str, &'static str);
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
Ok(())
}
async fn call(
@ -148,24 +140,31 @@ mod tests {
Err(()) => Ok(("srv2", "err")),
}
}
async fn shutdown(&self) {
self.1.set(self.1.get() + 1);
}
}
#[ntex::test]
async fn test_poll_ready() {
async fn test_ready() {
let cnt = Rc::new(Cell::new(0));
let srv = chain(Srv1(cnt.clone())).then(Srv2(cnt.clone()));
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
let cnt_sht = Rc::new(Cell::new(0));
let srv = chain(Srv1(cnt.clone(), cnt_sht.clone()))
.then(Srv2(cnt.clone(), cnt_sht.clone()))
.into_pipeline();
let res = srv.ready().await;
assert_eq!(res, Ok(()));
assert_eq!(cnt.get(), 2);
let res = lazy(|cx| srv.poll_shutdown(cx)).await;
assert_eq!(res, Poll::Ready(()));
srv.shutdown().await;
assert_eq!(cnt_sht.get(), 2);
}
#[ntex::test]
async fn test_call() {
let cnt = Rc::new(Cell::new(0));
let srv = chain(Srv1(cnt.clone()))
.then(Srv2(cnt))
let srv = chain(Srv1(cnt.clone(), Rc::new(Cell::new(0))))
.then(Srv2(cnt, Rc::new(Cell::new(0))))
.clone()
.into_pipeline();
@ -182,9 +181,15 @@ mod tests {
async fn test_factory() {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let blank = move || Ready::<_, ()>::Ok(Srv1(cnt2.clone()));
let blank = fn_factory(move || {
let cnt = cnt2.clone();
async move { Ok::<_, ()>(Srv1(cnt, Rc::new(Cell::new(0)))) }
});
let factory = chain_factory(blank)
.then(move || Ready::Ok(Srv2(cnt.clone())))
.then(fn_factory(move || {
let cnt = cnt.clone();
async move { Ok(Srv2(cnt.clone(), Rc::new(Cell::new(0)))) }
}))
.clone();
let srv = factory.pipeline(&()).await.unwrap();
let res = srv.call(Ok("srv1")).await;

75
ntex-service/src/util.rs Normal file
View file

@ -0,0 +1,75 @@
use std::{future::poll_fn, future::Future, pin, task::Poll};
use crate::{Service, ServiceCtx};
pub(crate) async fn shutdown<A, AR, B, BR>(svc1: &A, svc2: &B)
where
A: Service<AR>,
B: Service<BR>,
{
let mut fut1 = pin::pin!(svc1.shutdown());
let mut fut2 = pin::pin!(svc2.shutdown());
let mut ready1 = false;
let mut ready2 = false;
poll_fn(move |cx| {
if !ready1 {
match pin::Pin::new(&mut fut1).poll(cx) {
Poll::Ready(_) => ready1 = true,
Poll::Pending => (),
}
}
if !ready2 {
match pin::Pin::new(&mut fut2).poll(cx) {
Poll::Ready(_) => ready2 = true,
Poll::Pending => (),
}
}
if ready1 && ready2 {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await
}
pub(crate) async fn ready<S, A, AR, B, BR>(
svc1: &A,
svc2: &B,
ctx: ServiceCtx<'_, S>,
) -> Result<(), A::Error>
where
A: Service<AR>,
B: Service<BR, Error = A::Error>,
{
let mut fut1 = pin::pin!(ctx.ready(svc1));
let mut fut2 = pin::pin!(ctx.ready(svc2));
let mut ready1 = false;
let mut ready2 = false;
poll_fn(move |cx| {
if !ready1 {
match pin::Pin::new(&mut fut1).poll(cx) {
Poll::Ready(Ok(())) => ready1 = true,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => (),
}
}
if !ready2 {
match pin::Pin::new(&mut fut2).poll(cx) {
Poll::Ready(Ok(())) => ready2 = true,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => (),
};
}
if ready1 && ready2 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
})
.await
}