mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 21:07:39 +03:00
Use updated Service trait (#450)
This commit is contained in:
parent
011e9cdfea
commit
8288fc0364
17 changed files with 243 additions and 82 deletions
|
@ -1,5 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## [2.5.0] - 2024-11-02
|
||||
|
||||
* Use updated Service trait
|
||||
|
||||
## [2.4.0] - 2024-09-26
|
||||
|
||||
* Remove "must_use" from `condition::Waiter`
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-util"
|
||||
version = "2.4.0"
|
||||
version = "2.5.0"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "Utilities for ntex framework"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@ -16,7 +16,7 @@ name = "ntex_util"
|
|||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
ntex-service = "3"
|
||||
ntex-service = "3.3"
|
||||
ntex-rt = "0.4"
|
||||
bitflags = "2"
|
||||
fxhash = "0.2"
|
||||
|
|
|
@ -26,13 +26,17 @@ impl Counter {
|
|||
}
|
||||
|
||||
/// Get counter guard.
|
||||
pub fn get(&self) -> CounterGuard {
|
||||
pub(crate) fn get(&self) -> CounterGuard {
|
||||
CounterGuard::new(self.0.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn is_available(&self) -> bool {
|
||||
self.0.count.get() < self.0.capacity
|
||||
}
|
||||
|
||||
/// Check if counter is not at capacity. If counter at capacity
|
||||
/// it registers notification for current task.
|
||||
pub async fn available(&self) {
|
||||
pub(crate) async fn available(&self) {
|
||||
poll_fn(|cx| {
|
||||
if self.poll_available(cx) {
|
||||
task::Poll::Ready(())
|
||||
|
@ -43,15 +47,21 @@ impl Counter {
|
|||
.await
|
||||
}
|
||||
|
||||
/// Check if counter is not at capacity. If counter at capacity
|
||||
/// it registers notification for current task.
|
||||
pub fn poll_available(&self, cx: &mut task::Context<'_>) -> bool {
|
||||
self.0.available(cx)
|
||||
pub(crate) async fn unavailable(&self) {
|
||||
poll_fn(|cx| {
|
||||
if self.poll_available(cx) {
|
||||
task::Poll::Pending
|
||||
} else {
|
||||
task::Poll::Ready(())
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get total number of acquired counts
|
||||
pub fn total(&self) -> usize {
|
||||
self.0.count.get()
|
||||
/// Check if counter is not at capacity. If counter at capacity
|
||||
/// it registers notification for current task.
|
||||
fn poll_available(&self, cx: &mut task::Context<'_>) -> bool {
|
||||
self.0.available(cx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -75,7 +85,11 @@ impl Drop for CounterGuard {
|
|||
|
||||
impl CounterInner {
|
||||
fn inc(&self) {
|
||||
self.count.set(self.count.get() + 1);
|
||||
let num = self.count.get() + 1;
|
||||
self.count.set(num);
|
||||
if num == self.capacity {
|
||||
self.task.wake();
|
||||
}
|
||||
}
|
||||
|
||||
fn dec(&self) {
|
||||
|
@ -87,10 +101,10 @@ impl CounterInner {
|
|||
}
|
||||
|
||||
fn available(&self, cx: &mut task::Context<'_>) -> bool {
|
||||
self.task.register(cx.waker());
|
||||
if self.count.get() < self.capacity {
|
||||
true
|
||||
} else {
|
||||
self.task.register(cx.waker());
|
||||
false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,8 +62,20 @@ where
|
|||
|
||||
#[inline]
|
||||
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
self.count.available().await;
|
||||
ctx.ready(&self.service).await
|
||||
if !self.count.is_available() {
|
||||
let (_, res) =
|
||||
crate::future::join(self.count.available(), ctx.ready(&self.service)).await;
|
||||
res
|
||||
} else {
|
||||
ctx.ready(&self.service).await
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) {
|
||||
if self.count.is_available() {
|
||||
crate::future::select(self.count.unavailable(), self.service.not_ready()).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -106,6 +118,7 @@ mod tests {
|
|||
|
||||
let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);
|
||||
|
||||
let srv2 = srv.clone();
|
||||
ntex::rt::spawn(async move {
|
||||
|
@ -113,10 +126,12 @@ mod tests {
|
|||
});
|
||||
crate::time::sleep(Duration::from_millis(25)).await;
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
|
||||
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(()));
|
||||
|
||||
let _ = tx.send(());
|
||||
crate::time::sleep(Duration::from_millis(25)).await;
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);
|
||||
srv.shutdown().await;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{cell::Cell, convert::Infallible, fmt, future::poll_fn, marker, task, time};
|
||||
use std::{cell::Cell, convert::Infallible, fmt, marker, time};
|
||||
|
||||
use ntex_service::{Service, ServiceCtx, ServiceFactory};
|
||||
|
||||
|
@ -111,23 +111,28 @@ where
|
|||
type Error = E;
|
||||
|
||||
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
poll_fn(|cx| match self.sleep.poll_elapsed(cx) {
|
||||
task::Poll::Ready(_) => {
|
||||
let now = now();
|
||||
let expire = self.expire.get() + time::Duration::from(self.dur);
|
||||
if expire <= now {
|
||||
task::Poll::Ready(Err((self.f)()))
|
||||
} else {
|
||||
let expire = expire - now;
|
||||
self.sleep
|
||||
.reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX)));
|
||||
let _ = self.sleep.poll_elapsed(cx);
|
||||
task::Poll::Ready(Ok(()))
|
||||
}
|
||||
let expire = self.expire.get() + time::Duration::from(self.dur);
|
||||
if expire <= now() {
|
||||
Err((self.f)())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn not_ready(&self) {
|
||||
loop {
|
||||
self.sleep.wait().await;
|
||||
|
||||
let now = now();
|
||||
let expire = self.expire.get() + time::Duration::from(self.dur);
|
||||
if expire <= now {
|
||||
return;
|
||||
} else {
|
||||
let expire = expire - now;
|
||||
self.sleep
|
||||
.reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX)));
|
||||
}
|
||||
task::Poll::Pending => task::Poll::Ready(Ok(())),
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result<R, E> {
|
||||
|
@ -157,11 +162,13 @@ mod tests {
|
|||
|
||||
assert_eq!(service.call(1usize).await, Ok(1usize));
|
||||
assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready());
|
||||
assert!(!lazy(|cx| service.poll_not_ready(cx)).await.is_ready());
|
||||
|
||||
sleep(Millis(500)).await;
|
||||
assert_eq!(
|
||||
lazy(|cx| service.poll_ready(cx)).await,
|
||||
Poll::Ready(Err(TestErr))
|
||||
);
|
||||
assert!(lazy(|cx| service.poll_not_ready(cx)).await.is_ready());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,19 +51,38 @@ where
|
|||
|
||||
#[inline]
|
||||
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
poll_fn(|cx| {
|
||||
self.waker.register(cx.waker());
|
||||
if self.ready.get() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
if !self.ready.get() {
|
||||
poll_fn(|cx| {
|
||||
self.waker.register(cx.waker());
|
||||
if self.ready.get() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
ctx.ready(&self.service).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn not_ready(&self) {
|
||||
if self.ready.get() {
|
||||
crate::future::select(
|
||||
poll_fn(|cx| {
|
||||
self.waker.register(cx.waker());
|
||||
if self.ready.get() {
|
||||
Poll::Pending
|
||||
} else {
|
||||
Poll::Ready(())
|
||||
}
|
||||
}),
|
||||
self.service.not_ready(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn call(
|
||||
&self,
|
||||
|
@ -71,6 +90,7 @@ where
|
|||
ctx: ServiceCtx<'_, Self>,
|
||||
) -> Result<Self::Response, Self::Error> {
|
||||
self.ready.set(false);
|
||||
self.waker.wake();
|
||||
|
||||
let result = ctx.call(&self.service, req).await;
|
||||
self.ready.set(true);
|
||||
|
@ -107,6 +127,7 @@ mod tests {
|
|||
|
||||
let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind();
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);
|
||||
|
||||
let srv2 = srv.clone();
|
||||
ntex::rt::spawn(async move {
|
||||
|
@ -114,10 +135,12 @@ mod tests {
|
|||
});
|
||||
crate::time::sleep(Duration::from_millis(25)).await;
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
|
||||
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(()));
|
||||
|
||||
let _ = tx.send(());
|
||||
crate::time::sleep(Duration::from_millis(25)).await;
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);
|
||||
srv.shutdown().await;
|
||||
}
|
||||
|
||||
|
|
|
@ -143,6 +143,25 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
|
|||
}).await
|
||||
}
|
||||
|
||||
async fn not_ready(&self) {
|
||||
use std::{future::Future, pin::Pin};
|
||||
|
||||
let mut fut1 = ::std::pin::pin!(self.V1.not_ready());
|
||||
$(let mut $T = ::std::pin::pin!(self.$T.not_ready());)+
|
||||
|
||||
::std::future::poll_fn(|cx| {
|
||||
if Pin::new(&mut fut1).poll(cx).is_ready() {
|
||||
return Poll::Ready(())
|
||||
}
|
||||
|
||||
$(if Pin::new(&mut $T).poll(cx).is_ready() {
|
||||
return Poll::Ready(());
|
||||
})+
|
||||
|
||||
Poll::Pending
|
||||
}).await
|
||||
}
|
||||
|
||||
async fn shutdown(&self) {
|
||||
self.V1.shutdown().await;
|
||||
$(self.$T.shutdown().await;)+
|
||||
|
@ -175,7 +194,7 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
|
|||
|
||||
impl<V1: fmt::Debug, V1C, $($T: fmt::Debug,)+ V1R, $($R,)+> fmt::Debug for $fac_type<V1, V1C, $($T,)+ V1R, $($R,)+> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct(stringify!(fac_type))
|
||||
f.debug_struct("Variant")
|
||||
.field("V1", &self.V1)
|
||||
$(.field(stringify!($T), &self.$T))+
|
||||
.finish()
|
||||
|
@ -234,6 +253,7 @@ variant_impl_and!(VariantFactory7, VariantFactory8, V8, V8R, v8, (V2, V3, V4, V5
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use ntex_service::fn_factory;
|
||||
use std::{future::poll_fn, future::Future, pin};
|
||||
|
||||
use super::*;
|
||||
|
||||
|
@ -275,13 +295,28 @@ mod tests {
|
|||
|
||||
#[ntex_macros::rt_test2]
|
||||
async fn test_variant() {
|
||||
let factory = variant(fn_factory(|| async { Ok::<_, ()>(Srv1) }))
|
||||
let factory = variant(fn_factory(|| async { Ok::<_, ()>(Srv1) }));
|
||||
assert!(format!("{:?}", factory).contains("Variant"));
|
||||
|
||||
let factory = factory
|
||||
.v2(fn_factory(|| async { Ok::<_, ()>(Srv2) }))
|
||||
.clone()
|
||||
.v3(fn_factory(|| async { Ok::<_, ()>(Srv2) }))
|
||||
.clone();
|
||||
assert!(format!("{:?}", factory).contains("Variant"));
|
||||
|
||||
let service = factory.pipeline(&()).await.unwrap().clone();
|
||||
|
||||
let mut f = pin::pin!(service.not_ready());
|
||||
let _ = poll_fn(|cx| {
|
||||
if pin::Pin::new(&mut f).poll(cx).is_pending() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(service.ready().await.is_ok());
|
||||
service.shutdown().await;
|
||||
|
||||
|
|
|
@ -112,6 +112,12 @@ impl Sleep {
|
|||
self.hnd.reset(millis.into().0 as u64);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Wait when `Sleep` instance get elapsed.
|
||||
pub async fn wait(&self) {
|
||||
poll_fn(|cx| self.hnd.poll_elapsed(cx)).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> {
|
||||
self.hnd.poll_elapsed(cx)
|
||||
|
@ -160,6 +166,12 @@ impl Deadline {
|
|||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Wait when `Sleep` instance get elapsed.
|
||||
pub async fn wait(&self) {
|
||||
poll_fn(|cx| self.poll_elapsed(cx)).await
|
||||
}
|
||||
|
||||
/// Resets the `Deadline` instance to a new deadline.
|
||||
///
|
||||
/// Calling this function allows changing the instant at which the `Deadline`
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue