Link apply_fn service readiness with parent

This commit is contained in:
Nikolay Kim 2023-06-21 10:24:16 +06:00
parent ea14e8f0f4
commit 589b48f073
5 changed files with 59 additions and 26 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [1.2.0-beta.4] - 2023-06-2x
* Link apply_fn service readiness with parent
## [1.2.0-beta.3] - 2023-06-21
* Add custom ContainerCall future

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "1.2.0-beta.3"
version = "1.2.0-beta.4"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]

View file

@ -1,7 +1,7 @@
#![allow(clippy::type_complexity)]
use std::{future::Future, marker, pin::Pin, task, task::Poll};
use super::ctx::{Container, ServiceCtx};
use super::ctx::{Container, ServiceCall, ServiceCtx};
use super::{IntoService, IntoServiceFactory, Service, ServiceFactory};
/// Apply transform function to a service.
@ -11,7 +11,7 @@ pub fn apply_fn<T, Req, F, R, In, Out, Err, U>(
) -> Apply<T, Req, F, R, In, Out, Err>
where
T: Service<Req, Error = Err>,
F: Fn(In, Container<T>) -> R,
F: Fn(In, ApplyService<T>) -> R,
R: Future<Output = Result<Out, Err>>,
U: IntoService<T, Req>,
{
@ -29,7 +29,7 @@ pub fn apply_fn_factory<T, Req, Cfg, F, R, In, Out, Err, U>(
) -> ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where
T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Container<T::Service>) -> R + Clone,
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
U: IntoServiceFactory<T, Req, Cfg>,
{
@ -46,10 +46,24 @@ where
r: marker::PhantomData<fn(Req) -> (In, Out, R)>,
}
pub struct ApplyService<S> {
service: Container<S>,
}
impl<S> ApplyService<S> {
/// Check readiness and call enclosed service.
pub fn call<R>(&self, req: R) -> ServiceCall<'_, S, R>
where
S: Service<R>,
{
self.service.call(req)
}
}
impl<T, Req, F, R, In, Out, Err> Clone for Apply<T, Req, F, R, In, Out, Err>
where
T: Service<Req, Error = Err> + Clone,
F: Fn(In, Container<T>) -> R + Clone,
F: Fn(In, ApplyService<T>) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
fn clone(&self) -> Self {
@ -64,7 +78,7 @@ where
impl<T, Req, F, R, In, Out, Err> Service<In> for Apply<T, Req, F, R, In, Out, Err>
where
T: Service<Req, Error = Err>,
F: Fn(In, Container<T>) -> R,
F: Fn(In, ApplyService<T>) -> R,
R: Future<Output = Result<Out, Err>>,
{
type Response = Out;
@ -76,7 +90,10 @@ where
#[inline]
fn call<'a>(&'a self, req: In, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
(self.f)(req, self.service.clone())
let svc = ApplyService {
service: self.service.clone(),
};
(self.f)(req, svc)
}
}
@ -84,7 +101,7 @@ where
pub struct ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where
T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Container<T::Service>) -> R + Clone,
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
service: T,
@ -95,7 +112,7 @@ where
impl<T, Req, Cfg, F, R, In, Out, Err> ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where
T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Container<T::Service>) -> R + Clone,
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
/// Create new `ApplyNewService` new service instance
@ -112,7 +129,7 @@ impl<T, Req, Cfg, F, R, In, Out, Err> Clone
for ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where
T: ServiceFactory<Req, Cfg, Error = Err> + Clone,
F: Fn(In, Container<T::Service>) -> R + Clone,
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
fn clone(&self) -> Self {
@ -128,7 +145,7 @@ impl<T, Req, Cfg, F, R, In, Out, Err> ServiceFactory<In, Cfg>
for ApplyFactory<T, Req, Cfg, F, R, In, Out, Err>
where
T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Container<T::Service>) -> R + Clone,
F: Fn(In, ApplyService<T::Service>) -> R + Clone,
for<'r> R: Future<Output = Result<Out, Err>> + 'r,
{
type Response = Out;
@ -153,7 +170,7 @@ pin_project_lite::pin_project! {
where
T: ServiceFactory<Req, Cfg, Error = Err>,
T: 'f,
F: Fn(In, Container<T::Service>) -> R,
F: Fn(In, ApplyService<T::Service>) -> R,
T::Service: 'f,
R: Future<Output = Result<Out, Err>>,
Cfg: 'f,
@ -169,7 +186,7 @@ impl<'f, T, Req, Cfg, F, R, In, Out, Err> Future
for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err>
where
T: ServiceFactory<Req, Cfg, Error = Err>,
F: Fn(In, Container<T::Service>) -> R,
F: Fn(In, ApplyService<T::Service>) -> R,
R: Future<Output = Result<Out, Err>>,
{
type Output = Result<Apply<T::Service, Req, F, R, In, Out, Err>, T::InitError>;

View file

@ -1,4 +1,4 @@
use std::{cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll};
use std::{cell::Cell, cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task};
use crate::{Service, ServiceFactory};
@ -8,6 +8,7 @@ use crate::{Service, ServiceFactory};
pub struct Container<S> {
svc: Rc<S>,
waiters: Waiters,
pending: Cell<bool>,
}
pub struct ServiceCtx<'a, S: ?Sized> {
@ -85,6 +86,7 @@ impl<S> Container<S> {
let index = waiters.insert(None);
Container {
svc: Rc::new(svc),
pending: Cell::new(false),
waiters: Waiters {
index,
waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))),
@ -92,21 +94,27 @@ impl<S> Container<S> {
}
}
/// Return reference to inner type
#[inline]
/// Return reference to enclosed service
pub fn get_ref(&self) -> &S {
self.svc.as_ref()
}
#[inline]
/// Returns `Ready` when the service is able to process requests.
pub fn poll_ready<R>(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), S::Error>>
pub fn poll_ready<R>(
&self,
cx: &mut task::Context<'_>,
) -> task::Poll<Result<(), S::Error>>
where
S: Service<R>,
{
let res = self.svc.poll_ready(cx);
if res.is_pending() {
self.pending.set(true);
self.waiters.register(cx)
} else {
} else if self.pending.get() {
self.pending.set(false);
self.waiters.notify()
}
res
@ -114,7 +122,7 @@ impl<S> Container<S> {
#[inline]
/// Shutdown enclosed service.
pub fn poll_shutdown<R>(&self, cx: &mut task::Context<'_>) -> Poll<()>
pub fn poll_shutdown<R>(&self, cx: &mut task::Context<'_>) -> task::Poll<()>
where
S: Service<R>,
{
@ -189,6 +197,7 @@ impl<S> Clone for Container<S> {
fn clone(&self) -> Self {
Self {
svc: self.svc.clone(),
pending: Cell::new(false),
waiters: self.waiters.clone(),
}
}
@ -299,7 +308,7 @@ where
type Output = Result<S::Response, S::Error>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
@ -343,7 +352,10 @@ where
{
type Output = Result<S::Response, S::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
@ -353,7 +365,7 @@ where
idx,
waiters,
} => match svc.poll_ready(cx)? {
Poll::Ready(()) => {
task::Poll::Ready(()) => {
waiters.notify();
let fut = svc.call(
@ -367,9 +379,9 @@ where
this.state.set(ServiceCallState::Call { fut });
self.poll(cx)
}
Poll::Pending => {
task::Poll::Pending => {
waiters.register(*idx, cx);
Poll::Pending
task::Poll::Pending
}
},
ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| {
@ -402,8 +414,8 @@ where
{
type Output = Result<Container<F::Service>, F::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(Container::new(task::ready!(self
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
task::Poll::Ready(Ok(Container::new(task::ready!(self
.project()
.fut
.poll(cx))?)))

View file

@ -352,7 +352,7 @@ where
pub mod dev {
pub use crate::and_then::{AndThen, AndThenFactory};
pub use crate::apply::{Apply, ApplyFactory};
pub use crate::apply::{Apply, ApplyFactory, ApplyService};
pub use crate::fn_service::{
FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig,
};