Migrate ntex-util to async fn in trait

This commit is contained in:
Nikolay Kim 2024-01-07 04:23:49 +06:00
parent a9d5845005
commit 9119f997fd
13 changed files with 145 additions and 439 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [1.0.0] - 2024-01-0x
* Use "async fn" in trait for Service definition
## [0.3.4] - 2023-11-06 ## [0.3.4] - 2023-11-06
* Add UnwindSafe trait on mpsc::Receiver<T> #239 * Add UnwindSafe trait on mpsc::Receiver<T> #239

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-util" name = "ntex-util"
version = "0.3.4" version = "1.0.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework" description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-rt = "0.4.7" ntex-rt = "0.4.7"
ntex-service = "1.2.6" ntex-service = "2.0.0"
bitflags = "2.4" bitflags = "2.4"
fxhash = "0.2.1" fxhash = "0.2.1"
log = "0.4" log = "0.4"
@ -29,6 +29,6 @@ pin-project-lite = "0.2.9"
[dev-dependencies] [dev-dependencies]
ntex = { version = "0.7", features = ["tokio"] } ntex = { version = "0.7", features = ["tokio"] }
ntex-bytes = "0.1.18" ntex-bytes = "0.1.21"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] } futures-util = { version = "0.3", default-features = false, features = ["alloc"] }

View file

@ -1,8 +1,8 @@
use slab::Slab; use slab::Slab;
use std::{future::Future, pin::Pin, task::Context, task::Poll}; use std::{future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll};
use super::cell::Cell; use super::cell::Cell;
use crate::{future::poll_fn, task::LocalWaker}; use crate::task::LocalWaker;
/// Condition allows to notify multiple waiters at the same time /// Condition allows to notify multiple waiters at the same time
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View file

@ -1,13 +1,13 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue. //! A multi-producer, single-consumer, futures-aware, FIFO queue.
use std::{ use std::collections::VecDeque;
collections::VecDeque, fmt, panic::UnwindSafe, pin::Pin, task::Context, task::Poll, use std::future::poll_fn;
}; use std::{fmt, panic::UnwindSafe, pin::Pin, task::Context, task::Poll};
use futures_core::{FusedStream, Stream}; use futures_core::{FusedStream, Stream};
use futures_sink::Sink; use futures_sink::Sink;
use super::cell::{Cell, WeakCell}; use super::cell::{Cell, WeakCell};
use crate::{future::poll_fn, task::LocalWaker}; use crate::task::LocalWaker;
/// Creates a unbounded in-memory channel with buffered storage. /// Creates a unbounded in-memory channel with buffered storage.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) { pub fn channel<T>() -> (Sender<T>, Receiver<T>) {

View file

@ -1,8 +1,8 @@
//! A one-shot, futures-aware channel. //! A one-shot, futures-aware channel.
use std::{future::Future, pin::Pin, task::Context, task::Poll}; use std::{future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll};
use super::{cell::Cell, Canceled}; use super::{cell::Cell, Canceled};
use crate::{future::poll_fn, task::LocalWaker}; use crate::task::LocalWaker;
/// Creates a new futures-aware, one-shot channel. /// Creates a new futures-aware, one-shot channel.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) { pub fn channel<T>() -> (Sender<T>, Receiver<T>) {

View file

@ -1,5 +1,5 @@
//! Utilities for futures //! Utilities for futures
use std::{future::Future, mem, pin::Pin, task::Context, task::Poll}; use std::{future::poll_fn, future::Future, mem, pin::Pin, task::Context, task::Poll};
pub use futures_core::{Stream, TryFuture}; pub use futures_core::{Stream, TryFuture};
pub use futures_sink::Sink; pub use futures_sink::Sink;
@ -20,35 +20,6 @@ pub use self::select::select;
/// you can't statically type your result or need to add some indirection. /// you can't statically type your result or need to add some indirection.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>; pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
/// Creates a new future wrapping around a function returning [`Poll`].
///
/// Polling the returned future delegates to the wrapped function.
pub fn poll_fn<T, F>(f: F) -> impl Future<Output = T>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
PollFn { f }
}
/// Future for the [`poll_fn`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(self.f)(cx)
}
}
/// Creates a future that resolves to the next item in the stream. /// Creates a future that resolves to the next item in the stream.
pub async fn stream_recv<S>(stream: &mut S) -> Option<S::Item> pub async fn stream_recv<S>(stream: &mut S) -> Option<S::Item>
where where

