diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 8f952ac8..a5eb9c1d 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -8,7 +8,7 @@ jobs: fail-fast: false matrix: version: - - 1.65.0 # MSRV + - 1.66.0 # MSRV - stable - nightly diff --git a/README.md b/README.md index 2c34a136..425903fb 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [![build status](https://github.com/ntex-rs/ntex/workflows/CI%20%28Linux%29/badge.svg?branch=master&event=push)](https://github.com/ntex-rs/ntex/actions?query=workflow%3A"CI+(Linux)") [![crates.io](https://img.shields.io/crates/v/ntex.svg)](https://crates.io/crates/ntex) [![Documentation](https://img.shields.io/docsrs/ntex/latest)](https://docs.rs/ntex) -[![Version](https://img.shields.io/badge/rustc-1.65+-lightgray.svg)](https://blog.rust-lang.org/2022/11/03/Rust-1.65.0.html) +[![Version](https://img.shields.io/badge/rustc-1.66+-lightgray.svg)](https://blog.rust-lang.org/2022/12/15/Rust-1.66.0.html) ![License](https://img.shields.io/crates/l/ntex.svg) [![codecov](https://codecov.io/gh/ntex-rs/ntex/branch/master/graph/badge.svg)](https://codecov.io/gh/ntex-rs/ntex) [![Chat on Discord](https://img.shields.io/discord/919288597826387979?label=chat&logo=discord)](https://discord.gg/zBNyhVRz) @@ -29,7 +29,7 @@ Starting ntex v0.5 async runtime must be selected as a feature. Available option ```toml [dependencies] -ntex = { version = "0.6", features = ["glommio"] } +ntex = { version = "0.7", features = ["tokio"] } ``` ## Documentation & community resources diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 9d2dff86..d3d91a19 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.0-beta.3] - 2023-06-21 + +* Use static ContainerCall for dispatcher + ## [0.3.0-beta.0] - 2023-06-16 * Migrate to ntex-service 1.2 diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 15fc386d..3ac98f42 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.0-beta.1" +version = "0.3.0-beta.2" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -19,7 +19,7 @@ path = "src/lib.rs" ntex-codec = "0.6.2" ntex-bytes = "0.1.19" ntex-util = "0.3.0-beta.1" -ntex-service = "1.2.0-beta.1" +ntex-service = "1.2.0-beta.3" bitflags = "1.3" log = "0.4" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 2236a1e1..10d26148 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -250,8 +250,9 @@ where // call service let shared = slf.shared.clone(); shared.inflight.set(shared.inflight.get() + 1); + let fut = shared.service.container_call(item).into_static(); spawn(async move { - let result = shared.service.call(item).await; + let result = fut.await; shared.handle_result(result, &shared.io); }); } @@ -275,8 +276,9 @@ where // call service let shared = slf.shared.clone(); shared.inflight.set(shared.inflight.get() + 1); + let fut = shared.service.container_call(item).into_static(); spawn(async move { - let result = shared.service.call(item).await; + let result = fut.await; shared.handle_result(result, &shared.io); }); } diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index 4a370f81..3eaa05a2 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [1.2.0-beta.3] - 2023-06-21 + +* Add custom ContainerCall future + +* Allow to turn ContainerCall to static + ## [1.2.0-beta.2] - 2023-06-19 * Remove Deref for Container diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 11185e9b..cfd4930c 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "1.2.0-beta.2" +version = "1.2.0-beta.3" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] @@ -20,5 +20,5 @@ pin-project-lite = "0.2.6" slab = "0.4" [dev-dependencies] -ntex = { version = "0.7.0-beta.0", features = ["tokio"] } -ntex-util = "0.3.0-beta.0" +ntex = { version = "0.7.0-beta.1", features = ["tokio"] } +ntex-util = "0.3.0-beta.1" diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 85c011aa..19151a13 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -1,7 +1,7 @@ #![allow(clippy::type_complexity)] -use std::{future::Future, marker, pin::Pin, rc::Rc, task, task::Poll}; +use std::{future::Future, marker, pin::Pin, task, task::Poll}; -use super::ctx::{ServiceCall, ServiceCtx, Waiters}; +use super::ctx::{Container, ServiceCtx}; use super::{IntoService, IntoServiceFactory, Service, ServiceFactory}; /// Apply transform function to a service. @@ -11,11 +11,15 @@ pub fn apply_fn( ) -> Apply where T: Service, - for<'r> F: Fn(In, ApplyService) -> R, + F: Fn(In, Container) -> R, R: Future>, U: IntoService, { - Apply::new(service.into_service(), f) + Apply { + f, + service: Container::new(service.into_service()), + r: marker::PhantomData, + } } /// Service factory that produces `apply_fn` service. @@ -25,7 +29,7 @@ pub fn apply_fn_factory( ) -> ApplyFactory where T: ServiceFactory, - F: Fn(In, ApplyService) -> R + Clone, + F: Fn(In, Container) -> R + Clone, R: Future>, U: IntoServiceFactory, { @@ -37,31 +41,15 @@ pub struct Apply where T: Service, { - service: Rc, + service: Container, f: F, r: marker::PhantomData (In, Out, R)>, } -impl Apply -where - T: Service, - F: Fn(In, ApplyService) -> R, - R: Future>, -{ - /// Create new `Apply` combinator - fn new(service: T, f: F) -> Self { - Self { - f, - service: Rc::new(service), - r: marker::PhantomData, - } - } -} - impl Clone for Apply where T: Service + Clone, - F: Fn(In, ApplyService) -> R + Clone, + F: Fn(In, Container) -> R + Clone, R: Future>, { fn clone(&self) -> Self { @@ -73,24 +61,10 @@ where } } -pub struct ApplyService { - svc: Rc, - waiters: Waiters, -} - -impl ApplyService { - pub fn call(&self, req: R) -> ServiceCall<'_, S, R> - where - S: Service, - { - ServiceCtx::::new(&self.waiters).call(&self.svc, req) - } -} - impl Service for Apply where T: Service, - F: Fn(In, ApplyService) -> R, + F: Fn(In, Container) -> R, R: Future>, { type Response = Out; @@ -101,12 +75,8 @@ where crate::forward_poll_shutdown!(service); #[inline] - fn call<'a>(&'a self, req: In, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - let svc = ApplyService { - svc: self.service.clone(), - waiters: ctx.waiters().clone(), - }; - (self.f)(req, svc) + fn call<'a>(&'a self, req: In, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { + (self.f)(req, self.service.clone()) } } @@ -114,7 +84,7 @@ where pub struct ApplyFactory where T: ServiceFactory, - F: Fn(In, ApplyService) -> R + Clone, + F: Fn(In, Container) -> R + Clone, R: Future>, { service: T, @@ -125,7 +95,7 @@ where impl ApplyFactory where T: ServiceFactory, - F: Fn(In, ApplyService) -> R + Clone, + F: Fn(In, Container) -> R + Clone, R: Future>, { /// Create new `ApplyNewService` new service instance @@ -142,7 +112,7 @@ impl Clone for ApplyFactory where T: ServiceFactory + Clone, - F: Fn(In, ApplyService) -> R + Clone, + F: Fn(In, Container) -> R + Clone, R: Future>, { fn clone(&self) -> Self { @@ -158,7 +128,7 @@ impl ServiceFactory for ApplyFactory where T: ServiceFactory, - F: Fn(In, ApplyService) -> R + Clone, + F: Fn(In, Container) -> R + Clone, for<'r> R: Future> + 'r, { type Response = Out; @@ -183,7 +153,7 @@ pin_project_lite::pin_project! { where T: ServiceFactory, T: 'f, - F: Fn(In, ApplyService) -> R, + F: Fn(In, Container) -> R, T::Service: 'f, R: Future>, Cfg: 'f, @@ -199,7 +169,7 @@ impl<'f, T, Req, Cfg, F, R, In, Out, Err> Future for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err> where T: ServiceFactory, - F: Fn(In, ApplyService) -> R, + F: Fn(In, Container) -> R, R: Future>, { type Output = Result, T::InitError>; @@ -208,7 +178,11 @@ where let this = self.project(); if let Poll::Ready(svc) = this.fut.poll(cx)? { - Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap()))) + Poll::Ready(Ok(Apply { + service: svc.into(), + f: this.f.take().unwrap(), + r: marker::PhantomData, + })) } else { Poll::Pending } @@ -239,8 +213,8 @@ mod tests { #[ntex::test] async fn test_call() { let srv = pipeline( - apply_fn(Srv, |req: &'static str, srv| async move { - srv.call(()).await.unwrap(); + apply_fn(Srv, |req: &'static str, svc| async move { + svc.call(()).await.unwrap(); Ok((req, ())) }) .clone(), diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index f5027e0c..06c0199c 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -1,6 +1,6 @@ use std::{future::Future, pin::Pin, task::Context, task::Poll}; -use crate::ctx::{ServiceCtx, Waiters}; +use crate::ctx::{ServiceCtx, WaitersRef}; pub type BoxFuture<'a, I, E> = Pin> + 'a>>; @@ -43,7 +43,8 @@ trait ServiceObj { fn call<'a>( &'a self, req: Req, - waiters: &'a Waiters, + idx: usize, + waiters: &'a WaitersRef, ) -> BoxFuture<'a, Self::Response, Self::Error>; } @@ -69,9 +70,10 @@ where fn call<'a>( &'a self, req: Req, - waiters: &'a Waiters, + idx: usize, + waiters: &'a WaitersRef, ) -> BoxFuture<'a, Self::Response, Self::Error> { - Box::pin(ServiceCtx::<'a, S>::new(waiters).call_nowait(self, req)) + Box::pin(ServiceCtx::<'a, S>::new(idx, waiters).call_nowait(self, req)) } } @@ -132,7 +134,8 @@ where #[inline] fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { - self.0.call(req, ctx.waiters()) + let (idx, waiters) = ctx.inner(); + self.0.call(req, idx, waiters) } } diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index 2bd0fac6..a265c12d 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -11,19 +11,35 @@ pub struct Container { } pub struct ServiceCtx<'a, S: ?Sized> { - waiters: &'a Waiters, + idx: usize, + waiters: &'a WaitersRef, _t: marker::PhantomData>, } +pub(crate) struct WaitersRef(UnsafeCell>>); + pub(crate) struct Waiters { index: usize, - waiters: Rc>>>, + waiters: Rc, } -impl Waiters { +impl WaitersRef { #[allow(clippy::mut_from_ref)] fn get(&self) -> &mut slab::Slab> { - unsafe { &mut *self.waiters.as_ref().get() } + unsafe { &mut *self.0.get() } + } + + fn insert(&self) -> usize { + self.get().insert(None) + } + + fn remove(&self, idx: usize) { + self.notify(); + self.get().remove(idx); + } + + fn register(&self, idx: usize, cx: &mut task::Context<'_>) { + self.get()[idx] = Some(cx.waker().clone()); } fn notify(&self) { @@ -33,16 +49,22 @@ impl Waiters { } } } +} +impl Waiters { fn register(&self, cx: &mut task::Context<'_>) { - self.get()[self.index] = Some(cx.waker().clone()); + self.waiters.register(self.index, cx) + } + + fn notify(&self) { + self.waiters.notify() } } impl Clone for Waiters { fn clone(&self) -> Self { Waiters { - index: self.get().insert(None), + index: self.waiters.insert(), waiters: self.waiters.clone(), } } @@ -51,8 +73,7 @@ impl Clone for Waiters { impl Drop for Waiters { #[inline] fn drop(&mut self) { - self.get().remove(self.index); - self.notify(); + self.waiters.remove(self.index); } } @@ -66,7 +87,7 @@ impl Container { svc: Rc::new(svc), waiters: Waiters { index, - waiters: Rc::new(UnsafeCell::new(waiters)), + waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))), }, } } @@ -101,18 +122,46 @@ impl Container { } #[inline] - /// Process the request and return the response asynchronously. + /// Call service and create future object that resolves to service result. + /// + /// Note, this call does not check service readiness. pub fn call<'a, R>(&'a self, req: R) -> ServiceCall<'a, S, R> where S: Service, { let ctx = ServiceCtx::<'a, S> { - waiters: &self.waiters, + idx: self.waiters.index, + waiters: self.waiters.waiters.as_ref(), _t: marker::PhantomData, }; ctx.call(self.svc.as_ref(), req) } + #[inline] + /// Call service and create future object that resolves to service result. + /// + /// Note, this call does not check service readiness. + pub fn container_call(&self, req: R) -> ContainerCall<'_, S, R> + where + S: Service, + { + let container = self.clone(); + let svc_call = container.svc.call( + req, + ServiceCtx { + idx: container.waiters.index, + waiters: container.waiters.waiters.as_ref(), + _t: marker::PhantomData, + }, + ); + + // SAFETY: `svc_call` has same lifetime same as lifetime of `container.svc` + // Container::svc is heap allocated(Rc), we keep it alive until + // `svc_call` get resolved to result + let fut = unsafe { std::mem::transmute(svc_call) }; + ContainerCall { fut, container } + } + pub(crate) fn create, R, C>( f: &F, cfg: C, @@ -128,6 +177,13 @@ impl Container { } } +impl From for Container { + #[inline] + fn from(svc: S) -> Self { + Container::new(svc) + } +} + impl Clone for Container { #[inline] fn clone(&self) -> Self { @@ -138,38 +194,17 @@ impl Clone for Container { } } -impl From for Container { - #[inline] - fn from(svc: S) -> Self { - Container::new(svc) - } -} - impl<'a, S: ?Sized> ServiceCtx<'a, S> { - pub(crate) fn new(waiters: &'a Waiters) -> Self { + pub(crate) fn new(idx: usize, waiters: &'a WaitersRef) -> Self { Self { + idx, waiters, _t: marker::PhantomData, } } - pub(crate) fn waiters(self) -> &'a Waiters { - self.waiters - } - - /// Call service, do not check service readiness - pub(crate) fn call_nowait(&self, svc: &'a T, req: R) -> T::Future<'a> - where - T: Service + ?Sized, - R: 'a, - { - svc.call( - req, - ServiceCtx { - waiters: self.waiters, - _t: marker::PhantomData, - }, - ) + pub(crate) fn inner(self) -> (usize, &'a WaitersRef) { + (self.idx, self.waiters) } #[inline] @@ -183,10 +218,29 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> { state: ServiceCallState::Ready { svc, req: Some(req), + idx: self.idx, waiters: self.waiters, }, } } + + #[doc(hidden)] + #[inline] + /// Call service, do not check service readiness + pub fn call_nowait(&self, svc: &'a T, req: R) -> T::Future<'a> + where + T: Service + ?Sized, + R: 'a, + { + svc.call( + req, + ServiceCtx { + idx: self.idx, + waiters: self.waiters, + _t: marker::PhantomData, + }, + ) + } } impl<'a, S: ?Sized> Copy for ServiceCtx<'a, S> {} @@ -195,6 +249,7 @@ impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> { #[inline] fn clone(&self) -> Self { Self { + idx: self.idx, waiters: self.waiters, _t: marker::PhantomData, } @@ -203,67 +258,120 @@ impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> { pin_project_lite::pin_project! { #[must_use = "futures do nothing unless polled"] - pub struct ServiceCall<'a, T, Req> + pub struct ContainerCall<'f, S, R> where - T: Service, - T: 'a, - T: ?Sized, + S: Service, + S: 'f, + R: 'f, + { + #[pin] + fut: S::Future<'f>, + container: Container, + } +} + +impl<'f, S, R> ContainerCall<'f, S, R> +where + S: Service + 'f, + R: 'f, +{ + #[inline] + /// Call service and create future object that resolves to service result. + /// + /// Returned future is suitable for spawning into a async runtime. + /// Note, this call does not check service readiness. + pub fn into_static(self) -> ContainerCall<'static, S, R> { + let svc_call = self.fut; + let container = self.container; + + // SAFETY: `svc_call` has same lifetime same as lifetime of `container.svc` + // Container::svc is heap allocated(Rc), we keep it alive until + // `svc_call` get resolved to result + let fut = unsafe { std::mem::transmute(svc_call) }; + ContainerCall { fut, container } + } +} + +impl<'f, S, R> Future for ContainerCall<'f, S, R> +where + S: Service, +{ + type Output = Result; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +pin_project_lite::pin_project! { + #[must_use = "futures do nothing unless polled"] + pub struct ServiceCall<'a, S, Req> + where + S: Service, + S: 'a, + S: ?Sized, Req: 'a, { #[pin] - state: ServiceCallState<'a, T, Req>, + state: ServiceCallState<'a, S, Req>, } } pin_project_lite::pin_project! { #[project = ServiceCallStateProject] - enum ServiceCallState<'a, T, Req> + enum ServiceCallState<'a, S, Req> where - T: Service, - T: 'a, - T: ?Sized, + S: Service, + S: 'a, + S: ?Sized, Req: 'a, { Ready { req: Option, - svc: &'a T, - waiters: &'a Waiters, + svc: &'a S, + idx: usize, + waiters: &'a WaitersRef, }, - Call { #[pin] fut: T::Future<'a> }, + Call { #[pin] fut: S::Future<'a> }, Empty, } } -impl<'a, T, Req> Future for ServiceCall<'a, T, Req> +impl<'a, S, Req> Future for ServiceCall<'a, S, Req> where - T: Service + ?Sized, + S: Service + ?Sized, { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let mut this = self.as_mut().project(); match this.state.as_mut().project() { - ServiceCallStateProject::Ready { req, svc, waiters } => { - match svc.poll_ready(cx)? { - Poll::Ready(()) => { - waiters.notify(); + ServiceCallStateProject::Ready { + req, + svc, + idx, + waiters, + } => match svc.poll_ready(cx)? { + Poll::Ready(()) => { + waiters.notify(); - let fut = svc.call( - req.take().unwrap(), - ServiceCtx { - waiters, - _t: marker::PhantomData, - }, - ); - this.state.set(ServiceCallState::Call { fut }); - self.poll(cx) - } - Poll::Pending => { - waiters.register(cx); - Poll::Pending - } + let fut = svc.call( + req.take().unwrap(), + ServiceCtx { + idx: *idx, + waiters, + _t: marker::PhantomData, + }, + ); + this.state.set(ServiceCallState::Call { fut }); + self.poll(cx) } - } + Poll::Pending => { + waiters.register(*idx, cx); + Poll::Pending + } + }, ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| { this.state.set(ServiceCallState::Empty); r @@ -304,7 +412,8 @@ where #[cfg(test)] mod tests { - use ntex_util::{channel::condition, future::lazy, future::Ready, time}; + use ntex_util::future::{lazy, poll_fn, Ready}; + use ntex_util::{channel::condition, time}; use std::{cell::Cell, cell::RefCell, rc::Rc, task::Context, task::Poll}; use super::*; @@ -369,7 +478,8 @@ mod tests { let data1 = data.clone(); ntex::rt::spawn(async move { - let i = srv1.call("srv1").await.unwrap(); + let _ = poll_fn(|cx| srv1.poll_ready(cx)).await; + let i = srv1.container_call("srv1").await.unwrap(); data1.borrow_mut().push(i); }); diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 1e5e98be..8d20aebb 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -22,7 +22,7 @@ mod pipeline; mod then; pub use self::apply::{apply_fn, apply_fn_factory}; -pub use self::ctx::{Container, ContainerFactory, ServiceCall, ServiceCtx}; +pub use self::ctx::{Container, ContainerCall, ContainerFactory, ServiceCall, ServiceCtx}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::fn_shutdown::fn_shutdown; pub use self::map_config::{map_config, unit_config}; diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 3e6a94cf..8fe2ba48 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.0-beta.2] - 2023-06-21 + +* Remove unsafe from h1 dispatcher + ## [0.7.0-beta.1] - 2023-06-19 * Rename Ctx to ServiceCtx diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 8d4ca7de..58ddd964 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.7.0-beta.1" +version = "0.7.0-beta.2" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -52,13 +52,13 @@ ntex-codec = "0.6.2" ntex-connect = "0.3.0-beta.1" ntex-http = "0.1.9" ntex-router = "0.5.1" -ntex-service = "1.2.0-beta.2" +ntex-service = "1.2.0-beta.3" ntex-macros = "0.1.3" ntex-util = "0.3.0-beta.1" ntex-bytes = "0.1.19" -ntex-h2 = "0.3.0-beta.1" +ntex-h2 = "0.3.0-beta.2" ntex-rt = "0.4.9" -ntex-io = "0.3.0-beta.1" +ntex-io = "0.3.0-beta.2" ntex-tls = "0.3.0-beta.1" ntex-tokio = { version = "0.3.0-beta.0", optional = true } ntex-glommio = { version = "0.3.0-beta.0", optional = true } diff --git a/ntex/src/http/client/response.rs b/ntex/src/http/client/response.rs index 85fed748..994eac72 100644 --- a/ntex/src/http/client/response.rs +++ b/ntex/src/http/client/response.rs @@ -382,7 +382,7 @@ impl Future for ReadBody { Poll::Ready(Err(PayloadError::Incomplete(Some( std::io::Error::new( std::io::ErrorKind::TimedOut, - "Operation timed our", + "Operation timed out", ), )))) } else { diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 15d47bee..20a6b2d0 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -1,9 +1,9 @@ //! Framed transport dispatcher use std::task::{Context, Poll}; -use std::{cell::RefCell, error::Error, future::Future, io, marker, mem, pin::Pin, rc::Rc}; +use std::{cell::RefCell, error::Error, future::Future, io, marker, pin::Pin, rc::Rc}; use crate::io::{Filter, Io, IoBoxed, IoRef, IoStatusUpdate, RecvError}; -use crate::service::{Container, Service, ServiceCall}; +use crate::service::{Container, ContainerCall, Service}; use crate::util::{ready, Bytes}; use crate::http; @@ -78,10 +78,10 @@ pin_project_lite::pin_project! { where S: 'static, X: 'static { None, - Service { #[pin] fut: ServiceCall<'static, S, Request> }, - ServiceUpgrade { #[pin] fut: ServiceCall<'static, S, Request> }, - Expect { #[pin] fut: ServiceCall<'static, X, Request> }, - Filter { fut: ServiceCall<'static, OnRequest, (Request, IoRef)> } + Service { #[pin] fut: ContainerCall<'static, S, Request> }, + ServiceUpgrade { #[pin] fut: ContainerCall<'static, S, Request> }, + Expect { #[pin] fut: ContainerCall<'static, X, Request> }, + Filter { fut: ContainerCall<'static, OnRequest, (Request, IoRef)> } } } @@ -478,32 +478,23 @@ where fn service_call(&self, req: Request) -> CallState { // Handle normal requests - let fut = self.config.service.call(req); - let st = CallState::Service { - fut: unsafe { mem::transmute_copy(&fut) }, - }; - mem::forget(fut); - st + CallState::Service { + fut: self.config.service.container_call(req).into_static(), + } } fn service_filter(&self, req: Request, f: &Container) -> CallState { // Handle filter fut - let fut = f.call((req, self.io.get_ref())); - let st = CallState::Filter { - fut: unsafe { mem::transmute_copy(&fut) }, - }; - mem::forget(fut); - st + CallState::Filter { + fut: f.container_call((req, self.io.get_ref())).into_static(), + } } fn service_expect(&self, req: Request) -> CallState { // Handle normal requests with EXPECT: 100-Continue` header - let fut = self.config.expect.call(req); - let st = CallState::Expect { - fut: unsafe { mem::transmute_copy(&fut) }, - }; - mem::forget(fut); - st + CallState::Expect { + fut: self.config.expect.container_call(req).into_static(), + } } fn service_upgrade(&mut self, mut req: Request) -> CallState { @@ -514,12 +505,9 @@ where RefCell::new(Some(Box::new((io, self.codec.clone())))), ))); // Handle upgrade requests - let fut = self.config.service.call(req); - let st = CallState::ServiceUpgrade { - fut: unsafe { mem::transmute_copy(&fut) }, - }; - mem::forget(fut); - st + CallState::ServiceUpgrade { + fut: self.config.service.container_call(req).into_static(), + } } fn read_request(