Added Service::not_ready() method

This commit is contained in:
Nikolay Kim 2024-11-02 11:33:40 +05:00
parent 9ecf7c7e6c
commit 415d4658dd
9 changed files with 164 additions and 23 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [3.3.0] - 2024-11-02
* Added Service::not_ready() method
## [3.2.1] - 2024-10-31
* Fix shared readiness notification

View file

@ -30,6 +30,11 @@ where
util::ready(&self.svc1, &self.svc2, ctx).await
}
#[inline]
async fn not_ready(&self) -> Result<(), Self::Error> {
util::select(self.svc1.not_ready(), self.svc2.not_ready()).await
}
#[inline]
async fn shutdown(&self) {
util::shutdown(&self.svc1, &self.svc2).await

View file

@ -104,6 +104,11 @@ where
self.service.ready().await.map_err(From::from)
}
#[inline]
async fn not_ready(&self) -> Result<(), Err> {
self.service.get_ref().not_ready().await.map_err(From::from)
}
#[inline]
async fn shutdown(&self) {
self.service.shutdown().await

View file

@ -1,4 +1,5 @@
use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task, task::Context};
use std::task::{Context, Poll, Waker};
use std::{cell, collections::VecDeque, fmt, future::Future, marker, pin::Pin, rc::Rc};
use crate::Service;
@ -11,28 +12,37 @@ pub struct ServiceCtx<'a, S: ?Sized> {
#[derive(Debug)]
pub(crate) struct WaitersRef {
cur: cell::Cell<u32>,
indexes: cell::UnsafeCell<slab::Slab<Option<task::Waker>>>,
wakers: cell::UnsafeCell<VecDeque<u32>>,
indexes: cell::UnsafeCell<slab::Slab<Option<Waker>>>,
}
impl WaitersRef {
pub(crate) fn new() -> (u32, Self) {
let mut waiters = slab::Slab::new();
let index = waiters.insert(Default::default()) as u32;
// first insert for wake ups from services
let _ = waiters.insert(None);
(
index,
waiters.insert(Default::default()) as u32,
WaitersRef {
cur: cell::Cell::new(u32::MAX),
indexes: cell::UnsafeCell::new(waiters),
wakers: cell::UnsafeCell::new(VecDeque::default()),
},
)
}
#[allow(clippy::mut_from_ref)]
pub(crate) fn get(&self) -> &mut slab::Slab<Option<task::Waker>> {
pub(crate) fn get(&self) -> &mut slab::Slab<Option<Waker>> {
unsafe { &mut *self.indexes.get() }
}
#[allow(clippy::mut_from_ref)]
pub(crate) fn get_wakers(&self) -> &mut VecDeque<u32> {
unsafe { &mut *self.wakers.get() }
}
pub(crate) fn insert(&self) -> u32 {
self.get().insert(None) as u32
}
@ -47,12 +57,23 @@ impl WaitersRef {
pub(crate) fn register(&self, idx: u32, cx: &mut Context<'_>) {
self.get()[idx as usize] = Some(cx.waker().clone());
self.get_wakers().push_back(idx);
}
pub(crate) fn register_unready(&self, cx: &mut Context<'_>) {
self.get()[0] = Some(cx.waker().clone());
self.get_wakers().push_back(0);
}
pub(crate) fn notify(&self) {
for (_, waker) in self.get().iter_mut().skip(1) {
if let Some(waker) = waker.take() {
waker.wake();
let indexes = self.get();
let wakers = self.get_wakers();
for idx in wakers.drain(..) {
if let Some(item) = indexes.get_mut(idx as usize) {
if let Some(waker) = item.take() {
waker.wake();
}
}
}
@ -184,30 +205,30 @@ impl<'a, S: ?Sized, F: Future> Unpin for ReadyCall<'a, S, F> {}
impl<'a, S: ?Sized, F: Future> Future for ReadyCall<'a, S, F> {
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task::Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.ctx.waiters.can_check(self.ctx.idx, cx) {
// SAFETY: `fut` never moves
let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) };
match result {
task::Poll::Pending => {
Poll::Pending => {
self.ctx.waiters.register(self.ctx.idx, cx);
task::Poll::Pending
Poll::Pending
}
task::Poll::Ready(res) => {
Poll::Ready(res) => {
self.completed = true;
self.ctx.waiters.notify();
task::Poll::Ready(res)
Poll::Ready(res)
}
}
} else {
task::Poll::Pending
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use std::{cell::Cell, cell::RefCell, future::poll_fn, task::Poll};
use std::{cell::Cell, cell::RefCell, future::poll_fn};
use ntex_util::channel::{condition, oneshot};
use ntex_util::{future::lazy, future::select, spawn, time};

View file

@ -118,6 +118,12 @@ pub trait Service<Req> {
Ok(())
}
#[inline]
/// Returns when the service is not able to process requests.
async fn not_ready(&self) -> Result<(), Self::Error> {
std::future::pending().await
}
#[inline]
/// Shutdown service.
///

View file

@ -67,6 +67,11 @@ where
ctx.ready(&self.service).await.map_err(&self.f)
}
#[inline]
async fn not_ready(&self) -> Result<(), Self::Error> {
self.service.not_ready().await.map_err(&self.f)
}
#[inline]
async fn call(
&self,

View file

@ -159,6 +159,7 @@ where
{
pl: Pipeline<S>,
st: cell::UnsafeCell<State<S::Error>>,
not_ready: cell::UnsafeCell<State<S::Error>>,
}
enum State<E> {
@ -176,6 +177,7 @@ where
PipelineBinding {
pl,
st: cell::UnsafeCell::new(State::New),
not_ready: cell::UnsafeCell::new(State::New),
}
}
@ -211,6 +213,30 @@ where
}
}
#[inline]
/// Returns when the pipeline is not able to process requests.
pub fn poll_not_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let st = unsafe { &mut *self.not_ready.get() };
match st {
State::New => {
// SAFETY: `fut` has same lifetime same as lifetime of `self.pl`.
// Pipeline::svc is heap allocated(Rc<S>), and it is being kept alive until
// `self` is alive
let pl: &'static Pipeline<S> = unsafe { std::mem::transmute(&self.pl) };
let fut = Box::pin(CheckUnReadiness {
fut: None,
f: not_ready,
pl,
});
*st = State::Readiness(fut);
self.poll_not_ready(cx)
}
State::Readiness(ref mut fut) => Pin::new(fut).poll(cx),
State::Shutdown(_) => panic!("Pipeline is shutding down"),
}
}
#[inline]
/// Returns `Ready` when the service is properly shutdowns.
pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
@ -285,6 +311,7 @@ where
Self {
pl: self.pl.clone(),
st: cell::UnsafeCell::new(State::New),
not_ready: cell::UnsafeCell::new(State::New),
}
}
}
@ -343,6 +370,14 @@ where
.ready(ServiceCtx::<'_, S>::new(pl.index, pl.state.waiters_ref()))
}
fn not_ready<S, R>(pl: &'static Pipeline<S>) -> impl Future<Output = Result<(), S::Error>>
where
S: Service<R>,
R: 'static,
{
pl.state.svc.not_ready()
}
struct CheckReadiness<S: 'static, F, Fut> {
f: F,
fut: Option<Fut>,
@ -370,9 +405,6 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut slf = self.as_mut();
// register pipeline tag
// slf.pl.state.waiters.register_pipeline(cx);
if slf.pl.state.waiters.can_check(slf.pl.index, cx) {
if let Some(ref mut fut) = slf.fut {
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
@ -395,3 +427,40 @@ where
}
}
}
struct CheckUnReadiness<S: 'static, F, Fut> {
f: F,
fut: Option<Fut>,
pl: &'static Pipeline<S>,
}
impl<S, F, Fut> Unpin for CheckUnReadiness<S, F, Fut> {}
impl<T, S, F, Fut> Future for CheckUnReadiness<S, F, Fut>
where
F: Fn(&'static Pipeline<S>) -> Fut,
Fut: Future<Output = T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut slf = self.as_mut();
if let Some(ref mut fut) = slf.fut {
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Pending => {
slf.pl.state.waiters.register_unready(cx);
Poll::Pending
}
Poll::Ready(res) => {
let _ = slf.fut.take();
slf.pl.state.waiters.notify();
Poll::Ready(res)
}
}
} else {
slf.fut = Some((slf.f)(slf.pl));
self.poll(cx)
}
}
}

