diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 9ab1f0ac..c391cb0a 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.9.0] - 2024-12-04 + +* Use updated Service trait + ## [2.8.3] - 2024-11-10 * Check service readiness once per decoded item diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 965a9029..9e933195 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.8.3" +version = "2.9.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,8 +18,8 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6" ntex-bytes = "0.1" -ntex-util = "2.5" -ntex-service = "3.3.3" +ntex-util = "2.8" +ntex-service = "3.4" ntex-rt = "0.4" bitflags = "2" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index e2c1de36..90a798aa 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -1,7 +1,7 @@ //! Framed transport dispatcher #![allow(clippy::let_underscore_future)] use std::task::{ready, Context, Poll}; -use std::{cell::Cell, future::poll_fn, future::Future, pin::Pin, rc::Rc}; +use std::{cell::Cell, future::Future, pin::Pin, rc::Rc}; use ntex_codec::{Decoder, Encoder}; use ntex_service::{IntoService, Pipeline, PipelineBinding, PipelineCall, Service}; @@ -131,7 +131,6 @@ bitflags::bitflags! { const KA_ENABLED = 0b0000100; const KA_TIMEOUT = 0b0001000; const READ_TIMEOUT = 0b0010000; - const READY_TASK = 0b1000000; } } @@ -284,12 +283,6 @@ where } } - // ready task - if slf.flags.contains(Flags::READY_TASK) { - slf.flags.insert(Flags::READY_TASK); - ntex_rt::spawn(not_ready(slf.shared.clone())); - } - loop { match slf.st { DispatcherState::Processing => { @@ -628,30 +621,6 @@ where } } -async fn not_ready(slf: Rc>) -where - S: Service, Response = Option>> + 'static, - U: Encoder + Decoder + 'static, -{ - let pl = slf.service.clone(); - loop { - if !pl.is_shutdown() { - if let Err(err) = poll_fn(|cx| pl.poll_ready(cx)).await { - log::trace!("{}: Service readiness check failed, stopping", slf.io.tag()); - slf.error.set(Some(DispatcherError::Service(err))); - break; - } - if !pl.is_shutdown() { - poll_fn(|cx| pl.poll_not_ready(cx)).await; - slf.ready.set(false); - slf.io.wake(); - continue; - } - } - break; - } -} - #[cfg(test)] mod tests { use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex}; @@ -902,8 +871,6 @@ mod tests { Err("test") } - async fn not_ready(&self) {} - async fn call( &self, _: DispatchItem, diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index c01352e4..457cf588 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.6.0] - 2024-12-04 + +* Use updated Service trait + ## [2.5.0] - 2024-11-04 * Use updated Service trait diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index 4ada3557..65aec6c7 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.5.0" +version = "2.6.0" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -18,9 +18,9 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1" ntex-net = "2" -ntex-service = "3.3" +ntex-service = "3.4" ntex-rt = "0.4" -ntex-util = "2.5" +ntex-util = "2.8" async-channel = "2" async-broadcast = "0.7" diff --git a/ntex-server/src/net/service.rs b/ntex-server/src/net/service.rs index 4be6c828..70e9c5e3 100644 --- a/ntex-server/src/net/service.rs +++ b/ntex-server/src/net/service.rs @@ -1,4 +1,4 @@ -use std::{fmt, future::poll_fn, future::Future, pin::Pin, task::Poll}; +use std::{fmt, task::Context}; use ntex_bytes::{Pool, PoolRef}; use ntex_net::Io; @@ -170,27 +170,11 @@ impl Service for StreamServiceImpl { } #[inline] - async fn not_ready(&self) { - if self.conns.is_available() { - let mut futs: Vec<_> = self - .services - .iter() - .map(|s| Box::pin(s.not_ready())) - .collect(); - - ntex_util::future::select( - self.conns.unavailable(), - poll_fn(move |cx| { - for f in &mut futs { - if Pin::new(f).poll(cx).is_ready() { - return Poll::Ready(()); - } - } - Poll::Pending - }), - ) - .await; + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + for svc in &self.services { + svc.poll(cx)?; } + Ok(()) } async fn shutdown(&self) { diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index 53d3b3cf..b6bc5503 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [3.4.0] - 2024-12-04 + +* Added Service::poll() method + ## [3.3.3] - 2024-11-10 * Add Pipeline::is_shutdown() helper diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index f23fccd9..f338931d 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "3.3.3" +version = "3.4.0" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index 494a8072..2e9e9f2f 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -31,8 +31,9 @@ where } #[inline] - async fn not_ready(&self) { - util::select(self.svc1.not_ready(), self.svc2.not_ready()).await + fn poll(&self, cx: &mut std::task::Context<'_>) -> Result<(), Self::Error> { + self.svc1.poll(cx)?; + self.svc2.poll(cx) } #[inline] @@ -88,8 +89,8 @@ where #[cfg(test)] mod tests { - use ntex_util::time; - use std::{cell::Cell, rc::Rc}; + use ntex_util::future::lazy; + use std::{cell::Cell, rc::Rc, task::Context}; use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx}; @@ -105,9 +106,9 @@ mod tests { Ok(()) } - async fn not_ready(&self) { + fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> { self.0.set(self.0.get() + 1); - std::future::pending().await + Ok(()) } async fn call( @@ -135,9 +136,9 @@ mod tests { Ok(()) } - async fn not_ready(&self) { + fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> { self.0.set(self.0.get() + 1); - std::future::pending().await + Ok(()) } async fn call( @@ -165,11 +166,7 @@ mod tests { assert_eq!(res, Ok(())); assert_eq!(cnt.get(), 2); - let srv2 = srv.clone(); - ntex::rt::spawn(async move { - let _ = srv2.not_ready().await; - }); - time::sleep(time::Millis(25)).await; + lazy(|cx| srv.clone().poll(cx)).await.unwrap(); assert_eq!(cnt.get(), 4); srv.shutdown().await; diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 43640e0c..84fc153a 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -113,7 +113,7 @@ where (self.f)(req, self.service.clone()).await } - crate::forward_notready!(service); + crate::forward_poll!(service); crate::forward_shutdown!(service); } @@ -205,7 +205,8 @@ where #[cfg(test)] mod tests { - use std::{cell::Cell, rc::Rc}; + use ntex_util::future::lazy; + use std::{cell::Cell, rc::Rc, task::Context}; use super::*; use crate::{chain, chain_factory, fn_factory}; @@ -221,8 +222,9 @@ mod tests { Ok(()) } - async fn not_ready(&self) { + fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> { self.0.set(self.0.get() + 1); + Ok(()) } async fn shutdown(&self) { @@ -253,7 +255,7 @@ mod tests { assert_eq!(srv.ready().await, Ok::<_, Err>(())); - srv.not_ready().await; + lazy(|cx| srv.poll(cx)).await.unwrap(); assert_eq!(cnt_sht.get(), 1); srv.shutdown().await; diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index 7e63fbea..6998ce61 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -1,4 +1,4 @@ -use std::{fmt, future::Future, pin::Pin}; +use std::{fmt, future::Future, pin::Pin, task::Context}; use crate::ctx::{ServiceCtx, WaitersRef}; @@ -54,8 +54,6 @@ trait ServiceObj { waiters: &'a WaitersRef, ) -> BoxFuture<'a, (), Self::Error>; - fn not_ready<'a>(&'a self) -> Pin + 'a>>; - fn call<'a>( &'a self, req: Req, @@ -64,6 +62,8 @@ trait ServiceObj { ) -> BoxFuture<'a, Self::Response, Self::Error>; fn shutdown<'a>(&'a self) -> Pin + 'a>>; + + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error>; } impl ServiceObj for S @@ -83,11 +83,6 @@ where Box::pin(async move { ServiceCtx::<'a, S>::new(idx, waiters).ready(self).await }) } - #[inline] - fn not_ready<'a>(&'a self) -> Pin + 'a>> { - Box::pin(crate::Service::not_ready(self)) - } - #[inline] fn shutdown<'a>(&'a self) -> Pin + 'a>> { Box::pin(crate::Service::shutdown(self)) @@ -106,6 +101,11 @@ where .await }) } + + #[inline] + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + crate::Service::poll(self, cx) + } } trait ServiceFactoryObj { @@ -158,11 +158,6 @@ where self.0.ready(idx, waiters).await } - #[inline] - async fn not_ready(&self) { - self.0.not_ready().await - } - #[inline] async fn shutdown(&self) { self.0.shutdown().await @@ -173,6 +168,11 @@ where let (idx, waiters) = ctx.inner(); self.0.call(req, idx, waiters).await } + + #[inline] + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + self.0.poll(cx) + } } impl crate::ServiceFactory diff --git a/ntex-service/src/chain.rs b/ntex-service/src/chain.rs index f75c9731..836d88fa 100644 --- a/ntex-service/src/chain.rs +++ b/ntex-service/src/chain.rs @@ -171,6 +171,7 @@ impl, Req> Service for ServiceChain { type Response = Svc::Response; type Error = Svc::Error; + crate::forward_poll!(service); crate::forward_ready!(service); crate::forward_shutdown!(service); diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index ed4de4ee..dbac0716 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -21,9 +21,6 @@ impl WaitersRef { pub(crate) fn new() -> (u32, Self) { let mut waiters = slab::Slab::new(); - // first insert for wake ups from services - let _ = waiters.insert(None); - ( waiters.insert(Default::default()) as u32, WaitersRef { @@ -68,18 +65,6 @@ impl WaitersRef { self.get()[idx as usize] = Some(cx.waker().clone()); } - pub(crate) fn register_unready(&self, cx: &mut Context<'_>) { - self.get()[0] = Some(cx.waker().clone()); - } - - pub(crate) fn notify_unready(&self) { - if let Some(item) = self.get().get_mut(0) { - if let Some(waker) = item.take() { - waker.wake(); - } - } - } - pub(crate) fn notify(&self) { let wakers = self.get_wakers(); if !wakers.is_empty() { diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 67253013..6e3babd0 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -6,7 +6,7 @@ unreachable_pub, missing_debug_implementations )] -use std::rc::Rc; +use std::{rc::Rc, task::Context}; mod and_then; mod apply; @@ -118,7 +118,8 @@ pub trait Service { Ok(()) } - #[inline] + #[deprecated] + #[doc(hidden)] /// Returns when the service is not able to process requests. /// /// Unlike the "ready()" method, the "not_ready()" method returns @@ -136,6 +137,15 @@ pub trait Service { /// Returns when the service is properly shutdowns. async fn shutdown(&self) {} + #[inline] + /// Polls service from the current task. + /// + /// Service may require to execute asynchronous computation or + /// maintain asynchronous state. + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + Ok(()) + } + #[inline] /// Map this service's output to a different type, returning a new service of the resulting type. /// @@ -259,8 +269,8 @@ where } #[inline] - async fn not_ready(&self) { - (**self).not_ready().await + fn poll(&self, cx: &mut Context<'_>) -> Result<(), S::Error> { + (**self).poll(cx) } #[inline] @@ -290,11 +300,6 @@ where ctx.ready(&**self).await } - #[inline] - async fn not_ready(&self) { - (**self).not_ready().await - } - #[inline] async fn shutdown(&self) { (**self).shutdown().await @@ -308,6 +313,11 @@ where ) -> Result { ctx.call_nowait(&**self, request).await } + + #[inline] + fn poll(&self, cx: &mut Context<'_>) -> Result<(), S::Error> { + (**self).poll(cx) + } } impl ServiceFactory for Rc diff --git a/ntex-service/src/macros.rs b/ntex-service/src/macros.rs index d951775d..846efa8d 100644 --- a/ntex-service/src/macros.rs +++ b/ntex-service/src/macros.rs @@ -11,11 +11,6 @@ macro_rules! forward_ready { .await .map_err(::core::convert::Into::into) } - - #[inline] - async fn not_ready(&self) { - self.$field.not_ready().await - } }; ($field:ident, $err:expr) => { #[inline] @@ -25,21 +20,28 @@ macro_rules! forward_ready { ) -> Result<(), Self::Error> { ctx.ready(&self.$field).await.map_err($err) } - - #[inline] - async fn not_ready(&self) { - self.$field.not_ready().await - } }; } /// An implementation of [`not_ready`] that forwards not_ready call to a field. #[macro_export] macro_rules! forward_notready { + ($field:ident) => {}; +} + +/// An implementation of [`poll`] that forwards poll call to a field. +#[macro_export] +macro_rules! forward_poll { ($field:ident) => { #[inline] - async fn not_ready(&self) { - self.$field.not_ready().await + fn poll(&self, cx: &mut std::task::Context<'_>) -> Result<(), Self::Error> { + self.$field.poll(cx).map_err(From::from) + } + }; + ($field:ident, $err:expr) => { + #[inline] + fn poll(&self, cx: &mut std::task::Context<'_>) -> Result<(), Self::Error> { + self.$field.poll(cx).map_err($err) } }; } diff --git a/ntex-service/src/map.rs b/ntex-service/src/map.rs index 7d4cb094..3f1e37ff 100644 --- a/ntex-service/src/map.rs +++ b/ntex-service/src/map.rs @@ -62,6 +62,7 @@ where type Error = A::Error; crate::forward_ready!(service); + crate::forward_poll!(service); crate::forward_shutdown!(service); #[inline] diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index 544b0f7e..97279c3d 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -1,4 +1,4 @@ -use std::{fmt, marker::PhantomData}; +use std::{fmt, marker::PhantomData, task::Context}; use super::{Service, ServiceCtx, ServiceFactory}; @@ -67,6 +67,11 @@ where ctx.ready(&self.service).await.map_err(&self.f) } + #[inline] + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + self.service.poll(cx).map_err(&self.f) + } + #[inline] async fn call( &self, @@ -77,7 +82,6 @@ where } crate::forward_shutdown!(service); - crate::forward_notready!(service); } /// Factory for the `map_err` combinator, changing the type of a new diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 243ce885..8ff88942 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -1,4 +1,4 @@ -use std::{cell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; +use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll}; use crate::{ctx::WaitersRef, Service, ServiceCtx}; @@ -50,13 +50,14 @@ impl Pipeline { .await } - #[inline] + #[doc(hidden)] + #[deprecated] /// Returns when the pipeline is not able to process requests. pub async fn not_ready(&self) where S: Service, { - self.state.svc.not_ready().await + std::future::pending().await } #[inline] @@ -125,6 +126,14 @@ impl Pipeline { self.state.svc.shutdown().await } + #[inline] + pub fn poll(&self, cx: &mut Context<'_>) -> Result<(), S::Error> + where + S: Service, + { + self.state.svc.poll(cx) + } + #[inline] /// Get current pipeline. pub fn bind(self) -> PipelineBinding @@ -175,7 +184,6 @@ where { pl: Pipeline, st: cell::UnsafeCell>, - not_ready: cell::UnsafeCell, } enum State { @@ -184,11 +192,6 @@ enum State { Shutdown(Pin + 'static>>), } -enum StateNotReady { - New, - Readiness(Pin>>), -} - impl PipelineBinding where S: Service + 'static, @@ -198,7 +201,6 @@ where PipelineBinding { pl, st: cell::UnsafeCell::new(State::New), - not_ready: cell::UnsafeCell::new(StateNotReady::New), } } @@ -214,6 +216,11 @@ where self.pl.clone() } + #[inline] + pub fn poll(&self, cx: &mut Context<'_>) -> Result<(), S::Error> { + self.pl.poll(cx) + } + #[inline] /// Returns `Ready` when the pipeline is able to process requests. /// @@ -230,6 +237,7 @@ where let fut = Box::pin(CheckReadiness { fut: None, f: ready, + _t: marker::PhantomData, pl, }); *st = State::Readiness(fut); @@ -240,27 +248,12 @@ where } } + #[doc(hidden)] + #[deprecated] #[inline] /// Returns when the pipeline is not able to process requests. - pub fn poll_not_ready(&self, cx: &mut Context<'_>) -> Poll<()> { - let st = unsafe { &mut *self.not_ready.get() }; - - match st { - StateNotReady::New => { - // SAFETY: `fut` has same lifetime same as lifetime of `self.pl`. - // Pipeline::svc is heap allocated(Rc), and it is being kept alive until - // `self` is alive - let pl: &'static Pipeline = unsafe { std::mem::transmute(&self.pl) }; - let fut = Box::pin(CheckUnReadiness { - fut: None, - f: not_ready, - pl, - }); - *st = StateNotReady::Readiness(fut); - self.poll_not_ready(cx) - } - StateNotReady::Readiness(ref mut fut) => Pin::new(fut).poll(cx), - } + pub fn poll_not_ready(&self, _: &mut Context<'_>) -> Poll<()> { + Poll::Pending } #[inline] @@ -276,7 +269,6 @@ where let pl: &'static Pipeline = unsafe { std::mem::transmute(&self.pl) }; *st = State::Shutdown(Box::pin(async move { pl.shutdown().await })); pl.state.waiters.shutdown(); - pl.state.waiters.notify_unready(); self.poll_shutdown(cx) } State::Shutdown(ref mut fut) => Pin::new(fut).poll(cx), @@ -345,7 +337,6 @@ where Self { pl: self.pl.clone(), st: cell::UnsafeCell::new(State::New), - not_ready: cell::UnsafeCell::new(StateNotReady::New), } } } @@ -404,23 +395,16 @@ where .ready(ServiceCtx::<'_, S>::new(pl.index, pl.state.waiters_ref())) } -fn not_ready(pl: &'static Pipeline) -> impl Future -where - S: Service, - R: 'static, -{ - pl.state.svc.not_ready() -} - -struct CheckReadiness { +struct CheckReadiness + 'static, R, F, Fut> { f: F, fut: Option, pl: &'static Pipeline, + _t: marker::PhantomData, } -impl Unpin for CheckReadiness {} +impl, R, F, Fut> Unpin for CheckReadiness {} -impl Drop for CheckReadiness { +impl, R, F, Fut> Drop for CheckReadiness { fn drop(&mut self) { // future fot dropped during polling, we must notify other waiters if self.fut.is_some() { @@ -429,16 +413,19 @@ impl Drop for CheckReadiness { } } -impl Future for CheckReadiness +impl Future for CheckReadiness where + S: Service, F: Fn(&'static Pipeline) -> Fut, - Fut: Future, + Fut: Future>, { - type Output = T; + type Output = Result<(), S::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut slf = self.as_mut(); + slf.pl.poll(cx)?; + if slf.pl.state.waiters.can_check(slf.pl.index, cx) { if slf.fut.is_none() { slf.fut = Some((slf.f)(slf.pl)); @@ -460,43 +447,3 @@ where } } } - -struct CheckUnReadiness { - f: F, - fut: Option, - pl: &'static Pipeline, -} - -impl Unpin for CheckUnReadiness {} - -impl Future for CheckUnReadiness -where - F: Fn(&'static Pipeline) -> Fut, - Fut: Future, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - let mut slf = self.as_mut(); - - if slf.fut.is_none() { - slf.fut = Some((slf.f)(slf.pl)); - } - let fut = slf.fut.as_mut().unwrap(); - match unsafe { Pin::new_unchecked(fut) }.poll(cx) { - Poll::Pending => { - if slf.pl.state.waiters.is_shutdown() { - Poll::Ready(()) - } else { - slf.pl.state.waiters.register_unready(cx); - Poll::Pending - } - } - Poll::Ready(()) => { - let _ = slf.fut.take(); - slf.pl.state.waiters.notify(); - Poll::Ready(()) - } - } - } -} diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 2b733698..6de8a14e 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -31,8 +31,9 @@ where } #[inline] - async fn not_ready(&self) { - util::select(self.svc1.not_ready(), self.svc2.not_ready()).await + fn poll(&self, cx: &mut std::task::Context<'_>) -> Result<(), Self::Error> { + self.svc1.poll(cx)?; + self.svc2.poll(cx) } #[inline] @@ -91,8 +92,8 @@ where #[cfg(test)] mod tests { - use ntex_util::time; - use std::{cell::Cell, rc::Rc}; + use ntex_util::future::lazy; + use std::{cell::Cell, rc::Rc, task::Context}; use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx}; @@ -108,9 +109,9 @@ mod tests { Ok(()) } - async fn not_ready(&self) { + fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> { self.0.set(self.0.get() + 1); - std::future::pending().await + Ok(()) } async fn call( @@ -141,9 +142,9 @@ mod tests { Ok(()) } - async fn not_ready(&self) { + fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> { self.0.set(self.0.get() + 1); - std::future::pending().await + Ok(()) } async fn call( @@ -173,11 +174,7 @@ mod tests { assert_eq!(res, Ok(())); assert_eq!(cnt.get(), 2); - let srv2 = srv.clone(); - ntex::rt::spawn(async move { - let _ = srv2.not_ready().await; - }); - time::sleep(time::Millis(25)).await; + lazy(|cx| srv.clone().poll(cx)).await.unwrap(); assert_eq!(cnt.get(), 4); srv.shutdown().await; diff --git a/ntex-service/src/util.rs b/ntex-service/src/util.rs index 4c041a15..5421b96c 100644 --- a/ntex-service/src/util.rs +++ b/ntex-service/src/util.rs @@ -59,24 +59,3 @@ where }) .await } - -/// Waits for either one of two differently-typed futures to complete. -pub(crate) async fn select(fut1: A, fut2: B) -> A::Output -where - A: Future, - B: Future, -{ - let mut fut1 = pin::pin!(fut1); - let mut fut2 = pin::pin!(fut2); - - poll_fn(|cx| { - if let Poll::Ready(item) = Pin::new(&mut fut1).poll(cx) { - return Poll::Ready(item); - } - if let Poll::Ready(item) = Pin::new(&mut fut2).poll(cx) { - return Poll::Ready(item); - } - Poll::Pending - }) - .await -} diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index bf613a24..3cad994d 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.8.0] - 2024-12-04 + +* Use updated Service trait + ## [2.7.0] - 2024-12-03 * Add time::Sleep::elapse() method diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 611c30c8..b43366f3 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "2.7.0" +version = "2.8.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,7 +16,7 @@ name = "ntex_util" path = "src/lib.rs" [dependencies] -ntex-service = "3.3" +ntex-service = "3.4" ntex-rt = "0.4" bitflags = "2" fxhash = "0.2" diff --git a/ntex-util/src/services/buffer.rs b/ntex-util/src/services/buffer.rs index 9bf1c657..5dae648c 100644 --- a/ntex-util/src/services/buffer.rs +++ b/ntex-util/src/services/buffer.rs @@ -70,7 +70,6 @@ where fn create(&self, service: S) -> Self::Service { BufferService { service: Pipeline::new(service).bind(), - service_pending: Cell::new(true), size: self.buf_size, ready: Cell::new(false), buf: RefCell::new(VecDeque::with_capacity(self.buf_size)), @@ -113,7 +112,6 @@ impl std::error::Error for BufferService pub struct BufferService> { size: usize, ready: Cell, - service_pending: Cell, service: PipelineBinding, buf: RefCell>>>, next_call: RefCell>>, @@ -131,7 +129,6 @@ where Self { size, service: Pipeline::new(service).bind(), - service_pending: Cell::new(true), ready: Cell::new(false), buf: RefCell::new(VecDeque::with_capacity(size)), next_call: RefCell::default(), @@ -158,7 +155,6 @@ where size: self.size, ready: Cell::new(false), service: self.service.clone(), - service_pending: Cell::new(false), buf: RefCell::new(VecDeque::with_capacity(self.size)), next_call: RefCell::default(), cancel_on_shutdown: self.cancel_on_shutdown, @@ -178,7 +174,6 @@ where .field("cancel_on_shutdown", &self.cancel_on_shutdown) .field("ready", &self.ready) .field("service", &self.service) - .field("service_pending", &self.service_pending) .field("buf", &self.buf) .field("next_call", &self.next_call) .finish() @@ -208,18 +203,14 @@ where if buffer.len() < self.size { // buffer next request self.ready.set(false); - self.service_pending.set(false); Poll::Ready(Ok(())) } else { log::trace!("Buffer limit exceeded"); // service is not ready - self.service_pending.set(true); let _ = self.readiness.take().map(|w| w.wake()); Poll::Pending } } else { - self.service_pending.set(false); - while let Some(sender) = buffer.pop_front() { let (next_call_tx, next_call_rx) = oneshot::channel(); if sender.send(next_call_tx).is_err() @@ -240,19 +231,6 @@ where .await } - async fn not_ready(&self) { - let fut = poll_fn(|cx| { - if self.service_pending.get() { - Poll::Ready(()) - } else { - self.readiness.set(Some(cx.waker().clone())); - Poll::Pending - } - }); - - crate::future::select(fut, self.service.get_ref().not_ready()).await; - } - async fn shutdown(&self) { // hold advancement until the last released task either makes a call or is dropped let next_call = self.next_call.borrow_mut().take(); @@ -318,6 +296,8 @@ where Ok(self.service.call(req).await?) } } + + ntex_service::forward_poll!(service); } #[cfg(test)] @@ -373,7 +353,6 @@ mod tests { let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone())).clone()).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); let srv1 = srv.clone(); ntex::rt::spawn(async move { @@ -382,7 +361,6 @@ mod tests { crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 0); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); let srv1 = srv.clone(); ntex::rt::spawn(async move { @@ -391,12 +369,10 @@ mod tests { crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 0); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(())); inner.ready.set(true); inner.waker.wake(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 1); @@ -404,7 +380,6 @@ mod tests { inner.ready.set(true); inner.waker.wake(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 2); @@ -417,12 +392,10 @@ mod tests { let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone()))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); let _ = srv.call(()).await; assert_eq!(inner.count.get(), 1); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready()); let err = BufferServiceError::from("test"); diff --git a/ntex-util/src/services/inflight.rs b/ntex-util/src/services/inflight.rs index 692a1be7..a02a4f88 100644 --- a/ntex-util/src/services/inflight.rs +++ b/ntex-util/src/services/inflight.rs @@ -71,13 +71,6 @@ where } } - #[inline] - async fn not_ready(&self) { - if self.count.is_available() { - crate::future::select(self.count.unavailable(), self.service.not_ready()).await; - } - } - #[inline] async fn call( &self, @@ -88,6 +81,7 @@ where ctx.call(&self.service, req).await } + ntex_service::forward_poll!(service); ntex_service::forward_shutdown!(service); } @@ -118,7 +112,6 @@ mod tests { let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); let srv2 = srv.clone(); ntex::rt::spawn(async move { @@ -126,12 +119,10 @@ mod tests { }); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(())); let _ = tx.send(()); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); srv.shutdown().await; } diff --git a/ntex-util/src/services/keepalive.rs b/ntex-util/src/services/keepalive.rs index c1304b21..2b9b4ad5 100644 --- a/ntex-util/src/services/keepalive.rs +++ b/ntex-util/src/services/keepalive.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, convert::Infallible, fmt, marker, time}; +use std::{cell::Cell, convert::Infallible, fmt, marker, task::Context, task::Poll, time}; use ntex_service::{Service, ServiceCtx, ServiceFactory}; @@ -119,22 +119,26 @@ where } } - async fn not_ready(&self) { - loop { - self.sleep.wait().await; - - let now = now(); - let expire = self.expire.get() + time::Duration::from(self.dur); - if expire <= now { - return; - } else { - let expire = expire - now; - self.sleep - .reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX))); + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + match self.sleep.poll_elapsed(cx) { + Poll::Ready(_) => { + let now = now(); + let expire = self.expire.get() + time::Duration::from(self.dur); + if expire <= now { + Err((self.f)()) + } else { + let expire = expire - now; + self.sleep + .reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX))); + let _ = self.sleep.poll_elapsed(cx); + Ok(()) + } } + Poll::Pending => Ok(()), } } + #[inline] async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result { self.expire.set(now()); Ok(req) @@ -162,13 +166,11 @@ mod tests { assert_eq!(service.call(1usize).await, Ok(1usize)); assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready()); - assert!(!lazy(|cx| service.poll_not_ready(cx)).await.is_ready()); sleep(Millis(500)).await; assert_eq!( lazy(|cx| service.poll_ready(cx)).await, Poll::Ready(Err(TestErr)) ); - assert!(lazy(|cx| service.poll_not_ready(cx)).await.is_ready()); } } diff --git a/ntex-util/src/services/onerequest.rs b/ntex-util/src/services/onerequest.rs index 5e9e43c4..ab88eb51 100644 --- a/ntex-util/src/services/onerequest.rs +++ b/ntex-util/src/services/onerequest.rs @@ -65,24 +65,6 @@ where ctx.ready(&self.service).await } - #[inline] - async fn not_ready(&self) { - if self.ready.get() { - crate::future::select( - poll_fn(|cx| { - self.waker.register(cx.waker()); - if self.ready.get() { - Poll::Pending - } else { - Poll::Ready(()) - } - }), - self.service.not_ready(), - ) - .await; - } - } - #[inline] async fn call( &self, @@ -90,7 +72,6 @@ where ctx: ServiceCtx<'_, Self>, ) -> Result { self.ready.set(false); - self.waker.wake(); let result = ctx.call(&self.service, req).await; self.ready.set(true); @@ -98,6 +79,7 @@ where result } + ntex_service::forward_poll!(service); ntex_service::forward_shutdown!(service); } @@ -127,7 +109,6 @@ mod tests { let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); let srv2 = srv.clone(); ntex::rt::spawn(async move { @@ -135,12 +116,10 @@ mod tests { }); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Ready(())); let _ = tx.send(()); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending); srv.shutdown().await; } diff --git a/ntex-util/src/services/timeout.rs b/ntex-util/src/services/timeout.rs index 5c6aac4f..b4ad0a81 100644 --- a/ntex-util/src/services/timeout.rs +++ b/ntex-util/src/services/timeout.rs @@ -140,6 +140,7 @@ where } } + ntex_service::forward_poll!(service, TimeoutError::Service); ntex_service::forward_ready!(service, TimeoutError::Service); ntex_service::forward_shutdown!(service); } diff --git a/ntex-util/src/services/variant.rs b/ntex-util/src/services/variant.rs index 7d498c3b..08b71503 100644 --- a/ntex-util/src/services/variant.rs +++ b/ntex-util/src/services/variant.rs @@ -143,23 +143,10 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, }).await } - async fn not_ready(&self) { - use std::{future::Future, pin::Pin}; - - let mut fut1 = ::std::pin::pin!(self.V1.not_ready()); - $(let mut $T = ::std::pin::pin!(self.$T.not_ready());)+ - - ::std::future::poll_fn(|cx| { - if Pin::new(&mut fut1).poll(cx).is_ready() { - return Poll::Ready(()) - } - - $(if Pin::new(&mut $T).poll(cx).is_ready() { - return Poll::Ready(()); - })+ - - Poll::Pending - }).await + fn poll(&self, cx: &mut std::task::Context<'_>) -> Result<(), Self::Error> { + self.V1.poll(cx)?; + $(self.$T.poll(cx)?;)+ + Ok(()) } async fn shutdown(&self) { @@ -253,7 +240,6 @@ variant_impl_and!(VariantFactory7, VariantFactory8, V8, V8R, v8, (V2, V3, V4, V5 #[cfg(test)] mod tests { use ntex_service::fn_factory; - use std::{future::poll_fn, future::Future, pin}; use super::*; @@ -307,16 +293,7 @@ mod tests { let service = factory.pipeline(&()).await.unwrap().clone(); assert!(format!("{:?}", service).contains("Variant")); - let mut f = pin::pin!(service.not_ready()); - let _ = poll_fn(|cx| { - if pin::Pin::new(&mut f).poll(cx).is_pending() { - Poll::Ready(()) - } else { - Poll::Pending - } - }) - .await; - + assert!(crate::future::lazy(|cx| service.poll(cx)).await.is_ok()); assert!(service.ready().await.is_ok()); service.shutdown().await; diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index b5c796b0..24eb9de4 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.10.0] - 2024-12-04 + +* Use updated Service trait + ## [2.9.0] - 2024-11-30 * Fix handling unconsumed payload in h1 dispatcher #477 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 7794de3a..906d360e 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "2.9.0" +version = "2.10.0" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -64,14 +64,14 @@ brotli = ["dep:brotli2"] ntex-codec = "0.6" ntex-http = "0.1.13" ntex-router = "0.5" -ntex-service = "3.3" +ntex-service = "3.4" ntex-macros = "0.1" -ntex-util = "2.5" +ntex-util = "2.8" ntex-bytes = "0.1.27" -ntex-server = "2.5" +ntex-server = "2.6" ntex-h2 = "1.4" ntex-rt = "0.4.22" -ntex-io = "2.8" +ntex-io = "2.9" ntex-net = "2.4" ntex-tls = "2.3" diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index f0982dc9..122f2647 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -1,11 +1,11 @@ -use std::{fmt, time::Duration}; +use std::{fmt, task::Context, time::Duration}; use ntex_h2::{self as h2}; use crate::connect::{Connect as TcpConnect, Connector as TcpConnector}; use crate::service::{apply_fn, boxed, Service, ServiceCtx}; use crate::time::{Millis, Seconds}; -use crate::util::{join, select, timeout::TimeoutError, timeout::TimeoutService}; +use crate::util::{join, timeout::TimeoutError, timeout::TimeoutService}; use crate::{http::Uri, io::IoBoxed}; use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect}; @@ -285,12 +285,12 @@ where } #[inline] - async fn not_ready(&self) { + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + self.tcp_pool.poll(cx)?; if let Some(ref ssl_pool) = self.ssl_pool { - select(self.tcp_pool.not_ready(), ssl_pool.not_ready()).await; - } else { - self.tcp_pool.not_ready().await + ssl_pool.poll(cx)?; } + Ok(()) } async fn shutdown(&self) { diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 7b0210c6..a56d4898 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -123,8 +123,8 @@ where } #[inline] - async fn not_ready(&self) { - self.connector.not_ready().await + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + self.connector.poll(cx) } async fn shutdown(&self) { diff --git a/ntex/src/http/h1/service.rs b/ntex/src/http/h1/service.rs index e566b9ca..63c3b85b 100644 --- a/ntex/src/http/h1/service.rs +++ b/ntex/src/http/h1/service.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, cell::RefCell, error::Error, fmt, marker, rc::Rc}; +use std::{cell::Cell, cell::RefCell, error::Error, fmt, marker, rc::Rc, task::Context}; use crate::http::body::MessageBody; use crate::http::config::{DispatcherConfig, ServiceConfig}; @@ -6,7 +6,7 @@ use crate::http::error::{DispatchError, ResponseError}; use crate::http::{request::Request, response::Response}; use crate::io::{types, Filter, Io, IoRef}; use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; -use crate::{channel::oneshot, util::join, util::select, util::HashSet}; +use crate::{channel::oneshot, util::join, util::HashSet}; use super::control::{Control, ControlAck}; use super::default::DefaultControlService; @@ -230,10 +230,14 @@ where }) } - #[inline] - async fn not_ready(&self) { + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { let cfg = self.config.as_ref(); - select(cfg.control.not_ready(), cfg.service.not_ready()).await; + cfg.control + .poll(cx) + .map_err(|e| DispatchError::Control(Box::new(e)))?; + cfg.service + .poll(cx) + .map_err(|e| DispatchError::Service(Box::new(e))) } async fn shutdown(&self) { diff --git a/ntex/src/http/h2/service.rs b/ntex/src/http/h2/service.rs index 00889942..2ca77f4e 100644 --- a/ntex/src/http/h2/service.rs +++ b/ntex/src/http/h2/service.rs @@ -1,5 +1,5 @@ use std::cell::{Cell, RefCell}; -use std::{error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc}; +use std::{error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc, task::Context}; use ntex_h2::{self as h2, frame::StreamId, server}; @@ -227,8 +227,11 @@ where } #[inline] - async fn not_ready(&self) { - self.config.service.not_ready().await; + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + self.config + .service + .poll(cx) + .map_err(|e| DispatchError::Service(Box::new(e))) } #[inline] diff --git a/ntex/src/http/service.rs b/ntex/src/http/service.rs index 42156e63..0e3305ad 100644 --- a/ntex/src/http/service.rs +++ b/ntex/src/http/service.rs @@ -1,8 +1,8 @@ -use std::{cell::Cell, cell::RefCell, error, fmt, marker, rc::Rc}; +use std::{cell::Cell, cell::RefCell, error, fmt, marker, rc::Rc, task::Context}; use crate::io::{types, Filter, Io, IoRef}; use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; -use crate::{channel::oneshot, util::join, util::select, util::HashSet}; +use crate::{channel::oneshot, util::join, util::HashSet}; use super::body::MessageBody; use super::builder::HttpServiceBuilder; @@ -312,12 +312,16 @@ where } #[inline] - async fn not_ready(&self) { + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { let cfg = self.config.as_ref(); - select(cfg.control.not_ready(), cfg.service.not_ready()).await; + cfg.control + .poll(cx) + .map_err(|e| DispatchError::Control(Box::new(e)))?; + cfg.service + .poll(cx) + .map_err(|e| DispatchError::Service(Box::new(e))) } - #[inline] async fn shutdown(&self) { self.config.shutdown(); diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index 6a6a02cb..6d208d9e 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -28,7 +28,7 @@ pub use ntex_macros::{rt_main as main, rt_test as test}; #[cfg(test)] pub(crate) use ntex_macros::rt_test2 as rt_test; -pub use ntex_service::{forward_ready, forward_shutdown}; +pub use ntex_service::{forward_poll, forward_ready, forward_shutdown}; pub mod http; pub mod web; diff --git a/ntex/src/web/app_service.rs b/ntex/src/web/app_service.rs index 7752c1c2..b4b0ff9d 100644 --- a/ntex/src/web/app_service.rs +++ b/ntex/src/web/app_service.rs @@ -1,11 +1,11 @@ -use std::{cell::RefCell, marker, rc::Rc}; +use std::{cell::RefCell, marker, rc::Rc, task::Context}; use crate::http::{Request, Response}; use crate::router::{Path, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::dev::ServiceChainFactory; use crate::service::{fn_service, Middleware, Service, ServiceCtx, ServiceFactory}; -use crate::util::{join, select, BoxFuture, Extensions}; +use crate::util::{join, BoxFuture, Extensions}; use super::config::AppConfig; use super::error::ErrorRenderer; @@ -202,6 +202,7 @@ where type Response = WebResponse; type Error = T::Error; + crate::forward_poll!(service); crate::forward_ready!(service); crate::forward_shutdown!(service); @@ -302,8 +303,9 @@ where } #[inline] - async fn not_ready(&self) { - select(self.filter.not_ready(), self.routing.not_ready()).await; + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + self.filter.poll(cx)?; + self.routing.poll(cx) } async fn call( diff --git a/ntex/src/web/middleware/compress.rs b/ntex/src/web/middleware/compress.rs index 3d8c1db7..dc848ab1 100644 --- a/ntex/src/web/middleware/compress.rs +++ b/ntex/src/web/middleware/compress.rs @@ -67,6 +67,7 @@ where type Response = WebResponse; type Error = S::Error; + crate::forward_poll!(service); crate::forward_ready!(service); crate::forward_shutdown!(service); diff --git a/ntex/src/web/middleware/defaultheaders.rs b/ntex/src/web/middleware/defaultheaders.rs index 5aa0461e..670361be 100644 --- a/ntex/src/web/middleware/defaultheaders.rs +++ b/ntex/src/web/middleware/defaultheaders.rs @@ -110,6 +110,7 @@ where type Response = WebResponse; type Error = S::Error; + crate::forward_poll!(service); crate::forward_ready!(service); crate::forward_shutdown!(service); diff --git a/ntex/src/web/middleware/logger.rs b/ntex/src/web/middleware/logger.rs index cc5bbfeb..cd99d5bb 100644 --- a/ntex/src/web/middleware/logger.rs +++ b/ntex/src/web/middleware/logger.rs @@ -139,6 +139,7 @@ where type Response = WebResponse; type Error = S::Error; + crate::forward_poll!(service); crate::forward_ready!(service); crate::forward_shutdown!(service); diff --git a/ntex/src/web/scope.rs b/ntex/src/web/scope.rs index 6e462cec..a3bc0458 100644 --- a/ntex/src/web/scope.rs +++ b/ntex/src/web/scope.rs @@ -1,11 +1,11 @@ -use std::{cell::RefCell, fmt, rc::Rc}; +use std::{cell::RefCell, fmt, rc::Rc, task::Context}; use crate::http::Response; use crate::router::{IntoPattern, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::{chain_factory, dev::ServiceChainFactory, IntoServiceFactory}; use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory}; -use crate::util::{join, select, Extensions}; +use crate::util::{join, Extensions}; use super::app::Filter; use super::config::ServiceConfig; @@ -495,10 +495,12 @@ where } #[inline] - async fn not_ready(&self) { - select(self.filter.not_ready(), self.routing.not_ready()).await; + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { + self.filter.poll(cx)?; + self.routing.poll(cx) } + #[inline] async fn call( &self, req: WebRequest, diff --git a/ntex/src/web/stack.rs b/ntex/src/web/stack.rs index e708102b..d8c8643f 100644 --- a/ntex/src/web/stack.rs +++ b/ntex/src/web/stack.rs @@ -73,6 +73,7 @@ where ctx.call(&self.svc, req).await.map_err(Into::into) } + crate::forward_poll!(svc); crate::forward_ready!(svc); crate::forward_shutdown!(svc); }