View file

@ -1,11 +1,11 @@
//! Service that buffers incomming requests. //! Service that buffers incomming requests.
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::task::{ready, Context, Poll}; use std::task::{ready, Context, Poll};
use std::{collections::VecDeque, fmt, future::Future, marker::PhantomData, pin::Pin}; use std::{collections::VecDeque, fmt, marker::PhantomData};
use ntex_service::{IntoService, Middleware, Service, ServiceCallToCall, ServiceCtx}; use ntex_service::{IntoService, Middleware, Service, ServiceCtx};
use crate::channel::{oneshot, Canceled}; use crate::channel::oneshot;
/// Buffer - service factory for service that can buffer incoming request. /// Buffer - service factory for service that can buffer incoming request.
/// ///
@ -79,6 +79,31 @@ where
} }
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BufferServiceError<E> {
Service(E),
RequestCanceled,
}
impl<E> From<E> for BufferServiceError<E> {
fn from(err: E) -> Self {
BufferServiceError::Service(err)
}
}
impl<E: std::fmt::Display> std::fmt::Display for BufferServiceError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BufferServiceError::Service(e) => std::fmt::Display::fmt(e, f),
BufferServiceError::RequestCanceled => {
f.write_str("buffer service request canceled")
}
}
}
}
impl<E: std::fmt::Display + std::fmt::Debug> std::error::Error for BufferServiceError<E> {}
/// Buffer service - service that can buffer incoming requests. /// Buffer service - service that can buffer incoming requests.
/// ///
/// Default number of buffered requests is 16 /// Default number of buffered requests is 16
@ -158,7 +183,6 @@ where
{ {
type Response = S::Response; type Response = S::Response;
type Error = BufferServiceError<S::Error>; type Error = BufferServiceError<S::Error>;
type Future<'f> = BufferServiceResponse<'f, R, S> where Self: 'f, R: 'f;
#[inline] #[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -196,30 +220,6 @@ where
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
#[inline]
fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
if self.ready.get() {
self.ready.set(false);
BufferServiceResponse {
slf: self,
state: ResponseState::Running {
fut: ctx.call_nowait(&self.service, req),
},
}
} else {
let (tx, rx) = oneshot::channel();
self.buf.borrow_mut().push_back(tx);
BufferServiceResponse {
slf: self,
state: ResponseState::WaitingForRelease {
rx,
call: Some(ctx.call(&self.service, req).advance_to_call()),
},
}
}
}
fn poll_shutdown(&self, cx: &mut std::task::Context<'_>) -> Poll<()> { fn poll_shutdown(&self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut buffer = self.buf.borrow_mut(); let mut buffer = self.buf.borrow_mut();
if self.cancel_on_shutdown { if self.cancel_on_shutdown {
@ -257,97 +257,41 @@ where
self.service.poll_shutdown(cx) self.service.poll_shutdown(cx)
} }
}
pin_project_lite::pin_project! { async fn call(
#[doc(hidden)] &self,
#[must_use = "futures do nothing unless polled"] req: R,
pub struct BufferServiceResponse<'f, R, S: Service<R>> ctx: ServiceCtx<'_, Self>,
{ ) -> Result<Self::Response, Self::Error> {
#[pin] if self.ready.get() {
state: ResponseState<'f, R, S>, self.ready.set(false);
slf: &'f BufferService<R, S>, Ok(ctx.call_nowait(&self.service, req).await?)
} } else {
} let (tx, rx) = oneshot::channel();
self.buf.borrow_mut().push_back(tx);
pin_project_lite::pin_project! { // release
#[project = ResponseStateProject] let _task_guard = rx.recv().await.map_err(|_| {
enum ResponseState<'f, R, S: Service<R>> log::trace!("Buffered service request canceled");
{ BufferServiceError::RequestCanceled
WaitingForRelease { rx: oneshot::Receiver<oneshot::Sender<()>>, call: Option<ServiceCallToCall<'f, S, R>> }, })?;
WaitingForReady { tx: oneshot::Sender<()>, #[pin] call: ServiceCallToCall<'f, S, R> },
Running { #[pin] fut: S::Future<'f> },
}
}
impl<'f, R, S> Future for BufferServiceResponse<'f, R, S> // check service readiness
where ctx.ready(&self.service).await?;
S: Service<R>,
{
type Output = Result<S::Response, BufferServiceError<S::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // call service
let mut this = self.as_mut().project(); Ok(ctx.call_nowait(&self.service, req).await?)
match this.state.as_mut().project() {
ResponseStateProject::WaitingForRelease { rx, call } => {
match ready!(rx.poll_recv(cx)) {
Ok(tx) => {
let call = call.take().expect("always set in this state");
this.state.set(ResponseState::WaitingForReady { tx, call });
self.poll(cx)
}
Err(Canceled) => {
log::trace!("Buffered service request canceled");
Poll::Ready(Err(BufferServiceError::RequestCanceled))
}
}
}
ResponseStateProject::WaitingForReady { call, .. } => {
let fut = match ready!(call.poll(cx)) {
Ok(fut) => fut,
Err(err) => return Poll::Ready(Err(err.into())),
};
this.state.set(ResponseState::Running { fut });
self.poll(cx)
}
ResponseStateProject::Running { fut } => fut.poll(cx).map_err(|e| e.into()),
} }
} }
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BufferServiceError<E> {
Service(E),
RequestCanceled,
}
impl<E> From<E> for BufferServiceError<E> {
fn from(err: E) -> Self {
BufferServiceError::Service(err)
}
}
impl<E: std::fmt::Display> std::fmt::Display for BufferServiceError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BufferServiceError::Service(e) => std::fmt::Display::fmt(e, f),
BufferServiceError::RequestCanceled => {
f.write_str("buffer service request canceled")
}
}
}
}
impl<E: std::fmt::Display + std::fmt::Debug> std::error::Error for BufferServiceError<E> {}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceFactory}; use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceFactory};
use std::{rc::Rc, task::Context, task::Poll, time::Duration}; use std::{rc::Rc, task::Context, task::Poll, time::Duration};
use super::*; use super::*;
use crate::future::{lazy, Ready}; use crate::future::lazy;
use crate::task::LocalWaker; use crate::task::LocalWaker;
#[derive(Clone)] #[derive(Clone)]
@ -362,7 +306,6 @@ mod tests {
impl Service<()> for TestService { impl Service<()> for TestService {
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future<'f> = Ready<(), ()> where Self: 'f;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.waker.register(cx.waker()); self.0.waker.register(cx.waker());
@ -373,10 +316,10 @@ mod tests {
} }
} }
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
self.0.ready.set(false); self.0.ready.set(false);
self.0.count.set(self.0.count.get() + 1); self.0.count.set(self.0.count.get() + 1);
Ready::Ok(()) Ok(())
} }
} }

View file

@ -1,9 +1,9 @@
//! Service that limits number of in-flight async requests. //! Service that limits number of in-flight async requests.
use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; use std::{task::Context, task::Poll};
use ntex_service::{IntoService, Middleware, Service, ServiceCall, ServiceCtx}; use ntex_service::{IntoService, Middleware, Service, ServiceCtx};
use super::counter::{Counter, CounterGuard}; use super::counter::Counter;
/// InFlight - service factory for service that can limit number of in-flight /// InFlight - service factory for service that can limit number of in-flight
/// async requests. /// async requests.
@ -62,7 +62,6 @@ where
{ {
type Response = T::Response; type Response = T::Response;
type Error = T::Error; type Error = T::Error;
type Future<'f> = InFlightServiceResponse<'f, T, R> where Self: 'f, R: 'f;
#[inline] #[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -77,57 +76,35 @@ where
} }
#[inline] #[inline]
fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call(
InFlightServiceResponse { &self,
fut: ctx.call(&self.service, req), req: R,
_guard: self.count.get(), ctx: ServiceCtx<'_, Self>,
_t: PhantomData, ) -> Result<Self::Response, Self::Error> {
} let _guard = self.count.get();
ctx.call(&self.service, req).await
} }
ntex_service::forward_poll_shutdown!(service); ntex_service::forward_poll_shutdown!(service);
} }
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct InFlightServiceResponse<'f, T: Service<R>, R>
where T: 'f, R: 'f
{
#[pin]
fut: ServiceCall<'f, T, R>,
_guard: CounterGuard,
_t: PhantomData<R>
}
}
impl<'f, T: Service<R>, R> Future for InFlightServiceResponse<'f, T, R> {
type Output = Result<T::Response, T::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory};
use std::{cell::RefCell, task::Poll, time::Duration}; use std::{cell::RefCell, task::Poll, time::Duration};
use super::*; use super::*;
use crate::{channel::oneshot, future::lazy, future::BoxFuture}; use crate::{channel::oneshot, future::lazy};
struct SleepService(oneshot::Receiver<()>); struct SleepService(oneshot::Receiver<()>);
impl Service<()> for SleepService { impl Service<()> for SleepService {
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future<'f> = BoxFuture<'f, Result<(), ()>>;
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
Box::pin(async move { let _ = self.0.recv().await;
let _ = self.0.recv().await; Ok(())
Ok::<_, ()>(())
})
} }
} }

