PipelineCall is static (#215)

* PipelineCall is static

* Fix static lifetimes req
This commit is contained in:
Nikolay Kim 2023-06-23 23:11:16 +06:00 committed by GitHub
parent 1fd6311a5d
commit 4380b3a155
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 43 additions and 51 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.3.1] - 2023-06-23
* `PipelineCall` is static
## [0.3.0] - 2023-06-22 ## [0.3.0] - 2023-06-22
* Release v0.3.0 * Release v0.3.0

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "0.3.0" version = "0.3.1"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -19,7 +19,7 @@ path = "src/lib.rs"
ntex-codec = "0.6.2" ntex-codec = "0.6.2"
ntex-bytes = "0.1.19" ntex-bytes = "0.1.19"
ntex-util = "0.3.0" ntex-util = "0.3.0"
ntex-service = "1.2.0" ntex-service = "1.2.1"
bitflags = "1.3" bitflags = "1.3"
log = "0.4" log = "0.4"

View file

@ -250,7 +250,7 @@ where
// call service // call service
let shared = slf.shared.clone(); let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1); shared.inflight.set(shared.inflight.get() + 1);
let fut = shared.service.call(item).into_static(); let fut = shared.service.call(item);
spawn(async move { spawn(async move {
let result = fut.await; let result = fut.await;
shared.handle_result(result, &shared.io); shared.handle_result(result, &shared.io);
@ -276,7 +276,7 @@ where
// call service // call service
let shared = slf.shared.clone(); let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1); shared.inflight.set(shared.inflight.get() + 1);
let fut = shared.service.call(item).into_static(); let fut = shared.service.call(item);
spawn(async move { spawn(async move {
let result = fut.await; let result = fut.await;
shared.handle_result(result, &shared.io); shared.handle_result(result, &shared.io);

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [1.2.1] - 2023-06-23
* Make `PipelineCall` static
## [1.2.0] - 2023-06-22 ## [1.2.0] - 2023-06-22
* Rename Container to Pipeline * Rename Container to Pipeline

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-service" name = "ntex-service"
version = "1.2.0" version = "1.2.1"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service" description = "ntex service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -85,7 +85,7 @@ mod tests {
let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone()); let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone());
let res = pipe.call(()).await; let res = pipe.service_call(()).await;
assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), "pipe"); assert_eq!(res.unwrap(), "pipe");

View file

@ -69,9 +69,10 @@ impl<S> Pipeline<S> {
/// Call service and create future object that resolves to service result. /// Call service and create future object that resolves to service result.
/// ///
/// Note, this call does not check service readiness. /// Note, this call does not check service readiness.
pub fn call<R>(&self, req: R) -> PipelineCall<'_, S, R> pub fn call<R>(&self, req: R) -> PipelineCall<S, R>
where where
S: Service<R>, S: Service<R> + 'static,
R: 'static,
{ {
let pipeline = self.clone(); let pipeline = self.clone();
let svc_call = pipeline.svc.call(req, ServiceCtx::new(&pipeline.waiters)); let svc_call = pipeline.svc.call(req, ServiceCtx::new(&pipeline.waiters));
@ -111,41 +112,19 @@ impl<S> Clone for Pipeline<S> {
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub struct PipelineCall<'f, S, R> pub struct PipelineCall<S, R>
where where
S: Service<R>, S: Service<R>,
S: 'f, S: 'static,
R: 'f, R: 'static,
{ {
#[pin] #[pin]
fut: S::Future<'f>, fut: S::Future<'static>,
pipeline: Pipeline<S>, pipeline: Pipeline<S>,
} }
} }
impl<'f, S, R> PipelineCall<'f, S, R> impl<S, R> future::Future for PipelineCall<S, R>
where
S: Service<R> + 'f,
R: 'f,
{
#[inline]
/// Convert future object to static version.
///
/// Returned future is suitable for spawning into a async runtime.
/// Note, this call does not check service readiness.
pub fn into_static(self) -> PipelineCall<'static, S, R> {
let svc_call = self.fut;
let pipeline = self.pipeline;
// SAFETY: `svc_call` has same lifetime same as lifetime of `pipeline.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };
PipelineCall { fut, pipeline }
}
}
impl<'f, S, R> future::Future for PipelineCall<'f, S, R>
where where
S: Service<R>, S: Service<R>,
{ {

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.7.1] - 2023-06-23
* `PipelineCall` is static
## [0.7.0] - 2023-06-22 ## [0.7.0] - 2023-06-22
* Release v0.7.0 * Release v0.7.0

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.7.0" version = "0.7.2"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -52,13 +52,13 @@ ntex-codec = "0.6.2"
ntex-connect = "0.3.0" ntex-connect = "0.3.0"
ntex-http = "0.1.9" ntex-http = "0.1.9"
ntex-router = "0.5.1" ntex-router = "0.5.1"
ntex-service = "1.2.0" ntex-service = "1.2.1"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "0.3.0" ntex-util = "0.3.0"
ntex-bytes = "0.1.19" ntex-bytes = "0.1.19"
ntex-h2 = "0.3.0" ntex-h2 = "0.3.2"
ntex-rt = "0.4.9" ntex-rt = "0.4.9"
ntex-io = "0.3.0" ntex-io = "0.3.1"
ntex-tls = "0.3.0" ntex-tls = "0.3.0"
ntex-tokio = { version = "0.3.0", optional = true } ntex-tokio = { version = "0.3.0", optional = true }
ntex-glommio = { version = "0.3.0", optional = true } ntex-glommio = { version = "0.3.0", optional = true }

View file

@ -30,7 +30,8 @@ where
) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>> { ) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>> {
Box::pin(async move { Box::pin(async move {
// connect to the host // connect to the host
let fut = self.0.call(ClientConnect { let pl = self.0.clone();
let fut = pl.service_call(ClientConnect {
uri: head.as_ref().uri.clone(), uri: head.as_ref().uri.clone(),
addr, addr,
}); });

View file

@ -78,10 +78,10 @@ pin_project_lite::pin_project! {
where S: 'static, X: 'static where S: 'static, X: 'static
{ {
None, None,
Service { #[pin] fut: PipelineCall<'static, S, Request> }, Service { #[pin] fut: PipelineCall<S, Request> },
ServiceUpgrade { #[pin] fut: PipelineCall<'static, S, Request> }, ServiceUpgrade { #[pin] fut: PipelineCall<S, Request> },
Expect { #[pin] fut: PipelineCall<'static, X, Request> }, Expect { #[pin] fut: PipelineCall<X, Request> },
Filter { fut: PipelineCall<'static, OnRequest, (Request, IoRef)> } Filter { fut: PipelineCall<OnRequest, (Request, IoRef)> }
} }
} }
@ -479,21 +479,21 @@ where
fn service_call(&self, req: Request) -> CallState<S, X> { fn service_call(&self, req: Request) -> CallState<S, X> {
// Handle normal requests // Handle normal requests
CallState::Service { CallState::Service {
fut: self.config.service.call(req).into_static(), fut: self.config.service.call(req),
} }
} }
fn service_filter(&self, req: Request, f: &Pipeline<OnRequest>) -> CallState<S, X> { fn service_filter(&self, req: Request, f: &Pipeline<OnRequest>) -> CallState<S, X> {
// Handle filter fut // Handle filter fut
CallState::Filter { CallState::Filter {
fut: f.call((req, self.io.get_ref())).into_static(), fut: f.call((req, self.io.get_ref())),
} }
} }
fn service_expect(&self, req: Request) -> CallState<S, X> { fn service_expect(&self, req: Request) -> CallState<S, X> {
// Handle normal requests with EXPECT: 100-Continue` header // Handle normal requests with EXPECT: 100-Continue` header
CallState::Expect { CallState::Expect {
fut: self.config.expect.call(req).into_static(), fut: self.config.expect.call(req),
} }
} }
@ -506,7 +506,7 @@ where
))); )));
// Handle upgrade requests // Handle upgrade requests
CallState::ServiceUpgrade { CallState::ServiceUpgrade {
fut: self.config.service.call(req).into_static(), fut: self.config.service.call(req),
} }
} }

View file

@ -107,7 +107,7 @@ where
S: Service<R, Response = WebResponse, Error = E>, S: Service<R, Response = WebResponse, Error = E>,
E: std::fmt::Debug, E: std::fmt::Debug,
{ {
app.call(req).await.unwrap() app.service_call(req).await.unwrap()
} }
/// Helper function that returns a response body of a TestRequest /// Helper function that returns a response body of a TestRequest
@ -140,7 +140,7 @@ where
S: Service<Request, Response = WebResponse>, S: Service<Request, Response = WebResponse>,
{ {
let mut resp = app let mut resp = app
.call(req) .service_call(req)
.await .await
.unwrap_or_else(|_| panic!("read_response failed at application call")); .unwrap_or_else(|_| panic!("read_response failed at application call"));

View file

@ -158,7 +158,7 @@ where
let msg = Connect::new(head.uri.clone()).set_addr(self.addr); let msg = Connect::new(head.uri.clone()).set_addr(self.addr);
log::trace!("Open ws connection to {:?} addr: {:?}", head.uri, self.addr); log::trace!("Open ws connection to {:?} addr: {:?}", head.uri, self.addr);
let io = self.connector.call(msg).await?; let io = self.connector.clone().service_call(msg).await?;
// create Framed and send request // create Framed and send request
let codec = h1::ClientCodec::default(); let codec = h1::ClientCodec::default();