Migrate ntex to async fn in trait

This commit is contained in:
Nikolay Kim 2024-01-07 06:43:37 +06:00
parent 2e12cc6edf
commit 4950eb4167
57 changed files with 576 additions and 932 deletions

View file

@ -35,3 +35,5 @@ ntex-util = { path = "ntex-util" }
ntex-glommio = { path = "ntex-glommio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-async-std = { path = "ntex-async-std" }
ntex-h2 = { git = "https://github.com/ntex-rs/ntex-h2.git", branch = "async-fn-in-trait" }

View file

@ -27,4 +27,4 @@ simdutf8 = { version = "0.1.4", optional = true }
[dev-dependencies]
serde_test = "1.0"
serde_json = "1.0"
ntex = { version = "0.7.0", features = ["tokio"] }
ntex = { version = "1.0.0", features = ["tokio"] }

View file

@ -59,4 +59,4 @@ webpki-roots = { version = "0.25", optional = true }
[dev-dependencies]
rand = "0.8"
env_logger = "0.10"
ntex = { version = "0.7.0", features = ["tokio"] }
ntex = { version = "1.0", features = ["tokio"] }

View file

@ -29,4 +29,4 @@ pin-project-lite = "0.2"
rand = "0.8"
env_logger = "0.10"
ntex = { version = "0.7", features = ["tokio"] }
ntex = { version = "1.0.0", features = ["tokio"] }

View file

@ -839,10 +839,10 @@ mod tests {
Poll::Ready(Err(()))
}
async fn call<'a>(
&'a self,
async fn call(
&self,
_: DispatchItem<BytesCodec>,
_: ServiceCtx<'a, Self>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
Ok(None)
}

View file

@ -16,6 +16,6 @@ syn = { version = "^1", features = ["full", "parsing"] }
proc-macro2 = "^1"
[dev-dependencies]
ntex = { version = "0.7.0", features = ["tokio"] }
ntex = { version = "1.0.0", features = ["tokio"] }
futures = "0.3"
env_logger = "0.10"

View file

@ -20,5 +20,5 @@ pin-project-lite = "0.2.6"
slab = "0.4"
[dev-dependencies]
ntex = { version = "0.7.0", features = ["tokio"] }
ntex-util = "0.3.0"
ntex = { version = "1.0", features = ["tokio"] }
ntex-util = "1.0.0"

View file

@ -110,10 +110,10 @@ mod tests {
Poll::Ready(Ok(()))
}
async fn call<'a>(
&'a self,
async fn call(
&self,
req: &'static str,
_: ServiceCtx<'a, Self>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, ()> {
Ok(req)
}
@ -131,10 +131,10 @@ mod tests {
Poll::Ready(Ok(()))
}
async fn call<'a>(
&'a self,
async fn call(
&self,
req: &'static str,
_: ServiceCtx<'a, Self>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, ()> {
Ok((req, "srv2"))
}

View file

@ -212,7 +212,7 @@ mod tests {
type Response = ();
type Error = ();
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Ok(())
}
}

View file

@ -212,10 +212,10 @@ mod tests {
self.1.poll_ready(cx).map(|_| Ok(()))
}
async fn call<'a>(
&'a self,
async fn call(
&self,
req: &'static str,
ctx: ServiceCtx<'a, Self>,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
let _ = ctx.clone();
Ok(req)

View file

@ -69,7 +69,7 @@ pub use self::pipeline::{Pipeline, PipelineCall};
/// type Response = u64;
/// type Error = Infallible;
///
/// async fn call<'a>(&'a self, req: u8, _: ServiceCtx<'a, Self>) -> Result<Self::Response, Self::Error> {
/// async fn call(&self, req: u8, _: ServiceCtx<'_, Self>) -> Result<Self::Response, Self::Error> {
/// Ok(req as u64)
/// }
/// }

View file

@ -162,7 +162,7 @@ mod tests {
Poll::Ready(Ok(()))
}
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Ok(())
}
}

View file

@ -178,7 +178,7 @@ mod tests {
}
}
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Err(())
}
}

View file

@ -215,10 +215,10 @@ mod tests {
self.0.poll_ready(cx)
}
async fn call<'a>(
&'a self,
async fn call(
&self,
req: R,
ctx: ServiceCtx<'a, Self>,
ctx: ServiceCtx<'_, Self>,
) -> Result<S::Response, S::Error> {
ctx.call(&self.0, req).await
}

View file

@ -114,10 +114,10 @@ mod tests {
Poll::Ready(Ok(()))
}
async fn call<'a>(
&'a self,
async fn call(
&self,
req: Result<&'static str, &'static str>,
_: ServiceCtx<'a, Self>,
_: ServiceCtx<'_, Self>,
) -> Result<&'static str, ()> {
match req {
Ok(msg) => Ok(msg),
@ -138,10 +138,10 @@ mod tests {
Poll::Ready(Ok(()))
}
async fn call<'a>(
&'a self,
async fn call(
&self,
req: Result<&'static str, ()>,
_: ServiceCtx<'a, Self>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, ()> {
match req {
Ok(msg) => Ok((msg, "ok")),

View file

@ -39,7 +39,7 @@ tls_openssl = { version = "0.10", package = "openssl", optional = true }
tls_rust = { version = "0.21", package = "rustls", optional = true }
[dev-dependencies]
ntex = { version = "0.7", features = ["openssl", "rustls", "tokio"] }
ntex = { version = "1.0", features = ["openssl", "rustls", "tokio"] }
env_logger = "0.10"
rustls-pemfile = "1.0"
webpki-roots = "0.25"

View file

@ -28,7 +28,7 @@ futures-sink = { version = "0.3", default-features = false, features = ["alloc"]
pin-project-lite = "0.2.9"
[dev-dependencies]
ntex = { version = "0.7", features = ["tokio"] }
ntex = { version = "1.0.0", features = ["tokio"] }
ntex-bytes = "0.1.21"
ntex-macros = "0.1.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }

View file

@ -316,7 +316,7 @@ mod tests {
}
}
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
self.0.ready.set(false);
self.0.count.set(self.0.count.get() + 1);
Ok(())

View file

@ -102,7 +102,7 @@ mod tests {
type Response = ();
type Error = ();
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
let _ = self.0.recv().await;
Ok(())
}

View file

@ -93,7 +93,7 @@ mod tests {
type Response = ();
type Error = ();
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<(), ()> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
let _ = self.0.recv().await;
Ok::<_, ()>(())
}

View file

@ -170,11 +170,7 @@ mod tests {
type Response = ();
type Error = SrvError;
async fn call<'a>(
&'a self,
_: (),
_: ServiceCtx<'a, Self>,
) -> Result<(), SrvError> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), SrvError> {
crate::time::sleep(self.0).await;
Ok::<_, SrvError>(())
}

View file

@ -253,7 +253,7 @@ mod tests {
Poll::Ready(())
}
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<usize, ()> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<usize, ()> {
Ok(1)
}
}
@ -273,7 +273,7 @@ mod tests {
Poll::Ready(())
}
async fn call<'a>(&'a self, _: (), _: ServiceCtx<'a, Self>) -> Result<usize, ()> {
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<usize, ()> {
Ok(2)
}
}

View file

@ -1,5 +1,9 @@
# Changes
## [1.0.0] - 2024-01-0x
* Use "async fn" in trait for Service definition
## [0.7.17] - 2024-01-05
* Allow to set default response payload limit and timeout

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.7.17"
version = "1.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -49,20 +49,20 @@ async-std = ["ntex-rt/async-std", "ntex-async-std", "ntex-connect/async-std"]
[dependencies]
ntex-codec = "0.6.2"
ntex-connect = "0.3.4"
ntex-connect = "1.0.0"
ntex-http = "0.1.11"
ntex-router = "0.5.2"
ntex-service = "1.2.7"
ntex-service = "2.0.0"
ntex-macros = "0.1.3"
ntex-util = "0.3.4"
ntex-util = "1.0.0"
ntex-bytes = "0.1.21"
ntex-h2 = "0.4.4"
ntex-h2 = "0.5.0"
ntex-rt = "0.4.11"
ntex-io = "0.3.17"
ntex-tls = "0.3.3"
ntex-tokio = { version = "0.3.1", optional = true }
ntex-glommio = { version = "0.3.1", optional = true }
ntex-async-std = { version = "0.3.2", optional = true }
ntex-io = "1.0.0"
ntex-tls = "1.0.0"
ntex-tokio = { version = "0.4.0", optional = true }
ntex-glommio = { version = "0.4.0", optional = true }
ntex-async-std = { version = "0.4.0", optional = true }
async-channel = "2.1"
base64 = "0.21"

View file

