Fix flush framed write task

This commit is contained in:
Nikolay Kim 2021-01-20 18:17:33 +06:00
parent a501712bc0
commit 79c4a34dbc
4 changed files with 38 additions and 31 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.0-b.2] - 2021-01-20
* Fix flush framed write task
## [0.2.0-b.1] - 2021-01-19
* Introduce ntex::framed module

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.2.0-b.1"
version = "0.2.0-b.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -433,7 +433,6 @@ mod tests {
use bytes::Bytes;
use futures::future::FutureExt;
use crate::channel::condition::Condition;
use crate::codec::BytesCodec;
use crate::rt::time::delay_for;
use crate::testing::Io;

View file

@ -178,46 +178,50 @@ pub(super) fn flush<T>(
io: &mut T,
buf: &mut BytesMut,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>>
) -> Poll<io::Result<()>>
where
T: AsyncRead + AsyncWrite + Unpin,
{
// log::trace!("flushing framed transport: {}", len);
let len = buf.len();
if len == 0 {
return Poll::Ready(Ok(()));
}
let mut written = 0;
while written < len {
match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)));
} else {
written += n
if len != 0 {
let mut written = 0;
while written < len {
match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)));
} else {
written += n
}
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
return Poll::Ready(Err(e));
}
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
return Poll::Ready(Err(e));
}
}
// log::trace!("flushed {} bytes", written);
// remove written data
if written == len {
// flushed same amount as in buffer, we dont need to reallocate
unsafe { buf.set_len(0) }
} else {
buf.advance(written);
}
}
// log::trace!("flushed {} bytes", written);
// remove written data
if written == len {
// flushed same amount as in buffer, we dont need to reallocate
unsafe { buf.set_len(0) }
} else {
buf.advance(written);
}
// flush
futures::ready!(Pin::new(&mut *io).poll_flush(cx))?;
if buf.is_empty() {
Poll::Ready(Ok(()))
} else {