diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index 8b1a7e3b..d826c699 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -4,6 +4,8 @@ * Add EitherService/EitherServiceFactory +* Add retry middleware + * Add future on drop handler ## [2.8.0] - 2024-12-04 diff --git a/ntex-util/src/future/on_drop.rs b/ntex-util/src/future/on_drop.rs index 8f264e7c..62e8dc7a 100644 --- a/ntex-util/src/future/on_drop.rs +++ b/ntex-util/src/future/on_drop.rs @@ -83,8 +83,12 @@ mod test { #[ntex_macros::rt_test2] async fn on_drop() { - let mut dropped = false; + let f = OnDropFn::new(|| ()); + assert!(format!("{:?}", f).contains("OnDropFn")); + f.cancel(); + assert!(f.f.get().is_none()); + let mut dropped = false; let mut f = pending::<()>().on_drop(|| { dropped = true; }); diff --git a/ntex-util/src/services/mod.rs b/ntex-util/src/services/mod.rs index e3942d72..5b4104f7 100644 --- a/ntex-util/src/services/mod.rs +++ b/ntex-util/src/services/mod.rs @@ -4,6 +4,7 @@ mod extensions; pub mod inflight; pub mod keepalive; pub mod onerequest; +pub mod retry; pub mod timeout; pub mod variant; diff --git a/ntex-util/src/services/retry.rs b/ntex-util/src/services/retry.rs new file mode 100644 index 00000000..fe5889d6 --- /dev/null +++ b/ntex-util/src/services/retry.rs @@ -0,0 +1,177 @@ +#![allow(async_fn_in_trait)] +use ntex_service::{Middleware, Service, ServiceCtx}; + +/// Trait defines retry policy +pub trait Policy>: Sized + Clone { + async fn retry(&mut self, req: &Req, res: &Result) -> bool; + + fn clone_request(&self, req: &Req) -> Option; +} + +#[derive(Clone, Debug)] +/// Retry middleware +/// +/// Retry middleware allows to retry service call +pub struct Retry

{ + policy: P, +} + +#[derive(Clone, Debug)] +/// Retry service +/// +/// Retry service allows to retry service call +pub struct RetryService { + policy: P, + service: S, +} + +impl

Retry

{ + /// Create retry middleware + pub fn new(policy: P) -> Self { + Retry { policy } + } +} + +impl Middleware for Retry

{ + type Service = RetryService; + + fn create(&self, service: S) -> Self::Service { + RetryService { + service, + policy: self.policy.clone(), + } + } +} + +impl RetryService { + /// Create retry service + pub fn new(policy: P, service: S) -> Self { + RetryService { policy, service } + } +} + +impl Service for RetryService +where + P: Policy, + S: Service, +{ + type Response = S::Response; + type Error = S::Error; + + ntex_service::forward_poll!(service); + ntex_service::forward_ready!(service); + ntex_service::forward_shutdown!(service); + + async fn call( + &self, + mut request: R, + ctx: ServiceCtx<'_, Self>, + ) -> Result { + let mut policy = self.policy.clone(); + let mut cloned = policy.clone_request(&request); + + loop { + let result = ctx.call(&self.service, request).await; + + cloned = if let Some(req) = cloned.take() { + if policy.retry(&req, &result).await { + request = req; + policy.clone_request(&request) + } else { + return result; + } + } else { + return result; + } + } + } +} + +#[derive(Copy, Clone, Debug)] +/// Default retry policy +/// +/// This policy retries on any error. By default retry count is 3 +pub struct DefaultRetryPolicy(u16); + +impl DefaultRetryPolicy { + /// Create default retry policy + pub fn new(retry: u16) -> Self { + DefaultRetryPolicy(retry) + } +} + +impl Default for DefaultRetryPolicy { + fn default() -> Self { + DefaultRetryPolicy::new(3) + } +} + +impl Policy for DefaultRetryPolicy +where + R: Clone, + S: Service, +{ + async fn retry(&mut self, _: &R, res: &Result) -> bool { + if res.is_err() { + if self.0 == 0 { + false + } else { + self.0 -= 1; + true + } + } else { + false + } + } + + fn clone_request(&self, req: &R) -> Option { + Some(req.clone()) + } +} + +#[cfg(test)] +mod tests { + use std::{cell::Cell, rc::Rc}; + + use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory}; + + use super::*; + + #[derive(Clone, Debug, PartialEq)] + struct TestService(Rc>); + + impl Service<()> for TestService { + type Response = (); + type Error = (); + + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { + let cnt = self.0.get(); + if cnt == 0 { + Ok(()) + } else { + self.0.set(cnt - 1); + Err(()) + } + } + } + + #[ntex_macros::rt_test2] + async fn test_retry() { + let cnt = Rc::new(Cell::new(5)); + let svc = Pipeline::new( + RetryService::new(DefaultRetryPolicy::default(), TestService(cnt.clone())) + .clone(), + ); + assert_eq!(svc.call(()).await, Err(())); + assert_eq!(svc.ready().await, Ok(())); + svc.shutdown().await; + assert_eq!(cnt.get(), 1); + + let factory = apply( + Retry::new(DefaultRetryPolicy::new(3)).clone(), + fn_factory(|| async { Ok::<_, ()>(TestService(Rc::new(Cell::new(2)))) }), + ); + let srv = factory.pipeline(&()).await.unwrap(); + assert_eq!(srv.call(()).await, Ok(())); + } +} diff --git a/ntex-util/src/services/timeout.rs b/ntex-util/src/services/timeout.rs index b4ad0a81..39905d15 100644 --- a/ntex-util/src/services/timeout.rs +++ b/ntex-util/src/services/timeout.rs @@ -210,7 +210,7 @@ mod tests { #[ntex_macros::rt_test2] #[allow(clippy::redundant_clone)] - async fn test_timeout_newservice() { + async fn test_timeout_middleware() { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(500); diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index c75aad23..23eb1c88 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -717,8 +717,8 @@ where .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); Connector::default() .lifetime(Seconds::ZERO) - .keep_alive(Seconds(45)) - .timeout(Millis(45_000)) + .keep_alive(Seconds(60)) + .timeout(Millis(60_000)) .disconnect_timeout(Seconds(5)) .openssl(builder.build()) .finish() @@ -727,14 +727,14 @@ where { Connector::default() .lifetime(Seconds::ZERO) - .timeout(Millis(45_000)) + .timeout(Millis(60_000)) .finish() } }; Client::build() .connector(connector) - .timeout(Seconds(45)) + .timeout(Seconds(60)) .finish() };