From 715331081c1484aa24dcdbcdc6fd7d04b41bc171 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 20 Jan 2021 23:36:57 +0600 Subject: [PATCH] Flush underlying io stream --- ntex-codec/CHANGES.md | 4 +++ ntex-codec/Cargo.toml | 10 +++---- ntex-codec/src/framed.rs | 60 +++++++++++++++++++++------------------- 3 files changed, 40 insertions(+), 34 deletions(-) diff --git a/ntex-codec/CHANGES.md b/ntex-codec/CHANGES.md index b0c11ab3..cd65e31a 100644 --- a/ntex-codec/CHANGES.md +++ b/ntex-codec/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.2] - 2021-01-21 + +* Flush underlying io stream + ## [0.2.1] - 2020-08-10 * Require `Debug` impl for `Error` diff --git a/ntex-codec/Cargo.toml b/ntex-codec/Cargo.toml index dc43511c..82a2747c 100644 --- a/ntex-codec/Cargo.toml +++ b/ntex-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-codec" -version = "0.2.1" +version = "0.2.2" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -19,11 +19,11 @@ path = "src/lib.rs" bitflags = "1.2.1" bytes = "0.5.6" either = "1.5.3" -futures-core = "0.3.5" -futures-sink = "0.3.5" +futures-core = "0.3.12" +futures-sink = "0.3.12" log = "0.4" tokio = { version = "0.2.6", default-features=false } [dev-dependencies] -ntex = "0.1.21" -futures = "0.3.5" +ntex = "0.2.0-b.2" +futures = "0.3.12" diff --git a/ntex-codec/src/framed.rs b/ntex-codec/src/framed.rs index 788a6875..e541d592 100644 --- a/ntex-codec/src/framed.rs +++ b/ntex-codec/src/framed.rs @@ -233,41 +233,43 @@ where log::trace!("flushing framed transport"); let len = self.write_buf.len(); - if len == 0 { - return Poll::Ready(Ok(())); - } - - let mut written = 0; - while written < len { - match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) { - Poll::Pending => break, - Poll::Ready(Ok(n)) => { - if n == 0 { - log::trace!("Disconnected during flush, written {}", written); + if len != 0 { + let mut written = 0; + while written < len { + match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) { + Poll::Pending => break, + Poll::Ready(Ok(n)) => { + if n == 0 { + log::trace!("Disconnected during flush, written {}", written); + self.flags.insert(Flags::DISCONNECTED); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))); + } else { + written += n + } + } + Poll::Ready(Err(e)) => { + log::trace!("Error during flush: {}", e); self.flags.insert(Flags::DISCONNECTED); - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ))); - } else { - written += n + return Poll::Ready(Err(e)); } } - Poll::Ready(Err(e)) => { - log::trace!("Error during flush: {}", e); - self.flags.insert(Flags::DISCONNECTED); - return Poll::Ready(Err(e)); - } + } + + // remove written data + if written == len { + // flushed same amount as in buffer, we dont need to reallocate + unsafe { self.write_buf.set_len(0) } + } else { + self.write_buf.advance(written); } } - // remove written data - if written == len { - // flushed same amount as in buffer, we dont need to reallocate - unsafe { self.write_buf.set_len(0) } - } else { - self.write_buf.advance(written); - } + // flush + ready!(Pin::new(&mut self.io).poll_flush(cx))?; + if self.write_buf.is_empty() { Poll::Ready(Ok(())) } else {