From 8a6c476d02dabd1ed3bad19e073dc769385a7bfc Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 10 Aug 2020 14:04:40 +0600 Subject: [PATCH] Remove From constraint from Encoder and Decoder traits --- ntex-codec/CHANGES.md | 6 ++++++ ntex-codec/Cargo.toml | 14 +++++++------- ntex-codec/src/decoder.rs | 34 ++++++++++++++++++++++++++++++++++ ntex-codec/src/encoder.rs | 17 +++++++++++++++++ ntex-codec/src/framed.rs | 22 +++++++++++----------- ntex-codec/src/lib.rs | 7 +++++-- 6 files changed, 80 insertions(+), 20 deletions(-) create mode 100644 ntex-codec/src/decoder.rs create mode 100644 ntex-codec/src/encoder.rs diff --git a/ntex-codec/CHANGES.md b/ntex-codec/CHANGES.md index 6a40e91f..fec8da6b 100644 --- a/ntex-codec/CHANGES.md +++ b/ntex-codec/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.2.0] - 2020-08-xx + +* Include custom `Encoder` and `Decoder` traits + +* Remove `From` constraint from `Encoder` and `Decoder` traits + ## [0.1.2] - 2020-04-17 * Do not swallow unprocessed data on read errors diff --git a/ntex-codec/Cargo.toml b/ntex-codec/Cargo.toml index b0ea312c..dfdcc5ce 100644 --- a/ntex-codec/Cargo.toml +++ b/ntex-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-codec" -version = "0.1.2" +version = "0.2.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,12 +18,12 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" bytes = "0.5.6" -futures-core = "0.3.4" -futures-sink = "0.3.4" -tokio = { version = "0.2.6", default-features=false } -tokio-util = { version = "0.2.0", default-features=false, features=["codec"] } +either = "1.5.3" +futures-core = "0.3.5" +futures-sink = "0.3.5" log = "0.4" +tokio = { version = "0.2.6", default-features=false } [dev-dependencies] -ntex = "0.1.4" -futures = "0.3.4" +ntex = "0.1.20" +futures = "0.3.5" diff --git a/ntex-codec/src/decoder.rs b/ntex-codec/src/decoder.rs new file mode 100644 index 00000000..a5315a6f --- /dev/null +++ b/ntex-codec/src/decoder.rs @@ -0,0 +1,34 @@ +use bytes::BytesMut; + +/// Decoding of frames via buffers. +pub trait Decoder { + /// The type of decoded frames. + type Item; + + /// The type of unrecoverable frame decoding errors. + /// + /// If an individual message is ill-formed but can be ignored without + /// interfering with the processing of future messages, it may be more + /// useful to report the failure as an `Item`. + type Error; + + /// Attempts to decode a frame from the provided buffer of bytes. + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error>; + + /// A default method available to be called when there are no more bytes + /// available to be read from the underlying I/O. + /// + /// This method defaults to calling `decode` and returns an error if + /// `Ok(None)` is returned while there is unconsumed data in `buf`. + /// Typically this doesn't need to be implemented unless the framing + /// protocol differs near the end of the stream. + fn decode_eof( + &mut self, + buf: &mut BytesMut, + ) -> Result, Self::Error> { + match self.decode(buf)? { + Some(frame) => Ok(Some(frame)), + None => Ok(None), + } + } +} diff --git a/ntex-codec/src/encoder.rs b/ntex-codec/src/encoder.rs new file mode 100644 index 00000000..40948960 --- /dev/null +++ b/ntex-codec/src/encoder.rs @@ -0,0 +1,17 @@ +use bytes::BytesMut; + +/// Trait of helper objects to write out messages as bytes. +pub trait Encoder { + /// The type of items consumed by the `Encoder` + type Item; + + /// The type of encoding errors. + type Error; + + /// Encodes a frame into the buffer provided. + fn encode( + &mut self, + item: Self::Item, + dst: &mut BytesMut, + ) -> Result<(), Self::Error>; +} diff --git a/ntex-codec/src/framed.rs b/ntex-codec/src/framed.rs index 7adbfbf4..359acfcb 100644 --- a/ntex-codec/src/framed.rs +++ b/ntex-codec/src/framed.rs @@ -3,6 +3,7 @@ use std::task::{Context, Poll}; use std::{fmt, io}; use bytes::{Buf, BytesMut}; +use either::Either; use futures_core::{ready, Stream}; use futures_sink::Sink; @@ -250,7 +251,7 @@ where } /// Flush write buffer to underlying I/O stream. - pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { + pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { log::trace!("flushing framed transport"); let len = self.write_buf.len(); @@ -269,8 +270,7 @@ where return Poll::Ready(Err(io::Error::new( io::ErrorKind::WriteZero, "failed to write frame to transport", - ) - .into())); + ))); } else { written += n } @@ -278,7 +278,7 @@ where Poll::Ready(Err(e)) => { log::trace!("Error during flush: {}", e); self.flags.insert(Flags::DISCONNECTED); - return Poll::Ready(Err(e.into())); + return Poll::Ready(Err(e)); } } } @@ -348,7 +348,7 @@ where pub fn next_item( &mut self, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>>> { let mut done_read = false; loop { @@ -364,12 +364,12 @@ where Ok(Some(frame)) => Poll::Ready(Some(Ok(frame))), Ok(None) => { if let Some(err) = self.err.take() { - Poll::Ready(Some(Err(err.into()))) + Poll::Ready(Some(Err(Either::Right(err)))) } else { Poll::Ready(None) } } - Err(e) => return Poll::Ready(Some(Err(e))), + Err(e) => return Poll::Ready(Some(Err(Either::Left(e)))), }; } @@ -380,7 +380,7 @@ where log::trace!("frame decoded from buffer"); return Poll::Ready(Some(Ok(frame))); } - Err(e) => return Poll::Ready(Some(Err(e))), + Err(e) => return Poll::Ready(Some(Err(Either::Left(e)))), _ => (), // Need more data } @@ -428,7 +428,7 @@ where self.flags.insert(Flags::EOF | Flags::READABLE); break; } else { - return Poll::Ready(Some(Err(e.into()))); + return Poll::Ready(Some(Err(Either::Right(e)))); } } } @@ -442,7 +442,7 @@ where T: AsyncRead + Unpin, U: Decoder + Unpin, { - type Item = Result; + type Item = Result>; #[inline] fn poll_next( @@ -486,7 +486,7 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.flush(cx) + self.flush(cx).map_err(From::from) } #[inline] diff --git a/ntex-codec/src/lib.rs b/ntex-codec/src/lib.rs index 0e3109ed..2ff08073 100644 --- a/ntex-codec/src/lib.rs +++ b/ntex-codec/src/lib.rs @@ -6,13 +6,16 @@ //! //! [`AsyncRead`]: # //! [`AsyncWrite`]: # -// #![deny(rust_2018_idioms, warnings)] +#![deny(rust_2018_idioms, warnings)] mod bcodec; +mod decoder; +mod encoder; mod framed; pub use self::bcodec::BytesCodec; +pub use self::decoder::Decoder; +pub use self::encoder::Encoder; pub use self::framed::{Framed, FramedParts}; pub use tokio::io::{AsyncRead, AsyncWrite}; -pub use tokio_util::codec::{Decoder, Encoder};