From 6382ef6b40f9426323c3c85a4467db9379c9758a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 22 Jun 2023 18:39:09 +0600 Subject: [PATCH] Refactor service pipelines (#214) --- ntex-async-std/Cargo.toml | 6 +- ntex-bytes/Cargo.toml | 2 +- ntex-connect/Cargo.toml | 18 +- ntex-connect/src/openssl.rs | 6 +- ntex-connect/src/resolve.rs | 2 +- ntex-connect/src/rustls.rs | 6 +- ntex-glommio/Cargo.toml | 6 +- ntex-io/Cargo.toml | 8 +- ntex-io/src/dispatcher.rs | 14 +- ntex-io/src/utils.rs | 15 +- ntex-macros/Cargo.toml | 2 +- ntex-service/Cargo.toml | 6 +- ntex-service/src/and_then.rs | 14 +- ntex-service/src/apply.rs | 22 +- ntex-service/src/boxed.rs | 2 +- ntex-service/src/chain.rs | 291 ++++++++++++++ ntex-service/src/ctx.rs | 248 ++---------- ntex-service/src/fn_service.rs | 14 +- ntex-service/src/fn_shutdown.rs | 4 +- ntex-service/src/lib.rs | 67 +++- ntex-service/src/map.rs | 12 +- ntex-service/src/map_err.rs | 12 +- ntex-service/src/map_init_err.rs | 4 +- ntex-service/src/middleware.rs | 8 +- ntex-service/src/pipeline.rs | 444 +++++++++------------- ntex-service/src/then.rs | 13 +- ntex-tls/Cargo.toml | 14 +- ntex-tls/examples/rustls-server.rs | 4 +- ntex-tls/examples/server.rs | 10 +- ntex-tokio/Cargo.toml | 6 +- ntex-util/Cargo.toml | 6 +- ntex-util/src/services/buffer.rs | 8 +- ntex-util/src/services/inflight.rs | 6 +- ntex-util/src/services/keepalive.rs | 2 +- ntex-util/src/services/onerequest.rs | 6 +- ntex-util/src/services/timeout.rs | 13 +- ntex-util/src/services/variant.rs | 2 +- ntex/Cargo.toml | 24 +- ntex/src/http/client/connect.rs | 4 +- ntex/src/http/client/connector.rs | 15 +- ntex/src/http/client/pool.rs | 14 +- ntex/src/http/config.rs | 10 +- ntex/src/http/h1/dispatcher.rs | 20 +- ntex/src/http/h1/service.rs | 30 +- ntex/src/http/h2/service.rs | 30 +- ntex/src/http/service.rs | 30 +- ntex/src/lib.rs | 4 +- ntex/src/server/worker.rs | 6 +- ntex/src/web/app.rs | 20 +- ntex/src/web/app_service.rs | 6 +- ntex/src/web/middleware/defaultheaders.rs | 8 +- ntex/src/web/middleware/logger.rs | 4 +- ntex/src/web/resource.rs | 20 +- ntex/src/web/scope.rs | 8 +- ntex/src/web/test.rs | 12 +- ntex/src/web/ws.rs | 2 +- ntex/src/ws/client.rs | 6 +- ntex/tests/connect.rs | 24 +- ntex/tests/http_awc_client.rs | 12 +- ntex/tests/http_awc_openssl_client.rs | 4 +- ntex/tests/http_awc_rustls_client.rs | 4 +- 61 files changed, 848 insertions(+), 792 deletions(-) create mode 100644 ntex-service/src/chain.rs diff --git a/ntex-async-std/Cargo.toml b/ntex-async-std/Cargo.toml index b3ea1c14..6a3844b1 100644 --- a/ntex-async-std/Cargo.toml +++ b/ntex-async-std/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-async-std" -version = "0.3.0-beta.0" +version = "0.3.0" authors = ["ntex contributors "] description = "async-std intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,8 +17,8 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.3.0-beta.0" -ntex-util = "0.3.0-beta.0" +ntex-io = "0.3.0" +ntex-util = "0.3.0" async-oneshot = "0.5.0" log = "0.4" pin-project-lite = "0.2" diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index 21a6729a..09e45878 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -27,4 +27,4 @@ simdutf8 = { version = "0.1.4", optional = true } [dev-dependencies] serde_test = "1.0" serde_json = "1.0" -ntex = { version = "0.7.0-beta.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } diff --git a/ntex-connect/Cargo.toml b/ntex-connect/Cargo.toml index b243d51f..0e3af423 100644 --- a/ntex-connect/Cargo.toml +++ b/ntex-connect/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-connect" -version = "0.3.0-beta.1" +version = "0.3.0" authors = ["ntex contributors "] description = "ntexwork connect utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -34,19 +34,19 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"] async-std = ["ntex-rt/async-std", "ntex-async-std"] [dependencies] -ntex-service = "1.2.0-beta.1" +ntex-service = "1.2.0" ntex-bytes = "0.1.19" ntex-http = "0.1.8" -ntex-io = "0.3.0-beta.1" +ntex-io = "0.3.0" ntex-rt = "0.4.7" -ntex-tls = "0.3.0-beta.1" -ntex-util = "0.3.0-beta.1" +ntex-tls = "0.3.0" +ntex-util = "0.3.0" log = "0.4" thiserror = "1.0" -ntex-tokio = { version = "0.3.0-beta.0", optional = true } -ntex-glommio = { version = "0.3.0-beta.0", optional = true } -ntex-async-std = { version = "0.3.0-beta.0", optional = true } +ntex-tokio = { version = "0.3.0", optional = true } +ntex-glommio = { version = "0.3.0", optional = true } +ntex-async-std = { version = "0.3.0", optional = true } # openssl tls-openssl = { version="0.10", package = "openssl", optional = true } @@ -58,4 +58,4 @@ webpki-roots = { version = "0.23", optional = true } [dev-dependencies] rand = "0.8" env_logger = "0.10" -ntex = { version = "0.7.0-beta.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } diff --git a/ntex-connect/src/openssl.rs b/ntex-connect/src/openssl.rs index 58ed7ca0..03f385f8 100644 --- a/ntex-connect/src/openssl.rs +++ b/ntex-connect/src/openssl.rs @@ -5,14 +5,14 @@ pub use tls_openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslM use ntex_bytes::PoolId; use ntex_io::{FilterFactory, Io, Layer}; -use ntex_service::{Container, Service, ServiceCtx, ServiceFactory}; +use ntex_service::{Pipeline, Service, ServiceCtx, ServiceFactory}; use ntex_tls::openssl::SslConnector as IoSslConnector; use ntex_util::future::{BoxFuture, Ready}; use super::{Address, Connect, ConnectError, Connector as BaseConnector}; pub struct Connector { - connector: Container>, + connector: Pipeline>, openssl: SslConnector, } @@ -126,7 +126,7 @@ mod tests { let ssl = SslConnector::builder(SslMethod::tls()).unwrap(); let factory = Connector::new(ssl.build()).memory_pool(PoolId::P5).clone(); - let srv = factory.container(&()).await.unwrap(); + let srv = factory.pipeline(&()).await.unwrap(); let result = srv .call(Connect::new("").set_addr(Some(server.addr()))) .await; diff --git a/ntex-connect/src/resolve.rs b/ntex-connect/src/resolve.rs index acef3418..9635cdfe 100644 --- a/ntex-connect/src/resolve.rs +++ b/ntex-connect/src/resolve.rs @@ -129,7 +129,7 @@ mod tests { async fn resolver() { let resolver = Resolver::default().clone(); assert!(format!("{:?}", resolver).contains("Resolver")); - let srv = resolver.container(()).await.unwrap(); + let srv = resolver.pipeline(()).await.unwrap(); assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready()); let res = srv.call(Connect::new("www.rust-lang.org")).await; diff --git a/ntex-connect/src/rustls.rs b/ntex-connect/src/rustls.rs index b13125d7..fc6dbe6e 100644 --- a/ntex-connect/src/rustls.rs +++ b/ntex-connect/src/rustls.rs @@ -5,7 +5,7 @@ pub use tls_rustls::{ClientConfig, ServerName}; use ntex_bytes::PoolId; use ntex_io::{FilterFactory, Io, Layer}; -use ntex_service::{Container, Service, ServiceCtx, ServiceFactory}; +use ntex_service::{Pipeline, Service, ServiceCtx, ServiceFactory}; use ntex_tls::rustls::TlsConnector; use ntex_util::future::{BoxFuture, Ready}; @@ -13,7 +13,7 @@ use super::{Address, Connect, ConnectError, Connector as BaseConnector}; /// Rustls connector factory pub struct Connector { - connector: Container>, + connector: Pipeline>, inner: TlsConnector, } @@ -149,7 +149,7 @@ mod tests { .memory_pool(PoolId::P5) .clone(); - let srv = factory.container(&()).await.unwrap(); + let srv = factory.pipeline(&()).await.unwrap(); // always ready assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready()); let result = srv diff --git a/ntex-glommio/Cargo.toml b/ntex-glommio/Cargo.toml index b59d87df..334b255d 100644 --- a/ntex-glommio/Cargo.toml +++ b/ntex-glommio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-glommio" -version = "0.3.0-beta.0" +version = "0.3.0" authors = ["ntex contributors "] description = "glommio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,8 +17,8 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.3.0-beta.0" -ntex-util = "0.3.0-beta.0" +ntex-io = "0.3.0" +ntex-util = "0.3.0" async-oneshot = "0.5.0" futures-lite = "1.12" log = "0.4" diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 3ac98f42..559d44da 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.0-beta.2" +version = "0.3.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,8 +18,8 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6.2" ntex-bytes = "0.1.19" -ntex-util = "0.3.0-beta.1" -ntex-service = "1.2.0-beta.3" +ntex-util = "0.3.0" +ntex-service = "1.2.0" bitflags = "1.3" log = "0.4" @@ -29,4 +29,4 @@ pin-project-lite = "0.2" rand = "0.8" env_logger = "0.10" -ntex = { version = "0.7.0-beta.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 10d26148..b4b67165 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -3,7 +3,7 @@ use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time} use ntex_bytes::Pool; use ntex_codec::{Decoder, Encoder}; -use ntex_service::{Container, IntoService, Service}; +use ntex_service::{IntoService, Pipeline, Service}; use ntex_util::time::Seconds; use ntex_util::{future::Either, ready, spawn}; @@ -51,7 +51,7 @@ where { io: IoBoxed, codec: U, - service: Container, + service: Pipeline, error: Cell::Error>>>, inflight: Cell, } @@ -107,7 +107,7 @@ where codec, error: Cell::new(None), inflight: Cell::new(0), - service: Container::new(service.into_service()), + service: Pipeline::new(service.into_service()), }); Dispatcher { @@ -250,7 +250,7 @@ where // call service let shared = slf.shared.clone(); shared.inflight.set(shared.inflight.get() + 1); - let fut = shared.service.container_call(item).into_static(); + let fut = shared.service.call(item).into_static(); spawn(async move { let result = fut.await; shared.handle_result(result, &shared.io); @@ -276,7 +276,7 @@ where // call service let shared = slf.shared.clone(); shared.inflight.set(shared.inflight.get() + 1); - let fut = shared.service.container_call(item).into_static(); + let fut = shared.service.call(item).into_static(); spawn(async move { let result = fut.await; shared.handle_result(result, &shared.io); @@ -342,7 +342,7 @@ where { fn poll_service( &self, - srv: &Container, + srv: &Pipeline, cx: &mut Context<'_>, io: &IoBoxed, ) -> Poll> { @@ -478,7 +478,7 @@ mod tests { io: state.into(), error: Cell::new(None), inflight: Cell::new(0), - service: Container::new(service.into_service()), + service: Pipeline::new(service.into_service()), }); ( diff --git a/ntex-io/src/utils.rs b/ntex-io/src/utils.rs index 042b1d55..4b5c2654 100644 --- a/ntex-io/src/utils.rs +++ b/ntex-io/src/utils.rs @@ -1,6 +1,6 @@ use std::marker::PhantomData; -use ntex_service::{fn_service, pipeline_factory, Service, ServiceCtx, ServiceFactory}; +use ntex_service::{chain_factory, fn_service, Service, ServiceCtx, ServiceFactory}; use ntex_util::future::Ready; use crate::{Filter, FilterFactory, Io, IoBoxed, Layer}; @@ -20,10 +20,9 @@ where S: ServiceFactory, C: Clone, { - pipeline_factory( - fn_service(|io: Io| Ready::Ok(IoBoxed::from(io))).map_init_err(|_| panic!()), - ) - .and_then(srv) + chain_factory(fn_service(|io: Io| Ready::Ok(IoBoxed::from(io)))) + .map_init_err(|_| panic!()) + .and_then(srv) } /// Create filter factory service @@ -106,7 +105,7 @@ mod tests { .unwrap(); Ok::<_, ()>(()) })) - .container(()) + .pipeline(()) .await .unwrap(); let _ = svc.call(Io::new(server)).await; @@ -143,7 +142,7 @@ mod tests { #[ntex::test] async fn test_utils_filter() { let (_, server) = IoTest::create(); - let svc = pipeline_factory( + let svc = chain_factory( filter::<_, crate::filter::Base>(TestFilterFactory) .map_err(|_| ()) .map_init_err(|_| ()), @@ -152,7 +151,7 @@ mod tests { let _ = io.recv(&BytesCodec).await; Ok::<_, ()>(()) }))) - .container(()) + .pipeline(()) .await .unwrap(); let _ = svc.call(Io::new(server)).await; diff --git a/ntex-macros/Cargo.toml b/ntex-macros/Cargo.toml index e1894478..5811cd08 100644 --- a/ntex-macros/Cargo.toml +++ b/ntex-macros/Cargo.toml @@ -16,6 +16,6 @@ syn = { version = "^1", features = ["full", "parsing"] } proc-macro2 = "^1" [dev-dependencies] -ntex = { version = "0.7.0-beta.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } futures = "0.3" env_logger = "0.10" \ No newline at end of file diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index b0d5edf2..9df4ea42 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "1.2.0-beta.4" +version = "1.2.0" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] @@ -20,5 +20,5 @@ pin-project-lite = "0.2.6" slab = "0.4" [dev-dependencies] -ntex = { version = "0.7.0-beta.1", features = ["tokio"] } -ntex-util = "0.3.0-beta.1" +ntex = { version = "0.7.0", features = ["tokio"] } +ntex-util = "0.3.0" diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index c590ef51..910472c1 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -234,9 +234,7 @@ where mod tests { use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; - use crate::{ - fn_factory, pipeline, pipeline_factory, Service, ServiceCtx, ServiceFactory, - }; + use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx}; use ntex_util::future::{lazy, Ready}; #[derive(Clone)] @@ -286,9 +284,7 @@ mod tests { #[ntex::test] async fn test_poll_ready() { let cnt = Rc::new(Cell::new(0)); - let srv = pipeline(Srv1(cnt.clone())) - .and_then(Srv2(cnt.clone())) - .clone(); + let srv = chain(Srv1(cnt.clone())).and_then(Srv2(cnt.clone())).clone(); let res = lazy(|cx| srv.poll_ready(cx)).await; assert_eq!(res, Poll::Ready(Ok(()))); assert_eq!(cnt.get(), 2); @@ -299,7 +295,7 @@ mod tests { #[ntex::test] async fn test_call() { let cnt = Rc::new(Cell::new(0)); - let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt)).container(); + let srv = chain(Srv1(cnt.clone())).and_then(Srv2(cnt)).pipeline(); let res = srv.call("srv1").await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "srv2")); @@ -309,13 +305,13 @@ mod tests { async fn test_factory() { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); - let new_srv = pipeline_factory(fn_factory(move || { + let new_srv = chain_factory(fn_factory(move || { Ready::from(Ok::<_, ()>(Srv1(cnt2.clone()))) })) .and_then(move || Ready::from(Ok(Srv2(cnt.clone())))) .clone(); - let srv = new_srv.container(&()).await.unwrap(); + let srv = new_srv.pipeline(&()).await.unwrap(); let res = srv.call("srv1").await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "srv2")); diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 4bb95ac2..5a110970 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -1,8 +1,8 @@ #![allow(clippy::type_complexity)] use std::{future::Future, marker, pin::Pin, task, task::Poll}; -use super::ctx::{Container, ServiceCall, ServiceCtx}; -use super::{IntoService, IntoServiceFactory, Service, ServiceFactory}; +use super::ctx::{ServiceCall, ServiceCtx}; +use super::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory}; /// Apply transform function to a service. pub fn apply_fn( @@ -17,7 +17,7 @@ where { Apply { f, - service: Container::new(service.into_service()), + service: Pipeline::new(service.into_service()), r: marker::PhantomData, } } @@ -41,13 +41,13 @@ pub struct Apply where T: Service, { - service: Container, + service: Pipeline, f: F, r: marker::PhantomData (In, Out, R)>, } pub struct ApplyService { - service: Container, + service: Pipeline, } impl ApplyService { @@ -56,7 +56,7 @@ impl ApplyService { where S: Service, { - self.service.call(req) + self.service.service_call(req) } } @@ -212,7 +212,7 @@ mod tests { use std::task::Poll; use super::*; - use crate::{pipeline, pipeline_factory, Service, ServiceCtx, ServiceFactory}; + use crate::{chain, chain_factory, Service, ServiceCtx}; #[derive(Clone)] struct Srv; @@ -229,14 +229,14 @@ mod tests { #[ntex::test] async fn test_call() { - let srv = pipeline( + let srv = chain( apply_fn(Srv, |req: &'static str, svc| async move { svc.call(()).await.unwrap(); Ok((req, ())) }) .clone(), ) - .container(); + .pipeline(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let res = lazy(|cx| srv.poll_shutdown(cx)).await; @@ -249,7 +249,7 @@ mod tests { #[ntex::test] async fn test_create() { - let new_srv = pipeline_factory( + let new_srv = chain_factory( apply_fn_factory( || Ready::<_, ()>::Ok(Srv), |req: &'static str, srv| async move { @@ -260,7 +260,7 @@ mod tests { .clone(), ); - let srv = new_srv.container(&()).await.unwrap(); + let srv = new_srv.pipeline(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index 06c0199c..002b03fe 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -73,7 +73,7 @@ where idx: usize, waiters: &'a WaitersRef, ) -> BoxFuture<'a, Self::Response, Self::Error> { - Box::pin(ServiceCtx::<'a, S>::new(idx, waiters).call_nowait(self, req)) + Box::pin(ServiceCtx::<'a, S>::from_ref(idx, waiters).call_nowait(self, req)) } } diff --git a/ntex-service/src/chain.rs b/ntex-service/src/chain.rs new file mode 100644 index 00000000..79155100 --- /dev/null +++ b/ntex-service/src/chain.rs @@ -0,0 +1,291 @@ +use std::marker::PhantomData; + +use crate::and_then::{AndThen, AndThenFactory}; +use crate::ctx::{ServiceCall, ServiceCtx}; +use crate::map::{Map, MapFactory}; +use crate::map_err::{MapErr, MapErrFactory}; +use crate::map_init_err::MapInitErr; +use crate::middleware::{ApplyMiddleware, Middleware}; +use crate::pipeline::CreatePipeline; +use crate::then::{Then, ThenFactory}; +use crate::{IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory}; + +/// Constructs new pipeline with one service in pipeline chain. +pub fn chain(service: F) -> ServiceChain +where + Svc: Service, + F: IntoService, +{ + ServiceChain { + service: service.into_service(), + _t: PhantomData, + } +} + +/// Constructs new pipeline factory with one service factory. +pub fn chain_factory(factory: F) -> ServiceChainFactory +where + T: ServiceFactory, + F: IntoServiceFactory, +{ + ServiceChainFactory { + factory: factory.into_factory(), + _t: PhantomData, + } +} + +/// Pipeline builder - pipeline allows to compose multiple service into one service. +pub struct ServiceChain { + service: Svc, + _t: PhantomData, +} + +impl, Req> ServiceChain { + /// Call another service after call to this one has resolved successfully. + /// + /// This function can be used to chain two services together and ensure that + /// the second service isn't called until call to the fist service have + /// finished. Result of the call to the first service is used as an + /// input parameter for the second service's call. + /// + /// Note that this function consumes the receiving service and returns a + /// wrapped version of it. + pub fn and_then(self, service: F) -> ServiceChain, Req> + where + Self: Sized, + F: IntoService, + Next: Service, + { + ServiceChain { + service: AndThen::new(self.service, service.into_service()), + _t: PhantomData, + } + } + + /// Chain on a computation for when a call to the service finished, + /// passing the result of the call to the next service `U`. + /// + /// Note that this function consumes the receiving pipeline and returns a + /// wrapped version of it. + pub fn then(self, service: F) -> ServiceChain, Req> + where + Self: Sized, + F: IntoService>, + Next: Service, Error = Svc::Error>, + { + ServiceChain { + service: Then::new(self.service, service.into_service()), + _t: PhantomData, + } + } + + /// Map this service's output to a different type, returning a new service + /// of the resulting type. + /// + /// This function is similar to the `Option::map` or `Iterator::map` where + /// it will change the type of the underlying service. + /// + /// Note that this function consumes the receiving service and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + pub fn map(self, f: F) -> ServiceChain, Req> + where + Self: Sized, + F: Fn(Svc::Response) -> Res, + { + ServiceChain { + service: Map::new(self.service, f), + _t: PhantomData, + } + } + + /// Map this service's error to a different error, returning a new service. + /// + /// This function is similar to the `Result::map_err` where it will change + /// the error type of the underlying service. This is useful for example to + /// ensure that services have the same error type. + /// + /// Note that this function consumes the receiving service and returns a + /// wrapped version of it. + pub fn map_err(self, f: F) -> ServiceChain, Req> + where + Self: Sized, + F: Fn(Svc::Error) -> Err, + { + ServiceChain { + service: MapErr::new(self.service, f), + _t: PhantomData, + } + } + + /// Create service pipeline + pub fn pipeline(self) -> Pipeline { + Pipeline::new(self.service) + } +} + +impl Clone for ServiceChain +where + Svc: Clone, +{ + fn clone(&self) -> Self { + ServiceChain { + service: self.service.clone(), + _t: PhantomData, + } + } +} + +impl, Req> Service for ServiceChain { + type Response = Svc::Response; + type Error = Svc::Error; + type Future<'f> = ServiceCall<'f, Svc, Req> where Self: 'f, Req: 'f; + + crate::forward_poll_ready!(service); + crate::forward_poll_shutdown!(service); + + #[inline] + fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { + ctx.call(&self.service, req) + } +} + +/// Service factory builder +pub struct ServiceChainFactory { + factory: T, + _t: PhantomData<(Req, C)>, +} + +impl, Req, C> ServiceChainFactory { + /// Call another service after call to this one has resolved successfully. + pub fn and_then( + self, + factory: F, + ) -> ServiceChainFactory, Req, C> + where + Self: Sized, + F: IntoServiceFactory, + U: ServiceFactory, + { + ServiceChainFactory { + factory: AndThenFactory::new(self.factory, factory.into_factory()), + _t: PhantomData, + } + } + + /// Apply middleware to current service factory. + /// + /// Short version of `apply(middleware, pipeline_factory(...))` + pub fn apply(self, tr: U) -> ServiceChainFactory, Req, C> + where + U: Middleware, + { + ServiceChainFactory { + factory: ApplyMiddleware::new(tr, self.factory), + _t: PhantomData, + } + } + + /// Create `NewService` to chain on a computation for when a call to the + /// service finished, passing the result of the call to the next + /// service `U`. + /// + /// Note that this function consumes the receiving pipeline and returns a + /// wrapped version of it. + pub fn then(self, factory: F) -> ServiceChainFactory, Req, C> + where + Self: Sized, + C: Clone, + F: IntoServiceFactory, C>, + U: ServiceFactory< + Result, + C, + Error = T::Error, + InitError = T::InitError, + >, + { + ServiceChainFactory { + factory: ThenFactory::new(self.factory, factory.into_factory()), + _t: PhantomData, + } + } + + /// Map this service's output to a different type, returning a new service + /// of the resulting type. + pub fn map( + self, + f: F, + ) -> ServiceChainFactory, Req, C> + where + Self: Sized, + F: Fn(T::Response) -> Res + Clone, + { + ServiceChainFactory { + factory: MapFactory::new(self.factory, f), + _t: PhantomData, + } + } + + /// Map this service's error to a different error, returning a new service. + pub fn map_err( + self, + f: F, + ) -> ServiceChainFactory, Req, C> + where + Self: Sized, + F: Fn(T::Error) -> E + Clone, + { + ServiceChainFactory { + factory: MapErrFactory::new(self.factory, f), + _t: PhantomData, + } + } + + /// Map this factory's init error to a different error, returning a new service. + pub fn map_init_err( + self, + f: F, + ) -> ServiceChainFactory, Req, C> + where + Self: Sized, + F: Fn(T::InitError) -> E + Clone, + { + ServiceChainFactory { + factory: MapInitErr::new(self.factory, f), + _t: PhantomData, + } + } + + /// Create and return a new service value asynchronously and wrap into a container + pub fn pipeline(&self, cfg: C) -> CreatePipeline<'_, T, Req, C> + where + Self: Sized, + { + CreatePipeline::new(self.factory.create(cfg)) + } +} + +impl Clone for ServiceChainFactory +where + T: Clone, +{ + fn clone(&self) -> Self { + ServiceChainFactory { + factory: self.factory.clone(), + _t: PhantomData, + } + } +} + +impl, R, C> ServiceFactory for ServiceChainFactory { + type Response = T::Response; + type Error = T::Error; + type Service = T::Service; + type InitError = T::InitError; + type Future<'f> = T::Future<'f> where Self: 'f; + + #[inline] + fn create(&self, cfg: C) -> Self::Future<'_> { + self.factory.create(cfg) + } +} diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index 50dc2248..f0395dc9 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -1,15 +1,6 @@ -use std::{cell::Cell, cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task}; +use std::{cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task}; -use crate::{Service, ServiceFactory}; - -/// Container for a service. -/// -/// Container allows to call enclosed service and adds support of shared readiness. -pub struct Container { - svc: Rc, - waiters: Waiters, - pending: Cell, -} +use crate::Service; pub struct ServiceCtx<'a, S: ?Sized> { idx: usize, @@ -53,11 +44,24 @@ impl WaitersRef { } impl Waiters { - fn register(&self, cx: &mut task::Context<'_>) { + pub(crate) fn new() -> Self { + let mut waiters = slab::Slab::new(); + let index = waiters.insert(None); + Waiters { + index, + waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))), + } + } + + pub(crate) fn get_ref(&self) -> &WaitersRef { + self.waiters.as_ref() + } + + pub(crate) fn register(&self, cx: &mut task::Context<'_>) { self.waiters.register(self.index, cx) } - fn notify(&self) { + pub(crate) fn notify(&self) { self.waiters.notify() } } @@ -78,132 +82,16 @@ impl Drop for Waiters { } } -impl Container { - #[inline] - /// Construct new container instance. - pub fn new(svc: S) -> Self { - let mut waiters = slab::Slab::new(); - let index = waiters.insert(None); - Container { - svc: Rc::new(svc), - pending: Cell::new(false), - waiters: Waiters { - index, - waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))), - }, - } - } - - #[inline] - /// Return reference to enclosed service - pub fn get_ref(&self) -> &S { - self.svc.as_ref() - } - - #[inline] - /// Returns `Ready` when the service is able to process requests. - pub fn poll_ready( - &self, - cx: &mut task::Context<'_>, - ) -> task::Poll> - where - S: Service, - { - let res = self.svc.poll_ready(cx); - if res.is_pending() { - self.pending.set(true); - self.waiters.register(cx) - } else if self.pending.get() { - self.pending.set(false); - self.waiters.notify() - } - res - } - - #[inline] - /// Shutdown enclosed service. - pub fn poll_shutdown(&self, cx: &mut task::Context<'_>) -> task::Poll<()> - where - S: Service, - { - self.svc.poll_shutdown(cx) - } - - #[inline] - /// Wait for service readiness and then create future object - /// that resolves to service result. - pub fn call<'a, R>(&'a self, req: R) -> ServiceCall<'a, S, R> - where - S: Service, - { - let ctx = ServiceCtx::<'a, S> { - idx: self.waiters.index, - waiters: self.waiters.waiters.as_ref(), - _t: marker::PhantomData, - }; - ctx.call(self.svc.as_ref(), req) - } - - #[inline] - /// Call service and create future object that resolves to service result. - /// - /// Note, this call does not check service readiness. - pub fn container_call(&self, req: R) -> ContainerCall<'_, S, R> - where - S: Service, - { - let container = self.clone(); - let svc_call = container.svc.call( - req, - ServiceCtx { - idx: container.waiters.index, - waiters: container.waiters.waiters.as_ref(), - _t: marker::PhantomData, - }, - ); - - // SAFETY: `svc_call` has same lifetime same as lifetime of `container.svc` - // Container::svc is heap allocated(Rc), we keep it alive until - // `svc_call` get resolved to result - let fut = unsafe { std::mem::transmute(svc_call) }; - ContainerCall { fut, container } - } - - pub(crate) fn create, R, C>( - f: &F, - cfg: C, - ) -> ContainerFactory<'_, F, R, C> { - ContainerFactory { fut: f.create(cfg) } - } - - /// Extract service if container hadnt been cloned before. - pub fn into_service(self) -> Option { - let svc = self.svc.clone(); - drop(self); - Rc::try_unwrap(svc).ok() - } -} - -impl From for Container { - #[inline] - fn from(svc: S) -> Self { - Container::new(svc) - } -} - -impl Clone for Container { - #[inline] - fn clone(&self) -> Self { - Self { - svc: self.svc.clone(), - pending: Cell::new(false), - waiters: self.waiters.clone(), - } - } -} - impl<'a, S: ?Sized> ServiceCtx<'a, S> { - pub(crate) fn new(idx: usize, waiters: &'a WaitersRef) -> Self { + pub(crate) fn new(waiters: &'a Waiters) -> Self { + Self { + idx: waiters.index, + waiters: waiters.get_ref(), + _t: marker::PhantomData, + } + } + + pub(crate) fn from_ref(idx: usize, waiters: &'a WaitersRef) -> Self { Self { idx, waiters, @@ -264,54 +152,6 @@ impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> { } } -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct ContainerCall<'f, S, R> - where - S: Service, - S: 'f, - R: 'f, - { - #[pin] - fut: S::Future<'f>, - container: Container, - } -} - -impl<'f, S, R> ContainerCall<'f, S, R> -where - S: Service + 'f, - R: 'f, -{ - #[inline] - /// Convert future object to static version. - /// - /// Returned future is suitable for spawning into a async runtime. - /// Note, this call does not check service readiness. - pub fn into_static(self) -> ContainerCall<'static, S, R> { - let svc_call = self.fut; - let container = self.container; - - // SAFETY: `svc_call` has same lifetime same as lifetime of `container.svc` - // Container::svc is heap allocated(Rc), we keep it alive until - // `svc_call` get resolved to result - let fut = unsafe { std::mem::transmute(svc_call) }; - ContainerCall { fut, container } - } -} - -impl<'f, S, R> Future for ContainerCall<'f, S, R> -where - S: Service, -{ - type Output = Result; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { - self.project().fut.poll(cx) - } -} - pin_project_lite::pin_project! { #[must_use = "futures do nothing unless polled"] pub struct ServiceCall<'a, S, Req> @@ -394,33 +234,6 @@ where } } -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct ContainerFactory<'f, F, R, C> - where F: ServiceFactory, - F: ?Sized, - F: 'f, - C: 'f, - { - #[pin] - fut: F::Future<'f>, - } -} - -impl<'f, F, R, C> Future for ContainerFactory<'f, F, R, C> -where - F: ServiceFactory + 'f, -{ - type Output = Result, F::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { - task::Poll::Ready(Ok(Container::new(task::ready!(self - .project() - .fut - .poll(cx))?))) - } -} - #[cfg(test)] mod tests { use ntex_util::future::{lazy, poll_fn, Ready}; @@ -428,6 +241,7 @@ mod tests { use std::{cell::Cell, cell::RefCell, rc::Rc, task::Context, task::Poll}; use super::*; + use crate::Pipeline; struct Srv(Rc>, condition::Waiter); @@ -455,7 +269,7 @@ mod tests { let cnt = Rc::new(Cell::new(0)); let con = condition::Condition::new(); - let srv1 = Container::from(Srv(cnt.clone(), con.wait())); + let srv1 = Pipeline::from(Srv(cnt.clone(), con.wait())); let srv2 = srv1.clone(); let res = lazy(|cx| srv1.poll_ready(cx)).await; @@ -484,19 +298,19 @@ mod tests { let cnt = Rc::new(Cell::new(0)); let con = condition::Condition::new(); - let srv1 = Container::from(Srv(cnt.clone(), con.wait())); + let srv1 = Pipeline::from(Srv(cnt.clone(), con.wait())); let srv2 = srv1.clone(); let data1 = data.clone(); ntex::rt::spawn(async move { let _ = poll_fn(|cx| srv1.poll_ready(cx)).await; - let i = srv1.container_call("srv1").await.unwrap(); + let i = srv1.call("srv1").await.unwrap(); data1.borrow_mut().push(i); }); let data2 = data.clone(); ntex::rt::spawn(async move { - let i = srv2.call("srv2").await.unwrap(); + let i = srv2.service_call("srv2").await.unwrap(); data2.borrow_mut().push(i); }); time::sleep(time::Millis(50)).await; diff --git a/ntex-service/src/fn_service.rs b/ntex-service/src/fn_service.rs index b8673f7b..5bfad342 100644 --- a/ntex-service/src/fn_service.rs +++ b/ntex-service/src/fn_service.rs @@ -40,7 +40,7 @@ where /// }); /// /// // construct new service -/// let srv = factory.container(&()).await?; +/// let srv = factory.pipeline(&()).await?; /// /// // now we can use `div` service /// let result = srv.call((10, 20)).await?; @@ -81,7 +81,7 @@ where /// }); /// /// // construct new service with config argument -/// let srv = factory.container(&10).await?; +/// let srv = factory.pipeline(&10).await?; /// /// let result = srv.call(10).await?; /// assert_eq!(result, 100); @@ -348,19 +348,19 @@ mod tests { use std::task::Poll; use super::*; - use crate::{Container, ServiceFactory}; + use crate::{Pipeline, ServiceFactory}; #[ntex::test] async fn test_fn_service() { let new_srv = fn_service(|()| async { Ok::<_, ()>("srv") }).clone(); - let srv = Container::new(new_srv.create(()).await.unwrap()); + let srv = Pipeline::new(new_srv.create(()).await.unwrap()); let res = srv.call(()).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(res.is_ok()); assert_eq!(res.unwrap(), "srv"); - let srv2 = Container::new(new_srv.clone()); + let srv2 = Pipeline::new(new_srv.clone()); let res = srv2.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), "srv"); @@ -370,7 +370,7 @@ mod tests { #[ntex::test] async fn test_fn_service_service() { - let srv = Container::new( + let srv = Pipeline::new( fn_service(|()| async { Ok::<_, ()>("srv") }) .clone() .create(&()) @@ -398,7 +398,7 @@ mod tests { }) .clone(); - let srv = Container::new(new_srv.create(&1).await.unwrap()); + let srv = Pipeline::new(new_srv.create(&1).await.unwrap()); let res = srv.call(()).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(res.is_ok()); diff --git a/ntex-service/src/fn_shutdown.rs b/ntex-service/src/fn_shutdown.rs index e90d9d49..bbf1c39f 100644 --- a/ntex-service/src/fn_shutdown.rs +++ b/ntex-service/src/fn_shutdown.rs @@ -70,7 +70,7 @@ mod tests { use ntex_util::future::lazy; use std::{rc::Rc, task::Poll}; - use crate::{fn_service, pipeline, Container}; + use crate::{chain, fn_service, Pipeline}; use super::*; @@ -83,7 +83,7 @@ mod tests { is_called2.set(true); }); - let pipe = Container::new(pipeline(srv).and_then(on_shutdown).clone()); + let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone()); let res = pipe.call(()).await; assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(()))); diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 7b0abe86..ca9fdbf2 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -9,6 +9,7 @@ use std::task::{self, Context, Poll}; mod and_then; mod apply; pub mod boxed; +mod chain; mod ctx; mod fn_service; mod fn_shutdown; @@ -22,12 +23,13 @@ mod pipeline; mod then; pub use self::apply::{apply_fn, apply_fn_factory}; -pub use self::ctx::{Container, ContainerCall, ContainerFactory, ServiceCall, ServiceCtx}; +pub use self::chain::{chain, chain_factory}; +pub use self::ctx::{ServiceCall, ServiceCtx}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::fn_shutdown::fn_shutdown; pub use self::map_config::{map_config, unit_config}; pub use self::middleware::{apply, Identity, Middleware, Stack}; -pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; +pub use self::pipeline::{Pipeline, PipelineCall}; #[allow(unused_variables)] /// An asynchronous function of `Request` to a `Response`. @@ -141,12 +143,12 @@ pub trait Service { /// /// Note that this function consumes the receiving service and returns a wrapped version of it, /// similar to the existing `map` methods in the standard library. - fn map(self, f: F) -> crate::dev::Map + fn map(self, f: F) -> dev::ServiceChain, Req> where Self: Sized, F: Fn(Self::Response) -> Res, { - crate::dev::Map::new(self, f) + chain(dev::Map::new(self, f)) } #[inline] @@ -157,12 +159,21 @@ pub trait Service { /// error type. /// /// Note that this function consumes the receiving service and returns a wrapped version of it. - fn map_err(self, f: F) -> crate::dev::MapErr + fn map_err(self, f: F) -> dev::ServiceChain, Req> where Self: Sized, F: Fn(Self::Error) -> E, { - crate::dev::MapErr::new(self, f) + chain(dev::MapErr::new(self, f)) + } + + #[inline] + /// Convert `Self` to a `ServiceChain` + fn chain(self) -> dev::ServiceChain + where + Self: Sized, + { + chain(self) } } @@ -200,32 +211,38 @@ pub trait ServiceFactory { fn create(&self, cfg: Cfg) -> Self::Future<'_>; /// Create and return a new service value asynchronously and wrap into a container - fn container(&self, cfg: Cfg) -> ContainerFactory<'_, Self, Req, Cfg> + fn pipeline(&self, cfg: Cfg) -> dev::CreatePipeline<'_, Self, Req, Cfg> where Self: Sized, { - Container::::create(self, cfg) + dev::CreatePipeline::new(self.create(cfg)) } #[inline] /// Map this service's output to a different type, returning a new service /// of the resulting type. - fn map(self, f: F) -> crate::map::MapFactory + fn map( + self, + f: F, + ) -> dev::ServiceChainFactory, Req, Cfg> where Self: Sized, F: Fn(Self::Response) -> Res + Clone, { - crate::map::MapFactory::new(self, f) + chain_factory(dev::MapFactory::new(self, f)) } #[inline] /// Map this service's error to a different error, returning a new service. - fn map_err(self, f: F) -> crate::map_err::MapErrFactory + fn map_err( + self, + f: F, + ) -> dev::ServiceChainFactory, Req, Cfg> where Self: Sized, F: Fn(Self::Error) -> E + Clone, { - crate::map_err::MapErrFactory::new(self, f) + chain_factory(dev::MapErrFactory::new(self, f)) } #[inline] @@ -233,12 +250,12 @@ pub trait ServiceFactory { fn map_init_err( self, f: F, - ) -> crate::map_init_err::MapInitErr + ) -> dev::ServiceChainFactory, Req, Cfg> where Self: Sized, F: Fn(Self::InitError) -> E + Clone, { - crate::map_init_err::MapInitErr::new(self, f) + chain_factory(dev::MapInitErr::new(self, f)) } } @@ -312,6 +329,15 @@ where { /// Convert to a `Service` fn into_service(self) -> Svc; + + #[inline] + /// Convert `Self` to a `ServiceChain` + fn into_chain(self) -> dev::ServiceChain + where + Self: Sized, + { + chain(self) + } } /// Trait for types that can be converted to a `ServiceFactory` @@ -321,12 +347,22 @@ where { /// Convert `Self` to a `ServiceFactory` fn into_factory(self) -> T; + + #[inline] + /// Convert `Self` to a `ServiceChainFactory` + fn chain(self) -> dev::ServiceChainFactory + where + Self: Sized, + { + chain_factory(self) + } } impl IntoService for Svc where Svc: Service, { + #[inline] fn into_service(self) -> Svc { self } @@ -336,6 +372,7 @@ impl IntoServiceFactory for T where T: ServiceFactory, { + #[inline] fn into_factory(self) -> T { self } @@ -353,6 +390,7 @@ where pub mod dev { pub use crate::and_then::{AndThen, AndThenFactory}; pub use crate::apply::{Apply, ApplyFactory, ApplyService}; + pub use crate::chain::{ServiceChain, ServiceChainFactory}; pub use crate::fn_service::{ FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig, }; @@ -362,5 +400,6 @@ pub mod dev { pub use crate::map_err::{MapErr, MapErrFactory}; pub use crate::map_init_err::MapInitErr; pub use crate::middleware::ApplyMiddleware; + pub use crate::pipeline::CreatePipeline; pub use crate::then::{Then, ThenFactory}; } diff --git a/ntex-service/src/map.rs b/ntex-service/src/map.rs index d57daa47..cb5e4f70 100644 --- a/ntex-service/src/map.rs +++ b/ntex-service/src/map.rs @@ -192,7 +192,7 @@ mod tests { use ntex_util::future::{lazy, Ready}; use super::*; - use crate::{fn_factory, Container, Service, ServiceCtx, ServiceFactory}; + use crate::{fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; #[derive(Clone)] struct Srv; @@ -213,7 +213,7 @@ mod tests { #[ntex::test] async fn test_service() { - let srv = Container::new(Srv.map(|_| "ok").clone()); + let srv = Pipeline::new(Srv.map(|_| "ok").clone()); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), "ok"); @@ -227,7 +227,7 @@ mod tests { #[ntex::test] async fn test_pipeline() { - let srv = Container::new(crate::pipeline(Srv).map(|_| "ok").clone()); + let srv = Pipeline::new(crate::chain(Srv).map(|_| "ok").clone()); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), "ok"); @@ -244,7 +244,7 @@ mod tests { let new_srv = fn_factory(|| async { Ok::<_, ()>(Srv) }) .map(|_| "ok") .clone(); - let srv = Container::new(new_srv.create(&()).await.unwrap()); + let srv = Pipeline::new(new_srv.create(&()).await.unwrap()); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("ok")); @@ -252,10 +252,10 @@ mod tests { #[ntex::test] async fn test_pipeline_factory() { - let new_srv = crate::pipeline_factory(fn_factory(|| async { Ok::<_, ()>(Srv) })) + let new_srv = crate::chain_factory(fn_factory(|| async { Ok::<_, ()>(Srv) })) .map(|_| "ok") .clone(); - let srv = Container::new(new_srv.create(&()).await.unwrap()); + let srv = Pipeline::new(new_srv.create(&()).await.unwrap()); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("ok")); diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index b10bd4e1..62aab36e 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -196,7 +196,7 @@ mod tests { use ntex_util::future::{lazy, Ready}; use super::*; - use crate::{fn_factory, Container, Service, ServiceCtx, ServiceFactory}; + use crate::{fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; #[derive(Clone)] struct Srv(bool); @@ -231,7 +231,7 @@ mod tests { #[ntex::test] async fn test_service() { - let srv = Container::new(Srv(false).map_err(|_| "error").clone()); + let srv = Pipeline::new(Srv(false).map_err(|_| "error").clone()); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); @@ -239,7 +239,7 @@ mod tests { #[ntex::test] async fn test_pipeline() { - let srv = Container::new(crate::pipeline(Srv(false)).map_err(|_| "error").clone()); + let srv = Pipeline::new(crate::chain(Srv(false)).map_err(|_| "error").clone()); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); @@ -250,7 +250,7 @@ mod tests { let new_srv = fn_factory(|| Ready::<_, ()>::Ok(Srv(false))) .map_err(|_| "error") .clone(); - let srv = Container::new(new_srv.create(&()).await.unwrap()); + let srv = Pipeline::new(new_srv.create(&()).await.unwrap()); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); @@ -259,10 +259,10 @@ mod tests { #[ntex::test] async fn test_pipeline_factory() { let new_srv = - crate::pipeline_factory(fn_factory(|| async { Ok::(Srv(false)) })) + crate::chain_factory(fn_factory(|| async { Ok::(Srv(false)) })) .map_err(|_| "error") .clone(); - let srv = Container::new(new_srv.create(&()).await.unwrap()); + let srv = Pipeline::new(new_srv.create(&()).await.unwrap()); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); diff --git a/ntex-service/src/map_init_err.rs b/ntex-service/src/map_init_err.rs index 5d699471..72eaaacf 100644 --- a/ntex-service/src/map_init_err.rs +++ b/ntex-service/src/map_init_err.rs @@ -89,11 +89,11 @@ where #[cfg(test)] mod tests { - use crate::{fn_factory_with_config, into_service, pipeline_factory, ServiceFactory}; + use crate::{chain_factory, fn_factory_with_config, into_service, ServiceFactory}; #[ntex::test] async fn map_init_err() { - let factory = pipeline_factory(fn_factory_with_config(|err: &bool| { + let factory = chain_factory(fn_factory_with_config(|err: &bool| { let err = *err; async move { if err { diff --git a/ntex-service/src/middleware.rs b/ntex-service/src/middleware.rs index fa9f1596..d370e952 100644 --- a/ntex-service/src/middleware.rs +++ b/ntex-service/src/middleware.rs @@ -214,7 +214,7 @@ mod tests { use std::marker; use super::*; - use crate::{fn_service, Container, Service, ServiceCall, ServiceCtx, ServiceFactory}; + use crate::{fn_service, Pipeline, Service, ServiceCall, ServiceCtx, ServiceFactory}; #[derive(Clone)] struct Tr(marker::PhantomData); @@ -252,7 +252,7 @@ mod tests { ) .clone(); - let srv = Container::new(factory.create(&()).await.unwrap().clone()); + let srv = Pipeline::new(factory.create(&()).await.unwrap().clone()); let res = srv.call(10).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), 20); @@ -264,11 +264,11 @@ mod tests { assert_eq!(res, Poll::Ready(())); let factory = - crate::pipeline_factory(fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2))) + crate::chain_factory(fn_service(|i: usize| Ready::<_, ()>::Ok(i * 2))) .apply(Rc::new(Tr(marker::PhantomData).clone())) .clone(); - let srv = Container::new(factory.create(&()).await.unwrap().clone()); + let srv = Pipeline::new(factory.create(&()).await.unwrap().clone()); let res = srv.call(10).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), 20); diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 70d91c94..893182cc 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -1,278 +1,194 @@ -use std::marker::PhantomData; +use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll}; -use crate::and_then::{AndThen, AndThenFactory}; -use crate::ctx::{Container, ServiceCall, ServiceCtx}; -use crate::map::{Map, MapFactory}; -use crate::map_err::{MapErr, MapErrFactory}; -use crate::map_init_err::MapInitErr; -use crate::middleware::{ApplyMiddleware, Middleware}; -use crate::then::{Then, ThenFactory}; -use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; +use crate::ctx::{ServiceCall, ServiceCtx, Waiters}; +use crate::{Service, ServiceFactory}; -/// Constructs new pipeline with one service in pipeline chain. -pub fn pipeline(service: F) -> Pipeline -where - Svc: Service, - F: IntoService, -{ - Pipeline { - service: service.into_service(), - _t: PhantomData, - } +/// Container for a service. +/// +/// Container allows to call enclosed service and adds support of shared readiness. +pub struct Pipeline { + svc: Rc, + waiters: Waiters, + pending: Cell, } -/// Constructs new pipeline factory with one service factory. -pub fn pipeline_factory(factory: F) -> PipelineFactory -where - T: ServiceFactory, - F: IntoServiceFactory, -{ - PipelineFactory { - factory: factory.into_factory(), - _t: PhantomData, - } -} - -/// Pipeline service - pipeline allows to compose multiple service into one service. -pub struct Pipeline { - service: Svc, - _t: PhantomData, -} - -impl> Pipeline { - /// Call another service after call to this one has resolved successfully. - /// - /// This function can be used to chain two services together and ensure that - /// the second service isn't called until call to the fist service have - /// finished. Result of the call to the first service is used as an - /// input parameter for the second service's call. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it. - pub fn and_then(self, service: F) -> Pipeline> - where - Self: Sized, - F: IntoService, - Next: Service, - { +impl Pipeline { + #[inline] + /// Construct new container instance. + pub fn new(svc: S) -> Self { Pipeline { - service: AndThen::new(self.service, service.into_service()), - _t: PhantomData, + svc: Rc::new(svc), + pending: Cell::new(false), + waiters: Waiters::new(), } } - /// Chain on a computation for when a call to the service finished, - /// passing the result of the call to the next service `U`. - /// - /// Note that this function consumes the receiving pipeline and returns a - /// wrapped version of it. - pub fn then(self, service: F) -> Pipeline> - where - Self: Sized, - F: IntoService>, - Next: Service, Error = Svc::Error>, - { - Pipeline { - service: Then::new(self.service, service.into_service()), - _t: PhantomData, - } - } - - /// Map this service's output to a different type, returning a new service - /// of the resulting type. - /// - /// This function is similar to the `Option::map` or `Iterator::map` where - /// it will change the type of the underlying service. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it, similar to the existing `map` methods in the - /// standard library. - pub fn map(self, f: F) -> Pipeline> - where - Self: Sized, - F: Fn(Svc::Response) -> Res, - { - Pipeline { - service: Map::new(self.service, f), - _t: PhantomData, - } - } - - /// Map this service's error to a different error, returning a new service. - /// - /// This function is similar to the `Result::map_err` where it will change - /// the error type of the underlying service. This is useful for example to - /// ensure that services have the same error type. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it. - pub fn map_err(self, f: F) -> Pipeline> - where - Self: Sized, - F: Fn(Svc::Error) -> Err, - { - Pipeline { - service: MapErr::new(self.service, f), - _t: PhantomData, - } - } - - /// Create service container - pub fn container(self) -> Container { - Container::new(self.service) - } -} - -impl Clone for Pipeline -where - Svc: Clone, -{ - fn clone(&self) -> Self { - Pipeline { - service: self.service.clone(), - _t: PhantomData, - } - } -} - -impl> Service for Pipeline { - type Response = Svc::Response; - type Error = Svc::Error; - type Future<'f> = ServiceCall<'f, Svc, Req> where Self: 'f, Req: 'f; - - crate::forward_poll_ready!(service); - crate::forward_poll_shutdown!(service); - #[inline] - fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - ctx.call(&self.service, req) + /// Return reference to enclosed service + pub fn get_ref(&self) -> &S { + self.svc.as_ref() } -} - -/// Pipeline factory -pub struct PipelineFactory { - factory: T, - _t: PhantomData<(Req, C)>, -} - -impl, C> PipelineFactory { - /// Call another service after call to this one has resolved successfully. - pub fn and_then(self, factory: F) -> PipelineFactory, C> - where - Self: Sized, - F: IntoServiceFactory, - U: ServiceFactory, - { - PipelineFactory { - factory: AndThenFactory::new(self.factory, factory.into_factory()), - _t: PhantomData, - } - } - - /// Apply middleware to current service factory. - /// - /// Short version of `apply(middleware, pipeline_factory(...))` - pub fn apply(self, tr: U) -> PipelineFactory, C> - where - U: Middleware, - { - PipelineFactory { - factory: ApplyMiddleware::new(tr, self.factory), - _t: PhantomData, - } - } - - /// Create `NewService` to chain on a computation for when a call to the - /// service finished, passing the result of the call to the next - /// service `U`. - /// - /// Note that this function consumes the receiving pipeline and returns a - /// wrapped version of it. - pub fn then(self, factory: F) -> PipelineFactory, C> - where - Self: Sized, - C: Clone, - F: IntoServiceFactory, C>, - U: ServiceFactory< - Result, - C, - Error = T::Error, - InitError = T::InitError, - >, - { - PipelineFactory { - factory: ThenFactory::new(self.factory, factory.into_factory()), - _t: PhantomData, - } - } - - /// Map this service's output to a different type, returning a new service - /// of the resulting type. - pub fn map(self, f: F) -> PipelineFactory, C> - where - Self: Sized, - F: Fn(T::Response) -> Res + Clone, - { - PipelineFactory { - factory: MapFactory::new(self.factory, f), - _t: PhantomData, - } - } - - /// Map this service's error to a different error, returning a new service. - pub fn map_err( - self, - f: F, - ) -> PipelineFactory, C> - where - Self: Sized, - F: Fn(T::Error) -> E + Clone, - { - PipelineFactory { - factory: MapErrFactory::new(self.factory, f), - _t: PhantomData, - } - } - - /// Map this factory's init error to a different error, returning a new service. - pub fn map_init_err( - self, - f: F, - ) -> PipelineFactory, C> - where - Self: Sized, - F: Fn(T::InitError) -> E + Clone, - { - PipelineFactory { - factory: MapInitErr::new(self.factory, f), - _t: PhantomData, - } - } -} - -impl Clone for PipelineFactory -where - T: Clone, -{ - fn clone(&self) -> Self { - PipelineFactory { - factory: self.factory.clone(), - _t: PhantomData, - } - } -} - -impl, C> ServiceFactory - for PipelineFactory -{ - type Response = T::Response; - type Error = T::Error; - type Service = T::Service; - type InitError = T::InitError; - type Future<'f> = T::Future<'f> where Self: 'f; #[inline] - fn create(&self, cfg: C) -> Self::Future<'_> { - self.factory.create(cfg) + /// Returns `Ready` when the service is able to process requests. + pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> + where + S: Service, + { + let res = self.svc.poll_ready(cx); + if res.is_pending() { + self.pending.set(true); + self.waiters.register(cx) + } else if self.pending.get() { + self.pending.set(false); + self.waiters.notify() + } + res + } + + #[inline] + /// Shutdown enclosed service. + pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> + where + S: Service, + { + self.svc.poll_shutdown(cx) + } + + #[inline] + /// Wait for service readiness and then create future object + /// that resolves to service result. + pub fn service_call<'a, R>(&'a self, req: R) -> ServiceCall<'a, S, R> + where + S: Service, + { + ServiceCtx::<'a, S>::new(&self.waiters).call(self.svc.as_ref(), req) + } + + #[inline] + /// Call service and create future object that resolves to service result. + /// + /// Note, this call does not check service readiness. + pub fn call(&self, req: R) -> PipelineCall<'_, S, R> + where + S: Service, + { + let pipeline = self.clone(); + let svc_call = pipeline.svc.call(req, ServiceCtx::new(&pipeline.waiters)); + + // SAFETY: `svc_call` has same lifetime same as lifetime of `pipeline.svc` + // Pipeline::svc is heap allocated(Rc), we keep it alive until + // `svc_call` get resolved to result + let fut = unsafe { std::mem::transmute(svc_call) }; + PipelineCall { fut, pipeline } + } + + /// Extract service if container hadnt been cloned before. + pub fn into_service(self) -> Option { + let svc = self.svc.clone(); + drop(self); + Rc::try_unwrap(svc).ok() + } +} + +impl From for Pipeline { + #[inline] + fn from(svc: S) -> Self { + Pipeline::new(svc) + } +} + +impl Clone for Pipeline { + #[inline] + fn clone(&self) -> Self { + Self { + svc: self.svc.clone(), + pending: Cell::new(false), + waiters: self.waiters.clone(), + } + } +} + +pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] + pub struct PipelineCall<'f, S, R> + where + S: Service, + S: 'f, + R: 'f, + { + #[pin] + fut: S::Future<'f>, + pipeline: Pipeline, + } +} + +impl<'f, S, R> PipelineCall<'f, S, R> +where + S: Service + 'f, + R: 'f, +{ + #[inline] + /// Convert future object to static version. + /// + /// Returned future is suitable for spawning into a async runtime. + /// Note, this call does not check service readiness. + pub fn into_static(self) -> PipelineCall<'static, S, R> { + let svc_call = self.fut; + let pipeline = self.pipeline; + + // SAFETY: `svc_call` has same lifetime same as lifetime of `pipeline.svc` + // Pipeline::svc is heap allocated(Rc), we keep it alive until + // `svc_call` get resolved to result + let fut = unsafe { std::mem::transmute(svc_call) }; + PipelineCall { fut, pipeline } + } +} + +impl<'f, S, R> future::Future for PipelineCall<'f, S, R> +where + S: Service, +{ + type Output = Result; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] + pub struct CreatePipeline<'f, F, R, C> + where F: ServiceFactory, + F: ?Sized, + F: 'f, + C: 'f, + { + #[pin] + fut: F::Future<'f>, + } +} + +impl<'f, F, R, C> CreatePipeline<'f, F, R, C> +where + F: ServiceFactory + 'f, +{ + pub(crate) fn new(fut: F::Future<'f>) -> Self { + Self { fut } + } +} + +impl<'f, F, R, C> future::Future for CreatePipeline<'f, F, R, C> +where + F: ServiceFactory + 'f, +{ + type Output = Result, F::InitError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(Ok(Pipeline::new(std::task::ready!(self + .project() + .fut + .poll(cx))?))) } } diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 092a218a..ce1aac2c 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -248,7 +248,7 @@ mod tests { use ntex_util::future::{lazy, Ready}; use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; - use crate::{pipeline, pipeline_factory, Service, ServiceCtx, ServiceFactory}; + use crate::{chain, chain_factory, Service, ServiceCtx}; #[derive(Clone)] struct Srv1(Rc>); @@ -303,7 +303,7 @@ mod tests { #[ntex::test] async fn test_poll_ready() { let cnt = Rc::new(Cell::new(0)); - let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone())); + let srv = chain(Srv1(cnt.clone())).then(Srv2(cnt.clone())); let res = lazy(|cx| srv.poll_ready(cx)).await; assert_eq!(res, Poll::Ready(Ok(()))); assert_eq!(cnt.get(), 2); @@ -314,10 +314,7 @@ mod tests { #[ntex::test] async fn test_call() { let cnt = Rc::new(Cell::new(0)); - let srv = pipeline(Srv1(cnt.clone())) - .then(Srv2(cnt)) - .clone() - .container(); + let srv = chain(Srv1(cnt.clone())).then(Srv2(cnt)).clone().pipeline(); let res = srv.call(Ok("srv1")).await; assert!(res.is_ok()); @@ -333,10 +330,10 @@ mod tests { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); let blank = move || Ready::<_, ()>::Ok(Srv1(cnt2.clone())); - let factory = pipeline_factory(blank) + let factory = chain_factory(blank) .then(move || Ready::Ok(Srv2(cnt.clone()))) .clone(); - let srv = factory.container(&()).await.unwrap(); + let srv = factory.pipeline(&()).await.unwrap(); let res = srv.call(Ok("srv1")).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "ok")); diff --git a/ntex-tls/Cargo.toml b/ntex-tls/Cargo.toml index 87816fa0..a7905202 100644 --- a/ntex-tls/Cargo.toml +++ b/ntex-tls/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tls" -version = "0.3.0-beta.1" +version = "0.3.0" authors = ["ntex contributors "] description = "An implementation of SSL streams for ntex backed by OpenSSL" keywords = ["network", "framework", "async", "futures"] @@ -26,9 +26,9 @@ rustls = ["tls_rust"] [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.3.0-beta.1" -ntex-util = "0.3.0-beta.1" -ntex-service = "1.2.0-beta.1" +ntex-io = "0.3.0" +ntex-util = "0.3.0" +ntex-service = "1.2.0" log = "0.4" pin-project-lite = "0.2" @@ -39,7 +39,7 @@ tls_openssl = { version = "0.10", package = "openssl", optional = true } tls_rust = { version = "0.21", package = "rustls", optional = true } [dev-dependencies] -ntex = { version = "0.7.0-beta.1", features = ["openssl", "rustls", "tokio"] } +ntex = { version = "0.7.0", features = ["openssl", "rustls", "tokio"] } env_logger = "0.10" -rustls-pemfile = { version = "1.0" } -webpki-roots = { version = "0.23" } +rustls-pemfile = "1.0" +webpki-roots = "0.23" diff --git a/ntex-tls/examples/rustls-server.rs b/ntex-tls/examples/rustls-server.rs index b87ebe59..ffee739c 100644 --- a/ntex-tls/examples/rustls-server.rs +++ b/ntex-tls/examples/rustls-server.rs @@ -1,6 +1,6 @@ use std::{fs::File, io, io::BufReader, sync::Arc}; -use ntex::service::{fn_service, pipeline_factory}; +use ntex::service::{chain_factory, fn_service}; use ntex::{codec, io::filter, io::Io, server, util::Either}; use ntex_tls::rustls::TlsAcceptor; use rustls_pemfile::{certs, rsa_private_keys}; @@ -34,7 +34,7 @@ async fn main() -> io::Result<()> { // start server server::ServerBuilder::new() .bind("basic", "127.0.0.1:8443", move |_| { - pipeline_factory(filter(TlsAcceptor::new(tls_config.clone()))).and_then( + chain_factory(filter(TlsAcceptor::new(tls_config.clone()))).and_then( fn_service(|io: Io<_>| async move { println!("New client is connected"); diff --git a/ntex-tls/examples/server.rs b/ntex-tls/examples/server.rs index 567a214b..74b25079 100644 --- a/ntex-tls/examples/server.rs +++ b/ntex-tls/examples/server.rs @@ -1,6 +1,6 @@ use std::io; -use ntex::service::{fn_service, pipeline_factory}; +use ntex::service::{chain_factory, fn_service}; use ntex::{codec, io::filter, io::Io, server, util::Either}; use ntex_tls::openssl::{PeerCert, PeerCertChain, SslAcceptor}; use tls_openssl::ssl::{self, SslFiletype, SslMethod, SslVerifyMode}; @@ -27,8 +27,8 @@ async fn main() -> io::Result<()> { // start server server::ServerBuilder::new() .bind("basic", "127.0.0.1:8443", move |_| { - pipeline_factory(filter(SslAcceptor::new(acceptor.clone()))).and_then( - fn_service(|io: Io<_>| async move { + chain_factory(filter(SslAcceptor::new(acceptor.clone()))).and_then(fn_service( + |io: Io<_>| async move { println!("New client is connected"); if let Some(cert) = io.query::().as_ref() { println!("Peer cert: {:?}", cert.0); @@ -53,8 +53,8 @@ async fn main() -> io::Result<()> { } println!("Client is disconnected"); Ok(()) - }), - ) + }, + )) })? .workers(1) .run() diff --git a/ntex-tokio/Cargo.toml b/ntex-tokio/Cargo.toml index 2db7954a..539f66b7 100644 --- a/ntex-tokio/Cargo.toml +++ b/ntex-tokio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tokio" -version = "0.3.0-beta.0" +version = "0.3.0" authors = ["ntex contributors "] description = "tokio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,8 +17,8 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.3.0-beta.0" -ntex-util = "0.3.0-beta.0" +ntex-io = "0.3.0" +ntex-util = "0.3.0" log = "0.4" pin-project-lite = "0.2" tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] } diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index a3927a7d..bef46d01 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "0.3.0-beta.1" +version = "0.3.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] ntex-rt = "0.4.7" -ntex-service = "1.2.0-beta.1" +ntex-service = "1.2.0" bitflags = "1.3" fxhash = "0.2.1" log = "0.4" @@ -28,7 +28,7 @@ futures-sink = { version = "0.3", default-features = false, features = ["alloc"] pin-project-lite = "0.2.9" [dev-dependencies] -ntex = { version = "0.7.0-beta.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } ntex-bytes = "0.1.18" ntex-macros = "0.1.3" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/ntex-util/src/services/buffer.rs b/ntex-util/src/services/buffer.rs index 35775448..7f7aae2a 100644 --- a/ntex-util/src/services/buffer.rs +++ b/ntex-util/src/services/buffer.rs @@ -191,7 +191,7 @@ where #[cfg(test)] mod tests { - use ntex_service::{apply, fn_factory, Container, Service, ServiceFactory}; + use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceFactory}; use std::{rc::Rc, task::Context, task::Poll, time::Duration}; use super::*; @@ -235,7 +235,7 @@ mod tests { count: Cell::new(0), }); - let srv = Container::new(BufferService::new(2, TestService(inner.clone())).clone()); + let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone())).clone()); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv1 = srv.clone(); @@ -274,7 +274,7 @@ mod tests { count: Cell::new(0), }); - let srv = Container::new(BufferService::new(2, TestService(inner.clone()))); + let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone()))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let _ = srv.call(()).await; assert_eq!(inner.count.get(), 1); @@ -296,7 +296,7 @@ mod tests { fn_factory(|| async { Ok::<_, ()>(TestService(inner.clone())) }), ); - let srv = srv.container(&()).await.unwrap(); + let srv = srv.pipeline(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv1 = srv.clone(); diff --git a/ntex-util/src/services/inflight.rs b/ntex-util/src/services/inflight.rs index 83f93bec..eb15e20b 100644 --- a/ntex-util/src/services/inflight.rs +++ b/ntex-util/src/services/inflight.rs @@ -109,7 +109,7 @@ impl<'f, T: Service, R> Future for InFlightServiceResponse<'f, T, R> { #[cfg(test)] mod tests { - use ntex_service::{apply, fn_factory, Container, Service, ServiceCtx, ServiceFactory}; + use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; use std::{cell::RefCell, task::Poll, time::Duration}; use super::*; @@ -134,7 +134,7 @@ mod tests { async fn test_service() { let (tx, rx) = oneshot::channel(); - let srv = Container::new(InFlightService::new(1, SleepService(rx))); + let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); @@ -168,7 +168,7 @@ mod tests { }), ); - let srv = srv.container(&()).await.unwrap(); + let srv = srv.pipeline(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); diff --git a/ntex-util/src/services/keepalive.rs b/ntex-util/src/services/keepalive.rs index 024e70ad..93fb7505 100644 --- a/ntex-util/src/services/keepalive.rs +++ b/ntex-util/src/services/keepalive.rs @@ -134,7 +134,7 @@ mod tests { let factory = KeepAlive::new(Millis(100), || TestErr); let _ = factory.clone(); - let service = factory.container(&()).await.unwrap(); + let service = factory.pipeline(&()).await.unwrap(); assert_eq!(service.call(1usize).await, Ok(1usize)); assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready()); diff --git a/ntex-util/src/services/onerequest.rs b/ntex-util/src/services/onerequest.rs index e9a37098..dde7b3cc 100644 --- a/ntex-util/src/services/onerequest.rs +++ b/ntex-util/src/services/onerequest.rs @@ -101,7 +101,7 @@ impl<'f, T: Service, R> Future for OneRequestServiceResponse<'f, T, R> { #[cfg(test)] mod tests { - use ntex_service::{apply, fn_factory, Container, Service, ServiceCtx, ServiceFactory}; + use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceCtx, ServiceFactory}; use std::{cell::RefCell, task::Poll, time::Duration}; use super::*; @@ -126,7 +126,7 @@ mod tests { async fn test_oneshot() { let (tx, rx) = oneshot::channel(); - let srv = Container::new(OneRequestService::new(SleepService(rx))); + let srv = Pipeline::new(OneRequestService::new(SleepService(rx))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); @@ -156,7 +156,7 @@ mod tests { }), ); - let srv = srv.container(&()).await.unwrap(); + let srv = srv.pipeline(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); diff --git a/ntex-util/src/services/timeout.rs b/ntex-util/src/services/timeout.rs index d2a02e05..79dcc546 100644 --- a/ntex-util/src/services/timeout.rs +++ b/ntex-util/src/services/timeout.rs @@ -214,7 +214,7 @@ where mod tests { use std::{fmt, time::Duration}; - use ntex_service::{apply, fn_factory, Container, Service, ServiceFactory}; + use ntex_service::{apply, fn_factory, Pipeline, Service, ServiceFactory}; use super::*; use crate::future::{lazy, BoxFuture}; @@ -250,9 +250,8 @@ mod tests { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(50); - let timeout = Container::new( - TimeoutService::new(resolution, SleepService(wait_time)).clone(), - ); + let timeout = + Pipeline::new(TimeoutService::new(resolution, SleepService(wait_time)).clone()); assert_eq!(timeout.call(()).await, Ok(())); assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready()); assert!(lazy(|cx| timeout.poll_shutdown(cx)).await.is_ready()); @@ -264,7 +263,7 @@ mod tests { let resolution = Duration::from_millis(0); let timeout = - Container::new(TimeoutService::new(resolution, SleepService(wait_time))); + Pipeline::new(TimeoutService::new(resolution, SleepService(wait_time))); assert_eq!(timeout.call(()).await, Ok(())); assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready()); } @@ -275,7 +274,7 @@ mod tests { let wait_time = Duration::from_millis(500); let timeout = - Container::new(TimeoutService::new(resolution, SleepService(wait_time))); + Pipeline::new(TimeoutService::new(resolution, SleepService(wait_time))); assert_eq!(timeout.call(()).await, Err(TimeoutError::Timeout)); } @@ -289,7 +288,7 @@ mod tests { Timeout::new(resolution).clone(), fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }), ); - let srv = timeout.container(&()).await.unwrap(); + let srv = timeout.pipeline(&()).await.unwrap(); let res = srv.call(()).await.unwrap_err(); assert_eq!(res, TimeoutError::Timeout); diff --git a/ntex-util/src/services/variant.rs b/ntex-util/src/services/variant.rs index a5158480..647abb3b 100644 --- a/ntex-util/src/services/variant.rs +++ b/ntex-util/src/services/variant.rs @@ -354,7 +354,7 @@ mod tests { .clone() .v3(fn_factory(|| async { Ok::<_, ()>(Srv2) })) .clone(); - let service = factory.container(&()).await.unwrap().clone(); + let service = factory.pipeline(&()).await.unwrap().clone(); assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready()); assert!(lazy(|cx| service.poll_shutdown(cx)).await.is_ready()); diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 58ddd964..1693bf26 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.7.0-beta.2" +version = "0.7.0" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -49,20 +49,20 @@ async-std = ["ntex-rt/async-std", "ntex-async-std", "ntex-connect/async-std"] [dependencies] ntex-codec = "0.6.2" -ntex-connect = "0.3.0-beta.1" +ntex-connect = "0.3.0" ntex-http = "0.1.9" ntex-router = "0.5.1" -ntex-service = "1.2.0-beta.3" +ntex-service = "1.2.0" ntex-macros = "0.1.3" -ntex-util = "0.3.0-beta.1" +ntex-util = "0.3.0" ntex-bytes = "0.1.19" -ntex-h2 = "0.3.0-beta.2" +ntex-h2 = "0.3.0" ntex-rt = "0.4.9" -ntex-io = "0.3.0-beta.2" -ntex-tls = "0.3.0-beta.1" -ntex-tokio = { version = "0.3.0-beta.0", optional = true } -ntex-glommio = { version = "0.3.0-beta.0", optional = true } -ntex-async-std = { version = "0.3.0-beta.0", optional = true } +ntex-io = "0.3.0" +ntex-tls = "0.3.0" +ntex-tokio = { version = "0.3.0", optional = true } +ntex-glommio = { version = "0.3.0", optional = true } +ntex-async-std = { version = "0.3.0", optional = true } async-oneshot = "0.5.0" async-channel = "1.8.0" @@ -108,5 +108,5 @@ time = "0.3" futures-util = "0.3" tls-openssl = { version="0.10", package = "openssl" } tls-rustls = { version = "0.21", package="rustls", features = ["dangerous_configuration"] } -rustls-pemfile = { version = "1.0" } -webpki-roots = { version = "0.23" } +rustls-pemfile = "1.0" +webpki-roots = "0.23" diff --git a/ntex/src/http/client/connect.rs b/ntex/src/http/client/connect.rs index 451897b7..dde1a0a4 100644 --- a/ntex/src/http/client/connect.rs +++ b/ntex/src/http/client/connect.rs @@ -1,13 +1,13 @@ use std::net; use crate::http::{body::Body, RequestHeadType}; -use crate::{service::Container, service::Service, util::BoxFuture}; +use crate::{service::Pipeline, service::Service, util::BoxFuture}; use super::error::{ConnectError, SendRequestError}; use super::response::ClientResponse; use super::{Connect as ClientConnect, Connection}; -pub(super) struct ConnectorWrapper(pub(crate) Container); +pub(super) struct ConnectorWrapper(pub(crate) Pipeline); pub(super) trait Connect { fn send_request( diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index 83cad5a4..daf74c99 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -53,6 +53,7 @@ impl Connector { let conn = Connector { connector: boxed::service( TcpConnector::new() + .chain() .map(IoBoxed::from) .map_err(ConnectError::from), ), @@ -192,8 +193,12 @@ impl Connector { T: Service, Error = crate::connect::ConnectError> + 'static, IoBoxed: From, { - self.connector = - boxed::service(connector.map(IoBoxed::from).map_err(ConnectError::from)); + self.connector = boxed::service( + connector + .chain() + .map(IoBoxed::from) + .map_err(ConnectError::from), + ); self } @@ -204,7 +209,10 @@ impl Connector { IoBoxed: From, { self.ssl_connector = Some(boxed::service( - connector.map(IoBoxed::from).map_err(ConnectError::from), + connector + .chain() + .map(IoBoxed::from) + .map_err(ConnectError::from), )); self } @@ -257,6 +265,7 @@ fn connector( async move { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)).await }, ) }) + .chain() .map(move |io: IoBoxed| { io.set_disconnect_timeout(disconnect_timeout); io diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 9b68f9d9..47d59775 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -6,7 +6,7 @@ use ntex_h2::{self as h2}; use crate::http::uri::{Authority, Scheme, Uri}; use crate::io::{types::HttpProtocol, IoBoxed}; -use crate::service::{Container, Service, ServiceCall, ServiceCtx}; +use crate::service::{Pipeline, Service, ServiceCall, ServiceCtx}; use crate::time::{now, Millis}; use crate::util::{ready, BoxFuture, ByteString, HashMap, HashSet}; use crate::{channel::pool, rt::spawn, task::LocalWaker}; @@ -44,7 +44,7 @@ struct AvailableConnection { /// Connections pool pub(super) struct ConnectionPool { - connector: Container, + connector: Pipeline, inner: Rc>, waiters: Rc>, } @@ -61,7 +61,7 @@ where limit: usize, h2config: h2::Config, ) -> Self { - let connector = Container::new(connector); + let connector = Pipeline::new(connector); let waiters = Rc::new(RefCell::new(Waiters { waiters: HashMap::default(), pool: pool::new(), @@ -307,7 +307,7 @@ impl Inner { } struct ConnectionPoolSupport { - connector: Container, + connector: Pipeline, inner: Rc>, waiters: Rc>, } @@ -408,7 +408,7 @@ where tx: Waiter, uri: Uri, inner: Rc>, - connector: Container, + pipeline: Pipeline, msg: Connect, ) { let disconnect_timeout = inner.borrow().disconnect_timeout; @@ -416,7 +416,7 @@ where #[allow(clippy::redundant_async_block)] spawn(async move { OpenConnection:: { - fut: connector.call(msg), + fut: pipeline.service_call(msg), tx: Some(tx), key: key.clone(), inner: inner.clone(), @@ -629,7 +629,7 @@ mod tests { let store = Rc::new(RefCell::new(Vec::new())); let store2 = store.clone(); - let pool = Container::new( + let pool = Pipeline::new( ConnectionPool::new( fn_service(move |req| { let (client, server) = Io::create(); diff --git a/ntex/src/http/config.rs b/ntex/src/http/config.rs index ff7aa915..d46919ab 100644 --- a/ntex/src/http/config.rs +++ b/ntex/src/http/config.rs @@ -3,7 +3,7 @@ use std::{cell::Cell, ptr::copy_nonoverlapping, rc::Rc, time, time::Duration}; use ntex_h2::{self as h2}; use crate::http::{Request, Response}; -use crate::service::{boxed::BoxService, Container}; +use crate::service::{boxed::BoxService, Pipeline}; use crate::time::{sleep, Millis, Seconds}; use crate::{io::IoRef, util::BytesMut}; @@ -102,16 +102,16 @@ impl ServiceConfig { pub(super) type OnRequest = BoxService<(Request, IoRef), Request, Response>; pub(super) struct DispatcherConfig { - pub(super) service: Container, - pub(super) expect: Container, - pub(super) upgrade: Option>, + pub(super) service: Pipeline, + pub(super) expect: Pipeline, + pub(super) upgrade: Option>, pub(super) keep_alive: Duration, pub(super) client_timeout: Duration, pub(super) client_disconnect: Seconds, pub(super) h2config: h2::Config, pub(super) ka_enabled: bool, pub(super) timer: DateService, - pub(super) on_request: Option>, + pub(super) on_request: Option>, } impl DispatcherConfig { diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 20a6b2d0..e488faf1 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -3,7 +3,7 @@ use std::task::{Context, Poll}; use std::{cell::RefCell, error::Error, future::Future, io, marker, pin::Pin, rc::Rc}; use crate::io::{Filter, Io, IoBoxed, IoRef, IoStatusUpdate, RecvError}; -use crate::service::{Container, ContainerCall, Service}; +use crate::service::{Pipeline, PipelineCall, Service}; use crate::util::{ready, Bytes}; use crate::http; @@ -78,10 +78,10 @@ pin_project_lite::pin_project! { where S: 'static, X: 'static { None, - Service { #[pin] fut: ContainerCall<'static, S, Request> }, - ServiceUpgrade { #[pin] fut: ContainerCall<'static, S, Request> }, - Expect { #[pin] fut: ContainerCall<'static, X, Request> }, - Filter { fut: ContainerCall<'static, OnRequest, (Request, IoRef)> } + Service { #[pin] fut: PipelineCall<'static, S, Request> }, + ServiceUpgrade { #[pin] fut: PipelineCall<'static, S, Request> }, + Expect { #[pin] fut: PipelineCall<'static, X, Request> }, + Filter { fut: PipelineCall<'static, OnRequest, (Request, IoRef)> } } } @@ -479,21 +479,21 @@ where fn service_call(&self, req: Request) -> CallState { // Handle normal requests CallState::Service { - fut: self.config.service.container_call(req).into_static(), + fut: self.config.service.call(req).into_static(), } } - fn service_filter(&self, req: Request, f: &Container) -> CallState { + fn service_filter(&self, req: Request, f: &Pipeline) -> CallState { // Handle filter fut CallState::Filter { - fut: f.container_call((req, self.io.get_ref())).into_static(), + fut: f.call((req, self.io.get_ref())).into_static(), } } fn service_expect(&self, req: Request) -> CallState { // Handle normal requests with EXPECT: 100-Continue` header CallState::Expect { - fut: self.config.expect.container_call(req).into_static(), + fut: self.config.expect.call(req).into_static(), } } @@ -506,7 +506,7 @@ where ))); // Handle upgrade requests CallState::ServiceUpgrade { - fut: self.config.service.container_call(req).into_static(), + fut: self.config.service.call(req).into_static(), } } diff --git a/ntex/src/http/h1/service.rs b/ntex/src/http/h1/service.rs index 4bdafc0c..64f09700 100644 --- a/ntex/src/http/h1/service.rs +++ b/ntex/src/http/h1/service.rs @@ -55,7 +55,7 @@ mod openssl { use tls_openssl::ssl::SslAcceptor; use super::*; - use crate::{io::Layer, server::SslError, service::pipeline_factory}; + use crate::{io::Layer, server::SslError}; impl H1Service, S, B, X, U> where @@ -83,13 +83,12 @@ mod openssl { Error = SslError, InitError = (), > { - pipeline_factory( - Acceptor::new(acceptor) - .timeout(self.handshake_timeout) - .map_err(SslError::Ssl) - .map_init_err(|_| panic!()), - ) - .and_then(self.map_err(SslError::Service)) + Acceptor::new(acceptor) + .timeout(self.handshake_timeout) + .chain() + .map_err(SslError::Ssl) + .map_init_err(|_| panic!()) + .and_then(self.chain().map_err(SslError::Service)) } } } @@ -102,7 +101,7 @@ mod rustls { use tls_rustls::ServerConfig; use super::*; - use crate::{io::Layer, server::SslError, service::pipeline_factory}; + use crate::{io::Layer, server::SslError}; impl H1Service, S, B, X, U> where @@ -130,13 +129,12 @@ mod rustls { Error = SslError, InitError = (), > { - pipeline_factory( - Acceptor::from(config) - .timeout(self.handshake_timeout) - .map_err(|e| SslError::Ssl(Box::new(e))) - .map_init_err(|_| panic!()), - ) - .and_then(self.map_err(SslError::Service)) + Acceptor::from(config) + .timeout(self.handshake_timeout) + .chain() + .map_err(|e| SslError::Ssl(Box::new(e))) + .map_init_err(|_| panic!()) + .and_then(self.chain().map_err(SslError::Service)) } } } diff --git a/ntex/src/http/h2/service.rs b/ntex/src/http/h2/service.rs index 6ad57df5..8e8befba 100644 --- a/ntex/src/http/h2/service.rs +++ b/ntex/src/http/h2/service.rs @@ -47,7 +47,7 @@ mod openssl { use ntex_tls::openssl::{Acceptor, SslFilter}; use tls_openssl::ssl::SslAcceptor; - use crate::{io::Layer, server::SslError, service::pipeline_factory}; + use crate::{io::Layer, server::SslError}; use super::*; @@ -69,13 +69,12 @@ mod openssl { Error = SslError, InitError = S::InitError, > { - pipeline_factory( - Acceptor::new(acceptor) - .timeout(self.cfg.0.ssl_handshake_timeout) - .map_err(SslError::Ssl) - .map_init_err(|_| panic!()), - ) - .and_then(self.map_err(SslError::Service)) + Acceptor::new(acceptor) + .timeout(self.cfg.0.ssl_handshake_timeout) + .chain() + .map_err(SslError::Ssl) + .map_init_err(|_| panic!()) + .and_then(self.chain().map_err(SslError::Service)) } } } @@ -86,7 +85,7 @@ mod rustls { use tls_rustls::ServerConfig; use super::*; - use crate::{io::Layer, server::SslError, service::pipeline_factory}; + use crate::{io::Layer, server::SslError}; impl H2Service, S, B> where @@ -109,13 +108,12 @@ mod rustls { let protos = vec!["h2".to_string().into()]; config.alpn_protocols = protos; - pipeline_factory( - Acceptor::from(config) - .timeout(self.cfg.0.ssl_handshake_timeout) - .map_err(|e| SslError::Ssl(Box::new(e))) - .map_init_err(|_| panic!()), - ) - .and_then(self.map_err(SslError::Service)) + Acceptor::from(config) + .timeout(self.cfg.0.ssl_handshake_timeout) + .chain() + .map_err(|e| SslError::Ssl(Box::new(e))) + .map_init_err(|_| panic!()) + .and_then(self.chain().map_err(SslError::Service)) } } } diff --git a/ntex/src/http/service.rs b/ntex/src/http/service.rs index 98103293..9a81ac66 100644 --- a/ntex/src/http/service.rs +++ b/ntex/src/http/service.rs @@ -146,7 +146,7 @@ mod openssl { use tls_openssl::ssl::SslAcceptor; use super::*; - use crate::{io::Layer, server::SslError, service::pipeline_factory}; + use crate::{io::Layer, server::SslError}; impl HttpService, S, B, X, U> where @@ -174,13 +174,12 @@ mod openssl { Error = SslError, InitError = (), > { - pipeline_factory( - Acceptor::new(acceptor) - .timeout(self.cfg.0.ssl_handshake_timeout) - .map_err(SslError::Ssl) - .map_init_err(|_| panic!()), - ) - .and_then(self.map_err(SslError::Service)) + Acceptor::new(acceptor) + .timeout(self.cfg.0.ssl_handshake_timeout) + .chain() + .map_err(SslError::Ssl) + .map_init_err(|_| panic!()) + .and_then(self.chain().map_err(SslError::Service)) } } } @@ -191,7 +190,7 @@ mod rustls { use tls_rustls::ServerConfig; use super::*; - use crate::{io::Layer, server::SslError, service::pipeline_factory}; + use crate::{io::Layer, server::SslError}; impl HttpService, S, B, X, U> where @@ -222,13 +221,12 @@ mod rustls { let protos = vec!["h2".to_string().into(), "http/1.1".to_string().into()]; config.alpn_protocols = protos; - pipeline_factory( - Acceptor::from(config) - .timeout(self.cfg.0.ssl_handshake_timeout) - .map_err(|e| SslError::Ssl(Box::new(e))) - .map_init_err(|_| panic!()), - ) - .and_then(self.map_err(SslError::Service)) + Acceptor::from(config) + .timeout(self.cfg.0.ssl_handshake_timeout) + .chain() + .map_err(|e| SslError::Ssl(Box::new(e))) + .map_init_err(|_| panic!()) + .and_then(self.chain().map_err(SslError::Service)) } } } diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index d67806db..a6aa6ced 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -38,8 +38,8 @@ pub mod web; pub mod ws; pub use self::service::{ - fn_service, into_service, pipeline, pipeline_factory, Container, IntoService, - IntoServiceFactory, Middleware, Service, ServiceCall, ServiceCtx, ServiceFactory, + chain, chain_factory, fn_service, into_service, IntoService, IntoServiceFactory, + Middleware, Pipeline, Service, ServiceCall, ServiceCtx, ServiceFactory, }; pub use ntex_util::{channel, task}; diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index f18ab90b..c5bf440a 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -5,7 +5,7 @@ use async_channel::{unbounded, Receiver, Sender}; use async_oneshot as oneshot; use crate::rt::{spawn, Arbiter}; -use crate::service::Container; +use crate::service::Pipeline; use crate::time::{sleep, Millis, Sleep}; use crate::util::{ join_all, ready, select, stream_recv, BoxFuture, Either, Stream as FutStream, @@ -138,12 +138,12 @@ pub(super) struct Worker { struct WorkerService { factory: usize, status: WorkerServiceStatus, - service: Container, + service: Pipeline, } impl WorkerService { fn created(&mut self, service: BoxedServerService) { - self.service = Container::new(service); + self.service = Pipeline::new(service); self.status = WorkerServiceStatus::Unavailable; } } diff --git a/ntex/src/web/app.rs b/ntex/src/web/app.rs index bc14c4cc..580a7e10 100644 --- a/ntex/src/web/app.rs +++ b/ntex/src/web/app.rs @@ -3,7 +3,9 @@ use std::{cell::RefCell, fmt, future::Future, marker::PhantomData, rc::Rc}; use crate::http::Request; use crate::router::ResourceDef; use crate::service::boxed::{self, BoxServiceFactory}; -use crate::service::{map_config, pipeline_factory, IntoServiceFactory, PipelineFactory}; +use crate::service::{ + chain_factory, dev::ServiceChainFactory, map_config, IntoServiceFactory, +}; use crate::service::{Identity, Middleware, Service, ServiceCtx, ServiceFactory, Stack}; use crate::util::{BoxFuture, Extensions, Ready}; @@ -24,7 +26,7 @@ type FnStateFactory = Box BoxFuture<'static, Result { middleware: M, - filter: PipelineFactory, F>, + filter: ServiceChainFactory>, services: Vec>>, default: Option>>, external: Vec, @@ -39,7 +41,7 @@ impl App, DefaultError> { pub fn new() -> Self { App { middleware: Identity, - filter: pipeline_factory(Filter::new()), + filter: chain_factory(Filter::new()), state_factories: Vec::new(), services: Vec::new(), default: None, @@ -56,7 +58,7 @@ impl App, Err> { pub fn with(err: Err) -> Self { App { middleware: Identity, - filter: pipeline_factory(Filter::new()), + filter: chain_factory(Filter::new()), state_factories: Vec::new(), services: Vec::new(), default: None, @@ -267,9 +269,9 @@ where U::InitError: fmt::Debug, { // create and configure default resource - self.default = Some(Rc::new(boxed::factory(f.into_factory().map_init_err( - |e| log::error!("Cannot construct default service: {:?}", e), - )))); + self.default = Some(Rc::new(boxed::factory(f.chain().map_init_err(|e| { + log::error!("Cannot construct default service: {:?}", e) + })))); self } @@ -608,7 +610,7 @@ mod tests { let srv = App::new() .service(web::resource("/test").to(|| async { HttpResponse::Ok() })) .finish() - .container(()) + .pipeline(()) .await .unwrap(); let req = TestRequest::with_uri("/test").to_request(); @@ -632,7 +634,7 @@ mod tests { Ok(r.into_response(HttpResponse::MethodNotAllowed())) }) .with_config(Default::default()) - .container(()) + .pipeline(()) .await .unwrap(); diff --git a/ntex/src/web/app_service.rs b/ntex/src/web/app_service.rs index 84895dc9..3b7e3bb4 100644 --- a/ntex/src/web/app_service.rs +++ b/ntex/src/web/app_service.rs @@ -4,9 +4,9 @@ use std::{cell::RefCell, future::Future, marker::PhantomData, pin::Pin, rc::Rc}; use crate::http::{Request, Response}; use crate::router::{Path, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; +use crate::service::dev::ServiceChainFactory; use crate::service::{ - fn_service, Middleware, PipelineFactory, Service, ServiceCall, ServiceCtx, - ServiceFactory, + fn_service, Middleware, Service, ServiceCall, ServiceCtx, ServiceFactory, }; use crate::util::{BoxFuture, Either, Extensions}; @@ -41,7 +41,7 @@ where Err: ErrorRenderer, { pub(super) middleware: Rc, - pub(super) filter: PipelineFactory, F>, + pub(super) filter: ServiceChainFactory>, pub(super) extensions: RefCell>, pub(super) state_factories: Rc>, pub(super) services: Rc>>>>, diff --git a/ntex/src/web/middleware/defaultheaders.rs b/ntex/src/web/middleware/defaultheaders.rs index f5846173..8a44f613 100644 --- a/ntex/src/web/middleware/defaultheaders.rs +++ b/ntex/src/web/middleware/defaultheaders.rs @@ -145,7 +145,7 @@ where mod tests { use super::*; use crate::http::header::CONTENT_TYPE; - use crate::service::{Container, IntoService}; + use crate::service::{IntoService, Pipeline}; use crate::util::lazy; use crate::web::request::WebRequest; use crate::web::test::{ok_service, TestRequest}; @@ -153,7 +153,7 @@ mod tests { #[crate::rt_test] async fn test_default_headers() { - let mw = Container::new( + let mw = Pipeline::new( DefaultHeaders::new() .header(CONTENT_TYPE, "0001") .create(ok_service()), @@ -172,7 +172,7 @@ mod tests { req.into_response(HttpResponse::Ok().header(CONTENT_TYPE, "0002").finish()), ) }; - let mw = Container::new( + let mw = Pipeline::new( DefaultHeaders::new() .header(CONTENT_TYPE, "0001") .create(srv.into_service()), @@ -186,7 +186,7 @@ mod tests { let srv = |req: WebRequest| async move { Ok::<_, Error>(req.into_response(HttpResponse::Ok().finish())) }; - let mw = Container::new( + let mw = Pipeline::new( DefaultHeaders::new() .content_type() .create(srv.into_service()), diff --git a/ntex/src/web/middleware/logger.rs b/ntex/src/web/middleware/logger.rs index 4eb2ef2a..f0365520 100644 --- a/ntex/src/web/middleware/logger.rs +++ b/ntex/src/web/middleware/logger.rs @@ -452,7 +452,7 @@ impl<'a> fmt::Display for FormatDisplay<'a> { mod tests { use super::*; use crate::http::{header, StatusCode}; - use crate::service::{Container, IntoService, Middleware}; + use crate::service::{IntoService, Middleware, Pipeline}; use crate::util::lazy; use crate::web::test::{self, TestRequest}; use crate::web::{DefaultError, Error}; @@ -472,7 +472,7 @@ mod tests { let logger = Logger::new("%% %{User-Agent}i %{X-Test}o %{HOME}e %D %% test") .exclude("/test"); - let srv = Container::new(Middleware::create(&logger, srv.into_service())); + let srv = Pipeline::new(Middleware::create(&logger, srv.into_service())); assert!(lazy(|cx| srv.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| srv.poll_shutdown(cx).is_ready()).await); diff --git a/ntex/src/web/resource.rs b/ntex/src/web/resource.rs index 4521e0d3..d2131b7a 100644 --- a/ntex/src/web/resource.rs +++ b/ntex/src/web/resource.rs @@ -3,9 +3,8 @@ use std::{cell::RefCell, fmt, rc::Rc}; use crate::http::Response; use crate::router::{IntoPattern, ResourceDef}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; -use crate::service::{ - dev::AndThen, pipeline, pipeline_factory, Pipeline, PipelineFactory, ServiceCtx, -}; +use crate::service::dev::{AndThen, ServiceChain, ServiceChainFactory}; +use crate::service::{chain_factory, ServiceCtx}; use crate::service::{ Identity, IntoServiceFactory, Middleware, Service, ServiceCall, ServiceFactory, Stack, }; @@ -23,7 +22,8 @@ type HttpService = BoxService, WebResponse, Err::Container>; type HttpNewService = BoxServiceFactory<(), WebRequest, WebResponse, Err::Container, ()>; -type ResourcePipeline = Pipeline, AndThen>>; +type ResourcePipeline = + ServiceChain>, WebRequest>; type BoxResponse<'a, Err: ErrorRenderer> = ServiceCall<'a, HttpService, WebRequest>; @@ -51,7 +51,7 @@ type BoxResponse<'a, Err: ErrorRenderer> = /// Default behavior could be overriden with `default_resource()` method. pub struct Resource> { middleware: M, - filter: PipelineFactory, T>, + filter: ServiceChainFactory>, rdef: Vec, name: Option, routes: Vec>, @@ -68,7 +68,7 @@ impl Resource { name: None, state: None, middleware: Identity, - filter: pipeline_factory(Filter::new()), + filter: chain_factory(Filter::new()), guards: Vec::new(), default: Rc::new(RefCell::new(None)), } @@ -302,7 +302,7 @@ where { // create and configure default resource self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory( - f.into_factory() + f.chain() .map_init_err(|e| log::error!("Cannot construct default service: {:?}", e)), ))))); @@ -366,7 +366,7 @@ where impl IntoServiceFactory< - ResourceServiceFactory, F>>, + ResourceServiceFactory>>, WebRequest, > for Resource where @@ -382,7 +382,7 @@ where { fn into_factory( self, - ) -> ResourceServiceFactory, F>> { + ) -> ResourceServiceFactory>> { let router_factory = ResourceRouterFactory { state: None, routes: self.routes, @@ -426,7 +426,7 @@ where Box::pin(async move { let filter = self.filter.create(()).await?; let routing = self.routing.create(()).await?; - Ok(self.middleware.create(pipeline(filter).and_then(routing))) + Ok(self.middleware.create(filter.chain().and_then(routing))) }) } } diff --git a/ntex/src/web/scope.rs b/ntex/src/web/scope.rs index 24033702..dbf213ad 100644 --- a/ntex/src/web/scope.rs +++ b/ntex/src/web/scope.rs @@ -5,7 +5,7 @@ use std::{ use crate::http::Response; use crate::router::{IntoPattern, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; -use crate::service::{pipeline_factory, IntoServiceFactory, PipelineFactory}; +use crate::service::{chain_factory, dev::ServiceChainFactory, IntoServiceFactory}; use crate::service::{ Identity, Middleware, Service, ServiceCall, ServiceCtx, ServiceFactory, Stack, }; @@ -62,7 +62,7 @@ type BoxResponse<'a, Err: ErrorRenderer> = /// pub struct Scope> { middleware: M, - filter: PipelineFactory, T>, + filter: ServiceChainFactory>, rdef: Vec, state: Option, services: Vec>>, @@ -77,7 +77,7 @@ impl Scope { pub fn new(path: T) -> Scope { Scope { middleware: Identity, - filter: pipeline_factory(Filter::new()), + filter: chain_factory(Filter::new()), rdef: path.patterns(), state: None, guards: Vec::new(), @@ -288,7 +288,7 @@ where { // create and configure default resource self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory( - f.into_factory() + f.chain() .map_init_err(|e| log::error!("Cannot construct default service: {:?}", e)), ))))); diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 769abb24..17582cec 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -13,7 +13,7 @@ use crate::http::test::TestRequest as HttpTestRequest; use crate::http::{HttpService, Method, Payload, Request, StatusCode, Uri, Version}; use crate::router::{Path, ResourceDef}; use crate::service::{ - map_config, Container, IntoService, IntoServiceFactory, Service, ServiceFactory, + map_config, IntoService, IntoServiceFactory, Pipeline, Service, ServiceFactory, }; use crate::time::{sleep, Millis, Seconds}; use crate::util::{stream_recv, Bytes, BytesMut, Extensions, Ready, Stream}; @@ -69,14 +69,14 @@ pub fn default_service( /// ``` pub async fn init_service( app: R, -) -> Container> +) -> Pipeline> where R: IntoServiceFactory, S: ServiceFactory, S::InitError: std::fmt::Debug, { let srv = app.into_factory(); - srv.container(AppConfig::default()).await.unwrap() + srv.pipeline(AppConfig::default()).await.unwrap() } /// Calls service and waits for response future completion. @@ -102,7 +102,7 @@ where /// assert_eq!(resp.status(), StatusCode::OK); /// } /// ``` -pub async fn call_service(app: &Container, req: R) -> S::Response +pub async fn call_service(app: &Pipeline, req: R) -> S::Response where S: Service, E: std::fmt::Debug, @@ -135,7 +135,7 @@ where /// assert_eq!(result, Bytes::from_static(b"welcome!")); /// } /// ``` -pub async fn read_response(app: &Container, req: Request) -> Bytes +pub async fn read_response(app: &Pipeline, req: Request) -> Bytes where S: Service, { @@ -234,7 +234,7 @@ where /// let result: Person = test::read_response_json(&mut app, req).await; /// } /// ``` -pub async fn read_response_json(app: &Container, req: Request) -> T +pub async fn read_response_json(app: &Pipeline, req: Request) -> T where S: Service, T: DeserializeOwned, diff --git a/ntex/src/web/ws.rs b/ntex/src/web/ws.rs index 7921e310..1d48fad9 100644 --- a/ntex/src/web/ws.rs +++ b/ntex/src/web/ws.rs @@ -19,7 +19,7 @@ where F: IntoServiceFactory, Err: From + From, { - let inner_factory = Rc::new(factory.into_factory().map_err(WsError::Service)); + let inner_factory = Rc::new(factory.chain().map_err(WsError::Service)); let factory = fn_factory_with_config(move |sink: WsSink| { let factory = inner_factory.clone(); diff --git a/ntex/src/ws/client.rs b/ntex/src/ws/client.rs index a33b6d78..e87be13a 100644 --- a/ntex/src/ws/client.rs +++ b/ntex/src/ws/client.rs @@ -16,7 +16,7 @@ use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue, AUTHORIZATIO use crate::http::{body::BodySize, client::ClientResponse, error::HttpError, h1}; use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri}; use crate::io::{Base, DispatchItem, Dispatcher, Filter, Io, Layer, Sealed}; -use crate::service::{apply_fn, into_service, Container, IntoService, Service}; +use crate::service::{apply_fn, into_service, IntoService, Pipeline, Service}; use crate::time::{timeout, Millis, Seconds}; use crate::{channel::mpsc, rt, util::Ready, ws}; @@ -25,7 +25,7 @@ use super::transport::WsTransport; /// `WebSocket` client builder pub struct WsClient { - connector: Container, + connector: Pipeline, head: Rc, addr: Option, max_size: usize, @@ -754,7 +754,7 @@ impl WsConnection { U: IntoService, { let service = apply_fn( - service.into_service().map_err(WsError::Service), + service.into_chain().map_err(WsError::Service), |req, svc| async move { match req { DispatchItem::::Item(item) => svc.call(item).await, diff --git a/ntex/tests/connect.rs b/ntex/tests/connect.rs index 729436ba..394352c7 100644 --- a/ntex/tests/connect.rs +++ b/ntex/tests/connect.rs @@ -3,7 +3,7 @@ use std::{io, rc::Rc, sync::Arc}; use ntex::codec::BytesCodec; use ntex::connect::Connect; use ntex::io::{types::PeerAddr, Io}; -use ntex::service::{fn_service, pipeline_factory, Container, ServiceFactory}; +use ntex::service::{chain_factory, fn_service, Pipeline, ServiceFactory}; use ntex::{server::test_server, time, util::Bytes}; #[cfg(feature = "openssl")] @@ -79,7 +79,7 @@ async fn test_openssl_string() { }; let srv = test_server(|| { - pipeline_factory(fn_service(|io: Io<_>| async move { + chain_factory(fn_service(|io: Io<_>| async move { let res = io.read_ready().await; assert!(res.is_ok()); Ok(io) @@ -97,7 +97,7 @@ async fn test_openssl_string() { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); - let conn = Container::new(ntex::connect::openssl::Connector::new(builder.build())); + let conn = Pipeline::new(ntex::connect::openssl::Connector::new(builder.build())); let addr = format!("127.0.0.1:{}", srv.addr().port()); let io = conn.call(addr.into()).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); @@ -122,7 +122,7 @@ async fn test_openssl_read_before_error() { use tls_openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; let srv = test_server(|| { - pipeline_factory(fn_service(|io: Io<_>| async move { + chain_factory(fn_service(|io: Io<_>| async move { let res = io.read_ready().await; assert!(res.is_ok()); Ok(io) @@ -140,7 +140,7 @@ async fn test_openssl_read_before_error() { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); - let conn = Container::new(ntex::connect::openssl::Connector::new(builder.build())); + let conn = Pipeline::new(ntex::connect::openssl::Connector::new(builder.build())); let addr = format!("127.0.0.1:{}", srv.addr().port()); let io = conn.call(addr.into()).await.unwrap(); let item = io.recv(&Rc::new(BytesCodec)).await.unwrap().unwrap(); @@ -163,7 +163,7 @@ async fn test_rustls_string() { use tls_rustls::{Certificate, ClientConfig}; let srv = test_server(|| { - pipeline_factory(fn_service(|io: Io<_>| async move { + chain_factory(fn_service(|io: Io<_>| async move { let res = io.read_ready().await; assert!(res.is_ok()); Ok(io) @@ -185,7 +185,7 @@ async fn test_rustls_string() { .with_custom_certificate_verifier(Arc::new(danger::NoCertificateVerification {})) .with_no_client_auth(); - let conn = Container::new(ntex::connect::rustls::Connector::new(config)); + let conn = Pipeline::new(ntex::connect::rustls::Connector::new(config)); let addr = format!("localhost:{}", srv.addr().port()); let io = conn.call(addr.into()).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); @@ -225,13 +225,13 @@ async fn test_static_str() { }) }); - let conn = Container::new(ntex::connect::Connector::new()); + let conn = Pipeline::new(ntex::connect::Connector::new()); let io = conn.call(Connect::with("10", srv.addr())).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); let connect = Connect::new("127.0.0.1".to_owned()); - let conn = Container::new(ntex::connect::Connector::new()); + let conn = Pipeline::new(ntex::connect::Connector::new()); let io = conn.call(connect).await; assert!(io.is_err()); } @@ -248,7 +248,7 @@ async fn test_create() { }); let factory = ntex::connect::Connector::new(); - let conn = factory.container(()).await.unwrap(); + let conn = factory.pipeline(()).await.unwrap(); let io = conn.call(Connect::with("10", srv.addr())).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); } @@ -265,7 +265,7 @@ async fn test_uri() { }) }); - let conn = Container::new(ntex::connect::Connector::default()); + let conn = Pipeline::new(ntex::connect::Connector::default()); let addr = ntex::http::Uri::try_from(format!("https://localhost:{}", srv.addr().port())) .unwrap(); @@ -285,7 +285,7 @@ async fn test_rustls_uri() { }) }); - let conn = Container::new(ntex::connect::Connector::default()); + let conn = Pipeline::new(ntex::connect::Connector::default()); let addr = ntex::http::Uri::try_from(format!("https://localhost:{}", srv.addr().port())) .unwrap(); diff --git a/ntex/tests/http_awc_client.rs b/ntex/tests/http_awc_client.rs index 65240e7d..cc99b1ed 100644 --- a/ntex/tests/http_awc_client.rs +++ b/ntex/tests/http_awc_client.rs @@ -13,7 +13,7 @@ use ntex::http::client::error::{JsonPayloadError, SendRequestError}; use ntex::http::client::{Client, Connector}; use ntex::http::test::server as test_server; use ntex::http::{header, HttpMessage, HttpService, Method}; -use ntex::service::{map_config, pipeline_factory}; +use ntex::service::{chain_factory, map_config}; use ntex::web::dev::AppConfig; use ntex::web::middleware::Compress; use ntex::web::{self, test, App, BodyEncoding, Error, HttpRequest, HttpResponse}; @@ -208,7 +208,7 @@ async fn test_connection_reuse() { let srv = test_server(move || { let num2 = num2.clone(); - pipeline_factory(move |io| { + chain_factory(move |io| { num2.fetch_add(1, Ordering::Relaxed); Ready::Ok(io) }) @@ -243,7 +243,7 @@ async fn test_connection_force_close() { let srv = test_server(move || { let num2 = num2.clone(); - pipeline_factory(move |io| { + chain_factory(move |io| { num2.fetch_add(1, Ordering::Relaxed); Ready::Ok(io) }) @@ -279,7 +279,7 @@ async fn test_connection_server_close() { let srv = test_server(move || { let num2 = num2.clone(); - pipeline_factory(move |io| { + chain_factory(move |io| { num2.fetch_add(1, Ordering::Relaxed); Ready::Ok(io) }) @@ -314,7 +314,7 @@ async fn test_connection_wait_queue() { let srv = test_server(move || { let num2 = num2.clone(); - pipeline_factory(move |io| { + chain_factory(move |io| { num2.fetch_add(1, Ordering::Relaxed); Ready::Ok(io) }) @@ -360,7 +360,7 @@ async fn test_connection_wait_queue_force_close() { let srv = test_server(move || { let num2 = num2.clone(); - pipeline_factory(move |io| { + chain_factory(move |io| { num2.fetch_add(1, Ordering::Relaxed); Ready::Ok(io) }) diff --git a/ntex/tests/http_awc_openssl_client.rs b/ntex/tests/http_awc_openssl_client.rs index d4db28f2..0b8cb037 100644 --- a/ntex/tests/http_awc_openssl_client.rs +++ b/ntex/tests/http_awc_openssl_client.rs @@ -8,7 +8,7 @@ use tls_openssl::ssl::{ use ntex::http::client::{Client, Connector}; use ntex::http::test::server as test_server; use ntex::http::{HttpService, Version}; -use ntex::service::{map_config, pipeline_factory, ServiceFactory}; +use ntex::service::{chain_factory, map_config, ServiceFactory}; use ntex::web::{self, dev::AppConfig, App, HttpResponse}; use ntex::{time::Seconds, util::Ready}; @@ -40,7 +40,7 @@ async fn test_connection_reuse_h2() { let srv = test_server(move || { let num2 = num2.clone(); - pipeline_factory(move |io| { + chain_factory(move |io| { num2.fetch_add(1, Ordering::Relaxed); Ready::Ok(io) }) diff --git a/ntex/tests/http_awc_rustls_client.rs b/ntex/tests/http_awc_rustls_client.rs index 26a8d6c5..e42fcad3 100644 --- a/ntex/tests/http_awc_rustls_client.rs +++ b/ntex/tests/http_awc_rustls_client.rs @@ -8,7 +8,7 @@ use tls_rustls::ClientConfig; use ntex::http::client::{Client, Connector}; use ntex::http::test::server as test_server; use ntex::http::HttpService; -use ntex::service::{map_config, pipeline_factory, ServiceFactory}; +use ntex::service::{chain_factory, map_config, ServiceFactory}; use ntex::util::Ready; use ntex::web::{self, dev::AppConfig, App, HttpResponse}; @@ -62,7 +62,7 @@ async fn test_connection_reuse_h2() { let srv = test_server(move || { let num2 = num2.clone(); - pipeline_factory(move |io| { + chain_factory(move |io| { num2.fetch_add(1, Ordering::Relaxed); Ready::Ok(io) })