mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 21:37:58 +03:00
Use updated Service trait
This commit is contained in:
parent
011e9cdfea
commit
bf34d51646
16 changed files with 217 additions and 64 deletions
|
@ -31,7 +31,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
async fn not_ready(&self) {
|
||||||
util::select(self.svc1.not_ready(), self.svc2.not_ready()).await
|
util::select(self.svc1.not_ready(), self.svc2.not_ready()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,6 +88,7 @@ where
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use ntex_util::time;
|
||||||
use std::{cell::Cell, rc::Rc};
|
use std::{cell::Cell, rc::Rc};
|
||||||
|
|
||||||
use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
|
use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
|
||||||
|
@ -104,6 +105,11 @@ mod tests {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn not_ready(&self) {
|
||||||
|
self.0.set(self.0.get() + 1);
|
||||||
|
std::future::pending().await
|
||||||
|
}
|
||||||
|
|
||||||
async fn call(
|
async fn call(
|
||||||
&self,
|
&self,
|
||||||
req: &'static str,
|
req: &'static str,
|
||||||
|
@ -129,6 +135,11 @@ mod tests {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn not_ready(&self) {
|
||||||
|
self.0.set(self.0.get() + 1);
|
||||||
|
std::future::pending().await
|
||||||
|
}
|
||||||
|
|
||||||
async fn call(
|
async fn call(
|
||||||
&self,
|
&self,
|
||||||
req: &'static str,
|
req: &'static str,
|
||||||
|
@ -155,6 +166,14 @@ mod tests {
|
||||||
let res = srv.ready().await;
|
let res = srv.ready().await;
|
||||||
assert_eq!(res, Ok(()));
|
assert_eq!(res, Ok(()));
|
||||||
assert_eq!(cnt.get(), 2);
|
assert_eq!(cnt.get(), 2);
|
||||||
|
|
||||||
|
let srv2 = srv.clone();
|
||||||
|
ntex::rt::spawn(async move {
|
||||||
|
let _ = srv2.not_ready().await;
|
||||||
|
});
|
||||||
|
time::sleep(time::Millis(25)).await;
|
||||||
|
assert_eq!(cnt.get(), 4);
|
||||||
|
|
||||||
srv.shutdown().await;
|
srv.shutdown().await;
|
||||||
assert_eq!(cnt_sht.get(), 2);
|
assert_eq!(cnt_sht.get(), 2);
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,8 +105,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn not_ready(&self) -> Result<(), Err> {
|
async fn not_ready(&self) {
|
||||||
self.service.get_ref().not_ready().await.map_err(From::from)
|
self.service.not_ready().await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
|
|
@ -54,7 +54,7 @@ trait ServiceObj<Req> {
|
||||||
waiters: &'a WaitersRef,
|
waiters: &'a WaitersRef,
|
||||||
) -> BoxFuture<'a, (), Self::Error>;
|
) -> BoxFuture<'a, (), Self::Error>;
|
||||||
|
|
||||||
fn not_ready(&self) -> BoxFuture<'_, (), Self::Error>;
|
fn not_ready<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
|
||||||
|
|
||||||
fn call<'a>(
|
fn call<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
|
@ -84,7 +84,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn not_ready(&self) -> BoxFuture<'_, (), Self::Error> {
|
fn not_ready<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
||||||
Box::pin(crate::Service::not_ready(self))
|
Box::pin(crate::Service::not_ready(self))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -159,7 +159,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
async fn not_ready(&self) {
|
||||||
self.0.not_ready().await
|
self.0.not_ready().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -120,7 +120,7 @@ pub trait Service<Req> {
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Returns when the service is not able to process requests.
|
/// Returns when the service is not able to process requests.
|
||||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
async fn not_ready(&self) {
|
||||||
std::future::pending().await
|
std::future::pending().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,7 +253,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn not_ready(&self) -> Result<(), S::Error> {
|
async fn not_ready(&self) {
|
||||||
(**self).not_ready().await
|
(**self).not_ready().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,7 +285,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn not_ready(&self) -> Result<(), S::Error> {
|
async fn not_ready(&self) {
|
||||||
(**self).not_ready().await
|
(**self).not_ready().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,11 +13,8 @@ macro_rules! forward_ready {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
async fn not_ready(&self) {
|
||||||
self.$field
|
self.$field.not_ready().await
|
||||||
.not_ready()
|
|
||||||
.await
|
|
||||||
.map_err(::core::convert::Into::into)
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
($field:ident, $err:expr) => {
|
($field:ident, $err:expr) => {
|
||||||
|
@ -30,8 +27,19 @@ macro_rules! forward_ready {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
async fn not_ready(&self) {
|
||||||
self.$field.not_ready().await.map_err($err)
|
self.$field.not_ready().await
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An implementation of [`not_ready`] that forwards not_ready call to a field.
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! forward_notready {
|
||||||
|
($field:ident) => {
|
||||||
|
#[inline]
|
||||||
|
async fn not_ready(&self) {
|
||||||
|
self.$field.not_ready().await
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,11 +67,6 @@ where
|
||||||
ctx.ready(&self.service).await.map_err(&self.f)
|
ctx.ready(&self.service).await.map_err(&self.f)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
|
||||||
self.service.not_ready().await.map_err(&self.f)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn call(
|
async fn call(
|
||||||
&self,
|
&self,
|
||||||
|
@ -82,6 +77,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
crate::forward_shutdown!(service);
|
crate::forward_shutdown!(service);
|
||||||
|
crate::forward_notready!(service);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Factory for the `map_err` combinator, changing the type of a new
|
/// Factory for the `map_err` combinator, changing the type of a new
|
||||||
|
|
|
@ -40,7 +40,7 @@ impl<S> Pipeline<S> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Returns when the service is able to process requests.
|
/// Returns when the pipeline is able to process requests.
|
||||||
pub async fn ready<R>(&self) -> Result<(), S::Error>
|
pub async fn ready<R>(&self) -> Result<(), S::Error>
|
||||||
where
|
where
|
||||||
S: Service<R>,
|
S: Service<R>,
|
||||||
|
@ -50,6 +50,15 @@ impl<S> Pipeline<S> {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
/// Returns when the pipeline is not able to process requests.
|
||||||
|
pub async fn not_ready<R>(&self)
|
||||||
|
where
|
||||||
|
S: Service<R>,
|
||||||
|
{
|
||||||
|
self.state.svc.not_ready().await
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Wait for service readiness and then create future object
|
/// Wait for service readiness and then create future object
|
||||||
/// that resolves to service result.
|
/// that resolves to service result.
|
||||||
|
@ -160,7 +169,7 @@ where
|
||||||
{
|
{
|
||||||
pl: Pipeline<S>,
|
pl: Pipeline<S>,
|
||||||
st: cell::UnsafeCell<State<S::Error>>,
|
st: cell::UnsafeCell<State<S::Error>>,
|
||||||
not_ready: cell::UnsafeCell<StateNotReady<S::Error>>,
|
not_ready: cell::UnsafeCell<StateNotReady>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum State<E> {
|
enum State<E> {
|
||||||
|
@ -169,9 +178,9 @@ enum State<E> {
|
||||||
Shutdown(Pin<Box<dyn Future<Output = ()> + 'static>>),
|
Shutdown(Pin<Box<dyn Future<Output = ()> + 'static>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
enum StateNotReady<E> {
|
enum StateNotReady {
|
||||||
New,
|
New,
|
||||||
Readiness(Pin<Box<dyn Future<Output = Result<(), E>> + 'static>>),
|
Readiness(Pin<Box<dyn Future<Output = ()>>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, R> PipelineBinding<S, R>
|
impl<S, R> PipelineBinding<S, R>
|
||||||
|
@ -221,7 +230,7 @@ where
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Returns when the pipeline is not able to process requests.
|
/// Returns when the pipeline is not able to process requests.
|
||||||
pub fn poll_not_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
|
pub fn poll_not_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||||
let st = unsafe { &mut *self.not_ready.get() };
|
let st = unsafe { &mut *self.not_ready.get() };
|
||||||
|
|
||||||
match st {
|
match st {
|
||||||
|
@ -375,7 +384,7 @@ where
|
||||||
.ready(ServiceCtx::<'_, S>::new(pl.index, pl.state.waiters_ref()))
|
.ready(ServiceCtx::<'_, S>::new(pl.index, pl.state.waiters_ref()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn not_ready<S, R>(pl: &'static Pipeline<S>) -> impl Future<Output = Result<(), S::Error>>
|
fn not_ready<S, R>(pl: &'static Pipeline<S>) -> impl Future<Output = ()>
|
||||||
where
|
where
|
||||||
S: Service<R>,
|
S: Service<R>,
|
||||||
R: 'static,
|
R: 'static,
|
||||||
|
|
|
@ -31,7 +31,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn not_ready(&self) -> Result<(), Self::Error> {
|
async fn not_ready(&self) {
|
||||||
util::select(self.svc1.not_ready(), self.svc2.not_ready()).await
|
util::select(self.svc1.not_ready(), self.svc2.not_ready()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,6 +91,7 @@ where
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use ntex_util::time;
|
||||||
use std::{cell::Cell, rc::Rc};
|
use std::{cell::Cell, rc::Rc};
|
||||||
|
|
||||||
use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
|
use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
|
||||||
|
@ -107,6 +108,11 @@ mod tests {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn not_ready(&self) {
|
||||||
|
self.0.set(self.0.get() + 1);
|
||||||
|
std::future::pending().await
|
||||||
|
}
|
||||||
|
|
||||||
async fn call(
|
async fn call(
|
||||||
&self,
|
&self,
|
||||||
req: Result<&'static str, &'static str>,
|
req: Result<&'static str, &'static str>,
|
||||||
|
@ -135,6 +141,11 @@ mod tests {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn not_ready(&self) {
|
||||||
|
self.0.set(self.0.get() + 1);
|
||||||
|
std::future::pending().await
|
||||||
|
}
|
||||||
|
|
||||||
async fn call(
|
async fn call(
|
||||||
&self,
|
&self,
|
||||||
req: Result<&'static str, ()>,
|
req: Result<&'static str, ()>,
|
||||||
|
@ -161,6 +172,14 @@ mod tests {
|
||||||
let res = srv.ready().await;
|
let res = srv.ready().await;
|
||||||
assert_eq!(res, Ok(()));
|
assert_eq!(res, Ok(()));
|
||||||
assert_eq!(cnt.get(), 2);
|
assert_eq!(cnt.get(), 2);
|
||||||
|
|
||||||
|
let srv2 = srv.clone();
|
||||||
|
ntex::rt::spawn(async move {
|
||||||
|
let _ = srv2.not_ready().await;
|
||||||
|
});
|
||||||
|
time::sleep(time::Millis(25)).await;
|
||||||
|
assert_eq!(cnt.get(), 4);
|
||||||
|
|
||||||
srv.shutdown().await;
|
srv.shutdown().await;
|
||||||
assert_eq!(cnt_sht.get(), 2);
|
assert_eq!(cnt_sht.get(), 2);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.5.0] - 2024-11-02
|
||||||
|
|
||||||
|
* Use updated Service trait
|
||||||
|
|
||||||
## [2.4.0] - 2024-09-26
|
## [2.4.0] - 2024-09-26
|
||||||
|
|
||||||
* Remove "must_use" from `condition::Waiter`
|
* Remove "must_use" from `condition::Waiter`
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-util"
|
name = "ntex-util"
|
||||||
version = "2.4.0"
|
version = "2.5.0"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "Utilities for ntex framework"
|
description = "Utilities for ntex framework"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@ -16,7 +16,7 @@ name = "ntex_util"
|
||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
ntex-service = "3"
|
ntex-service = "3.3"
|
||||||
ntex-rt = "0.4"
|
ntex-rt = "0.4"
|
||||||
bitflags = "2"
|
bitflags = "2"
|
||||||
fxhash = "0.2"
|
fxhash = "0.2"
|
||||||
|
|
|
@ -26,13 +26,17 @@ impl Counter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get counter guard.
|
/// Get counter guard.
|
||||||
pub fn get(&self) -> CounterGuard {
|
pub(crate) fn get(&self) -> CounterGuard {
|
||||||
CounterGuard::new(self.0.clone())
|
CounterGuard::new(self.0.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn is_available(&self) -> bool {
|
||||||
|
self.0.count.get() < self.0.capacity
|
||||||
|
}
|
||||||
|
|
||||||
/// Check if counter is not at capacity. If counter at capacity
|
/// Check if counter is not at capacity. If counter at capacity
|
||||||
/// it registers notification for current task.
|
/// it registers notification for current task.
|
||||||
pub async fn available(&self) {
|
pub(crate) async fn available(&self) {
|
||||||
poll_fn(|cx| {
|
poll_fn(|cx| {
|
||||||
if self.poll_available(cx) {
|
if self.poll_available(cx) {
|
||||||
task::Poll::Ready(())
|
task::Poll::Ready(())
|
||||||
|
@ -43,9 +47,20 @@ impl Counter {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn unavailable(&self) {
|
||||||
|
poll_fn(|cx| {
|
||||||
|
if self.poll_available(cx) {
|
||||||
|
task::Poll::Pending
|
||||||
|
} else {
|
||||||
|
task::Poll::Ready(())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
/// Check if counter is not at capacity. If counter at capacity
|
/// Check if counter is not at capacity. If counter at capacity
|
||||||
/// it registers notification for current task.
|
/// it registers notification for current task.
|
||||||
pub fn poll_available(&self, cx: &mut task::Context<'_>) -> bool {
|
fn poll_available(&self, cx: &mut task::Context<'_>) -> bool {
|
||||||
self.0.available(cx)
|
self.0.available(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,7 +90,11 @@ impl Drop for CounterGuard {
|
||||||
|
|
||||||
impl CounterInner {
|
impl CounterInner {
|
||||||
fn inc(&self) {
|
fn inc(&self) {
|
||||||
self.count.set(self.count.get() + 1);
|
let num = self.count.get();
|
||||||
|
self.count.set(num + 1);
|
||||||
|
if num == self.capacity {
|
||||||
|
self.task.wake();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dec(&self) {
|
fn dec(&self) {
|
||||||
|
@ -87,10 +106,10 @@ impl CounterInner {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn available(&self, cx: &mut task::Context<'_>) -> bool {
|
fn available(&self, cx: &mut task::Context<'_>) -> bool {
|
||||||
|
self.task.register(cx.waker());
|
||||||
if self.count.get() < self.capacity {
|
if self.count.get() < self.capacity {
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
self.task.register(cx.waker());
|
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,8 +62,20 @@ where
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||||
self.count.available().await;
|
if !self.count.is_available() {
|
||||||
ctx.ready(&self.service).await
|
let (_, res) =
|
||||||
|
crate::future::join(self.count.available(), ctx.ready(&self.service)).await;
|
||||||
|
res
|
||||||
|
} else {
|
||||||
|
ctx.ready(&self.service).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
async fn not_ready(&self) {
|
||||||
|
if self.count.is_available() {
|
||||||
|
crate::future::select(self.count.unavailable(), self.service.not_ready()).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -113,10 +125,12 @@ mod tests {
|
||||||
});
|
});
|
||||||
crate::time::sleep(Duration::from_millis(25)).await;
|
crate::time::sleep(Duration::from_millis(25)).await;
|
||||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
|
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
|
||||||
|
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(()));
|
||||||
|
|
||||||
let _ = tx.send(());
|
let _ = tx.send(());
|
||||||
crate::time::sleep(Duration::from_millis(25)).await;
|
crate::time::sleep(Duration::from_millis(25)).await;
|
||||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||||
|
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);
|
||||||
srv.shutdown().await;
|
srv.shutdown().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::{cell::Cell, convert::Infallible, fmt, future::poll_fn, marker, task, time};
|
use std::{cell::Cell, convert::Infallible, fmt, marker, time};
|
||||||
|
|
||||||
use ntex_service::{Service, ServiceCtx, ServiceFactory};
|
use ntex_service::{Service, ServiceCtx, ServiceFactory};
|
||||||
|
|
||||||
|
@ -111,23 +111,28 @@ where
|
||||||
type Error = E;
|
type Error = E;
|
||||||
|
|
||||||
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||||
poll_fn(|cx| match self.sleep.poll_elapsed(cx) {
|
let expire = self.expire.get() + time::Duration::from(self.dur);
|
||||||
task::Poll::Ready(_) => {
|
if expire <= now() {
|
||||||
let now = now();
|
Err((self.f)())
|
||||||
let expire = self.expire.get() + time::Duration::from(self.dur);
|
} else {
|
||||||
if expire <= now {
|
Ok(())
|
||||||
task::Poll::Ready(Err((self.f)()))
|
}
|
||||||
} else {
|
}
|
||||||
let expire = expire - now;
|
|
||||||
self.sleep
|
async fn not_ready(&self) {
|
||||||
.reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX)));
|
loop {
|
||||||
let _ = self.sleep.poll_elapsed(cx);
|
self.sleep.wait().await;
|
||||||
task::Poll::Ready(Ok(()))
|
|
||||||
}
|
let now = now();
|
||||||
|
let expire = self.expire.get() + time::Duration::from(self.dur);
|
||||||
|
if expire <= now {
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
let expire = expire - now;
|
||||||
|
self.sleep
|
||||||
|
.reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX)));
|
||||||
}
|
}
|
||||||
task::Poll::Pending => task::Poll::Ready(Ok(())),
|
}
|
||||||
})
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result<R, E> {
|
async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result<R, E> {
|
||||||
|
@ -157,11 +162,13 @@ mod tests {
|
||||||
|
|
||||||
assert_eq!(service.call(1usize).await, Ok(1usize));
|
assert_eq!(service.call(1usize).await, Ok(1usize));
|
||||||
assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready());
|
assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready());
|
||||||
|
assert!(!lazy(|cx| service.poll_not_ready(cx)).await.is_ready());
|
||||||
|
|
||||||
sleep(Millis(500)).await;
|
sleep(Millis(500)).await;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
lazy(|cx| service.poll_ready(cx)).await,
|
lazy(|cx| service.poll_ready(cx)).await,
|
||||||
Poll::Ready(Err(TestErr))
|
Poll::Ready(Err(TestErr))
|
||||||
);
|
);
|
||||||
|
assert!(lazy(|cx| service.poll_not_ready(cx)).await.is_ready());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,19 +51,38 @@ where
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||||
poll_fn(|cx| {
|
if !self.ready.get() {
|
||||||
self.waker.register(cx.waker());
|
poll_fn(|cx| {
|
||||||
if self.ready.get() {
|
self.waker.register(cx.waker());
|
||||||
Poll::Ready(())
|
if self.ready.get() {
|
||||||
} else {
|
Poll::Ready(())
|
||||||
Poll::Pending
|
} else {
|
||||||
}
|
Poll::Pending
|
||||||
})
|
}
|
||||||
.await;
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
ctx.ready(&self.service).await
|
ctx.ready(&self.service).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
async fn not_ready(&self) {
|
||||||
|
if self.ready.get() {
|
||||||
|
crate::future::select(
|
||||||
|
poll_fn(|cx| {
|
||||||
|
self.waker.register(cx.waker());
|
||||||
|
if self.ready.get() {
|
||||||
|
Poll::Pending
|
||||||
|
} else {
|
||||||
|
Poll::Ready(())
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
self.service.not_ready(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn call(
|
async fn call(
|
||||||
&self,
|
&self,
|
||||||
|
@ -71,6 +90,7 @@ where
|
||||||
ctx: ServiceCtx<'_, Self>,
|
ctx: ServiceCtx<'_, Self>,
|
||||||
) -> Result<Self::Response, Self::Error> {
|
) -> Result<Self::Response, Self::Error> {
|
||||||
self.ready.set(false);
|
self.ready.set(false);
|
||||||
|
self.waker.wake();
|
||||||
|
|
||||||
let result = ctx.call(&self.service, req).await;
|
let result = ctx.call(&self.service, req).await;
|
||||||
self.ready.set(true);
|
self.ready.set(true);
|
||||||
|
@ -107,6 +127,7 @@ mod tests {
|
||||||
|
|
||||||
let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind();
|
let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind();
|
||||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||||
|
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);
|
||||||
|
|
||||||
let srv2 = srv.clone();
|
let srv2 = srv.clone();
|
||||||
ntex::rt::spawn(async move {
|
ntex::rt::spawn(async move {
|
||||||
|
@ -114,10 +135,12 @@ mod tests {
|
||||||
});
|
});
|
||||||
crate::time::sleep(Duration::from_millis(25)).await;
|
crate::time::sleep(Duration::from_millis(25)).await;
|
||||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
|
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
|
||||||
|
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(()));
|
||||||
|
|
||||||
let _ = tx.send(());
|
let _ = tx.send(());
|
||||||
crate::time::sleep(Duration::from_millis(25)).await;
|
crate::time::sleep(Duration::from_millis(25)).await;
|
||||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||||
|
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);
|
||||||
srv.shutdown().await;
|
srv.shutdown().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -143,6 +143,25 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn not_ready(&self) {
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
|
let mut fut1 = ::std::pin::pin!(self.V1.not_ready());
|
||||||
|
$(let mut $T = ::std::pin::pin!(self.$T.not_ready());)+
|
||||||
|
|
||||||
|
::std::future::poll_fn(|cx| {
|
||||||
|
if Pin::new(&mut fut1).poll(cx).is_ready() {
|
||||||
|
return Poll::Ready(())
|
||||||
|
}
|
||||||
|
|
||||||
|
$(if Pin::new(&mut $T).poll(cx).is_ready() {
|
||||||
|
return Poll::Ready(());
|
||||||
|
})+
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn shutdown(&self) {
|
async fn shutdown(&self) {
|
||||||
self.V1.shutdown().await;
|
self.V1.shutdown().await;
|
||||||
$(self.$T.shutdown().await;)+
|
$(self.$T.shutdown().await;)+
|
||||||
|
@ -234,6 +253,7 @@ variant_impl_and!(VariantFactory7, VariantFactory8, V8, V8R, v8, (V2, V3, V4, V5
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use ntex_service::fn_factory;
|
use ntex_service::fn_factory;
|
||||||
|
use std::{future::poll_fn, future::Future, pin};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
@ -282,6 +302,15 @@ mod tests {
|
||||||
.clone();
|
.clone();
|
||||||
let service = factory.pipeline(&()).await.unwrap().clone();
|
let service = factory.pipeline(&()).await.unwrap().clone();
|
||||||
|
|
||||||
|
let mut f = pin::pin!(service.not_ready());
|
||||||
|
let _ = poll_fn(|cx| {
|
||||||
|
if pin::Pin::new(&mut f).poll(cx).is_pending() {
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
assert!(service.ready().await.is_ok());
|
assert!(service.ready().await.is_ok());
|
||||||
service.shutdown().await;
|
service.shutdown().await;
|
||||||
|
|
||||||
|
|
|
@ -112,6 +112,12 @@ impl Sleep {
|
||||||
self.hnd.reset(millis.into().0 as u64);
|
self.hnd.reset(millis.into().0 as u64);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
/// Wait when `Sleep` instance get elapsed.
|
||||||
|
pub async fn wait(&self) {
|
||||||
|
poll_fn(|cx| self.hnd.poll_elapsed(cx)).await
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
|
pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
|
||||||
self.hnd.poll_elapsed(cx)
|
self.hnd.poll_elapsed(cx)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue