Use async fn for Service::ready() and Service::shutdown() (#363)

This commit is contained in:
Nikolay Kim 2024-05-28 16:55:08 +05:00 committed by GitHub
parent dec6ab083a
commit b04bdf41f6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 285 additions and 299 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [2.0.0] - 2024-05-28
* Use async fn for Service::ready() and Service::shutdown()
## [1.1.0] - 2024-04-xx
* Change Extensions::insert() method according doc #345

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "1.1.0"
version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,7 +16,7 @@ name = "ntex_util"
path = "src/lib.rs"
[dependencies]
ntex-service = "2.0"
ntex-service = "3.0"
ntex-rt = "0.4"
bitflags = "2.4"
fxhash = "0.2.1"

View file

@ -1,9 +1,9 @@
//! Service that buffers incomming requests.
use std::cell::{Cell, RefCell};
use std::task::{ready, Context, Poll};
use std::{collections::VecDeque, fmt, marker::PhantomData};
use std::task::{ready, Poll};
use std::{collections::VecDeque, fmt, future::poll_fn, marker::PhantomData};
use ntex_service::{IntoService, Middleware, Service, ServiceCtx};
use ntex_service::{Middleware, Service, ServiceCtx};
use crate::channel::oneshot;
@ -121,15 +121,12 @@ impl<R, S> BufferService<R, S>
where
S: Service<R>,
{
pub fn new<U>(size: usize, service: U) -> Self
where
U: IntoService<S, R>,
{
pub fn new(size: usize, service: S) -> Self {
Self {
size,
service,
cancel_on_shutdown: false,
ready: Cell::new(false),
service: service.into_service(),
buf: RefCell::new(VecDeque::with_capacity(size)),
next_call: RefCell::default(),
_t: PhantomData,
@ -185,7 +182,7 @@ where
type Error = BufferServiceError<S::Error>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let mut buffer = self.buf.borrow_mut();
let mut next_call = self.next_call.borrow_mut();
if let Some(next_call) = &*next_call {
@ -220,42 +217,45 @@ where
Poll::Ready(Ok(()))
}
fn poll_shutdown(&self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut buffer = self.buf.borrow_mut();
if self.cancel_on_shutdown {
buffer.clear();
} else if !buffer.is_empty() {
let mut next_call = self.next_call.borrow_mut();
if let Some(next_call) = &*next_call {
// hold advancement until the last released task either makes a call or is dropped
let _ = ready!(next_call.poll_recv(cx));
}
next_call.take();
if ready!(self.service.poll_ready(cx)).is_err() {
log::error!(
"Buffered inner service failed while buffer flushing on shutdown"
);
return Poll::Ready(());
}
while let Some(sender) = buffer.pop_front() {
let (next_call_tx, next_call_rx) = oneshot::channel();
if sender.send(next_call_tx).is_err()
|| next_call_rx.poll_recv(cx).is_ready()
{
// the task is gone
continue;
async fn shutdown(&self) {
poll_fn(|cx| {
let mut buffer = self.buf.borrow_mut();
if self.cancel_on_shutdown {
buffer.clear();
} else if !buffer.is_empty() {
let mut next_call = self.next_call.borrow_mut();
if let Some(next_call) = &*next_call {
// hold advancement until the last released task either makes a call or is dropped
let _ = ready!(next_call.poll_recv(cx));
}
next_call.replace(next_call_rx);
if buffer.is_empty() {
break;
}
return Poll::Pending;
}
}
next_call.take();
self.service.poll_shutdown(cx)
if ready!(self.service.poll_ready(cx)).is_err() {
log::error!(
"Buffered inner service failed while buffer flushing on shutdown"
);
return Poll::Ready(());
}
while let Some(sender) = buffer.pop_front() {
let (next_call_tx, next_call_rx) = oneshot::channel();
if sender.send(next_call_tx).is_err()
|| next_call_rx.poll_recv(cx).is_ready()
{
// the task is gone
continue;
}
next_call.replace(next_call_rx);
if buffer.is_empty() {
break;
}
return Poll::Pending;
}
}
})
.await;
self.service.shutdown().await;
}
async fn call(

View file

@ -1,4 +1,4 @@
use std::{cell::Cell, rc::Rc, task};
use std::{cell::Cell, future::poll_fn, rc::Rc, task};
use crate::task::LocalWaker;
@ -32,7 +32,20 @@ impl Counter {
/// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task.
pub fn available(&self, cx: &mut task::Context<'_>) -> bool {
pub async fn available(&self) {
poll_fn(|cx| {
if self.poll_available(cx) {
task::Poll::Ready(())
} else {
task::Poll::Pending
}
})
.await
}
/// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task.
pub fn poll_available(&self, cx: &mut task::Context<'_>) -> bool {
self.0.available(cx)
}

View file

@ -1,7 +1,5 @@
//! Service that limits number of in-flight async requests.
use std::{task::Context, task::Poll};
use ntex_service::{IntoService, Middleware, Service, ServiceCtx};
use ntex_service::{Middleware, Service, ServiceCtx};
use super::counter::Counter;
@ -44,14 +42,13 @@ pub struct InFlightService<S> {
}
impl<S> InFlightService<S> {
pub fn new<U, R>(max: usize, service: U) -> Self
pub fn new<R>(max: usize, service: S) -> Self
where
S: Service<R>,
U: IntoService<S, R>,
{
Self {
service,
count: Counter::new(max),
service: service.into_service(),
}
}
}
@ -64,15 +61,9 @@ where
type Error = T::Error;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.service.poll_ready(cx)?.is_pending() {
Poll::Pending
} else if !self.count.available(cx) {
log::trace!("InFlight limit exceeded");
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.count.available().await;
ctx.ready(&self.service).await
}
#[inline]
@ -85,13 +76,14 @@ where
ctx.call(&self.service, req).await
}
ntex_service::forward_poll_shutdown!(service);
ntex_service::forward_shutdown!(service);
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, task::Poll, time::Duration};
use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory};
use std::{cell::RefCell, time::Duration};
use super::*;
use crate::{channel::oneshot, future::lazy};
@ -112,7 +104,7 @@ mod tests {
async fn test_service() {
let (tx, rx) = oneshot::channel();
let srv = Pipeline::new(InFlightService::new(1, SleepService(rx)));
let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let srv2 = srv.clone();
@ -125,7 +117,7 @@ mod tests {
let _ = tx.send(());
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready());
assert_eq!(srv.shutdown().await, ());
}
#[ntex_macros::rt_test2]
@ -146,7 +138,7 @@ mod tests {
}),
);
let srv = srv.pipeline(&()).await.unwrap();
let srv = srv.pipeline(&()).await.unwrap().bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let srv2 = srv.clone();

View file

@ -1,5 +1,4 @@
use std::task::{Context, Poll};
use std::{cell::Cell, convert::Infallible, fmt, marker, time::Duration, time::Instant};
use std::{cell::Cell, convert::Infallible, fmt, future::poll_fn, marker, task, time};
use ntex_service::{Service, ServiceCtx, ServiceFactory};
@ -73,7 +72,7 @@ pub struct KeepAliveService<R, E, F> {
f: F,
dur: Millis,
sleep: Sleep,
expire: Cell<Instant>,
expire: Cell<time::Instant>,
_t: marker::PhantomData<(R, E)>,
}
@ -111,23 +110,24 @@ where
type Response = R;
type Error = E;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.sleep.poll_elapsed(cx) {
Poll::Ready(_) => {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
poll_fn(|cx| match self.sleep.poll_elapsed(cx) {
task::Poll::Ready(_) => {
let now = now();
let expire = self.expire.get() + Duration::from(self.dur);
let expire = self.expire.get() + time::Duration::from(self.dur);
if expire <= now {
Poll::Ready(Err((self.f)()))
task::Poll::Ready(Err((self.f)()))
} else {
let expire = expire - now;
self.sleep
.reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX)));
let _ = self.sleep.poll_elapsed(cx);
Poll::Ready(Ok(()))
task::Poll::Ready(Ok(()))
}
}
Poll::Pending => Poll::Ready(Ok(())),
}
task::Poll::Pending => task::Poll::Ready(Ok(())),
})
.await
}
async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result<R, E> {
@ -138,6 +138,8 @@ where
#[cfg(test)]
mod tests {
use std::task::Poll;
use super::*;
use crate::future::lazy;
@ -150,7 +152,7 @@ mod tests {
assert!(format!("{:?}", factory).contains("KeepAlive"));
let _ = factory.clone();
let service = factory.pipeline(&()).await.unwrap();
let service = factory.pipeline(&()).await.unwrap().bind();
assert!(format!("{:?}", service).contains("KeepAliveService"));
assert_eq!(service.call(1usize).await, Ok(1usize));

View file

@ -1,4 +1,4 @@
pub mod buffer;
// pub mod buffer;
pub mod counter;
mod extensions;
pub mod inflight;

View file

@ -1,7 +1,7 @@
//! Service that limits number of in-flight async requests to 1.
use std::{cell::Cell, task::Context, task::Poll};
use std::{cell::Cell, future::poll_fn, task::Poll};
use ntex_service::{IntoService, Middleware, Service, ServiceCtx};
use ntex_service::{Middleware, Service, ServiceCtx};
use crate::task::LocalWaker;
@ -30,13 +30,12 @@ pub struct OneRequestService<S> {
}
impl<S> OneRequestService<S> {
pub fn new<U, R>(service: U) -> Self
pub fn new<R>(service: S) -> Self
where
S: Service<R>,
U: IntoService<S, R>,
{
Self {
service: service.into_service(),
service,
ready: Cell::new(true),
waker: LocalWaker::new(),
}
@ -51,15 +50,18 @@ where
type Error = T::Error;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.waker.register(cx.waker());
if self.service.poll_ready(cx)?.is_pending() {
Poll::Pending
} else if self.ready.get() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
poll_fn(|cx| {
self.waker.register(cx.waker());
if self.ready.get() {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;
ctx.ready(&self.service).await
}
#[inline]
@ -76,7 +78,7 @@ where
result
}
ntex_service::forward_poll_shutdown!(service);
ntex_service::forward_shutdown!(service);
}
#[cfg(test)]
@ -103,7 +105,7 @@ mod tests {
async fn test_oneshot() {
let (tx, rx) = oneshot::channel();
let srv = Pipeline::new(OneRequestService::new(SleepService(rx)));
let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let srv2 = srv.clone();
@ -116,7 +118,7 @@ mod tests {
let _ = tx.send(());
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready());
assert_eq!(srv.shutdown().await, ());
}
#[ntex_macros::rt_test2]
@ -133,7 +135,7 @@ mod tests {
}),
);
let srv = srv.pipeline(&()).await.unwrap();
let srv = srv.pipeline(&()).await.unwrap().bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let srv2 = srv.clone();

View file

@ -4,7 +4,7 @@
//! will be aborted.
use std::{fmt, marker};
use ntex_service::{IntoService, Middleware, Service, ServiceCtx};
use ntex_service::{Middleware, Service, ServiceCtx};
use crate::future::{select, Either};
use crate::time::{sleep, Millis};
@ -104,15 +104,14 @@ pub struct TimeoutService<S> {
}
impl<S> TimeoutService<S> {
pub fn new<T, U, R>(timeout: T, service: U) -> Self
pub fn new<T, R>(timeout: T, service: S) -> Self
where
T: Into<Millis>,
S: Service<R>,
U: IntoService<S, R>,
{
TimeoutService {
service,
timeout: timeout.into(),
service: service.into_service(),
}
}
}
@ -141,8 +140,8 @@ where
}
}
ntex_service::forward_poll_ready!(service, TimeoutError::Service);
ntex_service::forward_poll_shutdown!(service);
ntex_service::forward_ready!(service, TimeoutError::Service);
ntex_service::forward_shutdown!(service);
}
#[cfg(test)]
@ -152,7 +151,6 @@ mod tests {
use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory};
use super::*;
use crate::future::lazy;
#[derive(Clone, Debug, PartialEq)]
struct SleepService(Duration);
@ -184,8 +182,8 @@ mod tests {
let timeout =
Pipeline::new(TimeoutService::new(resolution, SleepService(wait_time)).clone());
assert_eq!(timeout.call(()).await, Ok(()));
assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready());
assert!(lazy(|cx| timeout.poll_shutdown(cx)).await.is_ready());
assert_eq!(timeout.ready().await, Ok(()));
assert_eq!(timeout.shutdown().await, ());
}
#[ntex_macros::rt_test2]
@ -196,7 +194,7 @@ mod tests {
let timeout =
Pipeline::new(TimeoutService::new(resolution, SleepService(wait_time)));
assert_eq!(timeout.call(()).await, Ok(()));
assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready());
assert_eq!(timeout.ready().await, Ok(()));
}
#[ntex_macros::rt_test2]

View file

@ -1,5 +1,6 @@
//! Contains `Variant` service and related types and functions.
use std::{fmt, marker::PhantomData, task::Context, task::Poll};
#![allow(non_snake_case)]
use std::{fmt, marker::PhantomData, task::Poll};
use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
@ -70,7 +71,7 @@ macro_rules! variant_impl_and ({$fac1_type:ident, $fac2_type:ident, $name:ident,
Response = V1::Response,
Error = V1::Error,
InitError = V1::InitError>,
F: IntoServiceFactory<$name, $r_name, V1C>,
F: IntoServiceFactory<$name, $r_name, V1C>,
{
$fac2_type {
V1: self.V1,
@ -124,30 +125,30 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
type Response = V1::Response;
type Error = V1::Error;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut ready = self.V1.poll_ready(cx)?.is_ready();
$(ready = self.$T.poll_ready(cx)?.is_ready() && ready;)+
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
use std::{future::Future, pin::Pin};
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
let mut fut1 = ::std::pin::pin!(ctx.ready(&self.V1));
$(let mut $T = ::std::pin::pin!(ctx.ready(&self.$T));)+
::std::future::poll_fn(|cx| {
let mut ready = Pin::new(&mut fut1).poll(cx)?.is_ready();
$(ready = Pin::new(&mut $T).poll(cx)?.is_ready() && ready;)+
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}).await
}
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let mut ready = self.V1.poll_shutdown(cx).is_ready();
$(ready = self.$T.poll_shutdown(cx).is_ready() && ready;)+
if ready {
Poll::Ready(())
} else {
Poll::Pending
}
async fn shutdown(&self) {
self.V1.shutdown().await;
$(self.$T.shutdown().await;)+
}
async fn call(&self, req: $enum_type<V1R, $($R,)+>, ctx: ServiceCtx<'_, Self>) -> Result<Self::Response, Self::Error>
{
async fn call(&self, req: $enum_type<V1R, $($R,)+>, ctx: ServiceCtx<'_, Self>) -> Result<Self::Response, Self::Error> {
match req {
$enum_type::V1(req) => ctx.call(&self.V1, req).await,
$($enum_type::$T(req) => ctx.call(&self.$T, req).await,)+
@ -235,7 +236,6 @@ mod tests {
use ntex_service::fn_factory;
use super::*;
use crate::future::lazy;
#[derive(Clone)]
struct Srv1;
@ -244,13 +244,11 @@ mod tests {
type Response = usize;
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
Ok(())
}
fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
async fn shutdown(&self) {}
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<usize, ()> {
Ok(1)
@ -264,13 +262,11 @@ mod tests {
type Response = usize;
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
Ok(())
}
fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
async fn shutdown(&self) {}
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<usize, ()> {
Ok(2)
@ -286,8 +282,8 @@ mod tests {
.clone();
let service = factory.pipeline(&()).await.unwrap().clone();
assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready());
assert!(lazy(|cx| service.poll_shutdown(cx)).await.is_ready());
assert!(service.ready().await.is_ok());
assert_eq!(service.shutdown().await, ());
assert_eq!(service.call(Variant3::V1(())).await, Ok(1));
assert_eq!(service.call(Variant3::V2(())).await, Ok(2));