View file

@ -3,7 +3,6 @@ use std::{cell::Cell, convert::Infallible, fmt, marker, time::Duration, time::In
use ntex_service::{Service, ServiceCtx, ServiceFactory}; use ntex_service::{Service, ServiceCtx, ServiceFactory};
use crate::future::Ready;
use crate::time::{now, sleep, Millis, Sleep}; use crate::time::{now, sleep, Millis, Sleep};
/// KeepAlive service factory /// KeepAlive service factory
@ -60,13 +59,13 @@ where
{ {
type Response = R; type Response = R;
type Error = E; type Error = E;
type InitError = Infallible;
type Service = KeepAliveService<R, E, F>; type Service = KeepAliveService<R, E, F>;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f, C: 'f; type InitError = Infallible;
#[inline] #[inline]
fn create(&self, _: C) -> Self::Future<'_> { async fn create(&self, _: C) -> Result<Self::Service, Self::InitError> {
Ready::Ok(KeepAliveService::new(self.ka, self.f.clone())) Ok(KeepAliveService::new(self.ka, self.f.clone()))
} }
} }
@ -111,7 +110,6 @@ where
{ {
type Response = R; type Response = R;
type Error = E; type Error = E;
type Future<'f> = Ready<R, E> where Self: 'f, R: 'f;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.sleep.poll_elapsed(cx) { match self.sleep.poll_elapsed(cx) {
@ -132,9 +130,9 @@ where
} }
} }
fn call<'a>(&'a self, req: R, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result<R, E> {
self.expire.set(now()); self.expire.set(now());
Ready::Ok(req) Ok(req)
} }
} }

View file

@ -1,7 +1,7 @@
//! Service that limits number of in-flight async requests to 1. //! Service that limits number of in-flight async requests to 1.
use std::{cell::Cell, future::Future, pin::Pin, task::Context, task::Poll}; use std::{cell::Cell, task::Context, task::Poll};
use ntex_service::{IntoService, Middleware, Service, ServiceCall, ServiceCtx}; use ntex_service::{IntoService, Middleware, Service, ServiceCtx};
use crate::task::LocalWaker; use crate::task::LocalWaker;
@ -49,7 +49,6 @@ where
{ {
type Response = T::Response; type Response = T::Response;
type Error = T::Error; type Error = T::Error;
type Future<'f> = OneRequestServiceResponse<'f, T, R> where Self: 'f, R: 'f;
#[inline] #[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -64,62 +63,39 @@ where
} }
#[inline] #[inline]
fn call<'a>(&'a self, req: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call(
&self,
req: R,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
self.ready.set(false); self.ready.set(false);
OneRequestServiceResponse { let result = ctx.call(&self.service, req).await;
fut: ctx.call(&self.service, req), self.ready.set(true);
service: self, self.waker.wake();
} result
} }
ntex_service::forward_poll_shutdown!(service); ntex_service::forward_poll_shutdown!(service);
} }
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct OneRequestServiceResponse<'f, T: Service<R>, R>
where T: 'f, R: 'f
{
#[pin]
fut: ServiceCall<'f, T, R>,
service: &'f OneRequestService<T>,
}
}
impl<'f, T: Service<R>, R> Future for OneRequestServiceResponse<'f, T, R> {
type Output = Result<T::Response, T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = self.as_mut().project().fut.poll(cx);
if result.is_ready() {
self.service.ready.set(true);
self.service.waker.wake();
}
result
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory};
use std::{cell::RefCell, task::Poll, time::Duration}; use std::{cell::RefCell, task::Poll, time::Duration};
use super::*; use super::*;
use crate::{channel::oneshot, future::lazy, future::BoxFuture}; use crate::{channel::oneshot, future::lazy};
struct SleepService(oneshot::Receiver<()>); struct SleepService(oneshot::Receiver<()>);
impl Service<()> for SleepService { impl Service<()> for SleepService {
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future<'f> = BoxFuture<'f, Result<(), ()>>;
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
Box::pin(async move { let _ = self.0.recv().await;
let _ = self.0.recv().await; Ok::<_, ()>(())
Ok::<_, ()>(())
})
} }
} }

