From b04bdf41f637d8a6a4ce59284b6a432721c145e5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 28 May 2024 16:55:08 +0500 Subject: [PATCH] Use async fn for Service::ready() and Service::shutdown() (#363) --- ntex-async-std/Cargo.toml | 8 +-- ntex-glommio/Cargo.toml | 8 +-- ntex-io/CHANGES.md | 4 ++ ntex-io/Cargo.toml | 6 +- ntex-io/src/dispatcher.rs | 40 ++++--------- ntex-net/CHANGES.md | 4 ++ ntex-net/Cargo.toml | 16 +++--- ntex-net/src/connect/resolve.rs | 9 +-- ntex-net/src/connect/service.rs | 12 ++-- ntex-server/CHANGES.md | 4 ++ ntex-server/Cargo.toml | 12 ++-- ntex-server/src/net/counter.rs | 13 ++++- ntex-server/src/net/factory.rs | 7 +-- ntex-server/src/net/service.rs | 47 +++++---------- ntex-server/src/wrk.rs | 28 +++++---- ntex-tls/CHANGES.md | 4 ++ ntex-tls/Cargo.toml | 12 ++-- ntex-tls/src/counter.rs | 14 +++-- ntex-tls/src/openssl/accept.rs | 11 ++-- ntex-tls/src/openssl/connect.rs | 7 +-- ntex-tls/src/rustls/accept.rs | 10 +--- ntex-tls/src/rustls/connect.rs | 9 +-- ntex-tokio/Cargo.toml | 8 +-- ntex-util/CHANGES.md | 4 ++ ntex-util/Cargo.toml | 4 +- ntex-util/src/services/buffer.rs | 86 ++++++++++++++-------------- ntex-util/src/services/counter.rs | 17 +++++- ntex-util/src/services/inflight.rs | 32 ++++------- ntex-util/src/services/keepalive.rs | 26 +++++---- ntex-util/src/services/mod.rs | 2 +- ntex-util/src/services/onerequest.rs | 38 ++++++------ ntex-util/src/services/timeout.rs | 18 +++--- ntex-util/src/services/variant.rs | 64 ++++++++++----------- 33 files changed, 285 insertions(+), 299 deletions(-) diff --git a/ntex-async-std/Cargo.toml b/ntex-async-std/Cargo.toml index 47126f00..2cf5111c 100644 --- a/ntex-async-std/Cargo.toml +++ b/ntex-async-std/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-async-std" -version = "0.4.0" +version = "0.5.0" authors = ["ntex contributors "] description = "async-std intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,9 +16,9 @@ name = "ntex_async_std" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.21" -ntex-io = "1.0.0" -ntex-util = "1.0.0" +ntex-bytes = "0.1" +ntex-io = "2.0" +ntex-util = "2.0" log = "0.4" async-std = { version = "1", features = ["unstable"] } oneshot = { version = "0.1", default-features = false, features = ["async"] } diff --git a/ntex-glommio/Cargo.toml b/ntex-glommio/Cargo.toml index 58d0e20c..edcd7feb 100644 --- a/ntex-glommio/Cargo.toml +++ b/ntex-glommio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-glommio" -version = "0.4.0" +version = "0.5.0" authors = ["ntex contributors "] description = "glommio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,9 +16,9 @@ name = "ntex_glommio" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.24" -ntex-io = "1.0.0" -ntex-util = "1.0.0" +ntex-bytes = "0.1" +ntex-io = "2.0" +ntex-util = "2.0" futures-lite = "2.2" log = "0.4" oneshot = { version = "0.1", default-features = false, features = ["async"] } diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index a51c852c..2cb997c2 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0] - 2024-05-28 + +* Use async fn for Service::ready() and Service::shutdown() + ## [1.2.0] - 2024-05-12 * Better write back-pressure handling diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 90adc74e..18d86279 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "1.2.0" +version = "2.0.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,8 +18,8 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6.2" ntex-bytes = "0.1.24" -ntex-util = "1.0" -ntex-service = "2.0" +ntex-util = "2.0" +ntex-service = "3.0" bitflags = "2" log = "0.4" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 8470fe6a..c0aeb0d1 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -2,9 +2,8 @@ #![allow(clippy::let_underscore_future)] use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; -use ntex_bytes::Pool; use ntex_codec::{Decoder, Encoder}; -use ntex_service::{IntoService, Pipeline, PipelineCall, Service}; +use ntex_service::{IntoService, Pipeline, PipelineBinding, PipelineCall, Service}; use ntex_util::{future::Either, ready, spawn, time::Seconds}; use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError}; @@ -144,7 +143,6 @@ where flags: Flags, shared: Rc>, response: Option>>, - pool: Pool, cfg: DispatcherConfig, read_remains: u32, read_remains_prev: u32, @@ -158,7 +156,7 @@ where { io: IoBoxed, codec: U, - service: Pipeline, + service: PipelineBinding>, error: Cell::Error>>>, inflight: Cell, } @@ -194,7 +192,7 @@ impl From> for DispatcherError { impl Dispatcher where - S: Service, Response = Option>>, + S: Service, Response = Option>> + 'static, U: Decoder + Encoder + 'static, { /// Construct new `Dispatcher` instance. @@ -217,18 +215,16 @@ where Flags::KA_ENABLED }; - let pool = io.memory_pool().pool(); let shared = Rc::new(DispatcherShared { io, codec, error: Cell::new(None), inflight: Cell::new(0), - service: Pipeline::new(service.into_service()), + service: Pipeline::new(service.into_service()).bind(), }); Dispatcher { inner: DispatcherInner { - pool, shared, flags, cfg: cfg.clone(), @@ -284,14 +280,6 @@ where } } - // handle memory pool pressure - if slf.pool.poll_ready(cx).is_pending() { - slf.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT); - slf.shared.io.stop_timer(); - slf.shared.io.pause(); - return Poll::Pending; - } - loop { match slf.st { DispatcherState::Processing => { @@ -434,7 +422,7 @@ where U: Decoder + Encoder + 'static, { fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem) { - let mut fut = self.shared.service.call_static(item); + let mut fut = self.shared.service.call(item); self.shared.inflight.set(self.shared.inflight.get() + 1); // optimize first call @@ -682,11 +670,7 @@ mod tests { U: Decoder + Encoder + 'static, { /// Construct new `Dispatcher` instance - pub(crate) fn debug>>( - io: T, - codec: U, - service: F, - ) -> (Self, State) { + pub(crate) fn debug(io: T, codec: U, service: S) -> (Self, State) { let cfg = DispatcherConfig::default() .set_keepalive_timeout(Seconds(1)) .clone(); @@ -694,14 +678,13 @@ mod tests { } /// Construct new `Dispatcher` instance - pub(crate) fn debug_cfg>>( + pub(crate) fn debug_cfg( io: T, codec: U, - service: F, + service: S, cfg: DispatcherConfig, ) -> (Self, State) { let state = Io::new(io); - let pool = state.memory_pool().pool(); state.set_disconnect_timeout(cfg.disconnect_timeout()); state.set_tag("DBG"); @@ -719,7 +702,7 @@ mod tests { io: state.into(), error: Cell::new(None), inflight: Cell::new(0), - service: Pipeline::new(service.into_service()), + service: Pipeline::new(service).bind(), }); ( @@ -731,7 +714,6 @@ mod tests { read_remains: 0, read_remains_prev: 0, read_max_timeout: Seconds::ZERO, - pool, shared, cfg, flags, @@ -864,9 +846,9 @@ mod tests { type Response = Option>; type Error = (); - fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), ()> { self.0.set(self.0.get() + 1); - Poll::Ready(Err(())) + Err(()) } async fn call( diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 20c42c07..ae5d657f 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0] - 2024-05-28 + +* Use async fn for Service::ready() and Service::shutdown() + ## [1.0.2] - 2024-03-30 * Fix glommio compat feature #327 diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 8658325c..71bd0fa9 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "1.0.2" +version = "2.0.0" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -28,16 +28,16 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"] async-std = ["ntex-rt/async-std", "ntex-async-std"] [dependencies] -ntex-service = "2.0" -ntex-bytes = "0.1.24" +ntex-service = "3.0" +ntex-bytes = "0.1" ntex-http = "0.1" -ntex-io = "1.0" +ntex-io = "2.0" ntex-rt = "0.4.11" -ntex-util = "1.0" +ntex-util = "2.0" -ntex-tokio = { version = "0.4.0", optional = true } -ntex-glommio = { version = "0.4.0", optional = true } -ntex-async-std = { version = "0.4.0", optional = true } +ntex-tokio = { version = "0.5.0", optional = true } +ntex-glommio = { version = "0.5.0", optional = true } +ntex-async-std = { version = "0.5.0", optional = true } log = "0.4" thiserror = "1.0" diff --git a/ntex-net/src/connect/resolve.rs b/ntex-net/src/connect/resolve.rs index 74d115ca..ae906e74 100644 --- a/ntex-net/src/connect/resolve.rs +++ b/ntex-net/src/connect/resolve.rs @@ -6,7 +6,6 @@ use ntex_util::future::Either; use super::{Address, Connect, ConnectError}; -#[derive(Copy)] /// DNS Resolver Service pub struct Resolver(marker::PhantomData); @@ -17,6 +16,8 @@ impl Resolver { } } +impl Copy for Resolver {} + impl Resolver { /// Lookup ip addresses for provided host pub async fn lookup(&self, req: Connect) -> Result, ConnectError> { @@ -100,7 +101,7 @@ impl Default for Resolver { impl Clone for Resolver { fn clone(&self) -> Self { - Resolver(marker::PhantomData) + *self } } @@ -117,7 +118,7 @@ impl ServiceFactory, C> for Resolver { type InitError = (); async fn create(&self, _: C) -> Result { - Ok(self.clone()) + Ok(*self) } } @@ -144,7 +145,7 @@ mod tests { async fn resolver() { let resolver = Resolver::default().clone(); assert!(format!("{:?}", resolver).contains("Resolver")); - let srv = resolver.pipeline(()).await.unwrap(); + let srv = resolver.pipeline(()).await.unwrap().bind(); assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready()); let res = srv.call(Connect::new("www.rust-lang.org")).await; diff --git a/ntex-net/src/connect/service.rs b/ntex-net/src/connect/service.rs index 083a3638..4969b9fd 100644 --- a/ntex-net/src/connect/service.rs +++ b/ntex-net/src/connect/service.rs @@ -9,13 +9,15 @@ use ntex_util::future::{BoxFuture, Either}; use super::{Address, Connect, ConnectError, Resolver}; use crate::tcp_connect_in; -#[derive(Copy)] +/// Basic tcp stream connector pub struct Connector { resolver: Resolver, pool: PoolRef, tag: &'static str, } +impl Copy for Connector {} + impl Connector { /// Construct new connect service with default dns resolver pub fn new() -> Self { @@ -85,11 +87,7 @@ impl Default for Connector { impl Clone for Connector { fn clone(&self) -> Self { - Connector { - resolver: self.resolver.clone(), - tag: self.tag, - pool: self.pool, - } + *self } } @@ -110,7 +108,7 @@ impl ServiceFactory, C> for Connector { type InitError = (); async fn create(&self, _: C) -> Result { - Ok(self.clone()) + Ok(*self) } } diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index fde095fa..f6385f46 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0] - 2024-05-28 + +* Use async fn for Service::ready() and Service::shutdown() + ## [1.0.5] - 2024-04-02 * Fix external configuration handling diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index 76893170..4bab2c28 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "1.0.5" +version = "2.0.0" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,11 +16,11 @@ name = "ntex_server" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.24" -ntex-net = "1.0" -ntex-service = "2.0" -ntex-rt = "0.4.13" -ntex-util = "1.0" +ntex-bytes = "0.1" +ntex-net = "2.0" +ntex-service = "3.0" +ntex-rt = "0.4" +ntex-util = "2.0" async-channel = "2" async-broadcast = "0.7" diff --git a/ntex-server/src/net/counter.rs b/ntex-server/src/net/counter.rs index 214045ab..5d92f173 100644 --- a/ntex-server/src/net/counter.rs +++ b/ntex-server/src/net/counter.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, rc::Rc, task}; +use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll}; use ntex_util::task::LocalWaker; @@ -30,8 +30,15 @@ impl Counter { /// Check if counter is not at capacity. If counter at capacity /// it registers notification for current task. - pub(super) fn available(&self, cx: &mut task::Context<'_>) -> bool { - self.0.available(cx) + pub(super) async fn available(&self) { + poll_fn(|cx| { + if self.0.available(cx) { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await } /// Get total number of acquired counts diff --git a/ntex-server/src/net/factory.rs b/ntex-server/src/net/factory.rs index 27c08de0..3cfa070b 100644 --- a/ntex-server/src/net/factory.rs +++ b/ntex-server/src/net/factory.rs @@ -1,4 +1,3 @@ -use std::task::{Context, Poll}; use std::{fmt, future::Future, marker::PhantomData}; use ntex_bytes::PoolId; @@ -144,10 +143,10 @@ where type Response = (); type Error = (); - ntex_service::forward_poll_shutdown!(inner); + ntex_service::forward_shutdown!(inner); - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx).map_err(|_| ()) + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { + ctx.ready(&self.inner).await.map_err(|_| ()) } async fn call(&self, req: Io, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { diff --git a/ntex-server/src/net/service.rs b/ntex-server/src/net/service.rs index efdcd56a..52cabc6a 100644 --- a/ntex-server/src/net/service.rs +++ b/ntex-server/src/net/service.rs @@ -1,5 +1,3 @@ -use std::{task::Context, task::Poll}; - use ntex_bytes::{Pool, PoolRef}; use ntex_net::Io; use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory}; @@ -142,53 +140,34 @@ impl Service for StreamServiceImpl { type Response = (); type Error = (); - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let mut ready = self.conns.available(cx); + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + self.conns.available().await; for (idx, svc) in self.services.iter().enumerate() { - match svc.poll_ready(cx) { - Poll::Pending => ready = false, - Poll::Ready(Ok(())) => (), - Poll::Ready(Err(_)) => { + match ctx.ready(svc).await { + Ok(()) => (), + Err(_) => { for (idx_, tag, _, _) in self.tokens.values() { if idx == *idx_ { log::error!("{}: Service readiness has failed", tag); break; } } - return Poll::Ready(Err(())); + return Err(()); } } } - // check memory pools - for (_, _, pool, _) in self.tokens.values() { - ready = pool.poll_ready(cx).is_ready() && ready; - } - - if ready { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + Ok(()) } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let mut ready = true; + async fn shutdown(&self) { for svc in &self.services { - match svc.poll_shutdown(cx) { - Poll::Pending => ready = false, - Poll::Ready(_) => (), - } - } - if ready { - log::info!( - "Worker service shutdown, {} connections", - super::num_connections() - ); - Poll::Ready(()) - } else { - Poll::Pending + svc.shutdown().await; } + log::info!( + "Worker service shutdown, {} connections", + super::num_connections() + ); } async fn call(&self, req: ServerMessage, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index a19554da..b0e1fed5 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -6,7 +6,7 @@ use async_broadcast::{self as bus, broadcast}; use async_channel::{unbounded, Receiver, Sender}; use ntex_rt::{spawn, Arbiter}; -use ntex_service::{Pipeline, ServiceFactory}; +use ntex_service::{Pipeline, PipelineBinding, ServiceFactory}; use ntex_util::future::{select, stream_recv, Either, Stream}; use ntex_util::time::{sleep, timeout_checked, Millis}; @@ -240,8 +240,10 @@ struct WorkerSt>> { availability: WorkerAvailabilityTx, } -async fn run_worker(mut svc: Pipeline, mut wrk: WorkerSt) -where +async fn run_worker( + mut svc: PipelineBinding>, + mut wrk: WorkerSt, +) where T: Send + 'static, F: ServiceFactory> + 'static, { @@ -250,7 +252,7 @@ where ready!(svc.poll_ready(cx)?); if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) { - let fut = svc.call_static(WorkerMessage::New(item)); + let fut = svc.call(WorkerMessage::New(item)); let _ = spawn(async move { let _ = fut.await; }); @@ -267,17 +269,17 @@ where wrk.availability.set(false); if timeout.is_zero() { - let fut = svc.call_static(WorkerMessage::ForceShutdown); + let fut = svc.call(WorkerMessage::ForceShutdown); let _ = spawn(async move { let _ = fut.await; }); sleep(STOP_TIMEOUT).await; } else { - let fut = svc.call_static(WorkerMessage::Shutdown(timeout)); + let fut = svc.call(WorkerMessage::Shutdown(timeout)); let res = timeout_checked(timeout, fut).await; let _ = result.send(res.is_ok()); }; - poll_fn(|cx| svc.poll_shutdown(cx)).await; + svc.shutdown().await; log::info!("Stopping worker {:?}", wrk.id); return; @@ -289,7 +291,7 @@ where match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await { Either::Left(Ok(service)) => { wrk.availability.set(true); - svc = Pipeline::new(service); + svc = Pipeline::new(service).bind(); break; } Either::Left(Err(_)) => sleep(STOP_TIMEOUT).await, @@ -305,7 +307,13 @@ async fn create( stop: Receiver, factory: Result, availability: WorkerAvailabilityTx, -) -> Result<(Pipeline, WorkerSt), ()> +) -> Result< + ( + PipelineBinding>, + WorkerSt, + ), + (), +> where T: Send + 'static, F: ServiceFactory> + 'static, @@ -317,7 +325,7 @@ where let mut stop = Box::pin(stop); let svc = match select(factory.create(()), stream_recv(&mut stop)).await { - Either::Left(Ok(svc)) => Pipeline::new(svc), + Either::Left(Ok(svc)) => Pipeline::new(svc).bind(), Either::Left(Err(_)) => return Err(()), Either::Right(Some(Shutdown { result, .. })) => { log::trace!("Shutdown uninitialized worker"); diff --git a/ntex-tls/CHANGES.md b/ntex-tls/CHANGES.md index 42e57bfb..224c8d6f 100644 --- a/ntex-tls/CHANGES.md +++ b/ntex-tls/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0] - 2024-05-28 + +* Use async fn for Service::ready() and Service::shutdown() + ## [1.1.0] - 2024-03-24 * Move tls connectors from ntex-connect diff --git a/ntex-tls/Cargo.toml b/ntex-tls/Cargo.toml index 7eee8781..9d60adc1 100644 --- a/ntex-tls/Cargo.toml +++ b/ntex-tls/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tls" -version = "1.1.0" +version = "2.0.0" authors = ["ntex contributors "] description = "An implementation of SSL streams for ntex backed by OpenSSL" keywords = ["network", "framework", "async", "futures"] @@ -25,11 +25,11 @@ openssl = ["tls_openssl"] rustls = ["tls_rust"] [dependencies] -ntex-bytes = "0.1.21" -ntex-io = "1.0" -ntex-util = "1.0" -ntex-service = "2.0" -ntex-net = "1.0" +ntex-bytes = "0.1" +ntex-io = "2.0" +ntex-util = "2.0" +ntex-service = "3.0" +ntex-net = "2.0" log = "0.4" diff --git a/ntex-tls/src/counter.rs b/ntex-tls/src/counter.rs index f2127db6..6ca0e401 100644 --- a/ntex-tls/src/counter.rs +++ b/ntex-tls/src/counter.rs @@ -1,5 +1,4 @@ -#![allow(dead_code)] -use std::{cell::Cell, rc::Rc, task}; +use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll}; use ntex_util::task::LocalWaker; @@ -33,8 +32,15 @@ impl Counter { /// Check if counter is not at capacity. If counter at capacity /// it registers notification for current task. - pub(super) fn available(&self, cx: &mut task::Context<'_>) -> bool { - self.0.available(cx) + pub(super) async fn available(&self) { + poll_fn(|cx| { + if self.0.available(cx) { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await } } diff --git a/ntex-tls/src/openssl/accept.rs b/ntex-tls/src/openssl/accept.rs index 6fc0921c..a083f69e 100644 --- a/ntex-tls/src/openssl/accept.rs +++ b/ntex-tls/src/openssl/accept.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, error::Error, fmt, io, task::Context, task::Poll}; +use std::{cell::RefCell, error::Error, fmt, io}; use ntex_io::{Filter, Io, Layer}; use ntex_service::{Service, ServiceCtx, ServiceFactory}; @@ -97,12 +97,9 @@ impl Service> for SslAcceptorService { type Response = Io>; type Error = Box; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - if self.conns.available(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + self.conns.available().await; + Ok(()) } async fn call( diff --git a/ntex-tls/src/openssl/connect.rs b/ntex-tls/src/openssl/connect.rs index b8ccb9ff..3df5debf 100644 --- a/ntex-tls/src/openssl/connect.rs +++ b/ntex-tls/src/openssl/connect.rs @@ -27,12 +27,7 @@ impl SslConnector { /// Use specified memory pool for memory allocations. By default P0 /// memory pool is used. pub fn memory_pool(self, id: PoolId) -> Self { - let connector = self - .connector - .into_service() - .expect("Connector has been cloned") - .memory_pool(id) - .into(); + let connector = self.connector.get_ref().memory_pool(id).into(); Self { connector, diff --git a/ntex-tls/src/rustls/accept.rs b/ntex-tls/src/rustls/accept.rs index d8bc8b61..d5102357 100644 --- a/ntex-tls/src/rustls/accept.rs +++ b/ntex-tls/src/rustls/accept.rs @@ -1,4 +1,3 @@ -use std::task::{Context, Poll}; use std::{io, sync::Arc}; use tls_rust::ServerConfig; @@ -81,12 +80,9 @@ impl Service> for TlsAcceptorService { type Response = Io>; type Error = io::Error; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - if self.conns.available(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + self.conns.available().await; + Ok(()) } async fn call( diff --git a/ntex-tls/src/rustls/connect.rs b/ntex-tls/src/rustls/connect.rs index 8398ed35..ab3c5939 100644 --- a/ntex-tls/src/rustls/connect.rs +++ b/ntex-tls/src/rustls/connect.rs @@ -36,12 +36,7 @@ impl TlsConnector { /// Use specified memory pool for memory allocations. By default P0 /// memory pool is used. pub fn memory_pool(self, id: PoolId) -> Self { - let connector = self - .connector - .into_service() - .unwrap() - .memory_pool(id) - .into(); + let connector = self.connector.get_ref().memory_pool(id).into(); Self { connector, config: self.config, @@ -146,7 +141,7 @@ mod tests { .memory_pool(PoolId::P5) .clone(); - let srv = factory.pipeline(&()).await.unwrap(); + let srv = factory.pipeline(&()).await.unwrap().bind(); // always ready assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready()); let result = srv diff --git a/ntex-tokio/Cargo.toml b/ntex-tokio/Cargo.toml index 62dba300..9bffa23d 100644 --- a/ntex-tokio/Cargo.toml +++ b/ntex-tokio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tokio" -version = "0.4.0" +version = "0.5.0" authors = ["ntex contributors "] description = "tokio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,8 +16,8 @@ name = "ntex_tokio" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.21" -ntex-io = "1.0.0" -ntex-util = "1.0.0" +ntex-bytes = "0.1" +ntex-io = "2.0" +ntex-util = "2.0" log = "0.4" tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] } diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index 7d855ea0..3afaeb80 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0] - 2024-05-28 + +* Use async fn for Service::ready() and Service::shutdown() + ## [1.1.0] - 2024-04-xx * Change Extensions::insert() method according doc #345 diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 39b96910..88ddd23f 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "1.1.0" +version = "2.0.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,7 +16,7 @@ name = "ntex_util" path = "src/lib.rs" [dependencies] -ntex-service = "2.0" +ntex-service = "3.0" ntex-rt = "0.4" bitflags = "2.4" fxhash = "0.2.1" diff --git a/ntex-util/src/services/buffer.rs b/ntex-util/src/services/buffer.rs index 8dbd75d9..d1b1cd3a 100644 --- a/ntex-util/src/services/buffer.rs +++ b/ntex-util/src/services/buffer.rs @@ -1,9 +1,9 @@ //! Service that buffers incomming requests. use std::cell::{Cell, RefCell}; -use std::task::{ready, Context, Poll}; -use std::{collections::VecDeque, fmt, marker::PhantomData}; +use std::task::{ready, Poll}; +use std::{collections::VecDeque, fmt, future::poll_fn, marker::PhantomData}; -use ntex_service::{IntoService, Middleware, Service, ServiceCtx}; +use ntex_service::{Middleware, Service, ServiceCtx}; use crate::channel::oneshot; @@ -121,15 +121,12 @@ impl BufferService where S: Service, { - pub fn new(size: usize, service: U) -> Self - where - U: IntoService, - { + pub fn new(size: usize, service: S) -> Self { Self { size, + service, cancel_on_shutdown: false, ready: Cell::new(false), - service: service.into_service(), buf: RefCell::new(VecDeque::with_capacity(size)), next_call: RefCell::default(), _t: PhantomData, @@ -185,7 +182,7 @@ where type Error = BufferServiceError; #[inline] - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { let mut buffer = self.buf.borrow_mut(); let mut next_call = self.next_call.borrow_mut(); if let Some(next_call) = &*next_call { @@ -220,42 +217,45 @@ where Poll::Ready(Ok(())) } - fn poll_shutdown(&self, cx: &mut std::task::Context<'_>) -> Poll<()> { - let mut buffer = self.buf.borrow_mut(); - if self.cancel_on_shutdown { - buffer.clear(); - } else if !buffer.is_empty() { - let mut next_call = self.next_call.borrow_mut(); - if let Some(next_call) = &*next_call { - // hold advancement until the last released task either makes a call or is dropped - let _ = ready!(next_call.poll_recv(cx)); - } - next_call.take(); - - if ready!(self.service.poll_ready(cx)).is_err() { - log::error!( - "Buffered inner service failed while buffer flushing on shutdown" - ); - return Poll::Ready(()); - } - - while let Some(sender) = buffer.pop_front() { - let (next_call_tx, next_call_rx) = oneshot::channel(); - if sender.send(next_call_tx).is_err() - || next_call_rx.poll_recv(cx).is_ready() - { - // the task is gone - continue; + async fn shutdown(&self) { + poll_fn(|cx| { + let mut buffer = self.buf.borrow_mut(); + if self.cancel_on_shutdown { + buffer.clear(); + } else if !buffer.is_empty() { + let mut next_call = self.next_call.borrow_mut(); + if let Some(next_call) = &*next_call { + // hold advancement until the last released task either makes a call or is dropped + let _ = ready!(next_call.poll_recv(cx)); } - next_call.replace(next_call_rx); - if buffer.is_empty() { - break; - } - return Poll::Pending; - } - } + next_call.take(); - self.service.poll_shutdown(cx) + if ready!(self.service.poll_ready(cx)).is_err() { + log::error!( + "Buffered inner service failed while buffer flushing on shutdown" + ); + return Poll::Ready(()); + } + + while let Some(sender) = buffer.pop_front() { + let (next_call_tx, next_call_rx) = oneshot::channel(); + if sender.send(next_call_tx).is_err() + || next_call_rx.poll_recv(cx).is_ready() + { + // the task is gone + continue; + } + next_call.replace(next_call_rx); + if buffer.is_empty() { + break; + } + return Poll::Pending; + } + } + }) + .await; + + self.service.shutdown().await; } async fn call( diff --git a/ntex-util/src/services/counter.rs b/ntex-util/src/services/counter.rs index 3a248417..2cbbfe54 100644 --- a/ntex-util/src/services/counter.rs +++ b/ntex-util/src/services/counter.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, rc::Rc, task}; +use std::{cell::Cell, future::poll_fn, rc::Rc, task}; use crate::task::LocalWaker; @@ -32,7 +32,20 @@ impl Counter { /// Check if counter is not at capacity. If counter at capacity /// it registers notification for current task. - pub fn available(&self, cx: &mut task::Context<'_>) -> bool { + pub async fn available(&self) { + poll_fn(|cx| { + if self.poll_available(cx) { + task::Poll::Ready(()) + } else { + task::Poll::Pending + } + }) + .await + } + + /// Check if counter is not at capacity. If counter at capacity + /// it registers notification for current task. + pub fn poll_available(&self, cx: &mut task::Context<'_>) -> bool { self.0.available(cx) } diff --git a/ntex-util/src/services/inflight.rs b/ntex-util/src/services/inflight.rs index c6fff000..5bdfec23 100644 --- a/ntex-util/src/services/inflight.rs +++ b/ntex-util/src/services/inflight.rs @@ -1,7 +1,5 @@ //! Service that limits number of in-flight async requests. -use std::{task::Context, task::Poll}; - -use ntex_service::{IntoService, Middleware, Service, ServiceCtx}; +use ntex_service::{Middleware, Service, ServiceCtx}; use super::counter::Counter; @@ -44,14 +42,13 @@ pub struct InFlightService { } impl InFlightService { - pub fn new(max: usize, service: U) -> Self + pub fn new(max: usize, service: S) -> Self where S: Service, - U: IntoService, { Self { + service, count: Counter::new(max), - service: service.into_service(), } } } @@ -64,15 +61,9 @@ where type Error = T::Error; #[inline] - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - if self.service.poll_ready(cx)?.is_pending() { - Poll::Pending - } else if !self.count.available(cx) { - log::trace!("InFlight limit exceeded"); - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + self.count.available().await; + ctx.ready(&self.service).await } #[inline] @@ -85,13 +76,14 @@ where ctx.call(&self.service, req).await } - ntex_service::forward_poll_shutdown!(service); + ntex_service::forward_shutdown!(service); } #[cfg(test)] mod tests { + use std::{cell::RefCell, task::Poll, time::Duration}; + use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory}; - use std::{cell::RefCell, time::Duration}; use super::*; use crate::{channel::oneshot, future::lazy}; @@ -112,7 +104,7 @@ mod tests { async fn test_service() { let (tx, rx) = oneshot::channel(); - let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))); + let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); @@ -125,7 +117,7 @@ mod tests { let _ = tx.send(()); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready()); + assert_eq!(srv.shutdown().await, ()); } #[ntex_macros::rt_test2] @@ -146,7 +138,7 @@ mod tests { }), ); - let srv = srv.pipeline(&()).await.unwrap(); + let srv = srv.pipeline(&()).await.unwrap().bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); diff --git a/ntex-util/src/services/keepalive.rs b/ntex-util/src/services/keepalive.rs index d363f19f..5932e4e9 100644 --- a/ntex-util/src/services/keepalive.rs +++ b/ntex-util/src/services/keepalive.rs @@ -1,5 +1,4 @@ -use std::task::{Context, Poll}; -use std::{cell::Cell, convert::Infallible, fmt, marker, time::Duration, time::Instant}; +use std::{cell::Cell, convert::Infallible, fmt, future::poll_fn, marker, task, time}; use ntex_service::{Service, ServiceCtx, ServiceFactory}; @@ -73,7 +72,7 @@ pub struct KeepAliveService { f: F, dur: Millis, sleep: Sleep, - expire: Cell, + expire: Cell, _t: marker::PhantomData<(R, E)>, } @@ -111,23 +110,24 @@ where type Response = R; type Error = E; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - match self.sleep.poll_elapsed(cx) { - Poll::Ready(_) => { + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + poll_fn(|cx| match self.sleep.poll_elapsed(cx) { + task::Poll::Ready(_) => { let now = now(); - let expire = self.expire.get() + Duration::from(self.dur); + let expire = self.expire.get() + time::Duration::from(self.dur); if expire <= now { - Poll::Ready(Err((self.f)())) + task::Poll::Ready(Err((self.f)())) } else { let expire = expire - now; self.sleep .reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX))); let _ = self.sleep.poll_elapsed(cx); - Poll::Ready(Ok(())) + task::Poll::Ready(Ok(())) } } - Poll::Pending => Poll::Ready(Ok(())), - } + task::Poll::Pending => task::Poll::Ready(Ok(())), + }) + .await } async fn call(&self, req: R, _: ServiceCtx<'_, Self>) -> Result { @@ -138,6 +138,8 @@ where #[cfg(test)] mod tests { + use std::task::Poll; + use super::*; use crate::future::lazy; @@ -150,7 +152,7 @@ mod tests { assert!(format!("{:?}", factory).contains("KeepAlive")); let _ = factory.clone(); - let service = factory.pipeline(&()).await.unwrap(); + let service = factory.pipeline(&()).await.unwrap().bind(); assert!(format!("{:?}", service).contains("KeepAliveService")); assert_eq!(service.call(1usize).await, Ok(1usize)); diff --git a/ntex-util/src/services/mod.rs b/ntex-util/src/services/mod.rs index 28974192..fa575077 100644 --- a/ntex-util/src/services/mod.rs +++ b/ntex-util/src/services/mod.rs @@ -1,4 +1,4 @@ -pub mod buffer; +// pub mod buffer; pub mod counter; mod extensions; pub mod inflight; diff --git a/ntex-util/src/services/onerequest.rs b/ntex-util/src/services/onerequest.rs index 011b5a22..33c26fba 100644 --- a/ntex-util/src/services/onerequest.rs +++ b/ntex-util/src/services/onerequest.rs @@ -1,7 +1,7 @@ //! Service that limits number of in-flight async requests to 1. -use std::{cell::Cell, task::Context, task::Poll}; +use std::{cell::Cell, future::poll_fn, task::Poll}; -use ntex_service::{IntoService, Middleware, Service, ServiceCtx}; +use ntex_service::{Middleware, Service, ServiceCtx}; use crate::task::LocalWaker; @@ -30,13 +30,12 @@ pub struct OneRequestService { } impl OneRequestService { - pub fn new(service: U) -> Self + pub fn new(service: S) -> Self where S: Service, - U: IntoService, { Self { - service: service.into_service(), + service, ready: Cell::new(true), waker: LocalWaker::new(), } @@ -51,15 +50,18 @@ where type Error = T::Error; #[inline] - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.waker.register(cx.waker()); - if self.service.poll_ready(cx)?.is_pending() { - Poll::Pending - } else if self.ready.get() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + poll_fn(|cx| { + self.waker.register(cx.waker()); + if self.ready.get() { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + + ctx.ready(&self.service).await } #[inline] @@ -76,7 +78,7 @@ where result } - ntex_service::forward_poll_shutdown!(service); + ntex_service::forward_shutdown!(service); } #[cfg(test)] @@ -103,7 +105,7 @@ mod tests { async fn test_oneshot() { let (tx, rx) = oneshot::channel(); - let srv = Pipeline::new(OneRequestService::new(SleepService(rx))); + let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); @@ -116,7 +118,7 @@ mod tests { let _ = tx.send(()); crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready()); + assert_eq!(srv.shutdown().await, ()); } #[ntex_macros::rt_test2] @@ -133,7 +135,7 @@ mod tests { }), ); - let srv = srv.pipeline(&()).await.unwrap(); + let srv = srv.pipeline(&()).await.unwrap().bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); diff --git a/ntex-util/src/services/timeout.rs b/ntex-util/src/services/timeout.rs index 53404c25..04ddf31c 100644 --- a/ntex-util/src/services/timeout.rs +++ b/ntex-util/src/services/timeout.rs @@ -4,7 +4,7 @@ //! will be aborted. use std::{fmt, marker}; -use ntex_service::{IntoService, Middleware, Service, ServiceCtx}; +use ntex_service::{Middleware, Service, ServiceCtx}; use crate::future::{select, Either}; use crate::time::{sleep, Millis}; @@ -104,15 +104,14 @@ pub struct TimeoutService { } impl TimeoutService { - pub fn new(timeout: T, service: U) -> Self + pub fn new(timeout: T, service: S) -> Self where T: Into, S: Service, - U: IntoService, { TimeoutService { + service, timeout: timeout.into(), - service: service.into_service(), } } } @@ -141,8 +140,8 @@ where } } - ntex_service::forward_poll_ready!(service, TimeoutError::Service); - ntex_service::forward_poll_shutdown!(service); + ntex_service::forward_ready!(service, TimeoutError::Service); + ntex_service::forward_shutdown!(service); } #[cfg(test)] @@ -152,7 +151,6 @@ mod tests { use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory}; use super::*; - use crate::future::lazy; #[derive(Clone, Debug, PartialEq)] struct SleepService(Duration); @@ -184,8 +182,8 @@ mod tests { let timeout = Pipeline::new(TimeoutService::new(resolution, SleepService(wait_time)).clone()); assert_eq!(timeout.call(()).await, Ok(())); - assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready()); - assert!(lazy(|cx| timeout.poll_shutdown(cx)).await.is_ready()); + assert_eq!(timeout.ready().await, Ok(())); + assert_eq!(timeout.shutdown().await, ()); } #[ntex_macros::rt_test2] @@ -196,7 +194,7 @@ mod tests { let timeout = Pipeline::new(TimeoutService::new(resolution, SleepService(wait_time))); assert_eq!(timeout.call(()).await, Ok(())); - assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready()); + assert_eq!(timeout.ready().await, Ok(())); } #[ntex_macros::rt_test2] diff --git a/ntex-util/src/services/variant.rs b/ntex-util/src/services/variant.rs index 021749f7..0e4757a8 100644 --- a/ntex-util/src/services/variant.rs +++ b/ntex-util/src/services/variant.rs @@ -1,5 +1,6 @@ //! Contains `Variant` service and related types and functions. -use std::{fmt, marker::PhantomData, task::Context, task::Poll}; +#![allow(non_snake_case)] +use std::{fmt, marker::PhantomData, task::Poll}; use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; @@ -70,7 +71,7 @@ macro_rules! variant_impl_and ({$fac1_type:ident, $fac2_type:ident, $name:ident, Response = V1::Response, Error = V1::Error, InitError = V1::InitError>, - F: IntoServiceFactory<$name, $r_name, V1C>, + F: IntoServiceFactory<$name, $r_name, V1C>, { $fac2_type { V1: self.V1, @@ -124,30 +125,30 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, type Response = V1::Response; type Error = V1::Error; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let mut ready = self.V1.poll_ready(cx)?.is_ready(); - $(ready = self.$T.poll_ready(cx)?.is_ready() && ready;)+ + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + use std::{future::Future, pin::Pin}; - if ready { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + let mut fut1 = ::std::pin::pin!(ctx.ready(&self.V1)); + $(let mut $T = ::std::pin::pin!(ctx.ready(&self.$T));)+ + + ::std::future::poll_fn(|cx| { + let mut ready = Pin::new(&mut fut1).poll(cx)?.is_ready(); + $(ready = Pin::new(&mut $T).poll(cx)?.is_ready() && ready;)+ + + if ready { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + }).await } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let mut ready = self.V1.poll_shutdown(cx).is_ready(); - $(ready = self.$T.poll_shutdown(cx).is_ready() && ready;)+ - - if ready { - Poll::Ready(()) - } else { - Poll::Pending - } + async fn shutdown(&self) { + self.V1.shutdown().await; + $(self.$T.shutdown().await;)+ } - async fn call(&self, req: $enum_type, ctx: ServiceCtx<'_, Self>) -> Result - { + async fn call(&self, req: $enum_type, ctx: ServiceCtx<'_, Self>) -> Result { match req { $enum_type::V1(req) => ctx.call(&self.V1, req).await, $($enum_type::$T(req) => ctx.call(&self.$T, req).await,)+ @@ -235,7 +236,6 @@ mod tests { use ntex_service::fn_factory; use super::*; - use crate::future::lazy; #[derive(Clone)] struct Srv1; @@ -244,13 +244,11 @@ mod tests { type Response = usize; type Error = (); - fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + Ok(()) } - fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> { - Poll::Ready(()) - } + async fn shutdown(&self) {} async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result { Ok(1) @@ -264,13 +262,11 @@ mod tests { type Response = usize; type Error = (); - fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + Ok(()) } - fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> { - Poll::Ready(()) - } + async fn shutdown(&self) {} async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result { Ok(2) @@ -286,8 +282,8 @@ mod tests { .clone(); let service = factory.pipeline(&()).await.unwrap().clone(); - assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready()); - assert!(lazy(|cx| service.poll_shutdown(cx)).await.is_ready()); + assert!(service.ready().await.is_ok()); + assert_eq!(service.shutdown().await, ()); assert_eq!(service.call(Variant3::V1(())).await, Ok(1)); assert_eq!(service.call(Variant3::V2(())).await, Ok(2));