Migrate to ntex-service 1.2 (#209)

* Migrate to ntex-service 1.2
This commit is contained in:
Nikolay Kim 2023-06-15 12:43:26 +06:00
parent e151b1eff1
commit 7960b550c9
84 changed files with 676 additions and 821 deletions

View file

@ -1,63 +1,59 @@
//! Service that buffers incomming requests.
use std::cell::{Cell, RefCell};
use std::task::{Context, Poll};
use std::{collections::VecDeque, future::Future, marker::PhantomData, pin::Pin, rc::Rc};
use std::task::{ready, Context, Poll};
use std::{collections::VecDeque, future::Future, marker::PhantomData, pin::Pin};
use ntex_service::{IntoService, Middleware, Service};
use ntex_service::{Ctx, IntoService, Middleware, Service, ServiceCall};
use crate::{channel::oneshot, future::Either, task::LocalWaker};
/// Buffer - service factory for service that can buffer incoming request.
///
/// Default number of buffered requests is 16
pub struct Buffer<R, E> {
pub struct Buffer<R> {
buf_size: usize,
err: Rc<dyn Fn() -> E>,
_t: PhantomData<R>,
}
impl<R, E> Buffer<R, E> {
pub fn new<F>(f: F) -> Self
where
F: Fn() -> E + 'static,
{
impl<R> Default for Buffer<R> {
fn default() -> Self {
Self {
buf_size: 16,
err: Rc::new(f),
_t: PhantomData,
}
}
}
impl<R> Buffer<R> {
pub fn buf_size(mut self, size: usize) -> Self {
self.buf_size = size;
self
}
}
impl<R, E> Clone for Buffer<R, E> {
impl<R> Clone for Buffer<R> {
fn clone(&self) -> Self {
Self {
buf_size: self.buf_size,
err: self.err.clone(),
_t: PhantomData,
}
}
}
impl<R, S, E> Middleware<S> for Buffer<R, E>
impl<R, S> Middleware<S> for Buffer<R>
where
S: Service<R, Error = E>,
S: Service<R>,
{
type Service = BufferService<R, S, E>;
type Service = BufferService<R, S>;
fn create(&self, service: S) -> Self::Service {
BufferService {
service,
size: self.buf_size,
err: self.err.clone(),
ready: Cell::new(false),
waker: LocalWaker::default(),
buf: RefCell::new(VecDeque::with_capacity(self.buf_size)),
_t: PhantomData,
}
}
}
@ -65,58 +61,57 @@ where
/// Buffer service - service that can buffer incoming requests.
///
/// Default number of buffered requests is 16
pub struct BufferService<R, S: Service<R, Error = E>, E> {
pub struct BufferService<R, S: Service<R>> {
size: usize,
ready: Cell<bool>,
service: S,
waker: LocalWaker,
err: Rc<dyn Fn() -> E>,
buf: RefCell<VecDeque<(oneshot::Sender<R>, R)>>,
buf: RefCell<VecDeque<oneshot::Sender<()>>>,
_t: PhantomData<R>,
}
impl<R, S, E> BufferService<R, S, E>
impl<R, S> BufferService<R, S>
where
S: Service<R, Error = E>,
S: Service<R>,
{
pub fn new<U, F>(size: usize, err: F, service: U) -> Self
pub fn new<U>(size: usize, service: U) -> Self
where
U: IntoService<S, R>,
F: Fn() -> E + 'static,
{
Self {
size,
err: Rc::new(err),
ready: Cell::new(false),
service: service.into_service(),
waker: LocalWaker::default(),
buf: RefCell::new(VecDeque::with_capacity(size)),
_t: PhantomData,
}
}
}
impl<R, S, E> Clone for BufferService<R, S, E>
impl<R, S> Clone for BufferService<R, S>
where
S: Service<R, Error = E> + Clone,
S: Service<R> + Clone,
{
fn clone(&self) -> Self {
Self {
size: self.size,
err: self.err.clone(),
ready: Cell::new(false),
service: self.service.clone(),
waker: LocalWaker::default(),
buf: RefCell::new(VecDeque::with_capacity(self.size)),
_t: PhantomData,
}
}
}
impl<R, S, E> Service<R> for BufferService<R, S, E>
impl<R, S> Service<R> for BufferService<R, S>
where
S: Service<R, Error = E>,
S: Service<R>,
{
type Response = S::Response;
type Error = S::Error;
type Future<'f> = Either<S::Future<'f>, BufferServiceResponse<'f, R, S, E>> where Self: 'f, R: 'f;
type Future<'f> = Either<ServiceCall<'f, S, R>, BufferServiceResponse<'f, R, S>> where Self: 'f, R: 'f;
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -132,8 +127,8 @@ where
log::trace!("Buffer limit exceeded");
Poll::Pending
}
} else if let Some((sender, req)) = buffer.pop_front() {
let _ = sender.send(req);
} else if let Some(sender) = buffer.pop_front() {
let _ = sender.send(());
self.ready.set(false);
Poll::Ready(Ok(()))
} else {
@ -143,17 +138,18 @@ where
}
#[inline]
fn call(&self, req: R) -> Self::Future<'_> {
fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
if self.ready.get() {
self.ready.set(false);
Either::Left(self.service.call(req))
Either::Left(ctx.call(&self.service, req))
} else {
let (tx, rx) = oneshot::channel();
self.buf.borrow_mut().push_back((tx, req));
self.buf.borrow_mut().push_back(tx);
Either::Right(BufferServiceResponse {
slf: self,
state: State::Tx { rx },
fut: ctx.call(&self.service, req),
rx: Some(rx),
})
}
}
@ -163,63 +159,40 @@ where
pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct BufferServiceResponse<'f, R, S: Service<R, Error = E>, E>
#[must_use = "futures do nothing unless polled"]
pub struct BufferServiceResponse<'f, R, S: Service<R>>
{
slf: &'f BufferService<R, S, E>,
#[pin]
state: State<R, S::Future<'f>>,
fut: ServiceCall<'f, S, R>,
slf: &'f BufferService<R, S>,
rx: Option<oneshot::Receiver<()>>,
}
}
pin_project_lite::pin_project! {
#[project = StateProject]
enum State<R, F>
where F: Future,
{
Tx { rx: oneshot::Receiver<R> },
Srv { #[pin] fut: F },
}
}
impl<'f, R, S, E> Future for BufferServiceResponse<'f, R, S, E>
impl<'f, R, S> Future for BufferServiceResponse<'f, R, S>
where
S: Service<R, Error = E>,
S: Service<R>,
{
type Output = Result<S::Response, S::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
let this = self.as_mut().project();
loop {
match this.state.project() {
StateProject::Tx { rx } => match Pin::new(rx).poll(cx) {
Poll::Ready(Ok(req)) => {
let state = State::Srv {
fut: this.slf.service.call(req),
};
this = self.as_mut().project();
this.state.set(state);
}
Poll::Ready(Err(_)) => return Poll::Ready(Err((*this.slf.err)())),
Poll::Pending => return Poll::Pending,
},
StateProject::Srv { fut } => {
let res = match fut.poll(cx) {
Poll::Ready(res) => res,
Poll::Pending => return Poll::Pending,
};
this.slf.waker.wake();
return Poll::Ready(res);
}
}
if let Some(ref rx) = this.rx {
let _ = ready!(rx.poll_recv(cx));
this.rx.take();
}
let res = ready!(this.fut.poll(cx));
this.slf.waker.wake();
Poll::Ready(res)
}
}
#[cfg(test)]
mod tests {
use ntex_service::{apply, fn_factory, Service, ServiceFactory};
use std::task::{Context, Poll};
use ntex_service::{apply, fn_factory, Container, Service, ServiceFactory};
use std::{rc::Rc, task::Context, task::Poll, time::Duration};
use super::*;
use crate::future::{lazy, Ready};
@ -247,7 +220,7 @@ mod tests {
}
}
fn call(&self, _: ()) -> Self::Future<'_> {
fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> {
self.0.ready.set(false);
self.0.count.set(self.0.count.get() + 1);
Ready::Ok(())
@ -255,21 +228,29 @@ mod tests {
}
#[ntex_macros::rt_test2]
async fn test_transform() {
async fn test_service() {
let inner = Rc::new(Inner {
ready: Cell::new(false),
waker: LocalWaker::default(),
count: Cell::new(0),
});
let srv = BufferService::new(2, || (), TestService(inner.clone())).clone();
let srv = Container::new(BufferService::new(2, TestService(inner.clone())).clone());
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let fut1 = srv.call(());
let srv1 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv1.call(()).await;
});
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(inner.count.get(), 0);
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let fut2 = srv.call(());
let srv1 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv1.call(()).await;
});
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(inner.count.get(), 0);
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
@ -277,14 +258,14 @@ mod tests {
inner.waker.wake();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let _ = fut1.await;
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(inner.count.get(), 1);
inner.ready.set(true);
inner.waker.wake();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let _ = fut2.await;
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(inner.count.get(), 2);
let inner = Rc::new(Inner {
@ -293,7 +274,7 @@ mod tests {
count: Cell::new(0),
});
let srv = BufferService::new(2, || (), TestService(inner.clone()));
let srv = Container::new(BufferService::new(2, TestService(inner.clone())));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let _ = srv.call(()).await;
assert_eq!(inner.count.get(), 1);
@ -303,7 +284,7 @@ mod tests {
#[ntex_macros::rt_test2]
#[allow(clippy::redundant_clone)]
async fn test_newtransform() {
async fn test_middleware() {
let inner = Rc::new(Inner {
ready: Cell::new(false),
waker: LocalWaker::default(),
@ -311,18 +292,26 @@ mod tests {
});
let srv = apply(
Buffer::new(|| ()).buf_size(2).clone(),
Buffer::default().buf_size(2).clone(),
fn_factory(|| async { Ok::<_, ()>(TestService(inner.clone())) }),
);
let srv = srv.create(&()).await.unwrap();
let srv = srv.container(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let fut1 = srv.call(());
let srv1 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv1.call(()).await;
});
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(inner.count.get(), 0);
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let fut2 = srv.call(());
let srv1 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv1.call(()).await;
});
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(inner.count.get(), 0);
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
@ -330,14 +319,14 @@ mod tests {
inner.waker.wake();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let _ = fut1.await;
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(inner.count.get(), 1);
inner.ready.set(true);
inner.waker.wake();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let _ = fut2.await;
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(inner.count.get(), 2);
}
}

View file

@ -1,7 +1,7 @@
//! Service that limits number of in-flight async requests.
use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use ntex_service::{IntoService, Middleware, Service};
use ntex_service::{Ctx, IntoService, Middleware, Service, ServiceCall};
use super::counter::{Counter, CounterGuard};
@ -76,9 +76,9 @@ where
}
#[inline]
fn call(&self, req: R) -> Self::Future<'_> {
fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
InFlightServiceResponse {
fut: self.service.call(req),
fut: ctx.call(&self.service, req),
_guard: self.count.get(),
_t: PhantomData,
}
@ -93,7 +93,7 @@ pin_project_lite::pin_project! {
where T: 'f, R: 'f
{
#[pin]
fut: T::Future<'f>,
fut: ServiceCall<'f, T, R>,
_guard: CounterGuard,
_t: PhantomData<R>
}
@ -109,39 +109,43 @@ impl<'f, T: Service<R>, R> Future for InFlightServiceResponse<'f, T, R> {
#[cfg(test)]
mod tests {
use ntex_service::{apply, fn_factory, Service, ServiceFactory};
use std::{task::Poll, time::Duration};
use ntex_service::{apply, fn_factory, Container, Ctx, Service, ServiceFactory};
use std::{cell::RefCell, task::Poll, time::Duration};
use super::*;
use crate::future::{lazy, BoxFuture};
use crate::{channel::oneshot, future::lazy, future::BoxFuture};
struct SleepService(Duration);
struct SleepService(oneshot::Receiver<()>);
impl Service<()> for SleepService {
type Response = ();
type Error = ();
type Future<'f> = BoxFuture<'f, Result<(), ()>>;
fn call(&self, _: ()) -> Self::Future<'_> {
let fut = crate::time::sleep(self.0);
fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> {
Box::pin(async move {
fut.await;
let _ = self.0.recv().await;
Ok::<_, ()>(())
})
}
}
#[ntex_macros::rt_test2]
async fn test_inflight() {
let wait_time = Duration::from_millis(50);
async fn test_service() {
let (tx, rx) = oneshot::channel();
let srv = InFlightService::new(1, SleepService(wait_time));
let srv = Container::new(InFlightService::new(1, SleepService(rx)));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
let srv2 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv2.call(()).await;
});
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
let _ = res.await;
let _ = tx.send(());
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready());
}
@ -154,19 +158,28 @@ mod tests {
"InFlight { max_inflight: 1 }"
);
let wait_time = Duration::from_millis(50);
let (tx, rx) = oneshot::channel();
let rx = RefCell::new(Some(rx));
let srv = apply(
InFlight::new(1),
fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }),
fn_factory(move || {
let rx = rx.borrow_mut().take().unwrap();
async move { Ok::<_, ()>(SleepService(rx)) }
}),
);
let srv = srv.create(&()).await.unwrap();
let srv = srv.container(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
let srv2 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv2.call(()).await;
});
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
let _ = res.await;
let _ = tx.send(());
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
}
}

View file

@ -1,7 +1,7 @@
use std::task::{Context, Poll};
use std::{cell::Cell, convert::Infallible, marker, time::Duration, time::Instant};
use ntex_service::{Service, ServiceFactory};
use ntex_service::{Ctx, Service, ServiceFactory};
use crate::future::Ready;
use crate::time::{now, sleep, Millis, Sleep};
@ -113,7 +113,7 @@ where
}
}
fn call(&self, req: R) -> Self::Future<'_> {
fn call<'a>(&'a self, req: R, _: Ctx<'a, Self>) -> Self::Future<'a> {
self.expire.set(now());
Ready::Ok(req)
}
@ -121,7 +121,7 @@ where
#[cfg(test)]
mod tests {
use ntex_service::{Service, ServiceFactory};
use ntex_service::ServiceFactory;
use super::*;
use crate::future::lazy;
@ -134,7 +134,7 @@ mod tests {
let factory = KeepAlive::new(Millis(100), || TestErr);
let _ = factory.clone();
let service = factory.create(&()).await.unwrap();
let service = factory.container(&()).await.unwrap();
assert_eq!(service.call(1usize).await, Ok(1usize));
assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready());

View file

@ -4,7 +4,6 @@ mod extensions;
pub mod inflight;
pub mod keepalive;
pub mod onerequest;
pub mod shared;
pub mod timeout;
pub mod variant;

View file

@ -1,7 +1,7 @@
//! Service that limits number of in-flight async requests to 1.
use std::{cell::Cell, future::Future, pin::Pin, task::Context, task::Poll};
use ntex_service::{IntoService, Middleware, Service};
use ntex_service::{Ctx, IntoService, Middleware, Service, ServiceCall};
use crate::task::LocalWaker;
@ -63,11 +63,11 @@ where
}
#[inline]
fn call(&self, req: R) -> Self::Future<'_> {
fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
self.ready.set(false);
OneRequestServiceResponse {
fut: self.service.call(req),
fut: ctx.call(&self.service, req),
service: self,
}
}
@ -81,7 +81,7 @@ pin_project_lite::pin_project! {
where T: 'f, R: 'f
{
#[pin]
fut: T::Future<'f>,
fut: ServiceCall<'f, T, R>,
service: &'f OneRequestService<T>,
}
}
@ -101,23 +101,22 @@ impl<'f, T: Service<R>, R> Future for OneRequestServiceResponse<'f, T, R> {
#[cfg(test)]
mod tests {
use ntex_service::{apply, fn_factory, Service, ServiceFactory};
use std::{task::Poll, time::Duration};
use ntex_service::{apply, fn_factory, Container, Ctx, Service, ServiceFactory};
use std::{cell::RefCell, task::Poll, time::Duration};
use super::*;
use crate::future::{lazy, BoxFuture};
use crate::{channel::oneshot, future::lazy, future::BoxFuture};
struct SleepService(Duration);
struct SleepService(oneshot::Receiver<()>);
impl Service<()> for SleepService {
type Response = ();
type Error = ();
type Future<'f> = BoxFuture<'f, Result<(), ()>>;
fn call(&self, _: ()) -> Self::Future<'_> {
let fut = crate::time::sleep(self.0);
fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> {
Box::pin(async move {
fut.await;
let _ = self.0.recv().await;
Ok::<_, ()>(())
})
}
@ -125,15 +124,20 @@ mod tests {
#[ntex_macros::rt_test2]
async fn test_oneshot() {
let wait_time = Duration::from_millis(50);
let (tx, rx) = oneshot::channel();
let srv = OneRequestService::new(SleepService(wait_time));
let srv = Container::new(OneRequestService::new(SleepService(rx)));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
let srv2 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv2.call(()).await;
});
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
let _ = res.await;
let _ = tx.send(());
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready());
}
@ -142,19 +146,28 @@ mod tests {
async fn test_middleware() {
assert_eq!(format!("{:?}", OneRequest), "OneRequest");
let wait_time = Duration::from_millis(50);
let (tx, rx) = oneshot::channel();
let rx = RefCell::new(Some(rx));
let srv = apply(
OneRequest,
fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }),
fn_factory(move || {
let rx = rx.borrow_mut().take().unwrap();
async move { Ok::<_, ()>(SleepService(rx)) }
}),
);
let srv = srv.create(&()).await.unwrap();
let srv = srv.container(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
let srv2 = srv.clone();
ntex::rt::spawn(async move {
let _ = srv2.call(()).await;
});
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
let _ = res.await;
let _ = tx.send(());
crate::time::sleep(Duration::from_millis(25)).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
}
}

