diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index bf9f9303..6d1256bf 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.0.0] - 2024-01-0x + +* Use "async fn" in trait for Service definition + ## [0.3.4] - 2023-11-06 * Add UnwindSafe trait on mpsc::Receiver #239 diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 45c84322..c49e645d 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "0.3.4" +version = "1.0.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] ntex-rt = "0.4.7" -ntex-service = "1.2.6" +ntex-service = "2.0.0" bitflags = "2.4" fxhash = "0.2.1" log = "0.4" @@ -29,6 +29,6 @@ pin-project-lite = "0.2.9" [dev-dependencies] ntex = { version = "0.7", features = ["tokio"] } -ntex-bytes = "0.1.18" +ntex-bytes = "0.1.21" ntex-macros = "0.1.3" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/ntex-util/src/channel/condition.rs b/ntex-util/src/channel/condition.rs index 2e329734..45aa8b0f 100644 --- a/ntex-util/src/channel/condition.rs +++ b/ntex-util/src/channel/condition.rs @@ -1,8 +1,8 @@ use slab::Slab; -use std::{future::Future, pin::Pin, task::Context, task::Poll}; +use std::{future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll}; use super::cell::Cell; -use crate::{future::poll_fn, task::LocalWaker}; +use crate::task::LocalWaker; /// Condition allows to notify multiple waiters at the same time #[derive(Clone, Debug)] diff --git a/ntex-util/src/channel/mpsc.rs b/ntex-util/src/channel/mpsc.rs index 3962ac49..e4b16b3e 100644 --- a/ntex-util/src/channel/mpsc.rs +++ b/ntex-util/src/channel/mpsc.rs @@ -1,13 +1,13 @@ //! A multi-producer, single-consumer, futures-aware, FIFO queue. -use std::{ - collections::VecDeque, fmt, panic::UnwindSafe, pin::Pin, task::Context, task::Poll, -}; +use std::collections::VecDeque; +use std::future::poll_fn; +use std::{fmt, panic::UnwindSafe, pin::Pin, task::Context, task::Poll}; use futures_core::{FusedStream, Stream}; use futures_sink::Sink; use super::cell::{Cell, WeakCell}; -use crate::{future::poll_fn, task::LocalWaker}; +use crate::task::LocalWaker; /// Creates a unbounded in-memory channel with buffered storage. pub fn channel() -> (Sender, Receiver) { diff --git a/ntex-util/src/channel/oneshot.rs b/ntex-util/src/channel/oneshot.rs index 84771a5a..61cbd01e 100644 --- a/ntex-util/src/channel/oneshot.rs +++ b/ntex-util/src/channel/oneshot.rs @@ -1,8 +1,8 @@ //! A one-shot, futures-aware channel. -use std::{future::Future, pin::Pin, task::Context, task::Poll}; +use std::{future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll}; use super::{cell::Cell, Canceled}; -use crate::{future::poll_fn, task::LocalWaker}; +use crate::task::LocalWaker; /// Creates a new futures-aware, one-shot channel. pub fn channel() -> (Sender, Receiver) { diff --git a/ntex-util/src/future/mod.rs b/ntex-util/src/future/mod.rs index 1049e5ce..6620dbd7 100644 --- a/ntex-util/src/future/mod.rs +++ b/ntex-util/src/future/mod.rs @@ -1,5 +1,5 @@ //! Utilities for futures -use std::{future::Future, mem, pin::Pin, task::Context, task::Poll}; +use std::{future::poll_fn, future::Future, mem, pin::Pin, task::Context, task::Poll}; pub use futures_core::{Stream, TryFuture}; pub use futures_sink::Sink; @@ -20,35 +20,6 @@ pub use self::select::select; /// you can't statically type your result or need to add some indirection. pub type BoxFuture<'a, T> = Pin + 'a>>; -/// Creates a new future wrapping around a function returning [`Poll`]. -/// -/// Polling the returned future delegates to the wrapped function. -pub fn poll_fn(f: F) -> impl Future -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - PollFn { f } -} - -/// Future for the [`poll_fn`] function. -#[must_use = "futures do nothing unless you `.await` or poll them"] -struct PollFn { - f: F, -} - -impl Unpin for PollFn {} - -impl Future for PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - (self.f)(cx) - } -} - /// Creates a future that resolves to the next item in the stream. pub async fn stream_recv(stream: &mut S) -> Option where diff --git a/ntex-util/src/services/buffer.rs b/ntex-util/src/services/buffer.rs index 571d26b8..6c91c956 100644 --- a/ntex-util/src/services/buffer.rs +++ b/ntex-util/src/services/buffer.rs @@ -1,11 +1,11 @@ //! Service that buffers incomming requests. use std::cell::{Cell, RefCell}; use std::task::{ready, Context, Poll}; -use std::{collections::VecDeque, fmt, future::Future, marker::PhantomData, pin::Pin}; +use std::{collections::VecDeque, fmt, marker::PhantomData}; -use ntex_service::{IntoService, Middleware, Service, ServiceCallToCall, ServiceCtx}; +use ntex_service::{IntoService, Middleware, Service, ServiceCtx}; -use crate::channel::{oneshot, Canceled}; +use crate::channel::oneshot; /// Buffer - service factory for service that can buffer incoming request. /// @@ -79,6 +79,31 @@ where } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum BufferServiceError { + Service(E), + RequestCanceled, +} + +impl From for BufferServiceError { + fn from(err: E) -> Self { + BufferServiceError::Service(err) + } +} + +impl std::fmt::Display for BufferServiceError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BufferServiceError::Service(e) => std::fmt::Display::fmt(e, f), + BufferServiceError::RequestCanceled => { + f.write_str("buffer service request canceled") + } + } + } +} + +impl std::error::Error for BufferServiceError {} + /// Buffer service - service that can buffer incoming requests. /// /// Default number of buffered requests is 16 @@ -158,7 +183,6 @@ where { type Response = S::Response; type Error = BufferServiceError; - type Future<'f> = BufferServiceResponse<'f, R, S> where Self: 'f, R: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -196,30 +220,6 @@ where Poll::Ready(Ok(())) } - #[inline] - fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - if self.ready.get() { - self.ready.set(false); - BufferServiceResponse { - slf: self, - state: ResponseState::Running { - fut: ctx.call_nowait(&self.service, req), - }, - } - } else { - let (tx, rx) = oneshot::channel(); - self.buf.borrow_mut().push_back(tx); - - BufferServiceResponse { - slf: self, - state: ResponseState::WaitingForRelease { - rx, - call: Some(ctx.call(&self.service, req).advance_to_call()), - }, - } - } - } - fn poll_shutdown(&self, cx: &mut std::task::Context<'_>) -> Poll<()> { let mut buffer = self.buf.borrow_mut(); if self.cancel_on_shutdown { @@ -257,97 +257,41 @@ where self.service.poll_shutdown(cx) } -} -pin_project_lite::pin_project! { - #[doc(hidden)] - #[must_use = "futures do nothing unless polled"] - pub struct BufferServiceResponse<'f, R, S: Service> - { - #[pin] - state: ResponseState<'f, R, S>, - slf: &'f BufferService, - } -} + async fn call( + &self, + req: R, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + if self.ready.get() { + self.ready.set(false); + Ok(ctx.call_nowait(&self.service, req).await?) + } else { + let (tx, rx) = oneshot::channel(); + self.buf.borrow_mut().push_back(tx); -pin_project_lite::pin_project! { - #[project = ResponseStateProject] - enum ResponseState<'f, R, S: Service> - { - WaitingForRelease { rx: oneshot::Receiver>, call: Option> }, - WaitingForReady { tx: oneshot::Sender<()>, #[pin] call: ServiceCallToCall<'f, S, R> }, - Running { #[pin] fut: S::Future<'f> }, - } -} + // release + let _task_guard = rx.recv().await.map_err(|_| { + log::trace!("Buffered service request canceled"); + BufferServiceError::RequestCanceled + })?; -impl<'f, R, S> Future for BufferServiceResponse<'f, R, S> -where - S: Service, -{ - type Output = Result>; + // check service readiness + ctx.ready(&self.service).await?; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - match this.state.as_mut().project() { - ResponseStateProject::WaitingForRelease { rx, call } => { - match ready!(rx.poll_recv(cx)) { - Ok(tx) => { - let call = call.take().expect("always set in this state"); - this.state.set(ResponseState::WaitingForReady { tx, call }); - self.poll(cx) - } - Err(Canceled) => { - log::trace!("Buffered service request canceled"); - Poll::Ready(Err(BufferServiceError::RequestCanceled)) - } - } - } - ResponseStateProject::WaitingForReady { call, .. } => { - let fut = match ready!(call.poll(cx)) { - Ok(fut) => fut, - Err(err) => return Poll::Ready(Err(err.into())), - }; - - this.state.set(ResponseState::Running { fut }); - self.poll(cx) - } - ResponseStateProject::Running { fut } => fut.poll(cx).map_err(|e| e.into()), + // call service + Ok(ctx.call_nowait(&self.service, req).await?) } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum BufferServiceError { - Service(E), - RequestCanceled, -} - -impl From for BufferServiceError { - fn from(err: E) -> Self { - BufferServiceError::Service(err) - } -} - -impl std::fmt::Display for BufferServiceError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - BufferServiceError::Service(e) => std::fmt::Display::fmt(e, f), - BufferServiceError::RequestCanceled => { - f.write_str("buffer service request canceled") - } - } - } -} - -impl std::error::Error for BufferServiceError {} - #[cfg(test)] mod tests { use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceFactory}; use std::{rc::Rc, task::Context, task::Poll, time::Duration}; use super::*; - use crate::future::{lazy, Ready}; + use crate::future::lazy; use crate::task::LocalWaker; #[derive(Clone)] @@ -362,7 +306,6 @@ mod tests { impl Service<()> for TestService { type Response = (); type Error = (); - type Future<'f> = Ready<(), ()> where Self: 'f; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.0.waker.register(cx.waker()); @@ -373,10 +316,10 @@ mod tests { } } - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { self.0.ready.set(false); self.0.count.set(self.0.count.get() + 1); - Ready::Ok(()) + Ok(()) } } diff --git a/ntex-util/src/services/inflight.rs b/ntex-util/src/services/inflight.rs index 8fc6754c..08fe2087 100644 --- a/ntex-util/src/services/inflight.rs +++ b/ntex-util/src/services/inflight.rs @@ -1,9 +1,9 @@ //! Service that limits number of in-flight async requests. -use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; +use std::{task::Context, task::Poll}; -use ntex_service::{IntoService, Middleware, Service, ServiceCall, ServiceCtx}; +use ntex_service::{IntoService, Middleware, Service, ServiceCtx}; -use super::counter::{Counter, CounterGuard}; +use super::counter::Counter; /// InFlight - service factory for service that can limit number of in-flight /// async requests. @@ -62,7 +62,6 @@ where { type Response = T::Response; type Error = T::Error; - type Future<'f> = InFlightServiceResponse<'f, T, R> where Self: 'f, R: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -77,57 +76,35 @@ where } #[inline] - fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - InFlightServiceResponse { - fut: ctx.call(&self.service, req), - _guard: self.count.get(), - _t: PhantomData, - } + async fn call( + &self, + req: R, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + let _guard = self.count.get(); + ctx.call(&self.service, req).await } ntex_service::forward_poll_shutdown!(service); } -pin_project_lite::pin_project! { - #[doc(hidden)] - pub struct InFlightServiceResponse<'f, T: Service, R> - where T: 'f, R: 'f - { - #[pin] - fut: ServiceCall<'f, T, R>, - _guard: CounterGuard, - _t: PhantomData - } -} - -impl<'f, T: Service, R> Future for InFlightServiceResponse<'f, T, R> { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) - } -} - #[cfg(test)] mod tests { use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; use std::{cell::RefCell, task::Poll, time::Duration}; use super::*; - use crate::{channel::oneshot, future::lazy, future::BoxFuture}; + use crate::{channel::oneshot, future::lazy}; struct SleepService(oneshot::Receiver<()>); impl Service<()> for SleepService { type Response = (); type Error = (); - type Future<'f> = BoxFuture<'f, Result<(), ()>>; - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - Box::pin(async move { - let _ = self.0.recv().await; - Ok::<_, ()>(()) - }) + async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + let _ = self.0.recv().await; + Ok(()) } } diff --git a/ntex-util/src/services/keepalive.rs b/ntex-util/src/services/keepalive.rs index 87507e0b..109129c2 100644 --- a/ntex-util/src/services/keepalive.rs +++ b/ntex-util/src/services/keepalive.rs @@ -3,7 +3,6 @@ use std::{cell::Cell, convert::Infallible, fmt, marker, time::Duration, time::In use ntex_service::{Service, ServiceCtx, ServiceFactory}; -use crate::future::Ready; use crate::time::{now, sleep, Millis, Sleep}; /// KeepAlive service factory @@ -60,13 +59,13 @@ where { type Response = R; type Error = E; - type InitError = Infallible; + type Service = KeepAliveService; - type Future<'f> = Ready where Self: 'f, C: 'f; + type InitError = Infallible; #[inline] - fn create(&self, _: C) -> Self::Future<'_> { - Ready::Ok(KeepAliveService::new(self.ka, self.f.clone())) + async fn create(&self, _: C) -> Result { + Ok(KeepAliveService::new(self.ka, self.f.clone())) } } @@ -111,7 +110,6 @@ where { type Response = R; type Error = E; - type Future<'f> = Ready where Self: 'f, R: 'f; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { match self.sleep.poll_elapsed(cx) { @@ -132,9 +130,9 @@ where } } - fn call<'a>(&'a self, req: R, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result { self.expire.set(now()); - Ready::Ok(req) + Ok(req) } } diff --git a/ntex-util/src/services/onerequest.rs b/ntex-util/src/services/onerequest.rs index 5d78f848..11b1faf7 100644 --- a/ntex-util/src/services/onerequest.rs +++ b/ntex-util/src/services/onerequest.rs @@ -1,7 +1,7 @@ //! Service that limits number of in-flight async requests to 1. -use std::{cell::Cell, future::Future, pin::Pin, task::Context, task::Poll}; +use std::{cell::Cell, task::Context, task::Poll}; -use ntex_service::{IntoService, Middleware, Service, ServiceCall, ServiceCtx}; +use ntex_service::{IntoService, Middleware, Service, ServiceCtx}; use crate::task::LocalWaker; @@ -49,7 +49,6 @@ where { type Response = T::Response; type Error = T::Error; - type Future<'f> = OneRequestServiceResponse<'f, T, R> where Self: 'f, R: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -64,62 +63,39 @@ where } #[inline] - fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call( + &self, + req: R, + ctx: ServiceCtx<'_, Self>, + ) -> Result { self.ready.set(false); - OneRequestServiceResponse { - fut: ctx.call(&self.service, req), - service: self, - } + let result = ctx.call(&self.service, req).await; + self.ready.set(true); + self.waker.wake(); + result } ntex_service::forward_poll_shutdown!(service); } -pin_project_lite::pin_project! { - #[doc(hidden)] - pub struct OneRequestServiceResponse<'f, T: Service, R> - where T: 'f, R: 'f - { - #[pin] - fut: ServiceCall<'f, T, R>, - service: &'f OneRequestService, - } -} - -impl<'f, T: Service, R> Future for OneRequestServiceResponse<'f, T, R> { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let result = self.as_mut().project().fut.poll(cx); - if result.is_ready() { - self.service.ready.set(true); - self.service.waker.wake(); - } - result - } -} - #[cfg(test)] mod tests { use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; use std::{cell::RefCell, task::Poll, time::Duration}; use super::*; - use crate::{channel::oneshot, future::lazy, future::BoxFuture}; + use crate::{channel::oneshot, future::lazy}; struct SleepService(oneshot::Receiver<()>); impl Service<()> for SleepService { type Response = (); type Error = (); - type Future<'f> = BoxFuture<'f, Result<(), ()>>; - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - Box::pin(async move { - let _ = self.0.recv().await; - Ok::<_, ()>(()) - }) + async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> { + let _ = self.0.recv().await; + Ok::<_, ()>(()) } } diff --git a/ntex-util/src/services/timeout.rs b/ntex-util/src/services/timeout.rs index 79dcc546..63768a8a 100644 --- a/ntex-util/src/services/timeout.rs +++ b/ntex-util/src/services/timeout.rs @@ -2,12 +2,12 @@ //! //! If the response does not complete within the specified timeout, the response //! will be aborted. -use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll}; +use std::{fmt, marker}; -use ntex_service::{IntoService, Middleware, Service, ServiceCall, ServiceCtx}; +use ntex_service::{IntoService, Middleware, Service, ServiceCtx}; -use crate::future::Either; -use crate::time::{sleep, Millis, Sleep}; +use crate::future::{select, Either}; +use crate::time::{sleep, Millis}; /// Applies a timeout to requests. /// @@ -123,20 +123,21 @@ where { type Response = S::Response; type Error = TimeoutError; - type Future<'f> = Either, TimeoutServiceResponse2<'f, S, R>> where Self: 'f, R: 'f; - fn call<'a>(&'a self, request: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { + async fn call( + &self, + request: R, + ctx: ServiceCtx<'_, Self>, + ) -> Result { if self.timeout.is_zero() { - Either::Right(TimeoutServiceResponse2 { - fut: ctx.call(&self.service, request), - _t: marker::PhantomData, - }) + ctx.call(&self.service, request) + .await + .map_err(TimeoutError::Service) } else { - Either::Left(TimeoutServiceResponse { - fut: ctx.call(&self.service, request), - sleep: sleep(self.timeout), - _t: marker::PhantomData, - }) + match select(sleep(self.timeout), ctx.call(&self.service, request)).await { + Either::Left(_) => Err(TimeoutError::Timeout), + Either::Right(res) => res.map_err(TimeoutError::Service), + } } } @@ -144,72 +145,6 @@ where ntex_service::forward_poll_shutdown!(service); } -pin_project_lite::pin_project! { - /// `TimeoutService` response future - #[doc(hidden)] - #[must_use = "futures do nothing unless polled"] - pub struct TimeoutServiceResponse<'f, T: Service, R> - where T: 'f, R: 'f, - { - #[pin] - fut: ServiceCall<'f, T, R>, - sleep: Sleep, - _t: marker::PhantomData - } -} - -impl<'f, T, R> Future for TimeoutServiceResponse<'f, T, R> -where - T: Service, -{ - type Output = Result>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - // First, try polling the future - match this.fut.poll(cx) { - Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), - Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))), - Poll::Pending => {} - } - - // Now check the sleep - match this.sleep.poll_elapsed(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)), - } - } -} - -pin_project_lite::pin_project! { - /// `TimeoutService` response future - #[doc(hidden)] - #[must_use = "futures do nothing unless polled"] - pub struct TimeoutServiceResponse2<'f, T: Service, R> - where T: 'f, R: 'f, - { - #[pin] - fut: ServiceCall<'f, T, R>, - _t: marker::PhantomData, - } -} - -impl<'f, T, R> Future for TimeoutServiceResponse2<'f, T, R> -where - T: Service, -{ - type Output = Result>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project().fut.poll(cx) { - Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), - Poll::Ready(Err(e)) => Poll::Ready(Err(TimeoutError::Service(e))), - Poll::Pending => Poll::Pending, - } - } -} - #[cfg(test)] mod tests { use std::{fmt, time::Duration}; @@ -217,7 +152,7 @@ mod tests { use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceFactory}; use super::*; - use crate::future::{lazy, BoxFuture}; + use crate::future::lazy; #[derive(Clone, Debug, PartialEq)] struct SleepService(Duration); @@ -234,14 +169,14 @@ mod tests { impl Service<()> for SleepService { type Response = (); type Error = SrvError; - type Future<'f> = BoxFuture<'f, Result<(), SrvError>>; - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - let fut = crate::time::sleep(self.0); - Box::pin(async move { - fut.await; - Ok::<_, SrvError>(()) - }) + async fn call<'a>( + &'a self, + _: (), + _: ServiceCtx<'a, Self>, + ) -> Result<(), SrvError> { + crate::time::sleep(self.0).await; + Ok::<_, SrvError>(()) } } diff --git a/ntex-util/src/services/variant.rs b/ntex-util/src/services/variant.rs index f30672df..0642e30e 100644 --- a/ntex-util/src/services/variant.rs +++ b/ntex-util/src/services/variant.rs @@ -1,7 +1,7 @@ //! Contains `Variant` service and related types and functions. -use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; +use std::{fmt, marker::PhantomData, task::Context, task::Poll}; -use ntex_service::{IntoServiceFactory, Service, ServiceCall, ServiceCtx, ServiceFactory}; +use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; /// Construct `Variant` service factory. /// @@ -123,8 +123,6 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, { type Response = V1::Response; type Error = V1::Error; - type Future<'f> = $mod_name::ServiceResponse< - ServiceCall<'f, V1, V1R>, $(ServiceCall<'f, $T, $R>),+> where Self: 'f, V1: 'f; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let mut ready = self.V1.poll_ready(cx)?.is_ready(); @@ -148,11 +146,11 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, } } - fn call<'a>(&'a self, req: $enum_type, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> + async fn call(&self, req: $enum_type, ctx: ServiceCtx<'_, Self>) -> Result { match req { - $enum_type::V1(req) => $mod_name::ServiceResponse::V1 { fut: ctx.call(&self.V1, req) }, - $($enum_type::$T(req) => $mod_name::ServiceResponse::$T { fut: ctx.call(&self.$T, req) },)+ + $enum_type::V1(req) => ctx.call(&self.V1, req).await, + $($enum_type::$T(req) => ctx.call(&self.$T, req).await,)+ } } } @@ -191,111 +189,17 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, { type Response = V1::Response; type Error = V1::Error; - type InitError = V1::InitError; type Service = $srv_type; - type Future<'f> = $mod_name::ServiceFactoryResponse<'f, V1, V1C, $($T,)+ V1R, $($R,)+> where Self: 'f, V1C: 'f; + type InitError = V1::InitError; - fn create(&self, cfg: V1C) -> Self::Future<'_> { - $mod_name::ServiceFactoryResponse { - V1: None, - items: Default::default(), - $($T: self.$T.create(cfg.clone()),)+ - V1_fut: self.V1.create(cfg), - } + async fn create(&self, cfg: V1C) -> Result { + Ok($srv_type { + V1: self.V1.create(cfg.clone()).await?, + $($T: self.$T.create(cfg.clone()).await?,)+ + _t: PhantomData + }) } } - - #[doc(hidden)] - #[allow(non_snake_case)] - pub mod $mod_name { - use super::*; - - pin_project_lite::pin_project! { - #[project = ServiceResponseProject] - pub enum ServiceResponse - { - V1{ #[pin] fut: V1 }, - $($T{ #[pin] fut: $T },)+ - } - } - - impl Future for ServiceResponse - where - V1: Future, - $($T: Future),+ - { - type Output = V1::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project() { - ServiceResponseProject::V1{fut} => fut.poll(cx), - $(ServiceResponseProject::$T{fut} => fut.poll(cx),)+ - } - } - } - - - pin_project_lite::pin_project! { - #[doc(hidden)] - pub struct ServiceFactoryResponse<'f, V1: ServiceFactory, V1C, $($T: ServiceFactory<$R, V1C>,)+ V1R, $($R,)+> - where - V1C: 'f, - V1: 'f, - $($T: 'f,)+ - { - pub(super) V1: Option, - pub(super) items: ($(Option<$T::Service>,)+), - #[pin] pub(super) V1_fut: V1::Future<'f>, - $(#[pin] pub(super) $T: $T::Future<'f>),+ - } - } - - impl<'f, V1, V1C, $($T,)+ V1R, $($R,)+> Future for ServiceFactoryResponse<'f, V1, V1C, $($T,)+ V1R, $($R,)+> - where - V1: ServiceFactory + 'f, - $($T: ServiceFactory<$R, V1C, Response = V1::Response, Error = V1::Error, InitError = V1::InitError,> + 'f),+ - { - type Output = Result<$srv_type, V1::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let mut ready = true; - - if this.V1.is_none() { - match this.V1_fut.poll(cx) { - Poll::Ready(Ok(item)) => { - *this.V1 = Some(item); - } - Poll::Pending => ready = false, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), - } - } - - $( - if this.items.$n.is_none() { - match this.$T.poll(cx) { - Poll::Ready(Ok(item)) => { - this.items.$n = Some(item); - } - Poll::Pending => ready = false, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), - } - } - )+ - - if ready { - Poll::Ready(Ok($srv_type { - V1: this.V1.take().unwrap(), - $($T: this.items.$n.take().unwrap(),)+ - _t: PhantomData - })) - } else { - Poll::Pending - } - } - } - } - }); #[rustfmt::skip] @@ -332,7 +236,7 @@ mod tests { use std::task::{Context, Poll}; use super::*; - use crate::future::{lazy, Ready}; + use crate::future::lazy; #[derive(Clone)] struct Srv1; @@ -340,7 +244,6 @@ mod tests { impl Service<()> for Srv1 { type Response = usize; type Error = (); - type Future<'f> = Ready where Self: 'f; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -350,8 +253,8 @@ mod tests { Poll::Ready(()) } - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - Ready::<_, ()>::Ok(1) + async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result { + Ok(1) } } @@ -361,7 +264,6 @@ mod tests { impl Service<()> for Srv2 { type Response = usize; type Error = (); - type Future<'f> = Ready where Self: 'f; fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -371,8 +273,8 @@ mod tests { Poll::Ready(()) } - fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - Ready::<_, ()>::Ok(2) + async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result { + Ok(2) } } diff --git a/ntex-util/src/time/mod.rs b/ntex-util/src/time/mod.rs index 4d630658..5dbc1525 100644 --- a/ntex-util/src/time/mod.rs +++ b/ntex-util/src/time/mod.rs @@ -1,5 +1,5 @@ //! Utilities for tracking time. -use std::{cmp, future::Future, pin::Pin, task, task::Poll}; +use std::{cmp, future::poll_fn, future::Future, pin::Pin, task, task::Poll}; mod types; mod wheel; @@ -312,7 +312,7 @@ impl Interval { #[inline] pub async fn tick(&self) { - crate::future::poll_fn(|cx| self.poll_tick(cx)).await; + poll_fn(|cx| self.poll_tick(cx)).await; } #[inline]