View file

@ -30,6 +30,11 @@ where
util::ready(&self.svc1, &self.svc2, ctx).await
}
#[inline]
async fn not_ready(&self) -> Result<(), Self::Error> {
util::select(self.svc1.not_ready(), self.svc2.not_ready()).await
}
#[inline]
async fn shutdown(&self) {
util::shutdown(&self.svc1, &self.svc2).await

View file

@ -1,4 +1,4 @@
use std::{future::poll_fn, future::Future, pin, task::Poll};
use std::{future::poll_fn, future::Future, pin, pin::Pin, task::Poll};
use crate::{Service, ServiceCtx};
@ -14,10 +14,10 @@ where
let mut ready2 = false;
poll_fn(move |cx| {
if !ready1 && pin::Pin::new(&mut fut1).poll(cx).is_ready() {
if !ready1 && Pin::new(&mut fut1).poll(cx).is_ready() {
ready1 = true;
}
if !ready2 && pin::Pin::new(&mut fut2).poll(cx).is_ready() {
if !ready2 && Pin::new(&mut fut2).poll(cx).is_ready() {
ready2 = true
}
if ready1 && ready2 {
@ -45,10 +45,10 @@ where
let mut ready2 = false;
poll_fn(move |cx| {
if !ready1 && pin::Pin::new(&mut fut1).poll(cx)?.is_ready() {
if !ready1 && Pin::new(&mut fut1).poll(cx)?.is_ready() {
ready1 = true;
}
if !ready2 && pin::Pin::new(&mut fut2).poll(cx)?.is_ready() {
if !ready2 && Pin::new(&mut fut2).poll(cx)?.is_ready() {
ready2 = true;
}
if ready1 && ready2 {
@ -59,3 +59,24 @@ where
})
.await
}
/// Waits for either one of two differently-typed futures to complete.
pub(crate) async fn select<A, B>(fut1: A, fut2: B) -> A::Output
where
A: Future,
B: Future<Output = A::Output>,
{
let mut fut1 = pin::pin!(fut1);
let mut fut2 = pin::pin!(fut2);
poll_fn(|cx| {
if let Poll::Ready(item) = Pin::new(&mut fut1).poll(cx) {
return Poll::Ready(item);
}
if let Poll::Ready(item) = Pin::new(&mut fut2).poll(cx) {
return Poll::Ready(item);
}
Poll::Pending
})
.await
}