View file

@ -1,169 +0,0 @@
/// A service that can be checked for readiness by multiple tasks
use std::{
cell::Cell, cell::RefCell, marker::PhantomData, rc::Rc, task::Context, task::Poll,
};
use ntex_service::{Middleware, Service};
use crate::channel::{condition, oneshot};
use crate::future::{poll_fn, select, Either};
/// A middleware that construct sharable service
pub struct Shared<R>(PhantomData<R>);
impl<R> Shared<R> {
pub fn new() -> Self {
Self(PhantomData)
}
}
impl<R> Default for Shared<R> {
fn default() -> Self {
Self::new()
}
}
impl<S: Service<R>, R> Middleware<S> for Shared<R> {
type Service = SharedService<S, R>;
fn create(&self, service: S) -> Self::Service {
SharedService::new(service)
}
}
/// A service that can be checked for readiness by multiple tasks
pub struct SharedService<S: Service<R>, R> {
inner: Rc<Inner<S, R>>,
readiness: condition::Waiter,
}
struct Inner<S: Service<R>, R> {
service: S,
ready: condition::Condition,
driver_stop: Cell<Option<oneshot::Sender<()>>>,
driver_running: Cell<bool>,
error: RefCell<Option<S::Error>>,
}
impl<S: Service<R>, R> SharedService<S, R> {
pub fn new(service: S) -> Self {
let condition = condition::Condition::default();
Self {
readiness: condition.wait(),
inner: Rc::new(Inner {
service,
ready: condition,
driver_stop: Cell::default(),
driver_running: Cell::default(),
error: RefCell::default(),
}),
}
}
}
impl<S: Service<R>, R> Clone for SharedService<S, R> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
readiness: self.readiness.clone(),
}
}
}
impl<S: Service<R>, R> Drop for SharedService<S, R> {
fn drop(&mut self) {
if self.inner.driver_running.get() {
// the only live references to inner are in this SharedService instance and the driver task
if Rc::strong_count(&self.inner) == 2 {
if let Some(stop) = self.inner.driver_stop.take() {
let _ = stop.send(());
}
}
}
}
}
impl<S, R> Service<R> for SharedService<S, R>
where
S: Service<R> + 'static,
S::Error: Clone,
R: 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future<'f> = S::Future<'f> where Self: 'f, R: 'f;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// if there is an error, it should be returned to all tasks checking readiness
if let Some(error) = self.inner.error.borrow().as_ref() {
return Poll::Ready(Err(error.clone()));
}
// if the service is being driven to readiness we must register our waker and wait
if self.inner.driver_running.get() {
log::trace!("polled SharedService driver, driver is already running");
// register waker to be notified, regardless of any previous notification
let _ = self.readiness.poll_ready(cx);
return Poll::Pending;
}
// driver is not running, check the inner service is ready
let result = self.inner.service.poll_ready(cx);
log::trace!(
"polled SharedService, ready: {}, errored: {}",
result.is_ready(),
matches!(result, Poll::Ready(Err(_)))
);
match result {
// pass through service is ready, allow call
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
// capture error, all future readiness checks will fail
Poll::Ready(Err(e)) => {
*self.inner.error.borrow_mut() = Some(e.clone());
Poll::Ready(Err(e))
}
// start driver and elide all poll_ready calls until it is complete
Poll::Pending => {
let inner = self.inner.clone();
let (tx, rx) = oneshot::channel();
inner.driver_running.set(true);
inner.driver_stop.set(Some(tx));
ntex_rt::spawn(async move {
log::trace!("SharedService driver has started");
let service_ready = poll_fn(|cx| inner.service.poll_ready(cx));
let clients_gone = rx;
let result = select(service_ready, clients_gone).await;
if let Either::Left(result) = result {
log::trace!(
"SharedService driver completed, errored: {}",
result.is_err()
);
if let Err(e) = result {
inner.error.borrow_mut().replace(e);
}
inner.driver_running.set(false);
inner.driver_stop.set(None);
inner.ready.notify();
} else {
log::trace!("SharedService driver task stopped because all clients are gone");
}
});
// register waker to be notified, regardless of any previous notification
let _ = self.readiness.poll_ready(cx);
Poll::Pending
}
}
}
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
self.inner.service.poll_shutdown(cx)
}
fn call(&self, req: R) -> Self::Future<'_> {
self.inner.service.call(req)
}
}

