Do not swallow unprocessed data on read errors

This commit is contained in:
Nikolay Kim 2020-04-17 11:16:17 +06:00
parent 8846453137
commit 7dad057d8c
4 changed files with 108 additions and 22 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.2] - 2020-04-17
* Do not swallow unprocessed data on read errors
## [0.1.1] - 2020-04-07
* Optimize io operations

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-codec"
version = "0.1.1"
version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -29,6 +29,7 @@ pub struct Framed<T, U> {
flags: Flags,
read_buf: BytesMut,
write_buf: BytesMut,
err: Option<io::Error>,
}
impl<T, U> Framed<T, U>
@ -54,6 +55,7 @@ where
Framed {
io,
codec,
err: None,
flags: Flags::empty(),
read_buf: BytesMut::with_capacity(HW),
write_buf: BytesMut::with_capacity(HW),
@ -87,6 +89,7 @@ impl<T, U> Framed<T, U> {
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
err: parts.err,
}
}
@ -163,6 +166,7 @@ impl<T, U> Framed<T, U> {
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
err: self.err,
}
}
@ -178,6 +182,7 @@ impl<T, U> Framed<T, U> {
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
err: self.err,
}
}
@ -193,6 +198,7 @@ impl<T, U> Framed<T, U> {
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
err: self.err,
}
}
@ -210,6 +216,7 @@ impl<T, U> Framed<T, U> {
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
err: self.err,
}
}
}
@ -353,11 +360,17 @@ where
if self.flags.contains(Flags::READABLE) {
if self.flags.contains(Flags::EOF) {
match self.codec.decode_eof(&mut self.read_buf) {
Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))),
Ok(None) => return Poll::Ready(None),
return match self.codec.decode_eof(&mut self.read_buf) {
Ok(Some(frame)) => Poll::Ready(Some(Ok(frame))),
Ok(None) => {
if let Some(err) = self.err.take() {
Poll::Ready(Some(Err(err.into())))
} else {
Poll::Ready(None)
}
}
Err(e) => return Poll::Ready(Some(Err(e))),
}
};
}
log::trace!("attempting to decode a frame");
@ -408,7 +421,16 @@ where
updated = true;
}
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Err(e)) => {
if updated {
done_read = true;
self.err = Some(e);
self.flags.insert(Flags::EOF | Flags::READABLE);
break;
} else {
return Poll::Ready(Some(Err(e.into())));
}
}
}
}
}
@ -507,6 +529,7 @@ pub struct FramedParts<T, U> {
pub write_buf: BytesMut,
flags: Flags,
err: Option<io::Error>,
}
impl<T, U> FramedParts<T, U> {
@ -515,6 +538,7 @@ impl<T, U> FramedParts<T, U> {
FramedParts {
io,
codec,
err: None,
flags: Flags::empty(),
read_buf: BytesMut::new(),
write_buf: BytesMut::new(),
@ -527,6 +551,7 @@ impl<T, U> FramedParts<T, U> {
io,
codec,
read_buf,
err: None,
flags: Flags::empty(),
write_buf: BytesMut::new(),
}
@ -543,6 +568,13 @@ mod tests {
use super::*;
use crate::BytesCodec;
#[ntex::test]
async fn test_debug() {
let (_, server) = Io::create();
let server = Framed::new(server, BytesCodec);
assert!(format!("{:?}", server).contains("Framed"));
}
#[ntex::test]
async fn test_sink() {
let (client, server) = Io::create();
@ -607,4 +639,58 @@ mod tests {
assert!(client.is_closed());
assert!(server.is_closed());
}
#[ntex::test]
async fn test_read_pending() {
let (client, server) = Io::create();
let mut server = Framed::new(server, BytesCodec);
client.read_pending();
assert!(lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.is_pending());
client.write(b"GET /test HTTP/1.1\r\n\r\n");
client.close().await;
let item = lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.map(|i| i.unwrap().unwrap().freeze());
assert_eq!(
item,
Poll::Ready(Bytes::from_static(b"GET /test HTTP/1.1\r\n\r\n"))
);
let item = lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.map(|i| i.is_none());
assert_eq!(item, Poll::Ready(true));
}
#[ntex::test]
async fn test_read_error() {
let (client, server) = Io::create();
let mut server = Framed::new(server, BytesCodec);
client.read_pending();
assert!(lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.is_pending());
client.write(b"GET /test HTTP/1.1\r\n\r\n");
client.read_error(io::Error::new(io::ErrorKind::Other, "error"));
let item = lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.map(|i| i.unwrap().unwrap().freeze());
assert_eq!(
item,
Poll::Ready(Bytes::from_static(b"GET /test HTTP/1.1\r\n\r\n"))
);
assert_eq!(
lazy(|cx| Pin::new(&mut server).next_item(cx))
.await
.map(|i| i.unwrap().is_err()),
Poll::Ready(true)
);
}
}