Revert "Introduce IoFramed trait"

This reverts commit 0de3647f5d.
This commit is contained in:
Nikolay Kim 2020-09-21 15:07:30 +06:00
parent 0de3647f5d
commit 9b958a151b
4 changed files with 148 additions and 228 deletions

View file

@ -1,9 +1,5 @@
# Changes
## [0.2.2] - 2020-09-xx
* Introduce `IoFramed` trait
## [0.2.1] - 2020-08-10
* Require `Debug` impl for `Error`

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-codec"
version = "0.2.2"
version = "0.2.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -1,12 +1,13 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io, pin::Pin};
use std::{fmt, io};
use bytes::{Buf, BytesMut};
use either::Either;
use futures_core::{ready, Stream};
use futures_sink::Sink;
use crate::{AsyncRead, AsyncWrite, BufStatus, Decoder, Encoder, IoFramed};
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};
const LW: usize = 1024;
const HW: usize = 8 * 1024;
@ -20,7 +21,7 @@ bitflags::bitflags! {
}
}
/// A unified interface to an underlying I/O object, using
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
/// `Framed` is heavily optimized for streaming io.
pub struct Framed<T, U> {
@ -38,7 +39,7 @@ where
U: Decoder + Encoder,
{
#[inline]
/// Provides an interface for reading and writing to this
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
@ -158,7 +159,7 @@ impl<T, U> Framed<T, U> {
}
#[inline]
/// Consume the `Frame`, returning rame` with different codec.
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
Framed {
codec,
@ -221,24 +222,133 @@ impl<T, U> Framed<T, U> {
}
}
impl<T, U> IoFramed<U> for Framed<T, U>
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
T: AsyncWrite + Unpin,
U: Encoder,
{
fn buf_status(&self) -> BufStatus {
let len = self.write_buf.len();
if len == 0 {
BufStatus::Empty
} else if len > HW {
BufStatus::Full
} else {
BufStatus::Ready
#[inline]
/// Serialize item and Write to the inner buffer
pub fn write(
&mut self,
item: <U as Encoder>::Item,
) -> Result<(), <U as Encoder>::Error> {
let remaining = self.write_buf.capacity() - self.write_buf.len();
if remaining < LW {
self.write_buf.reserve(HW - remaining);
}
self.codec.encode(item, &mut self.write_buf)?;
Ok(())
}
#[inline]
/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}
/// Flush write buffer to underlying I/O stream.
pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
log::trace!("flushing framed transport");
let len = self.write_buf.len();
if len == 0 {
return Poll::Ready(Ok(()));
}
let mut written = 0;
while written < len {
match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) {
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)));
} else {
written += n
}
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(e));
}
}
}
// remove written data
if written == len {
// flushed same amount as in buffer, we dont need to reallocate
unsafe { self.write_buf.set_len(0) }
} else {
self.write_buf.advance(written);
}
if self.write_buf.is_empty() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite + Unpin,
{
#[inline]
/// Flush write buffer and shutdown underlying I/O stream.
///
/// Close method shutdown write side of a io object and
/// then reads until disconnect or error, high level code must use
/// timeout for close operation.
pub fn close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
if !self.flags.contains(Flags::DISCONNECTED) {
// flush write buffer
ready!(Pin::new(&mut self.io).poll_flush(cx))?;
if !self.flags.contains(Flags::SHUTDOWN) {
// shutdown WRITE side
ready!(Pin::new(&mut self.io).poll_shutdown(cx)).map_err(|e| {
self.flags.insert(Flags::DISCONNECTED);
e
})?;
self.flags.insert(Flags::SHUTDOWN);
}
// read until 0 or err
let mut buf = [0u8; 512];
loop {
match ready!(Pin::new(&mut self.io).poll_read(cx, &mut buf)) {
Err(_) | Ok(0) => {
break;
}
_ => (),
}
}
self.flags.insert(Flags::DISCONNECTED);
}
log::trace!("framed transport flushed and closed");
Poll::Ready(Ok(()))
}
}
pub type ItemType<U> =
Result<<U as Decoder>::Item, Either<<U as Decoder>::Error, io::Error>>;
impl<T, U> Framed<T, U>
where
T: AsyncRead + Unpin,
U: Decoder,
{
/// Try to read underlying I/O stream and decode item.
fn read(&mut self, cx: &mut Context<'_>) -> Poll<Option<ItemType<U>>> {
pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll<Option<ItemType<U>>> {
let mut done_read = false;
loop {
@ -330,187 +440,40 @@ where
}
}
}
#[inline]
/// Serialize item and Write to the inner buffer
fn write(
&mut self,
item: <U as Encoder>::Item,
) -> Result<(), <U as Encoder>::Error> {
let remaining = self.write_buf.capacity() - self.write_buf.len();
if remaining < LW {
self.write_buf.reserve(HW - remaining);
}
self.codec.encode(item, &mut self.write_buf)?;
Ok(())
}
#[inline]
/// Flush write buffer to underlying I/O stream.
fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
log::trace!("flushing framed transport");
let len = self.write_buf.len();
if len == 0 {
return Poll::Ready(Ok(()));
}
let mut written = 0;
while written < len {
match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) {
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)));
} else {
written += n
}
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(e));
}
}
}
// remove written data
if written == len {
// flushed same amount as in buffer, we dont need to reallocate
unsafe { self.write_buf.set_len(0) }
} else {
self.write_buf.advance(written);
}
if self.write_buf.is_empty() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
#[inline]
/// Flush write buffer and shutdown underlying I/O stream.
///
/// Close method shutdown write side of a io object and
/// then reads until disconnect or error, high level code must use
/// timeout for close operation.
fn close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
if !self.flags.contains(Flags::DISCONNECTED) {
// flush write buffer
ready!(Pin::new(&mut self.io).poll_flush(cx))?;
if !self.flags.contains(Flags::SHUTDOWN) {
// shutdown WRITE side
ready!(Pin::new(&mut self.io).poll_shutdown(cx)).map_err(|e| {
self.flags.insert(Flags::DISCONNECTED);
e
})?;
self.flags.insert(Flags::SHUTDOWN);
}
// read until 0 or err
let mut buf = [0u8; 512];
loop {
match ready!(Pin::new(&mut self.io).poll_read(cx, &mut buf)) {
Err(_) | Ok(0) => {
break;
}
_ => (),
}
}
self.flags.insert(Flags::DISCONNECTED);
}
log::trace!("framed transport flushed and closed");
Poll::Ready(Ok(()))
}
}
#[doc(hidden)]
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite + Unpin,
U: Encoder + Decoder + Unpin,
{
#[inline]
/// Serialize item and Write to the inner buffer
pub fn write(
&mut self,
item: <U as Encoder>::Item,
) -> Result<(), <U as Encoder>::Error> {
<Self as IoFramed<_>>::write(self, item)
}
#[inline]
/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}
#[inline]
/// Flush write buffer to underlying I/O stream.
pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
<Self as IoFramed<_>>::flush(self, cx)
}
#[inline]
/// Flush write buffer and shutdown underlying I/O stream.
///
/// Close method shutdown write side of a io object and
/// then reads until disconnect or error, high level code must use
/// timeout for close operation.
pub fn close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
<Self as IoFramed<_>>::close(self, cx)
}
/// Try to read underlying I/O stream and decode item.
pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll<Option<ItemType<U>>> {
<Self as IoFramed<_>>::read(self, cx)
}
}
pub type ItemType<U> =
Result<<U as Decoder>::Item, Either<<U as Decoder>::Error, io::Error>>;
impl<T, U> Stream for Framed<T, U>
where
T: AsyncRead + AsyncWrite + Unpin,
U: Encoder + Decoder + Unpin,
T: AsyncRead + Unpin,
U: Decoder + Unpin,
{
type Item = Result<<U as Decoder>::Item, Either<<U as Decoder>::Error, io::Error>>;
type Item = Result<U::Item, Either<U::Error, io::Error>>;
#[inline]
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.read(cx)
self.next_item(cx)
}
}
impl<T, U> Sink<<U as Encoder>::Item> for Framed<T, U>
impl<T, U> Sink<U::Item> for Framed<T, U>
where
T: AsyncRead + AsyncWrite + Unpin,
U: Encoder + Decoder + Unpin,
U: Encoder + Unpin,
{
type Error = Either<<U as Encoder>::Error, io::Error>;
type Error = Either<U::Error, io::Error>;
#[inline]
fn poll_ready(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
if self.buf_status() == BufStatus::Full {
Poll::Pending
} else {
if self.is_write_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
@ -693,19 +656,21 @@ mod tests {
let mut server = Framed::new(server, BytesCodec);
client.read_pending();
assert!(lazy(|cx| Pin::new(&mut server).read(cx)).await.is_pending());
assert!(lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.is_pending());
client.write(b"GET /test HTTP/1.1\r\n\r\n");
client.close().await;
let item = lazy(|cx| Pin::new(&mut server).read(cx))
let item = lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.map(|i| i.unwrap().unwrap().freeze());
assert_eq!(
item,
Poll::Ready(Bytes::from_static(b"GET /test HTTP/1.1\r\n\r\n"))
);
let item = lazy(|cx| Pin::new(&mut server).read(cx))
let item = lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.map(|i| i.is_none());
assert_eq!(item, Poll::Ready(true));
@ -717,12 +682,14 @@ mod tests {
let mut server = Framed::new(server, BytesCodec);
client.read_pending();
assert!(lazy(|cx| Pin::new(&mut server).read(cx)).await.is_pending());
assert!(lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.is_pending());
client.write(b"GET /test HTTP/1.1\r\n\r\n");
client.read_error(io::Error::new(io::ErrorKind::Other, "error"));
let item = lazy(|cx| Pin::new(&mut server).read(cx))
let item = lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.map(|i| i.unwrap().unwrap().freeze());
assert_eq!(
@ -730,7 +697,7 @@ mod tests {
Poll::Ready(Bytes::from_static(b"GET /test HTTP/1.1\r\n\r\n"))
);
assert_eq!(
lazy(|cx| Pin::new(&mut server).read(cx))
lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.map(|i| i.unwrap().is_err()),
Poll::Ready(true)

View file

@ -8,10 +8,6 @@
//! [`AsyncWrite`]: #
#![deny(rust_2018_idioms, warnings)]
use either::Either;
use std::io;
use std::task::{Context, Poll};
mod bcodec;
mod decoder;
mod encoder;
@ -23,42 +19,3 @@ pub use self::encoder::Encoder;
pub use self::framed::{Framed, FramedParts};
pub use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BufStatus {
/// Buffer is empty
Empty,
/// Buffer is ready for write operation
Ready,
/// Buffer is full
Full,
}
/// A unified interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
pub trait IoFramed<U: Decoder + Encoder>: Unpin {
/// Write buffer status
fn buf_status(&self) -> BufStatus;
/// Try to read underlying I/O stream and decode item.
fn read(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
Option<Result<<U as Decoder>::Item, Either<<U as Decoder>::Error, io::Error>>>,
>;
/// Serialize item and write to the inner buffer
fn write(&mut self, item: <U as Encoder>::Item)
-> Result<(), <U as Encoder>::Error>;
/// Flush write buffer to underlying I/O stream.
fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
/// Flush write buffer and shutdown underlying I/O stream.
///
/// Close method shutdown write side of a io object and
/// then reads until disconnect or error, high level code must use
/// timeout for close operation.
fn close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
}