View file

@ -2,11 +2,9 @@
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
use std::{
fmt, future::Future, marker, marker::PhantomData, pin::Pin, task::Context, task::Poll,
};
use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll};
use ntex_service::{IntoService, Middleware, Service};
use ntex_service::{Ctx, IntoService, Middleware, Service, ServiceCall};
use crate::future::Either;
use crate::time::{sleep, Millis, Sleep};
@ -127,17 +125,17 @@ where
type Error = TimeoutError<S::Error>;
type Future<'f> = Either<TimeoutServiceResponse<'f, S, R>, TimeoutServiceResponse2<'f, S, R>> where Self: 'f, R: 'f;
fn call(&self, request: R) -> Self::Future<'_> {
fn call<'a>(&'a self, request: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> {
if self.timeout.is_zero() {
Either::Right(TimeoutServiceResponse2 {
fut: self.service.call(request),
_t: PhantomData,
fut: ctx.call(&self.service, request),
_t: marker::PhantomData,
})
} else {
Either::Left(TimeoutServiceResponse {
fut: self.service.call(request),
fut: ctx.call(&self.service, request),
sleep: sleep(self.timeout),
_t: PhantomData,
_t: marker::PhantomData,
})
}
}
@ -149,14 +147,14 @@ where
pin_project_lite::pin_project! {
/// `TimeoutService` response future
#[doc(hidden)]
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct TimeoutServiceResponse<'f, T: Service<R>, R>
where T: 'f, R: 'f,
{
#[pin]
fut: T::Future<'f>,
fut: ServiceCall<'f, T, R>,
sleep: Sleep,
_t: PhantomData<R>
_t: marker::PhantomData<R>
}
}
@ -187,13 +185,13 @@ where
pin_project_lite::pin_project! {
/// `TimeoutService` response future
#[doc(hidden)]
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct TimeoutServiceResponse2<'f, T: Service<R>, R>
where T: 'f, R: 'f,
{
#[pin]
fut: T::Future<'f>,
_t: PhantomData<R>,
fut: ServiceCall<'f, T, R>,
_t: marker::PhantomData<R>,
}
}
@ -216,7 +214,7 @@ where
mod tests {
use std::{fmt, time::Duration};
use ntex_service::{apply, fn_factory, Service, ServiceFactory};
use ntex_service::{apply, fn_factory, Container, Service, ServiceFactory};
use super::*;
use crate::future::{lazy, BoxFuture};
@ -238,7 +236,7 @@ mod tests {
type Error = SrvError;
type Future<'f> = BoxFuture<'f, Result<(), SrvError>>;
fn call(&self, _: ()) -> Self::Future<'_> {
fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> {
let fut = crate::time::sleep(self.0);
Box::pin(async move {
fut.await;
@ -252,7 +250,9 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(50);
let timeout = TimeoutService::new(resolution, SleepService(wait_time)).clone();
let timeout = Container::new(
TimeoutService::new(resolution, SleepService(wait_time)).clone(),
);
assert_eq!(timeout.call(()).await, Ok(()));
assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready());
assert!(lazy(|cx| timeout.poll_shutdown(cx)).await.is_ready());
@ -263,7 +263,8 @@ mod tests {
let wait_time = Duration::from_millis(50);
let resolution = Duration::from_millis(0);
let timeout = TimeoutService::new(resolution, SleepService(wait_time));
let timeout =
Container::new(TimeoutService::new(resolution, SleepService(wait_time)));
assert_eq!(timeout.call(()).await, Ok(()));
assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready());
}
@ -273,7 +274,8 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(500);
let timeout = TimeoutService::new(resolution, SleepService(wait_time));
let timeout =
Container::new(TimeoutService::new(resolution, SleepService(wait_time)));
assert_eq!(timeout.call(()).await, Err(TimeoutError::Timeout));
}
@ -287,7 +289,7 @@ mod tests {
Timeout::new(resolution).clone(),
fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }),
);
let srv = timeout.create(&()).await.unwrap();
let srv = timeout.container(&()).await.unwrap();
let res = srv.call(()).await.unwrap_err();
assert_eq!(res, TimeoutError::Timeout);

