mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 21:07:39 +03:00
Add ContainerCall future (#213)
* Add Container::static_call(), returns future that is suitable for spawning into runtime * Remove unsafe from h1 dispatcher * Make call_nowait public * Update MSRV
This commit is contained in:
parent
50528b11ff
commit
ea14e8f0f4
15 changed files with 267 additions and 176 deletions
2
.github/workflows/linux.yml
vendored
2
.github/workflows/linux.yml
vendored
|
@ -8,7 +8,7 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
version:
|
||||
- 1.65.0 # MSRV
|
||||
- 1.66.0 # MSRV
|
||||
- stable
|
||||
- nightly
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
[](https://github.com/ntex-rs/ntex/actions?query=workflow%3A"CI+(Linux)")
|
||||
[](https://crates.io/crates/ntex)
|
||||
[](https://docs.rs/ntex)
|
||||
[](https://blog.rust-lang.org/2022/11/03/Rust-1.65.0.html)
|
||||
[](https://blog.rust-lang.org/2022/12/15/Rust-1.66.0.html)
|
||||

|
||||
[](https://codecov.io/gh/ntex-rs/ntex)
|
||||
[](https://discord.gg/zBNyhVRz)
|
||||
|
@ -29,7 +29,7 @@ Starting ntex v0.5 async runtime must be selected as a feature. Available option
|
|||
|
||||
```toml
|
||||
[dependencies]
|
||||
ntex = { version = "0.6", features = ["glommio"] }
|
||||
ntex = { version = "0.7", features = ["tokio"] }
|
||||
```
|
||||
|
||||
## Documentation & community resources
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## [0.3.0-beta.3] - 2023-06-21
|
||||
|
||||
* Use static ContainerCall for dispatcher
|
||||
|
||||
## [0.3.0-beta.0] - 2023-06-16
|
||||
|
||||
* Migrate to ntex-service 1.2
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-io"
|
||||
version = "0.3.0-beta.1"
|
||||
version = "0.3.0-beta.2"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "Utilities for encoding and decoding frames"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@ -19,7 +19,7 @@ path = "src/lib.rs"
|
|||
ntex-codec = "0.6.2"
|
||||
ntex-bytes = "0.1.19"
|
||||
ntex-util = "0.3.0-beta.1"
|
||||
ntex-service = "1.2.0-beta.1"
|
||||
ntex-service = "1.2.0-beta.3"
|
||||
|
||||
bitflags = "1.3"
|
||||
log = "0.4"
|
||||
|
|
|
@ -250,8 +250,9 @@ where
|
|||
// call service
|
||||
let shared = slf.shared.clone();
|
||||
shared.inflight.set(shared.inflight.get() + 1);
|
||||
let fut = shared.service.container_call(item).into_static();
|
||||
spawn(async move {
|
||||
let result = shared.service.call(item).await;
|
||||
let result = fut.await;
|
||||
shared.handle_result(result, &shared.io);
|
||||
});
|
||||
}
|
||||
|
@ -275,8 +276,9 @@ where
|
|||
// call service
|
||||
let shared = slf.shared.clone();
|
||||
shared.inflight.set(shared.inflight.get() + 1);
|
||||
let fut = shared.service.container_call(item).into_static();
|
||||
spawn(async move {
|
||||
let result = shared.service.call(item).await;
|
||||
let result = fut.await;
|
||||
shared.handle_result(result, &shared.io);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,5 +1,11 @@
|
|||
# Changes
|
||||
|
||||
## [1.2.0-beta.3] - 2023-06-21
|
||||
|
||||
* Add custom ContainerCall future
|
||||
|
||||
* Allow to turn ContainerCall to static
|
||||
|
||||
## [1.2.0-beta.2] - 2023-06-19
|
||||
|
||||
* Remove Deref for Container<T>
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-service"
|
||||
version = "1.2.0-beta.2"
|
||||
version = "1.2.0-beta.3"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "ntex service"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@ -20,5 +20,5 @@ pin-project-lite = "0.2.6"
|
|||
slab = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
ntex = { version = "0.7.0-beta.0", features = ["tokio"] }
|
||||
ntex-util = "0.3.0-beta.0"
|
||||
ntex = { version = "0.7.0-beta.1", features = ["tokio"] }
|
||||
ntex-util = "0.3.0-beta.1"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
#![allow(clippy::type_complexity)]
|
||||
use std::{future::Future, marker, pin::Pin, rc::Rc, task, task::Poll};
|
||||
use std::{future::Future, marker, pin::Pin, task, task::Poll};
|
||||
|
||||
use super::ctx::{ServiceCall, ServiceCtx, Waiters};
|
||||
use super::ctx::{Container, ServiceCtx};
|
||||
use super::{IntoService, IntoServiceFactory, Service, ServiceFactory};
|
||||
|
||||
/// Apply transform function to a service.
|
||||
|
@ -11,11 +11,15 @@ pub fn apply_fn<T, Req, F, R, In, Out, Err, U>(
|
|||
) -> Apply<T, Req, F, R, In, Out, Err>
|
||||
where
|
||||
T: Service<Req, Error = Err>,
|
||||
for<'r> F: Fn(In, ApplyService<T>) -> R,
|
||||
F: Fn(In, Container<T>) -> R,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
U: IntoService<T, Req>,
|
||||
{
|
||||
Apply::new(service.into_service(), f)
|
||||
Apply {
|
||||
f,
|
||||
service: Container::new(service.into_service()),
|
||||
r: marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Service factory that produces `apply_fn` service.
|
||||
|
@ -25,7 +29,7 @@ pub fn apply_fn_factory<T, Req, Cfg, F, R, In, Out, Err, U>(
|
|||
) -> ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
|
||||
where
|
||||
T: ServiceFactory<Req, Cfg, Error = Err>,
|
||||
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
|
||||
F: Fn(In, Container<T::Service>) -> R + Clone,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
U: IntoServiceFactory<T, Req, Cfg>,
|
||||
{
|
||||
|
@ -37,31 +41,15 @@ pub struct Apply<T, Req, F, R, In, Out, Err>
|
|||
where
|
||||
T: Service<Req, Error = Err>,
|
||||
{
|
||||
service: Rc<T>,
|
||||
service: Container<T>,
|
||||
f: F,
|
||||
r: marker::PhantomData<fn(Req) -> (In, Out, R)>,
|
||||
}
|
||||
|
||||
impl<T, Req, F, R, In, Out, Err> Apply<T, Req, F, R, In, Out, Err>
|
||||
where
|
||||
T: Service<Req, Error = Err>,
|
||||
F: Fn(In, ApplyService<T>) -> R,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
{
|
||||
/// Create new `Apply` combinator
|
||||
fn new(service: T, f: F) -> Self {
|
||||
Self {
|
||||
f,
|
||||
service: Rc::new(service),
|
||||
r: marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Req, F, R, In, Out, Err> Clone for Apply<T, Req, F, R, In, Out, Err>
|
||||
where
|
||||
T: Service<Req, Error = Err> + Clone,
|
||||
F: Fn(In, ApplyService<T>) -> R + Clone,
|
||||
F: Fn(In, Container<T>) -> R + Clone,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
|
@ -73,24 +61,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub struct ApplyService<S> {
|
||||
svc: Rc<S>,
|
||||
waiters: Waiters,
|
||||
}
|
||||
|
||||
impl<S> ApplyService<S> {
|
||||
pub fn call<R>(&self, req: R) -> ServiceCall<'_, S, R>
|
||||
where
|
||||
S: Service<R>,
|
||||
{
|
||||
ServiceCtx::<S>::new(&self.waiters).call(&self.svc, req)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Req, F, R, In, Out, Err> Service<In> for Apply<T, Req, F, R, In, Out, Err>
|
||||
where
|
||||
T: Service<Req, Error = Err>,
|
||||
F: Fn(In, ApplyService<T>) -> R,
|
||||
F: Fn(In, Container<T>) -> R,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
{
|
||||
type Response = Out;
|
||||
|
@ -101,12 +75,8 @@ where
|
|||
crate::forward_poll_shutdown!(service);
|
||||
|
||||
#[inline]
|
||||
fn call<'a>(&'a self, req: In, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
|
||||
let svc = ApplyService {
|
||||
svc: self.service.clone(),
|
||||
waiters: ctx.waiters().clone(),
|
||||
};
|
||||
(self.f)(req, svc)
|
||||
fn call<'a>(&'a self, req: In, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
|
||||
(self.f)(req, self.service.clone())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,7 +84,7 @@ where
|
|||
pub struct ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
|
||||
where
|
||||
T: ServiceFactory<Req, Cfg, Error = Err>,
|
||||
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
|
||||
F: Fn(In, Container<T::Service>) -> R + Clone,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
{
|
||||
service: T,
|
||||
|
@ -125,7 +95,7 @@ where
|
|||
impl<T, Req, Cfg, F, R, In, Out, Err> ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
|
||||
where
|
||||
T: ServiceFactory<Req, Cfg, Error = Err>,
|
||||
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
|
||||
F: Fn(In, Container<T::Service>) -> R + Clone,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
{
|
||||
/// Create new `ApplyNewService` new service instance
|
||||
|
@ -142,7 +112,7 @@ impl<T, Req, Cfg, F, R, In, Out, Err> Clone
|
|||
for ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
|
||||
where
|
||||
T: ServiceFactory<Req, Cfg, Error = Err> + Clone,
|
||||
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
|
||||
F: Fn(In, Container<T::Service>) -> R + Clone,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
|
@ -158,7 +128,7 @@ impl<T, Req, Cfg, F, R, In, Out, Err> ServiceFactory<In, Cfg>
|
|||
for ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
|
||||
where
|
||||
T: ServiceFactory<Req, Cfg, Error = Err>,
|
||||
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
|
||||
F: Fn(In, Container<T::Service>) -> R + Clone,
|
||||
for<'r> R: Future<Output = Result<Out, Err>> + 'r,
|
||||
{
|
||||
type Response = Out;
|
||||
|
@ -183,7 +153,7 @@ pin_project_lite::pin_project! {
|
|||
where
|
||||
T: ServiceFactory<Req, Cfg, Error = Err>,
|
||||
T: 'f,
|
||||
F: Fn(In, ApplyService<T::Service>) -> R,
|
||||
F: Fn(In, Container<T::Service>) -> R,
|
||||
T::Service: 'f,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
Cfg: 'f,
|
||||
|
@ -199,7 +169,7 @@ impl<'f, T, Req, Cfg, F, R, In, Out, Err> Future
|
|||
for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err>
|
||||
where
|
||||
T: ServiceFactory<Req, Cfg, Error = Err>,
|
||||
F: Fn(In, ApplyService<T::Service>) -> R,
|
||||
F: Fn(In, Container<T::Service>) -> R,
|
||||
R: Future<Output = Result<Out, Err>>,
|
||||
{
|
||||
type Output = Result<Apply<T::Service, Req, F, R, In, Out, Err>, T::InitError>;
|
||||
|
@ -208,7 +178,11 @@ where
|
|||
let this = self.project();
|
||||
|
||||
if let Poll::Ready(svc) = this.fut.poll(cx)? {
|
||||
Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap())))
|
||||
Poll::Ready(Ok(Apply {
|
||||
service: svc.into(),
|
||||
f: this.f.take().unwrap(),
|
||||
r: marker::PhantomData,
|
||||
}))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
|
@ -239,8 +213,8 @@ mod tests {
|
|||
#[ntex::test]
|
||||
async fn test_call() {
|
||||
let srv = pipeline(
|
||||
apply_fn(Srv, |req: &'static str, srv| async move {
|
||||
srv.call(()).await.unwrap();
|
||||
apply_fn(Srv, |req: &'static str, svc| async move {
|
||||
svc.call(()).await.unwrap();
|
||||
Ok((req, ()))
|
||||
})
|
||||
.clone(),
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::{future::Future, pin::Pin, task::Context, task::Poll};
|
||||
|
||||
use crate::ctx::{ServiceCtx, Waiters};
|
||||
use crate::ctx::{ServiceCtx, WaitersRef};
|
||||
|
||||
pub type BoxFuture<'a, I, E> = Pin<Box<dyn Future<Output = Result<I, E>> + 'a>>;
|
||||
|
||||
|
@ -43,7 +43,8 @@ trait ServiceObj<Req> {
|
|||
fn call<'a>(
|
||||
&'a self,
|
||||
req: Req,
|
||||
waiters: &'a Waiters,
|
||||
idx: usize,
|
||||
waiters: &'a WaitersRef,
|
||||
) -> BoxFuture<'a, Self::Response, Self::Error>;
|
||||
}
|
||||
|
||||
|
@ -69,9 +70,10 @@ where
|
|||
fn call<'a>(
|
||||
&'a self,
|
||||
req: Req,
|
||||
waiters: &'a Waiters,
|
||||
idx: usize,
|
||||
waiters: &'a WaitersRef,
|
||||
) -> BoxFuture<'a, Self::Response, Self::Error> {
|
||||
Box::pin(ServiceCtx::<'a, S>::new(waiters).call_nowait(self, req))
|
||||
Box::pin(ServiceCtx::<'a, S>::new(idx, waiters).call_nowait(self, req))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,7 +134,8 @@ where
|
|||
|
||||
#[inline]
|
||||
fn call<'a>(&'a self, req: Req, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
|
||||
self.0.call(req, ctx.waiters())
|
||||
let (idx, waiters) = ctx.inner();
|
||||
self.0.call(req, idx, waiters)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,19 +11,35 @@ pub struct Container<S> {
|
|||
}
|
||||
|
||||
pub struct ServiceCtx<'a, S: ?Sized> {
|
||||
waiters: &'a Waiters,
|
||||
idx: usize,
|
||||
waiters: &'a WaitersRef,
|
||||
_t: marker::PhantomData<Rc<S>>,
|
||||
}
|
||||
|
||||
pub(crate) struct WaitersRef(UnsafeCell<slab::Slab<Option<task::Waker>>>);
|
||||
|
||||
pub(crate) struct Waiters {
|
||||
index: usize,
|
||||
waiters: Rc<UnsafeCell<slab::Slab<Option<task::Waker>>>>,
|
||||
waiters: Rc<WaitersRef>,
|
||||
}
|
||||
|
||||
impl Waiters {
|
||||
impl WaitersRef {
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
fn get(&self) -> &mut slab::Slab<Option<task::Waker>> {
|
||||
unsafe { &mut *self.waiters.as_ref().get() }
|
||||
unsafe { &mut *self.0.get() }
|
||||
}
|
||||
|
||||
fn insert(&self) -> usize {
|
||||
self.get().insert(None)
|
||||
}
|
||||
|
||||
fn remove(&self, idx: usize) {
|
||||
self.notify();
|
||||
self.get().remove(idx);
|
||||
}
|
||||
|
||||
fn register(&self, idx: usize, cx: &mut task::Context<'_>) {
|
||||
self.get()[idx] = Some(cx.waker().clone());
|
||||
}
|
||||
|
||||
fn notify(&self) {
|
||||
|
@ -33,16 +49,22 @@ impl Waiters {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Waiters {
|
||||
fn register(&self, cx: &mut task::Context<'_>) {
|
||||
self.get()[self.index] = Some(cx.waker().clone());
|
||||
self.waiters.register(self.index, cx)
|
||||
}
|
||||
|
||||
fn notify(&self) {
|
||||
self.waiters.notify()
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Waiters {
|
||||
fn clone(&self) -> Self {
|
||||
Waiters {
|
||||
index: self.get().insert(None),
|
||||
index: self.waiters.insert(),
|
||||
waiters: self.waiters.clone(),
|
||||
}
|
||||
}
|
||||
|
@ -51,8 +73,7 @@ impl Clone for Waiters {
|
|||
impl Drop for Waiters {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
self.get().remove(self.index);
|
||||
self.notify();
|
||||
self.waiters.remove(self.index);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,7 +87,7 @@ impl<S> Container<S> {
|
|||
svc: Rc::new(svc),
|
||||
waiters: Waiters {
|
||||
index,
|
||||
waiters: Rc::new(UnsafeCell::new(waiters)),
|
||||
waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -101,18 +122,46 @@ impl<S> Container<S> {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
/// Process the request and return the response asynchronously.
|
||||
/// Call service and create future object that resolves to service result.
|
||||
///
|
||||
/// Note, this call does not check service readiness.
|
||||
pub fn call<'a, R>(&'a self, req: R) -> ServiceCall<'a, S, R>
|
||||
where
|
||||
S: Service<R>,
|
||||
{
|
||||
let ctx = ServiceCtx::<'a, S> {
|
||||
waiters: &self.waiters,
|
||||
idx: self.waiters.index,
|
||||
waiters: self.waiters.waiters.as_ref(),
|
||||
_t: marker::PhantomData,
|
||||
};
|
||||
ctx.call(self.svc.as_ref(), req)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Call service and create future object that resolves to service result.
|
||||
///
|
||||
/// Note, this call does not check service readiness.
|
||||
pub fn container_call<R>(&self, req: R) -> ContainerCall<'_, S, R>
|
||||
where
|
||||
S: Service<R>,
|
||||
{
|
||||
let container = self.clone();
|
||||
let svc_call = container.svc.call(
|
||||
req,
|
||||
ServiceCtx {
|
||||
idx: container.waiters.index,
|
||||
waiters: container.waiters.waiters.as_ref(),
|
||||
_t: marker::PhantomData,
|
||||
},
|
||||
);
|
||||
|
||||
// SAFETY: `svc_call` has same lifetime same as lifetime of `container.svc`
|
||||
// Container::svc is heap allocated(Rc<S>), we keep it alive until
|
||||
// `svc_call` get resolved to result
|
||||
let fut = unsafe { std::mem::transmute(svc_call) };
|
||||
ContainerCall { fut, container }
|
||||
}
|
||||
|
||||
pub(crate) fn create<F: ServiceFactory<R, C>, R, C>(
|
||||
f: &F,
|
||||
cfg: C,
|
||||
|
@ -128,6 +177,13 @@ impl<S> Container<S> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<S> From<S> for Container<S> {
|
||||
#[inline]
|
||||
fn from(svc: S) -> Self {
|
||||
Container::new(svc)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Clone for Container<S> {
|
||||
#[inline]
|
||||
fn clone(&self) -> Self {
|
||||
|
@ -138,38 +194,17 @@ impl<S> Clone for Container<S> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<S> From<S> for Container<S> {
|
||||
#[inline]
|
||||
fn from(svc: S) -> Self {
|
||||
Container::new(svc)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S: ?Sized> ServiceCtx<'a, S> {
|
||||
pub(crate) fn new(waiters: &'a Waiters) -> Self {
|
||||
pub(crate) fn new(idx: usize, waiters: &'a WaitersRef) -> Self {
|
||||
Self {
|
||||
idx,
|
||||
waiters,
|
||||
_t: marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn waiters(self) -> &'a Waiters {
|
||||
self.waiters
|
||||
}
|
||||
|
||||
/// Call service, do not check service readiness
|
||||
pub(crate) fn call_nowait<T, R>(&self, svc: &'a T, req: R) -> T::Future<'a>
|
||||
where
|
||||
T: Service<R> + ?Sized,
|
||||
R: 'a,
|
||||
{
|
||||
svc.call(
|
||||
req,
|
||||
ServiceCtx {
|
||||
waiters: self.waiters,
|
||||
_t: marker::PhantomData,
|
||||
},
|
||||
)
|
||||
pub(crate) fn inner(self) -> (usize, &'a WaitersRef) {
|
||||
(self.idx, self.waiters)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -183,10 +218,29 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> {
|
|||
state: ServiceCallState::Ready {
|
||||
svc,
|
||||
req: Some(req),
|
||||
idx: self.idx,
|
||||
waiters: self.waiters,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[inline]
|
||||
/// Call service, do not check service readiness
|
||||
pub fn call_nowait<T, R>(&self, svc: &'a T, req: R) -> T::Future<'a>
|
||||
where
|
||||
T: Service<R> + ?Sized,
|
||||
R: 'a,
|
||||
{
|
||||
svc.call(
|
||||
req,
|
||||
ServiceCtx {
|
||||
idx: self.idx,
|
||||
waiters: self.waiters,
|
||||
_t: marker::PhantomData,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S: ?Sized> Copy for ServiceCtx<'a, S> {}
|
||||
|
@ -195,6 +249,7 @@ impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> {
|
|||
#[inline]
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
idx: self.idx,
|
||||
waiters: self.waiters,
|
||||
_t: marker::PhantomData,
|
||||
}
|
||||
|
@ -203,67 +258,120 @@ impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> {
|
|||
|
||||
pin_project_lite::pin_project! {
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct ServiceCall<'a, T, Req>
|
||||
pub struct ContainerCall<'f, S, R>
|
||||
where
|
||||
T: Service<Req>,
|
||||
T: 'a,
|
||||
T: ?Sized,
|
||||
S: Service<R>,
|
||||
S: 'f,
|
||||
R: 'f,
|
||||
{
|
||||
#[pin]
|
||||
fut: S::Future<'f>,
|
||||
container: Container<S>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<'f, S, R> ContainerCall<'f, S, R>
|
||||
where
|
||||
S: Service<R> + 'f,
|
||||
R: 'f,
|
||||
{
|
||||
#[inline]
|
||||
/// Call service and create future object that resolves to service result.
|
||||
///
|
||||
/// Returned future is suitable for spawning into a async runtime.
|
||||
/// Note, this call does not check service readiness.
|
||||
pub fn into_static(self) -> ContainerCall<'static, S, R> {
|
||||
let svc_call = self.fut;
|
||||
let container = self.container;
|
||||
|
||||
// SAFETY: `svc_call` has same lifetime same as lifetime of `container.svc`
|
||||
// Container::svc is heap allocated(Rc<S>), we keep it alive until
|
||||
// `svc_call` get resolved to result
|
||||
let fut = unsafe { std::mem::transmute(svc_call) };
|
||||
ContainerCall { fut, container }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'f, S, R> Future for ContainerCall<'f, S, R>
|
||||
where
|
||||
S: Service<R>,
|
||||
{
|
||||
type Output = Result<S::Response, S::Error>;
|
||||
|
||||
#[inline]
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
self.project().fut.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct ServiceCall<'a, S, Req>
|
||||
where
|
||||
S: Service<Req>,
|
||||
S: 'a,
|
||||
S: ?Sized,
|
||||
Req: 'a,
|
||||
{
|
||||
#[pin]
|
||||
state: ServiceCallState<'a, T, Req>,
|
||||
state: ServiceCallState<'a, S, Req>,
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
#[project = ServiceCallStateProject]
|
||||
enum ServiceCallState<'a, T, Req>
|
||||
enum ServiceCallState<'a, S, Req>
|
||||
where
|
||||
T: Service<Req>,
|
||||
T: 'a,
|
||||
T: ?Sized,
|
||||
S: Service<Req>,
|
||||
S: 'a,
|
||||
S: ?Sized,
|
||||
Req: 'a,
|
||||
{
|
||||
Ready { req: Option<Req>,
|
||||
svc: &'a T,
|
||||
waiters: &'a Waiters,
|
||||
svc: &'a S,
|
||||
idx: usize,
|
||||
waiters: &'a WaitersRef,
|
||||
},
|
||||
Call { #[pin] fut: T::Future<'a> },
|
||||
Call { #[pin] fut: S::Future<'a> },
|
||||
Empty,
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T, Req> Future for ServiceCall<'a, T, Req>
|
||||
impl<'a, S, Req> Future for ServiceCall<'a, S, Req>
|
||||
where
|
||||
T: Service<Req> + ?Sized,
|
||||
S: Service<Req> + ?Sized,
|
||||
{
|
||||
type Output = Result<T::Response, T::Error>;
|
||||
type Output = Result<S::Response, S::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.as_mut().project();
|
||||
|
||||
match this.state.as_mut().project() {
|
||||
ServiceCallStateProject::Ready { req, svc, waiters } => {
|
||||
match svc.poll_ready(cx)? {
|
||||
Poll::Ready(()) => {
|
||||
waiters.notify();
|
||||
ServiceCallStateProject::Ready {
|
||||
req,
|
||||
svc,
|
||||
idx,
|
||||
waiters,
|
||||
} => match svc.poll_ready(cx)? {
|
||||
Poll::Ready(()) => {
|
||||
waiters.notify();
|
||||
|
||||
let fut = svc.call(
|
||||
req.take().unwrap(),
|
||||
ServiceCtx {
|
||||
waiters,
|
||||
_t: marker::PhantomData,
|
||||
},
|
||||
);
|
||||
this.state.set(ServiceCallState::Call { fut });
|
||||
self.poll(cx)
|
||||
}
|
||||
Poll::Pending => {
|
||||
waiters.register(cx);
|
||||
Poll::Pending
|
||||
}
|
||||
let fut = svc.call(
|
||||
req.take().unwrap(),
|
||||
ServiceCtx {
|
||||
idx: *idx,
|
||||
waiters,
|
||||
_t: marker::PhantomData,
|
||||
},
|
||||
);
|
||||
this.state.set(ServiceCallState::Call { fut });
|
||||
self.poll(cx)
|
||||
}
|
||||
}
|
||||
Poll::Pending => {
|
||||
waiters.register(*idx, cx);
|
||||
Poll::Pending
|
||||
}
|
||||
},
|
||||
ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| {
|
||||
this.state.set(ServiceCallState::Empty);
|
||||
r
|
||||
|
@ -304,7 +412,8 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use ntex_util::{channel::condition, future::lazy, future::Ready, time};
|
||||
use ntex_util::future::{lazy, poll_fn, Ready};
|
||||
use ntex_util::{channel::condition, time};
|
||||
use std::{cell::Cell, cell::RefCell, rc::Rc, task::Context, task::Poll};
|
||||
|
||||
use super::*;
|
||||
|
@ -369,7 +478,8 @@ mod tests {
|
|||
|
||||
let data1 = data.clone();
|
||||
ntex::rt::spawn(async move {
|
||||
let i = srv1.call("srv1").await.unwrap();
|
||||
let _ = poll_fn(|cx| srv1.poll_ready(cx)).await;
|
||||
let i = srv1.container_call("srv1").await.unwrap();
|
||||
data1.borrow_mut().push(i);
|
||||
});
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ mod pipeline;
|
|||
mod then;
|
||||
|
||||
pub use self::apply::{apply_fn, apply_fn_factory};
|
||||
pub use self::ctx::{Container, ContainerFactory, ServiceCall, ServiceCtx};
|
||||
pub use self::ctx::{Container, ContainerCall, ContainerFactory, ServiceCall, ServiceCtx};
|
||||
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
|
||||
pub use self::fn_shutdown::fn_shutdown;
|
||||
pub use self::map_config::{map_config, unit_config};
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## [0.7.0-beta.2] - 2023-06-21
|
||||
|
||||
* Remove unsafe from h1 dispatcher
|
||||
|
||||
## [0.7.0-beta.1] - 2023-06-19
|
||||
|
||||
* Rename Ctx to ServiceCtx
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex"
|
||||
version = "0.7.0-beta.1"
|
||||
version = "0.7.0-beta.2"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "Framework for composable network services"
|
||||
readme = "README.md"
|
||||
|
@ -52,13 +52,13 @@ ntex-codec = "0.6.2"
|
|||
ntex-connect = "0.3.0-beta.1"
|
||||
ntex-http = "0.1.9"
|
||||
ntex-router = "0.5.1"
|
||||
ntex-service = "1.2.0-beta.2"
|
||||
ntex-service = "1.2.0-beta.3"
|
||||
ntex-macros = "0.1.3"
|
||||
ntex-util = "0.3.0-beta.1"
|
||||
ntex-bytes = "0.1.19"
|
||||
ntex-h2 = "0.3.0-beta.1"
|
||||
ntex-h2 = "0.3.0-beta.2"
|
||||
ntex-rt = "0.4.9"
|
||||
ntex-io = "0.3.0-beta.1"
|
||||
ntex-io = "0.3.0-beta.2"
|
||||
ntex-tls = "0.3.0-beta.1"
|
||||
ntex-tokio = { version = "0.3.0-beta.0", optional = true }
|
||||
ntex-glommio = { version = "0.3.0-beta.0", optional = true }
|
||||
|
|
|
@ -382,7 +382,7 @@ impl Future for ReadBody {
|
|||
Poll::Ready(Err(PayloadError::Incomplete(Some(
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"Operation timed our",
|
||||
"Operation timed out",
|
||||
),
|
||||
))))
|
||||
} else {
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
//! Framed transport dispatcher
|
||||
use std::task::{Context, Poll};
|
||||
use std::{cell::RefCell, error::Error, future::Future, io, marker, mem, pin::Pin, rc::Rc};
|
||||
use std::{cell::RefCell, error::Error, future::Future, io, marker, pin::Pin, rc::Rc};
|
||||
|
||||
use crate::io::{Filter, Io, IoBoxed, IoRef, IoStatusUpdate, RecvError};
|
||||
use crate::service::{Container, Service, ServiceCall};
|
||||
use crate::service::{Container, ContainerCall, Service};
|
||||
use crate::util::{ready, Bytes};
|
||||
|
||||
use crate::http;
|
||||
|
@ -78,10 +78,10 @@ pin_project_lite::pin_project! {
|
|||
where S: 'static, X: 'static
|
||||
{
|
||||
None,
|
||||
Service { #[pin] fut: ServiceCall<'static, S, Request> },
|
||||
ServiceUpgrade { #[pin] fut: ServiceCall<'static, S, Request> },
|
||||
Expect { #[pin] fut: ServiceCall<'static, X, Request> },
|
||||
Filter { fut: ServiceCall<'static, OnRequest, (Request, IoRef)> }
|
||||
Service { #[pin] fut: ContainerCall<'static, S, Request> },
|
||||
ServiceUpgrade { #[pin] fut: ContainerCall<'static, S, Request> },
|
||||
Expect { #[pin] fut: ContainerCall<'static, X, Request> },
|
||||
Filter { fut: ContainerCall<'static, OnRequest, (Request, IoRef)> }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -478,32 +478,23 @@ where
|
|||
|
||||
fn service_call(&self, req: Request) -> CallState<S, X> {
|
||||
// Handle normal requests
|
||||
let fut = self.config.service.call(req);
|
||||
let st = CallState::Service {
|
||||
fut: unsafe { mem::transmute_copy(&fut) },
|
||||
};
|
||||
mem::forget(fut);
|
||||
st
|
||||
CallState::Service {
|
||||
fut: self.config.service.container_call(req).into_static(),
|
||||
}
|
||||
}
|
||||
|
||||
fn service_filter(&self, req: Request, f: &Container<OnRequest>) -> CallState<S, X> {
|
||||
// Handle filter fut
|
||||
let fut = f.call((req, self.io.get_ref()));
|
||||
let st = CallState::Filter {
|
||||
fut: unsafe { mem::transmute_copy(&fut) },
|
||||
};
|
||||
mem::forget(fut);
|
||||
st
|
||||
CallState::Filter {
|
||||
fut: f.container_call((req, self.io.get_ref())).into_static(),
|
||||
}
|
||||
}
|
||||
|
||||
fn service_expect(&self, req: Request) -> CallState<S, X> {
|
||||
// Handle normal requests with EXPECT: 100-Continue` header
|
||||
let fut = self.config.expect.call(req);
|
||||
let st = CallState::Expect {
|
||||
fut: unsafe { mem::transmute_copy(&fut) },
|
||||
};
|
||||
mem::forget(fut);
|
||||
st
|
||||
CallState::Expect {
|
||||
fut: self.config.expect.container_call(req).into_static(),
|
||||
}
|
||||
}
|
||||
|
||||
fn service_upgrade(&mut self, mut req: Request) -> CallState<S, X> {
|
||||
|
@ -514,12 +505,9 @@ where
|
|||
RefCell::new(Some(Box::new((io, self.codec.clone())))),
|
||||
)));
|
||||
// Handle upgrade requests
|
||||
let fut = self.config.service.call(req);
|
||||
let st = CallState::ServiceUpgrade {
|
||||
fut: unsafe { mem::transmute_copy(&fut) },
|
||||
};
|
||||
mem::forget(fut);
|
||||
st
|
||||
CallState::ServiceUpgrade {
|
||||
fut: self.config.service.container_call(req).into_static(),
|
||||
}
|
||||
}
|
||||
|
||||
fn read_request(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue