This commit is contained in:
Nikolay Kim 2024-05-28 15:47:24 +05:00
parent b25223265c
commit 8bcf988db6
24 changed files with 122 additions and 129 deletions

View file

@ -3,7 +3,7 @@
use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
use ntex_service::{Pipeline, PipelineBinding, PipelineCall, Service}; use ntex_service::{IntoService, Pipeline, PipelineBinding, PipelineCall, Service};
use ntex_util::{future::Either, ready, spawn, time::Seconds}; use ntex_util::{future::Either, ready, spawn, time::Seconds};
use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError}; use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError};
@ -142,7 +142,7 @@ where
error: Option<S::Error>, error: Option<S::Error>,
flags: Flags, flags: Flags,
shared: Rc<DispatcherShared<S, U>>, shared: Rc<DispatcherShared<S, U>>,
response: Option<PipelineCall<S::Response, S::Error>>, response: Option<PipelineCall<S, DispatchItem<U>>>,
cfg: DispatcherConfig, cfg: DispatcherConfig,
read_remains: u32, read_remains: u32,
read_remains_prev: u32, read_remains_prev: u32,
@ -196,9 +196,15 @@ where
U: Decoder + Encoder + 'static, U: Decoder + Encoder + 'static,
{ {
/// Construct new `Dispatcher` instance. /// Construct new `Dispatcher` instance.
pub fn new<Io>(io: Io, codec: U, service: S, cfg: &DispatcherConfig) -> Dispatcher<S, U> pub fn new<Io, F>(
io: Io,
codec: U,
service: F,
cfg: &DispatcherConfig,
) -> Dispatcher<S, U>
where where
IoBoxed: From<Io>, IoBoxed: From<Io>,
F: IntoService<S, DispatchItem<U>>,
{ {
let io = IoBoxed::from(io); let io = IoBoxed::from(io);
io.set_disconnect_timeout(cfg.disconnect_timeout()); io.set_disconnect_timeout(cfg.disconnect_timeout());
@ -214,7 +220,7 @@ where
codec, codec,
error: Cell::new(None), error: Cell::new(None),
inflight: Cell::new(0), inflight: Cell::new(0),
service: Pipeline::new(service).bind(), service: Pipeline::new(service.into_service()).bind(),
}); });
Dispatcher { Dispatcher {

View file

@ -1,4 +1,4 @@
use std::{cell::Cell, rc::Rc, task, task::Poll, future::poll_fn}; use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll};
use ntex_util::task::LocalWaker; use ntex_util::task::LocalWaker;
@ -31,11 +31,14 @@ impl Counter {
/// Check if counter is not at capacity. If counter at capacity /// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task. /// it registers notification for current task.
pub(super) async fn available(&self) { pub(super) async fn available(&self) {
poll_fn(|cx| if self.0.available(cx) { poll_fn(|cx| {
if self.0.available(cx) {
Poll::Ready(()) Poll::Ready(())
} else { } else {
Poll::Pending Poll::Pending
}).await }
})
.await
} }
/// Get total number of acquired counts /// Get total number of acquired counts

View file

@ -6,7 +6,7 @@ use async_broadcast::{self as bus, broadcast};
use async_channel::{unbounded, Receiver, Sender}; use async_channel::{unbounded, Receiver, Sender};
use ntex_rt::{spawn, Arbiter}; use ntex_rt::{spawn, Arbiter};
use ntex_service::{Pipeline, ServiceFactory, PipelineBinding}; use ntex_service::{Pipeline, PipelineBinding, ServiceFactory};
use ntex_util::future::{select, stream_recv, Either, Stream}; use ntex_util::future::{select, stream_recv, Either, Stream};
use ntex_util::time::{sleep, timeout_checked, Millis}; use ntex_util::time::{sleep, timeout_checked, Millis};
@ -240,8 +240,10 @@ struct WorkerSt<T, F: ServiceFactory<WorkerMessage<T>>> {
availability: WorkerAvailabilityTx, availability: WorkerAvailabilityTx,
} }
async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, WorkerMessage<T>>, mut wrk: WorkerSt<T, F>) async fn run_worker<T, F>(
where mut svc: PipelineBinding<F::Service, WorkerMessage<T>>,
mut wrk: WorkerSt<T, F>,
) where
T: Send + 'static, T: Send + 'static,
F: ServiceFactory<WorkerMessage<T>> + 'static, F: ServiceFactory<WorkerMessage<T>> + 'static,
{ {
@ -305,7 +307,13 @@ async fn create<T, F>(
stop: Receiver<Shutdown>, stop: Receiver<Shutdown>,
factory: Result<F, ()>, factory: Result<F, ()>,
availability: WorkerAvailabilityTx, availability: WorkerAvailabilityTx,
) -> Result<(PipelineBinding<F::Service, WorkerMessage<T>>, WorkerSt<T, F>), ()> ) -> Result<
(
PipelineBinding<F::Service, WorkerMessage<T>>,
WorkerSt<T, F>,
),
(),
>
where where
T: Send + 'static, T: Send + 'static,
F: ServiceFactory<WorkerMessage<T>> + 'static, F: ServiceFactory<WorkerMessage<T>> + 'static,

View file

@ -1,4 +1,4 @@
use std::{cell::Cell, rc::Rc, task, task::Poll, future::poll_fn}; use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll};
use ntex_util::task::LocalWaker; use ntex_util::task::LocalWaker;
@ -33,11 +33,14 @@ impl Counter {
/// Check if counter is not at capacity. If counter at capacity /// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task. /// it registers notification for current task.
pub(super) async fn available(&self) { pub(super) async fn available(&self) {
poll_fn(|cx| if self.0.available(cx) { poll_fn(|cx| {
if self.0.available(cx) {
Poll::Ready(()) Poll::Ready(())
} else { } else {
Poll::Pending Poll::Pending
}).await }
})
.await
} }
} }

View file

@ -27,11 +27,7 @@ impl<T: Address> SslConnector<T> {
/// Use specified memory pool for memory allocations. By default P0 /// Use specified memory pool for memory allocations. By default P0
/// memory pool is used. /// memory pool is used.
pub fn memory_pool(self, id: PoolId) -> Self { pub fn memory_pool(self, id: PoolId) -> Self {
let connector = self let connector = self.connector.get_ref().memory_pool(id).into();
.connector
.get_ref()
.memory_pool(id)
.into();
Self { Self {
connector, connector,

View file

@ -36,11 +36,7 @@ impl<T: Address> TlsConnector<T> {
/// Use specified memory pool for memory allocations. By default P0 /// Use specified memory pool for memory allocations. By default P0
/// memory pool is used. /// memory pool is used.
pub fn memory_pool(self, id: PoolId) -> Self { pub fn memory_pool(self, id: PoolId) -> Self {
let connector = self let connector = self.connector.get_ref().memory_pool(id).into();
.connector
.get_ref()
.memory_pool(id)
.into();
Self { Self {
connector, connector,
config: self.config, config: self.config,

View file

@ -1,11 +1,11 @@
use std::{fmt, task::Context, task::Poll, time::Duration}; use std::{fmt, time::Duration};
use ntex_h2::{self as h2}; use ntex_h2::{self as h2};
use crate::connect::{Connect as TcpConnect, Connector as TcpConnector}; use crate::connect::{Connect as TcpConnect, Connector as TcpConnector};
use crate::service::{apply_fn, boxed, Service, ServiceCtx}; use crate::service::{apply_fn, boxed, Service, ServiceCtx};
use crate::time::{Millis, Seconds}; use crate::time::{Millis, Seconds};
use crate::util::{timeout::TimeoutError, timeout::TimeoutService}; use crate::util::{join, timeout::TimeoutError, timeout::TimeoutService};
use crate::{http::Uri, io::IoBoxed}; use crate::{http::Uri, io::IoBoxed};
use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect}; use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect};
@ -273,31 +273,20 @@ where
type Response = <ConnectionPool<T> as Service<Connect>>::Response; type Response = <ConnectionPool<T> as Service<Connect>>::Response;
type Error = ConnectError; type Error = ConnectError;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let ready = self.tcp_pool.poll_ready(cx)?.is_ready(); if let Some(ref ssl_pool) = self.ssl_pool {
let ready = if let Some(ref ssl_pool) = self.ssl_pool { let (r1, r2) = join(ctx.ready(&self.tcp_pool), ctx.ready(ssl_pool)).await;
ssl_pool.poll_ready(cx)?.is_ready() && ready r1?;
r2
} else { } else {
ready ctx.ready(&self.tcp_pool).await
};
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
} }
} }
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { async fn shutdown(&self) {
let tcp_ready = self.tcp_pool.poll_shutdown(cx).is_ready(); self.tcp_pool.shutdown().await;
let ssl_ready = self if let Some(ref ssl_pool) = self.ssl_pool {
.ssl_pool ssl_pool.shutdown().await;
.as_ref()
.map(|pool| pool.poll_shutdown(cx).is_ready())
.unwrap_or(true);
if tcp_ready && ssl_ready {
Poll::Ready(())
} else {
Poll::Pending
} }
} }
@ -322,11 +311,11 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::util::lazy; use crate::{service::Pipeline, util::lazy};
#[crate::rt_test] #[crate::rt_test]
async fn test_readiness() { async fn test_readiness() {
let conn = Connector::default().finish(); let conn = Pipeline::new(Connector::default().finish()).bind();
assert!(lazy(|cx| conn.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| conn.poll_ready(cx).is_ready()).await);
assert!(lazy(|cx| conn.poll_shutdown(cx).is_ready()).await); assert!(lazy(|cx| conn.poll_shutdown(cx).is_ready()).await);
} }

View file

@ -80,7 +80,7 @@ where
if !eof { if !eof {
// sending body is async process, we can handle upload and download // sending body is async process, we can handle upload and download
// at the same time // at the same time
crate::rt::spawn(async move { let _ = crate::rt::spawn(async move {
if let Err(e) = send_body(body, &snd_stream).await { if let Err(e) = send_body(body, &snd_stream).await {
log::error!("Cannot send body: {:?}", e); log::error!("Cannot send body: {:?}", e);
snd_stream.reset(frame::Reason::INTERNAL_ERROR); snd_stream.reset(frame::Reason::INTERNAL_ERROR);
@ -125,7 +125,7 @@ async fn get_response(
log::debug!("Creating local payload stream for {:?}", stream.id()); log::debug!("Creating local payload stream for {:?}", stream.id());
let (mut pl, payload) = let (mut pl, payload) =
payload::Payload::create(stream.empty_capacity()); payload::Payload::create(stream.empty_capacity());
crate::rt::spawn(async move { let _ = crate::rt::spawn(async move {
loop { loop {
let h2::Message { stream, kind } = let h2::Message { stream, kind } =
match rcv_stream.recv().await { match rcv_stream.recv().await {

View file

@ -80,7 +80,7 @@ where
})); }));
// start pool support future // start pool support future
crate::rt::spawn(ConnectionPoolSupport { let _ = crate::rt::spawn(ConnectionPoolSupport {
connector: connector.clone(), connector: connector.clone(),
inner: inner.clone(), inner: inner.clone(),
waiters: waiters.clone(), waiters: waiters.clone(),
@ -117,8 +117,13 @@ where
type Response = Connection; type Response = Connection;
type Error = ConnectError; type Error = ConnectError;
crate::forward_poll_ready!(connector); async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
crate::forward_poll_shutdown!(connector); self.connector.ready().await
}
async fn shutdown(&self) {
self.connector.shutdown().await
}
async fn call( async fn call(
&self, &self,
@ -252,7 +257,7 @@ impl Inner {
|| (now - conn.created) > self.conn_lifetime || (now - conn.created) > self.conn_lifetime
{ {
if let ConnectionType::H1(io) = conn.io { if let ConnectionType::H1(io) = conn.io {
spawn(async move { let _ = spawn(async move {
let _ = io.shutdown().await; let _ = io.shutdown().await;
}); });
} }
@ -419,7 +424,7 @@ where
let disconnect_timeout = inner.borrow().disconnect_timeout; let disconnect_timeout = inner.borrow().disconnect_timeout;
#[allow(clippy::redundant_async_block)] #[allow(clippy::redundant_async_block)]
spawn(async move { let _ = spawn(async move {
OpenConnection::<T> { OpenConnection::<T> {
tx: Some(tx), tx: Some(tx),
key: key.clone(), key: key.clone(),
@ -576,7 +581,7 @@ impl Acquired {
); );
match io { match io {
ConnectionType::H1(io) => { ConnectionType::H1(io) => {
spawn(async move { let _ = spawn(async move {
let _ = io.shutdown().await; let _ = io.shutdown().await;
}); });
} }
@ -634,7 +639,8 @@ mod tests {
h2::Config::client(), h2::Config::client(),
) )
.clone(), .clone(),
); )
.bind();
// uri must contain authority // uri must contain authority
let req = Connect { let req = Connect {

View file

@ -325,7 +325,7 @@ impl DateService {
// periodic date update // periodic date update
let s = self.clone(); let s = self.clone();
crate::rt::spawn(async move { let _ = crate::rt::spawn(async move {
sleep(Millis(500)).await; sleep(Millis(500)).await;
s.0.current.set(false); s.0.current.set(false);
}); });

View file

@ -210,7 +210,7 @@ impl<F: Filter> Upgrade<F> {
H: FnOnce(Request, Io<F>, Codec) -> R + 'static, H: FnOnce(Request, Io<F>, Codec) -> R + 'static,
R: Future<Output = O>, R: Future<Output = O>,
{ {
crate::rt::spawn(async move { let _ = crate::rt::spawn(async move {
let _ = f(self.req, self.io, self.codec).await; let _ = f(self.req, self.io, self.codec).await;
}); });
ControlAck { ControlAck {

View file

@ -1,4 +1,4 @@
use std::{error::Error, fmt, marker, rc::Rc, task::Context, task::Poll}; use std::{error::Error, fmt, marker, rc::Rc};
use crate::http::body::MessageBody; use crate::http::body::MessageBody;
use crate::http::config::{DispatcherConfig, ServiceConfig}; use crate::http::config::{DispatcherConfig, ServiceConfig};
@ -209,11 +209,10 @@ where
type Response = (); type Response = ();
type Error = DispatchError; type Error = DispatchError;
async fn ready(&self) -> Result<(), Self::Error> { async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let cfg = self.config.as_ref(); let cfg = self.config.as_ref();
let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await; let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
ready1.map_err(|e| { ready1.map_err(|e| {
log::error!("Http control service readiness error: {:?}", e); log::error!("Http control service readiness error: {:?}", e);
DispatchError::Control(Box::new(e)) DispatchError::Control(Box::new(e))

View file

@ -1,5 +1,4 @@
use std::{cell::RefCell, io, task::Context, task::Poll}; use std::{cell::RefCell, error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc};
use std::{error::Error, fmt, future::poll_fn, marker, mem, rc::Rc};
use ntex_h2::{self as h2, frame::StreamId, server}; use ntex_h2::{self as h2, frame::StreamId, server};

View file

@ -1,7 +1,8 @@
use std::{error, fmt, marker, rc::Rc, task::Context, task::Poll}; use std::{error, fmt, marker, rc::Rc};
use crate::io::{types, Filter, Io}; use crate::io::{types, Filter, Io};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::util::join;
use super::body::MessageBody; use super::body::MessageBody;
use super::builder::HttpServiceBuilder; use super::builder::HttpServiceBuilder;
@ -289,43 +290,24 @@ where
type Response = (); type Response = ();
type Error = DispatchError; type Error = DispatchError;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let cfg = self.config.as_ref(); let cfg = self.config.as_ref();
let ready1 = cfg let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
.service ready1.map_err(|e| {
.poll_ready(cx)
.map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(Box::new(e))
})?
.is_ready();
let ready2 = cfg
.control
.poll_ready(cx)
.map_err(|e| {
log::error!("Http control service readiness error: {:?}", e); log::error!("Http control service readiness error: {:?}", e);
DispatchError::Control(Box::new(e)) DispatchError::Control(Box::new(e))
})? })?;
.is_ready(); ready2.map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
if ready1 && ready2 { DispatchError::Service(Box::new(e))
Poll::Ready(Ok(())) })
} else {
Poll::Pending
}
} }
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { #[inline]
let ready1 = self.config.control.poll_shutdown(cx).is_ready(); async fn shutdown(&self) {
let ready2 = self.config.service.poll_shutdown(cx).is_ready(); self.config.control.shutdown().await;
self.config.service.shutdown().await;
if ready1 && ready2 {
Poll::Ready(())
} else {
Poll::Pending
}
} }
async fn call( async fn call(

View file

@ -27,7 +27,7 @@ pub use ntex_macros::{rt_main as main, rt_test as test};
#[cfg(test)] #[cfg(test)]
pub(crate) use ntex_macros::rt_test2 as rt_test; pub(crate) use ntex_macros::rt_test2 as rt_test;
pub use ntex_service::{forward_poll_ready, forward_poll_shutdown}; pub use ntex_service::{forward_ready, forward_shutdown};
pub mod http; pub mod http;
pub mod web; pub mod web;
@ -36,8 +36,8 @@ pub mod web;
pub mod ws; pub mod ws;
pub use self::service::{ pub use self::service::{
chain, chain_factory, fn_service, into_service, IntoService, IntoServiceFactory, chain, chain_factory, fn_service, IntoService, IntoServiceFactory, Middleware,
Middleware, Pipeline, Service, ServiceCtx, ServiceFactory, Pipeline, Service, ServiceCtx, ServiceFactory,
}; };
pub use ntex_util::{channel, task}; pub use ntex_util::{channel, task};

View file

@ -1,4 +1,4 @@
use std::{cell::RefCell, marker, rc::Rc, task::Context, task::Poll}; use std::{cell::RefCell, marker, rc::Rc};
use crate::http::{Request, Response}; use crate::http::{Request, Response};
use crate::router::{Path, ResourceDef, Router}; use crate::router::{Path, ResourceDef, Router};

View file

@ -67,8 +67,8 @@ where
type Response = WebResponse; type Response = WebResponse;
type Error = S::Error; type Error = S::Error;
crate::forward_poll_ready!(service); crate::forward_ready!(service);
crate::forward_poll_shutdown!(service); crate::forward_shutdown!(service);
async fn call( async fn call(
&self, &self,

View file

@ -110,8 +110,8 @@ where
type Response = WebResponse; type Response = WebResponse;
type Error = S::Error; type Error = S::Error;
crate::forward_poll_ready!(service); crate::forward_ready!(service);
crate::forward_poll_shutdown!(service); crate::forward_shutdown!(service);
async fn call( async fn call(
&self, &self,
@ -151,7 +151,8 @@ mod tests {
DefaultHeaders::new() DefaultHeaders::new()
.header(CONTENT_TYPE, "0001") .header(CONTENT_TYPE, "0001")
.create(ok_service()), .create(ok_service()),
); )
.bind();
assert!(lazy(|cx| mw.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| mw.poll_ready(cx).is_ready()).await);
assert!(lazy(|cx| mw.poll_shutdown(cx).is_ready()).await); assert!(lazy(|cx| mw.poll_shutdown(cx).is_ready()).await);

View file

@ -139,8 +139,8 @@ where
type Response = WebResponse; type Response = WebResponse;
type Error = S::Error; type Error = S::Error;
crate::forward_poll_ready!(service); crate::forward_ready!(service);
crate::forward_poll_shutdown!(service); crate::forward_shutdown!(service);
async fn call( async fn call(
&self, &self,
@ -437,7 +437,7 @@ mod tests {
let logger = Logger::new("%% %{User-Agent}i %{X-Test}o %{HOME}e %D %% test") let logger = Logger::new("%% %{User-Agent}i %{X-Test}o %{HOME}e %D %% test")
.exclude("/test"); .exclude("/test");
let srv = Pipeline::new(Middleware::create(&logger, srv.into_service())); let srv = Pipeline::new(Middleware::create(&logger, srv.into_service())).bind();
assert!(lazy(|cx| srv.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| srv.poll_ready(cx).is_ready()).await);
assert!(lazy(|cx| srv.poll_shutdown(cx).is_ready()).await); assert!(lazy(|cx| srv.poll_shutdown(cx).is_ready()).await);

View file

@ -1,4 +1,4 @@
use std::{cell::RefCell, fmt, rc::Rc, task::Context, task::Poll}; use std::{cell::RefCell, fmt, rc::Rc};
use crate::http::Response; use crate::http::Response;
use crate::router::{IntoPattern, ResourceDef, Router}; use crate::router::{IntoPattern, ResourceDef, Router};

View file

@ -38,7 +38,7 @@ where
Either::Left(async move { Either::Left(async move {
let result = srv.call(item).await; let result = srv.call(item).await;
if let Some(s) = s { if let Some(s) = s {
rt::spawn(async move { s.io().close() }); let _ = rt::spawn(async move { s.io().close() });
} }
result result
}) })
@ -104,7 +104,7 @@ where
cfg.set_keepalive_timeout(Seconds::ZERO); cfg.set_keepalive_timeout(Seconds::ZERO);
// start websockets service dispatcher // start websockets service dispatcher
rt::spawn(async move { let _ = rt::spawn(async move {
let res = crate::io::Dispatcher::new(io, codec, srv, &cfg).await; let res = crate::io::Dispatcher::new(io, codec, srv, &cfg).await;
log::trace!("Ws handler is terminated: {:?}", res); log::trace!("Ws handler is terminated: {:?}", res);
}); });

View file

@ -18,7 +18,7 @@ use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri}
use crate::io::{ use crate::io::{
Base, DispatchItem, Dispatcher, DispatcherConfig, Filter, Io, Layer, Sealed, Base, DispatchItem, Dispatcher, DispatcherConfig, Filter, Io, Layer, Sealed,
}; };
use crate::service::{apply_fn, into_service, IntoService, Pipeline, Service}; use crate::service::{apply_fn, fn_service, IntoService, Pipeline, Service};
use crate::time::{timeout, Millis, Seconds}; use crate::time::{timeout, Millis, Seconds};
use crate::{channel::mpsc, rt, util::Ready, ws}; use crate::{channel::mpsc, rt, util::Ready, ws};
@ -732,12 +732,12 @@ impl WsConnection<Sealed> {
pub fn receiver(self) -> mpsc::Receiver<Result<ws::Frame, WsError<()>>> { pub fn receiver(self) -> mpsc::Receiver<Result<ws::Frame, WsError<()>>> {
let (tx, rx): (_, mpsc::Receiver<Result<ws::Frame, WsError<()>>>) = mpsc::channel(); let (tx, rx): (_, mpsc::Receiver<Result<ws::Frame, WsError<()>>>) = mpsc::channel();
rt::spawn(async move { let _ = rt::spawn(async move {
let tx2 = tx.clone(); let tx2 = tx.clone();
let io = self.io.get_ref(); let io = self.io.get_ref();
let result = self let result = self
.start(into_service(move |item: ws::Frame| { .start(fn_service(move |item: ws::Frame| {
match tx.send(Ok(item)) { match tx.send(Ok(item)) {
Ok(()) => (), Ok(()) => (),
Err(_) => io.close(), Err(_) => io.close(),

View file

@ -1,4 +1,4 @@
use std::{cell::Cell, io, sync::Arc, sync::Mutex, task::Context, task::Poll}; use std::{cell::Cell, io, sync::Arc, sync::Mutex};
use ntex::codec::BytesCodec; use ntex::codec::BytesCodec;
use ntex::http::test::server as test_server; use ntex::http::test::server as test_server;
@ -35,9 +35,9 @@ impl Service<(Request, Io, h1::Codec)> for WsService {
type Response = (); type Response = ();
type Error = io::Error; type Error = io::Error;
fn poll_ready(&self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.set_polled(); self.set_polled();
Poll::Ready(Ok(())) Ok(())
} }
async fn call( async fn call(

View file

@ -111,7 +111,7 @@ async fn test_transport() {
Dispatcher::new( Dispatcher::new(
io.seal(), io.seal(),
ws::Codec::default(), ws::Codec::default(),
ws_service, fn_service(ws_service),
&Default::default(), &Default::default(),
) )
.await .await
@ -153,7 +153,12 @@ async fn test_keepalive_timeout() {
// start websocket service // start websocket service
let cfg = DispatcherConfig::default(); let cfg = DispatcherConfig::default();
cfg.set_keepalive_timeout(Seconds::ZERO); cfg.set_keepalive_timeout(Seconds::ZERO);
Dispatcher::new(io.seal(), ws::Codec::default(), ws_service, &cfg) Dispatcher::new(
io.seal(),
ws::Codec::default(),
fn_service(ws_service),
&cfg,
)
.await .await
}) })
} else { } else {