From 9b958a151b0be6b33d5d51a04b8e5b8370d7cdc6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 21 Sep 2020 15:07:30 +0600 Subject: [PATCH] Revert "Introduce IoFramed trait" This reverts commit 0de3647f5d642cb3a1c9670c7ee3b53e0126394c. --- ntex-codec/CHANGES.md | 4 - ntex-codec/Cargo.toml | 2 +- ntex-codec/src/framed.rs | 327 ++++++++++++++++++--------------------- ntex-codec/src/lib.rs | 43 ----- 4 files changed, 148 insertions(+), 228 deletions(-) diff --git a/ntex-codec/CHANGES.md b/ntex-codec/CHANGES.md index 5474c104..b0c11ab3 100644 --- a/ntex-codec/CHANGES.md +++ b/ntex-codec/CHANGES.md @@ -1,9 +1,5 @@ # Changes -## [0.2.2] - 2020-09-xx - -* Introduce `IoFramed` trait - ## [0.2.1] - 2020-08-10 * Require `Debug` impl for `Error` diff --git a/ntex-codec/Cargo.toml b/ntex-codec/Cargo.toml index 1a6e15d5..dc43511c 100644 --- a/ntex-codec/Cargo.toml +++ b/ntex-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-codec" -version = "0.2.2" +version = "0.2.1" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-codec/src/framed.rs b/ntex-codec/src/framed.rs index 170c5ba3..7b2f9995 100644 --- a/ntex-codec/src/framed.rs +++ b/ntex-codec/src/framed.rs @@ -1,12 +1,13 @@ +use std::pin::Pin; use std::task::{Context, Poll}; -use std::{fmt, io, pin::Pin}; +use std::{fmt, io}; use bytes::{Buf, BytesMut}; use either::Either; use futures_core::{ready, Stream}; use futures_sink::Sink; -use crate::{AsyncRead, AsyncWrite, BufStatus, Decoder, Encoder, IoFramed}; +use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; const LW: usize = 1024; const HW: usize = 8 * 1024; @@ -20,7 +21,7 @@ bitflags::bitflags! { } } -/// A unified interface to an underlying I/O object, using +/// A unified `Stream` and `Sink` interface to an underlying I/O object, using /// the `Encoder` and `Decoder` traits to encode and decode frames. /// `Framed` is heavily optimized for streaming io. pub struct Framed { @@ -38,7 +39,7 @@ where U: Decoder + Encoder, { #[inline] - /// Provides an interface for reading and writing to this + /// Provides a `Stream` and `Sink` interface for reading and writing to this /// `Io` object, using `Decode` and `Encode` to read and write the raw data. /// /// Raw I/O objects work with byte sequences, but higher-level code usually @@ -158,7 +159,7 @@ impl Framed { } #[inline] - /// Consume the `Frame`, returning rame` with different codec. + /// Consume the `Frame`, returning `Frame` with different codec. pub fn into_framed(self, codec: U2) -> Framed { Framed { codec, @@ -221,24 +222,133 @@ impl Framed { } } -impl IoFramed for Framed +impl Framed where - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncWrite + Unpin, + U: Encoder, { - fn buf_status(&self) -> BufStatus { - let len = self.write_buf.len(); - if len == 0 { - BufStatus::Empty - } else if len > HW { - BufStatus::Full - } else { - BufStatus::Ready + #[inline] + /// Serialize item and Write to the inner buffer + pub fn write( + &mut self, + item: ::Item, + ) -> Result<(), ::Error> { + let remaining = self.write_buf.capacity() - self.write_buf.len(); + if remaining < LW { + self.write_buf.reserve(HW - remaining); } + + self.codec.encode(item, &mut self.write_buf)?; + Ok(()) } + #[inline] + /// Check if framed is able to write more data. + /// + /// `Framed` object considers ready if there is free space in write buffer. + pub fn is_write_ready(&self) -> bool { + self.write_buf.len() < HW + } + + /// Flush write buffer to underlying I/O stream. + pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { + log::trace!("flushing framed transport"); + + let len = self.write_buf.len(); + if len == 0 { + return Poll::Ready(Ok(())); + } + + let mut written = 0; + while written < len { + match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) { + Poll::Pending => break, + Poll::Ready(Ok(n)) => { + if n == 0 { + log::trace!("Disconnected during flush, written {}", written); + self.flags.insert(Flags::DISCONNECTED); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))); + } else { + written += n + } + } + Poll::Ready(Err(e)) => { + log::trace!("Error during flush: {}", e); + self.flags.insert(Flags::DISCONNECTED); + return Poll::Ready(Err(e)); + } + } + } + + // remove written data + if written == len { + // flushed same amount as in buffer, we dont need to reallocate + unsafe { self.write_buf.set_len(0) } + } else { + self.write_buf.advance(written); + } + if self.write_buf.is_empty() { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } +} + +impl Framed +where + T: AsyncRead + AsyncWrite + Unpin, +{ + #[inline] + /// Flush write buffer and shutdown underlying I/O stream. + /// + /// Close method shutdown write side of a io object and + /// then reads until disconnect or error, high level code must use + /// timeout for close operation. + pub fn close(&mut self, cx: &mut Context<'_>) -> Poll> { + if !self.flags.contains(Flags::DISCONNECTED) { + // flush write buffer + ready!(Pin::new(&mut self.io).poll_flush(cx))?; + + if !self.flags.contains(Flags::SHUTDOWN) { + // shutdown WRITE side + ready!(Pin::new(&mut self.io).poll_shutdown(cx)).map_err(|e| { + self.flags.insert(Flags::DISCONNECTED); + e + })?; + self.flags.insert(Flags::SHUTDOWN); + } + + // read until 0 or err + let mut buf = [0u8; 512]; + loop { + match ready!(Pin::new(&mut self.io).poll_read(cx, &mut buf)) { + Err(_) | Ok(0) => { + break; + } + _ => (), + } + } + self.flags.insert(Flags::DISCONNECTED); + } + log::trace!("framed transport flushed and closed"); + Poll::Ready(Ok(())) + } +} + +pub type ItemType = + Result<::Item, Either<::Error, io::Error>>; + +impl Framed +where + T: AsyncRead + Unpin, + U: Decoder, +{ /// Try to read underlying I/O stream and decode item. - fn read(&mut self, cx: &mut Context<'_>) -> Poll>> { + pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll>> { let mut done_read = false; loop { @@ -330,187 +440,40 @@ where } } } - - #[inline] - /// Serialize item and Write to the inner buffer - fn write( - &mut self, - item: ::Item, - ) -> Result<(), ::Error> { - let remaining = self.write_buf.capacity() - self.write_buf.len(); - if remaining < LW { - self.write_buf.reserve(HW - remaining); - } - - self.codec.encode(item, &mut self.write_buf)?; - Ok(()) - } - - #[inline] - /// Flush write buffer to underlying I/O stream. - fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { - log::trace!("flushing framed transport"); - - let len = self.write_buf.len(); - if len == 0 { - return Poll::Ready(Ok(())); - } - - let mut written = 0; - while written < len { - match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) { - Poll::Pending => break, - Poll::Ready(Ok(n)) => { - if n == 0 { - log::trace!("Disconnected during flush, written {}", written); - self.flags.insert(Flags::DISCONNECTED); - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ))); - } else { - written += n - } - } - Poll::Ready(Err(e)) => { - log::trace!("Error during flush: {}", e); - self.flags.insert(Flags::DISCONNECTED); - return Poll::Ready(Err(e)); - } - } - } - - // remove written data - if written == len { - // flushed same amount as in buffer, we dont need to reallocate - unsafe { self.write_buf.set_len(0) } - } else { - self.write_buf.advance(written); - } - if self.write_buf.is_empty() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - #[inline] - /// Flush write buffer and shutdown underlying I/O stream. - /// - /// Close method shutdown write side of a io object and - /// then reads until disconnect or error, high level code must use - /// timeout for close operation. - fn close(&mut self, cx: &mut Context<'_>) -> Poll> { - if !self.flags.contains(Flags::DISCONNECTED) { - // flush write buffer - ready!(Pin::new(&mut self.io).poll_flush(cx))?; - - if !self.flags.contains(Flags::SHUTDOWN) { - // shutdown WRITE side - ready!(Pin::new(&mut self.io).poll_shutdown(cx)).map_err(|e| { - self.flags.insert(Flags::DISCONNECTED); - e - })?; - self.flags.insert(Flags::SHUTDOWN); - } - - // read until 0 or err - let mut buf = [0u8; 512]; - loop { - match ready!(Pin::new(&mut self.io).poll_read(cx, &mut buf)) { - Err(_) | Ok(0) => { - break; - } - _ => (), - } - } - self.flags.insert(Flags::DISCONNECTED); - } - log::trace!("framed transport flushed and closed"); - Poll::Ready(Ok(())) - } } -#[doc(hidden)] -impl Framed -where - T: AsyncRead + AsyncWrite + Unpin, - U: Encoder + Decoder + Unpin, -{ - #[inline] - /// Serialize item and Write to the inner buffer - pub fn write( - &mut self, - item: ::Item, - ) -> Result<(), ::Error> { - >::write(self, item) - } - - #[inline] - /// Check if framed is able to write more data. - /// - /// `Framed` object considers ready if there is free space in write buffer. - pub fn is_write_ready(&self) -> bool { - self.write_buf.len() < HW - } - - #[inline] - /// Flush write buffer to underlying I/O stream. - pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { - >::flush(self, cx) - } - - #[inline] - /// Flush write buffer and shutdown underlying I/O stream. - /// - /// Close method shutdown write side of a io object and - /// then reads until disconnect or error, high level code must use - /// timeout for close operation. - pub fn close(&mut self, cx: &mut Context<'_>) -> Poll> { - >::close(self, cx) - } - - /// Try to read underlying I/O stream and decode item. - pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll>> { - >::read(self, cx) - } -} - -pub type ItemType = - Result<::Item, Either<::Error, io::Error>>; - impl Stream for Framed where - T: AsyncRead + AsyncWrite + Unpin, - U: Encoder + Decoder + Unpin, + T: AsyncRead + Unpin, + U: Decoder + Unpin, { - type Item = Result<::Item, Either<::Error, io::Error>>; + type Item = Result>; #[inline] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.read(cx) + self.next_item(cx) } } -impl Sink<::Item> for Framed +impl Sink for Framed where T: AsyncRead + AsyncWrite + Unpin, - U: Encoder + Decoder + Unpin, + U: Encoder + Unpin, { - type Error = Either<::Error, io::Error>; + type Error = Either; #[inline] fn poll_ready( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { - if self.buf_status() == BufStatus::Full { - Poll::Pending - } else { + if self.is_write_ready() { Poll::Ready(Ok(())) + } else { + Poll::Pending } } @@ -693,19 +656,21 @@ mod tests { let mut server = Framed::new(server, BytesCodec); client.read_pending(); - assert!(lazy(|cx| Pin::new(&mut server).read(cx)).await.is_pending()); + assert!(lazy(|cx| Pin::new(&mut server).next_item(cx)) + .await + .is_pending()); client.write(b"GET /test HTTP/1.1\r\n\r\n"); client.close().await; - let item = lazy(|cx| Pin::new(&mut server).read(cx)) + let item = lazy(|cx| Pin::new(&mut server).next_item(cx)) .await .map(|i| i.unwrap().unwrap().freeze()); assert_eq!( item, Poll::Ready(Bytes::from_static(b"GET /test HTTP/1.1\r\n\r\n")) ); - let item = lazy(|cx| Pin::new(&mut server).read(cx)) + let item = lazy(|cx| Pin::new(&mut server).next_item(cx)) .await .map(|i| i.is_none()); assert_eq!(item, Poll::Ready(true)); @@ -717,12 +682,14 @@ mod tests { let mut server = Framed::new(server, BytesCodec); client.read_pending(); - assert!(lazy(|cx| Pin::new(&mut server).read(cx)).await.is_pending()); + assert!(lazy(|cx| Pin::new(&mut server).next_item(cx)) + .await + .is_pending()); client.write(b"GET /test HTTP/1.1\r\n\r\n"); client.read_error(io::Error::new(io::ErrorKind::Other, "error")); - let item = lazy(|cx| Pin::new(&mut server).read(cx)) + let item = lazy(|cx| Pin::new(&mut server).next_item(cx)) .await .map(|i| i.unwrap().unwrap().freeze()); assert_eq!( @@ -730,7 +697,7 @@ mod tests { Poll::Ready(Bytes::from_static(b"GET /test HTTP/1.1\r\n\r\n")) ); assert_eq!( - lazy(|cx| Pin::new(&mut server).read(cx)) + lazy(|cx| Pin::new(&mut server).next_item(cx)) .await .map(|i| i.unwrap().is_err()), Poll::Ready(true) diff --git a/ntex-codec/src/lib.rs b/ntex-codec/src/lib.rs index 3a6303be..2ff08073 100644 --- a/ntex-codec/src/lib.rs +++ b/ntex-codec/src/lib.rs @@ -8,10 +8,6 @@ //! [`AsyncWrite`]: # #![deny(rust_2018_idioms, warnings)] -use either::Either; -use std::io; -use std::task::{Context, Poll}; - mod bcodec; mod decoder; mod encoder; @@ -23,42 +19,3 @@ pub use self::encoder::Encoder; pub use self::framed::{Framed, FramedParts}; pub use tokio::io::{AsyncRead, AsyncWrite}; - -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum BufStatus { - /// Buffer is empty - Empty, - /// Buffer is ready for write operation - Ready, - /// Buffer is full - Full, -} - -/// A unified interface to an underlying I/O object, using -/// the `Encoder` and `Decoder` traits to encode and decode frames. -pub trait IoFramed: Unpin { - /// Write buffer status - fn buf_status(&self) -> BufStatus; - - /// Try to read underlying I/O stream and decode item. - fn read( - &mut self, - cx: &mut Context<'_>, - ) -> Poll< - Option::Item, Either<::Error, io::Error>>>, - >; - - /// Serialize item and write to the inner buffer - fn write(&mut self, item: ::Item) - -> Result<(), ::Error>; - - /// Flush write buffer to underlying I/O stream. - fn flush(&mut self, cx: &mut Context<'_>) -> Poll>; - - /// Flush write buffer and shutdown underlying I/O stream. - /// - /// Close method shutdown write side of a io object and - /// then reads until disconnect or error, high level code must use - /// timeout for close operation. - fn close(&mut self, cx: &mut Context<'_>) -> Poll>; -}