diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index e0583c65..1f0954f9 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -86,8 +86,7 @@ impl Filter for Base { Poll::Ready(WriteStatus::Terminate) } else if flags.intersects(Flags::IO_SHUTDOWN) { Poll::Ready(WriteStatus::Shutdown(self.0 .0.disconnect_timeout.get())) - } else if flags.contains(Flags::IO_FILTERS) - && !flags.contains(Flags::IO_FILTERS_TO) + } else if flags.contains(Flags::IO_FILTERS) && !flags.contains(Flags::IO_FILTERS_TO) { flags.insert(Flags::IO_FILTERS_TO); self.0.set_flags(flags); diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 4ba8a99f..029b7e55 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -559,9 +559,12 @@ impl Io { pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll> { // check io error if !self.0 .0.is_io_open() { - return Poll::Ready(Err(self.0 .0.error.take().unwrap_or_else(|| { - io::Error::new(io::ErrorKind::Other, "disconnected") - }))); + return Poll::Ready(Err(self + .0 + .0 + .error + .take() + .unwrap_or_else(|| io::Error::new(io::ErrorKind::Other, "disconnected")))); } if let Some(buf) = self.0 .0.write_buf.take() { diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 74197b8d..2fa15a15 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -185,11 +185,7 @@ impl IoRef { /// Encode and write item to a buffer and wake up write task /// /// Returns write buffer state, false is returned if write buffer if full. - pub fn encode( - &self, - item: U::Item, - codec: &U, - ) -> Result::Error> + pub fn encode(&self, item: U::Item, codec: &U) -> Result::Error> where U: Encoder, { diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index b3f43f78..8c2a0a87 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -29,9 +29,7 @@ pub use self::io::{Io, IoRef, OnDisconnect}; pub use self::tasks::{ReadContext, WriteContext}; pub use self::time::Timer; -pub use self::utils::{ - filter_factory, seal, sealed_service, SealedFactory, SealedService, -}; +pub use self::utils::{filter_factory, seal, sealed_service, SealedFactory, SealedService}; pub type IoBoxed = Io; @@ -148,24 +146,6 @@ pub mod rt { pub use crate::tokio_rt::*; } -#[deprecated] -#[doc(hidden)] -pub fn into_boxed( - srv: S, -) -> impl ntex_service::ServiceFactory< - Io, - Config = S::Config, - Response = S::Response, - Error = S::Error, - InitError = S::InitError, -> -where - F: Filter + 'static, - S: ntex_service::ServiceFactory, -{ - seal(srv) -} - #[cfg(test)] mod tests { use super::*; @@ -187,7 +167,8 @@ mod tests { .contains("DispatchItem::WBackPressureEnabled")); assert!(format!("{:?}", T::WBackPressureDisabled) .contains("DispatchItem::WBackPressureDisabled")); - assert!(format!("{:?}", T::KeepAliveTimeout) - .contains("DispatchItem::KeepAliveTimeout")); + assert!( + format!("{:?}", T::KeepAliveTimeout).contains("DispatchItem::KeepAliveTimeout") + ); } } diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index 8785ed37..028dfeea 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -7,9 +7,7 @@ use ntex_bytes::{Buf, BufMut, BytesMut}; use ntex_util::future::poll_fn; use ntex_util::time::{sleep, Millis, Sleep}; -use crate::{ - types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, -}; +use crate::{types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus}; #[derive(Default)] struct AtomicWaker(Arc>>>); @@ -411,10 +409,7 @@ impl Future for ReadTask { match io.poll_read_buf(cx, &mut buf) { Poll::Pending => { - log::trace!( - "no more data in io stream, read: {:?}", - new_bytes - ); + log::trace!("no more data in io stream, read: {:?}", new_bytes); break; } Poll::Ready(Ok(n)) => { @@ -732,10 +727,7 @@ mod tokio_impl { } } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } diff --git a/ntex-io/src/tokio_impl.rs b/ntex-io/src/tokio_impl.rs index 49587a66..17df8d42 100644 --- a/ntex-io/src/tokio_impl.rs +++ b/ntex-io/src/tokio_impl.rs @@ -72,8 +72,7 @@ impl Future for ReadTask { Poll::Ready(Ok(n)) => { if n == 0 { log::trace!("io stream is disconnected"); - if let Err(e) = - this.state.release_read_buf(buf, new_bytes) + if let Err(e) = this.state.release_read_buf(buf, new_bytes) { this.state.close(Some(e)); } else { @@ -221,8 +220,7 @@ impl Future for WriteTask { } Shutdown::Flushed => { // shutdown WRITE side - match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) - { + match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) { Poll::Ready(Ok(_)) => { *st = Shutdown::Stopping; continue; @@ -386,10 +384,7 @@ impl AsyncWrite for Io { Io::poll_flush(&*self, cx, false) } - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Io::poll_shutdown(&*self, cx) } } @@ -431,10 +426,7 @@ impl AsyncWrite for IoBoxed { Self::poll_flush(&*self, cx, false) } - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Self::poll_shutdown(&*self, cx) } } @@ -446,11 +438,7 @@ mod unixstream { use super::*; impl IoStream for UnixStream { - fn start( - self, - read: ReadContext, - write: WriteContext, - ) -> Option> { + fn start(self, read: ReadContext, write: WriteContext) -> Option> { let io = Rc::new(RefCell::new(self)); tok_io::task::spawn_local(ReadTask::new(io.clone(), read)); @@ -604,8 +592,7 @@ mod unixstream { Poll::Ready(WriteStatus::Terminate) => { log::trace!("write task is instructed to terminate"); - let _ = - Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx); + let _ = Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx); this.state.close(None); Poll::Ready(()) } @@ -619,11 +606,8 @@ mod unixstream { match st { Shutdown::None => { // flush write buffer - match flush_io( - &mut *this.io.borrow_mut(), - &this.state, - cx, - ) { + match flush_io(&mut *this.io.borrow_mut(), &this.state, cx) + { Poll::Ready(true) => { *st = Shutdown::Flushed; continue; @@ -639,15 +623,16 @@ mod unixstream { } Shutdown::Flushed => { // shutdown WRITE side - match Pin::new(&mut *this.io.borrow_mut()) - .poll_shutdown(cx) + match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) { Poll::Ready(Ok(_)) => { *st = Shutdown::Stopping; continue; } Poll::Ready(Err(e)) => { - log::trace!("write task is closed with err during shutdown"); + log::trace!( + "write task is closed with err during shutdown" + ); this.state.close(Some(e)); return Poll::Ready(()); } @@ -660,8 +645,7 @@ mod unixstream { let mut io = this.io.borrow_mut(); loop { let mut read_buf = ReadBuf::new(&mut buf); - match Pin::new(&mut *io).poll_read(cx, &mut read_buf) - { + match Pin::new(&mut *io).poll_read(cx, &mut read_buf) { Poll::Ready(Err(_)) | Poll::Ready(Ok(_)) if read_buf.filled().is_empty() => { diff --git a/ntex-io/src/utils.rs b/ntex-io/src/utils.rs index f469c10b..dac95d7a 100644 --- a/ntex-io/src/utils.rs +++ b/ntex-io/src/utils.rs @@ -6,20 +6,20 @@ use ntex_util::{future::Ready, ready}; use super::{Filter, FilterFactory, Io, IoBoxed}; /// Service that converts any Io stream to IoBoxed stream -pub fn seal( +pub fn seal( srv: S, ) -> impl ServiceFactory< Io, - Config = S::Config, + C, Response = S::Response, Error = S::Error, InitError = S::InitError, > where F: Filter + 'static, - S: ServiceFactory, + S: ServiceFactory, { - fn_factory_with_config(move |cfg: S::Config| { + fn_factory_with_config(move |cfg: C| { let fut = srv.new_service(cfg); async move { let srv = fut.await?; @@ -72,19 +72,18 @@ impl Clone for SealedFactory { } } -impl ServiceFactory for SealedFactory +impl ServiceFactory for SealedFactory where F: Filter, - S: ServiceFactory>, + S: ServiceFactory>, { - type Config = S::Config; type Response = IoBoxed; type Error = S::Error; type Service = SealedService; type InitError = S::InitError; - type Future = SealedFactoryResponse; + type Future = SealedFactoryResponse; - fn new_service(&self, cfg: S::Config) -> Self::Future { + fn new_service(&self, cfg: C) -> Self::Future { SealedFactoryResponse { fut: self.inner.new_service(cfg), _t: PhantomData, @@ -138,14 +137,14 @@ where } pin_project_lite::pin_project! { - pub struct SealedFactoryResponse, R> { + pub struct SealedFactoryResponse, R, C> { #[pin] fut: S::Future, - _t: PhantomData + _t: PhantomData<(R, C)> } } -impl, R> Future for SealedFactoryResponse { +impl, R, C> Future for SealedFactoryResponse { type Output = Result, S::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -165,9 +164,7 @@ pin_project_lite::pin_project! { } } -impl>, R, F: Filter> Future - for SealedServiceResponse -{ +impl>, R, F: Filter> Future for SealedServiceResponse { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -180,12 +177,11 @@ pub struct FilterServiceFactory { _t: PhantomData, } -impl ServiceFactory> for FilterServiceFactory +impl ServiceFactory, ()> for FilterServiceFactory where T: FilterFactory + Clone, F: Filter, { - type Config = (); type Response = Io; type Error = T::Error; type Service = FilterService; diff --git a/ntex-tls/examples/rustls-server.rs b/ntex-tls/examples/rustls-server.rs index 213d2969..f167149a 100644 --- a/ntex-tls/examples/rustls-server.rs +++ b/ntex-tls/examples/rustls-server.rs @@ -17,8 +17,7 @@ async fn main() -> io::Result<()> { // load ssl keys let cert_file = &mut BufReader::new(File::open("../ntex-tls/examples/cert.pem").unwrap()); - let key_file = - &mut BufReader::new(File::open("../ntex-tls/examples/key.pem").unwrap()); + let key_file = &mut BufReader::new(File::open("../ntex-tls/examples/key.pem").unwrap()); let keys = PrivateKey(rsa_private_keys(key_file).unwrap().remove(0)); let cert_chain = certs(cert_file) .unwrap() @@ -36,8 +35,8 @@ async fn main() -> io::Result<()> { // start server server::ServerBuilder::new() .bind("basic", "127.0.0.1:8443", move |_| { - pipeline_factory(filter_factory(TlsAcceptor::new(tls_config.clone()))) - .and_then(fn_service(|io: Io<_>| async move { + pipeline_factory(filter_factory(TlsAcceptor::new(tls_config.clone()))).and_then( + fn_service(|io: Io<_>| async move { println!("New client is connected"); io.send( @@ -64,7 +63,8 @@ async fn main() -> io::Result<()> { } println!("Client is disconnected"); Ok(()) - })) + }), + ) })? .workers(1) .run() diff --git a/ntex-tls/examples/server.rs b/ntex-tls/examples/server.rs index 146f2dad..7c9946db 100644 --- a/ntex-tls/examples/server.rs +++ b/ntex-tls/examples/server.rs @@ -25,8 +25,8 @@ async fn main() -> io::Result<()> { // start server server::ServerBuilder::new() .bind("basic", "127.0.0.1:8443", move |_| { - pipeline_factory(filter_factory(SslAcceptor::new(acceptor.clone()))) - .and_then(fn_service(|io: Io<_>| async move { + pipeline_factory(filter_factory(SslAcceptor::new(acceptor.clone()))).and_then( + fn_service(|io: Io<_>| async move { println!("New client is connected"); loop { match io.recv(&codec::BytesCodec).await { @@ -45,7 +45,8 @@ async fn main() -> io::Result<()> { } println!("Client is disconnected"); Ok(()) - })) + }), + ) })? .workers(1) .run() diff --git a/ntex-tls/src/openssl/accept.rs b/ntex-tls/src/openssl/accept.rs index 630e35c4..0be9523c 100644 --- a/ntex-tls/src/openssl/accept.rs +++ b/ntex-tls/src/openssl/accept.rs @@ -56,15 +56,15 @@ impl Clone for Acceptor { } } -impl ServiceFactory> for Acceptor { +impl ServiceFactory, C> for Acceptor { type Response = Io>; type Error = Box; - type Config = (); type Service = AcceptorService; type InitError = (); type Future = Ready; - fn new_service(&self, _: ()) -> Self::Future { + #[inline] + fn new_service(&self, _: C) -> Self::Future { MAX_SSL_ACCEPT_COUNTER.with(|conns| { Ready::Ok(AcceptorService { acceptor: self.acceptor.clone(), diff --git a/ntex-tls/src/openssl/mod.rs b/ntex-tls/src/openssl/mod.rs index 410dcaad..6035cd8b 100644 --- a/ntex-tls/src/openssl/mod.rs +++ b/ntex-tls/src/openssl/mod.rs @@ -65,17 +65,16 @@ impl io::Write for IoInner { impl Filter for SslFilter { fn query(&self, id: any::TypeId) -> Option> { if id == any::TypeId::of::() { - let proto = if let Some(protos) = - self.inner.borrow().ssl().selected_alpn_protocol() - { - if protos.windows(2).any(|window| window == b"h2") { - types::HttpProtocol::Http2 + let proto = + if let Some(protos) = self.inner.borrow().ssl().selected_alpn_protocol() { + if protos.windows(2).any(|window| window == b"h2") { + types::HttpProtocol::Http2 + } else { + types::HttpProtocol::Http1 + } } else { types::HttpProtocol::Http1 - } - } else { - types::HttpProtocol::Http1 - }; + }; Some(Box::new(proto)) } else { self.inner.borrow().get_ref().inner.query(id) @@ -160,12 +159,12 @@ impl Filter for SslFilter { let (hw, lw) = pool.read_params().unpack(); // get inner filter buffer - let mut buf = - if let Some(buf) = self.inner.borrow().get_ref().inner.get_read_buf() { - buf - } else { - BytesMut::with_capacity_in(lw, pool) - }; + let mut buf = if let Some(buf) = self.inner.borrow().get_ref().inner.get_read_buf() + { + buf + } else { + BytesMut::with_capacity_in(lw, pool) + }; let mut new_bytes = 0; loop { @@ -330,10 +329,8 @@ impl FilterFactory for SslConnector { }) })?; - poll_fn(|cx| { - handle_result(st.filter().inner.borrow_mut().connect(), &st, cx) - }) - .await?; + poll_fn(|cx| handle_result(st.filter().inner.borrow_mut().connect(), &st, cx)) + .await?; Ok(st) }) diff --git a/ntex-tls/src/rustls/accept.rs b/ntex-tls/src/rustls/accept.rs index 59e6dc69..c18f410a 100644 --- a/ntex-tls/src/rustls/accept.rs +++ b/ntex-tls/src/rustls/accept.rs @@ -51,16 +51,15 @@ impl Clone for Acceptor { } } -impl ServiceFactory> for Acceptor { +impl ServiceFactory, C> for Acceptor { type Response = Io>; type Error = io::Error; type Service = AcceptorService; - type Config = (); type InitError = (); type Future = Ready; - fn new_service(&self, _: ()) -> Self::Future { + fn new_service(&self, _: C) -> Self::Future { MAX_SSL_ACCEPT_COUNTER.with(|conns| { Ready::Ok(AcceptorService { acceptor: self.inner.clone(),