refactor Service trait

This commit is contained in:
Nikolay Kim 2020-03-30 17:44:22 +06:00
parent a3abae8c16
commit 89cebe5534
58 changed files with 701 additions and 593 deletions

View file

@ -62,6 +62,34 @@ impl<T, U> Router<T, U> {
}
}
pub fn recognize_checked<R, P, F>(
&self,
resource: &mut R,
check: F,
) -> Option<(&T, ResourceId)>
where
F: Fn(&R, Option<&U>) -> bool,
R: Resource<P>,
P: ResourcePath,
{
if let Some(idx) = if self.insensitive {
self.tree.find_checked_insensitive(resource, &|idx, res| {
let item = &self.resources[idx];
check(res, item.2.as_ref())
})
} else {
self.tree.find_checked(resource, &|idx, res| {
let item = &self.resources[idx];
check(res, item.2.as_ref())
})
} {
let item = &self.resources[idx];
Some((&item.1, ResourceId(item.0.id())))
} else {
None
}
}
pub fn recognize_mut_checked<R, P, F>(
&mut self,
resource: &mut R,

View file

@ -4,13 +4,12 @@ use std::rc::Rc;
use std::task::{Context, Poll};
use super::{Service, ServiceFactory};
use crate::cell::Cell;
/// Service for the `and_then` combinator, chaining a computation onto the end
/// of another service which completes successfully.
///
/// This is created by the `ServiceExt::and_then` method.
pub(crate) struct AndThenService<A, B>(Cell<(A, B)>);
pub(crate) struct AndThenService<A, B>(Rc<(A, B)>);
impl<A, B> AndThenService<A, B> {
/// Create new `AndThen` combinator
@ -19,7 +18,7 @@ impl<A, B> AndThenService<A, B> {
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Cell::new((a, b)))
Self(Rc::new((a, b)))
}
}
@ -39,8 +38,8 @@ where
type Error = A::Error;
type Future = AndThenServiceResponse<A, B>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.get_mut();
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.as_ref();
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
@ -49,8 +48,8 @@ where
}
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.0.get_mut();
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.0.as_ref();
if srv.0.poll_shutdown(cx, is_error).is_ready()
&& srv.1.poll_shutdown(cx, is_error).is_ready()
@ -62,9 +61,9 @@ where
}
#[inline]
fn call(&mut self, req: A::Request) -> Self::Future {
fn call(&self, req: A::Request) -> Self::Future {
AndThenServiceResponse {
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),
state: State::A(self.0.as_ref().0.call(req), Some(self.0.clone())),
}
}
}
@ -85,7 +84,7 @@ where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Option<Cell<(A, B)>>),
A(#[pin] A::Future, Option<Rc<(A, B)>>),
B(#[pin] B::Future),
Empty,
}
@ -105,9 +104,9 @@ where
match this.state.as_mut().project() {
State::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let mut b = b.take().unwrap();
let b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
let fut = b.get_mut().1.call(res);
let fut = b.as_ref().1.call(res);
this.state.set(State::B(fut));
self.poll(cx)
}
@ -284,12 +283,12 @@ mod tests {
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
fn call(&self, req: &'static str) -> Self::Future {
ok(req)
}
}
@ -303,12 +302,12 @@ mod tests {
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
fn call(&self, req: &'static str) -> Self::Future {
ok((req, "srv2"))
}
}
@ -316,7 +315,7 @@ mod tests {
#[ntex_rt::test]
async fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let mut srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt.clone()));
let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt.clone()));
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
assert_eq!(cnt.get(), 2);
@ -325,7 +324,7 @@ mod tests {
#[ntex_rt::test]
async fn test_call() {
let cnt = Rc::new(Cell::new(0));
let mut srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt));
let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt));
let res = srv.call("srv1").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv1", "srv2")));
@ -339,7 +338,7 @@ mod tests {
pipeline_factory(fn_factory(move || ready(Ok::<_, ()>(Srv1(cnt2.clone())))))
.and_then(move || ready(Ok(Srv2(cnt.clone()))));
let mut srv = new_srv.new_service(()).await.unwrap();
let srv = new_srv.new_service(()).await.unwrap();
let res = srv.call("srv1").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "srv2"));

View file

@ -4,7 +4,6 @@ use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use crate::cell::Cell;
use crate::{Service, ServiceFactory};
/// `Apply` service combinator
@ -12,11 +11,11 @@ pub(crate) struct AndThenApplyFn<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
F: Fn(A::Response, &B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
srv: Cell<(A, B, F)>,
srv: Rc<(A, B, F)>,
r: PhantomData<(Fut, Res, Err)>,
}
@ -24,14 +23,14 @@ impl<A, B, F, Fut, Res, Err> AndThenApplyFn<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
F: Fn(A::Response, &B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
/// Create new `Apply` combinator
pub(crate) fn new(a: A, b: B, f: F) -> Self {
Self {
srv: Cell::new((a, b, f)),
srv: Rc::new((a, b, f)),
r: PhantomData,
}
}
@ -41,7 +40,7 @@ impl<A, B, F, Fut, Res, Err> Clone for AndThenApplyFn<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
F: Fn(A::Response, &B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
@ -57,7 +56,7 @@ impl<A, B, F, Fut, Res, Err> Service for AndThenApplyFn<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
F: Fn(A::Response, &B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
@ -66,8 +65,8 @@ where
type Error = Err;
type Future = AndThenApplyFnFuture<A, B, F, Fut, Res, Err>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let inner = self.srv.get_mut();
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let inner = self.srv.as_ref();
let not_ready = inner.0.poll_ready(cx)?.is_pending();
if inner.1.poll_ready(cx)?.is_pending() || not_ready {
Poll::Pending
@ -76,8 +75,8 @@ where
}
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.srv.get_mut();
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.srv.as_ref();
if srv.0.poll_shutdown(cx, is_error).is_ready()
&& srv.1.poll_shutdown(cx, is_error).is_ready()
@ -88,8 +87,8 @@ where
}
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.srv.get_mut().0.call(req);
fn call(&self, req: A::Request) -> Self::Future {
let fut = self.srv.as_ref().0.call(req);
AndThenApplyFnFuture {
state: State::A(fut, Some(self.srv.clone())),
}
@ -101,7 +100,7 @@ pub(crate) struct AndThenApplyFnFuture<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
F: Fn(A::Response, &B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error>,
Err: From<B::Error>,
@ -115,12 +114,12 @@ enum State<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
F: Fn(A::Response, &B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error>,
Err: From<B::Error>,
{
A(#[pin] A::Future, Option<Cell<(A, B, F)>>),
A(#[pin] A::Future, Option<Rc<(A, B, F)>>),
B(#[pin] Fut),
Empty,
}
@ -129,7 +128,7 @@ impl<A, B, F, Fut, Res, Err> Future for AndThenApplyFnFuture<A, B, F, Fut, Res,
where
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
F: Fn(A::Response, &B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
@ -143,10 +142,10 @@ where
match this.state.as_mut().project() {
State::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let mut b = b.take().unwrap();
let b = b.take().unwrap();
this.state.set(State::Empty);
let b = b.get_mut();
let fut = (&mut b.2)(res, &mut b.1);
let b = b.as_ref();
let fut = (&b.2)(res, &b.1);
this.state.set(State::B(fut));
self.poll(cx)
}
@ -173,7 +172,7 @@ impl<A, B, F, Fut, Res, Err> AndThenApplyFnFactory<A, B, F, Fut, Res, Err>
where
A: ServiceFactory,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Fut + Clone,
F: Fn(A::Response, &B::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
@ -201,7 +200,7 @@ where
A: ServiceFactory,
A::Config: Clone,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Fut + Clone,
F: Fn(A::Response, &B::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
@ -230,7 +229,7 @@ pub(crate) struct AndThenApplyFnFactoryResponse<A, B, F, Fut, Res, Err>
where
A: ServiceFactory,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Fut + Clone,
F: Fn(A::Response, &B::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error>,
Err: From<B::Error>,
@ -249,7 +248,7 @@ impl<A, B, F, Fut, Res, Err> Future
where
A: ServiceFactory,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Fut + Clone,
F: Fn(A::Response, &B::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
@ -273,7 +272,7 @@ where
if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(AndThenApplyFn {
srv: Cell::new((
srv: Rc::new((
this.a.take().unwrap(),
this.b.take().unwrap(),
this.f.clone(),
@ -302,18 +301,18 @@ mod tests {
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
fn call(&self, req: Self::Request) -> Self::Future {
ok(req)
}
}
#[ntex_rt::test]
async fn test_service() {
let mut srv = pipeline(|r: &'static str| ok(r))
let srv = pipeline(|r: &'static str| ok(r))
.and_then_apply_fn(Srv, |req: &'static str, s| {
s.call(()).map_ok(move |res| (req, res))
});
@ -333,7 +332,7 @@ mod tests {
|| ok(Srv),
|req: &'static str, s| s.call(()).map_ok(move |res| (req, res)),
);
let mut srv = new_srv.new_service(()).await.unwrap();
let srv = new_srv.new_service(()).await.unwrap();
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));

View file

@ -12,7 +12,7 @@ pub fn apply_fn<T, F, R, In, Out, Err, U>(
) -> Apply<T, F, R, In, Out, Err>
where
T: Service<Error = Err>,
F: FnMut(In, &mut T) -> R,
F: Fn(In, &T) -> R,
R: Future<Output = Result<Out, Err>>,
U: IntoService<T>,
{
@ -26,7 +26,7 @@ pub fn apply_fn_factory<T, F, R, In, Out, Err, U>(
) -> ApplyServiceFactory<T, F, R, In, Out, Err>
where
T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R + Clone,
F: Fn(In, &T::Service) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
U: IntoServiceFactory<T>,
{
@ -46,7 +46,7 @@ where
impl<T, F, R, In, Out, Err> Apply<T, F, R, In, Out, Err>
where
T: Service<Error = Err>,
F: FnMut(In, &mut T) -> R,
F: Fn(In, &T) -> R,
R: Future<Output = Result<Out, Err>>,
{
/// Create new `Apply` combinator
@ -62,7 +62,7 @@ where
impl<T, F, R, In, Out, Err> Clone for Apply<T, F, R, In, Out, Err>
where
T: Service<Error = Err> + Clone,
F: FnMut(In, &mut T) -> R + Clone,
F: Fn(In, &T) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
fn clone(&self) -> Self {
@ -77,7 +77,7 @@ where
impl<T, F, R, In, Out, Err> Service for Apply<T, F, R, In, Out, Err>
where
T: Service<Error = Err>,
F: FnMut(In, &mut T) -> R,
F: Fn(In, &T) -> R,
R: Future<Output = Result<Out, Err>>,
{
type Request = In;
@ -86,18 +86,18 @@ where
type Future = R;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(futures_util::ready!(self.service.poll_ready(cx)))
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: In) -> Self::Future {
(self.f)(req, &mut self.service)
fn call(&self, req: In) -> Self::Future {
(self.f)(req, &self.service)
}
}
@ -105,7 +105,7 @@ where
pub struct ApplyServiceFactory<T, F, R, In, Out, Err>
where
T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R + Clone,
F: Fn(In, &T::Service) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
service: T,
@ -116,7 +116,7 @@ where
impl<T, F, R, In, Out, Err> ApplyServiceFactory<T, F, R, In, Out, Err>
where
T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R + Clone,
F: Fn(In, &T::Service) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
/// Create new `ApplyNewService` new service instance
@ -132,7 +132,7 @@ where
impl<T, F, R, In, Out, Err> Clone for ApplyServiceFactory<T, F, R, In, Out, Err>
where
T: ServiceFactory<Error = Err> + Clone,
F: FnMut(In, &mut T::Service) -> R + Clone,
F: Fn(In, &T::Service) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
fn clone(&self) -> Self {
@ -147,7 +147,7 @@ where
impl<T, F, R, In, Out, Err> ServiceFactory for ApplyServiceFactory<T, F, R, In, Out, Err>
where
T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R + Clone,
F: Fn(In, &T::Service) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
type Request = In;
@ -168,7 +168,7 @@ where
pub struct ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
where
T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R,
F: Fn(In, &T::Service) -> R,
R: Future<Output = Result<Out, Err>>,
{
#[pin]
@ -180,7 +180,7 @@ where
impl<T, F, R, In, Out, Err> ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
where
T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R,
F: Fn(In, &T::Service) -> R,
R: Future<Output = Result<Out, Err>>,
{
fn new(fut: T::Future, f: F) -> Self {
@ -195,7 +195,7 @@ where
impl<T, F, R, In, Out, Err> Future for ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
where
T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R,
F: Fn(In, &T::Service) -> R,
R: Future<Output = Result<Out, Err>>,
{
type Output = Result<Apply<T::Service, F, R, In, Out, Err>, T::InitError>;
@ -229,18 +229,18 @@ mod tests {
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
fn call(&self, _: ()) -> Self::Future {
ok(())
}
}
#[ntex_rt::test]
async fn test_call() {
let mut srv = pipeline(apply_fn(Srv, |req: &'static str, srv| {
let srv = pipeline(apply_fn(Srv, |req: &'static str, srv| {
let fut = srv.call(());
async move {
let res = fut.await.unwrap();
@ -268,7 +268,7 @@ mod tests {
},
));
let mut srv = new_srv.new_service(()).await.unwrap();
let srv = new_srv.new_service(()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

View file

@ -1,9 +1,9 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use crate::cell::Cell;
use crate::{Service, ServiceFactory};
/// Convert `Fn(Config, &mut Service1) -> Future<Service2>` fn to a service factory
@ -20,18 +20,18 @@ pub fn apply_cfg<F, C, T, R, S, E>(
Future = R,
> + Clone
where
F: FnMut(C, &mut T) -> R,
F: Fn(C, &T) -> R,
T: Service,
R: Future<Output = Result<S, E>>,
S: Service,
{
ApplyConfigService {
srv: Cell::new((srv, f)),
srv: Rc::new((srv, f)),
_t: PhantomData,
}
}
/// Convert `Fn(Config, &mut Service1) -> Future<Service2>` fn to a service factory
/// Convert `Fn(Config, &Service1) -> Future<Service2>` fn to a service factory
///
/// Service1 get constructed from `T` factory.
pub fn apply_cfg_factory<F, C, T, R, S>(
@ -46,33 +46,33 @@ pub fn apply_cfg_factory<F, C, T, R, S>(
InitError = T::InitError,
> + Clone
where
F: FnMut(C, &mut T::Service) -> R,
F: Fn(C, &T::Service) -> R,
T: ServiceFactory<Config = ()>,
T::InitError: From<T::Error>,
R: Future<Output = Result<S, T::InitError>>,
S: Service,
{
ApplyConfigServiceFactory {
srv: Cell::new((factory, f)),
srv: Rc::new((factory, f)),
_t: PhantomData,
}
}
/// Convert `Fn(Config, &mut Server) -> Future<Service>` fn to NewService\
/// Convert `Fn(Config, &Server) -> Future<Service>` fn to NewService\
struct ApplyConfigService<F, C, T, R, S, E>
where
F: FnMut(C, &mut T) -> R,
F: Fn(C, &T) -> R,
T: Service,
R: Future<Output = Result<S, E>>,
S: Service,
{
srv: Cell<(T, F)>,
srv: Rc<(T, F)>,
_t: PhantomData<(C, R, S)>,
}
impl<F, C, T, R, S, E> Clone for ApplyConfigService<F, C, T, R, S, E>
where
F: FnMut(C, &mut T) -> R,
F: Fn(C, &T) -> R,
T: Service,
R: Future<Output = Result<S, E>>,
S: Service,
@ -87,7 +87,7 @@ where
impl<F, C, T, R, S, E> ServiceFactory for ApplyConfigService<F, C, T, R, S, E>
where
F: FnMut(C, &mut T) -> R,
F: Fn(C, &T) -> R,
T: Service,
R: Future<Output = Result<S, E>>,
S: Service,
@ -102,28 +102,26 @@ where
type Future = R;
fn new_service(&self, cfg: C) -> Self::Future {
unsafe {
let srv = self.srv.get_mut_unsafe();
(srv.1)(cfg, &mut srv.0)
}
let srv = self.srv.as_ref();
(srv.1)(cfg, &srv.0)
}
}
/// Convert `Fn(&Config) -> Future<Service>` fn to NewService
struct ApplyConfigServiceFactory<F, C, T, R, S>
where
F: FnMut(C, &mut T::Service) -> R,
F: Fn(C, &T::Service) -> R,
T: ServiceFactory<Config = ()>,
R: Future<Output = Result<S, T::InitError>>,
S: Service,
{
srv: Cell<(T, F)>,
srv: Rc<(T, F)>,
_t: PhantomData<(C, R, S)>,
}
impl<F, C, T, R, S> Clone for ApplyConfigServiceFactory<F, C, T, R, S>
where
F: FnMut(C, &mut T::Service) -> R,
F: Fn(C, &T::Service) -> R,
T: ServiceFactory<Config = ()>,
R: Future<Output = Result<S, T::InitError>>,
S: Service,
@ -138,7 +136,7 @@ where
impl<F, C, T, R, S> ServiceFactory for ApplyConfigServiceFactory<F, C, T, R, S>
where
F: FnMut(C, &mut T::Service) -> R,
F: Fn(C, &T::Service) -> R,
T: ServiceFactory<Config = ()>,
T::InitError: From<T::Error>,
R: Future<Output = Result<S, T::InitError>>,
@ -157,7 +155,7 @@ where
ApplyConfigServiceFactoryResponse {
cfg: Some(cfg),
store: self.srv.clone(),
state: State::A(self.srv.get_ref().0.new_service(())),
state: State::A(self.srv.as_ref().0.new_service(())),
}
}
}
@ -165,14 +163,14 @@ where
#[pin_project::pin_project]
struct ApplyConfigServiceFactoryResponse<F, C, T, R, S>
where
F: FnMut(C, &mut T::Service) -> R,
F: Fn(C, &T::Service) -> R,
T: ServiceFactory<Config = ()>,
T::InitError: From<T::Error>,
R: Future<Output = Result<S, T::InitError>>,
S: Service,
{
cfg: Option<C>,
store: Cell<(T, F)>,
store: Rc<(T, F)>,
#[pin]
state: State<T, R, S>,
}
@ -192,7 +190,7 @@ where
impl<F, C, T, R, S> Future for ApplyConfigServiceFactoryResponse<F, C, T, R, S>
where
F: FnMut(C, &mut T::Service) -> R,
F: Fn(C, &T::Service) -> R,
T: ServiceFactory<Config = ()>,
T::InitError: From<T::Error>,
R: Future<Output = Result<S, T::InitError>>,
@ -215,7 +213,7 @@ where
},
State::B(srv) => match srv.poll_ready(cx)? {
Poll::Ready(_) => {
let fut = (this.store.get_mut().1)(this.cfg.take().unwrap(), srv);
let fut = (this.store.as_ref().1)(this.cfg.take().unwrap(), srv);
this.state.set(State::C(fut));
self.poll(cx)
}

View file

@ -145,17 +145,17 @@ where
type Future = BoxFuture<Res, Err>;
#[inline]
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(ctx)
}
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.0.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: Self::Request) -> Self::Future {
fn call(&self, req: Self::Request) -> Self::Future {
Box::pin(self.0.call(req))
}
}

View file

@ -1,61 +0,0 @@
//! Custom cell impl, internal use only
use std::task::{Context, Poll};
use std::{cell::UnsafeCell, fmt, rc::Rc};
pub(crate) struct Cell<T> {
inner: Rc<UnsafeCell<T>>,
}
impl<T> Clone for Cell<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for Cell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
impl<T> Cell<T> {
pub(crate) fn new(inner: T) -> Self {
Self {
inner: Rc::new(UnsafeCell::new(inner)),
}
}
pub(crate) fn get_ref(&self) -> &T {
unsafe { &*self.inner.as_ref().get() }
}
pub(crate) fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() }
}
#[allow(clippy::mut_from_ref)]
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {
&mut *self.inner.as_ref().get()
}
}
impl<T: crate::Service> crate::Service for Cell<T> {
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().poll_ready(cx)
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.get_mut().poll_shutdown(cx, is_error)
}
fn call(&mut self, req: Self::Request) -> Self::Future {
self.get_mut().call(req)
}
}

View file

@ -6,17 +6,19 @@ use futures_util::future::{ok, Ready};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
#[inline]
/// Create `ServiceFactory` for function that can act as a `Service`
pub fn fn_service<F, Fut, Req, Res, Err, Cfg>(
f: F,
) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
where
F: FnMut(Req) -> Fut + Clone,
F: Fn(Req) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
{
FnServiceFactory::new(f)
}
#[inline]
/// Create `ServiceFactory` for function that can produce services
///
/// # Example
@ -43,7 +45,7 @@ where
/// });
///
/// // construct new service
/// let mut srv = factory.new_service(()).await?;
/// let srv = factory.new_service(()).await?;
///
/// // now we can use `div` service
/// let result = srv.call((10, 20)).await?;
@ -64,6 +66,7 @@ where
FnServiceNoConfig::new(f)
}
#[inline]
/// Create `ServiceFactory` for function that accepts config argument and can produce services
///
/// Any function that has following form `Fn(Config) -> Future<Output = Service>` could
@ -85,7 +88,7 @@ where
/// });
///
/// // construct new service with config argument
/// let mut srv = factory.new_service(10).await?;
/// let srv = factory.new_service(10).await?;
///
/// let result = srv.call(10).await?;
/// assert_eq!(result, 100);
@ -107,7 +110,7 @@ where
pub struct FnService<F, Fut, Req, Res, Err>
where
F: FnMut(Req) -> Fut,
F: Fn(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
f: F,
@ -116,7 +119,7 @@ where
impl<F, Fut, Req, Res, Err> FnService<F, Fut, Req, Res, Err>
where
F: FnMut(Req) -> Fut,
F: Fn(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
pub(crate) fn new(f: F) -> Self {
@ -126,9 +129,10 @@ where
impl<F, Fut, Req, Res, Err> Clone for FnService<F, Fut, Req, Res, Err>
where
F: FnMut(Req) -> Fut + Clone,
F: Fn(Req) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
{
#[inline]
fn clone(&self) -> Self {
Self::new(self.f.clone())
}
@ -136,7 +140,7 @@ where
impl<F, Fut, Req, Res, Err> Service for FnService<F, Fut, Req, Res, Err>
where
F: FnMut(Req) -> Fut,
F: Fn(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
type Request = Req;
@ -144,20 +148,23 @@ where
type Error = Err;
type Future = Fut;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Req) -> Self::Future {
#[inline]
fn call(&self, req: Req) -> Self::Future {
(self.f)(req)
}
}
impl<F, Fut, Req, Res, Err> IntoService<FnService<F, Fut, Req, Res, Err>> for F
where
F: FnMut(Req) -> Fut,
F: Fn(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
#[inline]
fn into_service(self) -> FnService<F, Fut, Req, Res, Err> {
FnService::new(self)
}
@ -165,7 +172,7 @@ where
pub struct FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
where
F: FnMut(Req) -> Fut,
F: Fn(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
f: F,
@ -174,7 +181,7 @@ where
impl<F, Fut, Req, Res, Err, Cfg> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
where
F: FnMut(Req) -> Fut + Clone,
F: Fn(Req) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
{
fn new(f: F) -> Self {
@ -184,9 +191,10 @@ where
impl<F, Fut, Req, Res, Err, Cfg> Clone for FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
where
F: FnMut(Req) -> Fut + Clone,
F: Fn(Req) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
{
#[inline]
fn clone(&self) -> Self {
Self::new(self.f.clone())
}
@ -194,7 +202,7 @@ where
impl<F, Fut, Req, Res, Err> Service for FnServiceFactory<F, Fut, Req, Res, Err, ()>
where
F: FnMut(Req) -> Fut + Clone,
F: Fn(Req) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
{
type Request = Req;
@ -202,11 +210,13 @@ where
type Error = Err;
type Future = Fut;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
#[inline]
fn call(&self, req: Self::Request) -> Self::Future {
(self.f)(req)
}
}
@ -214,7 +224,7 @@ where
impl<F, Fut, Req, Res, Err, Cfg> ServiceFactory
for FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
where
F: FnMut(Req) -> Fut + Clone,
F: Fn(Req) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
{
type Request = Req;
@ -226,6 +236,7 @@ where
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
#[inline]
fn new_service(&self, _: Cfg) -> Self::Future {
ok(FnService::new(self.f.clone()))
}
@ -237,6 +248,7 @@ where
F: Fn(Req) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
{
#[inline]
fn into_factory(self) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg> {
FnServiceFactory::new(self)
}
@ -270,6 +282,7 @@ where
Fut: Future<Output = Result<Srv, Err>>,
Srv: Service,
{
#[inline]
fn clone(&self) -> Self {
FnServiceConfig {
f: self.f.clone(),
@ -293,6 +306,7 @@ where
type InitError = Err;
type Future = Fut;
#[inline]
fn new_service(&self, cfg: Cfg) -> Self::Future {
(self.f)(cfg)
}
@ -334,6 +348,7 @@ where
type InitError = E;
type Future = R;
#[inline]
fn new_service(&self, _: C) -> Self::Future {
(self.f)()
}
@ -345,6 +360,7 @@ where
R: Future<Output = Result<S, E>>,
S: Service,
{
#[inline]
fn clone(&self) -> Self {
Self::new(self.f.clone())
}
@ -356,6 +372,7 @@ where
R: Future<Output = Result<S, E>>,
S: Service,
{
#[inline]
fn into_factory(self) -> FnServiceNoConfig<F, C, S, R, E> {
FnServiceNoConfig::new(self)
}
@ -374,7 +391,7 @@ mod tests {
async fn test_fn_service() {
let new_srv = fn_service(|()| ok::<_, ()>("srv"));
let mut srv = new_srv.new_service(()).await.unwrap();
let srv = new_srv.new_service(()).await.unwrap();
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
@ -383,7 +400,7 @@ mod tests {
#[ntex_rt::test]
async fn test_fn_service_service() {
let mut srv = fn_service(|()| ok::<_, ()>("srv"));
let srv = fn_service(|()| ok::<_, ()>("srv"));
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
@ -397,7 +414,7 @@ mod tests {
ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg))))
});
let mut srv = new_srv.new_service(1).await.unwrap();
let srv = new_srv.new_service(1).await.unwrap();
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());

View file

@ -1,7 +1,6 @@
#![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity)]
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
@ -12,7 +11,6 @@ mod and_then_apply_fn;
mod apply;
mod apply_cfg;
pub mod boxed;
mod cell;
mod fn_service;
mod map;
mod map_config;
@ -95,10 +93,7 @@ pub trait Service {
/// 1. `.poll_ready()` might be called on different task from actual service call.
///
/// 2. In case of chained services, `.poll_ready()` get called for all services at once.
fn poll_ready(
&mut self,
ctx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>>;
fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
#[inline]
#[allow(unused_variables)]
@ -106,11 +101,7 @@ pub trait Service {
///
/// Returns `Ready` when the service is properly shutdowned. This method might be called
/// after it returns `Ready`.
fn poll_shutdown(
&mut self,
ctx: &mut task::Context<'_>,
is_error: bool,
) -> Poll<()> {
fn poll_shutdown(&self, ctx: &mut task::Context<'_>, is_error: bool) -> Poll<()> {
Poll::Ready(())
}
@ -123,7 +114,7 @@ pub trait Service {
///
/// Calling `call` without calling `poll_ready` is permitted. The
/// implementation must be resilient to this fact.
fn call(&mut self, req: Self::Request) -> Self::Future;
fn call(&self, req: Self::Request) -> Self::Future;
#[inline]
/// Map this service's output to a different type, returning a new service
@ -199,6 +190,7 @@ pub trait ServiceFactory {
/// Create and return a new service value asynchronously.
fn new_service(&self, cfg: Self::Config) -> Self::Future;
#[inline]
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
fn map<F, R>(self, f: F) -> crate::map::MapServiceFactory<Self, F, R>
@ -209,6 +201,7 @@ pub trait ServiceFactory {
crate::map::MapServiceFactory::new(self, f)
}
#[inline]
/// Map this service's error to a different error, returning a new service.
fn map_err<F, E>(self, f: F) -> crate::map_err::MapErrServiceFactory<Self, F, E>
where
@ -218,6 +211,7 @@ pub trait ServiceFactory {
crate::map_err::MapErrServiceFactory::new(self, f)
}
#[inline]
/// Map this factory's init error to a different error, returning a new service.
fn map_init_err<F, E>(self, f: F) -> crate::map_init_err::MapInitErr<Self, F, E>
where
@ -237,11 +231,18 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(**self).poll_ready(ctx)
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(**self).poll_ready(cx)
}
fn call(&mut self, request: Self::Request) -> S::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
(**self).poll_shutdown(cx, is_error)
}
#[inline]
fn call(&self, request: Self::Request) -> S::Future {
(**self).call(request)
}
}
@ -255,16 +256,23 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
(**self).poll_ready(ctx)
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
(**self).poll_ready(cx)
}
fn call(&mut self, request: Self::Request) -> S::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
(**self).poll_shutdown(cx, is_error)
}
#[inline]
fn call(&self, request: Self::Request) -> S::Future {
(**self).call(request)
}
}
impl<S> Service for RefCell<S>
impl<S> Service for Rc<S>
where
S: Service,
{
@ -273,30 +281,17 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.borrow_mut().poll_ready(ctx)
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(**self).poll_ready(cx)
}
fn call(&mut self, request: Self::Request) -> S::Future {
self.borrow_mut().call(request)
}
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
(**self).poll_shutdown(cx, is_error)
}
impl<S> Service for Rc<RefCell<S>>
where
S: Service,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.borrow_mut().poll_ready(ctx)
}
fn call(&mut self, request: Self::Request) -> S::Future {
(&mut (**self).borrow_mut()).call(request)
fn call(&self, request: Self::Request) -> S::Future {
(**self).call(request)
}
}

View file

@ -34,6 +34,7 @@ where
A: Clone,
F: Clone,
{
#[inline]
fn clone(&self) -> Self {
Map {
service: self.service.clone(),
@ -54,17 +55,17 @@ where
type Future = MapFuture<A, F, Response>;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: A::Request) -> Self::Future {
fn call(&self, req: A::Request) -> Self::Future {
MapFuture::new(self.service.call(req), self.f.clone())
}
}
@ -135,6 +136,7 @@ where
A: Clone,
F: Clone,
{
#[inline]
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
@ -158,6 +160,7 @@ where
type InitError = A::InitError;
type Future = MapServiceFuture<A, F, Res>;
#[inline]
fn new_service(&self, cfg: A::Config) -> Self::Future {
MapServiceFuture::new(self.a.new_service(cfg), self.f.clone())
}
@ -217,25 +220,25 @@ mod tests {
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
fn call(&self, _: ()) -> Self::Future {
ok(())
}
}
#[ntex_rt::test]
async fn test_poll_ready() {
let mut srv = Srv.map(|_| "ok");
let srv = Srv.map(|_| "ok");
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
}
#[ntex_rt::test]
async fn test_call() {
let mut srv = Srv.map(|_| "ok");
let srv = Srv.map(|_| "ok");
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), "ok");
@ -244,7 +247,7 @@ mod tests {
#[ntex_rt::test]
async fn test_new_service() {
let new_srv = (|| ok::<_, ()>(Srv)).into_factory().map(|_| "ok");
let mut srv = new_srv.new_service(&()).await.unwrap();
let srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("ok"));

View file

@ -35,6 +35,7 @@ where
A: Clone,
F: Clone,
{
#[inline]
fn clone(&self) -> Self {
MapErr {
service: self.service.clone(),
@ -55,17 +56,17 @@ where
type Future = MapErrFuture<A, F, E>;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(&self.f)
}
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: A::Request) -> Self::Future {
fn call(&self, req: A::Request) -> Self::Future {
MapErrFuture::new(self.service.call(req), self.f.clone())
}
}
@ -161,6 +162,7 @@ where
type InitError = A::InitError;
type Future = MapErrServiceFuture<A, F, E>;
#[inline]
fn new_service(&self, cfg: A::Config) -> Self::Future {
MapErrServiceFuture::new(self.a.new_service(cfg), self.f.clone())
}
@ -219,25 +221,25 @@ mod tests {
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err(()))
}
fn call(&mut self, _: ()) -> Self::Future {
fn call(&self, _: ()) -> Self::Future {
err(())
}
}
#[ntex_rt::test]
async fn test_poll_ready() {
let mut srv = Srv.map_err(|_| "error");
let srv = Srv.map_err(|_| "error");
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Err("error")));
}
#[ntex_rt::test]
async fn test_call() {
let mut srv = Srv.map_err(|_| "error");
let srv = Srv.map_err(|_| "error");
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
@ -246,7 +248,7 @@ mod tests {
#[ntex_rt::test]
async fn test_new_service() {
let new_srv = (|| ok::<_, ()>(Srv)).into_factory().map_err(|_| "error");
let mut srv = new_srv.new_service(&()).await.unwrap();
let srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");

View file

@ -75,7 +75,7 @@ impl<T: Service> Pipeline<T> {
Self: Sized,
I: IntoService<U>,
U: Service,
F: FnMut(T::Response, &mut U) -> Fut,
F: Fn(T::Response, &U) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<T::Error> + From<U::Error>,
{
@ -161,17 +161,17 @@ impl<T: Service> Service for Pipeline<T> {
type Future = T::Future;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), T::Error>> {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), T::Error>> {
self.service.poll_ready(cx)
}
#[inline]
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&mut self, req: T::Request) -> Self::Future {
fn call(&self, req: T::Request) -> Self::Future {
self.service.call(req)
}
}
@ -240,7 +240,7 @@ impl<T: ServiceFactory> PipelineFactory<T> {
T::Config: Clone,
I: IntoServiceFactory<U>,
U: ServiceFactory<Config = T::Config, InitError = T::InitError>,
F: FnMut(T::Response, &mut U::Service) -> Fut + Clone,
F: Fn(T::Response, &U::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<T::Error> + From<U::Error>,
{

View file

@ -4,13 +4,12 @@ use std::rc::Rc;
use std::task::{Context, Poll};
use super::{Service, ServiceFactory};
use crate::cell::Cell;
/// Service for the `then` combinator, chaining a computation onto the end of
/// another service.
///
/// This is created by the `Pipeline::then` method.
pub(crate) struct ThenService<A, B>(Cell<(A, B)>);
pub(crate) struct ThenService<A, B>(Rc<(A, B)>);
impl<A, B> ThenService<A, B> {
/// Create new `.then()` combinator
@ -19,7 +18,7 @@ impl<A, B> ThenService<A, B> {
A: Service,
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error>,
{
Self(Cell::new((a, b)))
Self(Rc::new((a, b)))
}
}
@ -39,8 +38,8 @@ where
type Error = B::Error;
type Future = ThenServiceResponse<A, B>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.get_mut();
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.as_ref();
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
@ -49,8 +48,8 @@ where
}
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.0.get_mut();
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let srv = self.0.as_ref();
if srv.0.poll_shutdown(cx, is_error).is_ready()
&& srv.1.poll_shutdown(cx, is_error).is_ready()
@ -61,9 +60,9 @@ where
}
}
fn call(&mut self, req: A::Request) -> Self::Future {
fn call(&self, req: A::Request) -> Self::Future {
ThenServiceResponse {
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),
state: State::A(self.0.as_ref().0.call(req), Some(self.0.clone())),
}
}
}
@ -84,7 +83,7 @@ where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
A(#[pin] A::Future, Option<Cell<(A, B)>>),
A(#[pin] A::Future, Option<Rc<(A, B)>>),
B(#[pin] B::Future),
Empty,
}
@ -104,9 +103,9 @@ where
match this.state.as_mut().project() {
State::A(fut, b) => match fut.poll(cx) {
Poll::Ready(res) => {
let mut b = b.take().unwrap();
let b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
let fut = b.get_mut().1.call(res);
let fut = b.as_ref().1.call(res);
this.state.set(State::B(fut));
self.poll(cx)
}
@ -272,12 +271,12 @@ mod tests {
type Error = ();
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Result<&'static str, &'static str>) -> Self::Future {
fn call(&self, req: Result<&'static str, &'static str>) -> Self::Future {
match req {
Ok(msg) => ok(msg),
Err(_) => err(()),
@ -293,12 +292,12 @@ mod tests {
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Err(()))
}
fn call(&mut self, req: Result<&'static str, ()>) -> Self::Future {
fn call(&self, req: Result<&'static str, ()>) -> Self::Future {
match req {
Ok(msg) => ok((msg, "ok")),
Err(()) => ok(("srv2", "err")),
@ -309,7 +308,7 @@ mod tests {
#[ntex_rt::test]
async fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let mut srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone()));
let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone()));
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Err(())));
assert_eq!(cnt.get(), 2);
@ -318,7 +317,7 @@ mod tests {
#[ntex_rt::test]
async fn test_call() {
let cnt = Rc::new(Cell::new(0));
let mut srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt));
let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt));
let res = srv.call(Ok("srv1")).await;
assert!(res.is_ok());
@ -335,7 +334,7 @@ mod tests {
let cnt2 = cnt.clone();
let blank = move || ready(Ok::<_, ()>(Srv1(cnt2.clone())));
let factory = pipeline_factory(blank).then(move || ready(Ok(Srv2(cnt.clone()))));
let mut srv = factory.new_service(&()).await.unwrap();
let srv = factory.new_service(&()).await.unwrap();
let res = srv.call(Ok("srv1")).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv1", "ok")));

View file

@ -47,7 +47,7 @@ actix-threadpool = "0.3.1"
base64 = "0.11"
bitflags = "1.2"
bytes = "0.5.3"
bytestring = "0.1.4"
bytestring = "0.1.5"
derive_more = "0.99.2"
either = "1.5.3"
encoding_rs = "0.8"

View file

@ -57,6 +57,27 @@ pub struct Waiter {
inner: Cell<Inner>,
}
impl Waiter {
pub fn poll_waiter(&self, cx: &mut Context<'_>) -> Poll<()> {
let inner = unsafe {
self.inner
.get_mut_unsafe()
.data
.get_unchecked_mut(self.token)
};
if inner.is_none() {
let waker = LocalWaker::default();
waker.register(cx.waker());
*inner = Some(waker);
Poll::Pending
} else if inner.as_mut().unwrap().register(cx.waker()) {
Poll::Pending
} else {
Poll::Ready(())
}
}
}
impl Clone for Waiter {
fn clone(&self) -> Self {
let token = unsafe { self.inner.get_mut_unsafe() }.data.insert(None);

View file

@ -126,8 +126,8 @@ struct PoolInner<T> {
}
impl<T> Pool<T> {
pub fn channel(&mut self) -> (PSender<T>, PReceiver<T>) {
let token = self.0.get_mut().insert(PoolInner {
pub fn channel(&self) -> (PSender<T>, PReceiver<T>) {
let token = unsafe { self.0.get_mut_unsafe() }.insert(PoolInner {
flags: Flags::all(),
value: None,
waker: LocalWaker::default(),

View file

@ -63,11 +63,12 @@ impl<T: Address + 'static> Service for OpensslConnector<T> {
type Error = ConnectError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
fn call(&self, req: Connect<T>) -> Self::Future {
let host = req.host().to_string();
let conn = self.connector.call(req);
let openssl = self.openssl.clone();

View file

@ -86,11 +86,13 @@ impl<T: Address> Service for Resolver<T> {
type Error = ConnectError;
type Future = Either<ResolverFuture<T>, Ready<Result<Connect<T>, Self::Error>>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
#[inline]
fn call(&self, req: Connect<T>) -> Self::Future {
self.lookup(req)
}
}

View file

@ -68,11 +68,12 @@ impl<T: Address + 'static> Service for RustlsConnector<T> {
type Error = ConnectError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
fn call(&self, req: Connect<T>) -> Self::Future {
let host = req.host().to_string();
let conn = self.connector.call(req);
let config = self.config.clone();

View file

@ -54,6 +54,7 @@ impl<T: Address> ServiceFactory for Connector<T> {
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
#[inline]
fn new_service(&self, _: ()) -> Self::Future {
ok(self.clone())
}
@ -65,11 +66,13 @@ impl<T: Address> Service for Connector<T> {
type Error = ConnectError;
type Future = ConnectServiceResponse<T>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
#[inline]
fn call(&self, req: Connect<T>) -> Self::Future {
ConnectServiceResponse {
state: ConnectState::Resolve(self.resolver.lookup(req)),
}

View file

@ -270,15 +270,18 @@ where
type Error = ServiceError<C::Error, Codec>;
type Future = FramedServiceImplResponse<St, Io, Codec, Out, C, T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.connect.poll_ready(cx).map_err(|e| e.into())
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.connect.poll_shutdown(cx, is_error)
}
fn call(&mut self, req: Io) -> Self::Future {
#[inline]
fn call(&self, req: Io) -> Self::Future {
log::trace!("Start connection handshake");
FramedServiceImplResponse {
inner: FramedServiceImplResponseInner::Connect(

View file

@ -1,4 +1,3 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::Duration;
@ -262,7 +261,7 @@ impl Connector {
None
};
Rc::new(RefCell::new(InnerConnector {
Rc::new(InnerConnector {
tcp_pool: ConnectionPool::new(
tcp_service,
self.conn_lifetime,
@ -271,7 +270,7 @@ impl Connector {
self.limit,
),
ssl_pool,
}))
})
}
}
@ -314,14 +313,40 @@ where
type Future =
Either<<Pool<T> as Service>::Future, Ready<Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tcp_pool.poll_ready(cx)
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self.tcp_pool.poll_ready(cx)?.is_ready();
let ready = if let Some(ref ssl_pool) = self.ssl_pool {
ssl_pool.poll_ready(cx)?.is_ready() && ready
} else {
ready
};
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, req: Connect) -> Self::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let ready = self.tcp_pool.poll_shutdown(cx, is_error).is_ready();
let ready = if let Some(ref ssl_pool) = self.ssl_pool {
ssl_pool.poll_shutdown(cx, is_error).is_ready() && ready
} else {
ready
};
if ready {
Poll::Ready(())
} else {
Poll::Pending
}
}
fn call(&self, req: Connect) -> Self::Future {
match req.uri.scheme_str() {
Some("https") | Some("wss") => {
if let Some(ref mut conn) = self.ssl_pool {
if let Some(ref conn) = self.ssl_pool {
Either::Left(conn.call(req))
} else {
Either::Right(err(ConnectError::SslIsNotSupported))

View file

@ -37,7 +37,7 @@ impl From<Authority> for Key {
}
/// Connections pool
pub(super) struct ConnectionPool<T, Io: 'static>(Rc<RefCell<T>>, Rc<RefCell<Inner<Io>>>);
pub(super) struct ConnectionPool<T, Io: 'static>(Rc<T>, Rc<RefCell<Inner<Io>>>);
impl<T, Io> ConnectionPool<T, Io>
where
@ -52,7 +52,7 @@ where
limit: usize,
) -> Self {
ConnectionPool(
Rc::new(RefCell::new(connector)),
Rc::new(connector),
Rc::new(RefCell::new(Inner {
conn_lifetime,
conn_keep_alive,
@ -88,18 +88,25 @@ where
type Error = ConnectError;
type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call(&mut self, req: Connect) -> Self::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.0.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&self, req: Connect) -> Self::Future {
// start support future
crate::rt::spawn(ConnectorPoolSupport {
connector: self.0.clone(),
inner: self.1.clone(),
});
let mut connector = self.0.clone();
let connector = self.0.clone();
let inner = self.1.clone();
let fut = async move {
@ -619,7 +626,7 @@ where
impl<T> Drop for Acquired<T> {
fn drop(&mut self) {
if let Some(inner) = self.1.take() {
inner.as_ref().borrow_mut().release();
inner.borrow_mut().release();
}
}
}

View file

@ -1,40 +0,0 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::task::{Context, Poll};
use crate::service::Service;
#[doc(hidden)]
/// Service that allows to turn non-clone service to a service with `Clone` impl
///
/// # Panics
/// CloneableService might panic with some creative use of thread local storage.
/// See https://github.com/actix/actix-web/issues/1295 for example
pub(crate) struct CloneableService<T: Service>(Rc<RefCell<T>>);
impl<T: Service> CloneableService<T> {
pub(crate) fn new(service: T) -> Self {
Self(Rc::new(RefCell::new(service)))
}
}
impl<T: Service> Clone for CloneableService<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: Service> Service for CloneableService<T> {
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.borrow_mut().poll_ready(cx)
}
fn call(&mut self, req: T::Request) -> Self::Future {
self.0.borrow_mut().call(req)
}
}

View file

@ -1,6 +1,7 @@
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{fmt, io, net};
@ -10,7 +11,6 @@ use log::{error, trace};
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::cloneable::CloneableService;
use crate::http::config::ServiceConfig;
use crate::http::error::{DispatchError, ResponseError};
use crate::http::error::{ParseError, PayloadError};
@ -79,9 +79,9 @@ where
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
service: Rc<S>,
expect: Rc<X>,
upgrade: Option<Rc<U>>,
on_connect: Option<Box<dyn DataFactory>>,
flags: Flags,
peer_addr: Option<net::SocketAddr>,
@ -179,9 +179,9 @@ where
pub(crate) fn new(
stream: T,
config: ServiceConfig,
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
service: Rc<S>,
expect: Rc<X>,
upgrade: Option<Rc<U>>,
on_connect: Option<Box<dyn DataFactory>>,
peer_addr: Option<net::SocketAddr>,
) -> Self {
@ -206,9 +206,9 @@ where
config: ServiceConfig,
read_buf: BytesMut,
timeout: Option<Delay>,
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
service: Rc<S>,
expect: Rc<X>,
upgrade: Option<Rc<U>>,
on_connect: Option<Box<dyn DataFactory>>,
peer_addr: Option<net::SocketAddr>,
) -> Self {
@ -921,10 +921,10 @@ mod tests {
let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<TestBuffer>>::new(
buf,
ServiceConfig::default(),
CloneableService::new(
Rc::new(
(|_| ok::<_, io::Error>(Response::Ok().finish())).into_service(),
),
CloneableService::new(ExpectHandler),
Rc::new(ExpectHandler),
None,
None,
None,

View file

@ -17,6 +17,7 @@ impl ServiceFactory for ExpectHandler {
type InitError = io::Error;
type Future = Ready<Result<Self::Service, Self::InitError>>;
#[inline]
fn new_service(&self, _: ()) -> Self::Future {
ok(ExpectHandler)
}
@ -28,11 +29,13 @@ impl Service for ExpectHandler {
type Error = io::Error;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request) -> Self::Future {
#[inline]
fn call(&self, req: Request) -> Self::Future {
ok(req)
}
}

View file

@ -10,7 +10,6 @@ use futures::ready;
use crate::codec::{AsyncRead, AsyncWrite, Framed};
use crate::http::body::MessageBody;
use crate::http::cloneable::CloneableService;
use crate::http::config::ServiceConfig;
use crate::http::error::{DispatchError, ParseError, ResponseError};
use crate::http::helpers::DataFactory;
@ -365,9 +364,9 @@ where
/// `Service` implementation for HTTP1 transport
pub struct H1ServiceHandler<T, S: Service, B, X: Service, U: Service> {
srv: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
srv: Rc<S>,
expect: Rc<X>,
upgrade: Option<Rc<U>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
cfg: ServiceConfig,
_t: PhantomData<(T, B)>,
@ -392,9 +391,9 @@ where
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
) -> H1ServiceHandler<T, S, B, X, U> {
H1ServiceHandler {
srv: CloneableService::new(srv),
expect: CloneableService::new(expect),
upgrade: upgrade.map(CloneableService::new),
srv: Rc::new(srv),
expect: Rc::new(expect),
upgrade: upgrade.map(Rc::new),
cfg,
on_connect,
_t: PhantomData,
@ -419,7 +418,7 @@ where
type Error = DispatchError;
type Future = Dispatcher<T, S, B, X, U>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self
.expect
.poll_ready(cx)
@ -439,7 +438,7 @@ where
.is_ready()
&& ready;
let ready = if let Some(ref mut upg) = self.upgrade {
let ready = if let Some(ref upg) = self.upgrade {
upg.poll_ready(cx)
.map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
@ -458,7 +457,23 @@ where
}
}
fn call(&mut self, (io, addr): Self::Request) -> Self::Future {
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let ready = self.expect.poll_shutdown(cx, is_error).is_ready();
let ready = self.srv.poll_shutdown(cx, is_error).is_ready() && ready;
let ready = if let Some(ref upg) = self.upgrade {
upg.poll_shutdown(cx, is_error).is_ready() && ready
} else {
ready
};
if ready {
Poll::Ready(())
} else {
Poll::Pending
}
}
fn call(&self, (io, addr): Self::Request) -> Self::Future {
let on_connect = if let Some(ref on_connect) = self.on_connect {
Some(on_connect(&io))
} else {
@ -533,11 +548,13 @@ where
type Error = ParseError;
type Future = OneRequestServiceResponse<T>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
#[inline]
fn call(&self, req: Self::Request) -> Self::Future {
OneRequestServiceResponse {
framed: Some(Framed::new(req, Codec::new(self.config.clone()))),
}

View file

@ -20,6 +20,7 @@ impl<T> ServiceFactory for UpgradeHandler<T> {
type InitError = io::Error;
type Future = Ready<Result<Self::Service, Self::InitError>>;
#[inline]
fn new_service(&self, _: ()) -> Self::Future {
unimplemented!()
}
@ -31,11 +32,13 @@ impl<T> Service for UpgradeHandler<T> {
type Error = io::Error;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: Self::Request) -> Self::Future {
#[inline]
fn call(&self, _: Self::Request) -> Self::Future {
unimplemented!()
}
}

View file

@ -3,6 +3,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::net;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut};
@ -13,7 +14,6 @@ use log::{error, trace};
use crate::codec::{AsyncRead, AsyncWrite};
use crate::http::body::{BodySize, MessageBody, ResponseBody};
use crate::http::cloneable::CloneableService;
use crate::http::config::ServiceConfig;
use crate::http::error::{DispatchError, ResponseError};
use crate::http::helpers::DataFactory;
@ -32,7 +32,7 @@ pub struct Dispatcher<T, S: Service<Request = Request>, B: MessageBody>
where
T: AsyncRead + AsyncWrite + Unpin,
{
service: CloneableService<S>,
service: Rc<S>,
connection: Connection<T, Bytes>,
on_connect: Option<Box<dyn DataFactory>>,
config: ServiceConfig,
@ -47,12 +47,11 @@ where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S::Error: ResponseError,
// S::Future: 'static,
S::Response: Into<Response<B>>,
B: MessageBody,
{
pub(crate) fn new(
service: CloneableService<S>,
service: Rc<S>,
connection: Connection<T, Bytes>,
on_connect: Option<Box<dyn DataFactory>>,
config: ServiceConfig,

View file

@ -1,8 +1,9 @@
use std::future::Future;
use std::marker::PhantomData;
use std::net;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{net, rc};
use bytes::Bytes;
use futures::future::ok;
@ -12,7 +13,6 @@ use log::error;
use crate::codec::{AsyncRead, AsyncWrite};
use crate::http::body::MessageBody;
use crate::http::cloneable::CloneableService;
use crate::http::config::ServiceConfig;
use crate::http::error::{DispatchError, ResponseError};
use crate::http::helpers::DataFactory;
@ -30,7 +30,7 @@ use super::dispatcher::Dispatcher;
pub struct H2Service<T, S, B> {
srv: S,
cfg: ServiceConfig,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, B)>,
}
@ -58,7 +58,7 @@ where
/// Set on connect callback.
pub(crate) fn on_connect(
mut self,
f: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
f: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
) -> Self {
self.on_connect = f;
self
@ -214,7 +214,7 @@ pub struct H2ServiceResponse<T, S: ServiceFactory, B> {
#[pin]
fut: S::Future,
cfg: Option<ServiceConfig>,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, B)>,
}
@ -245,9 +245,9 @@ where
/// `Service` implementation for http/2 transport
pub struct H2ServiceHandler<T, S: Service, B> {
srv: CloneableService<S>,
srv: Rc<S>,
cfg: ServiceConfig,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, B)>,
}
@ -261,13 +261,13 @@ where
{
fn new(
cfg: ServiceConfig,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
srv: S,
) -> H2ServiceHandler<T, S, B> {
H2ServiceHandler {
cfg,
on_connect,
srv: CloneableService::new(srv),
srv: Rc::new(srv),
_t: PhantomData,
}
}
@ -287,14 +287,20 @@ where
type Error = DispatchError;
type Future = H2ServiceHandlerResponse<T, S, B>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.srv.poll_ready(cx).map_err(|e| {
error!("Service readiness error: {:?}", e);
DispatchError::Service(Box::new(e))
})
}
fn call(&mut self, (io, addr): Self::Request) -> Self::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.srv.poll_shutdown(cx, is_error)
}
fn call(&self, (io, addr): Self::Request) -> Self::Future {
let on_connect = if let Some(ref on_connect) = self.on_connect {
Some(on_connect(&io))
} else {
@ -320,7 +326,7 @@ where
{
Incoming(Dispatcher<T, S, B>),
Handshake(
Option<CloneableService<S>>,
Option<Rc<S>>,
Option<ServiceConfig>,
Option<net::SocketAddr>,
Option<Box<dyn DataFactory>>,

View file

@ -2,7 +2,6 @@
pub mod body;
mod builder;
pub mod client;
mod cloneable;
mod config;
#[cfg(feature = "compress")]
pub mod encoding;

View file

@ -1,7 +1,8 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{fmt, net, rc};
use std::{fmt, net};
use bytes::Bytes;
use futures::future::ok;
@ -15,7 +16,6 @@ use crate::service::{pipeline_factory, IntoServiceFactory, Service, ServiceFacto
use super::body::MessageBody;
use super::builder::HttpServiceBuilder;
use super::cloneable::CloneableService;
use super::config::{KeepAlive, ServiceConfig};
use super::error::{DispatchError, ResponseError};
use super::helpers::DataFactory;
@ -29,7 +29,7 @@ pub struct HttpService<T, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler<T>
cfg: ServiceConfig,
expect: X,
upgrade: Option<U>,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, B)>,
}
@ -146,7 +146,7 @@ where
/// Set on connect callback.
pub(crate) fn on_connect(
mut self,
f: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
f: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
) -> Self {
self.on_connect = f;
self
@ -379,7 +379,7 @@ pub struct HttpServiceResponse<
fut_upg: Option<U::Future>,
expect: Option<X::Service>,
upgrade: Option<U::Service>,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
cfg: ServiceConfig,
_t: PhantomData<(T, B)>,
}
@ -445,11 +445,11 @@ where
/// `Service` implementation for http transport
pub struct HttpServiceHandler<T, S: Service, B, X: Service, U: Service> {
srv: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
srv: Rc<S>,
expect: Rc<X>,
upgrade: Option<Rc<U>>,
cfg: ServiceConfig,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, B, X)>,
}
@ -470,14 +470,14 @@ where
srv: S,
expect: X,
upgrade: Option<U>,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
) -> HttpServiceHandler<T, S, B, X, U> {
HttpServiceHandler {
cfg,
on_connect,
srv: CloneableService::new(srv),
expect: CloneableService::new(expect),
upgrade: upgrade.map(CloneableService::new),
srv: Rc::new(srv),
expect: Rc::new(expect),
upgrade: upgrade.map(Rc::new),
_t: PhantomData,
}
}
@ -501,7 +501,7 @@ where
type Error = DispatchError;
type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self
.expect
.poll_ready(cx)
@ -521,7 +521,7 @@ where
.is_ready()
&& ready;
let ready = if let Some(ref mut upg) = self.upgrade {
let ready = if let Some(ref upg) = self.upgrade {
upg.poll_ready(cx)
.map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
@ -540,7 +540,23 @@ where
}
}
fn call(&mut self, (io, proto, peer_addr): Self::Request) -> Self::Future {
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
let ready = self.expect.poll_shutdown(cx, is_error).is_ready();
let ready = self.srv.poll_shutdown(cx, is_error).is_ready() && ready;
let ready = if let Some(ref upg) = self.upgrade {
upg.poll_shutdown(cx, is_error).is_ready() && ready
} else {
ready
};
if ready {
Poll::Ready(())
} else {
Poll::Pending
}
}
fn call(&self, (io, proto, peer_addr): Self::Request) -> Self::Future {
let on_connect = if let Some(ref on_connect) = self.on_connect {
Some(on_connect(&io))
} else {
@ -591,7 +607,7 @@ where
Option<(
Handshake<T, Bytes>,
ServiceConfig,
CloneableService<S>,
Rc<S>,
Option<Box<dyn DataFactory>>,
Option<net::SocketAddr>,
)>,

View file

@ -73,7 +73,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> Service for AcceptorService<T>
type Error = HandshakeError<T>;
type Future = AcceptorServiceResponse<T>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
Poll::Ready(Ok(()))
} else {
@ -81,7 +82,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> Service for AcceptorService<T>
}
}
fn call(&mut self, req: Self::Request) -> Self::Future {
#[inline]
fn call(&self, req: Self::Request) -> Self::Future {
let acc = self.acceptor.clone();
AcceptorServiceResponse {
_guard: self.conns.get(),

View file

@ -79,7 +79,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Service for AcceptorService<T> {
type Error = io::Error;
type Future = AcceptorServiceFut<T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
Poll::Ready(Ok(()))
} else {
@ -87,7 +88,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Service for AcceptorService<T> {
}
}
fn call(&mut self, req: Self::Request) -> Self::Future {
#[inline]
fn call(&self, req: Self::Request) -> Self::Future {
AcceptorServiceFut {
_guard: self.conns.get(),
fut: self.acceptor.accept(req),

View file

@ -71,14 +71,17 @@ where
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx).map_err(|_| ())
}
fn call(
&mut self,
(guard, req): (Option<CounterGuard>, ServerMessage),
) -> Self::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
fn call(&self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req {
ServerMessage::Connect(stream) => {
let stream = FromStream::from_stdstream(stream).map_err(|e| {
@ -189,6 +192,7 @@ where
{
type Factory = T;
#[inline]
fn create(&self) -> T {
(self)()
}

View file

@ -52,7 +52,8 @@ where
type Error = A::Error;
type Future = future::Either<A::Future, B::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let left = self.left.poll_ready(cx)?;
let right = self.right.poll_ready(cx)?;
@ -63,7 +64,8 @@ where
}
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
if self.left.poll_shutdown(cx, is_error).is_ready()
&& self.right.poll_shutdown(cx, is_error).is_ready()
{
@ -73,7 +75,8 @@ where
}
}
fn call(&mut self, req: either::Either<A::Request, B::Request>) -> Self::Future {
#[inline]
fn call(&self, req: either::Either<A::Request, B::Request>) -> Self::Future {
match req {
either::Either::Left(req) => future::Either::Left(self.left.call(req)),
either::Either::Right(req) => future::Either::Right(self.right.call(req)),

View file

@ -73,7 +73,8 @@ where
type Error = T::Error;
type Future = InFlightServiceResponse<T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Poll::Pending = self.service.poll_ready(cx)? {
Poll::Pending
} else if !self.count.available(cx) {
@ -84,11 +85,13 @@ where
}
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
fn call(&mut self, req: T::Request) -> Self::Future {
#[inline]
fn call(&self, req: T::Request) -> Self::Future {
InFlightServiceResponse {
fut: self.service.call(req),
_guard: self.count.get(),
@ -129,11 +132,11 @@ mod tests {
type Error = ();
type Future = LocalBoxFuture<'static, Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
fn call(&self, _: ()) -> Self::Future {
crate::rt::time::delay_for(self.0)
.then(|_| ok::<_, ()>(()))
.boxed_local()
@ -144,7 +147,7 @@ mod tests {
async fn test_transform() {
let wait_time = Duration::from_millis(50);
let mut srv = InFlightService::new(1, SleepService(wait_time));
let srv = InFlightService::new(1, SleepService(wait_time));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
@ -160,7 +163,7 @@ mod tests {
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
let mut srv = srv.new_service(&()).await.unwrap();
let srv = srv.new_service(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());

View file

@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::convert::Infallible;
use std::future::Future;
use std::marker::PhantomData;
@ -72,9 +73,13 @@ pub struct KeepAliveService<R, E, F> {
f: F,
ka: Duration,
time: LowResTimeService,
inner: RefCell<Inner>,
_t: PhantomData<(R, E)>,
}
struct Inner {
delay: Delay,
expire: Instant,
_t: PhantomData<(R, E)>,
}
impl<R, E, F> KeepAliveService<R, E, F>
@ -87,8 +92,10 @@ where
f,
ka,
time,
inner: RefCell::new(Inner {
expire,
delay: delay_until(expire),
}),
_t: PhantomData,
}
}
@ -103,15 +110,18 @@ where
type Error = E;
type Future = Ready<Result<R, E>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.delay).poll(cx) {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.borrow_mut();
match Pin::new(&mut inner.delay).poll(cx) {
Poll::Ready(_) => {
let now = Instant::from_std(self.time.now());
if self.expire <= now {
if inner.expire <= now {
Poll::Ready(Err((self.f)()))
} else {
self.delay.reset(self.expire);
let _ = Pin::new(&mut self.delay).poll(cx);
let expire = inner.expire;
inner.delay.reset(expire);
let _ = Pin::new(&mut inner.delay).poll(cx);
Poll::Ready(Ok(()))
}
}
@ -119,8 +129,8 @@ where
}
}
fn call(&mut self, req: R) -> Self::Future {
self.expire = Instant::from_std(self.time.now() + self.ka);
fn call(&self, req: R) -> Self::Future {
self.inner.borrow_mut().expire = Instant::from_std(self.time.now() + self.ka);
ok(req)
}
}

View file

@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::fmt;
@ -58,7 +59,7 @@ pub struct InOrder<S> {
impl<S> InOrder<S>
where
S: Service,
S: Service + 'static,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
@ -74,7 +75,7 @@ where
impl<S> Default for InOrder<S>
where
S: Service,
S: Service + 'static,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
@ -86,7 +87,7 @@ where
impl<S> Transform<S> for InOrder<S>
where
S: Service,
S: Service + 'static,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
@ -105,7 +106,11 @@ where
pub struct InOrderService<S: Service> {
service: S,
waker: Rc<LocalWaker>,
inner: Rc<RefCell<Inner<S>>>,
}
struct Inner<S: Service> {
waker: LocalWaker,
acks: VecDeque<Record<S::Response, S::Error>>,
}
@ -122,15 +127,17 @@ where
{
Self {
service: service.into_service(),
inner: Rc::new(RefCell::new(Inner {
acks: VecDeque::new(),
waker: Rc::new(LocalWaker::new()),
waker: LocalWaker::new(),
})),
}
}
}
impl<S> Service for InOrderService<S>
where
S: Service,
S: Service + 'static,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
@ -140,16 +147,18 @@ where
type Error = InOrderError<S::Error>;
type Future = InOrderServiceResponse<S>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.borrow_mut();
// poll_ready could be called from different task
self.waker.register(cx.waker());
inner.waker.register(cx.waker());
// check acks
while !self.acks.is_empty() {
let rec = self.acks.front_mut().unwrap();
while !inner.acks.is_empty() {
let rec = inner.acks.front_mut().unwrap();
match Pin::new(&mut rec.rx).poll(cx) {
Poll::Ready(Ok(res)) => {
let rec = self.acks.pop_front().unwrap();
let rec = inner.acks.pop_front().unwrap();
let _ = rec.tx.send(res);
}
Poll::Pending => break,
@ -169,20 +178,24 @@ where
}
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
fn call(&mut self, request: S::Request) -> Self::Future {
fn call(&self, request: S::Request) -> Self::Future {
let inner = self.inner.clone();
let mut inner_b = inner.borrow_mut();
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
self.acks.push_back(Record { rx: rx1, tx: tx2 });
inner_b.acks.push_back(Record { rx: rx1, tx: tx2 });
let waker = self.waker.clone();
let fut = self.service.call(request);
drop(inner_b);
crate::rt::spawn(async move {
let res = fut.await;
waker.wake();
inner.borrow().waker.wake();
let _ = tx1.send(res);
});
@ -227,11 +240,11 @@ mod tests {
type Error = ();
type Future = LocalBoxFuture<'static, Result<usize, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: oneshot::Receiver<usize>) -> Self::Future {
fn call(&self, req: oneshot::Receiver<usize>) -> Self::Future {
req.map(|res| res.map_err(|_| ())).boxed_local()
}
}
@ -249,7 +262,7 @@ mod tests {
let rx3 = rx3;
let tx_stop = tx_stop;
let _ = crate::rt::System::new("test").block_on(async {
let mut srv = InOrderService::new(Srv);
let srv = InOrderService::new(Srv);
let _ = lazy(|cx| srv.poll_ready(cx)).await;
let res1 = srv.call(rx1);

View file

@ -51,6 +51,7 @@ impl ServiceFactory for LowResTime {
type Service = LowResTimeService;
type Future = Ready<Result<Self::Service, Self::InitError>>;
#[inline]
fn new_service(&self, _: ()) -> Self::Future {
ok(self.timer())
}
@ -94,11 +95,13 @@ impl Service for LowResTimeService {
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
#[inline]
fn call(&self, _: ()) -> Self::Future {
ok(self.now())
}
}

View file

@ -136,15 +136,17 @@ where
type Error = TimeoutError<S::Error>;
type Future = Either<TimeoutServiceResponse<S>, TimeoutServiceResponse2<S>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(TimeoutError::Service)
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
fn call(&mut self, request: S::Request) -> Self::Future {
fn call(&self, request: S::Request) -> Self::Future {
if self.timeout == ZERO {
Either::Right(TimeoutServiceResponse2 {
fut: self.service.call(request),
@ -231,11 +233,11 @@ mod tests {
type Error = ();
type Future = LocalBoxFuture<'static, Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
fn call(&self, _: ()) -> Self::Future {
crate::rt::time::delay_for(self.0)
.then(|_| ok::<_, ()>(()))
.boxed_local()
@ -247,7 +249,7 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(50);
let mut timeout = TimeoutService::new(resolution, SleepService(wait_time));
let timeout = TimeoutService::new(resolution, SleepService(wait_time));
assert_eq!(timeout.call(()).await, Ok(()));
}
@ -256,7 +258,7 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(500);
let mut timeout = TimeoutService::new(resolution, SleepService(wait_time));
let timeout = TimeoutService::new(resolution, SleepService(wait_time));
assert_eq!(timeout.call(()).await, Err(TimeoutError::Timeout));
}
@ -269,7 +271,7 @@ mod tests {
Timeout::new(resolution),
fn_factory(|| ok::<_, ()>(SleepService(wait_time))),
);
let mut srv = timeout.new_service(&()).await.unwrap();
let srv = timeout.new_service(&()).await.unwrap();
assert_eq!(srv.call(()).await, Err(TimeoutError::Timeout));
}

View file

@ -459,7 +459,7 @@ where
>
where
B1: MessageBody,
F: FnMut(WebRequest<Err>, &mut T::Service) -> R + Clone,
F: Fn(WebRequest<Err>, &T::Service) -> R + Clone,
R: Future<Output = Result<WebResponse<B1>, Err::Container>>,
{
App {
@ -529,7 +529,7 @@ mod tests {
#[ntex_rt::test]
async fn test_default_resource() {
let mut srv = init_service(
let srv = init_service(
App::new()
.service(web::resource("/test").to(|| async { HttpResponse::Ok() })),
)
@ -542,7 +542,7 @@ mod tests {
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let mut srv = init_service(
let srv = init_service(
App::new()
.service(web::resource("/test").to(|| async { HttpResponse::Ok() }))
.service(
@ -575,7 +575,7 @@ mod tests {
#[ntex_rt::test]
async fn test_data_factory() {
let mut srv = init_service(
let srv = init_service(
App::new().data_factory(|| ok::<_, ()>(10usize)).service(
web::resource("/")
.to(|_: web::Data<usize>| async { HttpResponse::Ok() }),
@ -586,12 +586,9 @@ mod tests {
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let mut srv = init_service(
App::new().data_factory(|| ok::<_, ()>(10u32)).service(
web::resource("/")
.to(|_: web::Data<usize>| async { HttpResponse::Ok() }),
),
)
let srv = init_service(App::new().data_factory(|| ok::<_, ()>(10u32)).service(
web::resource("/").to(|_: web::Data<usize>| async { HttpResponse::Ok() }),
))
.await;
let req = TestRequest::default().to_request();
let res = srv.call(req).await.unwrap();
@ -600,7 +597,7 @@ mod tests {
#[ntex_rt::test]
async fn test_extension() {
let mut srv = init_service(App::new().app_data(10usize).service(
let srv = init_service(App::new().app_data(10usize).service(
web::resource("/").to(|req: HttpRequest| async move {
assert_eq!(*req.app_data::<usize>().unwrap(), 10);
HttpResponse::Ok()
@ -614,7 +611,7 @@ mod tests {
#[ntex_rt::test]
async fn test_wrap() {
let mut srv = init_service(
let srv = init_service(
App::new()
.wrap(
DefaultHeaders::new()
@ -624,7 +621,7 @@ mod tests {
)
.await;
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(header::CONTENT_TYPE).unwrap(),
@ -634,7 +631,7 @@ mod tests {
#[ntex_rt::test]
async fn test_router_wrap() {
let mut srv = init_service(
let srv = init_service(
App::new()
.route("/test", web::get().to(|| async { HttpResponse::Ok() }))
.wrap(
@ -644,7 +641,7 @@ mod tests {
)
.await;
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(header::CONTENT_TYPE).unwrap(),
@ -654,7 +651,7 @@ mod tests {
#[ntex_rt::test]
async fn test_wrap_fn() {
let mut srv = init_service(
let srv = init_service(
App::new()
.wrap_fn(|req, srv| {
let fut = srv.call(req);
@ -671,7 +668,7 @@ mod tests {
)
.await;
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(header::CONTENT_TYPE).unwrap(),
@ -681,7 +678,7 @@ mod tests {
#[ntex_rt::test]
async fn test_router_wrap_fn() {
let mut srv = init_service(
let srv = init_service(
App::new()
.route("/test", web::get().to(|| async { HttpResponse::Ok() }))
.wrap_fn(|req, srv| {
@ -698,7 +695,7 @@ mod tests {
)
.await;
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(header::CONTENT_TYPE).unwrap(),
@ -708,24 +705,24 @@ mod tests {
#[ntex_rt::test]
async fn test_case_insensitive_router() {
let mut srv = init_service(
let srv = init_service(
App::new()
.case_insensitive_routing()
.route("/test", web::get().to(|| async { HttpResponse::Ok() })),
)
.await;
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
let req = TestRequest::with_uri("/Test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
}
#[ntex_rt::test]
async fn test_external_resource() {
let mut srv = init_service(
let srv = init_service(
App::new()
.external_resource("youtube", "https://youtube.com/watch/{video_id}")
.route(
@ -740,7 +737,7 @@ mod tests {
)
.await;
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
let body = read_body(resp).await;
assert_eq!(body, Bytes::from_static(b"https://youtube.com/watch/12345"));

View file

@ -246,11 +246,17 @@ where
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
fn call(&self, req: Request) -> Self::Future {
let (head, payload) = req.into_parts();
let req = if let Some(mut req) = self.pool.get_request() {
@ -419,7 +425,8 @@ impl<Err: ErrorRenderer> Service for AppRouting<Err> {
type Error = Err::Container;
type Future = BoxResponse<Err>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.ready.is_none() {
Poll::Ready(Ok(()))
} else {
@ -427,8 +434,8 @@ impl<Err: ErrorRenderer> Service for AppRouting<Err> {
}
}
fn call(&mut self, mut req: WebRequest<Err>) -> Self::Future {
let res = self.router.recognize_mut_checked(&mut req, |req, guards| {
fn call(&self, mut req: WebRequest<Err>) -> Self::Future {
let res = self.router.recognize_checked(&mut req, |req, guards| {
if let Some(guards) = guards {
for f in guards {
if !f.check(req.head()) {
@ -441,7 +448,7 @@ impl<Err: ErrorRenderer> Service for AppRouting<Err> {
if let Some((srv, _info)) = res {
srv.call(req)
} else if let Some(ref mut default) = self.default {
} else if let Some(ref default) = self.default {
default.call(req)
} else {
let req = req.into_parts().0;
@ -497,7 +504,7 @@ mod tests {
let data = Arc::new(AtomicBool::new(false));
{
let mut app =
let app =
init_service(App::new().data(DropData(data.clone())).service(
web::resource("/test").to(|| async { HttpResponse::Ok() }),
))

View file

@ -255,7 +255,7 @@ mod tests {
cfg.data(10usize);
};
let mut srv = init_service(App::new().configure(cfg).service(
let srv = init_service(App::new().configure(cfg).service(
web::resource("/").to(|_: web::Data<usize>| async { HttpResponse::Ok() }),
))
.await;

View file

@ -142,7 +142,7 @@ mod tests {
#[ntex_rt::test]
async fn test_data_extractor() {
let mut srv = init_service(App::new().data("TEST".to_string()).service(
let srv = init_service(App::new().data("TEST".to_string()).service(
web::resource("/").to(|data: web::Data<String>| async move {
assert_eq!(data.to_lowercase(), "test");
HttpResponse::Ok()
@ -154,7 +154,7 @@ mod tests {
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let mut srv = init_service(App::new().data(10u32).service(
let srv = init_service(App::new().data(10u32).service(
web::resource("/").to(|_: web::Data<usize>| async { HttpResponse::Ok() }),
))
.await;
@ -165,7 +165,7 @@ mod tests {
#[ntex_rt::test]
async fn test_app_data_extractor() {
let mut srv = init_service(App::new().app_data(Data::new(10usize)).service(
let srv = init_service(App::new().app_data(Data::new(10usize)).service(
web::resource("/").to(|_: web::Data<usize>| async { HttpResponse::Ok() }),
))
.await;
@ -174,7 +174,7 @@ mod tests {
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let mut srv = init_service(App::new().app_data(Data::new(10u32)).service(
let srv = init_service(App::new().app_data(Data::new(10u32)).service(
web::resource("/").to(|_: web::Data<usize>| async { HttpResponse::Ok() }),
))
.await;
@ -185,7 +185,7 @@ mod tests {
#[ntex_rt::test]
async fn test_route_data_extractor() {
let mut srv =
let srv =
init_service(App::new().service(web::resource("/").data(10usize).route(
web::get().to(|data: web::Data<usize>| async move {
let _ = data.clone();
@ -199,7 +199,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
// different type
let mut srv =
let srv =
init_service(App::new().service(web::resource("/").data(10u32).route(
web::get().to(|_: web::Data<usize>| async { HttpResponse::Ok() }),
)))
@ -211,7 +211,7 @@ mod tests {
#[ntex_rt::test]
async fn test_override_data() {
let mut srv = init_service(App::new().data(1usize).service(
let srv = init_service(App::new().data(1usize).service(
web::resource("/").data(10usize).route(web::get().to(
|data: web::Data<usize>| async move {
assert_eq!(**data, 10);

View file

@ -36,7 +36,7 @@ where
pub(super) trait HandlerFn<Err: ErrorRenderer> {
fn call(
&mut self,
&self,
_: WebRequest<Err>,
) -> LocalBoxFuture<'static, Result<WebResponse, Err::Container>>;
@ -86,7 +86,7 @@ where
Err: ErrorRenderer,
{
fn call(
&mut self,
&self,
req: WebRequest<Err>,
) -> LocalBoxFuture<'static, Result<WebResponse, Err::Container>> {
let (req, mut payload) = req.into_parts();

View file

@ -97,11 +97,17 @@ where
type Error = S::Error;
type Future = CompressResponse<S, B, E>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: WebRequest<E>) -> Self::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
fn call(&self, req: WebRequest<E>) -> Self::Future {
// negotiate content-encoding
let encoding = if let Some(val) = req.headers().get(&ACCEPT_ENCODING) {
if let Ok(enc) = val.to_str() {

View file

@ -129,11 +129,17 @@ where
type Error = S::Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: WebRequest<E>) -> Self::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
fn call(&self, req: WebRequest<E>) -> Self::Future {
let inner = self.inner.clone();
let fut = self.service.call(req);
@ -172,7 +178,7 @@ mod tests {
#[ntex_rt::test]
async fn test_default_headers() {
let mut mw = DefaultHeaders::<DefaultError>::new()
let mw = DefaultHeaders::<DefaultError>::new()
.header(CONTENT_TYPE, "0001")
.new_transform(ok_service())
.await
@ -189,7 +195,7 @@ mod tests {
HttpResponse::Ok().header(CONTENT_TYPE, "0002").finish(),
))
};
let mut mw = DefaultHeaders::<DefaultError>::new()
let mw = DefaultHeaders::<DefaultError>::new()
.header(CONTENT_TYPE, "0001")
.new_transform(srv.into_service())
.await
@ -203,7 +209,7 @@ mod tests {
let srv = |req: WebRequest<DefaultError>| {
ok::<_, Error>(req.into_response(HttpResponse::Ok().finish()))
};
let mut mw = DefaultHeaders::<DefaultError>::new()
let mw = DefaultHeaders::<DefaultError>::new()
.content_type()
.new_transform(srv.into_service())
.await

View file

@ -164,11 +164,18 @@ where
type Error = S::Error;
type Future = LoggerResponse<S, B, E>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: WebRequest<E>) -> Self::Future {
#[inline]
fn poll_shutdown(&self, cx: &mut Context<'_>, is_error: bool) -> Poll<()> {
self.service.poll_shutdown(cx, is_error)
}
#[inline]
fn call(&self, req: WebRequest<E>) -> Self::Future {
if self.inner.exclude.contains(req.path()) {
LoggerResponse {
fut: self.service.call(req),
@ -503,7 +510,7 @@ mod tests {
};
let logger = Logger::new("%% %{User-Agent}i %{X-Test}o %{HOME}e %D test");
let mut srv = Transform::new_transform(&logger, srv.into_service())
let srv = Transform::new_transform(&logger, srv.into_service())
.await
.unwrap();

View file

@ -331,7 +331,7 @@ where
>,
>
where
F: FnMut(WebRequest<Err>, &mut T::Service) -> R + Clone,
F: Fn(WebRequest<Err>, &T::Service) -> R + Clone,
R: Future<Output = Result<WebResponse, Err::Container>>,
{
Resource {
@ -504,12 +504,13 @@ impl<Err: ErrorRenderer> Service for ResourceService<Err> {
LocalBoxFuture<'static, Result<WebResponse, Err::Container>>,
>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, mut req: WebRequest<Err>) -> Self::Future {
for route in self.routes.iter_mut() {
fn call(&self, mut req: WebRequest<Err>) -> Self::Future {
for route in self.routes.iter() {
if route.check(&mut req) {
if let Some(ref data) = self.data {
req.set_data_container(data.clone());
@ -517,7 +518,7 @@ impl<Err: ErrorRenderer> Service for ResourceService<Err> {
return Either::Right(route.call(req));
}
}
if let Some(ref mut default) = self.default {
if let Some(ref default) = self.default {
Either::Right(default.call(req))
} else {
let req = req.into_parts().0;

View file

@ -487,7 +487,7 @@ pub(crate) mod tests {
#[ntex_rt::test]
async fn test_option_responder() {
let mut srv = init_service(
let srv = init_service(
web::App::new()
.service(
web::resource("/none").to(|| async { Option::<&'static str>::None }),

View file

@ -82,11 +82,13 @@ impl<Err: ErrorRenderer> Service for RouteService<Err> {
type Error = Err::Container;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: WebRequest<Err>) -> Self::Future {
#[inline]
fn call(&self, req: WebRequest<Err>) -> Self::Future {
self.handler.call(req)
}
}

View file

@ -394,7 +394,7 @@ where
>,
>
where
F: FnMut(WebRequest<Err>, &mut T::Service) -> R + Clone,
F: Fn(WebRequest<Err>, &T::Service) -> R + Clone,
R: Future<Output = Result<WebResponse, Err::Container>>,
{
Scope {
@ -619,12 +619,13 @@ impl<Err: ErrorRenderer> Service for ScopeService<Err> {
type Error = Err::Container;
type Future = Either<BoxedResponse<Err>, Ready<Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, mut req: WebRequest<Err>) -> Self::Future {
let res = self.router.recognize_mut_checked(&mut req, |req, guards| {
fn call(&self, mut req: WebRequest<Err>) -> Self::Future {
let res = self.router.recognize_checked(&mut req, |req, guards| {
if let Some(guards) = guards {
for f in guards {
if !f.check(req.head()) {
@ -640,7 +641,7 @@ impl<Err: ErrorRenderer> Service for ScopeService<Err> {
req.set_data_container(data.clone());
}
Either::Left(srv.call(req))
} else if let Some(ref mut default) = self.default {
} else if let Some(ref default) = self.default {
Either::Left(default.call(req))
} else {
let req = req.into_parts().0;
@ -691,7 +692,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope() {
let mut srv =
let srv =
init_service(App::new().service(
web::scope("/app").service(
web::resource("/path1").to(|| async { HttpResponse::Ok() }),
@ -706,7 +707,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope_root() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("/app")
.service(web::resource("").to(|| async { HttpResponse::Ok() }))
@ -728,7 +729,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope_root2() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("/app/")
.service(web::resource("").to(|| async { HttpResponse::Ok() })),
@ -747,7 +748,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope_root3() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("/app/")
.service(web::resource("/").to(|| async { HttpResponse::Ok() })),
@ -766,7 +767,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope_route() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("app")
.route("/path1", web::get().to(|| async { HttpResponse::Ok() }))
@ -794,7 +795,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope_route_without_leading_slash() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("app").service(
web::resource("path1")
@ -824,7 +825,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope_guard() {
let mut srv =
let srv =
init_service(App::new().service(
web::scope("/app").guard(guard::Get()).service(
web::resource("/path1").to(|| async { HttpResponse::Ok() }),
@ -847,8 +848,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope_variable_segment() {
let mut srv =
init_service(App::new().service(web::scope("/ab-{project}").service(
let srv = init_service(App::new().service(web::scope("/ab-{project}").service(
web::resource("/path1").to(|r: HttpRequest| async move {
HttpResponse::Ok()
.body(format!("project: {}", &r.match_info()["project"]))
@ -875,7 +875,7 @@ mod tests {
#[ntex_rt::test]
async fn test_nested_scope() {
let mut srv = init_service(App::new().service(web::scope("/app").service(
let srv = init_service(App::new().service(web::scope("/app").service(
web::scope("/t1").service(
web::resource("/path1").to(|| async { HttpResponse::Created() }),
),
@ -889,7 +889,7 @@ mod tests {
#[ntex_rt::test]
async fn test_nested_scope_no_slash() {
let mut srv = init_service(App::new().service(web::scope("/app").service(
let srv = init_service(App::new().service(web::scope("/app").service(
web::scope("t1").service(
web::resource("/path1").to(|| async { HttpResponse::Created() }),
),
@ -903,7 +903,7 @@ mod tests {
#[ntex_rt::test]
async fn test_nested_scope_root() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("/app").service(
web::scope("/t1")
@ -927,7 +927,7 @@ mod tests {
#[ntex_rt::test]
async fn test_nested_scope_filter() {
let mut srv =
let srv =
init_service(App::new().service(web::scope("/app").service(
web::scope("/t1").guard(guard::Get()).service(
web::resource("/path1").to(|| async { HttpResponse::Ok() }),
@ -950,7 +950,7 @@ mod tests {
#[ntex_rt::test]
async fn test_nested_scope_with_variable_segment() {
let mut srv = init_service(App::new().service(web::scope("/app").service(
let srv = init_service(App::new().service(web::scope("/app").service(
web::scope("/{project_id}").service(web::resource("/path1").to(
|r: HttpRequest| async move {
HttpResponse::Created()
@ -975,7 +975,7 @@ mod tests {
#[ntex_rt::test]
async fn test_nested2_scope_with_variable_segment() {
let mut srv = init_service(App::new().service(web::scope("/app").service(
let srv = init_service(App::new().service(web::scope("/app").service(
web::scope("/{project}").service(web::scope("/{id}").service(
web::resource("/path1").to(|r: HttpRequest| async move {
HttpResponse::Created().body(format!(
@ -1007,7 +1007,7 @@ mod tests {
#[ntex_rt::test]
async fn test_default_resource() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("/app")
.service(web::resource("/path1").to(|| async { HttpResponse::Ok() }))
@ -1029,7 +1029,7 @@ mod tests {
#[ntex_rt::test]
async fn test_default_resource_propagation() {
let mut srv = init_service(
let srv = init_service(
App::new()
.service(web::scope("/app1").default_service(
web::resource("").to(|| async { HttpResponse::BadRequest() }),
@ -1056,7 +1056,7 @@ mod tests {
#[ntex_rt::test]
async fn test_middleware() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("app")
.wrap(
@ -1072,7 +1072,7 @@ mod tests {
.await;
let req = TestRequest::with_uri("/app/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(CONTENT_TYPE).unwrap(),
@ -1082,7 +1082,7 @@ mod tests {
#[ntex_rt::test]
async fn test_middleware_fn() {
let mut srv = init_service(
let srv = init_service(
App::new().service(
web::scope("app")
.wrap_fn(|req, srv| {
@ -1100,7 +1100,7 @@ mod tests {
.await;
let req = TestRequest::with_uri("/app/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(CONTENT_TYPE).unwrap(),
@ -1110,7 +1110,7 @@ mod tests {
#[ntex_rt::test]
async fn test_override_data() {
let mut srv = init_service(App::new().data(1usize).service(
let srv = init_service(App::new().data(1usize).service(
web::scope("app").data(10usize).route(
"/t",
web::get().to(|data: web::Data<usize>| {
@ -1123,13 +1123,13 @@ mod tests {
.await;
let req = TestRequest::with_uri("/app/t").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
}
#[ntex_rt::test]
async fn test_override_app_data() {
let mut srv = init_service(App::new().app_data(web::Data::new(1usize)).service(
let srv = init_service(App::new().app_data(web::Data::new(1usize)).service(
web::scope("app").app_data(web::Data::new(10usize)).route(
"/t",
web::get().to(|data: web::Data<usize>| {
@ -1142,14 +1142,13 @@ mod tests {
.await;
let req = TestRequest::with_uri("/app/t").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
}
#[ntex_rt::test]
async fn test_scope_config() {
let mut srv =
init_service(App::new().service(web::scope("/app").configure(|s| {
let srv = init_service(App::new().service(web::scope("/app").configure(|s| {
s.route("/path1", web::get().to(|| async { HttpResponse::Ok() }));
})))
.await;
@ -1161,8 +1160,7 @@ mod tests {
#[ntex_rt::test]
async fn test_scope_config_2() {
let mut srv =
init_service(App::new().service(web::scope("/app").configure(|s| {
let srv = init_service(App::new().service(web::scope("/app").configure(|s| {
s.service(web::scope("/v1").configure(|s| {
s.route("/", web::get().to(|| async { HttpResponse::Ok() }));
}));
@ -1176,13 +1174,9 @@ mod tests {
#[ntex_rt::test]
async fn test_url_for_external() {
let mut srv =
init_service(App::new().service(web::scope("/app").configure(|s| {
let srv = init_service(App::new().service(web::scope("/app").configure(|s| {
s.service(web::scope("/v1").configure(|s| {
s.external_resource(
"youtube",
"https://youtube.com/watch/{video_id}",
);
s.external_resource("youtube", "https://youtube.com/watch/{video_id}");
s.route(
"/",
web::get().to(|req: HttpRequest| async move {
@ -1205,7 +1199,7 @@ mod tests {
#[ntex_rt::test]
async fn test_url_for_nested() {
let mut srv = init_service(App::new().service(web::scope("/a").service(
let srv = init_service(App::new().service(web::scope("/a").service(
web::scope("/b").service(web::resource("/c/{stuff}").name("c").route(
web::get().to(|req: HttpRequest| async move {
HttpResponse::Ok()
@ -1216,7 +1210,7 @@ mod tests {
.await;
let req = TestRequest::with_uri("/a/b/c/test").to_request();
let resp = call_service(&mut srv, req).await;
let resp = call_service(&srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
let body = read_body(resp).await;
assert_eq!(

View file

@ -602,7 +602,7 @@ mod tests {
#[ntex_rt::test]
async fn test_service() {
let mut srv = init_service(App::new().service(
let srv = init_service(App::new().service(
web::service("/test").name("test").finish(
|req: WebRequest<DefaultError>| {
ok(req.into_response(HttpResponse::Ok().finish()))
@ -614,7 +614,7 @@ mod tests {
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let mut srv = init_service(App::new().service(
let srv = init_service(App::new().service(
web::service("/test").guard(guard::Get()).finish(
|req: WebRequest<DefaultError>| {
ok(req.into_response(HttpResponse::Ok().finish()))

View file

@ -125,7 +125,7 @@ where
/// assert_eq!(resp.status(), StatusCode::OK);
/// }
/// ```
pub async fn call_service<S, R, B, E>(app: &mut S, req: R) -> S::Response
pub async fn call_service<S, R, B, E>(app: &S, req: R) -> S::Response
where
S: Service<Request = R, Response = WebResponse<B>, Error = E>,
E: std::fmt::Debug,
@ -159,7 +159,7 @@ where
/// assert_eq!(result, Bytes::from_static(b"welcome!"));
/// }
/// ```
pub async fn read_response<S, B>(app: &mut S, req: Request) -> Bytes
pub async fn read_response<S, B>(app: &S, req: Request) -> Bytes
where
S: Service<Request = Request, Response = WebResponse<B>>,
B: MessageBody,
@ -262,7 +262,7 @@ where
/// let result: Person = test::read_response_json(&mut app, req).await;
/// }
/// ```
pub async fn read_response_json<S, B, T>(app: &mut S, req: Request) -> T
pub async fn read_response_json<S, B, T>(app: &S, req: Request) -> T
where
S: Service<Request = Request, Response = WebResponse<B>>,
B: MessageBody,
@ -1129,7 +1129,7 @@ mod tests {
}
}
let mut app = init_service(
let app = init_service(
App::new().service(
web::resource("/index.html")
.to(crate::web::dev::__assert_handler(async_with_block)),
@ -1149,7 +1149,7 @@ mod tests {
HttpResponse::Ok()
}
let mut app = init_service(App::new().data(10usize).service(
let app = init_service(App::new().data(10usize).service(
web::resource("/index.html").to(crate::web::dev::__assert_handler1(handler)),
))
.await;

View file

@ -21,7 +21,7 @@ async fn test_string() {
})
});
let mut conn = ntex::connect::Connector::default();
let conn = ntex::connect::Connector::default();
let addr = format!("localhost:{}", srv.addr().port());
let con = conn.call(addr.into()).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
@ -38,7 +38,7 @@ async fn test_rustls_string() {
})
});
let mut conn = ntex::connect::Connector::default();
let conn = ntex::connect::Connector::default();
let addr = format!("localhost:{}", srv.addr().port());
let con = conn.call(addr.into()).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
@ -55,13 +55,13 @@ async fn test_static_str() {
});
let resolver = ntex::connect::default_resolver();
let mut conn = ntex::connect::Connector::new(resolver.clone());
let conn = ntex::connect::Connector::new(resolver.clone());
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
let connect = Connect::new("127.0.0.1".to_owned());
let mut conn = ntex::connect::Connector::new(resolver);
let conn = ntex::connect::Connector::new(resolver);
let con = conn.call(connect).await;
assert!(con.is_err());
}
@ -83,7 +83,7 @@ async fn test_new_service() {
let factory = ntex::connect::Connector::new(resolver);
let mut conn = factory.new_service(()).await.unwrap();
let conn = factory.new_service(()).await.unwrap();
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
@ -101,7 +101,7 @@ async fn test_uri() {
})
});
let mut conn = ntex::connect::Connector::default();
let conn = ntex::connect::Connector::default();
let addr =
ntex::http::Uri::try_from(format!("https://localhost:{}", srv.addr().port()))
.unwrap();
@ -122,7 +122,7 @@ async fn test_rustls_uri() {
})
});
let mut conn = ntex::connect::Connector::default();
let conn = ntex::connect::Connector::default();
let addr =
ntex::http::Uri::try_from(format!("https://localhost:{}", srv.addr().port()))
.unwrap();

View file

@ -1,4 +1,4 @@
use std::cell::Cell;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use bytes::{Bytes, BytesMut};
@ -26,18 +26,19 @@ async fn test_basic() {
});
let item = client_item.clone();
let mut client = Builder::new(fn_service(move |conn: Connect<_, _>| async move {
let client = Builder::new(fn_service(move |conn: Connect<_, _>| async move {
let (tx, rx) = mpsc::channel();
let _ = tx.send(Bytes::from_static(b"Hello"));
Ok(conn.codec(BytesCodec).out(rx).state(State(Some(tx))))
}))
.build(fn_factory_with_config(move |mut cfg: State| {
.build(fn_factory_with_config(move |cfg: State| {
let item = item.clone();
let cfg = RefCell::new(cfg);
ok((move |t: BytesMut| {
assert_eq!(t.freeze(), Bytes::from_static(b"Hello"));
item.set(true);
// drop Sender, which will close connection
cfg.0.take();
cfg.borrow_mut().0.take();
ok::<_, ()>(None)
})
.into_service())

View file

@ -22,7 +22,7 @@ impl<T> WsService<T> {
WsService(Arc::new(Mutex::new((PhantomData, Cell::new(false)))))
}
fn set_polled(&mut self) {
fn set_polled(&self) {
*self.0.lock().unwrap().1.get_mut() = true;
}
@ -46,12 +46,12 @@ where
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<(), io::Error>>>>;
fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.set_polled();
Poll::Ready(Ok(()))
}
fn call(&mut self, (req, mut framed): Self::Request) -> Self::Future {
fn call(&self, (req, mut framed): Self::Request) -> Self::Future {
let fut = async move {
let res = handshake(req.head()).unwrap().message_body(());