mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 05:17:39 +03:00
Use async fn for Service::ready() and Service::shutdown() (#364)
This commit is contained in:
parent
c52db3fd10
commit
3b49828e5f
33 changed files with 147 additions and 190 deletions
|
@ -29,7 +29,7 @@ Starting ntex v0.5 async runtime must be selected as a feature. Available option
|
|||
|
||||
```toml
|
||||
[dependencies]
|
||||
ntex = { version = "1.0", features = ["tokio"] }
|
||||
ntex = { version = "2", features = ["tokio"] }
|
||||
```
|
||||
|
||||
## Documentation & community resources
|
||||
|
|
|
@ -29,5 +29,5 @@ simdutf8 = { version = "0.1.4", optional = true }
|
|||
[dev-dependencies]
|
||||
serde_test = "1"
|
||||
serde_json = "1"
|
||||
ntex = { version = "1", features = ["tokio"] }
|
||||
ntex = { version = "2", features = ["tokio"] }
|
||||
ntex-bytes = { path = ".", features = ["mpool"] }
|
||||
|
|
|
@ -29,4 +29,4 @@ pin-project-lite = "0.2"
|
|||
rand = "0.8"
|
||||
env_logger = "0.11"
|
||||
|
||||
ntex = { version = "1", features = ["tokio"] }
|
||||
ntex = { version = "2", features = ["tokio"] }
|
||||
|
|
|
@ -16,6 +16,6 @@ syn = { version = "^1", features = ["full", "parsing"] }
|
|||
proc-macro2 = "^1"
|
||||
|
||||
[dev-dependencies]
|
||||
ntex = { version = "1", features = ["tokio"] }
|
||||
ntex = { version = "2", features = ["tokio"] }
|
||||
futures = "0.3"
|
||||
env_logger = "0.11"
|
|
@ -44,4 +44,4 @@ thiserror = "1.0"
|
|||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.11"
|
||||
ntex = { version = "1", features = ["tokio"] }
|
||||
ntex = { version = "2", features = ["tokio"] }
|
||||
|
|
|
@ -30,7 +30,7 @@ socket2 = "0.5"
|
|||
oneshot = { version = "0.1", default-features = false, features = ["async"] }
|
||||
|
||||
[dev-dependencies]
|
||||
ntex = { version = "1", features = ["tokio"] }
|
||||
ntex = { version = "2", features = ["tokio"] }
|
||||
ntex-macros = "0.1.3"
|
||||
|
||||
[target.'cfg(target_family = "unix")'.dependencies]
|
||||
|
|
|
@ -19,5 +19,5 @@ path = "src/lib.rs"
|
|||
slab = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
ntex = { version = "1", features = ["tokio"] }
|
||||
ntex-util = "1"
|
||||
ntex = { version = "2", features = ["tokio"] }
|
||||
ntex-util = "2"
|
||||
|
|
|
@ -40,7 +40,7 @@ tls_openssl = { version = "0.10", package = "openssl", optional = true }
|
|||
tls_rust = { version = "0.23", package = "rustls", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
ntex = { version = "1", features = ["openssl", "rustls", "tokio"] }
|
||||
ntex = { version = "2", features = ["openssl", "rustls", "tokio"] }
|
||||
env_logger = "0.11"
|
||||
rustls-pemfile = "2"
|
||||
webpki-roots = "0.26"
|
||||
|
|
|
@ -16,7 +16,7 @@ async fn main() -> io::Result<()> {
|
|||
.with_no_client_auth();
|
||||
|
||||
// rustls connector
|
||||
let connector = connect::rustls::Connector::new(config.clone());
|
||||
let connector = connect::rustls::TlsConnector::new(config.clone());
|
||||
|
||||
//let io = connector.connect("www.rust-lang.org:443").await.unwrap();
|
||||
let io = connector.connect("127.0.0.1:8443").await.unwrap();
|
||||
|
|
|
@ -21,7 +21,7 @@ async fn main() -> io::Result<()> {
|
|||
builder.set_verify(SslVerifyMode::NONE);
|
||||
|
||||
// openssl connector
|
||||
let connector = connect::openssl::Connector::new(builder.build());
|
||||
let connector = connect::openssl::SslConnector::new(builder.build());
|
||||
|
||||
let io = connector.connect("127.0.0.1:8443").await.unwrap();
|
||||
println!("Connected to ssl server");
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
#![allow(dead_code)]
|
||||
use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll};
|
||||
|
||||
use ntex_util::task::LocalWaker;
|
||||
|
|
|
@ -28,6 +28,6 @@ futures-sink = { version = "0.3", default-features = false, features = ["alloc"]
|
|||
pin-project-lite = "0.2"
|
||||
|
||||
[dev-dependencies]
|
||||
ntex = { version = "1", features = ["tokio"] }
|
||||
ntex = { version = "2", features = ["tokio"] }
|
||||
ntex-macros = "0.1.3"
|
||||
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## [2.0.0] - 2024-05-28
|
||||
|
||||
* Use "async fn" for Service::ready() and Service::shutdown()
|
||||
|
||||
## [1.2.1] - 2024-03-28
|
||||
|
||||
* Feature gate websocket support #320
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex"
|
||||
version = "1.2.1"
|
||||
version = "2.0.0"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "Framework for composable network services"
|
||||
readme = "README.md"
|
||||
|
@ -60,16 +60,16 @@ brotli = ["dep:brotli2"]
|
|||
ntex-codec = "0.6.2"
|
||||
ntex-http = "0.1.12"
|
||||
ntex-router = "0.5.3"
|
||||
ntex-service = "2.0.1"
|
||||
ntex-service = "3.0"
|
||||
ntex-macros = "0.1.3"
|
||||
ntex-util = "1.0.1"
|
||||
ntex-util = "2.0"
|
||||
ntex-bytes = "0.1.25"
|
||||
ntex-server = "1.0.5"
|
||||
ntex-h2 = "0.5.4"
|
||||
ntex-server = "2.0"
|
||||
ntex-h2 = "1.0"
|
||||
ntex-rt = "0.4.12"
|
||||
ntex-io = "1.2.0"
|
||||
ntex-net = "1.0.1"
|
||||
ntex-tls = "1.1.0"
|
||||
ntex-io = "2.0"
|
||||
ntex-net = "2.0"
|
||||
ntex-tls = "2.0"
|
||||
|
||||
base64 = "0.22"
|
||||
bitflags = "2"
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
use std::{fmt, task::Context, task::Poll, time::Duration};
|
||||
use std::{fmt, time::Duration};
|
||||
|
||||
use ntex_h2::{self as h2};
|
||||
|
||||
use crate::connect::{Connect as TcpConnect, Connector as TcpConnector};
|
||||
use crate::service::{apply_fn, boxed, Service, ServiceCtx};
|
||||
use crate::time::{Millis, Seconds};
|
||||
use crate::util::{timeout::TimeoutError, timeout::TimeoutService};
|
||||
use crate::util::{join, timeout::TimeoutError, timeout::TimeoutService};
|
||||
use crate::{http::Uri, io::IoBoxed};
|
||||
|
||||
use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect};
|
||||
|
@ -273,31 +273,20 @@ where
|
|||
type Response = <ConnectionPool<T> as Service<Connect>>::Response;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let ready = self.tcp_pool.poll_ready(cx)?.is_ready();
|
||||
let ready = if let Some(ref ssl_pool) = self.ssl_pool {
|
||||
ssl_pool.poll_ready(cx)?.is_ready() && ready
|
||||
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
if let Some(ref ssl_pool) = self.ssl_pool {
|
||||
let (r1, r2) = join(ctx.ready(&self.tcp_pool), ctx.ready(ssl_pool)).await;
|
||||
r1?;
|
||||
r2
|
||||
} else {
|
||||
ready
|
||||
};
|
||||
if ready {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
ctx.ready(&self.tcp_pool).await
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
let tcp_ready = self.tcp_pool.poll_shutdown(cx).is_ready();
|
||||
let ssl_ready = self
|
||||
.ssl_pool
|
||||
.as_ref()
|
||||
.map(|pool| pool.poll_shutdown(cx).is_ready())
|
||||
.unwrap_or(true);
|
||||
if tcp_ready && ssl_ready {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
async fn shutdown(&self) {
|
||||
self.tcp_pool.shutdown().await;
|
||||
if let Some(ref ssl_pool) = self.ssl_pool {
|
||||
ssl_pool.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -322,11 +311,11 @@ where
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::util::lazy;
|
||||
use crate::{service::Pipeline, util::lazy};
|
||||
|
||||
#[crate::rt_test]
|
||||
async fn test_readiness() {
|
||||
let conn = Connector::default().finish();
|
||||
let conn = Pipeline::new(Connector::default().finish()).bind();
|
||||
assert!(lazy(|cx| conn.poll_ready(cx).is_ready()).await);
|
||||
assert!(lazy(|cx| conn.poll_shutdown(cx).is_ready()).await);
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ where
|
|||
if !eof {
|
||||
// sending body is async process, we can handle upload and download
|
||||
// at the same time
|
||||
crate::rt::spawn(async move {
|
||||
let _ = crate::rt::spawn(async move {
|
||||
if let Err(e) = send_body(body, &snd_stream).await {
|
||||
log::error!("Cannot send body: {:?}", e);
|
||||
snd_stream.reset(frame::Reason::INTERNAL_ERROR);
|
||||
|
@ -125,7 +125,7 @@ async fn get_response(
|
|||
log::debug!("Creating local payload stream for {:?}", stream.id());
|
||||
let (mut pl, payload) =
|
||||
payload::Payload::create(stream.empty_capacity());
|
||||
crate::rt::spawn(async move {
|
||||
let _ = crate::rt::spawn(async move {
|
||||
loop {
|
||||
let h2::Message { stream, kind } =
|
||||
match rcv_stream.recv().await {
|
||||
|
|
|
@ -80,7 +80,7 @@ where
|
|||
}));
|
||||
|
||||
// start pool support future
|
||||
crate::rt::spawn(ConnectionPoolSupport {
|
||||
let _ = crate::rt::spawn(ConnectionPoolSupport {
|
||||
connector: connector.clone(),
|
||||
inner: inner.clone(),
|
||||
waiters: waiters.clone(),
|
||||
|
@ -117,8 +117,13 @@ where
|
|||
type Response = Connection;
|
||||
type Error = ConnectError;
|
||||
|
||||
crate::forward_poll_ready!(connector);
|
||||
crate::forward_poll_shutdown!(connector);
|
||||
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
self.connector.ready().await
|
||||
}
|
||||
|
||||
async fn shutdown(&self) {
|
||||
self.connector.shutdown().await
|
||||
}
|
||||
|
||||
async fn call(
|
||||
&self,
|
||||
|
@ -252,7 +257,7 @@ impl Inner {
|
|||
|| (now - conn.created) > self.conn_lifetime
|
||||
{
|
||||
if let ConnectionType::H1(io) = conn.io {
|
||||
spawn(async move {
|
||||
let _ = spawn(async move {
|
||||
let _ = io.shutdown().await;
|
||||
});
|
||||
}
|
||||
|
@ -419,7 +424,7 @@ where
|
|||
let disconnect_timeout = inner.borrow().disconnect_timeout;
|
||||
|
||||
#[allow(clippy::redundant_async_block)]
|
||||
spawn(async move {
|
||||
let _ = spawn(async move {
|
||||
OpenConnection::<T> {
|
||||
tx: Some(tx),
|
||||
key: key.clone(),
|
||||
|
@ -576,7 +581,7 @@ impl Acquired {
|
|||
);
|
||||
match io {
|
||||
ConnectionType::H1(io) => {
|
||||
spawn(async move {
|
||||
let _ = spawn(async move {
|
||||
let _ = io.shutdown().await;
|
||||
});
|
||||
}
|
||||
|
@ -634,7 +639,8 @@ mod tests {
|
|||
h2::Config::client(),
|
||||
)
|
||||
.clone(),
|
||||
);
|
||||
)
|
||||
.bind();
|
||||
|
||||
// uri must contain authority
|
||||
let req = Connect {
|
||||
|
|
|
@ -325,7 +325,7 @@ impl DateService {
|
|||
|
||||
// periodic date update
|
||||
let s = self.clone();
|
||||
crate::rt::spawn(async move {
|
||||
let _ = crate::rt::spawn(async move {
|
||||
sleep(Millis(500)).await;
|
||||
s.0.current.set(false);
|
||||
});
|
||||
|
|
|
@ -210,7 +210,7 @@ impl<F: Filter> Upgrade<F> {
|
|||
H: FnOnce(Request, Io<F>, Codec) -> R + 'static,
|
||||
R: Future<Output = O>,
|
||||
{
|
||||
crate::rt::spawn(async move {
|
||||
let _ = crate::rt::spawn(async move {
|
||||
let _ = f(self.req, self.io, self.codec).await;
|
||||
});
|
||||
ControlAck {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{error::Error, fmt, marker, rc::Rc, task::Context, task::Poll};
|
||||
use std::{error::Error, fmt, marker, rc::Rc};
|
||||
|
||||
use crate::http::body::MessageBody;
|
||||
use crate::http::config::{DispatcherConfig, ServiceConfig};
|
||||
|
@ -6,6 +6,7 @@ use crate::http::error::{DispatchError, ResponseError};
|
|||
use crate::http::{request::Request, response::Response};
|
||||
use crate::io::{types, Filter, Io};
|
||||
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
|
||||
use crate::util::join;
|
||||
|
||||
use super::control::{Control, ControlAck};
|
||||
use super::default::DefaultControlService;
|
||||
|
@ -208,43 +209,23 @@ where
|
|||
type Response = ();
|
||||
type Error = DispatchError;
|
||||
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
let cfg = self.config.as_ref();
|
||||
|
||||
let ready1 = cfg
|
||||
.control
|
||||
.poll_ready(cx)
|
||||
.map_err(|e| {
|
||||
let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
|
||||
ready1.map_err(|e| {
|
||||
log::error!("Http control service readiness error: {:?}", e);
|
||||
DispatchError::Control(Box::new(e))
|
||||
})?
|
||||
.is_ready();
|
||||
|
||||
let ready2 = cfg
|
||||
.service
|
||||
.poll_ready(cx)
|
||||
.map_err(|e| {
|
||||
})?;
|
||||
ready2.map_err(|e| {
|
||||
log::error!("Http service readiness error: {:?}", e);
|
||||
DispatchError::Service(Box::new(e))
|
||||
})?
|
||||
.is_ready();
|
||||
|
||||
if ready1 && ready2 {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
let ready1 = self.config.control.poll_shutdown(cx).is_ready();
|
||||
let ready2 = self.config.service.poll_shutdown(cx).is_ready();
|
||||
|
||||
if ready1 && ready2 {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
async fn shutdown(&self) {
|
||||
self.config.control.shutdown().await;
|
||||
self.config.service.shutdown().await;
|
||||
}
|
||||
|
||||
async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use std::{cell::RefCell, io, task::Context, task::Poll};
|
||||
use std::{error::Error, fmt, future::poll_fn, marker, mem, rc::Rc};
|
||||
use std::{cell::RefCell, error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc};
|
||||
|
||||
use ntex_h2::{self as h2, frame::StreamId, server};
|
||||
|
||||
|
@ -209,15 +208,17 @@ where
|
|||
type Response = ();
|
||||
type Error = DispatchError;
|
||||
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.config.service.poll_ready(cx).map_err(|e| {
|
||||
#[inline]
|
||||
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
self.config.service.ready().await.map_err(|e| {
|
||||
log::error!("Service readiness error: {:?}", e);
|
||||
DispatchError::Service(Box::new(e))
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.config.service.poll_shutdown(cx)
|
||||
#[inline]
|
||||
async fn shutdown(&self) {
|
||||
self.config.service.shutdown().await
|
||||
}
|
||||
|
||||
async fn call(
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use std::{error, fmt, marker, rc::Rc, task::Context, task::Poll};
|
||||
use std::{error, fmt, marker, rc::Rc};
|
||||
|
||||
use crate::io::{types, Filter, Io};
|
||||
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
|
||||
use crate::util::join;
|
||||
|
||||
use super::body::MessageBody;
|
||||
use super::builder::HttpServiceBuilder;
|
||||
|
@ -289,43 +290,24 @@ where
|
|||
type Response = ();
|
||||
type Error = DispatchError;
|
||||
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
let cfg = self.config.as_ref();
|
||||
|
||||
let ready1 = cfg
|
||||
.service
|
||||
.poll_ready(cx)
|
||||
.map_err(|e| {
|
||||
log::error!("Http service readiness error: {:?}", e);
|
||||
DispatchError::Service(Box::new(e))
|
||||
})?
|
||||
.is_ready();
|
||||
|
||||
let ready2 = cfg
|
||||
.control
|
||||
.poll_ready(cx)
|
||||
.map_err(|e| {
|
||||
let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
|
||||
ready1.map_err(|e| {
|
||||
log::error!("Http control service readiness error: {:?}", e);
|
||||
DispatchError::Control(Box::new(e))
|
||||
})?
|
||||
.is_ready();
|
||||
|
||||
if ready1 && ready2 {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
})?;
|
||||
ready2.map_err(|e| {
|
||||
log::error!("Http service readiness error: {:?}", e);
|
||||
DispatchError::Service(Box::new(e))
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
let ready1 = self.config.control.poll_shutdown(cx).is_ready();
|
||||
let ready2 = self.config.service.poll_shutdown(cx).is_ready();
|
||||
|
||||
if ready1 && ready2 {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
#[inline]
|
||||
async fn shutdown(&self) {
|
||||
self.config.control.shutdown().await;
|
||||
self.config.service.shutdown().await;
|
||||
}
|
||||
|
||||
async fn call(
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
clippy::borrow_interior_mutable_const,
|
||||
clippy::needless_doctest_main,
|
||||
clippy::too_many_arguments,
|
||||
clippy::new_without_default
|
||||
clippy::new_without_default,
|
||||
clippy::let_underscore_future
|
||||
)]
|
||||
|
||||
#[cfg(not(test))] // Work around for rust-lang/rust#62127
|
||||
|
@ -27,7 +28,7 @@ pub use ntex_macros::{rt_main as main, rt_test as test};
|
|||
#[cfg(test)]
|
||||
pub(crate) use ntex_macros::rt_test2 as rt_test;
|
||||
|
||||
pub use ntex_service::{forward_poll_ready, forward_poll_shutdown};
|
||||
pub use ntex_service::{forward_ready, forward_shutdown};
|
||||
|
||||
pub mod http;
|
||||
pub mod web;
|
||||
|
@ -36,8 +37,8 @@ pub mod web;
|
|||
pub mod ws;
|
||||
|
||||
pub use self::service::{
|
||||
chain, chain_factory, fn_service, into_service, IntoService, IntoServiceFactory,
|
||||
Middleware, Pipeline, Service, ServiceCtx, ServiceFactory,
|
||||
chain, chain_factory, fn_service, IntoService, IntoServiceFactory, Middleware,
|
||||
Pipeline, Service, ServiceCtx, ServiceFactory,
|
||||
};
|
||||
|
||||
pub use ntex_util::{channel, task};
|
||||
|
@ -54,19 +55,11 @@ pub mod connect {
|
|||
#[cfg(feature = "openssl")]
|
||||
pub mod openssl {
|
||||
pub use ntex_tls::openssl::{SslConnector, SslFilter};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[deprecated]
|
||||
pub use ntex_tls::openssl::SslConnector as Connector;
|
||||
}
|
||||
|
||||
#[cfg(feature = "rustls")]
|
||||
pub mod rustls {
|
||||
pub use ntex_tls::rustls::{TlsClientFilter, TlsConnector};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[deprecated]
|
||||
pub use ntex_tls::rustls::TlsConnector as Connector;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
use std::{cell::RefCell, marker, rc::Rc, task::Context, task::Poll};
|
||||
use std::{cell::RefCell, marker, rc::Rc};
|
||||
|
||||
use crate::http::{Request, Response};
|
||||
use crate::router::{Path, ResourceDef, Router};
|
||||
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
|
||||
use crate::service::dev::ServiceChainFactory;
|
||||
use crate::service::{fn_service, Middleware, Service, ServiceCtx, ServiceFactory};
|
||||
use crate::util::{BoxFuture, Extensions};
|
||||
use crate::util::{join, BoxFuture, Extensions};
|
||||
|
||||
use super::config::AppConfig;
|
||||
use super::error::ErrorRenderer;
|
||||
|
@ -202,8 +202,8 @@ where
|
|||
type Response = WebResponse;
|
||||
type Error = T::Error;
|
||||
|
||||
crate::forward_poll_ready!(service);
|
||||
crate::forward_poll_shutdown!(service);
|
||||
crate::forward_ready!(service);
|
||||
crate::forward_shutdown!(service);
|
||||
|
||||
async fn call(
|
||||
&self,
|
||||
|
@ -294,14 +294,11 @@ where
|
|||
type Error = Err::Container;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let ready1 = self.filter.poll_ready(cx)?.is_ready();
|
||||
let ready2 = self.routing.poll_ready(cx)?.is_ready();
|
||||
if ready1 && ready2 {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
let (ready1, ready2) =
|
||||
join(ctx.ready(&self.filter), ctx.ready(&self.routing)).await;
|
||||
ready1?;
|
||||
ready2
|
||||
}
|
||||
|
||||
async fn call(
|
||||
|
|
|
@ -67,8 +67,8 @@ where
|
|||
type Response = WebResponse;
|
||||
type Error = S::Error;
|
||||
|
||||
crate::forward_poll_ready!(service);
|
||||
crate::forward_poll_shutdown!(service);
|
||||
crate::forward_ready!(service);
|
||||
crate::forward_shutdown!(service);
|
||||
|
||||
async fn call(
|
||||
&self,
|
||||
|
|
|
@ -110,8 +110,8 @@ where
|
|||
type Response = WebResponse;
|
||||
type Error = S::Error;
|
||||
|
||||
crate::forward_poll_ready!(service);
|
||||
crate::forward_poll_shutdown!(service);
|
||||
crate::forward_ready!(service);
|
||||
crate::forward_shutdown!(service);
|
||||
|
||||
async fn call(
|
||||
&self,
|
||||
|
@ -151,7 +151,8 @@ mod tests {
|
|||
DefaultHeaders::new()
|
||||
.header(CONTENT_TYPE, "0001")
|
||||
.create(ok_service()),
|
||||
);
|
||||
)
|
||||
.bind();
|
||||
|
||||
assert!(lazy(|cx| mw.poll_ready(cx).is_ready()).await);
|
||||
assert!(lazy(|cx| mw.poll_shutdown(cx).is_ready()).await);
|
||||
|
|
|
@ -139,8 +139,8 @@ where
|
|||
type Response = WebResponse;
|
||||
type Error = S::Error;
|
||||
|
||||
crate::forward_poll_ready!(service);
|
||||
crate::forward_poll_shutdown!(service);
|
||||
crate::forward_ready!(service);
|
||||
crate::forward_shutdown!(service);
|
||||
|
||||
async fn call(
|
||||
&self,
|
||||
|
@ -437,7 +437,7 @@ mod tests {
|
|||
let logger = Logger::new("%% %{User-Agent}i %{X-Test}o %{HOME}e %D %% test")
|
||||
.exclude("/test");
|
||||
|
||||
let srv = Pipeline::new(Middleware::create(&logger, srv.into_service()));
|
||||
let srv = Pipeline::new(Middleware::create(&logger, srv.into_service())).bind();
|
||||
assert!(lazy(|cx| srv.poll_ready(cx).is_ready()).await);
|
||||
assert!(lazy(|cx| srv.poll_shutdown(cx).is_ready()).await);
|
||||
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
use std::{cell::RefCell, fmt, rc::Rc, task::Context, task::Poll};
|
||||
use std::{cell::RefCell, fmt, rc::Rc};
|
||||
|
||||
use crate::http::Response;
|
||||
use crate::router::{IntoPattern, ResourceDef, Router};
|
||||
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
|
||||
use crate::service::{chain_factory, dev::ServiceChainFactory, IntoServiceFactory};
|
||||
use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory, Stack};
|
||||
use crate::util::Extensions;
|
||||
use crate::util::{join, Extensions};
|
||||
|
||||
use super::app::Filter;
|
||||
use super::config::ServiceConfig;
|
||||
|
@ -486,14 +486,11 @@ where
|
|||
type Error = Err::Container;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let ready1 = self.filter.poll_ready(cx)?.is_ready();
|
||||
let ready2 = self.routing.poll_ready(cx)?.is_ready();
|
||||
if ready1 && ready2 {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
let (ready1, ready2) =
|
||||
join(ctx.ready(&self.filter), ctx.ready(&self.routing)).await;
|
||||
ready1?;
|
||||
ready2
|
||||
}
|
||||
|
||||
async fn call(
|
||||
|
|
|
@ -38,7 +38,7 @@ where
|
|||
Either::Left(async move {
|
||||
let result = srv.call(item).await;
|
||||
if let Some(s) = s {
|
||||
rt::spawn(async move { s.io().close() });
|
||||
let _ = rt::spawn(async move { s.io().close() });
|
||||
}
|
||||
result
|
||||
})
|
||||
|
@ -104,7 +104,7 @@ where
|
|||
cfg.set_keepalive_timeout(Seconds::ZERO);
|
||||
|
||||
// start websockets service dispatcher
|
||||
rt::spawn(async move {
|
||||
let _ = rt::spawn(async move {
|
||||
let res = crate::io::Dispatcher::new(io, codec, srv, &cfg).await;
|
||||
log::trace!("Ws handler is terminated: {:?}", res);
|
||||
});
|
||||
|
|
|
@ -18,7 +18,7 @@ use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri}
|
|||
use crate::io::{
|
||||
Base, DispatchItem, Dispatcher, DispatcherConfig, Filter, Io, Layer, Sealed,
|
||||
};
|
||||
use crate::service::{apply_fn, into_service, IntoService, Pipeline, Service};
|
||||
use crate::service::{apply_fn, fn_service, IntoService, Pipeline, Service};
|
||||
use crate::time::{timeout, Millis, Seconds};
|
||||
use crate::{channel::mpsc, rt, util::Ready, ws};
|
||||
|
||||
|
@ -534,8 +534,8 @@ where
|
|||
pub fn openssl(
|
||||
&mut self,
|
||||
connector: tls_openssl::ssl::SslConnector,
|
||||
) -> WsClientBuilder<Layer<openssl::SslFilter>, openssl::Connector<Uri>> {
|
||||
self.connector(openssl::Connector::new(connector))
|
||||
) -> WsClientBuilder<Layer<openssl::SslFilter>, openssl::SslConnector<Uri>> {
|
||||
self.connector(openssl::SslConnector::new(connector))
|
||||
}
|
||||
|
||||
#[cfg(feature = "rustls")]
|
||||
|
@ -543,8 +543,8 @@ where
|
|||
pub fn rustls(
|
||||
&mut self,
|
||||
config: std::sync::Arc<tls_rustls::ClientConfig>,
|
||||
) -> WsClientBuilder<Layer<rustls::TlsClientFilter>, rustls::Connector<Uri>> {
|
||||
self.connector(rustls::Connector::from(config))
|
||||
) -> WsClientBuilder<Layer<rustls::TlsClientFilter>, rustls::TlsConnector<Uri>> {
|
||||
self.connector(rustls::TlsConnector::from(config))
|
||||
}
|
||||
|
||||
/// This method construct new `WsClientBuilder`
|
||||
|
@ -732,12 +732,12 @@ impl WsConnection<Sealed> {
|
|||
pub fn receiver(self) -> mpsc::Receiver<Result<ws::Frame, WsError<()>>> {
|
||||
let (tx, rx): (_, mpsc::Receiver<Result<ws::Frame, WsError<()>>>) = mpsc::channel();
|
||||
|
||||
rt::spawn(async move {
|
||||
let _ = rt::spawn(async move {
|
||||
let tx2 = tx.clone();
|
||||
let io = self.io.get_ref();
|
||||
|
||||
let result = self
|
||||
.start(into_service(move |item: ws::Frame| {
|
||||
.start(fn_service(move |item: ws::Frame| {
|
||||
match tx.send(Ok(item)) {
|
||||
Ok(()) => (),
|
||||
Err(_) => io.close(),
|
||||
|
|
|
@ -67,7 +67,7 @@ async fn test_openssl_string() {
|
|||
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
builder.set_verify(SslVerifyMode::NONE);
|
||||
|
||||
let conn = Pipeline::new(ntex::connect::openssl::Connector::new(builder.build()));
|
||||
let conn = Pipeline::new(ntex::connect::openssl::SslConnector::new(builder.build()));
|
||||
let addr = format!("127.0.0.1:{}", srv.addr().port());
|
||||
let io = conn.call(addr.into()).await.unwrap();
|
||||
assert_eq!(io.query::<PeerAddr>().get().unwrap(), srv.addr().into());
|
||||
|
@ -116,7 +116,7 @@ async fn test_openssl_read_before_error() {
|
|||
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
builder.set_verify(SslVerifyMode::NONE);
|
||||
|
||||
let conn = Pipeline::new(ntex::connect::openssl::Connector::new(builder.build()));
|
||||
let conn = Pipeline::new(ntex::connect::openssl::SslConnector::new(builder.build()));
|
||||
let addr = format!("127.0.0.1:{}", srv.addr().port());
|
||||
let io = conn.call(addr.into()).await.unwrap();
|
||||
let item = io.recv(&Rc::new(BytesCodec)).await.unwrap().unwrap();
|
||||
|
@ -166,7 +166,7 @@ async fn test_rustls_string() {
|
|||
)
|
||||
});
|
||||
|
||||
let conn = Pipeline::new(ntex::connect::rustls::Connector::new(
|
||||
let conn = Pipeline::new(ntex::connect::rustls::TlsConnector::new(
|
||||
rustls_utils::tls_connector(),
|
||||
));
|
||||
let addr = format!("localhost:{}", srv.addr().port());
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{cell::Cell, io, sync::Arc, sync::Mutex, task::Context, task::Poll};
|
||||
use std::{cell::Cell, io, sync::Arc, sync::Mutex};
|
||||
|
||||
use ntex::codec::BytesCodec;
|
||||
use ntex::http::test::server as test_server;
|
||||
|
@ -35,9 +35,9 @@ impl Service<(Request, Io, h1::Codec)> for WsService {
|
|||
type Response = ();
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll_ready(&self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
self.set_polled();
|
||||
Poll::Ready(Ok(()))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn call(
|
||||
|
|
|
@ -111,7 +111,7 @@ async fn test_transport() {
|
|||
Dispatcher::new(
|
||||
io.seal(),
|
||||
ws::Codec::default(),
|
||||
ws_service,
|
||||
fn_service(ws_service),
|
||||
&Default::default(),
|
||||
)
|
||||
.await
|
||||
|
@ -153,7 +153,12 @@ async fn test_keepalive_timeout() {
|
|||
// start websocket service
|
||||
let cfg = DispatcherConfig::default();
|
||||
cfg.set_keepalive_timeout(Seconds::ZERO);
|
||||
Dispatcher::new(io.seal(), ws::Codec::default(), ws_service, &cfg)
|
||||
Dispatcher::new(
|
||||
io.seal(),
|
||||
ws::Codec::default(),
|
||||
fn_service(ws_service),
|
||||
&cfg,
|
||||
)
|
||||
.await
|
||||
})
|
||||
} else {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue