From 011e9cdfeac7743499e9875098fd6554749ae931 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 2 Nov 2024 12:49:49 +0500 Subject: [PATCH] Added Service::not_ready() method (#449) --- README.md | 2 +- ntex-service/CHANGES.md | 4 + ntex-service/Cargo.toml | 2 +- ntex-service/src/and_then.rs | 5 + ntex-service/src/apply.rs | 5 + ntex-service/src/boxed.rs | 38 +++--- ntex-service/src/ctx.rs | 249 ++++++++++------------------------- ntex-service/src/lib.rs | 18 ++- ntex-service/src/macros.rs | 13 ++ ntex-service/src/map_err.rs | 5 + ntex-service/src/pipeline.rs | 176 +++++++++++++++++++------ ntex-service/src/then.rs | 5 + ntex-service/src/util.rs | 31 ++++- 13 files changed, 314 insertions(+), 239 deletions(-) diff --git a/README.md b/README.md index 175d98ca..fbf20228 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ [![Version](https://img.shields.io/badge/rustc-1.75+-lightgray.svg)](https://blog.rust-lang.org/2023/12/28/Rust-1.75.0.html) ![License](https://img.shields.io/crates/l/ntex.svg) [![codecov](https://codecov.io/gh/ntex-rs/ntex/branch/master/graph/badge.svg)](https://codecov.io/gh/ntex-rs/ntex) -[![Chat on Discord](https://img.shields.io/discord/919288597826387979?label=chat&logo=discord)](https://discord.gg/zBNyhVRz) +[![Chat on Discord](https://img.shields.io/discord/919288597826387979?label=chat&logo=discord)](https://discord.com/channels/919288597826387979/919288597826387982)

diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index 87525b87..f3e14370 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [3.3.0] - 2024-11-02 + +* Added Service::not_ready() method + ## [3.2.1] - 2024-10-31 * Fix shared readiness notification diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 86ef1147..48b568dc 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "3.2.1" +version = "3.3.0" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index fb86be56..4ea3e706 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -30,6 +30,11 @@ where util::ready(&self.svc1, &self.svc2, ctx).await } + #[inline] + async fn not_ready(&self) -> Result<(), Self::Error> { + util::select(self.svc1.not_ready(), self.svc2.not_ready()).await + } + #[inline] async fn shutdown(&self) { util::shutdown(&self.svc1, &self.svc2).await diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 5425d7d2..985cd968 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -104,6 +104,11 @@ where self.service.ready().await.map_err(From::from) } + #[inline] + async fn not_ready(&self) -> Result<(), Err> { + self.service.get_ref().not_ready().await.map_err(From::from) + } + #[inline] async fn shutdown(&self) { self.service.shutdown().await diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index ad8f2927..d940d454 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -1,4 +1,4 @@ -use std::{fmt, future::Future, pin::Pin, rc::Rc}; +use std::{fmt, future::Future, pin::Pin}; use crate::ctx::{ServiceCtx, WaitersRef}; @@ -50,15 +50,17 @@ trait ServiceObj { fn ready<'a>( &'a self, - idx: usize, - waiters: &'a Rc, + idx: u32, + waiters: &'a WaitersRef, ) -> BoxFuture<'a, (), Self::Error>; + fn not_ready(&self) -> BoxFuture<'_, (), Self::Error>; + fn call<'a>( &'a self, req: Req, - idx: usize, - waiters: &'a Rc, + idx: u32, + waiters: &'a WaitersRef, ) -> BoxFuture<'a, Self::Response, Self::Error>; fn shutdown<'a>(&'a self) -> Pin + 'a>>; @@ -75,14 +77,15 @@ where #[inline] fn ready<'a>( &'a self, - idx: usize, - waiters: &'a Rc, + idx: u32, + waiters: &'a WaitersRef, ) -> BoxFuture<'a, (), Self::Error> { - Box::pin(async move { - ServiceCtx::<'a, S>::from_ref(idx, waiters) - .ready(self) - .await - }) + Box::pin(async move { ServiceCtx::<'a, S>::new(idx, waiters).ready(self).await }) + } + + #[inline] + fn not_ready(&self) -> BoxFuture<'_, (), Self::Error> { + Box::pin(crate::Service::not_ready(self)) } #[inline] @@ -94,11 +97,11 @@ where fn call<'a>( &'a self, req: Req, - idx: usize, - waiters: &'a Rc, + idx: u32, + waiters: &'a WaitersRef, ) -> BoxFuture<'a, Self::Response, Self::Error> { Box::pin(async move { - ServiceCtx::<'a, S>::from_ref(idx, waiters) + ServiceCtx::<'a, S>::new(idx, waiters) .call_nowait(self, req) .await }) @@ -155,6 +158,11 @@ where self.0.ready(idx, waiters).await } + #[inline] + async fn not_ready(&self) -> Result<(), Self::Error> { + self.0.not_ready().await + } + #[inline] async fn shutdown(&self) { self.0.shutdown().await diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index c5223fdf..36b07a03 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -1,71 +1,90 @@ -use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task, task::Context}; +use std::task::{Context, Poll, Waker}; +use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc}; use crate::Service; pub struct ServiceCtx<'a, S: ?Sized> { - idx: usize, - waiters: &'a Rc, + idx: u32, + waiters: &'a WaitersRef, _t: marker::PhantomData>, } -#[derive(Clone, Debug)] -/// Pipeline tag allows to notify pipeline binding -pub struct PipelineTag(Rc); - -pub(crate) struct Waiters { - index: usize, - waiters: Rc, -} - #[derive(Debug)] pub(crate) struct WaitersRef { - cur: cell::Cell, - indexes: cell::UnsafeCell>>, -} - -impl PipelineTag { - /// Notify pipeline dispatcher - pub fn notify(&self) { - if let Some(waker) = self.0.get()[0].take() { - waker.wake(); - } - } + cur: cell::Cell, + wakers: cell::UnsafeCell>, + indexes: cell::UnsafeCell>>, } impl WaitersRef { + pub(crate) fn new() -> (u32, Self) { + let mut waiters = slab::Slab::new(); + + // first insert for wake ups from services + let _ = waiters.insert(None); + + ( + waiters.insert(Default::default()) as u32, + WaitersRef { + cur: cell::Cell::new(u32::MAX), + indexes: cell::UnsafeCell::new(waiters), + wakers: cell::UnsafeCell::new(Vec::default()), + }, + ) + } + #[allow(clippy::mut_from_ref)] - fn get(&self) -> &mut slab::Slab> { + pub(crate) fn get(&self) -> &mut slab::Slab> { unsafe { &mut *self.indexes.get() } } - fn insert(&self) -> usize { - self.get().insert(None) + #[allow(clippy::mut_from_ref)] + pub(crate) fn get_wakers(&self) -> &mut Vec { + unsafe { &mut *self.wakers.get() } } - fn remove(&self, idx: usize) { - self.notify(); - self.get().remove(idx); + pub(crate) fn insert(&self) -> u32 { + self.get().insert(None) as u32 } - fn register(&self, idx: usize, cx: &mut Context<'_>) { - self.get()[idx] = Some(cx.waker().clone()); + pub(crate) fn remove(&self, idx: u32) { + self.get().remove(idx as usize); + + if self.cur.get() == idx { + self.notify(); + } } - fn notify(&self) { - for (_, waker) in self.get().iter_mut().skip(1) { - if let Some(waker) = waker.take() { - waker.wake(); + pub(crate) fn register(&self, idx: u32, cx: &mut Context<'_>) { + self.get()[idx as usize] = Some(cx.waker().clone()); + self.get_wakers().push(idx); + } + + pub(crate) fn register_unready(&self, cx: &mut Context<'_>) { + self.get()[0] = Some(cx.waker().clone()); + self.get_wakers().push(0); + } + + pub(crate) fn notify(&self) { + let indexes = self.get(); + let wakers = self.get_wakers(); + + for idx in wakers.drain(..) { + if let Some(item) = indexes.get_mut(idx as usize) { + if let Some(waker) = item.take() { + waker.wake(); + } } } - self.cur.set(usize::MAX); + self.cur.set(u32::MAX); } - pub(crate) fn can_check(&self, idx: usize, cx: &mut Context<'_>) -> bool { + pub(crate) fn can_check(&self, idx: u32, cx: &mut Context<'_>) -> bool { let cur = self.cur.get(); if cur == idx { true - } else if cur == usize::MAX { + } else if cur == u32::MAX { self.cur.set(idx); true } else { @@ -75,81 +94,8 @@ impl WaitersRef { } } -impl Waiters { - pub(crate) fn new() -> Self { - let mut waiters = slab::Slab::new(); - - // first insert for wake ups from services - let _ = waiters.insert(None); - - Waiters { - index: waiters.insert(None), - waiters: Rc::new(WaitersRef { - cur: cell::Cell::new(usize::MAX), - indexes: cell::UnsafeCell::new(waiters), - }), - } - } - - pub(crate) fn get_ref(&self) -> &Rc { - &self.waiters - } - - pub(crate) fn can_check(&self, cx: &mut Context<'_>) -> bool { - self.waiters.can_check(self.index, cx) - } - - pub(crate) fn register(&self, cx: &mut Context<'_>) { - self.waiters.register(self.index, cx); - } - - pub(crate) fn register_pipeline(&self, cx: &mut Context<'_>) { - self.waiters.register(0, cx); - } - - pub(crate) fn notify(&self) { - if self.waiters.cur.get() == self.index { - self.waiters.notify(); - } - } -} - -impl Drop for Waiters { - #[inline] - fn drop(&mut self) { - self.waiters.remove(self.index); - self.notify(); - } -} - -impl Clone for Waiters { - fn clone(&self) -> Self { - Waiters { - index: self.waiters.insert(), - waiters: self.waiters.clone(), - } - } -} - -impl fmt::Debug for Waiters { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Waiters") - .field("index", &self.index) - .field("waiters", &self.waiters.get().len()) - .finish() - } -} - impl<'a, S> ServiceCtx<'a, S> { - pub(crate) fn new(waiters: &'a Waiters) -> Self { - Self { - idx: waiters.index, - waiters: waiters.get_ref(), - _t: marker::PhantomData, - } - } - - pub(crate) fn from_ref(idx: usize, waiters: &'a Rc) -> Self { + pub(crate) fn new(idx: u32, waiters: &'a WaitersRef) -> Self { Self { idx, waiters, @@ -157,7 +103,7 @@ impl<'a, S> ServiceCtx<'a, S> { } } - pub(crate) fn inner(self) -> (usize, &'a Rc) { + pub(crate) fn inner(self) -> (u32, &'a WaitersRef) { (self.idx, self.waiters) } @@ -220,23 +166,18 @@ impl<'a, S> ServiceCtx<'a, S> { ) .await } - - /// Get pipeline tag for current pipeline - pub fn tag(&self) -> PipelineTag { - PipelineTag(self.waiters.clone()) - } } -impl<'a, S> Copy for ServiceCtx<'a, S> {} +impl Copy for ServiceCtx<'_, S> {} -impl<'a, S> Clone for ServiceCtx<'a, S> { +impl Clone for ServiceCtx<'_, S> { #[inline] fn clone(&self) -> Self { *self } } -impl<'a, S> fmt::Debug for ServiceCtx<'a, S> { +impl fmt::Debug for ServiceCtx<'_, S> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ServiceCtx") .field("idx", &self.idx) @@ -251,7 +192,7 @@ struct ReadyCall<'a, S: ?Sized, F: Future> { ctx: ServiceCtx<'a, S>, } -impl<'a, S: ?Sized, F: Future> Drop for ReadyCall<'a, S, F> { +impl Drop for ReadyCall<'_, S, F> { fn drop(&mut self) { if !self.completed && self.ctx.waiters.cur.get() == self.ctx.idx { self.ctx.waiters.notify(); @@ -259,35 +200,35 @@ impl<'a, S: ?Sized, F: Future> Drop for ReadyCall<'a, S, F> { } } -impl<'a, S: ?Sized, F: Future> Unpin for ReadyCall<'a, S, F> {} +impl Unpin for ReadyCall<'_, S, F> {} -impl<'a, S: ?Sized, F: Future> Future for ReadyCall<'a, S, F> { +impl Future for ReadyCall<'_, S, F> { type Output = F::Output; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task::Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.ctx.waiters.can_check(self.ctx.idx, cx) { // SAFETY: `fut` never moves let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) }; match result { - task::Poll::Pending => { + Poll::Pending => { self.ctx.waiters.register(self.ctx.idx, cx); - task::Poll::Pending + Poll::Pending } - task::Poll::Ready(res) => { + Poll::Ready(res) => { self.completed = true; self.ctx.waiters.notify(); - task::Poll::Ready(res) + Poll::Ready(res) } } } else { - task::Poll::Pending + Poll::Pending } } } #[cfg(test)] mod tests { - use std::{cell::Cell, cell::RefCell, future::poll_fn, task::Poll}; + use std::{cell::Cell, cell::RefCell, future::poll_fn}; use ntex_util::channel::{condition, oneshot}; use ntex_util::{future::lazy, future::select, spawn, time}; @@ -453,50 +394,4 @@ mod tests { assert_eq!(cnt.get(), 2); assert_eq!(&*data.borrow(), &["srv1", "srv2"]); } - - #[ntex::test] - async fn test_pipeline_tag() { - struct Srv(Rc>, Cell>); - - impl Service<&'static str> for Srv { - type Response = &'static str; - type Error = (); - - async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { - self.1.set(Some(ctx.tag())); - self.0.set(self.0.get() + 1); - Ok(()) - } - - async fn call( - &self, - req: &'static str, - _: ServiceCtx<'_, Self>, - ) -> Result<&'static str, ()> { - Ok(req) - } - } - - let cnt = Rc::new(Cell::new(0)); - let con = condition::Condition::new(); - - let srv = Pipeline::from(Srv(cnt.clone(), Cell::new(None))).bind(); - - let srv1 = srv.clone(); - let waiter = con.wait(); - ntex::rt::spawn(async move { - let _ = poll_fn(|cx| { - let _ = srv1.poll_ready(cx); - waiter.poll_ready(cx) - }) - .await; - }); - time::sleep(time::Millis(50)).await; - assert_eq!(cnt.get(), 1); - - let tag = srv.get_ref().1.take().unwrap(); - tag.notify(); - time::sleep(time::Millis(50)).await; - assert_eq!(cnt.get(), 2); - } } diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index bf4afbed..fe207766 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -27,7 +27,7 @@ mod util; pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::chain::{chain, chain_factory}; -pub use self::ctx::{PipelineTag, ServiceCtx}; +pub use self::ctx::ServiceCtx; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::fn_shutdown::fn_shutdown; pub use self::map_config::{map_config, unit_config}; @@ -118,6 +118,12 @@ pub trait Service { Ok(()) } + #[inline] + /// Returns when the service is not able to process requests. + async fn not_ready(&self) -> Result<(), Self::Error> { + std::future::pending().await + } + #[inline] /// Shutdown service. /// @@ -246,6 +252,11 @@ where ctx.ready(&**self).await } + #[inline] + async fn not_ready(&self) -> Result<(), S::Error> { + (**self).not_ready().await + } + #[inline] async fn shutdown(&self) { (**self).shutdown().await @@ -273,6 +284,11 @@ where ctx.ready(&**self).await } + #[inline] + async fn not_ready(&self) -> Result<(), S::Error> { + (**self).not_ready().await + } + #[inline] async fn shutdown(&self) { (**self).shutdown().await diff --git a/ntex-service/src/macros.rs b/ntex-service/src/macros.rs index ac712710..6d57c25b 100644 --- a/ntex-service/src/macros.rs +++ b/ntex-service/src/macros.rs @@ -11,6 +11,14 @@ macro_rules! forward_ready { .await .map_err(::core::convert::Into::into) } + + #[inline] + async fn not_ready(&self) -> Result<(), Self::Error> { + self.$field + .not_ready() + .await + .map_err(::core::convert::Into::into) + } }; ($field:ident, $err:expr) => { #[inline] @@ -20,6 +28,11 @@ macro_rules! forward_ready { ) -> Result<(), Self::Error> { ctx.ready(&self.$field).await.map_err($err) } + + #[inline] + async fn not_ready(&self) -> Result<(), Self::Error> { + self.$field.not_ready().await.map_err($err) + } }; } diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index fcb24263..f2073de5 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -67,6 +67,11 @@ where ctx.ready(&self.service).await.map_err(&self.f) } + #[inline] + async fn not_ready(&self) -> Result<(), Self::Error> { + self.service.not_ready().await.map_err(&self.f) + } + #[inline] async fn call( &self, diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 7d1d76e9..cf9b15cd 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -1,30 +1,42 @@ use std::{cell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; -use crate::{ctx::Waiters, Service, ServiceCtx}; +use crate::{ctx::WaitersRef, Service, ServiceCtx}; #[derive(Debug)] /// Container for a service. /// /// Container allows to call enclosed service and adds support of shared readiness. pub struct Pipeline { - svc: Rc, - pub(crate) waiters: Waiters, + index: u32, + state: Rc>, +} + +struct PipelineState { + svc: S, + waiters: WaitersRef, +} + +impl PipelineState { + pub(crate) fn waiters_ref(&self) -> &WaitersRef { + &self.waiters + } } impl Pipeline { #[inline] /// Construct new container instance. pub fn new(svc: S) -> Self { + let (index, waiters) = WaitersRef::new(); Pipeline { - svc: Rc::new(svc), - waiters: Waiters::new(), + index, + state: Rc::new(PipelineState { svc, waiters }), } } #[inline] /// Return reference to enclosed service pub fn get_ref(&self) -> &S { - self.svc.as_ref() + &self.state.svc } #[inline] @@ -33,8 +45,8 @@ impl Pipeline { where S: Service, { - ServiceCtx::<'_, S>::new(&self.waiters) - .ready(self.svc.as_ref()) + ServiceCtx::<'_, S>::new(self.index, self.state.waiters_ref()) + .ready(&self.state.svc) .await } @@ -45,13 +57,9 @@ impl Pipeline { where S: Service, { - let ctx = ServiceCtx::<'_, S>::new(&self.waiters); - - // check service readiness - self.svc.as_ref().ready(ctx).await?; - - // call service - self.svc.as_ref().call(req, ctx).await + ServiceCtx::<'_, S>::new(self.index, self.state.waiters_ref()) + .call(&self.state.svc, req) + .await } #[inline] @@ -66,8 +74,8 @@ impl Pipeline { PipelineCall { fut: Box::pin(async move { - ServiceCtx::::new(&pl.waiters) - .call(pl.svc.as_ref(), req) + ServiceCtx::::new(pl.index, pl.state.waiters_ref()) + .call(&pl.state.svc, req) .await }), } @@ -86,8 +94,8 @@ impl Pipeline { PipelineCall { fut: Box::pin(async move { - ServiceCtx::::new(&pl.waiters) - .call_nowait(pl.svc.as_ref(), req) + ServiceCtx::::new(pl.index, pl.state.waiters_ref()) + .call_nowait(&pl.state.svc, req) .await }), } @@ -99,11 +107,11 @@ impl Pipeline { where S: Service, { - self.svc.as_ref().shutdown().await + self.state.svc.shutdown().await } #[inline] - /// Convert to lifetime object. + /// Get current pipeline. pub fn bind(self) -> PipelineBinding where S: Service + 'static, @@ -121,15 +129,30 @@ impl From for Pipeline { } impl Clone for Pipeline { - #[inline] fn clone(&self) -> Self { - Self { - svc: self.svc.clone(), - waiters: self.waiters.clone(), + Pipeline { + index: self.state.waiters.insert(), + state: self.state.clone(), } } } +impl Drop for Pipeline { + #[inline] + fn drop(&mut self) { + self.state.waiters.remove(self.index); + } +} + +impl fmt::Debug for PipelineState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PipelineState") + .field("svc", &self.svc) + .field("waiters", &self.waiters.get().len()) + .finish() + } +} + /// Bound container for a service. pub struct PipelineBinding where @@ -137,6 +160,7 @@ where { pl: Pipeline, st: cell::UnsafeCell>, + not_ready: cell::UnsafeCell>, } enum State { @@ -145,6 +169,11 @@ enum State { Shutdown(Pin + 'static>>), } +enum StateNotReady { + New, + Readiness(Pin> + 'static>>), +} + impl PipelineBinding where S: Service + 'static, @@ -154,13 +183,14 @@ where PipelineBinding { pl, st: cell::UnsafeCell::new(State::New), + not_ready: cell::UnsafeCell::new(StateNotReady::New), } } #[inline] /// Return reference to enclosed service pub fn get_ref(&self) -> &S { - self.pl.svc.as_ref() + &self.pl.state.svc } #[inline] @@ -189,6 +219,29 @@ where } } + #[inline] + /// Returns when the pipeline is not able to process requests. + pub fn poll_not_ready(&self, cx: &mut Context<'_>) -> Poll> { + let st = unsafe { &mut *self.not_ready.get() }; + + match st { + StateNotReady::New => { + // SAFETY: `fut` has same lifetime same as lifetime of `self.pl`. + // Pipeline::svc is heap allocated(Rc), and it is being kept alive until + // `self` is alive + let pl: &'static Pipeline = unsafe { std::mem::transmute(&self.pl) }; + let fut = Box::pin(CheckUnReadiness { + fut: None, + f: not_ready, + pl, + }); + *st = StateNotReady::Readiness(fut); + self.poll_not_ready(cx) + } + StateNotReady::Readiness(ref mut fut) => Pin::new(fut).poll(cx), + } + } + #[inline] /// Returns `Ready` when the service is properly shutdowns. pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { @@ -215,8 +268,8 @@ where PipelineCall { fut: Box::pin(async move { - ServiceCtx::::new(&pl.waiters) - .call(pl.svc.as_ref(), req) + ServiceCtx::::new(pl.index, pl.state.waiters_ref()) + .call(&pl.state.svc, req) .await }), } @@ -231,8 +284,8 @@ where PipelineCall { fut: Box::pin(async move { - ServiceCtx::::new(&pl.waiters) - .call_nowait(pl.svc.as_ref(), req) + ServiceCtx::::new(pl.index, pl.state.waiters_ref()) + .call_nowait(&pl.state.svc, req) .await }), } @@ -241,7 +294,7 @@ where #[inline] /// Shutdown enclosed service. pub async fn shutdown(&self) { - self.pl.svc.as_ref().shutdown().await + self.pl.state.svc.shutdown().await } } @@ -263,6 +316,7 @@ where Self { pl: self.pl.clone(), st: cell::UnsafeCell::new(State::New), + not_ready: cell::UnsafeCell::new(StateNotReady::New), } } } @@ -316,7 +370,17 @@ where S: Service, R: 'static, { - pl.svc.ready(ServiceCtx::<'_, S>::new(&pl.waiters)) + pl.state + .svc + .ready(ServiceCtx::<'_, S>::new(pl.index, pl.state.waiters_ref())) +} + +fn not_ready(pl: &'static Pipeline) -> impl Future> +where + S: Service, + R: 'static, +{ + pl.state.svc.not_ready() } struct CheckReadiness { @@ -331,7 +395,7 @@ impl Drop for CheckReadiness { fn drop(&mut self) { // future fot dropped during polling, we must notify other waiters if self.fut.is_some() { - self.pl.waiters.notify(); + self.pl.state.waiters.notify(); } } } @@ -346,19 +410,16 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut slf = self.as_mut(); - // register pipeline tag - slf.pl.waiters.register_pipeline(cx); - - if slf.pl.waiters.can_check(cx) { + if slf.pl.state.waiters.can_check(slf.pl.index, cx) { if let Some(ref mut fut) = slf.fut { match unsafe { Pin::new_unchecked(fut) }.poll(cx) { Poll::Pending => { - slf.pl.waiters.register(cx); + slf.pl.state.waiters.register(slf.pl.index, cx); Poll::Pending } Poll::Ready(res) => { let _ = slf.fut.take(); - slf.pl.waiters.notify(); + slf.pl.state.waiters.notify(); Poll::Ready(res) } } @@ -371,3 +432,40 @@ where } } } + +struct CheckUnReadiness { + f: F, + fut: Option, + pl: &'static Pipeline, +} + +impl Unpin for CheckUnReadiness {} + +impl Future for CheckUnReadiness +where + F: Fn(&'static Pipeline) -> Fut, + Fut: Future, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut slf = self.as_mut(); + + if let Some(ref mut fut) = slf.fut { + match unsafe { Pin::new_unchecked(fut) }.poll(cx) { + Poll::Pending => { + slf.pl.state.waiters.register_unready(cx); + Poll::Pending + } + Poll::Ready(res) => { + let _ = slf.fut.take(); + slf.pl.state.waiters.notify(); + Poll::Ready(res) + } + } + } else { + slf.fut = Some((slf.f)(slf.pl)); + self.poll(cx) + } + } +} diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 0f5f8afe..5b039a8e 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -30,6 +30,11 @@ where util::ready(&self.svc1, &self.svc2, ctx).await } + #[inline] + async fn not_ready(&self) -> Result<(), Self::Error> { + util::select(self.svc1.not_ready(), self.svc2.not_ready()).await + } + #[inline] async fn shutdown(&self) { util::shutdown(&self.svc1, &self.svc2).await diff --git a/ntex-service/src/util.rs b/ntex-service/src/util.rs index 6015ab3b..4c041a15 100644 --- a/ntex-service/src/util.rs +++ b/ntex-service/src/util.rs @@ -1,4 +1,4 @@ -use std::{future::poll_fn, future::Future, pin, task::Poll}; +use std::{future::poll_fn, future::Future, pin, pin::Pin, task::Poll}; use crate::{Service, ServiceCtx}; @@ -14,10 +14,10 @@ where let mut ready2 = false; poll_fn(move |cx| { - if !ready1 && pin::Pin::new(&mut fut1).poll(cx).is_ready() { + if !ready1 && Pin::new(&mut fut1).poll(cx).is_ready() { ready1 = true; } - if !ready2 && pin::Pin::new(&mut fut2).poll(cx).is_ready() { + if !ready2 && Pin::new(&mut fut2).poll(cx).is_ready() { ready2 = true } if ready1 && ready2 { @@ -45,10 +45,10 @@ where let mut ready2 = false; poll_fn(move |cx| { - if !ready1 && pin::Pin::new(&mut fut1).poll(cx)?.is_ready() { + if !ready1 && Pin::new(&mut fut1).poll(cx)?.is_ready() { ready1 = true; } - if !ready2 && pin::Pin::new(&mut fut2).poll(cx)?.is_ready() { + if !ready2 && Pin::new(&mut fut2).poll(cx)?.is_ready() { ready2 = true; } if ready1 && ready2 { @@ -59,3 +59,24 @@ where }) .await } + +/// Waits for either one of two differently-typed futures to complete. +pub(crate) async fn select(fut1: A, fut2: B) -> A::Output +where + A: Future, + B: Future, +{ + let mut fut1 = pin::pin!(fut1); + let mut fut2 = pin::pin!(fut2); + + poll_fn(|cx| { + if let Poll::Ready(item) = Pin::new(&mut fut1).poll(cx) { + return Poll::Ready(item); + } + if let Poll::Ready(item) = Pin::new(&mut fut2).poll(cx) { + return Poll::Ready(item); + } + Poll::Pending + }) + .await +}