@ -543,10 +543,10 @@ where
#[cfg(test)]
mod tests {
use futures_util::stream;
use std::io;
use std::{future::poll_fn, io};
use super::*;
use crate::util::{poll_fn, Ready};
use crate::util::Ready;
impl Body {
pub(crate) fn get_ref(&self) -> &[u8] {

View file

@ -3,9 +3,9 @@ use std::{fmt, task::Context, task::Poll, time::Duration};
use ntex_h2::{self as h2};
use crate::connect::{Connect as TcpConnect, Connector as TcpConnector};
use crate::service::{apply_fn, boxed, Service, ServiceCall, ServiceCtx};
use crate::service::{apply_fn, boxed, Service, ServiceCtx};
use crate::time::{Millis, Seconds};
use crate::util::{timeout::TimeoutError, timeout::TimeoutService, Either, Ready};
use crate::util::{timeout::TimeoutError, timeout::TimeoutService};
use crate::{http::Uri, io::IoBoxed};
use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect};
@ -54,7 +54,6 @@ impl Connector {
let conn = Connector {
connector: boxed::service(
TcpConnector::new()
.chain()
.map(IoBoxed::from)
.map_err(ConnectError::from),
),
@ -192,12 +191,8 @@ impl Connector {
T: Service<TcpConnect<Uri>, Error = crate::connect::ConnectError> + 'static,
IoBoxed: From<T::Response>,
{
self.connector = boxed::service(
connector
.chain()
.map(IoBoxed::from)
.map_err(ConnectError::from),
);
self.connector =
boxed::service(connector.map(IoBoxed::from).map_err(ConnectError::from));
self
}
@ -208,10 +203,7 @@ impl Connector {
IoBoxed: From<T::Response>,
{
self.ssl_connector = Some(boxed::service(
connector
.chain()
.map(IoBoxed::from)
.map_err(ConnectError::from),
connector.map(IoBoxed::from).map_err(ConnectError::from),
));
self
}
@ -265,7 +257,6 @@ fn connector(
async move { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)).await },
)
})
.chain()
.map(move |io: IoBoxed| {
io.set_disconnect_timeout(disconnect_timeout);
io
@ -290,12 +281,7 @@ where
{
type Response = <ConnectionPool<T> as Service<Connect>>::Response;
type Error = ConnectError;
type Future<'f> = Either<
ServiceCall<'f, ConnectionPool<T>, Connect>,
Ready<Self::Response, Self::Error>,
>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self.tcp_pool.poll_ready(cx)?.is_ready();
let ready = if let Some(ref ssl_pool) = self.ssl_pool {
@ -310,7 +296,6 @@ where
}
}
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let tcp_ready = self.tcp_pool.poll_shutdown(cx).is_ready();
let ssl_ready = self
@ -325,16 +310,20 @@ where
}
}
fn call<'a>(&'a self, req: Connect, ctx: ServiceCtx<'a, Self>) -> Self::Future<'_> {
async fn call(
&self,
req: Connect,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
match req.uri.scheme_str() {
Some("https") | Some("wss") => {
if let Some(ref conn) = self.ssl_pool {
Either::Left(ctx.call(conn, req))
ctx.call(conn, req).await
} else {
Either::Right(Ready::Err(ConnectError::SslIsNotSupported))
Err(ConnectError::SslIsNotSupported)
}
}
_ => Either::Left(ctx.call(&self.tcp_pool, req)),
_ => ctx.call(&self.tcp_pool, req).await,
}
}
}

View file

@ -1,4 +1,6 @@
use std::{io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant};
use std::{
future::poll_fn, io, io::Write, pin::Pin, task::Context, task::Poll, time::Instant,
};
use crate::http::body::{BodySize, MessageBody};
use crate::http::error::PayloadError;
@ -8,7 +10,7 @@ use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::{Payload, PayloadStream};
use crate::io::{IoBoxed, RecvError};
use crate::time::{timeout_checked, Millis};
use crate::util::{poll_fn, ready, BufMut, Bytes, BytesMut, Stream};
use crate::util::{ready, BufMut, Bytes, BytesMut, Stream};
use super::connection::{Connection, ConnectionType};
use super::error::{ConnectError, SendRequestError};

View file

@ -1,4 +1,4 @@
use std::io;
use std::{future::poll_fn, io};
use ntex_h2::client::{RecvStream, SimpleClient};
use ntex_h2::{self as h2, frame};
@ -8,7 +8,7 @@ use crate::http::header::{self, HeaderMap, HeaderValue};
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::{h2::payload, payload::Payload, Method, Version};
use crate::time::{timeout_checked, Millis};
use crate::util::{poll_fn, ByteString, Bytes};
use crate::util::{ByteString, Bytes};
use super::error::{ConnectError, SendRequestError};

View file

@ -8,7 +8,7 @@ use crate::http::uri::{Authority, Scheme, Uri};
use crate::io::{types::HttpProtocol, IoBoxed};
use crate::service::{Pipeline, PipelineCall, Service, ServiceCtx};
use crate::time::{now, Seconds};
use crate::util::{ready, BoxFuture, ByteString, HashMap, HashSet};
use crate::util::{ready, ByteString, HashMap, HashSet};
use crate::{channel::pool, rt::spawn, task::LocalWaker};
use super::connection::{Connection, ConnectionType};
@ -116,61 +116,62 @@ where
{
type Response = Connection;
type Error = ConnectError;
type Future<'f> = BoxFuture<'f, Result<Connection, ConnectError>>;
crate::forward_poll_ready!(connector);
crate::forward_poll_shutdown!(connector);
fn call<'a>(&'a self, req: Connect, _: ServiceCtx<'a, Self>) -> Self::Future<'_> {
async fn call(
&self,
req: Connect,
_: ServiceCtx<'_, Self>,
) -> Result<Connection, ConnectError> {
trace!("Get connection for {:?}", req.uri);
let inner = self.inner.clone();
let waiters = self.waiters.clone();
Box::pin(async move {
let key = if let Some(authority) = req.uri.authority() {
authority.clone().into()
} else {
return Err(ConnectError::Unresolved);
};
let key = if let Some(authority) = req.uri.authority() {
authority.clone().into()
} else {
return Err(ConnectError::Unresolved);
};
// acquire connection
let result = inner.borrow_mut().acquire(&key);
match result {
// use existing connection
Acquire::Acquired(io, created) => {
trace!("Use existing {:?} connection for {:?}", io, req.uri);
Ok(Connection::new(
io,
created,
Some(Acquired::new(key, inner)),
))
}
// open new tcp connection
Acquire::Available => {
trace!("Connecting to {:?}", req.uri);
let uri = req.uri.clone();
let (tx, rx) = waiters.borrow_mut().pool.channel();
OpenConnection::spawn(key, tx, uri, inner, &self.connector, req);
// acquire connection
let result = inner.borrow_mut().acquire(&key);
match result {
// use existing connection
Acquire::Acquired(io, created) => {
trace!("Use existing {:?} connection for {:?}", io, req.uri);
Ok(Connection::new(
io,
created,
Some(Acquired::new(key, inner)),
))
}
// open new tcp connection
Acquire::Available => {
trace!("Connecting to {:?}", req.uri);
let uri = req.uri.clone();
let (tx, rx) = waiters.borrow_mut().pool.channel();
OpenConnection::spawn(key, tx, uri, inner, &self.connector, req);
match rx.await {
Err(_) => Err(ConnectError::Disconnected(None)),
Ok(res) => res,
}
}
// pool is full, wait
Acquire::NotAvailable => {
trace!(
"Pool is full, waiting for available connections for {:?}",
req.uri
);
let rx = waiters.borrow_mut().wait_for(req);
match rx.await {
Err(_) => Err(ConnectError::Disconnected(None)),
Ok(res) => res,
}
match rx.await {
Err(_) => Err(ConnectError::Disconnected(None)),
Ok(res) => res,
}
}
})
// pool is full, wait
Acquire::NotAvailable => {
trace!(
"Pool is full, waiting for available connections for {:?}",
req.uri
);
let rx = waiters.borrow_mut().wait_for(req);
match rx.await {
Err(_) => Err(ConnectError::Disconnected(None)),
Ok(res) => res,
}
}
}
}
}
@ -659,8 +660,8 @@ mod tests {
assert!(pool.get_ref().inner.borrow().connecting.is_empty());
// pool is full, waiting
let mut fut = pool.call(req.clone());
assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending());
let mut fut = std::pin::pin!(pool.call(req.clone()));
assert!(lazy(|cx| fut.as_mut().poll(cx)).await.is_pending());
assert_eq!(pool.get_ref().waiters.borrow().waiters.len(), 1);
// release connection and push it to next waiter
@ -676,8 +677,8 @@ mod tests {
assert_eq!(store.borrow().len(), 2);
assert_eq!(pool.get_ref().inner.borrow().acquired, 1);
assert!(pool.get_ref().inner.borrow().connecting.is_empty());
let mut fut = pool.call(req.clone());
assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending());
let mut fut = std::pin::pin!(pool.call(req.clone()));
assert!(lazy(|cx| fut.as_mut().poll(cx)).await.is_pending());
assert_eq!(pool.get_ref().waiters.borrow().waiters.len(), 1);
// release and close
@ -692,7 +693,7 @@ mod tests {
assert_eq!(pool.get_ref().inner.borrow().acquired, 1);
// drop waiter, no interest in connection
let mut fut = pool.call(req.clone());
let mut fut = Box::pin(pool.call(req.clone()));
assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending());
drop(fut);
sleep(Millis(50)).await;
@ -704,8 +705,8 @@ mod tests {
uri: Uri::try_from("http://localhost2/test").unwrap(),
addr: None,
};
let mut fut = pool.call(req.clone());
assert!(lazy(|cx| Pin::new(&mut fut).poll(cx)).await.is_pending());
let mut fut = std::pin::pin!(pool.call(req.clone()));
assert!(lazy(|cx| fut.as_mut().poll(cx)).await.is_pending());
assert_eq!(pool.get_ref().waiters.borrow().waiters.len(), 1);
conn.release(false);
assert_eq!(pool.get_ref().inner.borrow().acquired, 0);

View file

@ -117,7 +117,7 @@ where
// slow-request timer
let (flags, max_timeout) = if let Some(cfg) = config.headers_read_rate() {
io.start_timer_secs(cfg.timeout);
io.start_timer(cfg.timeout);
(Flags::READ_HDRS_TIMEOUT, cfg.max_timeout)
} else {
(Flags::empty(), Seconds::ZERO)
@ -888,7 +888,7 @@ where
self.io.tag(),
total
);
self.io.start_timer_secs(cfg.timeout);
self.io.start_timer(cfg.timeout);
return Ok(());
}
}
@ -935,7 +935,7 @@ where
);
self.flags.insert(Flags::READ_KA_TIMEOUT);
if self.config.keep_alive_enabled() {
self.io.start_timer_secs(self.config.keep_alive);
self.io.start_timer(self.config.keep_alive);
}
}
} else {
@ -957,7 +957,7 @@ where
self.read_consumed = 0;
self.read_remains = decoded.remains as u32;
self.read_max_timeout = cfg.max_timeout;
self.io.start_timer_secs(cfg.timeout);
self.io.start_timer(cfg.timeout);
}
None
}
@ -973,7 +973,7 @@ where
self.read_remains = decoded.remains as u32;
self.read_consumed = decoded.consumed as u32;
self.read_max_timeout = cfg.max_timeout;
self.io.start_timer_secs(cfg.timeout);
self.io.start_timer(cfg.timeout);
}
}
}
@ -981,7 +981,7 @@ where
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cell::Cell, io, sync::Arc};
use std::{cell::Cell, future::poll_fn, io, sync::Arc};
use ntex_h2::Config;
use rand::Rng;
@ -992,7 +992,7 @@ mod tests {
use crate::http::{body, Request, ResponseHead, StatusCode};
use crate::io::{self as nio, Base};
use crate::service::{boxed, fn_service, IntoService};
use crate::util::{lazy, poll_fn, stream_recv, Bytes, BytesMut};
use crate::util::{lazy, stream_recv, Bytes, BytesMut};
use crate::{codec::Decoder, testing::Io, time::sleep, time::Millis, time::Seconds};
const BUFFER_SIZE: usize = 32_768;
@ -1274,9 +1274,7 @@ mod tests {
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
sleep(Millis(50)).await;
crate::util::poll_fn(|cx| Pin::new(&mut h1).poll(cx))
.await
.unwrap();
poll_fn(|cx| Pin::new(&mut h1).poll(cx)).await.unwrap();
assert!(h1.inner.io.is_closed());
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);

View file

@ -1,7 +1,7 @@
use std::io;
use crate::http::request::Request;
use crate::service::{Service, ServiceCtx, ServiceFactory};
use crate::{http::request::Request, util::Ready};
#[derive(Copy, Clone, Debug)]
pub struct ExpectHandler;
@ -11,21 +11,21 @@ impl ServiceFactory<Request> for ExpectHandler {
type Error = io::Error;
type Service = ExpectHandler;
type InitError = io::Error;
type Future<'f> = Ready<Self::Service, Self::InitError>;
#[inline]
fn create(&self, _: ()) -> Self::Future<'_> {
Ready::Ok(ExpectHandler)
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
Ok(ExpectHandler)
}
}
impl Service<Request> for ExpectHandler {
type Response = Request;
type Error = io::Error;
type Future<'f> = Ready<Self::Response, Self::Error>;
#[inline]
fn call<'a>(&'a self, req: Request, _: ServiceCtx<'a, Self>) -> Self::Future<'_> {
Ready::Ok(req)
async fn call(
&self,
req: Request,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
Ok(req)
}
}

View file

@ -205,8 +205,9 @@ impl Inner {
#[cfg(test)]
mod tests {
use std::future::poll_fn;
use super::*;
use crate::util::poll_fn;
#[crate::rt_test]
async fn test_unread_data() {

View file

@ -6,7 +6,6 @@ use crate::http::error::{DispatchError, ResponseError};
use crate::http::{request::Request, response::Response};
use crate::io::{types, Filter, Io};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::util::BoxFuture;
use super::codec::Codec;
use super::dispatcher::Dispatcher;
@ -82,10 +81,9 @@ mod openssl {
> {
Acceptor::new(acceptor)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(SslError::Ssl)
.map_init_err(|_| panic!())
.and_then(self.chain().map_err(SslError::Service))
.and_then(self.map_err(SslError::Service))
}
}
}
@ -128,10 +126,9 @@ mod rustls {
> {
Acceptor::from(config)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(|e| SslError::Ssl(Box::new(e)))
.map_init_err(|_| panic!())
.and_then(self.chain().map_err(SslError::Service))
.and_then(self.map_err(SslError::Service))
}
}
}
@ -205,39 +202,36 @@ where
type Error = DispatchError;
type InitError = ();
type Service = H1ServiceHandler<F, S::Service, B, X::Service, U::Service>;
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>>;
fn create(&self, _: ()) -> Self::Future<'_> {
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
let fut = self.srv.create(());
let fut_ex = self.expect.create(());
let fut_upg = self.upgrade.as_ref().map(|f| f.create(()));
let on_request = self.on_request.borrow_mut().take();
let cfg = self.cfg.clone();
Box::pin(async move {
let service = fut
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
let expect = fut_ex
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
let upgrade = if let Some(fut) = fut_upg {
Some(
fut.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?,
)
} else {
None
};
let service = fut
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
let expect = fut_ex
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
let upgrade = if let Some(fut) = fut_upg {
Some(
fut.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?,
)
} else {
None
};
let config = Rc::new(DispatcherConfig::new(
cfg, service, expect, upgrade, on_request,
));
let config = Rc::new(DispatcherConfig::new(
cfg, service, expect, upgrade, on_request,
));
Ok(H1ServiceHandler {
config,
_t: marker::PhantomData,
})
Ok(H1ServiceHandler {
config,
_t: marker::PhantomData,
})
}
}
@ -262,7 +256,6 @@ where
{
type Response = ();
type Error = DispatchError;
type Future<'f> = Dispatcher<F, S, B, X, U>;
fn poll_ready(
&self,
@ -324,12 +317,12 @@ where
}
}
fn call<'a>(&'a self, io: Io<F>, _: ServiceCtx<'a, Self>) -> Self::Future<'_> {
async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), DispatchError> {
log::trace!(
"New http1 connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
);
Dispatcher::new(io, self.config.clone())
Dispatcher::new(io, self.config.clone()).await
}
}

View file

@ -1,8 +1,8 @@
use std::{io, marker::PhantomData};
use crate::http::{h1::Codec, request::Request};
use crate::io::Io;
use crate::service::{Service, ServiceCtx, ServiceFactory};
use crate::{io::Io, util::Ready};
pub struct UpgradeHandler<F>(PhantomData<F>);
@ -12,10 +12,8 @@ impl<F> ServiceFactory<(Request, Io<F>, Codec)> for UpgradeHandler<F> {
type Service = UpgradeHandler<F>;
type InitError = io::Error;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;
#[inline]
fn create(&self, _: ()) -> Self::Future<'_> {
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
unimplemented!()
}
}
@ -23,14 +21,12 @@ impl<F> ServiceFactory<(Request, Io<F>, Codec)> for UpgradeHandler<F> {
impl<F> Service<(Request, Io<F>, Codec)> for UpgradeHandler<F> {
type Response = ();
type Error = io::Error;
type Future<'f> = Ready<Self::Response, Self::Error> where F: 'f;
#[inline]
fn call<'a>(
&'a self,
async fn call(
&self,
_: (Request, Io<F>, Codec),
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
unimplemented!()
}
}

View file

@ -1,10 +1,11 @@
//! Payload stream
use std::collections::VecDeque;
use std::task::{Context, Poll};
use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, rc::Weak};
use std::{cell::RefCell, future::poll_fn, pin::Pin, rc::Rc, rc::Weak};
use ntex_h2::{self as h2};
use crate::util::{poll_fn, Bytes, Stream};
use crate::util::{Bytes, Stream};
use crate::{http::error::PayloadError, task::LocalWaker};
/// Buffered stream of byte chunks

