From 3b49828e5f86e7668981a0b3cb36d522cc773d7b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 28 May 2024 19:26:08 +0500 Subject: [PATCH] Use async fn for Service::ready() and Service::shutdown() (#364) --- README.md | 2 +- ntex-bytes/Cargo.toml | 2 +- ntex-io/Cargo.toml | 2 +- ntex-macros/Cargo.toml | 2 +- ntex-net/Cargo.toml | 2 +- ntex-server/Cargo.toml | 2 +- ntex-service/Cargo.toml | 4 +- ntex-tls/Cargo.toml | 2 +- ntex-tls/examples/rustls-client.rs | 2 +- ntex-tls/examples/simple-client.rs | 2 +- ntex-tls/src/counter.rs | 1 + ntex-util/Cargo.toml | 2 +- ntex/CHANGES.md | 4 ++ ntex/Cargo.toml | 16 ++++---- ntex/src/http/client/connector.rs | 39 +++++++----------- ntex/src/http/client/h2proto.rs | 4 +- ntex/src/http/client/pool.rs | 20 +++++---- ntex/src/http/config.rs | 2 +- ntex/src/http/h1/control.rs | 2 +- ntex/src/http/h1/service.rs | 49 +++++++--------------- ntex/src/http/h2/service.rs | 13 +++--- ntex/src/http/service.rs | 50 ++++++++--------------- ntex/src/lib.rs | 17 +++----- ntex/src/web/app_service.rs | 21 ++++------ ntex/src/web/middleware/compress.rs | 4 +- ntex/src/web/middleware/defaultheaders.rs | 7 ++-- ntex/src/web/middleware/logger.rs | 6 +-- ntex/src/web/scope.rs | 17 ++++---- ntex/src/web/ws.rs | 4 +- ntex/src/ws/client.rs | 14 +++---- ntex/tests/connect.rs | 6 +-- ntex/tests/http_ws.rs | 6 +-- ntex/tests/http_ws_client.rs | 11 +++-- 33 files changed, 147 insertions(+), 190 deletions(-) diff --git a/README.md b/README.md index 2545c767..51ce1c86 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Starting ntex v0.5 async runtime must be selected as a feature. Available option ```toml [dependencies] -ntex = { version = "1.0", features = ["tokio"] } +ntex = { version = "2", features = ["tokio"] } ``` ## Documentation & community resources diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index 8d3eeb3a..a64fc4aa 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -29,5 +29,5 @@ simdutf8 = { version = "0.1.4", optional = true } [dev-dependencies] serde_test = "1" serde_json = "1" -ntex = { version = "1", features = ["tokio"] } +ntex = { version = "2", features = ["tokio"] } ntex-bytes = { path = ".", features = ["mpool"] } diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 18d86279..516a8199 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -29,4 +29,4 @@ pin-project-lite = "0.2" rand = "0.8" env_logger = "0.11" -ntex = { version = "1", features = ["tokio"] } +ntex = { version = "2", features = ["tokio"] } diff --git a/ntex-macros/Cargo.toml b/ntex-macros/Cargo.toml index 5bc4089e..40476494 100644 --- a/ntex-macros/Cargo.toml +++ b/ntex-macros/Cargo.toml @@ -16,6 +16,6 @@ syn = { version = "^1", features = ["full", "parsing"] } proc-macro2 = "^1" [dev-dependencies] -ntex = { version = "1", features = ["tokio"] } +ntex = { version = "2", features = ["tokio"] } futures = "0.3" env_logger = "0.11" \ No newline at end of file diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 71bd0fa9..a026af03 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -44,4 +44,4 @@ thiserror = "1.0" [dev-dependencies] env_logger = "0.11" -ntex = { version = "1", features = ["tokio"] } +ntex = { version = "2", features = ["tokio"] } diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index 4bab2c28..0fc585d2 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -30,7 +30,7 @@ socket2 = "0.5" oneshot = { version = "0.1", default-features = false, features = ["async"] } [dev-dependencies] -ntex = { version = "1", features = ["tokio"] } +ntex = { version = "2", features = ["tokio"] } ntex-macros = "0.1.3" [target.'cfg(target_family = "unix")'.dependencies] diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 7fc6f922..fd5f1826 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -19,5 +19,5 @@ path = "src/lib.rs" slab = "0.4" [dev-dependencies] -ntex = { version = "1", features = ["tokio"] } -ntex-util = "1" +ntex = { version = "2", features = ["tokio"] } +ntex-util = "2" diff --git a/ntex-tls/Cargo.toml b/ntex-tls/Cargo.toml index 9d60adc1..c6ede013 100644 --- a/ntex-tls/Cargo.toml +++ b/ntex-tls/Cargo.toml @@ -40,7 +40,7 @@ tls_openssl = { version = "0.10", package = "openssl", optional = true } tls_rust = { version = "0.23", package = "rustls", optional = true } [dev-dependencies] -ntex = { version = "1", features = ["openssl", "rustls", "tokio"] } +ntex = { version = "2", features = ["openssl", "rustls", "tokio"] } env_logger = "0.11" rustls-pemfile = "2" webpki-roots = "0.26" diff --git a/ntex-tls/examples/rustls-client.rs b/ntex-tls/examples/rustls-client.rs index 9e021488..b2844df9 100644 --- a/ntex-tls/examples/rustls-client.rs +++ b/ntex-tls/examples/rustls-client.rs @@ -16,7 +16,7 @@ async fn main() -> io::Result<()> { .with_no_client_auth(); // rustls connector - let connector = connect::rustls::Connector::new(config.clone()); + let connector = connect::rustls::TlsConnector::new(config.clone()); //let io = connector.connect("www.rust-lang.org:443").await.unwrap(); let io = connector.connect("127.0.0.1:8443").await.unwrap(); diff --git a/ntex-tls/examples/simple-client.rs b/ntex-tls/examples/simple-client.rs index 6e2f5d39..29a17c73 100644 --- a/ntex-tls/examples/simple-client.rs +++ b/ntex-tls/examples/simple-client.rs @@ -21,7 +21,7 @@ async fn main() -> io::Result<()> { builder.set_verify(SslVerifyMode::NONE); // openssl connector - let connector = connect::openssl::Connector::new(builder.build()); + let connector = connect::openssl::SslConnector::new(builder.build()); let io = connector.connect("127.0.0.1:8443").await.unwrap(); println!("Connected to ssl server"); diff --git a/ntex-tls/src/counter.rs b/ntex-tls/src/counter.rs index 6ca0e401..d91bfc90 100644 --- a/ntex-tls/src/counter.rs +++ b/ntex-tls/src/counter.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll}; use ntex_util::task::LocalWaker; diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 29447bcd..4ed6037a 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -28,6 +28,6 @@ futures-sink = { version = "0.3", default-features = false, features = ["alloc"] pin-project-lite = "0.2" [dev-dependencies] -ntex = { version = "1", features = ["tokio"] } +ntex = { version = "2", features = ["tokio"] } ntex-macros = "0.1.3" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 621a9381..a6f19e1f 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0] - 2024-05-28 + +* Use "async fn" for Service::ready() and Service::shutdown() + ## [1.2.1] - 2024-03-28 * Feature gate websocket support #320 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 9d504323..e0c6f775 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "1.2.1" +version = "2.0.0" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -60,16 +60,16 @@ brotli = ["dep:brotli2"] ntex-codec = "0.6.2" ntex-http = "0.1.12" ntex-router = "0.5.3" -ntex-service = "2.0.1" +ntex-service = "3.0" ntex-macros = "0.1.3" -ntex-util = "1.0.1" +ntex-util = "2.0" ntex-bytes = "0.1.25" -ntex-server = "1.0.5" -ntex-h2 = "0.5.4" +ntex-server = "2.0" +ntex-h2 = "1.0" ntex-rt = "0.4.12" -ntex-io = "1.2.0" -ntex-net = "1.0.1" -ntex-tls = "1.1.0" +ntex-io = "2.0" +ntex-net = "2.0" +ntex-tls = "2.0" base64 = "0.22" bitflags = "2" diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index 7f63d5ad..c0197274 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -1,11 +1,11 @@ -use std::{fmt, task::Context, task::Poll, time::Duration}; +use std::{fmt, time::Duration}; use ntex_h2::{self as h2}; use crate::connect::{Connect as TcpConnect, Connector as TcpConnector}; use crate::service::{apply_fn, boxed, Service, ServiceCtx}; use crate::time::{Millis, Seconds}; -use crate::util::{timeout::TimeoutError, timeout::TimeoutService}; +use crate::util::{join, timeout::TimeoutError, timeout::TimeoutService}; use crate::{http::Uri, io::IoBoxed}; use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect}; @@ -273,31 +273,20 @@ where type Response = as Service>::Response; type Error = ConnectError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let ready = self.tcp_pool.poll_ready(cx)?.is_ready(); - let ready = if let Some(ref ssl_pool) = self.ssl_pool { - ssl_pool.poll_ready(cx)?.is_ready() && ready + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + if let Some(ref ssl_pool) = self.ssl_pool { + let (r1, r2) = join(ctx.ready(&self.tcp_pool), ctx.ready(ssl_pool)).await; + r1?; + r2 } else { - ready - }; - if ready { - Poll::Ready(Ok(())) - } else { - Poll::Pending + ctx.ready(&self.tcp_pool).await } } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let tcp_ready = self.tcp_pool.poll_shutdown(cx).is_ready(); - let ssl_ready = self - .ssl_pool - .as_ref() - .map(|pool| pool.poll_shutdown(cx).is_ready()) - .unwrap_or(true); - if tcp_ready && ssl_ready { - Poll::Ready(()) - } else { - Poll::Pending + async fn shutdown(&self) { + self.tcp_pool.shutdown().await; + if let Some(ref ssl_pool) = self.ssl_pool { + ssl_pool.shutdown().await; } } @@ -322,11 +311,11 @@ where #[cfg(test)] mod tests { use super::*; - use crate::util::lazy; + use crate::{service::Pipeline, util::lazy}; #[crate::rt_test] async fn test_readiness() { - let conn = Connector::default().finish(); + let conn = Pipeline::new(Connector::default().finish()).bind(); assert!(lazy(|cx| conn.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| conn.poll_shutdown(cx).is_ready()).await); } diff --git a/ntex/src/http/client/h2proto.rs b/ntex/src/http/client/h2proto.rs index 23520fa2..3ef76485 100644 --- a/ntex/src/http/client/h2proto.rs +++ b/ntex/src/http/client/h2proto.rs @@ -80,7 +80,7 @@ where if !eof { // sending body is async process, we can handle upload and download // at the same time - crate::rt::spawn(async move { + let _ = crate::rt::spawn(async move { if let Err(e) = send_body(body, &snd_stream).await { log::error!("Cannot send body: {:?}", e); snd_stream.reset(frame::Reason::INTERNAL_ERROR); @@ -125,7 +125,7 @@ async fn get_response( log::debug!("Creating local payload stream for {:?}", stream.id()); let (mut pl, payload) = payload::Payload::create(stream.empty_capacity()); - crate::rt::spawn(async move { + let _ = crate::rt::spawn(async move { loop { let h2::Message { stream, kind } = match rcv_stream.recv().await { diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 799e582c..2de670fe 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -80,7 +80,7 @@ where })); // start pool support future - crate::rt::spawn(ConnectionPoolSupport { + let _ = crate::rt::spawn(ConnectionPoolSupport { connector: connector.clone(), inner: inner.clone(), waiters: waiters.clone(), @@ -117,8 +117,13 @@ where type Response = Connection; type Error = ConnectError; - crate::forward_poll_ready!(connector); - crate::forward_poll_shutdown!(connector); + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + self.connector.ready().await + } + + async fn shutdown(&self) { + self.connector.shutdown().await + } async fn call( &self, @@ -252,7 +257,7 @@ impl Inner { || (now - conn.created) > self.conn_lifetime { if let ConnectionType::H1(io) = conn.io { - spawn(async move { + let _ = spawn(async move { let _ = io.shutdown().await; }); } @@ -419,7 +424,7 @@ where let disconnect_timeout = inner.borrow().disconnect_timeout; #[allow(clippy::redundant_async_block)] - spawn(async move { + let _ = spawn(async move { OpenConnection:: { tx: Some(tx), key: key.clone(), @@ -576,7 +581,7 @@ impl Acquired { ); match io { ConnectionType::H1(io) => { - spawn(async move { + let _ = spawn(async move { let _ = io.shutdown().await; }); } @@ -634,7 +639,8 @@ mod tests { h2::Config::client(), ) .clone(), - ); + ) + .bind(); // uri must contain authority let req = Connect { diff --git a/ntex/src/http/config.rs b/ntex/src/http/config.rs index ce7d9dde..a2b5091d 100644 --- a/ntex/src/http/config.rs +++ b/ntex/src/http/config.rs @@ -325,7 +325,7 @@ impl DateService { // periodic date update let s = self.clone(); - crate::rt::spawn(async move { + let _ = crate::rt::spawn(async move { sleep(Millis(500)).await; s.0.current.set(false); }); diff --git a/ntex/src/http/h1/control.rs b/ntex/src/http/h1/control.rs index 0874214d..5ecaec95 100644 --- a/ntex/src/http/h1/control.rs +++ b/ntex/src/http/h1/control.rs @@ -210,7 +210,7 @@ impl Upgrade { H: FnOnce(Request, Io, Codec) -> R + 'static, R: Future, { - crate::rt::spawn(async move { + let _ = crate::rt::spawn(async move { let _ = f(self.req, self.io, self.codec).await; }); ControlAck { diff --git a/ntex/src/http/h1/service.rs b/ntex/src/http/h1/service.rs index 986b0967..dccf3486 100644 --- a/ntex/src/http/h1/service.rs +++ b/ntex/src/http/h1/service.rs @@ -1,4 +1,4 @@ -use std::{error::Error, fmt, marker, rc::Rc, task::Context, task::Poll}; +use std::{error::Error, fmt, marker, rc::Rc}; use crate::http::body::MessageBody; use crate::http::config::{DispatcherConfig, ServiceConfig}; @@ -6,6 +6,7 @@ use crate::http::error::{DispatchError, ResponseError}; use crate::http::{request::Request, response::Response}; use crate::io::{types, Filter, Io}; use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; +use crate::util::join; use super::control::{Control, ControlAck}; use super::default::DefaultControlService; @@ -208,43 +209,23 @@ where type Response = (); type Error = DispatchError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { let cfg = self.config.as_ref(); - let ready1 = cfg - .control - .poll_ready(cx) - .map_err(|e| { - log::error!("Http control service readiness error: {:?}", e); - DispatchError::Control(Box::new(e)) - })? - .is_ready(); - - let ready2 = cfg - .service - .poll_ready(cx) - .map_err(|e| { - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(Box::new(e)) - })? - .is_ready(); - - if ready1 && ready2 { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await; + ready1.map_err(|e| { + log::error!("Http control service readiness error: {:?}", e); + DispatchError::Control(Box::new(e)) + })?; + ready2.map_err(|e| { + log::error!("Http service readiness error: {:?}", e); + DispatchError::Service(Box::new(e)) + }) } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let ready1 = self.config.control.poll_shutdown(cx).is_ready(); - let ready2 = self.config.service.poll_shutdown(cx).is_ready(); - - if ready1 && ready2 { - Poll::Ready(()) - } else { - Poll::Pending - } + async fn shutdown(&self) { + self.config.control.shutdown().await; + self.config.service.shutdown().await; } async fn call(&self, io: Io, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { diff --git a/ntex/src/http/h2/service.rs b/ntex/src/http/h2/service.rs index cdfaaeb3..ca53f251 100644 --- a/ntex/src/http/h2/service.rs +++ b/ntex/src/http/h2/service.rs @@ -1,5 +1,4 @@ -use std::{cell::RefCell, io, task::Context, task::Poll}; -use std::{error::Error, fmt, future::poll_fn, marker, mem, rc::Rc}; +use std::{cell::RefCell, error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc}; use ntex_h2::{self as h2, frame::StreamId, server}; @@ -209,15 +208,17 @@ where type Response = (); type Error = DispatchError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.config.service.poll_ready(cx).map_err(|e| { + #[inline] + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + self.config.service.ready().await.map_err(|e| { log::error!("Service readiness error: {:?}", e); DispatchError::Service(Box::new(e)) }) } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - self.config.service.poll_shutdown(cx) + #[inline] + async fn shutdown(&self) { + self.config.service.shutdown().await } async fn call( diff --git a/ntex/src/http/service.rs b/ntex/src/http/service.rs index d08d55dd..0c554f21 100644 --- a/ntex/src/http/service.rs +++ b/ntex/src/http/service.rs @@ -1,7 +1,8 @@ -use std::{error, fmt, marker, rc::Rc, task::Context, task::Poll}; +use std::{error, fmt, marker, rc::Rc}; use crate::io::{types, Filter, Io}; use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; +use crate::util::join; use super::body::MessageBody; use super::builder::HttpServiceBuilder; @@ -289,43 +290,24 @@ where type Response = (); type Error = DispatchError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { let cfg = self.config.as_ref(); - let ready1 = cfg - .service - .poll_ready(cx) - .map_err(|e| { - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(Box::new(e)) - })? - .is_ready(); - - let ready2 = cfg - .control - .poll_ready(cx) - .map_err(|e| { - log::error!("Http control service readiness error: {:?}", e); - DispatchError::Control(Box::new(e)) - })? - .is_ready(); - - if ready1 && ready2 { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await; + ready1.map_err(|e| { + log::error!("Http control service readiness error: {:?}", e); + DispatchError::Control(Box::new(e)) + })?; + ready2.map_err(|e| { + log::error!("Http service readiness error: {:?}", e); + DispatchError::Service(Box::new(e)) + }) } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let ready1 = self.config.control.poll_shutdown(cx).is_ready(); - let ready2 = self.config.service.poll_shutdown(cx).is_ready(); - - if ready1 && ready2 { - Poll::Ready(()) - } else { - Poll::Pending - } + #[inline] + async fn shutdown(&self) { + self.config.control.shutdown().await; + self.config.service.shutdown().await; } async fn call( diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index 07341ad1..0c9c1172 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -18,7 +18,8 @@ clippy::borrow_interior_mutable_const, clippy::needless_doctest_main, clippy::too_many_arguments, - clippy::new_without_default + clippy::new_without_default, + clippy::let_underscore_future )] #[cfg(not(test))] // Work around for rust-lang/rust#62127 @@ -27,7 +28,7 @@ pub use ntex_macros::{rt_main as main, rt_test as test}; #[cfg(test)] pub(crate) use ntex_macros::rt_test2 as rt_test; -pub use ntex_service::{forward_poll_ready, forward_poll_shutdown}; +pub use ntex_service::{forward_ready, forward_shutdown}; pub mod http; pub mod web; @@ -36,8 +37,8 @@ pub mod web; pub mod ws; pub use self::service::{ - chain, chain_factory, fn_service, into_service, IntoService, IntoServiceFactory, - Middleware, Pipeline, Service, ServiceCtx, ServiceFactory, + chain, chain_factory, fn_service, IntoService, IntoServiceFactory, Middleware, + Pipeline, Service, ServiceCtx, ServiceFactory, }; pub use ntex_util::{channel, task}; @@ -54,19 +55,11 @@ pub mod connect { #[cfg(feature = "openssl")] pub mod openssl { pub use ntex_tls::openssl::{SslConnector, SslFilter}; - - #[doc(hidden)] - #[deprecated] - pub use ntex_tls::openssl::SslConnector as Connector; } #[cfg(feature = "rustls")] pub mod rustls { pub use ntex_tls::rustls::{TlsClientFilter, TlsConnector}; - - #[doc(hidden)] - #[deprecated] - pub use ntex_tls::rustls::TlsConnector as Connector; } } diff --git a/ntex/src/web/app_service.rs b/ntex/src/web/app_service.rs index dff9fdf8..f3ecd228 100644 --- a/ntex/src/web/app_service.rs +++ b/ntex/src/web/app_service.rs @@ -1,11 +1,11 @@ -use std::{cell::RefCell, marker, rc::Rc, task::Context, task::Poll}; +use std::{cell::RefCell, marker, rc::Rc}; use crate::http::{Request, Response}; use crate::router::{Path, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::dev::ServiceChainFactory; use crate::service::{fn_service, Middleware, Service, ServiceCtx, ServiceFactory}; -use crate::util::{BoxFuture, Extensions}; +use crate::util::{join, BoxFuture, Extensions}; use super::config::AppConfig; use super::error::ErrorRenderer; @@ -202,8 +202,8 @@ where type Response = WebResponse; type Error = T::Error; - crate::forward_poll_ready!(service); - crate::forward_poll_shutdown!(service); + crate::forward_ready!(service); + crate::forward_shutdown!(service); async fn call( &self, @@ -294,14 +294,11 @@ where type Error = Err::Container; #[inline] - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let ready1 = self.filter.poll_ready(cx)?.is_ready(); - let ready2 = self.routing.poll_ready(cx)?.is_ready(); - if ready1 && ready2 { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + let (ready1, ready2) = + join(ctx.ready(&self.filter), ctx.ready(&self.routing)).await; + ready1?; + ready2 } async fn call( diff --git a/ntex/src/web/middleware/compress.rs b/ntex/src/web/middleware/compress.rs index 2bf38126..3d8c1db7 100644 --- a/ntex/src/web/middleware/compress.rs +++ b/ntex/src/web/middleware/compress.rs @@ -67,8 +67,8 @@ where type Response = WebResponse; type Error = S::Error; - crate::forward_poll_ready!(service); - crate::forward_poll_shutdown!(service); + crate::forward_ready!(service); + crate::forward_shutdown!(service); async fn call( &self, diff --git a/ntex/src/web/middleware/defaultheaders.rs b/ntex/src/web/middleware/defaultheaders.rs index 54b96605..5aa0461e 100644 --- a/ntex/src/web/middleware/defaultheaders.rs +++ b/ntex/src/web/middleware/defaultheaders.rs @@ -110,8 +110,8 @@ where type Response = WebResponse; type Error = S::Error; - crate::forward_poll_ready!(service); - crate::forward_poll_shutdown!(service); + crate::forward_ready!(service); + crate::forward_shutdown!(service); async fn call( &self, @@ -151,7 +151,8 @@ mod tests { DefaultHeaders::new() .header(CONTENT_TYPE, "0001") .create(ok_service()), - ); + ) + .bind(); assert!(lazy(|cx| mw.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| mw.poll_shutdown(cx).is_ready()).await); diff --git a/ntex/src/web/middleware/logger.rs b/ntex/src/web/middleware/logger.rs index 514b9dbd..7ea0340d 100644 --- a/ntex/src/web/middleware/logger.rs +++ b/ntex/src/web/middleware/logger.rs @@ -139,8 +139,8 @@ where type Response = WebResponse; type Error = S::Error; - crate::forward_poll_ready!(service); - crate::forward_poll_shutdown!(service); + crate::forward_ready!(service); + crate::forward_shutdown!(service); async fn call( &self, @@ -437,7 +437,7 @@ mod tests { let logger = Logger::new("%% %{User-Agent}i %{X-Test}o %{HOME}e %D %% test") .exclude("/test"); - let srv = Pipeline::new(Middleware::create(&logger, srv.into_service())); + let srv = Pipeline::new(Middleware::create(&logger, srv.into_service())).bind(); assert!(lazy(|cx| srv.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| srv.poll_shutdown(cx).is_ready()).await); diff --git a/ntex/src/web/scope.rs b/ntex/src/web/scope.rs index 22d7b321..e501493b 100644 --- a/ntex/src/web/scope.rs +++ b/ntex/src/web/scope.rs @@ -1,11 +1,11 @@ -use std::{cell::RefCell, fmt, rc::Rc, task::Context, task::Poll}; +use std::{cell::RefCell, fmt, rc::Rc}; use crate::http::Response; use crate::router::{IntoPattern, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::{chain_factory, dev::ServiceChainFactory, IntoServiceFactory}; use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory, Stack}; -use crate::util::Extensions; +use crate::util::{join, Extensions}; use super::app::Filter; use super::config::ServiceConfig; @@ -486,14 +486,11 @@ where type Error = Err::Container; #[inline] - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let ready1 = self.filter.poll_ready(cx)?.is_ready(); - let ready2 = self.routing.poll_ready(cx)?.is_ready(); - if ready1 && ready2 { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + let (ready1, ready2) = + join(ctx.ready(&self.filter), ctx.ready(&self.routing)).await; + ready1?; + ready2 } async fn call( diff --git a/ntex/src/web/ws.rs b/ntex/src/web/ws.rs index d3a3bbff..f4eb98e5 100644 --- a/ntex/src/web/ws.rs +++ b/ntex/src/web/ws.rs @@ -38,7 +38,7 @@ where Either::Left(async move { let result = srv.call(item).await; if let Some(s) = s { - rt::spawn(async move { s.io().close() }); + let _ = rt::spawn(async move { s.io().close() }); } result }) @@ -104,7 +104,7 @@ where cfg.set_keepalive_timeout(Seconds::ZERO); // start websockets service dispatcher - rt::spawn(async move { + let _ = rt::spawn(async move { let res = crate::io::Dispatcher::new(io, codec, srv, &cfg).await; log::trace!("Ws handler is terminated: {:?}", res); }); diff --git a/ntex/src/ws/client.rs b/ntex/src/ws/client.rs index 69dfc812..819fb33f 100644 --- a/ntex/src/ws/client.rs +++ b/ntex/src/ws/client.rs @@ -18,7 +18,7 @@ use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri} use crate::io::{ Base, DispatchItem, Dispatcher, DispatcherConfig, Filter, Io, Layer, Sealed, }; -use crate::service::{apply_fn, into_service, IntoService, Pipeline, Service}; +use crate::service::{apply_fn, fn_service, IntoService, Pipeline, Service}; use crate::time::{timeout, Millis, Seconds}; use crate::{channel::mpsc, rt, util::Ready, ws}; @@ -534,8 +534,8 @@ where pub fn openssl( &mut self, connector: tls_openssl::ssl::SslConnector, - ) -> WsClientBuilder, openssl::Connector> { - self.connector(openssl::Connector::new(connector)) + ) -> WsClientBuilder, openssl::SslConnector> { + self.connector(openssl::SslConnector::new(connector)) } #[cfg(feature = "rustls")] @@ -543,8 +543,8 @@ where pub fn rustls( &mut self, config: std::sync::Arc, - ) -> WsClientBuilder, rustls::Connector> { - self.connector(rustls::Connector::from(config)) + ) -> WsClientBuilder, rustls::TlsConnector> { + self.connector(rustls::TlsConnector::from(config)) } /// This method construct new `WsClientBuilder` @@ -732,12 +732,12 @@ impl WsConnection { pub fn receiver(self) -> mpsc::Receiver>> { let (tx, rx): (_, mpsc::Receiver>>) = mpsc::channel(); - rt::spawn(async move { + let _ = rt::spawn(async move { let tx2 = tx.clone(); let io = self.io.get_ref(); let result = self - .start(into_service(move |item: ws::Frame| { + .start(fn_service(move |item: ws::Frame| { match tx.send(Ok(item)) { Ok(()) => (), Err(_) => io.close(), diff --git a/ntex/tests/connect.rs b/ntex/tests/connect.rs index 0a8aeaa9..5ecd51b7 100644 --- a/ntex/tests/connect.rs +++ b/ntex/tests/connect.rs @@ -67,7 +67,7 @@ async fn test_openssl_string() { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); - let conn = Pipeline::new(ntex::connect::openssl::Connector::new(builder.build())); + let conn = Pipeline::new(ntex::connect::openssl::SslConnector::new(builder.build())); let addr = format!("127.0.0.1:{}", srv.addr().port()); let io = conn.call(addr.into()).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); @@ -116,7 +116,7 @@ async fn test_openssl_read_before_error() { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); - let conn = Pipeline::new(ntex::connect::openssl::Connector::new(builder.build())); + let conn = Pipeline::new(ntex::connect::openssl::SslConnector::new(builder.build())); let addr = format!("127.0.0.1:{}", srv.addr().port()); let io = conn.call(addr.into()).await.unwrap(); let item = io.recv(&Rc::new(BytesCodec)).await.unwrap().unwrap(); @@ -166,7 +166,7 @@ async fn test_rustls_string() { ) }); - let conn = Pipeline::new(ntex::connect::rustls::Connector::new( + let conn = Pipeline::new(ntex::connect::rustls::TlsConnector::new( rustls_utils::tls_connector(), )); let addr = format!("localhost:{}", srv.addr().port()); diff --git a/ntex/tests/http_ws.rs b/ntex/tests/http_ws.rs index ce3b1bee..3a15fde4 100644 --- a/ntex/tests/http_ws.rs +++ b/ntex/tests/http_ws.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, io, sync::Arc, sync::Mutex, task::Context, task::Poll}; +use std::{cell::Cell, io, sync::Arc, sync::Mutex}; use ntex::codec::BytesCodec; use ntex::http::test::server as test_server; @@ -35,9 +35,9 @@ impl Service<(Request, Io, h1::Codec)> for WsService { type Response = (); type Error = io::Error; - fn poll_ready(&self, _ctx: &mut Context<'_>) -> Poll> { + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { self.set_polled(); - Poll::Ready(Ok(())) + Ok(()) } async fn call( diff --git a/ntex/tests/http_ws_client.rs b/ntex/tests/http_ws_client.rs index ba3aa2fa..4b7a7dc6 100644 --- a/ntex/tests/http_ws_client.rs +++ b/ntex/tests/http_ws_client.rs @@ -111,7 +111,7 @@ async fn test_transport() { Dispatcher::new( io.seal(), ws::Codec::default(), - ws_service, + fn_service(ws_service), &Default::default(), ) .await @@ -153,8 +153,13 @@ async fn test_keepalive_timeout() { // start websocket service let cfg = DispatcherConfig::default(); cfg.set_keepalive_timeout(Seconds::ZERO); - Dispatcher::new(io.seal(), ws::Codec::default(), ws_service, &cfg) - .await + Dispatcher::new( + io.seal(), + ws::Codec::default(), + fn_service(ws_service), + &cfg, + ) + .await }) } else { req.ack()