View file

@ -1,7 +1,7 @@
//! Contains `Variant` service and related types and functions.
use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll};
use ntex_service::{IntoServiceFactory, Service, ServiceFactory};
use ntex_service::{Ctx, IntoServiceFactory, Service, ServiceCall, ServiceFactory};
/// Construct `Variant` service factory.
///
@ -103,7 +103,8 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
{
type Response = V1::Response;
type Error = V1::Error;
type Future<'f> = $mod_name::ServiceResponse<V1::Future<'f>, $($T::Future<'f>),+> where Self: 'f, V1: 'f;
type Future<'f> = $mod_name::ServiceResponse<
ServiceCall<'f, V1, V1R>, $(ServiceCall<'f, $T, $R>),+> where Self: 'f, V1: 'f;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut ready = self.V1.poll_ready(cx)?.is_ready();
@ -127,10 +128,11 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident,
}
}
fn call(&self, req: $enum_type<V1R, $($R,)+>) -> Self::Future<'_> {
fn call<'a>(&'a self, req: $enum_type<V1R, $($R,)+>, ctx: Ctx<'a, Self>) -> Self::Future<'a>
{
match req {
$enum_type::V1(req) => $mod_name::ServiceResponse::V1 { fut: self.V1.call(req) },
$($enum_type::$T(req) => $mod_name::ServiceResponse::$T { fut: self.$T.call(req) },)+
$enum_type::V1(req) => $mod_name::ServiceResponse::V1 { fut: ctx.call(&self.V1, req) },
$($enum_type::$T(req) => $mod_name::ServiceResponse::$T { fut: ctx.call(&self.$T, req) },)+
}
}
}
@ -319,7 +321,7 @@ mod tests {
Poll::Ready(())
}
fn call(&self, _: ()) -> Self::Future<'_> {
fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> {
Ready::<_, ()>::Ok(1)
}
}
@ -340,7 +342,7 @@ mod tests {
Poll::Ready(())
}
fn call(&self, _: ()) -> Self::Future<'_> {
fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> {
Ready::<_, ()>::Ok(2)
}
}
@ -352,7 +354,7 @@ mod tests {
.clone()
.v3(fn_factory(|| async { Ok::<_, ()>(Srv2) }))
.clone();
let service = factory.create(&()).await.unwrap().clone();
let service = factory.container(&()).await.unwrap().clone();
assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready());
assert!(lazy(|cx| service.poll_shutdown(cx)).await.is_ready());

View file

@ -711,7 +711,7 @@ impl Future for LowresTimerDriver {
flags.remove(Flags::LOWRES_TIMER);
self.0.flags.set(flags);
}
return Poll::Pending;
Poll::Pending
}
}