Remove From<io::Error> constraint from Encoder and Decoder traits

This commit is contained in:
Nikolay Kim 2020-08-10 14:04:40 +06:00
parent c4fe71d3cf
commit 8a6c476d02
6 changed files with 80 additions and 20 deletions

View file

@ -1,5 +1,11 @@
# Changes
## [0.2.0] - 2020-08-xx
* Include custom `Encoder` and `Decoder` traits
* Remove `From<io::Error>` constraint from `Encoder` and `Decoder` traits
## [0.1.2] - 2020-04-17
* Do not swallow unprocessed data on read errors

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-codec"
version = "0.1.2"
version = "0.2.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -18,12 +18,12 @@ path = "src/lib.rs"
[dependencies]
bitflags = "1.2.1"
bytes = "0.5.6"
futures-core = "0.3.4"
futures-sink = "0.3.4"
tokio = { version = "0.2.6", default-features=false }
tokio-util = { version = "0.2.0", default-features=false, features=["codec"] }
either = "1.5.3"
futures-core = "0.3.5"
futures-sink = "0.3.5"
log = "0.4"
tokio = { version = "0.2.6", default-features=false }
[dev-dependencies]
ntex = "0.1.4"
futures = "0.3.4"
ntex = "0.1.20"
futures = "0.3.5"

34
ntex-codec/src/decoder.rs Normal file
View file

@ -0,0 +1,34 @@
use bytes::BytesMut;
/// Decoding of frames via buffers.
pub trait Decoder {
/// The type of decoded frames.
type Item;
/// The type of unrecoverable frame decoding errors.
///
/// If an individual message is ill-formed but can be ignored without
/// interfering with the processing of future messages, it may be more
/// useful to report the failure as an `Item`.
type Error;
/// Attempts to decode a frame from the provided buffer of bytes.
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
/// A default method available to be called when there are no more bytes
/// available to be read from the underlying I/O.
///
/// This method defaults to calling `decode` and returns an error if
/// `Ok(None)` is returned while there is unconsumed data in `buf`.
/// Typically this doesn't need to be implemented unless the framing
/// protocol differs near the end of the stream.
fn decode_eof(
&mut self,
buf: &mut BytesMut,
) -> Result<Option<Self::Item>, Self::Error> {
match self.decode(buf)? {
Some(frame) => Ok(Some(frame)),
None => Ok(None),
}
}
}

17
ntex-codec/src/encoder.rs Normal file
View file

@ -0,0 +1,17 @@
use bytes::BytesMut;
/// Trait of helper objects to write out messages as bytes.
pub trait Encoder {
/// The type of items consumed by the `Encoder`
type Item;
/// The type of encoding errors.
type Error;
/// Encodes a frame into the buffer provided.
fn encode(
&mut self,
item: Self::Item,
dst: &mut BytesMut,
) -> Result<(), Self::Error>;
}

View file

@ -3,6 +3,7 @@ use std::task::{Context, Poll};
use std::{fmt, io};
use bytes::{Buf, BytesMut};
use either::Either;
use futures_core::{ready, Stream};
use futures_sink::Sink;
@ -250,7 +251,7 @@ where
}
/// Flush write buffer to underlying I/O stream.
pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>> {
pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
log::trace!("flushing framed transport");
let len = self.write_buf.len();
@ -269,8 +270,7 @@ where
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)
.into()));
)));
} else {
written += n
}
@ -278,7 +278,7 @@ where
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(e.into()));
return Poll::Ready(Err(e));
}
}
}
@ -348,7 +348,7 @@ where
pub fn next_item(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<U::Item, U::Error>>> {
) -> Poll<Option<Result<U::Item, Either<U::Error, io::Error>>>> {
let mut done_read = false;
loop {
@ -364,12 +364,12 @@ where
Ok(Some(frame)) => Poll::Ready(Some(Ok(frame))),
Ok(None) => {
if let Some(err) = self.err.take() {
Poll::Ready(Some(Err(err.into())))
Poll::Ready(Some(Err(Either::Right(err))))
} else {
Poll::Ready(None)
}
}
Err(e) => return Poll::Ready(Some(Err(e))),
Err(e) => return Poll::Ready(Some(Err(Either::Left(e)))),
};
}
@ -380,7 +380,7 @@ where
log::trace!("frame decoded from buffer");
return Poll::Ready(Some(Ok(frame)));
}
Err(e) => return Poll::Ready(Some(Err(e))),
Err(e) => return Poll::Ready(Some(Err(Either::Left(e)))),
_ => (), // Need more data
}
@ -428,7 +428,7 @@ where
self.flags.insert(Flags::EOF | Flags::READABLE);
break;
} else {
return Poll::Ready(Some(Err(e.into())));
return Poll::Ready(Some(Err(Either::Right(e))));
}
}
}
@ -442,7 +442,7 @@ where
T: AsyncRead + Unpin,
U: Decoder + Unpin,
{
type Item = Result<U::Item, U::Error>;
type Item = Result<U::Item, Either<U::Error, io::Error>>;
#[inline]
fn poll_next(
@ -486,7 +486,7 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.flush(cx)
self.flush(cx).map_err(From::from)
}
#[inline]

View file

@ -6,13 +6,16 @@
//!
//! [`AsyncRead`]: #
//! [`AsyncWrite`]: #
// #![deny(rust_2018_idioms, warnings)]
#![deny(rust_2018_idioms, warnings)]
mod bcodec;
mod decoder;
mod encoder;
mod framed;
pub use self::bcodec::BytesCodec;
pub use self::decoder::Decoder;
pub use self::encoder::Encoder;
pub use self::framed::{Framed, FramedParts};
pub use tokio::io::{AsyncRead, AsyncWrite};
pub use tokio_util::codec::{Decoder, Encoder};