View file

@ -2,12 +2,12 @@
//! //!
//! If the response does not complete within the specified timeout, the response //! If the response does not complete within the specified timeout, the response
//! will be aborted. //! will be aborted.
use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll}; use std::{fmt, marker};
use ntex_service::{IntoService, Middleware, Service, ServiceCall, ServiceCtx}; use ntex_service::{IntoService, Middleware, Service, ServiceCtx};
use crate::future::Either; use crate::future::{select, Either};
use crate::time::{sleep, Millis, Sleep}; use crate::time::{sleep, Millis};
/// Applies a timeout to requests. /// Applies a timeout to requests.
/// ///
@ -123,20 +123,21 @@ where
{ {
type Response = S::Response; type Response = S::Response;
type Error = TimeoutError<S::Error>; type Error = TimeoutError<S::Error>;
type Future<'f> = Either<TimeoutServiceResponse<'f, S, R>, TimeoutServiceResponse2<'f, S, R>> where Self: 'f, R: 'f;
fn call<'a>(&'a self, request: R, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call(
&self,
request: R,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
if self.timeout.is_zero() { if self.timeout.is_zero() {
Either::Right(TimeoutServiceResponse2 { ctx.call(&self.service, request)
fut: ctx.call(&self.service, request), .await
_t: marker::PhantomData, .map_err(TimeoutError::Service)
})
} else { } else {
Either::Left(TimeoutServiceResponse { match select(sleep(self.timeout), ctx.call(&self.service, request)).await {
fut: ctx.call(&self.service, request), Either::Left(_) => Err(TimeoutError::Timeout),
sleep: sleep(self.timeout), Either::Right(res) => res.map_err(TimeoutError::Service),
_t: marker::PhantomData, }
})
} }
} }
@ -144,72 +145,6 @@ where
ntex_service::forward_poll_shutdown!(service); ntex_service::forward_poll_shutdown!(service);
} }
pin_project_lite::pin_project! {
/// `TimeoutService` response future
#[doc(hidden)]
#[must_use = "futures do nothing unless polled"]
pub struct TimeoutServiceResponse<'f, T: Service<R>, R>
where T: 'f, R: 'f,
{
#[pin]
fut: ServiceCall<'f, T, R>,
sleep: Sleep,
_t: marker::PhantomData<R>
}
}
impl<'f, T, R> Future for TimeoutServiceResponse<'f, T, R>
where
T: Service<R>,
{
type Output = Result<T::Response, TimeoutError<T::Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
// First, try polling the future
match this.fut.poll(cx) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))),
Poll::Pending => {}
}
// Now check the sleep
match this.sleep.poll_elapsed(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)),
}
}
}
pin_project_lite::pin_project! {
/// `TimeoutService` response future
#[doc(hidden)]
#[must_use = "futures do nothing unless polled"]
pub struct TimeoutServiceResponse2<'f, T: Service<R>, R>
where T: 'f, R: 'f,
{
#[pin]
fut: ServiceCall<'f, T, R>,
_t: marker::PhantomData<R>,
}
}
impl<'f, T, R> Future for TimeoutServiceResponse2<'f, T, R>
where
T: Service<R>,
{
type Output = Result<T::Response, TimeoutError<T::Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().fut.poll(cx) {
Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
Poll::Ready(Err(e)) => Poll::Ready(Err(TimeoutError::Service(e))),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{fmt, time::Duration}; use std::{fmt, time::Duration};
@ -217,7 +152,7 @@ mod tests {
use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceFactory}; use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceFactory};
use super::*; use super::*;
use crate::future::{lazy, BoxFuture}; use crate::future::lazy;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
struct SleepService(Duration); struct SleepService(Duration);
@ -234,14 +169,14 @@ mod tests {
impl Service<()> for SleepService { impl Service<()> for SleepService {
type Response = (); type Response = ();
type Error = SrvError; type Error = SrvError;
type Future<'f> = BoxFuture<'f, Result<(), SrvError>>;
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call<'a>(
let fut = crate::time::sleep(self.0); &'a self,
Box::pin(async move { _: (),
fut.await; _: ServiceCtx<'a, Self>,
Ok::<_, SrvError>(()) ) -> Result<(), SrvError> {
}) crate::time::sleep(self.0).await;
Ok::<_, SrvError>(())
} }
} }

