Flush underlying io stream

This commit is contained in:
Nikolay Kim 2021-01-20 23:36:57 +06:00
parent 79c4a34dbc
commit 715331081c
3 changed files with 40 additions and 34 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.2] - 2021-01-21
* Flush underlying io stream
## [0.2.1] - 2020-08-10
* Require `Debug` impl for `Error`

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-codec"
version = "0.2.1"
version = "0.2.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -19,11 +19,11 @@ path = "src/lib.rs"
bitflags = "1.2.1"
bytes = "0.5.6"
either = "1.5.3"
futures-core = "0.3.5"
futures-sink = "0.3.5"
futures-core = "0.3.12"
futures-sink = "0.3.12"
log = "0.4"
tokio = { version = "0.2.6", default-features=false }
[dev-dependencies]
ntex = "0.1.21"
futures = "0.3.5"
ntex = "0.2.0-b.2"
futures = "0.3.12"

View file

@ -233,41 +233,43 @@ where
log::trace!("flushing framed transport");
let len = self.write_buf.len();
if len == 0 {
return Poll::Ready(Ok(()));
}
let mut written = 0;
while written < len {
match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) {
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
if len != 0 {
let mut written = 0;
while written < len {
match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) {
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)));
} else {
written += n
}
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)));
} else {
written += n
return Poll::Ready(Err(e));
}
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(e));
}
}
// remove written data
if written == len {
// flushed same amount as in buffer, we dont need to reallocate
unsafe { self.write_buf.set_len(0) }
} else {
self.write_buf.advance(written);
}
}
// remove written data
if written == len {
// flushed same amount as in buffer, we dont need to reallocate
unsafe { self.write_buf.set_len(0) }
} else {
self.write_buf.advance(written);
}
// flush
ready!(Pin::new(&mut self.io).poll_flush(cx))?;
if self.write_buf.is_empty() {
Poll::Ready(Ok(()))
} else {