mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 21:07:39 +03:00
Added Service::not_ready() method (#449)
This commit is contained in:
parent
d8f2c87781
commit
011e9cdfea
13 changed files with 314 additions and 239 deletions
|
@ -9,7 +9,7 @@
|
|||
[](https://blog.rust-lang.org/2023/12/28/Rust-1.75.0.html)
|
||||

|
||||
[](https://codecov.io/gh/ntex-rs/ntex)
|
||||
[](https://discord.gg/zBNyhVRz)
|
||||
[](https://discord.com/channels/919288597826387979/919288597826387982)
|
||||
|
||||
</p>
|
||||
</div>
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## [3.3.0] - 2024-11-02
|
||||
|
||||
* Added Service::not_ready() method
|
||||
|
||||
## [3.2.1] - 2024-10-31
|
||||
|
||||
* Fix shared readiness notification
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-service"
|
||||
version = "3.2.1"
|
||||
version = "3.3.0"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "ntex service"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
|
|
@ -30,6 +30,11 @@ where
|
|||
util::ready(&self.svc1, &self.svc2, ctx).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
||||
util::select(self.svc1.not_ready(), self.svc2.not_ready()).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn shutdown(&self) {
|
||||
util::shutdown(&self.svc1, &self.svc2).await
|
||||
|
|
|
@ -104,6 +104,11 @@ where
|
|||
self.service.ready().await.map_err(From::from)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), Err> {
|
||||
self.service.get_ref().not_ready().await.map_err(From::from)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn shutdown(&self) {
|
||||
self.service.shutdown().await
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{fmt, future::Future, pin::Pin, rc::Rc};
|
||||
use std::{fmt, future::Future, pin::Pin};
|
||||
|
||||
use crate::ctx::{ServiceCtx, WaitersRef};
|
||||
|
||||
|
@ -50,15 +50,17 @@ trait ServiceObj<Req> {
|
|||
|
||||
fn ready<'a>(
|
||||
&'a self,
|
||||
idx: usize,
|
||||
waiters: &'a Rc<WaitersRef>,
|
||||
idx: u32,
|
||||
waiters: &'a WaitersRef,
|
||||
) -> BoxFuture<'a, (), Self::Error>;
|
||||
|
||||
fn not_ready(&self) -> BoxFuture<'_, (), Self::Error>;
|
||||
|
||||
fn call<'a>(
|
||||
&'a self,
|
||||
req: Req,
|
||||
idx: usize,
|
||||
waiters: &'a Rc<WaitersRef>,
|
||||
idx: u32,
|
||||
waiters: &'a WaitersRef,
|
||||
) -> BoxFuture<'a, Self::Response, Self::Error>;
|
||||
|
||||
fn shutdown<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
|
||||
|
@ -75,14 +77,15 @@ where
|
|||
#[inline]
|
||||
fn ready<'a>(
|
||||
&'a self,
|
||||
idx: usize,
|
||||
waiters: &'a Rc<WaitersRef>,
|
||||
idx: u32,
|
||||
waiters: &'a WaitersRef,
|
||||
) -> BoxFuture<'a, (), Self::Error> {
|
||||
Box::pin(async move {
|
||||
ServiceCtx::<'a, S>::from_ref(idx, waiters)
|
||||
.ready(self)
|
||||
.await
|
||||
})
|
||||
Box::pin(async move { ServiceCtx::<'a, S>::new(idx, waiters).ready(self).await })
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn not_ready(&self) -> BoxFuture<'_, (), Self::Error> {
|
||||
Box::pin(crate::Service::not_ready(self))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -94,11 +97,11 @@ where
|
|||
fn call<'a>(
|
||||
&'a self,
|
||||
req: Req,
|
||||
idx: usize,
|
||||
waiters: &'a Rc<WaitersRef>,
|
||||
idx: u32,
|
||||
waiters: &'a WaitersRef,
|
||||
) -> BoxFuture<'a, Self::Response, Self::Error> {
|
||||
Box::pin(async move {
|
||||
ServiceCtx::<'a, S>::from_ref(idx, waiters)
|
||||
ServiceCtx::<'a, S>::new(idx, waiters)
|
||||
.call_nowait(self, req)
|
||||
.await
|
||||
})
|
||||
|
@ -155,6 +158,11 @@ where
|
|||
self.0.ready(idx, waiters).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
||||
self.0.not_ready().await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn shutdown(&self) {
|
||||
self.0.shutdown().await
|
||||
|
|
|
@ -1,71 +1,90 @@
|
|||
use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task, task::Context};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc};
|
||||
|
||||
use crate::Service;
|
||||
|
||||
pub struct ServiceCtx<'a, S: ?Sized> {
|
||||
idx: usize,
|
||||
waiters: &'a Rc<WaitersRef>,
|
||||
idx: u32,
|
||||
waiters: &'a WaitersRef,
|
||||
_t: marker::PhantomData<Rc<S>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
/// Pipeline tag allows to notify pipeline binding
|
||||
pub struct PipelineTag(Rc<WaitersRef>);
|
||||
|
||||
pub(crate) struct Waiters {
|
||||
index: usize,
|
||||
waiters: Rc<WaitersRef>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct WaitersRef {
|
||||
cur: cell::Cell<usize>,
|
||||
indexes: cell::UnsafeCell<slab::Slab<Option<task::Waker>>>,
|
||||
}
|
||||
|
||||
impl PipelineTag {
|
||||
/// Notify pipeline dispatcher
|
||||
pub fn notify(&self) {
|
||||
if let Some(waker) = self.0.get()[0].take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
cur: cell::Cell<u32>,
|
||||
wakers: cell::UnsafeCell<Vec<u32>>,
|
||||
indexes: cell::UnsafeCell<slab::Slab<Option<Waker>>>,
|
||||
}
|
||||
|
||||
impl WaitersRef {
|
||||
pub(crate) fn new() -> (u32, Self) {
|
||||
let mut waiters = slab::Slab::new();
|
||||
|
||||
// first insert for wake ups from services
|
||||
let _ = waiters.insert(None);
|
||||
|
||||
(
|
||||
waiters.insert(Default::default()) as u32,
|
||||
WaitersRef {
|
||||
cur: cell::Cell::new(u32::MAX),
|
||||
indexes: cell::UnsafeCell::new(waiters),
|
||||
wakers: cell::UnsafeCell::new(Vec::default()),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
fn get(&self) -> &mut slab::Slab<Option<task::Waker>> {
|
||||
pub(crate) fn get(&self) -> &mut slab::Slab<Option<Waker>> {
|
||||
unsafe { &mut *self.indexes.get() }
|
||||
}
|
||||
|
||||
fn insert(&self) -> usize {
|
||||
self.get().insert(None)
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
pub(crate) fn get_wakers(&self) -> &mut Vec<u32> {
|
||||
unsafe { &mut *self.wakers.get() }
|
||||
}
|
||||
|
||||
fn remove(&self, idx: usize) {
|
||||
self.notify();
|
||||
self.get().remove(idx);
|
||||
pub(crate) fn insert(&self) -> u32 {
|
||||
self.get().insert(None) as u32
|
||||
}
|
||||
|
||||
fn register(&self, idx: usize, cx: &mut Context<'_>) {
|
||||
self.get()[idx] = Some(cx.waker().clone());
|
||||
pub(crate) fn remove(&self, idx: u32) {
|
||||
self.get().remove(idx as usize);
|
||||
|
||||
if self.cur.get() == idx {
|
||||
self.notify();
|
||||
}
|
||||
}
|
||||
|
||||
fn notify(&self) {
|
||||
for (_, waker) in self.get().iter_mut().skip(1) {
|
||||
if let Some(waker) = waker.take() {
|
||||
waker.wake();
|
||||
pub(crate) fn register(&self, idx: u32, cx: &mut Context<'_>) {
|
||||
self.get()[idx as usize] = Some(cx.waker().clone());
|
||||
self.get_wakers().push(idx);
|
||||
}
|
||||
|
||||
pub(crate) fn register_unready(&self, cx: &mut Context<'_>) {
|
||||
self.get()[0] = Some(cx.waker().clone());
|
||||
self.get_wakers().push(0);
|
||||
}
|
||||
|
||||
pub(crate) fn notify(&self) {
|
||||
let indexes = self.get();
|
||||
let wakers = self.get_wakers();
|
||||
|
||||
for idx in wakers.drain(..) {
|
||||
if let Some(item) = indexes.get_mut(idx as usize) {
|
||||
if let Some(waker) = item.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.cur.set(usize::MAX);
|
||||
self.cur.set(u32::MAX);
|
||||
}
|
||||
|
||||
pub(crate) fn can_check(&self, idx: usize, cx: &mut Context<'_>) -> bool {
|
||||
pub(crate) fn can_check(&self, idx: u32, cx: &mut Context<'_>) -> bool {
|
||||
let cur = self.cur.get();
|
||||
if cur == idx {
|
||||
true
|
||||
} else if cur == usize::MAX {
|
||||
} else if cur == u32::MAX {
|
||||
self.cur.set(idx);
|
||||
true
|
||||
} else {
|
||||
|
@ -75,81 +94,8 @@ impl WaitersRef {
|
|||
}
|
||||
}
|
||||
|
||||
impl Waiters {
|
||||
pub(crate) fn new() -> Self {
|
||||
let mut waiters = slab::Slab::new();
|
||||
|
||||
// first insert for wake ups from services
|
||||
let _ = waiters.insert(None);
|
||||
|
||||
Waiters {
|
||||
index: waiters.insert(None),
|
||||
waiters: Rc::new(WaitersRef {
|
||||
cur: cell::Cell::new(usize::MAX),
|
||||
indexes: cell::UnsafeCell::new(waiters),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_ref(&self) -> &Rc<WaitersRef> {
|
||||
&self.waiters
|
||||
}
|
||||
|
||||
pub(crate) fn can_check(&self, cx: &mut Context<'_>) -> bool {
|
||||
self.waiters.can_check(self.index, cx)
|
||||
}
|
||||
|
||||
pub(crate) fn register(&self, cx: &mut Context<'_>) {
|
||||
self.waiters.register(self.index, cx);
|
||||
}
|
||||
|
||||
pub(crate) fn register_pipeline(&self, cx: &mut Context<'_>) {
|
||||
self.waiters.register(0, cx);
|
||||
}
|
||||
|
||||
pub(crate) fn notify(&self) {
|
||||
if self.waiters.cur.get() == self.index {
|
||||
self.waiters.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Waiters {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
self.waiters.remove(self.index);
|
||||
self.notify();
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Waiters {
|
||||
fn clone(&self) -> Self {
|
||||
Waiters {
|
||||
index: self.waiters.insert(),
|
||||
waiters: self.waiters.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Waiters {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Waiters")
|
||||
.field("index", &self.index)
|
||||
.field("waiters", &self.waiters.get().len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S> ServiceCtx<'a, S> {
|
||||
pub(crate) fn new(waiters: &'a Waiters) -> Self {
|
||||
Self {
|
||||
idx: waiters.index,
|
||||
waiters: waiters.get_ref(),
|
||||
_t: marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn from_ref(idx: usize, waiters: &'a Rc<WaitersRef>) -> Self {
|
||||
pub(crate) fn new(idx: u32, waiters: &'a WaitersRef) -> Self {
|
||||
Self {
|
||||
idx,
|
||||
waiters,
|
||||
|
@ -157,7 +103,7 @@ impl<'a, S> ServiceCtx<'a, S> {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn inner(self) -> (usize, &'a Rc<WaitersRef>) {
|
||||
pub(crate) fn inner(self) -> (u32, &'a WaitersRef) {
|
||||
(self.idx, self.waiters)
|
||||
}
|
||||
|
||||
|
@ -220,23 +166,18 @@ impl<'a, S> ServiceCtx<'a, S> {
|
|||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get pipeline tag for current pipeline
|
||||
pub fn tag(&self) -> PipelineTag {
|
||||
PipelineTag(self.waiters.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S> Copy for ServiceCtx<'a, S> {}
|
||||
impl<S> Copy for ServiceCtx<'_, S> {}
|
||||
|
||||
impl<'a, S> Clone for ServiceCtx<'a, S> {
|
||||
impl<S> Clone for ServiceCtx<'_, S> {
|
||||
#[inline]
|
||||
fn clone(&self) -> Self {
|
||||
*self
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S> fmt::Debug for ServiceCtx<'a, S> {
|
||||
impl<S> fmt::Debug for ServiceCtx<'_, S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("ServiceCtx")
|
||||
.field("idx", &self.idx)
|
||||
|
@ -251,7 +192,7 @@ struct ReadyCall<'a, S: ?Sized, F: Future> {
|
|||
ctx: ServiceCtx<'a, S>,
|
||||
}
|
||||
|
||||
impl<'a, S: ?Sized, F: Future> Drop for ReadyCall<'a, S, F> {
|
||||
impl<S: ?Sized, F: Future> Drop for ReadyCall<'_, S, F> {
|
||||
fn drop(&mut self) {
|
||||
if !self.completed && self.ctx.waiters.cur.get() == self.ctx.idx {
|
||||
self.ctx.waiters.notify();
|
||||
|
@ -259,35 +200,35 @@ impl<'a, S: ?Sized, F: Future> Drop for ReadyCall<'a, S, F> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a, S: ?Sized, F: Future> Unpin for ReadyCall<'a, S, F> {}
|
||||
impl<S: ?Sized, F: Future> Unpin for ReadyCall<'_, S, F> {}
|
||||
|
||||
impl<'a, S: ?Sized, F: Future> Future for ReadyCall<'a, S, F> {
|
||||
impl<S: ?Sized, F: Future> Future for ReadyCall<'_, S, F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task::Poll<Self::Output> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
if self.ctx.waiters.can_check(self.ctx.idx, cx) {
|
||||
// SAFETY: `fut` never moves
|
||||
let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) };
|
||||
match result {
|
||||
task::Poll::Pending => {
|
||||
Poll::Pending => {
|
||||
self.ctx.waiters.register(self.ctx.idx, cx);
|
||||
task::Poll::Pending
|
||||
Poll::Pending
|
||||
}
|
||||
task::Poll::Ready(res) => {
|
||||
Poll::Ready(res) => {
|
||||
self.completed = true;
|
||||
self.ctx.waiters.notify();
|
||||
task::Poll::Ready(res)
|
||||
Poll::Ready(res)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
task::Poll::Pending
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{cell::Cell, cell::RefCell, future::poll_fn, task::Poll};
|
||||
use std::{cell::Cell, cell::RefCell, future::poll_fn};
|
||||
|
||||
use ntex_util::channel::{condition, oneshot};
|
||||
use ntex_util::{future::lazy, future::select, spawn, time};
|
||||
|
@ -453,50 +394,4 @@ mod tests {
|
|||
assert_eq!(cnt.get(), 2);
|
||||
assert_eq!(&*data.borrow(), &["srv1", "srv2"]);
|
||||
}
|
||||
|
||||
#[ntex::test]
|
||||
async fn test_pipeline_tag() {
|
||||
struct Srv(Rc<Cell<usize>>, Cell<Option<PipelineTag>>);
|
||||
|
||||
impl Service<&'static str> for Srv {
|
||||
type Response = &'static str;
|
||||
type Error = ();
|
||||
|
||||
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
self.1.set(Some(ctx.tag()));
|
||||
self.0.set(self.0.get() + 1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn call(
|
||||
&self,
|
||||
req: &'static str,
|
||||
_: ServiceCtx<'_, Self>,
|
||||
) -> Result<&'static str, ()> {
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let con = condition::Condition::new();
|
||||
|
||||
let srv = Pipeline::from(Srv(cnt.clone(), Cell::new(None))).bind();
|
||||
|
||||
let srv1 = srv.clone();
|
||||
let waiter = con.wait();
|
||||
ntex::rt::spawn(async move {
|
||||
let _ = poll_fn(|cx| {
|
||||
let _ = srv1.poll_ready(cx);
|
||||
waiter.poll_ready(cx)
|
||||
})
|
||||
.await;
|
||||
});
|
||||
time::sleep(time::Millis(50)).await;
|
||||
assert_eq!(cnt.get(), 1);
|
||||
|
||||
let tag = srv.get_ref().1.take().unwrap();
|
||||
tag.notify();
|
||||
time::sleep(time::Millis(50)).await;
|
||||
assert_eq!(cnt.get(), 2);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ mod util;
|
|||
|
||||
pub use self::apply::{apply_fn, apply_fn_factory};
|
||||
pub use self::chain::{chain, chain_factory};
|
||||
pub use self::ctx::{PipelineTag, ServiceCtx};
|
||||
pub use self::ctx::ServiceCtx;
|
||||
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
|
||||
pub use self::fn_shutdown::fn_shutdown;
|
||||
pub use self::map_config::{map_config, unit_config};
|
||||
|
@ -118,6 +118,12 @@ pub trait Service<Req> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns when the service is not able to process requests.
|
||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
||||
std::future::pending().await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Shutdown service.
|
||||
///
|
||||
|
@ -246,6 +252,11 @@ where
|
|||
ctx.ready(&**self).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), S::Error> {
|
||||
(**self).not_ready().await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn shutdown(&self) {
|
||||
(**self).shutdown().await
|
||||
|
@ -273,6 +284,11 @@ where
|
|||
ctx.ready(&**self).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), S::Error> {
|
||||
(**self).not_ready().await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn shutdown(&self) {
|
||||
(**self).shutdown().await
|
||||
|
|
|
@ -11,6 +11,14 @@ macro_rules! forward_ready {
|
|||
.await
|
||||
.map_err(::core::convert::Into::into)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
||||
self.$field
|
||||
.not_ready()
|
||||
.await
|
||||
.map_err(::core::convert::Into::into)
|
||||
}
|
||||
};
|
||||
($field:ident, $err:expr) => {
|
||||
#[inline]
|
||||
|
@ -20,6 +28,11 @@ macro_rules! forward_ready {
|
|||
) -> Result<(), Self::Error> {
|
||||
ctx.ready(&self.$field).await.map_err($err)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
||||
self.$field.not_ready().await.map_err($err)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -67,6 +67,11 @@ where
|
|||
ctx.ready(&self.service).await.map_err(&self.f)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
||||
self.service.not_ready().await.map_err(&self.f)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn call(
|
||||
&self,
|
||||
|
|
|
@ -1,30 +1,42 @@
|
|||
use std::{cell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
|
||||
|
||||
use crate::{ctx::Waiters, Service, ServiceCtx};
|
||||
use crate::{ctx::WaitersRef, Service, ServiceCtx};
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Container for a service.
|
||||
///
|
||||
/// Container allows to call enclosed service and adds support of shared readiness.
|
||||
pub struct Pipeline<S> {
|
||||
svc: Rc<S>,
|
||||
pub(crate) waiters: Waiters,
|
||||
index: u32,
|
||||
state: Rc<PipelineState<S>>,
|
||||
}
|
||||
|
||||
struct PipelineState<S> {
|
||||
svc: S,
|
||||
waiters: WaitersRef,
|
||||
}
|
||||
|
||||
impl<S> PipelineState<S> {
|
||||
pub(crate) fn waiters_ref(&self) -> &WaitersRef {
|
||||
&self.waiters
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Pipeline<S> {
|
||||
#[inline]
|
||||
/// Construct new container instance.
|
||||
pub fn new(svc: S) -> Self {
|
||||
let (index, waiters) = WaitersRef::new();
|
||||
Pipeline {
|
||||
svc: Rc::new(svc),
|
||||
waiters: Waiters::new(),
|
||||
index,
|
||||
state: Rc::new(PipelineState { svc, waiters }),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Return reference to enclosed service
|
||||
pub fn get_ref(&self) -> &S {
|
||||
self.svc.as_ref()
|
||||
&self.state.svc
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -33,8 +45,8 @@ impl<S> Pipeline<S> {
|
|||
where
|
||||
S: Service<R>,
|
||||
{
|
||||
ServiceCtx::<'_, S>::new(&self.waiters)
|
||||
.ready(self.svc.as_ref())
|
||||
ServiceCtx::<'_, S>::new(self.index, self.state.waiters_ref())
|
||||
.ready(&self.state.svc)
|
||||
.await
|
||||
}
|
||||
|
||||
|
@ -45,13 +57,9 @@ impl<S> Pipeline<S> {
|
|||
where
|
||||
S: Service<R>,
|
||||
{
|
||||
let ctx = ServiceCtx::<'_, S>::new(&self.waiters);
|
||||
|
||||
// check service readiness
|
||||
self.svc.as_ref().ready(ctx).await?;
|
||||
|
||||
// call service
|
||||
self.svc.as_ref().call(req, ctx).await
|
||||
ServiceCtx::<'_, S>::new(self.index, self.state.waiters_ref())
|
||||
.call(&self.state.svc, req)
|
||||
.await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -66,8 +74,8 @@ impl<S> Pipeline<S> {
|
|||
|
||||
PipelineCall {
|
||||
fut: Box::pin(async move {
|
||||
ServiceCtx::<S>::new(&pl.waiters)
|
||||
.call(pl.svc.as_ref(), req)
|
||||
ServiceCtx::<S>::new(pl.index, pl.state.waiters_ref())
|
||||
.call(&pl.state.svc, req)
|
||||
.await
|
||||
}),
|
||||
}
|
||||
|
@ -86,8 +94,8 @@ impl<S> Pipeline<S> {
|
|||
|
||||
PipelineCall {
|
||||
fut: Box::pin(async move {
|
||||
ServiceCtx::<S>::new(&pl.waiters)
|
||||
.call_nowait(pl.svc.as_ref(), req)
|
||||
ServiceCtx::<S>::new(pl.index, pl.state.waiters_ref())
|
||||
.call_nowait(&pl.state.svc, req)
|
||||
.await
|
||||
}),
|
||||
}
|
||||
|
@ -99,11 +107,11 @@ impl<S> Pipeline<S> {
|
|||
where
|
||||
S: Service<R>,
|
||||
{
|
||||
self.svc.as_ref().shutdown().await
|
||||
self.state.svc.shutdown().await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Convert to lifetime object.
|
||||
/// Get current pipeline.
|
||||
pub fn bind<R>(self) -> PipelineBinding<S, R>
|
||||
where
|
||||
S: Service<R> + 'static,
|
||||
|
@ -121,15 +129,30 @@ impl<S> From<S> for Pipeline<S> {
|
|||
}
|
||||
|
||||
impl<S> Clone for Pipeline<S> {
|
||||
#[inline]
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
svc: self.svc.clone(),
|
||||
waiters: self.waiters.clone(),
|
||||
Pipeline {
|
||||
index: self.state.waiters.insert(),
|
||||
state: self.state.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Drop for Pipeline<S> {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
self.state.waiters.remove(self.index);
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: fmt::Debug> fmt::Debug for PipelineState<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("PipelineState")
|
||||
.field("svc", &self.svc)
|
||||
.field("waiters", &self.waiters.get().len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Bound container for a service.
|
||||
pub struct PipelineBinding<S, R>
|
||||
where
|
||||
|
@ -137,6 +160,7 @@ where
|
|||
{
|
||||
pl: Pipeline<S>,
|
||||
st: cell::UnsafeCell<State<S::Error>>,
|
||||
not_ready: cell::UnsafeCell<StateNotReady<S::Error>>,
|
||||
}
|
||||
|
||||
enum State<E> {
|
||||
|
@ -145,6 +169,11 @@ enum State<E> {
|
|||
Shutdown(Pin<Box<dyn Future<Output = ()> + 'static>>),
|
||||
}
|
||||
|
||||
enum StateNotReady<E> {
|
||||
New,
|
||||
Readiness(Pin<Box<dyn Future<Output = Result<(), E>> + 'static>>),
|
||||
}
|
||||
|
||||
impl<S, R> PipelineBinding<S, R>
|
||||
where
|
||||
S: Service<R> + 'static,
|
||||
|
@ -154,13 +183,14 @@ where
|
|||
PipelineBinding {
|
||||
pl,
|
||||
st: cell::UnsafeCell::new(State::New),
|
||||
not_ready: cell::UnsafeCell::new(StateNotReady::New),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Return reference to enclosed service
|
||||
pub fn get_ref(&self) -> &S {
|
||||
self.pl.svc.as_ref()
|
||||
&self.pl.state.svc
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -189,6 +219,29 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns when the pipeline is not able to process requests.
|
||||
pub fn poll_not_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
|
||||
let st = unsafe { &mut *self.not_ready.get() };
|
||||
|
||||
match st {
|
||||
StateNotReady::New => {
|
||||
// SAFETY: `fut` has same lifetime same as lifetime of `self.pl`.
|
||||
// Pipeline::svc is heap allocated(Rc<S>), and it is being kept alive until
|
||||
// `self` is alive
|
||||
let pl: &'static Pipeline<S> = unsafe { std::mem::transmute(&self.pl) };
|
||||
let fut = Box::pin(CheckUnReadiness {
|
||||
fut: None,
|
||||
f: not_ready,
|
||||
pl,
|
||||
});
|
||||
*st = StateNotReady::Readiness(fut);
|
||||
self.poll_not_ready(cx)
|
||||
}
|
||||
StateNotReady::Readiness(ref mut fut) => Pin::new(fut).poll(cx),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns `Ready` when the service is properly shutdowns.
|
||||
pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
|
@ -215,8 +268,8 @@ where
|
|||
|
||||
PipelineCall {
|
||||
fut: Box::pin(async move {
|
||||
ServiceCtx::<S>::new(&pl.waiters)
|
||||
.call(pl.svc.as_ref(), req)
|
||||
ServiceCtx::<S>::new(pl.index, pl.state.waiters_ref())
|
||||
.call(&pl.state.svc, req)
|
||||
.await
|
||||
}),
|
||||
}
|
||||
|
@ -231,8 +284,8 @@ where
|
|||
|
||||
PipelineCall {
|
||||
fut: Box::pin(async move {
|
||||
ServiceCtx::<S>::new(&pl.waiters)
|
||||
.call_nowait(pl.svc.as_ref(), req)
|
||||
ServiceCtx::<S>::new(pl.index, pl.state.waiters_ref())
|
||||
.call_nowait(&pl.state.svc, req)
|
||||
.await
|
||||
}),
|
||||
}
|
||||
|
@ -241,7 +294,7 @@ where
|
|||
#[inline]
|
||||
/// Shutdown enclosed service.
|
||||
pub async fn shutdown(&self) {
|
||||
self.pl.svc.as_ref().shutdown().await
|
||||
self.pl.state.svc.shutdown().await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -263,6 +316,7 @@ where
|
|||
Self {
|
||||
pl: self.pl.clone(),
|
||||
st: cell::UnsafeCell::new(State::New),
|
||||
not_ready: cell::UnsafeCell::new(StateNotReady::New),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -316,7 +370,17 @@ where
|
|||
S: Service<R>,
|
||||
R: 'static,
|
||||
{
|
||||
pl.svc.ready(ServiceCtx::<'_, S>::new(&pl.waiters))
|
||||
pl.state
|
||||
.svc
|
||||
.ready(ServiceCtx::<'_, S>::new(pl.index, pl.state.waiters_ref()))
|
||||
}
|
||||
|
||||
fn not_ready<S, R>(pl: &'static Pipeline<S>) -> impl Future<Output = Result<(), S::Error>>
|
||||
where
|
||||
S: Service<R>,
|
||||
R: 'static,
|
||||
{
|
||||
pl.state.svc.not_ready()
|
||||
}
|
||||
|
||||
struct CheckReadiness<S: 'static, F, Fut> {
|
||||
|
@ -331,7 +395,7 @@ impl<S, F, Fut> Drop for CheckReadiness<S, F, Fut> {
|
|||
fn drop(&mut self) {
|
||||
// future fot dropped during polling, we must notify other waiters
|
||||
if self.fut.is_some() {
|
||||
self.pl.waiters.notify();
|
||||
self.pl.state.waiters.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -346,19 +410,16 @@ where
|
|||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
||||
let mut slf = self.as_mut();
|
||||
|
||||
// register pipeline tag
|
||||
slf.pl.waiters.register_pipeline(cx);
|
||||
|
||||
if slf.pl.waiters.can_check(cx) {
|
||||
if slf.pl.state.waiters.can_check(slf.pl.index, cx) {
|
||||
if let Some(ref mut fut) = slf.fut {
|
||||
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
|
||||
Poll::Pending => {
|
||||
slf.pl.waiters.register(cx);
|
||||
slf.pl.state.waiters.register(slf.pl.index, cx);
|
||||
Poll::Pending
|
||||
}
|
||||
Poll::Ready(res) => {
|
||||
let _ = slf.fut.take();
|
||||
slf.pl.waiters.notify();
|
||||
slf.pl.state.waiters.notify();
|
||||
Poll::Ready(res)
|
||||
}
|
||||
}
|
||||
|
@ -371,3 +432,40 @@ where
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct CheckUnReadiness<S: 'static, F, Fut> {
|
||||
f: F,
|
||||
fut: Option<Fut>,
|
||||
pl: &'static Pipeline<S>,
|
||||
}
|
||||
|
||||
impl<S, F, Fut> Unpin for CheckUnReadiness<S, F, Fut> {}
|
||||
|
||||
impl<T, S, F, Fut> Future for CheckUnReadiness<S, F, Fut>
|
||||
where
|
||||
F: Fn(&'static Pipeline<S>) -> Fut,
|
||||
Fut: Future<Output = T>,
|
||||
{
|
||||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
||||
let mut slf = self.as_mut();
|
||||
|
||||
if let Some(ref mut fut) = slf.fut {
|
||||
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
|
||||
Poll::Pending => {
|
||||
slf.pl.state.waiters.register_unready(cx);
|
||||
Poll::Pending
|
||||
}
|
||||
Poll::Ready(res) => {
|
||||
let _ = slf.fut.take();
|
||||
slf.pl.state.waiters.notify();
|
||||
Poll::Ready(res)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
slf.fut = Some((slf.f)(slf.pl));
|
||||
self.poll(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,11 @@ where
|
|||
util::ready(&self.svc1, &self.svc2, ctx).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
||||
util::select(self.svc1.not_ready(), self.svc2.not_ready()).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn shutdown(&self) {
|
||||
util::shutdown(&self.svc1, &self.svc2).await
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{future::poll_fn, future::Future, pin, task::Poll};
|
||||
use std::{future::poll_fn, future::Future, pin, pin::Pin, task::Poll};
|
||||
|
||||
use crate::{Service, ServiceCtx};
|
||||
|
||||
|
@ -14,10 +14,10 @@ where
|
|||
let mut ready2 = false;
|
||||
|
||||
poll_fn(move |cx| {
|
||||
if !ready1 && pin::Pin::new(&mut fut1).poll(cx).is_ready() {
|
||||
if !ready1 && Pin::new(&mut fut1).poll(cx).is_ready() {
|
||||
ready1 = true;
|
||||
}
|
||||
if !ready2 && pin::Pin::new(&mut fut2).poll(cx).is_ready() {
|
||||
if !ready2 && Pin::new(&mut fut2).poll(cx).is_ready() {
|
||||
ready2 = true
|
||||
}
|
||||
if ready1 && ready2 {
|
||||
|
@ -45,10 +45,10 @@ where
|
|||
let mut ready2 = false;
|
||||
|
||||
poll_fn(move |cx| {
|
||||
if !ready1 && pin::Pin::new(&mut fut1).poll(cx)?.is_ready() {
|
||||
if !ready1 && Pin::new(&mut fut1).poll(cx)?.is_ready() {
|
||||
ready1 = true;
|
||||
}
|
||||
if !ready2 && pin::Pin::new(&mut fut2).poll(cx)?.is_ready() {
|
||||
if !ready2 && Pin::new(&mut fut2).poll(cx)?.is_ready() {
|
||||
ready2 = true;
|
||||
}
|
||||
if ready1 && ready2 {
|
||||
|
@ -59,3 +59,24 @@ where
|
|||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Waits for either one of two differently-typed futures to complete.
|
||||
pub(crate) async fn select<A, B>(fut1: A, fut2: B) -> A::Output
|
||||
where
|
||||
A: Future,
|
||||
B: Future<Output = A::Output>,
|
||||
{
|
||||
let mut fut1 = pin::pin!(fut1);
|
||||
let mut fut2 = pin::pin!(fut2);
|
||||
|
||||
poll_fn(|cx| {
|
||||
if let Poll::Ready(item) = Pin::new(&mut fut1).poll(cx) {
|
||||
return Poll::Ready(item);
|
||||
}
|
||||
if let Poll::Ready(item) = Pin::new(&mut fut2).poll(cx) {
|
||||
return Poll::Ready(item);
|
||||
}
|
||||
Poll::Pending
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue