From a440ebb345849eedea96d36fc9ee3e32faae6063 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 31 Mar 2020 10:01:06 +0600 Subject: [PATCH] cleanup codec::Framed --- ntex-codec/Cargo.toml | 6 +- ntex-codec/src/framed.rs | 117 +++++++++++++++-------------- ntex/src/framed/connect.rs | 6 +- ntex/src/framed/dispatcher.rs | 2 +- ntex/src/framed/service.rs | 20 ++--- ntex/src/framed/transport.rs | 28 ++----- ntex/src/http/client/connection.rs | 4 +- ntex/src/http/h1/utils.rs | 2 +- ntex/src/util/framed.rs | 28 ++----- ntex/src/ws/dispatcher.rs | 6 +- 10 files changed, 93 insertions(+), 126 deletions(-) diff --git a/ntex-codec/Cargo.toml b/ntex-codec/Cargo.toml index a32234a6..17408325 100644 --- a/ntex-codec/Cargo.toml +++ b/ntex-codec/Cargo.toml @@ -17,9 +17,9 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" -bytes = "0.5.2" -futures-core = "0.3.1" -futures-sink = "0.3.1" +bytes = "0.5.4" +futures-core = "0.3.4" +futures-sink = "0.3.4" tokio = { version = "0.2.4", default-features=false } tokio-util = { version = "0.2.0", default-features=false, features=["codec"] } log = "0.4" \ No newline at end of file diff --git a/ntex-codec/src/framed.rs b/ntex-codec/src/framed.rs index bbace51e..66a02b8c 100644 --- a/ntex-codec/src/framed.rs +++ b/ntex-codec/src/framed.rs @@ -195,16 +195,17 @@ impl Framed { } } -impl Framed { +impl Framed +where + T: AsyncWrite + Unpin, + U: Encoder, +{ + #[inline] /// Serialize item and Write to the inner buffer pub fn write( &mut self, item: ::Item, - ) -> Result<(), ::Error> - where - T: AsyncWrite, - U: Encoder, - { + ) -> Result<(), ::Error> { let remaining = self.write_buf.capacity() - self.write_buf.len(); if remaining < LW { self.write_buf.reserve(HW - remaining); @@ -214,6 +215,7 @@ impl Framed { Ok(()) } + #[inline] /// Check if framed is able to write more data. /// /// `Framed` object considers ready if there is free space in write buffer. @@ -221,6 +223,48 @@ impl Framed { self.write_buf.len() < HW } + /// Flush write buffer to underlying I/O stream. + pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { + log::trace!("flushing framed transport"); + + while !self.write_buf.is_empty() { + log::trace!("writing; remaining={}", self.write_buf.len()); + + let n = ready!(Pin::new(&mut self.io).poll_write(cx, &self.write_buf))?; + if n == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ) + .into())); + } + + // remove written data + self.write_buf.advance(n); + } + + // Try flushing the underlying IO + ready!(Pin::new(&mut self.io).poll_flush(cx))?; + + log::trace!("framed transport flushed"); + Poll::Ready(Ok(())) + } + + #[inline] + /// Flush write buffer and shutdown underlying I/O stream. + pub fn close(&mut self, cx: &mut Context<'_>) -> Poll> { + ready!(Pin::new(&mut self.io).poll_flush(cx))?; + ready!(Pin::new(&mut self.io).poll_shutdown(cx))?; + log::trace!("framed transport flushed and closed"); + Poll::Ready(Ok(())) + } +} + +impl Framed +where + T: AsyncRead + Unpin, + U: Decoder, +{ /// Try to read underlying I/O stream and decode item. pub fn next_item( &mut self, @@ -267,9 +311,8 @@ impl Framed { if remaining < LW { self.read_buf.reserve(HW - remaining) } - let cnt = match unsafe { - Pin::new_unchecked(&mut self.io).poll_read_buf(cx, &mut self.read_buf) - } { + let cnt = match Pin::new(&mut self.io).poll_read_buf(cx, &mut self.read_buf) + { Poll::Pending => return Poll::Pending, Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), Poll::Ready(Ok(cnt)) => cnt, @@ -281,62 +324,16 @@ impl Framed { self.flags.insert(Flags::READABLE); } } - - /// Flush write buffer to underlying I/O stream. - pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> - where - T: AsyncWrite, - U: Encoder, - { - log::trace!("flushing framed transport"); - - while !self.write_buf.is_empty() { - log::trace!("writing; remaining={}", self.write_buf.len()); - - let n = ready!(unsafe { - Pin::new_unchecked(&mut self.io).poll_write(cx, &self.write_buf) - })?; - - if n == 0 { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ) - .into())); - } - - // remove written data - self.write_buf.advance(n); - } - - // Try flushing the underlying IO - ready!(unsafe { Pin::new_unchecked(&mut self.io).poll_flush(cx) })?; - - log::trace!("framed transport flushed"); - Poll::Ready(Ok(())) - } - - /// Flush write buffer and shutdown underlying I/O stream. - pub fn close(&mut self, cx: &mut Context<'_>) -> Poll> - where - T: AsyncWrite, - U: Encoder, - { - unsafe { - ready!(Pin::new_unchecked(&mut self.io).poll_flush(cx))?; - ready!(Pin::new_unchecked(&mut self.io).poll_shutdown(cx))?; - } - Poll::Ready(Ok(())) - } } impl Stream for Framed where - T: AsyncRead, + T: AsyncRead + Unpin, U: Decoder, { type Item = Result; + #[inline] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -347,12 +344,13 @@ where impl Sink for Framed where - T: AsyncWrite, + T: AsyncWrite + Unpin, U: Encoder, U::Error: From, { type Error = U::Error; + #[inline] fn poll_ready( self: Pin<&mut Self>, _: &mut Context<'_>, @@ -364,6 +362,7 @@ where } } + #[inline] fn start_send( mut self: Pin<&mut Self>, item: ::Item, @@ -371,6 +370,7 @@ where self.write(item) } + #[inline] fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -378,6 +378,7 @@ where self.flush(cx) } + #[inline] fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/ntex/src/framed/connect.rs b/ntex/src/framed/connect.rs index ec83c66f..fbef748c 100644 --- a/ntex/src/framed/connect.rs +++ b/ntex/src/framed/connect.rs @@ -17,7 +17,7 @@ where impl Connect where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder, { pub(crate) fn new(io: Io) -> Self { @@ -80,7 +80,7 @@ impl ConnectResult Stream for ConnectResult where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder, { type Item = Result<::Item, ::Error>; @@ -96,7 +96,7 @@ where impl futures::Sink<::Item> for ConnectResult where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder, { type Error = ::Error; diff --git a/ntex/src/framed/dispatcher.rs b/ntex/src/framed/dispatcher.rs index 744bdc33..2c7079cf 100644 --- a/ntex/src/framed/dispatcher.rs +++ b/ntex/src/framed/dispatcher.rs @@ -86,7 +86,7 @@ where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, diff --git a/ntex/src/framed/service.rs b/ntex/src/framed/service.rs index b0a17a13..be392f0f 100644 --- a/ntex/src/framed/service.rs +++ b/ntex/src/framed/service.rs @@ -33,7 +33,7 @@ where Response = ConnectResult, >, C::Error: fmt::Debug, - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -79,7 +79,7 @@ pub struct FactoryBuilder { impl FactoryBuilder where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, C: ServiceFactory< Config = (), Request = Connect, @@ -132,7 +132,7 @@ pub struct FramedService { impl ServiceFactory for FramedService where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, C: ServiceFactory< Config = (), Request = Connect, @@ -173,7 +173,7 @@ where #[pin_project::pin_project] pub struct FramedServiceResponse where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, C: ServiceFactory< Config = (), Request = Connect, @@ -201,7 +201,7 @@ where impl Future for FramedServiceResponse where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, C: ServiceFactory< Config = (), Request = Connect, @@ -245,7 +245,7 @@ pub struct FramedServiceImpl { impl Service for FramedServiceImpl where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, C: Service< Request = Connect, Response = ConnectResult, @@ -309,7 +309,7 @@ where >, ::Error: 'static, ::Future: 'static, - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -336,7 +336,7 @@ where >, ::Error: 'static, ::Future: 'static, - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -376,7 +376,7 @@ where >, ::Error: 'static, ::Future: 'static, - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -403,7 +403,7 @@ where >, ::Error: 'static, ::Future: 'static, - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, diff --git a/ntex/src/framed/transport.rs b/ntex/src/framed/transport.rs index bd91750f..d1b724aa 100644 --- a/ntex/src/framed/transport.rs +++ b/ntex/src/framed/transport.rs @@ -29,7 +29,7 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, U: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -70,7 +70,7 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -130,16 +130,7 @@ where &mut self.framed } - fn poll_read(&mut self, cx: &mut Context<'_>) -> bool - where - S: Service, Response = Response>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - { + fn poll_read(&mut self, cx: &mut Context<'_>) -> bool { loop { match self.service.poll_ready(cx) { Poll::Ready(Ok(_)) => { @@ -171,16 +162,7 @@ where } /// write to framed object - fn poll_write(&mut self, cx: &mut Context<'_>) -> bool - where - S: Service, Response = Response>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - { + fn poll_write(&mut self, cx: &mut Context<'_>) -> bool { loop { while !self.framed.is_write_buf_full() { match Pin::new(&mut self.rx).poll_next(cx) { @@ -226,7 +208,7 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, diff --git a/ntex/src/http/client/connection.rs b/ntex/src/http/client/connection.rs index 7776a586..abf0a7ec 100644 --- a/ntex/src/http/client/connection.rs +++ b/ntex/src/http/client/connection.rs @@ -41,7 +41,9 @@ pub trait Connection { fn open_tunnel>(self, head: H) -> Self::TunnelFuture; } -pub(super) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static { +pub(super) trait ConnectionLifetime: + AsyncRead + AsyncWrite + Unpin + 'static +{ /// Close connection fn close(&mut self); diff --git a/ntex/src/http/h1/utils.rs b/ntex/src/http/h1/utils.rs index f43fbe0c..eb06d524 100644 --- a/ntex/src/http/h1/utils.rs +++ b/ntex/src/http/h1/utils.rs @@ -33,7 +33,7 @@ where impl Future for SendResponse where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, B: MessageBody, { type Output = Result, Box>; diff --git a/ntex/src/util/framed.rs b/ntex/src/util/framed.rs index e4dda483..75989c39 100644 --- a/ntex/src/util/framed.rs +++ b/ntex/src/util/framed.rs @@ -76,7 +76,7 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, U: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -117,7 +117,7 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -177,16 +177,7 @@ where &mut self.framed } - fn poll_read(&mut self, cx: &mut Context<'_>) -> bool - where - S: Service, Response = Response>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - { + fn poll_read(&mut self, cx: &mut Context<'_>) -> bool { loop { match self.service.poll_ready(cx) { Poll::Ready(Ok(_)) => { @@ -219,16 +210,7 @@ where } /// write to framed object - fn poll_write(&mut self, cx: &mut Context<'_>) -> bool - where - S: Service, Response = Response>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - { + fn poll_write(&mut self, cx: &mut Context<'_>) -> bool { loop { while !self.framed.is_write_buf_full() { match Pin::new(&mut self.rx).poll_next(cx) { @@ -275,7 +257,7 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, diff --git a/ntex/src/ws/dispatcher.rs b/ntex/src/ws/dispatcher.rs index d842c820..ae9448db 100644 --- a/ntex/src/ws/dispatcher.rs +++ b/ntex/src/ws/dispatcher.rs @@ -12,14 +12,14 @@ use super::{Codec, Frame, Message}; pub struct Dispatcher where S: Service + 'static, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, { inner: framed::Dispatcher, } impl Dispatcher where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Future: 'static, S::Error: 'static, @@ -39,7 +39,7 @@ where impl Future for Dispatcher where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Future: 'static, S::Error: 'static,