diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 46027804..7e53228c 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -4,6 +4,8 @@ * Fix error handing for nested filters +* Improve tokio streams support + ## [0.1.0-b.7] - 2021-12-27 * Do not swallow decoded read bytes in case of filter error diff --git a/ntex-io/src/tokio_impl.rs b/ntex-io/src/tokio_impl.rs index 911b78a5..df357521 100644 --- a/ntex-io/src/tokio_impl.rs +++ b/ntex-io/src/tokio_impl.rs @@ -81,7 +81,7 @@ impl Future for ReadTask { close = true; } else { new_bytes += n; - if new_bytes < hw { + if new_bytes <= hw { continue; } } @@ -129,7 +129,7 @@ enum IoWriteState { enum Shutdown { None, Flushed, - Stopping, + Stopping(u16), } /// Write io task @@ -230,7 +230,7 @@ impl Future for WriteTask { // shutdown WRITE side match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) { Poll::Ready(Ok(_)) => { - *st = Shutdown::Stopping; + *st = Shutdown::Stopping(0); continue; } Poll::Ready(Err(e)) => { @@ -243,7 +243,7 @@ impl Future for WriteTask { _ => (), } } - Shutdown::Stopping => { + Shutdown::Stopping(ref mut count) => { // read until 0 or err let mut buf = [0u8; 512]; let mut io = this.io.borrow_mut(); @@ -257,7 +257,16 @@ impl Future for WriteTask { log::trace!("write task is stopped"); return Poll::Ready(()); } - Poll::Pending => break, + Poll::Pending => { + *count += read_buf.filled().len() as u16; + if *count > 8196 { + log::trace!( + "write task is stopped, too much input" + ); + return Poll::Ready(()); + } + break; + } _ => (), } } @@ -502,7 +511,7 @@ mod unixstream { close = true; } else { new_bytes += n; - if new_bytes < hw { + if new_bytes <= hw { continue; } } @@ -639,7 +648,7 @@ mod unixstream { match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) { Poll::Ready(Ok(_)) => { - *st = Shutdown::Stopping; + *st = Shutdown::Stopping(0); continue; } Poll::Ready(Err(e)) => { @@ -652,7 +661,7 @@ mod unixstream { _ => (), } } - Shutdown::Stopping => { + Shutdown::Stopping(ref mut count) => { // read until 0 or err let mut buf = [0u8; 512]; let mut io = this.io.borrow_mut(); @@ -666,7 +675,16 @@ mod unixstream { log::trace!("write task is stopped"); return Poll::Ready(()); } - Poll::Pending => break, + Poll::Pending => { + *count += read_buf.filled().len() as u16; + if *count > 8196 { + log::trace!( + "write task is stopped, too much input" + ); + return Poll::Ready(()); + } + break; + } _ => (), } }