add Service::poll_shutdown

This commit is contained in:
Nikolay Kim 2020-02-17 20:27:00 +06:00
parent 156083c336
commit c38974401a
14 changed files with 98 additions and 13 deletions

View file

@ -26,7 +26,7 @@ before_install:
before_cache: | before_cache: |
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2020-02-16" ]]; then if [[ "$TRAVIS_RUST_VERSION" == "nightly-2020-02-16" ]]; then
RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install --version 0.6.11 cargo-tarpaulin RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install --version 0.10.2 cargo-tarpaulin
fi fi
# Add clippy # Add clippy

View file

@ -3,3 +3,7 @@ members = [
"ntex", "ntex",
"ntex-web-macros", "ntex-web-macros",
] ]
[patch.crates-io]
actix-server = { path = "actix-net/actix-server" }
actix-service = { path = "actix-net/actix-service" }

View file

@ -2,7 +2,6 @@
members = [ members = [
"actix-codec", "actix-codec",
"actix-connect", "actix-connect",
"actix-ioframe",
"actix-rt", "actix-rt",
"actix-macros", "actix-macros",
"actix-service", "actix-service",
@ -19,7 +18,6 @@ members = [
[patch.crates-io] [patch.crates-io]
actix-codec = { path = "actix-codec" } actix-codec = { path = "actix-codec" }
actix-connect = { path = "actix-connect" } actix-connect = { path = "actix-connect" }
actix-ioframe = { path = "actix-ioframe" }
actix-rt = { path = "actix-rt" } actix-rt = { path = "actix-rt" }
actix-macros = { path = "actix-macros" } actix-macros = { path = "actix-macros" }
actix-server = { path = "actix-server" } actix-server = { path = "actix-server" }

View file

@ -49,6 +49,19 @@ where
} }
} }
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.0.get_mut();
if srv.0.poll_shutdown(cx, is_error).is_ready()
&& srv.1.poll_shutdown(cx, is_error).is_ready()
{
Poll::Ready(())
} else {
Poll::Pending
}
}
#[inline]
fn call(&mut self, req: A::Request) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
AndThenServiceResponse { AndThenServiceResponse {
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())), state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),

View file

@ -76,6 +76,18 @@ where
} }
} }
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.srv.get_mut();
if srv.0.poll_shutdown(cx, is_error).is_ready()
&& srv.1.poll_shutdown(cx, is_error).is_ready()
{
Poll::Ready(())
} else {
Poll::Pending
}
}
fn call(&mut self, req: A::Request) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.srv.get_mut().0.call(req); let fut = self.srv.get_mut().0.call(req);
AndThenApplyFnFuture { AndThenApplyFnFuture {

View file

@ -82,10 +82,17 @@ where
type Error = Err; type Error = Err;
type Future = R; type Future = R;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(futures_util::ready!(self.service.poll_ready(cx))) Poll::Ready(futures_util::ready!(self.service.poll_ready(cx)))
} }
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: In) -> Self::Future { fn call(&mut self, req: In) -> Self::Future {
(self.f)(req, &mut self.service) (self.f)(req, &mut self.service)
} }

View file

@ -135,10 +135,17 @@ where
type Error = Err; type Error = Err;
type Future = BoxFuture<Res, Err>; type Future = BoxFuture<Res, Err>;
#[inline]
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(ctx) self.0.poll_ready(ctx)
} }
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.0.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
Box::pin(self.0.call(req)) Box::pin(self.0.call(req))
} }

View file

@ -51,6 +51,10 @@ impl<T: crate::Service> crate::Service for Cell<T> {
self.get_mut().poll_ready(cx) self.get_mut().poll_ready(cx)
} }
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.get_mut().poll_shutdown(cx, is_error)
}
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
self.get_mut().call(req) self.get_mut().call(req)
} }

View file

@ -97,6 +97,16 @@ pub trait Service {
/// 2. In case of chained services, `.poll_ready()` get called for all services at once. /// 2. In case of chained services, `.poll_ready()` get called for all services at once.
fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>; fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
#[inline]
#[allow(unused_variables)]
/// Shutdown service.
///
/// Returns `Ready` when the service is properly shutdowned. This method might be called
/// after it returns `Ready`.
fn poll_shutdown(&mut self, ctx: &mut task::Context<'_>, is_error: bool) -> Poll<()> {
Poll::Ready(())
}
/// Process the request and return the response asynchronously. /// Process the request and return the response asynchronously.
/// ///
/// This function is expected to be callable off task. As such, /// This function is expected to be callable off task. As such,
@ -108,6 +118,7 @@ pub trait Service {
/// implementation must be resilient to this fact. /// implementation must be resilient to this fact.
fn call(&mut self, req: Self::Request) -> Self::Future; fn call(&mut self, req: Self::Request) -> Self::Future;
#[inline]
/// Map this service's output to a different type, returning a new service /// Map this service's output to a different type, returning a new service
/// of the resulting type. /// of the resulting type.
/// ///
@ -125,6 +136,7 @@ pub trait Service {
crate::dev::Map::new(self, f) crate::dev::Map::new(self, f)
} }
#[inline]
/// Map this service's error to a different error, returning a new service. /// Map this service's error to a different error, returning a new service.
/// ///
/// This function is similar to the `Result::map_err` where it will change /// This function is similar to the `Result::map_err` where it will change

View file

@ -53,10 +53,17 @@ where
type Error = A::Error; type Error = A::Error;
type Future = MapFuture<A, F, Response>; type Future = MapFuture<A, F, Response>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { #[inline]
self.service.poll_ready(ctx) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
} }
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: A::Request) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
MapFuture::new(self.service.call(req), self.f.clone()) MapFuture::new(self.service.call(req), self.f.clone())
} }

View file

@ -54,10 +54,17 @@ where
type Error = E; type Error = E;
type Future = MapErrFuture<A, F, E>; type Future = MapErrFuture<A, F, E>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { #[inline]
self.service.poll_ready(ctx).map_err(&self.f) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(&self.f)
} }
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: A::Request) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
MapErrFuture::new(self.service.call(req), self.f.clone()) MapErrFuture::new(self.service.call(req), self.f.clone())
} }

View file

@ -161,8 +161,13 @@ impl<T: Service> Service for Pipeline<T> {
type Future = T::Future; type Future = T::Future;
#[inline] #[inline]
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), T::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), T::Error>> {
self.service.poll_ready(ctx) self.service.poll_ready(cx)
}
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
} }
#[inline] #[inline]

View file

@ -49,6 +49,18 @@ where
} }
} }
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.0.get_mut();
if srv.0.poll_shutdown(cx, is_error).is_ready()
&& srv.1.poll_shutdown(cx, is_error).is_ready()
{
Poll::Ready(())
} else {
Poll::Pending
}
}
fn call(&mut self, req: A::Request) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
ThenServiceResponse { ThenServiceResponse {
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())), state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),

View file

@ -59,10 +59,7 @@ pub fn test_server<F: ServiceFactory<TcpStream>>(factory: F) -> TestServer {
let (system, addr) = rx.recv().unwrap(); let (system, addr) = rx.recv().unwrap();
TestServer { TestServer { addr, system }
addr,
system,
}
} }
/// Test server controller /// Test server controller