View file

@ -1,7 +1,7 @@
//! Contains `Variant` service and related types and functions. //! Contains `Variant` service and related types and functions.
use std::{fmt, future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; use std::{fmt, marker::PhantomData, task::Context, task::Poll};
use ntex_service::{IntoServiceFactory, Service, ServiceCall, ServiceCtx, ServiceFactory}; use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
/// Construct `Variant` service factory. /// Construct `Variant` service factory.
/// ///
@ -123,8 +123,6 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
{ {
type Response = V1::Response; type Response = V1::Response;
type Error = V1::Error; type Error = V1::Error;
type Future<'f> = $mod_name::ServiceResponse<
ServiceCall<'f, V1, V1R>, $(ServiceCall<'f, $T, $R>),+> where Self: 'f, V1: 'f;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut ready = self.V1.poll_ready(cx)?.is_ready(); let mut ready = self.V1.poll_ready(cx)?.is_ready();
@ -148,11 +146,11 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
} }
} }
fn call<'a>(&'a self, req: $enum_type<V1R, $($R,)+>, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> async fn call(&self, req: $enum_type<V1R, $($R,)+>, ctx: ServiceCtx<'_, Self>) -> Result<Self::Response, Self::Error>
{ {
match req { match req {
$enum_type::V1(req) => $mod_name::ServiceResponse::V1 { fut: ctx.call(&self.V1, req) }, $enum_type::V1(req) => ctx.call(&self.V1, req).await,
$($enum_type::$T(req) => $mod_name::ServiceResponse::$T { fut: ctx.call(&self.$T, req) },)+ $($enum_type::$T(req) => ctx.call(&self.$T, req).await,)+
} }
} }
} }
@ -191,111 +189,17 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
{ {
type Response = V1::Response; type Response = V1::Response;
type Error = V1::Error; type Error = V1::Error;
type InitError = V1::InitError;
type Service = $srv_type<V1::Service, $($T::Service,)+ V1R, $($R,)+>; type Service = $srv_type<V1::Service, $($T::Service,)+ V1R, $($R,)+>;
type Future<'f> = $mod_name::ServiceFactoryResponse<'f, V1, V1C, $($T,)+ V1R, $($R,)+> where Self: 'f, V1C: 'f; type InitError = V1::InitError;
fn create(&self, cfg: V1C) -> Self::Future<'_> { async fn create(&self, cfg: V1C) -> Result<Self::Service, Self::InitError> {
$mod_name::ServiceFactoryResponse { Ok($srv_type {
V1: None, V1: self.V1.create(cfg.clone()).await?,
items: Default::default(), $($T: self.$T.create(cfg.clone()).await?,)+
$($T: self.$T.create(cfg.clone()),)+ _t: PhantomData
V1_fut: self.V1.create(cfg), })
}
} }
} }
#[doc(hidden)]
#[allow(non_snake_case)]
pub mod $mod_name {
use super::*;
pin_project_lite::pin_project! {
#[project = ServiceResponseProject]
pub enum ServiceResponse<V1: Future, $($T: Future),+>
{
V1{ #[pin] fut: V1 },
$($T{ #[pin] fut: $T },)+
}
}
impl<V1, $($T),+> Future for ServiceResponse<V1, $($T),+>
where
V1: Future,
$($T: Future<Output = V1::Output>),+
{
type Output = V1::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
ServiceResponseProject::V1{fut} => fut.poll(cx),
$(ServiceResponseProject::$T{fut} => fut.poll(cx),)+
}
}
}
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct ServiceFactoryResponse<'f, V1: ServiceFactory<V1R, V1C>, V1C, $($T: ServiceFactory<$R, V1C>,)+ V1R, $($R,)+>
where
V1C: 'f,
V1: 'f,
$($T: 'f,)+
{
pub(super) V1: Option<V1::Service>,
pub(super) items: ($(Option<$T::Service>,)+),
#[pin] pub(super) V1_fut: V1::Future<'f>,
$(#[pin] pub(super) $T: $T::Future<'f>),+
}
}
impl<'f, V1, V1C, $($T,)+ V1R, $($R,)+> Future for ServiceFactoryResponse<'f, V1, V1C, $($T,)+ V1R, $($R,)+>
where
V1: ServiceFactory<V1R, V1C> + 'f,
$($T: ServiceFactory<$R, V1C, Response = V1::Response, Error = V1::Error, InitError = V1::InitError,> + 'f),+
{
type Output = Result<$srv_type<V1::Service, $($T::Service,)+ V1R, $($R),+>, V1::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut ready = true;
if this.V1.is_none() {
match this.V1_fut.poll(cx) {
Poll::Ready(Ok(item)) => {
*this.V1 = Some(item);
}
Poll::Pending => ready = false,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
}
}
$(
if this.items.$n.is_none() {
match this.$T.poll(cx) {
Poll::Ready(Ok(item)) => {
this.items.$n = Some(item);
}
Poll::Pending => ready = false,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
}
}
)+
if ready {
Poll::Ready(Ok($srv_type {
V1: this.V1.take().unwrap(),
$($T: this.items.$n.take().unwrap(),)+
_t: PhantomData
}))
} else {
Poll::Pending
}
}
}
}
}); });
#[rustfmt::skip] #[rustfmt::skip]
@ -332,7 +236,7 @@ mod tests {
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use super::*; use super::*;
use crate::future::{lazy, Ready}; use crate::future::lazy;
#[derive(Clone)] #[derive(Clone)]
struct Srv1; struct Srv1;
@ -340,7 +244,6 @@ mod tests {
impl Service<()> for Srv1 { impl Service<()> for Srv1 {
type Response = usize; type Response = usize;
type Error = (); type Error = ();
type Future<'f> = Ready<usize, ()> where Self: 'f;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@ -350,8 +253,8 @@ mod tests {
Poll::Ready(()) Poll::Ready(())
} }
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<usize, ()> {
Ready::<_, ()>::Ok(1) Ok(1)
} }
} }
@ -361,7 +264,6 @@ mod tests {
impl Service<()> for Srv2 { impl Service<()> for Srv2 {
type Response = usize; type Response = usize;
type Error = (); type Error = ();
type Future<'f> = Ready<usize, ()> where Self: 'f;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@ -371,8 +273,8 @@ mod tests {
Poll::Ready(()) Poll::Ready(())
} }
fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Self::Future<'a> { async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<usize, ()> {
Ready::<_, ()>::Ok(2) Ok(2)
} }
} }

View file

@ -1,5 +1,5 @@
//! Utilities for tracking time. //! Utilities for tracking time.
use std::{cmp, future::Future, pin::Pin, task, task::Poll}; use std::{cmp, future::poll_fn, future::Future, pin::Pin, task, task::Poll};
mod types; mod types;
mod wheel; mod wheel;
@ -312,7 +312,7 @@ impl Interval {
#[inline] #[inline]
pub async fn tick(&self) { pub async fn tick(&self) {
crate::future::poll_fn(|cx| self.poll_tick(cx)).await; poll_fn(|cx| self.poll_tick(cx)).await;
} }
#[inline] #[inline]