diff --git a/README.md b/README.md index fbf20228..73735566 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ [![Version](https://img.shields.io/badge/rustc-1.75+-lightgray.svg)](https://blog.rust-lang.org/2023/12/28/Rust-1.75.0.html) ![License](https://img.shields.io/crates/l/ntex.svg) [![codecov](https://codecov.io/gh/ntex-rs/ntex/branch/master/graph/badge.svg)](https://codecov.io/gh/ntex-rs/ntex) -[![Chat on Discord](https://img.shields.io/discord/919288597826387979?label=chat&logo=discord)](https://discord.com/channels/919288597826387979/919288597826387982) +[![Chat on Discord](https://img.shields.io/discord/919288597826387979?label=chat&logo=discord)](https://discord.gg/4GtaeP5Uqu)

diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index 4ea3e706..c7168e61 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -31,7 +31,7 @@ where } #[inline] - async fn not_ready(&self) -> Result<(), Self::Error> { + async fn not_ready(&self) { util::select(self.svc1.not_ready(), self.svc2.not_ready()).await } @@ -88,6 +88,7 @@ where #[cfg(test)] mod tests { + use ntex_util::time; use std::{cell::Cell, rc::Rc}; use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx}; @@ -104,6 +105,11 @@ mod tests { Ok(()) } + async fn not_ready(&self) { + self.0.set(self.0.get() + 1); + std::future::pending().await + } + async fn call( &self, req: &'static str, @@ -129,6 +135,11 @@ mod tests { Ok(()) } + async fn not_ready(&self) { + self.0.set(self.0.get() + 1); + std::future::pending().await + } + async fn call( &self, req: &'static str, @@ -155,6 +166,14 @@ mod tests { let res = srv.ready().await; assert_eq!(res, Ok(())); assert_eq!(cnt.get(), 2); + + let srv2 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv2.not_ready().await; + }); + time::sleep(time::Millis(25)).await; + assert_eq!(cnt.get(), 4); + srv.shutdown().await; assert_eq!(cnt_sht.get(), 2); } diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 985cd968..43640e0c 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -104,16 +104,6 @@ where self.service.ready().await.map_err(From::from) } - #[inline] - async fn not_ready(&self) -> Result<(), Err> { - self.service.get_ref().not_ready().await.map_err(From::from) - } - - #[inline] - async fn shutdown(&self) { - self.service.shutdown().await - } - #[inline] async fn call( &self, @@ -122,6 +112,9 @@ where ) -> Result { (self.f)(req, self.service.clone()).await } + + crate::forward_notready!(service); + crate::forward_shutdown!(service); } /// `apply()` service factory @@ -228,6 +221,10 @@ mod tests { Ok(()) } + async fn not_ready(&self) { + self.0.set(self.0.get() + 1); + } + async fn shutdown(&self) { self.0.set(self.0.get() + 1); } @@ -256,9 +253,12 @@ mod tests { assert_eq!(srv.ready().await, Ok::<_, Err>(())); - srv.shutdown().await; + srv.not_ready().await; assert_eq!(cnt_sht.get(), 1); + srv.shutdown().await; + assert_eq!(cnt_sht.get(), 2); + let res = srv.call("srv").await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv", ())); diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index d940d454..7e63fbea 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -54,7 +54,7 @@ trait ServiceObj { waiters: &'a WaitersRef, ) -> BoxFuture<'a, (), Self::Error>; - fn not_ready(&self) -> BoxFuture<'_, (), Self::Error>; + fn not_ready<'a>(&'a self) -> Pin + 'a>>; fn call<'a>( &'a self, @@ -84,7 +84,7 @@ where } #[inline] - fn not_ready(&self) -> BoxFuture<'_, (), Self::Error> { + fn not_ready<'a>(&'a self) -> Pin + 'a>> { Box::pin(crate::Service::not_ready(self)) } @@ -159,7 +159,7 @@ where } #[inline] - async fn not_ready(&self) -> Result<(), Self::Error> { + async fn not_ready(&self) { self.0.not_ready().await } diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index fe207766..2715c40a 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -120,7 +120,7 @@ pub trait Service { #[inline] /// Returns when the service is not able to process requests. - async fn not_ready(&self) -> Result<(), Self::Error> { + async fn not_ready(&self) { std::future::pending().await } @@ -253,7 +253,7 @@ where } #[inline] - async fn not_ready(&self) -> Result<(), S::Error> { + async fn not_ready(&self) { (**self).not_ready().await } @@ -285,7 +285,7 @@ where } #[inline] - async fn not_ready(&self) -> Result<(), S::Error> { + async fn not_ready(&self) { (**self).not_ready().await } diff --git a/ntex-service/src/macros.rs b/ntex-service/src/macros.rs index 6d57c25b..d951775d 100644 --- a/ntex-service/src/macros.rs +++ b/ntex-service/src/macros.rs @@ -13,11 +13,8 @@ macro_rules! forward_ready { } #[inline] - async fn not_ready(&self) -> Result<(), Self::Error> { - self.$field - .not_ready() - .await - .map_err(::core::convert::Into::into) + async fn not_ready(&self) { + self.$field.not_ready().await } }; ($field:ident, $err:expr) => { @@ -30,8 +27,19 @@ macro_rules! forward_ready { } #[inline] - async fn not_ready(&self) -> Result<(), Self::Error> { - self.$field.not_ready().await.map_err($err) + async fn not_ready(&self) { + self.$field.not_ready().await + } + }; +} + +/// An implementation of [`not_ready`] that forwards not_ready call to a field. +#[macro_export] +macro_rules! forward_notready { + ($field:ident) => { + #[inline] + async fn not_ready(&self) { + self.$field.not_ready().await } }; } diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index f2073de5..544b0f7e 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -67,11 +67,6 @@ where ctx.ready(&self.service).await.map_err(&self.f) } - #[inline] - async fn not_ready(&self) -> Result<(), Self::Error> { - self.service.not_ready().await.map_err(&self.f) - } - #[inline] async fn call( &self, @@ -82,6 +77,7 @@ where } crate::forward_shutdown!(service); + crate::forward_notready!(service); } /// Factory for the `map_err` combinator, changing the type of a new diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index cf9b15cd..5bed092f 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -40,7 +40,7 @@ impl Pipeline { } #[inline] - /// Returns when the service is able to process requests. + /// Returns when the pipeline is able to process requests. pub async fn ready(&self) -> Result<(), S::Error> where S: Service, @@ -50,6 +50,15 @@ impl Pipeline { .await } + #[inline] + /// Returns when the pipeline is not able to process requests. + pub async fn not_ready(&self) + where + S: Service, + { + self.state.svc.not_ready().await + } + #[inline] /// Wait for service readiness and then create future object /// that resolves to service result. @@ -160,7 +169,7 @@ where { pl: Pipeline, st: cell::UnsafeCell>, - not_ready: cell::UnsafeCell>, + not_ready: cell::UnsafeCell, } enum State { @@ -169,9 +178,9 @@ enum State { Shutdown(Pin + 'static>>), } -enum StateNotReady { +enum StateNotReady { New, - Readiness(Pin> + 'static>>), + Readiness(Pin>>), } impl PipelineBinding @@ -221,7 +230,7 @@ where #[inline] /// Returns when the pipeline is not able to process requests. - pub fn poll_not_ready(&self, cx: &mut Context<'_>) -> Poll> { + pub fn poll_not_ready(&self, cx: &mut Context<'_>) -> Poll<()> { let st = unsafe { &mut *self.not_ready.get() }; match st { @@ -375,7 +384,7 @@ where .ready(ServiceCtx::<'_, S>::new(pl.index, pl.state.waiters_ref())) } -fn not_ready(pl: &'static Pipeline) -> impl Future> +fn not_ready(pl: &'static Pipeline) -> impl Future where S: Service, R: 'static, diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 5b039a8e..2b733698 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -31,7 +31,7 @@ where } #[inline] - async fn not_ready(&self) -> Result<(), Self::Error> { + async fn not_ready(&self) { util::select(self.svc1.not_ready(), self.svc2.not_ready()).await } @@ -91,6 +91,7 @@ where #[cfg(test)] mod tests { + use ntex_util::time; use std::{cell::Cell, rc::Rc}; use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx}; @@ -107,6 +108,11 @@ mod tests { Ok(()) } + async fn not_ready(&self) { + self.0.set(self.0.get() + 1); + std::future::pending().await + } + async fn call( &self, req: Result<&'static str, &'static str>, @@ -135,6 +141,11 @@ mod tests { Ok(()) } + async fn not_ready(&self) { + self.0.set(self.0.get() + 1); + std::future::pending().await + } + async fn call( &self, req: Result<&'static str, ()>, @@ -161,6 +172,14 @@ mod tests { let res = srv.ready().await; assert_eq!(res, Ok(())); assert_eq!(cnt.get(), 2); + + let srv2 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv2.not_ready().await; + }); + time::sleep(time::Millis(25)).await; + assert_eq!(cnt.get(), 4); + srv.shutdown().await; assert_eq!(cnt_sht.get(), 2); } diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index f5fc2d4b..6cbd96a6 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.0] - 2024-11-02 + +* Use updated Service trait + ## [2.4.0] - 2024-09-26 * Remove "must_use" from `condition::Waiter` diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index dcbfbe51..bc496e0b 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "2.4.0" +version = "2.5.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,7 +16,7 @@ name = "ntex_util" path = "src/lib.rs" [dependencies] -ntex-service = "3" +ntex-service = "3.3" ntex-rt = "0.4" bitflags = "2" fxhash = "0.2" diff --git a/ntex-util/src/services/counter.rs b/ntex-util/src/services/counter.rs index 2cbbfe54..a8a1e8da 100644 --- a/ntex-util/src/services/counter.rs +++ b/ntex-util/src/services/counter.rs @@ -26,13 +26,17 @@ impl Counter { } /// Get counter guard. - pub fn get(&self) -> CounterGuard { + pub(crate) fn get(&self) -> CounterGuard { CounterGuard::new(self.0.clone()) } + pub(crate) fn is_available(&self) -> bool { + self.0.count.get() < self.0.capacity + } + /// Check if counter is not at capacity. If counter at capacity /// it registers notification for current task. - pub async fn available(&self) { + pub(crate) async fn available(&self) { poll_fn(|cx| { if self.poll_available(cx) { task::Poll::Ready(()) @@ -43,15 +47,21 @@ impl Counter { .await } - /// Check if counter is not at capacity. If counter at capacity - /// it registers notification for current task. - pub fn poll_available(&self, cx: &mut task::Context<'_>) -> bool { - self.0.available(cx) + pub(crate) async fn unavailable(&self) { + poll_fn(|cx| { + if self.poll_available(cx) { + task::Poll::Pending + } else { + task::Poll::Ready(()) + } + }) + .await } - /// Get total number of acquired counts - pub fn total(&self) -> usize { - self.0.count.get() + /// Check if counter is not at capacity. If counter at capacity + /// it registers notification for current task. + fn poll_available(&self, cx: &mut task::Context<'_>) -> bool { + self.0.available(cx) } } @@ -75,7 +85,11 @@ impl Drop for CounterGuard { impl CounterInner { fn inc(&self) { - self.count.set(self.count.get() + 1); + let num = self.count.get() + 1; + self.count.set(num); + if num == self.capacity { + self.task.wake(); + } } fn dec(&self) { @@ -87,10 +101,10 @@ impl CounterInner { } fn available(&self, cx: &mut task::Context<'_>) -> bool { + self.task.register(cx.waker()); if self.count.get() < self.capacity { true } else { - self.task.register(cx.waker()); false } } diff --git a/ntex-util/src/services/inflight.rs b/ntex-util/src/services/inflight.rs index bf48df00..692a1be7 100644 --- a/ntex-util/src/services/inflight.rs +++ b/ntex-util/src/services/inflight.rs @@ -62,8 +62,20 @@ where #[inline] async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { - self.count.available().await; - ctx.ready(&self.service).await + if !self.count.is_available() { + let (_, res) = + crate::future::join(self.count.available(), ctx.ready(&self.service)).await; + res + } else { + ctx.ready(&self.service).await + } + } + + #[inline] + async fn not_ready(&self) { + if self.count.is_available() { + crate::future::select(self.count.unavailable(), self.service.not_ready()).await; + } } #[inline] @@ -106,6 +118,7 @@ mod tests { let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); + assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); let srv2 = srv.clone(); ntex::rt::spawn(async move { @@ -113,10 +126,12 @@ mod tests { }); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); + assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(())); let _ = tx.send(()); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); + assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); srv.shutdown().await; } diff --git a/ntex-util/src/services/keepalive.rs b/ntex-util/src/services/keepalive.rs index 5932e4e9..c1304b21 100644 --- a/ntex-util/src/services/keepalive.rs +++ b/ntex-util/src/services/keepalive.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, convert::Infallible, fmt, future::poll_fn, marker, task, time}; +use std::{cell::Cell, convert::Infallible, fmt, marker, time}; use ntex_service::{Service, ServiceCtx, ServiceFactory}; @@ -111,23 +111,28 @@ where type Error = E; async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { - poll_fn(|cx| match self.sleep.poll_elapsed(cx) { - task::Poll::Ready(_) => { - let now = now(); - let expire = self.expire.get() + time::Duration::from(self.dur); - if expire <= now { - task::Poll::Ready(Err((self.f)())) - } else { - let expire = expire - now; - self.sleep - .reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX))); - let _ = self.sleep.poll_elapsed(cx); - task::Poll::Ready(Ok(())) - } + let expire = self.expire.get() + time::Duration::from(self.dur); + if expire <= now() { + Err((self.f)()) + } else { + Ok(()) + } + } + + async fn not_ready(&self) { + loop { + self.sleep.wait().await; + + let now = now(); + let expire = self.expire.get() + time::Duration::from(self.dur); + if expire <= now { + return; + } else { + let expire = expire - now; + self.sleep + .reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX))); } - task::Poll::Pending => task::Poll::Ready(Ok(())), - }) - .await + } } async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result { @@ -157,11 +162,13 @@ mod tests { assert_eq!(service.call(1usize).await, Ok(1usize)); assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready()); + assert!(!lazy(|cx| service.poll_not_ready(cx)).await.is_ready()); sleep(Millis(500)).await; assert_eq!( lazy(|cx| service.poll_ready(cx)).await, Poll::Ready(Err(TestErr)) ); + assert!(lazy(|cx| service.poll_not_ready(cx)).await.is_ready()); } } diff --git a/ntex-util/src/services/onerequest.rs b/ntex-util/src/services/onerequest.rs index 7c7b5185..5e9e43c4 100644 --- a/ntex-util/src/services/onerequest.rs +++ b/ntex-util/src/services/onerequest.rs @@ -51,19 +51,38 @@ where #[inline] async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { - poll_fn(|cx| { - self.waker.register(cx.waker()); - if self.ready.get() { - Poll::Ready(()) - } else { - Poll::Pending - } - }) - .await; - + if !self.ready.get() { + poll_fn(|cx| { + self.waker.register(cx.waker()); + if self.ready.get() { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await + } ctx.ready(&self.service).await } + #[inline] + async fn not_ready(&self) { + if self.ready.get() { + crate::future::select( + poll_fn(|cx| { + self.waker.register(cx.waker()); + if self.ready.get() { + Poll::Pending + } else { + Poll::Ready(()) + } + }), + self.service.not_ready(), + ) + .await; + } + } + #[inline] async fn call( &self, @@ -71,6 +90,7 @@ where ctx: ServiceCtx<'_, Self>, ) -> Result { self.ready.set(false); + self.waker.wake(); let result = ctx.call(&self.service, req).await; self.ready.set(true); @@ -107,6 +127,7 @@ mod tests { let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); + assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); let srv2 = srv.clone(); ntex::rt::spawn(async move { @@ -114,10 +135,12 @@ mod tests { }); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); + assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(())); let _ = tx.send(()); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); + assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); srv.shutdown().await; } diff --git a/ntex-util/src/services/variant.rs b/ntex-util/src/services/variant.rs index bc8c26a5..1605f364 100644 --- a/ntex-util/src/services/variant.rs +++ b/ntex-util/src/services/variant.rs @@ -143,6 +143,25 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, }).await } + async fn not_ready(&self) { + use std::{future::Future, pin::Pin}; + + let mut fut1 = ::std::pin::pin!(self.V1.not_ready()); + $(let mut $T = ::std::pin::pin!(self.$T.not_ready());)+ + + ::std::future::poll_fn(|cx| { + if Pin::new(&mut fut1).poll(cx).is_ready() { + return Poll::Ready(()) + } + + $(if Pin::new(&mut $T).poll(cx).is_ready() { + return Poll::Ready(()); + })+ + + Poll::Pending + }).await + } + async fn shutdown(&self) { self.V1.shutdown().await; $(self.$T.shutdown().await;)+ @@ -175,7 +194,7 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, impl fmt::Debug for $fac_type { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct(stringify!(fac_type)) + f.debug_struct("Variant") .field("V1", &self.V1) $(.field(stringify!($T), &self.$T))+ .finish() @@ -234,6 +253,7 @@ variant_impl_and!(VariantFactory7, VariantFactory8, V8, V8R, v8, (V2, V3, V4, V5 #[cfg(test)] mod tests { use ntex_service::fn_factory; + use std::{future::poll_fn, future::Future, pin}; use super::*; @@ -275,13 +295,28 @@ mod tests { #[ntex_macros::rt_test2] async fn test_variant() { - let factory = variant(fn_factory(|| async { Ok::<_, ()>(Srv1) })) + let factory = variant(fn_factory(|| async { Ok::<_, ()>(Srv1) })); + assert!(format!("{:?}", factory).contains("Variant")); + + let factory = factory .v2(fn_factory(|| async { Ok::<_, ()>(Srv2) })) .clone() .v3(fn_factory(|| async { Ok::<_, ()>(Srv2) })) .clone(); + assert!(format!("{:?}", factory).contains("Variant")); + let service = factory.pipeline(&()).await.unwrap().clone(); + let mut f = pin::pin!(service.not_ready()); + let _ = poll_fn(|cx| { + if pin::Pin::new(&mut f).poll(cx).is_pending() { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + assert!(service.ready().await.is_ok()); service.shutdown().await; diff --git a/ntex-util/src/time/mod.rs b/ntex-util/src/time/mod.rs index a33ab6f1..3d181b6c 100644 --- a/ntex-util/src/time/mod.rs +++ b/ntex-util/src/time/mod.rs @@ -112,6 +112,12 @@ impl Sleep { self.hnd.reset(millis.into().0 as u64); } + #[inline] + /// Wait when `Sleep` instance get elapsed. + pub async fn wait(&self) { + poll_fn(|cx| self.hnd.poll_elapsed(cx)).await + } + #[inline] pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> { self.hnd.poll_elapsed(cx) @@ -160,6 +166,12 @@ impl Deadline { } } + #[inline] + /// Wait when `Sleep` instance get elapsed. + pub async fn wait(&self) { + poll_fn(|cx| self.poll_elapsed(cx)).await + } + /// Resets the `Deadline` instance to a new deadline. /// /// Calling this function allows changing the instant at which the `Deadline`