View file

@ -1,5 +1,5 @@
use std::{cell::RefCell, io, task::Context, task::Poll};
use std::{marker::PhantomData, mem, rc::Rc};
use std::{future::poll_fn, marker::PhantomData, mem, rc::Rc};
use ntex_h2::{self as h2, frame::StreamId, server};
@ -11,7 +11,7 @@ use crate::http::message::{CurrentIo, ResponseHead};
use crate::http::{DateService, Method, Request, Response, StatusCode, Uri, Version};
use crate::io::{types, Filter, Io, IoBoxed, IoRef};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::util::{poll_fn, BoxFuture, Bytes, BytesMut, Either, HashMap, Ready};
use crate::util::{Bytes, BytesMut, HashMap};
use super::payload::{Payload, PayloadSender};
@ -71,10 +71,9 @@ mod openssl {
> {
Acceptor::new(acceptor)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(SslError::Ssl)
.map_init_err(|_| panic!())
.and_then(self.chain().map_err(SslError::Service))
.and_then(self.map_err(SslError::Service))
}
}
}
@ -110,10 +109,9 @@ mod rustls {
Acceptor::from(config)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(|e| SslError::Ssl(Box::new(e)))
.map_init_err(|_| panic!())
.and_then(self.chain().map_err(SslError::Service))
.and_then(self.map_err(SslError::Service))
}
}
}
@ -130,20 +128,20 @@ where
type Error = DispatchError;
type InitError = S::InitError;
type Service = H2ServiceHandler<F, S::Service, B>;
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>>;
fn create(&self, _: ()) -> Self::Future<'_> {
let fut = self.srv.create(());
let cfg = self.cfg.clone();
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
let service = self.srv.create(()).await?;
let config = Rc::new(DispatcherConfig::new(
self.cfg.clone(),
service,
(),
None,
None,
));
Box::pin(async move {
let service = fut.await?;
let config = Rc::new(DispatcherConfig::new(cfg, service, (), None, None));
Ok(H2ServiceHandler {
config,
_t: PhantomData,
})
Ok(H2ServiceHandler {
config,
_t: PhantomData,
})
}
}
@ -164,9 +162,7 @@ where
{
type Response = ();
type Error = DispatchError;
type Future<'f> = BoxFuture<'f, Result<Self::Response, Self::Error>>;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.config.service.poll_ready(cx).map_err(|e| {
log::error!("Service readiness error: {:?}", e);
@ -174,18 +170,21 @@ where
})
}
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
self.config.service.poll_shutdown(cx)
}
fn call<'a>(&'a self, io: Io<F>, _: ServiceCtx<'a, Self>) -> Self::Future<'_> {
async fn call(
&self,
io: Io<F>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
log::trace!(
"New http2 connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
);
Box::pin(handle(io.into(), self.config.clone()))
handle(io.into(), self.config.clone()).await
}
}
@ -226,15 +225,14 @@ impl ControlService {
impl Service<h2::ControlMessage<H2Error>> for ControlService {
type Response = h2::ControlResult;
type Error = ();
type Future<'f> = Ready<Self::Response, Self::Error>;
fn call<'a>(
&'a self,
async fn call(
&self,
msg: h2::ControlMessage<H2Error>,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
log::trace!("Control message: {:?}", msg);
Ready::Ok::<_, ()>(msg.ack())
Ok::<_, ()>(msg.ack())
}
}
@ -273,12 +271,12 @@ where
{
type Response = ();
type Error = H2Error;
type Future<'f> = Either<
BoxFuture<'f, Result<Self::Response, Self::Error>>,
Ready<Self::Response, Self::Error>,
>;
fn call<'a>(&'a self, msg: h2::Message, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
async fn call(
&self,
msg: h2::Message,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
let h2::Message { stream, kind } = msg;
let (io, pseudo, headers, eof, payload) = match kind {
h2::MessageKind::Headers {
@ -303,7 +301,7 @@ where
} else {
log::error!("Payload stream does not exists for {:?}", stream.id());
};
return Either::Right(Ready::Ok(()));
return Ok(());
}
h2::MessageKind::Eof(item) => {
log::debug!("Got payload eof for {:?}: {:?}", stream.id(), item);
@ -318,95 +316,93 @@ where
h2::StreamEof::Error(err) => sender.set_error(err.into()),
}
}
return Either::Right(Ready::Ok(()));
return Ok(());
}
h2::MessageKind::Disconnect(err) => {
log::debug!("Connection is disconnected {:?}", err);
if let Some(mut sender) = self.streams.borrow_mut().remove(&stream.id()) {
sender.set_error(io::Error::new(io::ErrorKind::Other, err).into());
}
return Either::Right(Ready::Ok(()));
return Ok(());
}
};
let cfg = self.config.clone();
Either::Left(Box::pin(async move {
log::trace!(
"{:?} got request (eof: {}): {:#?}\nheaders: {:#?}",
stream.id(),
eof,
pseudo,
headers
);
let mut req = if let Some(pl) = payload {
Request::with_payload(crate::http::Payload::H2(pl))
} else {
Request::new()
};
log::trace!(
"{:?} got request (eof: {}): {:#?}\nheaders: {:#?}",
stream.id(),
eof,
pseudo,
headers
);
let mut req = if let Some(pl) = payload {
Request::with_payload(crate::http::Payload::H2(pl))
} else {
Request::new()
};
let path = pseudo.path.ok_or(H2Error::MissingPseudo("Path"))?;
let method = pseudo.method.ok_or(H2Error::MissingPseudo("Method"))?;
let path = pseudo.path.ok_or(H2Error::MissingPseudo("Path"))?;
let method = pseudo.method.ok_or(H2Error::MissingPseudo("Method"))?;
let head = req.head_mut();
head.uri = if let Some(ref authority) = pseudo.authority {
let scheme = pseudo.scheme.ok_or(H2Error::MissingPseudo("Scheme"))?;
Uri::try_from(format!("{}://{}{}", scheme, authority, path))?
} else {
Uri::try_from(path.as_str())?
};
let is_head_req = method == Method::HEAD;
head.version = Version::HTTP_2;
head.method = method;
head.headers = headers;
head.io = CurrentIo::Ref(io);
let head = req.head_mut();
head.uri = if let Some(ref authority) = pseudo.authority {
let scheme = pseudo.scheme.ok_or(H2Error::MissingPseudo("Scheme"))?;
Uri::try_from(format!("{}://{}{}", scheme, authority, path))?
} else {
Uri::try_from(path.as_str())?
};
let is_head_req = method == Method::HEAD;
head.version = Version::HTTP_2;
head.method = method;
head.headers = headers;
head.io = CurrentIo::Ref(io);
let (mut res, mut body) = match cfg.service.call(req).await {
Ok(res) => res.into().into_parts(),
Err(err) => {
let (res, body) = Response::from(&err).into_parts();
(res, body.into_body())
}
};
let (mut res, mut body) = match cfg.service.call(req).await {
Ok(res) => res.into().into_parts(),
Err(err) => {
let (res, body) = Response::from(&err).into_parts();
(res, body.into_body())
}
};
let head = res.head_mut();
let mut size = body.size();
prepare_response(&cfg.timer, head, &mut size);
let head = res.head_mut();
let mut size = body.size();
prepare_response(&cfg.timer, head, &mut size);
log::debug!("Received service response: {:?} payload: {:?}", head, size);
log::debug!("Received service response: {:?} payload: {:?}", head, size);
let hdrs = mem::replace(&mut head.headers, HeaderMap::new());
if size.is_eof() || is_head_req {
stream.send_response(head.status, hdrs, true)?;
} else {
stream.send_response(head.status, hdrs, false)?;
let hdrs = mem::replace(&mut head.headers, HeaderMap::new());
if size.is_eof() || is_head_req {
stream.send_response(head.status, hdrs, true)?;
} else {
stream.send_response(head.status, hdrs, false)?;
loop {
match poll_fn(|cx| body.poll_next_chunk(cx)).await {
None => {
log::debug!("{:?} closing payload stream", stream.id());
stream.send_payload(Bytes::new(), true).await?;
break;
}
Some(Ok(chunk)) => {
log::debug!(
"{:?} sending data chunk {:?} bytes",
stream.id(),
chunk.len()
);
if !chunk.is_empty() {
stream.send_payload(chunk, false).await?;
}
}
Some(Err(e)) => {
error!("Response payload stream error: {:?}", e);
return Err(e.into());
loop {
match poll_fn(|cx| body.poll_next_chunk(cx)).await {
None => {
log::debug!("{:?} closing payload stream", stream.id());
stream.send_payload(Bytes::new(), true).await?;
break;
}
Some(Ok(chunk)) => {
log::debug!(
"{:?} sending data chunk {:?} bytes",
stream.id(),
chunk.len()
);
if !chunk.is_empty() {
stream.send_payload(chunk, false).await?;
}
}
Some(Err(e)) => {
error!("Response payload stream error: {:?}", e);
return Err(e.into());
}
}
}
Ok(())
}))
}
Ok(())
}
}

View file

@ -1,7 +1,7 @@
use std::{fmt, mem, pin::Pin, task::Context, task::Poll};
use std::{fmt, future::poll_fn, mem, pin::Pin, task::Context, task::Poll};
use super::{error::PayloadError, h1, h2};
use crate::util::{poll_fn, Bytes, Stream};
use crate::util::{Bytes, Stream};
/// Type represent boxed payload
pub type PayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>;

View file

@ -1,9 +1,7 @@
use std::task::{Context, Poll};
use std::{cell, error, fmt, future, marker, pin::Pin, rc::Rc};
use std::{cell, error, fmt, marker, rc::Rc, task::Context, task::Poll};
use crate::io::{types, Filter, Io};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::util::BoxFuture;
use super::body::MessageBody;
use super::builder::HttpServiceBuilder;
@ -175,10 +173,9 @@ mod openssl {
> {
Acceptor::new(acceptor)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(SslError::Ssl)
.map_init_err(|_| panic!())
.and_then(self.chain().map_err(SslError::Service))
.and_then(self.map_err(SslError::Service))
}
}
}
@ -222,10 +219,9 @@ mod rustls {
Acceptor::from(config)
.timeout(self.cfg.ssl_handshake_timeout)
.chain()
.map_err(|e| SslError::Ssl(Box::new(e)))
.map_init_err(|_| panic!())
.and_then(self.chain().map_err(SslError::Service))
.and_then(self.map_err(SslError::Service))
}
}
}
@ -249,39 +245,36 @@ where
type Error = DispatchError;
type InitError = ();
type Service = HttpServiceHandler<F, S::Service, B, X::Service, U::Service>;
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>>;
fn create(&self, _: ()) -> Self::Future<'_> {
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
let fut = self.srv.create(());
let fut_ex = self.expect.create(());
let fut_upg = self.upgrade.as_ref().map(|f| f.create(()));
let on_request = self.on_request.borrow_mut().take();
let cfg = self.cfg.clone();
Box::pin(async move {
let service = fut
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
let service = fut
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
let expect = fut_ex
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
let expect = fut_ex
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
let upgrade = if let Some(fut) = fut_upg {
Some(
fut.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?,
)
} else {
None
};
let upgrade = if let Some(fut) = fut_upg {
Some(
fut.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?,
)
} else {
None
};
let config = DispatcherConfig::new(cfg, service, expect, upgrade, on_request);
let config = DispatcherConfig::new(cfg, service, expect, upgrade, on_request);
Ok(HttpServiceHandler {
config: Rc::new(config),
_t: marker::PhantomData,
})
Ok(HttpServiceHandler {
config: Rc::new(config),
_t: marker::PhantomData,
})
}
}
@ -306,7 +299,6 @@ where
{
type Response = ();
type Error = DispatchError;
type Future<'f> = HttpServiceHandlerResponse<F, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let cfg = self.config.as_ref();
@ -365,96 +357,20 @@ where
}
}
fn call<'a>(&'a self, io: Io<F>, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
async fn call(
&self,
io: Io<F>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
log::trace!(
"New http connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
);
if io.query::<types::HttpProtocol>().get() == Some(types::HttpProtocol::Http2) {
HttpServiceHandlerResponse {
state: ResponseState::H2 {
fut: Box::pin(h2::handle(io.into(), self.config.clone())),
},
}
h2::handle(io.into(), self.config.clone()).await
} else {
HttpServiceHandlerResponse {
state: ResponseState::H1 {
fut: h1::Dispatcher::new(io, self.config.clone()),
},
}
}
}
}
pin_project_lite::pin_project! {
pub struct HttpServiceHandlerResponse<F, S, B, X, U>
where
F: Filter,
S: Service<Request>,
S: 'static,
S::Error: ResponseError,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request, Response = Request>,
X: 'static,
X::Error: ResponseError,
X::Error: 'static,
U: Service<(Request, Io<F>, h1::Codec), Response = ()>,
U: 'static,
U::Error: fmt::Display,
U::Error: error::Error,
U: 'static,
{
#[pin]
state: ResponseState<F, S, B, X, U>,
}
}
pin_project_lite::pin_project! {
#[project = StateProject]
enum ResponseState<F, S, B, X, U>
where
F: Filter,
S: Service<Request>,
S: 'static,
S::Error: ResponseError,
B: MessageBody,
X: Service<Request, Response = Request>,
X: 'static,
X::Error: ResponseError,
X::Error: 'static,
U: Service<(Request, Io<F>, h1::Codec), Response = ()>,
U: 'static,
U::Error: fmt::Display,
U::Error: error::Error,
U: 'static,
{
H1 { #[pin] fut: h1::Dispatcher<F, S, B, X, U> },
H2 { fut: BoxFuture<'static, Result<(), DispatchError>> },
}
}
impl<F, S, B, X, U> future::Future for HttpServiceHandlerResponse<F, S, B, X, U>
where
F: Filter,
S: Service<Request> + 'static,
S::Error: ResponseError,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request, Response = Request> + 'static,
X::Error: ResponseError,
U: Service<(Request, Io<F>, h1::Codec), Response = ()> + 'static,
U::Error: fmt::Display + error::Error,
{
type Output = Result<(), DispatchError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
match this.state.project() {
StateProject::H1 { fut } => fut.poll(cx),
StateProject::H2 { ref mut fut } => Pin::new(fut).poll(cx),
h1::Dispatcher::new(io, self.config.clone()).await
}
}
}

View file

@ -39,7 +39,7 @@ pub mod ws;
pub use self::service::{
chain, chain_factory, fn_service, into_service, IntoService, IntoServiceFactory,
Middleware, Pipeline, Service, ServiceCall, ServiceCtx, ServiceFactory,
Middleware, Pipeline, Service, ServiceCtx, ServiceFactory,
};
pub use ntex_util::{channel, task};

View file

@ -323,7 +323,7 @@ impl ServerBuilder {
pub fn set_tag<N: AsRef<str>>(mut self, name: N, tag: &'static str) -> Self {
let mut token = None;
for sock in &self.sockets {
if &sock.1 == name.as_ref() {
if sock.1 == name.as_ref() {
token = Some(sock.0);
break;
}

View file

@ -397,20 +397,17 @@ where
type Error = ();
type InitError = ();
type Service = BoxedServerService;
type Future<'f> = BoxFuture<'f, Result<BoxedServerService, ()>> where Self: 'f;
fn create(&self, _: ()) -> Self::Future<'_> {
async fn create(&self, _: ()) -> Result<BoxedServerService, ()> {
let tag = self.tag;
let pool = self.pool;
let fut = self.inner.create(());
Box::pin(async move {
match fut.await {
Ok(s) => Ok(boxed::service(StreamService::new(s, tag, pool))),
Err(e) => {
error!("Cannot construct service: {:?}", e);
Err(())
}
match self.inner.create(()).await {
Ok(s) => Ok(boxed::service(StreamService::new(s, tag, pool))),
Err(e) => {
error!("Cannot construct service: {:?}", e);
Err(())
}
})
}
}
}

View file

@ -62,7 +62,6 @@ where
{
type Response = ();
type Error = ();
type Future<'f> = BoxFuture<'f, Result<(), ()>> where T: 'f;
crate::forward_poll_shutdown!(service);
@ -77,32 +76,30 @@ where
}
}
fn call<'a>(
&'a self,
async fn call(
&self,
(guard, req): (Option<CounterGuard>, ServerMessage),
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
Box::pin(async move {
match req {
ServerMessage::Connect(stream) => {
let stream = stream.try_into().map_err(|e| {
error!("Cannot convert to an async io stream: {}", e);
});
ctx: ServiceCtx<'_, Self>,
) -> Result<(), ()> {
match req {
ServerMessage::Connect(stream) => {
let stream = stream.try_into().map_err(|e| {
error!("Cannot convert to an async io stream: {}", e);
});
if let Ok(stream) = stream {
let stream: Io<_> = stream;
stream.set_tag(self.tag);
stream.set_memory_pool(self.pool_ref);
let _ = ctx.call(self.service.as_ref(), stream).await;
drop(guard);
Ok(())
} else {
Err(())
}
if let Ok(stream) = stream {
let stream: Io<_> = stream;
stream.set_tag(self.tag);
stream.set_memory_pool(self.pool_ref);
let _ = ctx.call(self.service.as_ref(), stream).await;
drop(guard);
Ok(())
} else {
Err(())
}
_ => Ok(()),
}
})
_ => Ok(()),
}
}
}

View file

@ -515,7 +515,7 @@ mod tests {
use crate::io::Io;
use crate::server::service::Factory;
use crate::service::{Service, ServiceCtx, ServiceFactory};
use crate::util::{lazy, Ready};
use crate::util::lazy;
#[derive(Clone, Copy, Debug)]
enum St {
@ -535,12 +535,11 @@ mod tests {
type Error = ();
type Service = Srv;
type InitError = ();
type Future<'f> = Ready<Srv, ()>;
fn create(&self, _: ()) -> Self::Future<'_> {
async fn create(&self, _: ()) -> Result<Srv, ()> {
let mut cnt = self.counter.lock().unwrap();
*cnt += 1;
Ready::Ok(Srv {
Ok(Srv {
st: self.st.clone(),
})
}
@ -553,7 +552,6 @@ mod tests {
impl Service<Io> for Srv {
type Response = ();
type Error = ();
type Future<'f> = Ready<(), ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let st: St = { *self.st.lock().unwrap() };
@ -574,8 +572,8 @@ mod tests {
}
}
fn call<'a>(&'a self, _: Io, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
Ready::Ok(())
async fn call(&self, _: Io, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Ok(())
}
}

View file

@ -7,7 +7,7 @@ use crate::service::{
chain_factory, dev::ServiceChainFactory, map_config, IntoServiceFactory,
};
use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory, Stack};
use crate::util::{BoxFuture, Extensions, Ready};
use crate::util::{BoxFuture, Extensions};
use super::app_service::{AppFactory, AppService};
use super::config::{AppConfig, ServiceConfig};
@ -269,9 +269,9 @@ where
U::InitError: fmt::Debug,
{
// create and configure default resource
self.default = Some(Rc::new(boxed::factory(f.chain().map_init_err(|e| {
log::error!("Cannot construct default service: {:?}", e)
}))));
self.default = Some(Rc::new(boxed::factory(chain_factory(f).map_init_err(
|e| log::error!("Cannot construct default service: {:?}", e),
))));
self
}
@ -569,26 +569,22 @@ impl<Err: ErrorRenderer> ServiceFactory<WebRequest<Err>> for Filter<Err> {
type Error = Err::Container;
type InitError = ();
type Service = Filter<Err>;
type Future<'f> = Ready<Filter<Err>, ()>;
#[inline]
fn create(&self, _: ()) -> Self::Future<'_> {
Ready::Ok(Filter(PhantomData))
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
Ok(Filter(PhantomData))
}
}
impl<Err: ErrorRenderer> Service<WebRequest<Err>> for Filter<Err> {
type Response = WebRequest<Err>;
type Error = Err::Container;
type Future<'f> = Ready<WebRequest<Err>, Err::Container>;
#[inline]
fn call<'a>(
&'a self,
async fn call(
&self,
req: WebRequest<Err>,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
Ready::Ok(req)
_: ServiceCtx<'_, Self>,
) -> Result<WebRequest<Err>, Err::Container> {
Ok(req)
}
}

View file

@ -1,14 +1,11 @@
use std::task::{Context, Poll};
use std::{cell::RefCell, future::Future, marker::PhantomData, pin::Pin, rc::Rc};
use std::{cell::RefCell, marker::PhantomData, rc::Rc, task::Context, task::Poll};
use crate::http::{Request, Response};
use crate::router::{Path, ResourceDef, Router};
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::dev::ServiceChainFactory;
use crate::service::{
fn_service, Middleware, Service, ServiceCall, ServiceCtx, ServiceFactory,
};
use crate::util::{BoxFuture, Either, Extensions};
use crate::service::{fn_service, Middleware, Service, ServiceCtx, ServiceFactory};
use crate::util::{BoxFuture, Extensions};
use super::config::AppConfig;
use super::error::ErrorRenderer;
@ -24,8 +21,6 @@ type HttpService<Err: ErrorRenderer> =
BoxService<WebRequest<Err>, WebResponse, Err::Container>;
type HttpNewService<Err: ErrorRenderer> =
BoxServiceFactory<(), WebRequest<Err>, WebResponse, Err::Container, ()>;
type BoxResponse<'a, Err: ErrorRenderer> =
ServiceCall<'a, HttpService<Err>, WebRequest<Err>>;
type FnStateFactory = Box<dyn Fn(Extensions) -> BoxFuture<'static, Result<Extensions, ()>>>;
/// Service factory to convert `Request` to a `WebRequest<S>`.
@ -66,10 +61,9 @@ where
type Error = Err::Container;
type InitError = ();
type Service = AppFactoryService<T::Service, Err>;
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>> where Self: 'f;
fn create(&self, _: ()) -> Self::Future<'_> {
ServiceFactory::create(self, AppConfig::default())
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
ServiceFactory::create(self, AppConfig::default()).await
}
}
@ -89,9 +83,8 @@ where
type Error = Err::Container;
type InitError = ();
type Service = AppFactoryService<T::Service, Err>;
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>> where Self: 'f;
fn create(&self, config: AppConfig) -> Self::Future<'_> {
async fn create(&self, config: AppConfig) -> Result<Self::Service, Self::InitError> {
let services = std::mem::take(&mut *self.services.borrow_mut());
// update resource default service
@ -114,65 +107,63 @@ where
router.case_insensitive();
}
Box::pin(async move {
// app state factories
for fut in state_factories.iter() {
extensions = fut(extensions).await?;
}
let state = AppState::new(extensions, None, config.clone());
// app state factories
for fut in state_factories.iter() {
extensions = fut(extensions).await?;
}
let state = AppState::new(extensions, None, config.clone());
// App config
let mut config = WebServiceConfig::new(state.clone(), default.clone());
// App config
let mut config = WebServiceConfig::new(state.clone(), default.clone());
// register services
services
.into_iter()
.for_each(|mut srv| srv.register(&mut config));
let services = config.into_services();
// register services
services
.into_iter()
.for_each(|mut srv| srv.register(&mut config));
let services = config.into_services();
// resource map
let mut rmap = ResourceMap::new(ResourceDef::new(""));
for mut rdef in external {
rmap.add(&mut rdef, None);
}
// resource map
let mut rmap = ResourceMap::new(ResourceDef::new(""));
for mut rdef in external {
rmap.add(&mut rdef, None);
}
// complete pipeline creation
let services: Vec<_> = services
.into_iter()
.map(|(mut rdef, srv, guards, nested)| {
rmap.add(&mut rdef, nested);
(rdef, srv, RefCell::new(guards))
})
.collect();
// complete ResourceMap tree creation
let rmap = Rc::new(rmap);
rmap.finish(rmap.clone());
// create http services
for (path, factory, guards) in &mut services.iter() {
let service = factory.create(()).await?;
router.rdef(path.clone(), service).2 = guards.borrow_mut().take();
}
let routing = AppRouting {
router: router.finish(),
default: Some(default.create(()).await?),
};
// main service
let service = AppService {
routing,
filter: filter_fut.await?,
};
Ok(AppFactoryService {
rmap,
state,
service: middleware.create(service),
pool: HttpRequestPool::create(),
_t: PhantomData,
// complete pipeline creation
let services: Vec<_> = services
.into_iter()
.map(|(mut rdef, srv, guards, nested)| {
rmap.add(&mut rdef, nested);
(rdef, srv, RefCell::new(guards))
})
.collect();
// complete ResourceMap tree creation
let rmap = Rc::new(rmap);
rmap.finish(rmap.clone());
// create http services
for (path, factory, guards) in &mut services.iter() {
let service = factory.create(()).await?;
router.rdef(path.clone(), service).2 = guards.borrow_mut().take();
}
let routing = AppRouting {
router: router.finish(),
default: Some(default.create(()).await?),
};
// main service
let service = AppService {
routing,
filter: filter_fut.await?,
};
Ok(AppFactoryService {
rmap,
state,
service: middleware.create(service),
pool: HttpRequestPool::create(),
_t: PhantomData,
})
}
}
@ -197,12 +188,15 @@ where
{
type Response = WebResponse;
type Error = T::Error;
type Future<'f> = ServiceCall<'f, T, WebRequest<Err>> where T: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
fn call<'a>(&'a self, req: Request, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
async fn call(
&self,
req: Request,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
let (head, payload) = req.into_parts();
let req = if let Some(mut req) = self.pool.get_request() {
@ -222,7 +216,7 @@ where
self.pool,
)
};
ctx.call(&self.service, WebRequest::new(req))
ctx.call(&self.service, WebRequest::new(req)).await
}
}
@ -244,14 +238,12 @@ struct AppRouting<Err: ErrorRenderer> {
impl<Err: ErrorRenderer> Service<WebRequest<Err>> for AppRouting<Err> {
type Response = WebResponse;
type Error = Err::Container;
type Future<'f> =
Either<BoxResponse<'f, Err>, BoxFuture<'f, Result<WebResponse, Err::Container>>>;
fn call<'a>(
&'a self,
async fn call(
&self,
mut req: WebRequest<Err>,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
ctx: ServiceCtx<'_, Self>,
) -> Result<WebResponse, Err::Container> {
let res = self.router.recognize_checked(&mut req, |req, guards| {
if let Some(guards) = guards {
for f in guards {
@ -264,14 +256,12 @@ impl<Err: ErrorRenderer> Service<WebRequest<Err>> for AppRouting<Err> {
});
if let Some((srv, _info)) = res {
Either::Left(ctx.call(srv, req))
ctx.call(srv, req).await
} else if let Some(ref default) = self.default {
Either::Left(ctx.call(default, req))
ctx.call(default, req).await
} else {
let req = req.into_parts().0;
Either::Right(Box::pin(async {
Ok(WebResponse::new(Response::NotFound().finish(), req))
}))
Ok(WebResponse::new(Response::NotFound().finish(), req))
}
}
}
@ -289,7 +279,6 @@ where
{
type Response = WebResponse;
type Error = Err::Container;
type Future<'f> = AppServiceResponse<'f, F, Err> where F: 'f;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -302,58 +291,13 @@ where
}
}
fn call<'a>(
&'a self,
async fn call(
&self,
req: WebRequest<Err>,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
AppServiceResponse {
filter: ctx.call(&self.filter, req),
routing: &self.routing,
endpoint: None,
ctx,
}
}
}
type BoxAppServiceResponse<'a, Err: ErrorRenderer> =
ServiceCall<'a, AppRouting<Err>, WebRequest<Err>>;
pin_project_lite::pin_project! {
pub struct AppServiceResponse<'f, F: Service<WebRequest<Err>>, Err: ErrorRenderer>
where F: 'f
{
#[pin]
filter: ServiceCall<'f, F, WebRequest<Err>>,
routing: &'f AppRouting<Err>,
endpoint: Option<BoxAppServiceResponse<'f, Err>>,
ctx: ServiceCtx<'f, AppService<F, Err>>,
}
}
impl<'f, F, Err> Future for AppServiceResponse<'f, F, Err>
where
F: Service<WebRequest<Err>, Response = WebRequest<Err>, Error = Err::Container>,
Err: ErrorRenderer,
{
type Output = Result<WebResponse, Err::Container>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
loop {
if let Some(fut) = this.endpoint.as_mut() {
return Pin::new(fut).poll(cx);
} else {
let res = if let Poll::Ready(res) = this.filter.poll(cx) {
res?
} else {
return Poll::Pending;
};
*this.endpoint = Some(this.ctx.call(this.routing, res));
this = self.as_mut().project();
}
}
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
let req = ctx.call(&self.filter, req).await?;
ctx.call(&self.routing, req).await
}
}

View file

@ -84,10 +84,6 @@ pub enum StateExtractorError {
NotConfigured,
}
#[deprecated]
#[doc(hidden)]
pub type DataExtractorError = StateExtractorError;
/// Errors which can occur when attempting to generate resource uri.
#[derive(Error, Debug, Copy, Clone, PartialEq, Eq)]
pub enum UrlGenerationError {

View file

@ -1,10 +1,9 @@
//! `Middleware` for compressing response body.
use std::task::{Context, Poll};
use std::{cmp, future::Future, marker, pin::Pin, str::FromStr};
use std::{cmp, str::FromStr};
use crate::http::encoding::Encoder;
use crate::http::header::{ContentEncoding, ACCEPT_ENCODING};
use crate::service::{Middleware, Service, ServiceCall, ServiceCtx};
use crate::service::{Middleware, Service, ServiceCtx};
use crate::web::{BodyEncoding, ErrorRenderer, WebRequest, WebResponse};
#[derive(Debug, Clone)]
@ -67,16 +66,15 @@ where
{
type Response = WebResponse;
type Error = S::Error;
type Future<'f> = CompressResponse<'f, S, E> where S: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
fn call<'a>(
&'a self,
async fn call(
&self,
req: WebRequest<E>,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
ctx: ServiceCtx<'_, Self>,
) -> Result<WebResponse, S::Error> {
// negotiate content-encoding
let encoding = if let Some(val) = req.headers().get(&ACCEPT_ENCODING) {
if let Ok(enc) = val.to_str() {
@ -88,50 +86,15 @@ where
ContentEncoding::Identity
};
CompressResponse {
encoding,
fut: ctx.call(&self.service, req),
_t: marker::PhantomData,
}
}
}
let resp = ctx.call(&self.service, req).await?;
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct CompressResponse<'f, S: Service<WebRequest<E>>, E>
where S: 'f, E: 'f
{
#[pin]
fut: ServiceCall<'f, S, WebRequest<E>>,
encoding: ContentEncoding,
_t: marker::PhantomData<E>,
}
}
let enc = if let Some(enc) = resp.response().get_encoding() {
enc
} else {
encoding
};
impl<'f, S, E> Future for CompressResponse<'f, S, E>
where
S: Service<WebRequest<E>, Response = WebResponse>,
E: ErrorRenderer,
{
type Output = Result<WebResponse, S::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.fut.poll(cx)? {
Poll::Ready(resp) => {
let enc = if let Some(enc) = resp.response().get_encoding() {
enc
} else {
*this.encoding
};
Poll::Ready(Ok(
resp.map_body(move |head, body| Encoder::response(enc, head, body))
))
}
Poll::Pending => Poll::Pending,
}
Ok(resp.map_body(move |head, body| Encoder::response(enc, head, body)))
}
}

View file

@ -4,7 +4,6 @@ use std::rc::Rc;
use crate::http::error::HttpError;
use crate::http::header::{HeaderMap, HeaderName, HeaderValue, CONTENT_TYPE};
use crate::service::{Middleware, Service, ServiceCtx};
use crate::util::BoxFuture;
use crate::web::{WebRequest, WebResponse};
/// `Middleware` for setting default response headers.
@ -111,35 +110,31 @@ where
{
type Response = WebResponse;
type Error = S::Error;
type Future<'f> =
BoxFuture<'f, Result<Self::Response, Self::Error>> where S: 'f, E: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
fn call<'a>(
&'a self,
async fn call(
&self,
req: WebRequest<E>,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
Box::pin(async move {
let mut res = ctx.call(&self.service, req).await?;
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
let mut res = ctx.call(&self.service, req).await?;
// set response headers
for (key, value) in self.inner.headers.iter() {
if !res.headers().contains_key(key) {
res.headers_mut().insert(key.clone(), value.clone());
}
// set response headers
for (key, value) in self.inner.headers.iter() {
if !res.headers().contains_key(key) {
res.headers_mut().insert(key.clone(), value.clone());
}
// default content-type
if self.inner.ct && !res.headers().contains_key(&CONTENT_TYPE) {
res.headers_mut().insert(
CONTENT_TYPE,
HeaderValue::from_static("application/octet-stream"),
);
}
Ok(res)
})
}
// default content-type
if self.inner.ct && !res.headers().contains_key(&CONTENT_TYPE) {
res.headers_mut().insert(
CONTENT_TYPE,
HeaderValue::from_static("application/octet-stream"),
);
}
Ok(res)
}
}

View file

@ -1,14 +1,13 @@
//! Request logging middleware
use std::task::{ready, Context, Poll};
use std::{env, error::Error, future::Future};
use std::{fmt, fmt::Display, marker::PhantomData, pin::Pin, rc::Rc, time};
use std::task::{Context, Poll};
use std::{env, error::Error, fmt, fmt::Display, rc::Rc, time};
use regex::Regex;
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::HeaderName;
use crate::service::{Middleware, Service, ServiceCall, ServiceCtx};
use crate::util::{Bytes, Either, HashSet};
use crate::service::{Middleware, Service, ServiceCtx};
use crate::util::{Bytes, HashSet};
use crate::web::{HttpResponse, WebRequest, WebResponse};
/// `Middleware` for logging request and response info to the terminal.
@ -139,19 +138,17 @@ where
{
type Response = WebResponse;
type Error = S::Error;
type Future<'f> = Either<LoggerResponse<'f, S, E>, ServiceCall<'f, S, WebRequest<E>>> where S: 'f, E: 'f;
crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);
#[inline]
fn call<'a>(
&'a self,
async fn call(
&self,
req: WebRequest<E>,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
if self.inner.exclude.contains(req.path()) {
Either::Right(ctx.call(&self.service, req))
ctx.call(&self.service, req).await
} else {
let time = time::SystemTime::now();
let mut format = self.inner.format.clone();
@ -159,56 +156,21 @@ where
for unit in &mut format.0 {
unit.render_request(time, &req);
}
Either::Left(LoggerResponse {
time,
format: Some(format),
fut: ctx.call(&self.service, req),
_t: PhantomData,
})
}
}
}
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct LoggerResponse<'f, S: Service<WebRequest<E>>, E>
where S: 'f, E: 'f
{
#[pin]
fut: ServiceCall<'f, S, WebRequest<E>>,
time: time::SystemTime,
format: Option<Format>,
_t: PhantomData<E>
}
}
impl<'f, S, E> Future for LoggerResponse<'f, S, E>
where
S: Service<WebRequest<E>, Response = WebResponse>,
{
type Output = Result<WebResponse, S::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = ready!(this.fut.poll(cx)?);
if let Some(ref mut format) = this.format {
let res = ctx.call(&self.service, req).await?;
for unit in &mut format.0 {
unit.render_response(res.response());
}
}
let time = *this.time;
let format = this.format.take();
Poll::Ready(Ok(res.map_body(move |_, body| {
ResponseBody::Other(Body::from_message(StreamLog {
body,
time,
format,
size: 0,
Ok(res.map_body(move |_, body| {
ResponseBody::Other(Body::from_message(StreamLog {
body,
time,
format: Some(format),
size: 0,
}))
}))
})))
}
}
}

View file

@ -4,11 +4,11 @@ use crate::http::Response;
use crate::router::{IntoPattern, ResourceDef};
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::dev::{AndThen, ServiceChain, ServiceChainFactory};
use crate::service::{chain_factory, ServiceCtx};
use crate::service::{chain, chain_factory, ServiceCtx};
use crate::service::{
Identity, IntoServiceFactory, Middleware, Service, ServiceCall, ServiceFactory, Stack,
Identity, IntoServiceFactory, Middleware, Service, ServiceFactory, Stack,
};
use crate::util::{BoxFuture, Either, Extensions, Ready};
use crate::util::Extensions;
use super::dev::{insert_slash, WebServiceConfig, WebServiceFactory};
use super::extract::FromRequest;
@ -24,8 +24,6 @@ type HttpNewService<Err: ErrorRenderer> =
BoxServiceFactory<(), WebRequest<Err>, WebResponse, Err::Container, ()>;
type ResourcePipeline<F, Err> =
ServiceChain<AndThen<F, ResourceRouter<Err>>, WebRequest<Err>>;
type BoxResponse<'a, Err: ErrorRenderer> =
ServiceCall<'a, HttpService<Err>, WebRequest<Err>>;
/// *Resource* is an entry in resources table which corresponds to requested URL.
///
@ -302,7 +300,7 @@ where
{
// create and configure default resource
self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory(
f.chain()
chain_factory(f.into_factory())
.map_init_err(|e| log::error!("Cannot construct default service: {:?}", e)),
)))));
@ -420,14 +418,11 @@ where
type Error = Err::Container;
type Service = M::Service;
type InitError = ();
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>>;
fn create(&self, _: ()) -> Self::Future<'_> {
Box::pin(async move {
let filter = self.filter.create(()).await?;
let routing = self.routing.create(()).await?;
Ok(self.middleware.create(filter.chain().and_then(routing)))
})
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
let filter = self.filter.create(()).await?;
let routing = self.routing.create(()).await?;
Ok(self.middleware.create(chain(filter).and_then(routing)))
}
}
@ -442,27 +437,21 @@ impl<Err: ErrorRenderer> ServiceFactory<WebRequest<Err>> for ResourceRouterFacto
type Error = Err::Container;
type InitError = ();
type Service = ResourceRouter<Err>;
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>>;
fn create(&self, _: ()) -> Self::Future<'_> {
Box::pin(async move {
let default = if let Some(ref default) = self.default {
Some(default.create(()).await?)
} else {
None
};
Ok(ResourceRouter {
default,
state: self.state.clone(),
routes: self.routes.iter().map(|route| route.service()).collect(),
})
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
let default = if let Some(ref default) = self.default {
Some(default.create(()).await?)
} else {
None
};
Ok(ResourceRouter {
default,
state: self.state.clone(),
routes: self.routes.iter().map(|route| route.service()).collect(),
})
}
}
type BoxResourceRouterResponse<'a, Err: ErrorRenderer> =
ServiceCall<'a, RouteService<Err>, WebRequest<Err>>;
pub struct ResourceRouter<Err: ErrorRenderer> {
state: Option<AppState>,
routes: Vec<RouteService<Err>>,
@ -472,31 +461,27 @@ pub struct ResourceRouter<Err: ErrorRenderer> {
impl<Err: ErrorRenderer> Service<WebRequest<Err>> for ResourceRouter<Err> {
type Response = WebResponse;
type Error = Err::Container;
type Future<'f> = Either<
BoxResourceRouterResponse<'f, Err>,
Either<Ready<WebResponse, Err::Container>, BoxResponse<'f, Err>>,
>;
fn call<'a>(
&'a self,
async fn call(
&self,
mut req: WebRequest<Err>,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
for route in self.routes.iter() {
if route.check(&mut req) {
if let Some(ref state) = self.state {
req.set_state_container(state.clone());
}
return Either::Left(ctx.call(route, req));
return ctx.call(route, req).await;
}
}
if let Some(ref default) = self.default {
Either::Right(Either::Right(ctx.call(default, req)))
ctx.call(default, req).await
} else {
Either::Right(Either::Left(Ready::Ok(WebResponse::new(
Ok(WebResponse::new(
Response::MethodNotAllowed().finish(),
req.into_parts().0,
))))
))
}
}
}

View file

@ -1,6 +1,5 @@
use std::{fmt, mem, rc::Rc};
use crate::util::{BoxFuture, Ready};
use crate::{http::Method, service::Service, service::ServiceCtx, service::ServiceFactory};
use super::error::ErrorRenderer;
@ -66,10 +65,9 @@ impl<Err: ErrorRenderer> ServiceFactory<WebRequest<Err>> for Route<Err> {
type Error = Err::Container;
type InitError = ();
type Service = RouteService<Err>;
type Future<'f> = Ready<RouteService<Err>, ()>;
fn create(&self, _: ()) -> Self::Future<'_> {
Ok(self.service()).into()
async fn create(&self, _: ()) -> Result<RouteService<Err>, ()> {
Ok(self.service())
}
}
@ -102,15 +100,13 @@ impl<Err: ErrorRenderer> fmt::Debug for RouteService<Err> {
impl<Err: ErrorRenderer> Service<WebRequest<Err>> for RouteService<Err> {
type Response = WebResponse;
type Error = Err::Container;
type Future<'f> = BoxFuture<'f, Result<Self::Response, Self::Error>>;
#[inline]
fn call<'a>(
&'a self,
async fn call(
&self,
req: WebRequest<Err>,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
self.handler.call(req)
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
self.handler.call(req).await
}
}

View file

@ -1,15 +1,11 @@
use std::{
cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll,
};
use std::{cell::RefCell, fmt, rc::Rc, task::Context, task::Poll};
use crate::http::Response;
use crate::router::{IntoPattern, ResourceDef, Router};
use crate::service::boxed::{self, BoxService, BoxServiceFactory};
use crate::service::{chain_factory, dev::ServiceChainFactory, IntoServiceFactory};
use crate::service::{
Identity, Middleware, Service, ServiceCall, ServiceCtx, ServiceFactory, Stack,
};
use crate::util::{BoxFuture, Either, Extensions, Ready};
use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory, Stack};
use crate::util::Extensions;
use super::app::Filter;
use super::config::ServiceConfig;
@ -28,8 +24,6 @@ type HttpService<Err: ErrorRenderer> =
BoxService<WebRequest<Err>, WebResponse, Err::Container>;
type HttpNewService<Err: ErrorRenderer> =
BoxServiceFactory<(), WebRequest<Err>, WebResponse, Err::Container, ()>;
type BoxResponse<'a, Err: ErrorRenderer> =
ServiceCall<'a, HttpService<Err>, WebRequest<Err>>;
/// Resources scope.
///
@ -288,7 +282,7 @@ where
{
// create and configure default resource
self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory(
f.chain()
chain_factory(f.into_factory())
.map_init_err(|e| log::error!("Cannot construct default service: {:?}", e)),
)))));
@ -468,15 +462,12 @@ where
type Error = Err::Container;
type Service = M::Service;
type InitError = ();
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>>;
fn create(&self, _: ()) -> Self::Future<'_> {
Box::pin(async move {
Ok(self.middleware.create(ScopeService {
filter: self.filter.create(()).await?,
routing: self.routing.create(()).await?,
}))
})
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
Ok(self.middleware.create(ScopeService {
filter: self.filter.create(()).await?,
routing: self.routing.create(()).await?,
}))
}
}
@ -492,7 +483,6 @@ where
{
type Response = WebResponse;
type Error = Err::Container;
type Future<'f> = ScopeServiceResponse<'f, F, Err> where F: 'f;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -505,55 +495,13 @@ where
}
}
fn call<'a>(
&'a self,
async fn call(
&self,
req: WebRequest<Err>,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
ScopeServiceResponse {
filter: ctx.call(&self.filter, req),
routing: &self.routing,
endpoint: None,
ctx,
}
}
}
pin_project_lite::pin_project! {
pub struct ScopeServiceResponse<'f, F: Service<WebRequest<Err>>, Err: ErrorRenderer>
where F: 'f
{
#[pin]
filter: ServiceCall<'f, F, WebRequest<Err>>,
routing: &'f ScopeRouter<Err>,
ctx: ServiceCtx<'f, ScopeService<F, Err>>,
endpoint: Option<ServiceCall<'f, ScopeRouter<Err>, WebRequest<Err>>>,
}
}
impl<'f, F, Err> Future for ScopeServiceResponse<'f, F, Err>
where
F: Service<WebRequest<Err>, Response = WebRequest<Err>, Error = Err::Container>,
Err: ErrorRenderer,
{
type Output = Result<WebResponse, Err::Container>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
loop {
if let Some(fut) = this.endpoint.as_mut() {
return Pin::new(fut).poll(cx);
}
let res = if let Poll::Ready(res) = this.filter.poll(cx) {
res?
} else {
return Poll::Pending;
};
*this.endpoint = Some(this.ctx.call(this.routing, res));
this = self.as_mut().project();
}
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
let req = ctx.call(&self.filter, req).await?;
ctx.call(&self.routing, req).await
}
}
@ -569,31 +517,28 @@ impl<Err: ErrorRenderer> ServiceFactory<WebRequest<Err>> for ScopeRouterFactory<
type Error = Err::Container;
type InitError = ();
type Service = ScopeRouter<Err>;
type Future<'f> = BoxFuture<'f, Result<Self::Service, Self::InitError>>;
fn create(&self, _: ()) -> Self::Future<'_> {
Box::pin(async move {
// create http services
let mut router = Router::build();
if self.case_insensitive {
router.case_insensitive();
}
for (path, factory, guards) in &mut self.services.iter() {
let service = factory.create(()).await?;
router.rdef(path.clone(), service).2 = guards.borrow_mut().take();
}
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
// create http services
let mut router = Router::build();
if self.case_insensitive {
router.case_insensitive();
}
for (path, factory, guards) in &mut self.services.iter() {
let service = factory.create(()).await?;
router.rdef(path.clone(), service).2 = guards.borrow_mut().take();
}
let default = if let Some(ref default) = self.default {
Some(default.create(()).await?)
} else {
None
};
let default = if let Some(ref default) = self.default {
Some(default.create(()).await?)
} else {
None
};
Ok(ScopeRouter {
default,
router: router.finish(),
state: self.state.clone(),
})
Ok(ScopeRouter {
default,
router: router.finish(),
state: self.state.clone(),
})
}
}
@ -607,13 +552,12 @@ struct ScopeRouter<Err: ErrorRenderer> {
impl<Err: ErrorRenderer> Service<WebRequest<Err>> for ScopeRouter<Err> {
type Response = WebResponse;
type Error = Err::Container;
type Future<'f> = Either<BoxResponse<'f, Err>, Ready<Self::Response, Self::Error>>;
fn call<'a>(
&'a self,
async fn call(
&self,
mut req: WebRequest<Err>,
ctx: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
let res = self.router.recognize_checked(&mut req, |req, guards| {
if let Some(guards) = guards {
for f in guards {
@ -629,15 +573,12 @@ impl<Err: ErrorRenderer> Service<WebRequest<Err>> for ScopeRouter<Err> {
if let Some(ref state) = self.state {
req.set_state_container(state.clone());
}
Either::Left(ctx.call(srv, req))
ctx.call(srv, req).await
} else if let Some(ref default) = self.default {
Either::Left(ctx.call(default, req))
ctx.call(default, req).await
} else {
let req = req.into_parts().0;
Either::Right(Ready::Ok(WebResponse::new(
Response::NotFound().finish(),
req,
)))
Ok(WebResponse::new(Response::NotFound().finish(), req))
}
}
}

View file

@ -13,7 +13,3 @@ pub use self::path::Path;
pub use self::payload::{Payload, PayloadConfig};
pub use self::query::Query;
pub use self::state::State;
#[deprecated]
#[doc(hidden)]
pub type Data<T> = State<T>;

View file

@ -5,7 +5,7 @@ pub use crate::ws::{CloseCode, CloseReason, Frame, Message, WsSink};
use crate::http::{body::BodySize, h1, StatusCode};
use crate::service::{
apply_fn, fn_factory_with_config, IntoServiceFactory, ServiceFactory,
apply_fn, chain_factory, fn_factory_with_config, IntoServiceFactory, ServiceFactory,
};
use crate::web::{HttpRequest, HttpResponse};
use crate::ws::{self, error::HandshakeError, error::WsError, handshake};
@ -19,7 +19,7 @@ where
F: IntoServiceFactory<T, Frame, WsSink>,
Err: From<T::InitError> + From<HandshakeError>,
{
let inner_factory = Rc::new(factory.chain().map_err(WsError::Service));
let inner_factory = Rc::new(chain_factory(factory).map_err(WsError::Service));
let factory = fn_factory_with_config(move |sink: WsSink| {
let factory = inner_factory.clone();
@ -105,7 +105,7 @@ where
// start websockets service dispatcher
rt::spawn(async move {
let res = crate::io::Dispatcher::with_config(io, codec, srv, &cfg).await;
let res = crate::io::Dispatcher::new(io, codec, srv, &cfg).await;
log::trace!("Ws handler is terminated: {:?}", res);
});

View file

@ -757,7 +757,7 @@ impl WsConnection<Sealed> {
U: IntoService<T, ws::Frame>,
{
let service = apply_fn(
service.into_chain().map_err(WsError::Service),
service.into_service().map_err(WsError::Service),
|req, svc| async move {
match req {
DispatchItem::<ws::Codec>::Item(item) => svc.call(item).await,
@ -773,7 +773,7 @@ impl WsConnection<Sealed> {
},
);
Dispatcher::with_config(self.io, self.codec, service, &self.config).await
Dispatcher::new(self.io, self.codec, service, &self.config).await
}
}

View file

@ -5,8 +5,8 @@ use ntex::http::test::server as test_server;
use ntex::http::{body, h1, test, HttpService, Request, Response, StatusCode};
use ntex::io::{DispatchItem, Dispatcher, Io};
use ntex::service::{fn_factory, Service, ServiceCtx};
use ntex::time::{sleep, Millis, Seconds};
use ntex::util::{BoxFuture, ByteString, Bytes, Ready};
use ntex::time::Seconds;
use ntex::util::{ByteString, Bytes, Ready};
use ntex::ws::{self, handshake, handshake_response};
struct WsService(Arc<Mutex<Cell<bool>>>);
@ -34,39 +34,28 @@ impl Clone for WsService {
impl Service<(Request, Io, h1::Codec)> for WsService {
type Response = ();
type Error = io::Error;
type Future<'f> = BoxFuture<'f, Result<(), io::Error>>;
fn poll_ready(&self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.set_polled();
Poll::Ready(Ok(()))
}
fn call<'a>(
&'a self,
async fn call(
&self,
(req, io, codec): (Request, Io, h1::Codec),
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
let fut = async move {
let res = handshake(req.head()).unwrap().message_body(());
_: ServiceCtx<'_, Self>,
) -> Result<(), io::Error> {
let res = handshake(req.head()).unwrap().message_body(());
io.encode((res, body::BodySize::None).into(), &codec)
.unwrap();
io.encode((res, body::BodySize::None).into(), &codec)
.unwrap();
let cfg = ntex_io::DispatcherConfig::default();
cfg.set_keepalive_timeout(Seconds(0));
let cfg = ntex_io::DispatcherConfig::default();
cfg.set_keepalive_timeout(Seconds(0));
Dispatcher::with_config(
io.seal(),
ws::Codec::new(),
service,
//&Default::default(),
&cfg,
)
Dispatcher::new(io.seal(), ws::Codec::new(), service, &cfg)
.await
.map_err(|_| panic!())
};
Box::pin(fut)
}
}

View file

@ -40,7 +40,7 @@ async fn test_simple() {
.unwrap();
// start websocket service
Dispatcher::with_config(
Dispatcher::new(
io.seal(),
ws::Codec::default(),
ws_service,
@ -96,7 +96,7 @@ async fn test_transport() {
.unwrap();
// start websocket service
Dispatcher::with_config(
Dispatcher::new(
io.seal(),
ws::Codec::default(),
ws_service,
@ -133,13 +133,7 @@ async fn test_keepalive_timeout() {
// start websocket service
let cfg = DispatcherConfig::default();
cfg.set_keepalive_timeout(Seconds::ZERO);
Dispatcher::with_config(
io.seal(),
ws::Codec::default(),
ws_service,
&cfg,
)
.await
Dispatcher::new(io.seal(), ws::Codec::default(), ws_service, &cfg).await
}
})
.finish(|_| Ready::Ok::<_, io::Error>(Response::NotFound()))