From 589b48f073913c14d5d9150c0d2c02e94ea9160f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 21 Jun 2023 10:24:16 +0600 Subject: [PATCH] Link apply_fn service readiness with parent --- ntex-service/CHANGES.md | 4 ++++ ntex-service/Cargo.toml | 2 +- ntex-service/src/apply.rs | 41 +++++++++++++++++++++++++++------------ ntex-service/src/ctx.rs | 36 ++++++++++++++++++++++------------ ntex-service/src/lib.rs | 2 +- 5 files changed, 59 insertions(+), 26 deletions(-) diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index 3eaa05a2..b8ae9dd8 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.2.0-beta.4] - 2023-06-2x + +* Link apply_fn service readiness with parent + ## [1.2.0-beta.3] - 2023-06-21 * Add custom ContainerCall future diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index cfd4930c..b0d5edf2 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "1.2.0-beta.3" +version = "1.2.0-beta.4" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 19151a13..4bb95ac2 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -1,7 +1,7 @@ #![allow(clippy::type_complexity)] use std::{future::Future, marker, pin::Pin, task, task::Poll}; -use super::ctx::{Container, ServiceCtx}; +use super::ctx::{Container, ServiceCall, ServiceCtx}; use super::{IntoService, IntoServiceFactory, Service, ServiceFactory}; /// Apply transform function to a service. @@ -11,7 +11,7 @@ pub fn apply_fn( ) -> Apply where T: Service, - F: Fn(In, Container) -> R, + F: Fn(In, ApplyService) -> R, R: Future>, U: IntoService, { @@ -29,7 +29,7 @@ pub fn apply_fn_factory( ) -> ApplyFactory where T: ServiceFactory, - F: Fn(In, Container) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, U: IntoServiceFactory, { @@ -46,10 +46,24 @@ where r: marker::PhantomData (In, Out, R)>, } +pub struct ApplyService { + service: Container, +} + +impl ApplyService { + /// Check readiness and call enclosed service. + pub fn call(&self, req: R) -> ServiceCall<'_, S, R> + where + S: Service, + { + self.service.call(req) + } +} + impl Clone for Apply where T: Service + Clone, - F: Fn(In, Container) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { fn clone(&self) -> Self { @@ -64,7 +78,7 @@ where impl Service for Apply where T: Service, - F: Fn(In, Container) -> R, + F: Fn(In, ApplyService) -> R, R: Future>, { type Response = Out; @@ -76,7 +90,10 @@ where #[inline] fn call<'a>(&'a self, req: In, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - (self.f)(req, self.service.clone()) + let svc = ApplyService { + service: self.service.clone(), + }; + (self.f)(req, svc) } } @@ -84,7 +101,7 @@ where pub struct ApplyFactory where T: ServiceFactory, - F: Fn(In, Container) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { service: T, @@ -95,7 +112,7 @@ where impl ApplyFactory where T: ServiceFactory, - F: Fn(In, Container) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { /// Create new `ApplyNewService` new service instance @@ -112,7 +129,7 @@ impl Clone for ApplyFactory where T: ServiceFactory + Clone, - F: Fn(In, Container) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, R: Future>, { fn clone(&self) -> Self { @@ -128,7 +145,7 @@ impl ServiceFactory for ApplyFactory where T: ServiceFactory, - F: Fn(In, Container) -> R + Clone, + F: Fn(In, ApplyService) -> R + Clone, for<'r> R: Future> + 'r, { type Response = Out; @@ -153,7 +170,7 @@ pin_project_lite::pin_project! { where T: ServiceFactory, T: 'f, - F: Fn(In, Container) -> R, + F: Fn(In, ApplyService) -> R, T::Service: 'f, R: Future>, Cfg: 'f, @@ -169,7 +186,7 @@ impl<'f, T, Req, Cfg, F, R, In, Out, Err> Future for ApplyFactoryResponse<'f, T, Req, Cfg, F, R, In, Out, Err> where T: ServiceFactory, - F: Fn(In, Container) -> R, + F: Fn(In, ApplyService) -> R, R: Future>, { type Output = Result, T::InitError>; diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index a265c12d..d2d969f5 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -1,4 +1,4 @@ -use std::{cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll}; +use std::{cell::Cell, cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task}; use crate::{Service, ServiceFactory}; @@ -8,6 +8,7 @@ use crate::{Service, ServiceFactory}; pub struct Container { svc: Rc, waiters: Waiters, + pending: Cell, } pub struct ServiceCtx<'a, S: ?Sized> { @@ -85,6 +86,7 @@ impl Container { let index = waiters.insert(None); Container { svc: Rc::new(svc), + pending: Cell::new(false), waiters: Waiters { index, waiters: Rc::new(WaitersRef(UnsafeCell::new(waiters))), @@ -92,21 +94,27 @@ impl Container { } } - /// Return reference to inner type + #[inline] + /// Return reference to enclosed service pub fn get_ref(&self) -> &S { self.svc.as_ref() } #[inline] /// Returns `Ready` when the service is able to process requests. - pub fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll> + pub fn poll_ready( + &self, + cx: &mut task::Context<'_>, + ) -> task::Poll> where S: Service, { let res = self.svc.poll_ready(cx); if res.is_pending() { + self.pending.set(true); self.waiters.register(cx) - } else { + } else if self.pending.get() { + self.pending.set(false); self.waiters.notify() } res @@ -114,7 +122,7 @@ impl Container { #[inline] /// Shutdown enclosed service. - pub fn poll_shutdown(&self, cx: &mut task::Context<'_>) -> Poll<()> + pub fn poll_shutdown(&self, cx: &mut task::Context<'_>) -> task::Poll<()> where S: Service, { @@ -189,6 +197,7 @@ impl Clone for Container { fn clone(&self) -> Self { Self { svc: self.svc.clone(), + pending: Cell::new(false), waiters: self.waiters.clone(), } } @@ -299,7 +308,7 @@ where type Output = Result; #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { self.project().fut.poll(cx) } } @@ -343,7 +352,10 @@ where { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll { let mut this = self.as_mut().project(); match this.state.as_mut().project() { @@ -353,7 +365,7 @@ where idx, waiters, } => match svc.poll_ready(cx)? { - Poll::Ready(()) => { + task::Poll::Ready(()) => { waiters.notify(); let fut = svc.call( @@ -367,9 +379,9 @@ where this.state.set(ServiceCallState::Call { fut }); self.poll(cx) } - Poll::Pending => { + task::Poll::Pending => { waiters.register(*idx, cx); - Poll::Pending + task::Poll::Pending } }, ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| { @@ -402,8 +414,8 @@ where { type Output = Result, F::InitError>; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - Poll::Ready(Ok(Container::new(task::ready!(self + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + task::Poll::Ready(Ok(Container::new(task::ready!(self .project() .fut .poll(cx))?))) diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 8d20aebb..7b0abe86 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -352,7 +352,7 @@ where pub mod dev { pub use crate::and_then::{AndThen, AndThenFactory}; - pub use crate::apply::{Apply, ApplyFactory}; + pub use crate::apply::{Apply, ApplyFactory, ApplyService}; pub use crate::fn_service::{ FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig, };