Retry middleware (#502)

This commit is contained in:
Nikolay Kim 2025-01-15 17:13:50 +05:00 committed by GitHub
parent 451f546a13
commit 4d4ab811bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 190 additions and 6 deletions

View file

@ -4,6 +4,8 @@
* Add EitherService/EitherServiceFactory
* Add retry middleware
* Add future on drop handler
## [2.8.0] - 2024-12-04

View file

@ -83,8 +83,12 @@ mod test {
#[ntex_macros::rt_test2]
async fn on_drop() {
let mut dropped = false;
let f = OnDropFn::new(|| ());
assert!(format!("{:?}", f).contains("OnDropFn"));
f.cancel();
assert!(f.f.get().is_none());
let mut dropped = false;
let mut f = pending::<()>().on_drop(|| {
dropped = true;
});

View file

@ -4,6 +4,7 @@ mod extensions;
pub mod inflight;
pub mod keepalive;
pub mod onerequest;
pub mod retry;
pub mod timeout;
pub mod variant;

View file

@ -0,0 +1,177 @@
#![allow(async_fn_in_trait)]
use ntex_service::{Middleware, Service, ServiceCtx};
/// Trait defines retry policy
pub trait Policy<Req, S: Service<Req>>: Sized + Clone {
async fn retry(&mut self, req: &Req, res: &Result<S::Response, S::Error>) -> bool;
fn clone_request(&self, req: &Req) -> Option<Req>;
}
#[derive(Clone, Debug)]
/// Retry middleware
///
/// Retry middleware allows to retry service call
pub struct Retry<P> {
policy: P,
}
#[derive(Clone, Debug)]
/// Retry service
///
/// Retry service allows to retry service call
pub struct RetryService<P, S> {
policy: P,
service: S,
}
impl<P> Retry<P> {
/// Create retry middleware
pub fn new(policy: P) -> Self {
Retry { policy }
}
}
impl<P: Clone, S> Middleware<S> for Retry<P> {
type Service = RetryService<P, S>;
fn create(&self, service: S) -> Self::Service {
RetryService {
service,
policy: self.policy.clone(),
}
}
}
impl<P, S> RetryService<P, S> {
/// Create retry service
pub fn new(policy: P, service: S) -> Self {
RetryService { policy, service }
}
}
impl<P, S, R> Service<R> for RetryService<P, S>
where
P: Policy<R, S>,
S: Service<R>,
{
type Response = S::Response;
type Error = S::Error;
ntex_service::forward_poll!(service);
ntex_service::forward_ready!(service);
ntex_service::forward_shutdown!(service);
async fn call(
&self,
mut request: R,
ctx: ServiceCtx<'_, Self>,
) -> Result<S::Response, S::Error> {
let mut policy = self.policy.clone();
let mut cloned = policy.clone_request(&request);
loop {
let result = ctx.call(&self.service, request).await;
cloned = if let Some(req) = cloned.take() {
if policy.retry(&req, &result).await {
request = req;
policy.clone_request(&request)
} else {
return result;
}
} else {
return result;
}
}
}
}
#[derive(Copy, Clone, Debug)]
/// Default retry policy
///
/// This policy retries on any error. By default retry count is 3
pub struct DefaultRetryPolicy(u16);
impl DefaultRetryPolicy {
/// Create default retry policy
pub fn new(retry: u16) -> Self {
DefaultRetryPolicy(retry)
}
}
impl Default for DefaultRetryPolicy {
fn default() -> Self {
DefaultRetryPolicy::new(3)
}
}
impl<R, S> Policy<R, S> for DefaultRetryPolicy
where
R: Clone,
S: Service<R>,
{
async fn retry(&mut self, _: &R, res: &Result<S::Response, S::Error>) -> bool {
if res.is_err() {
if self.0 == 0 {
false
} else {
self.0 -= 1;
true
}
} else {
false
}
}
fn clone_request(&self, req: &R) -> Option<R> {
Some(req.clone())
}
}
#[cfg(test)]
mod tests {
use std::{cell::Cell, rc::Rc};
use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory};
use super::*;
#[derive(Clone, Debug, PartialEq)]
struct TestService(Rc<Cell<usize>>);
impl Service<()> for TestService {
type Response = ();
type Error = ();
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
let cnt = self.0.get();
if cnt == 0 {
Ok(())
} else {
self.0.set(cnt - 1);
Err(())
}
}
}
#[ntex_macros::rt_test2]
async fn test_retry() {
let cnt = Rc::new(Cell::new(5));
let svc = Pipeline::new(
RetryService::new(DefaultRetryPolicy::default(), TestService(cnt.clone()))
.clone(),
);
assert_eq!(svc.call(()).await, Err(()));
assert_eq!(svc.ready().await, Ok(()));
svc.shutdown().await;
assert_eq!(cnt.get(), 1);
let factory = apply(
Retry::new(DefaultRetryPolicy::new(3)).clone(),
fn_factory(|| async { Ok::<_, ()>(TestService(Rc::new(Cell::new(2)))) }),
);
let srv = factory.pipeline(&()).await.unwrap();
assert_eq!(srv.call(()).await, Ok(()));
}
}

View file

@ -210,7 +210,7 @@ mod tests {
#[ntex_macros::rt_test2]
#[allow(clippy::redundant_clone)]
async fn test_timeout_newservice() {
async fn test_timeout_middleware() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(500);

View file

@ -717,8 +717,8 @@ where
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default()
.lifetime(Seconds::ZERO)
.keep_alive(Seconds(45))
.timeout(Millis(45_000))
.keep_alive(Seconds(60))
.timeout(Millis(60_000))
.disconnect_timeout(Seconds(5))
.openssl(builder.build())
.finish()
@ -727,14 +727,14 @@ where
{
Connector::default()
.lifetime(Seconds::ZERO)
.timeout(Millis(45_000))
.timeout(Millis(60_000))
.finish()
}
};
Client::build()
.connector(connector)
.timeout(Seconds(45))
.timeout(Seconds(60))
.finish()
};