improve tokio streams support

This commit is contained in:
Nikolay Kim 2021-12-28 11:14:29 +06:00
parent deaa2656ff
commit d23bb69fff
2 changed files with 29 additions and 9 deletions

View file

@ -4,6 +4,8 @@
* Fix error handing for nested filters
* Improve tokio streams support
## [0.1.0-b.7] - 2021-12-27
* Do not swallow decoded read bytes in case of filter error

View file

@ -81,7 +81,7 @@ impl Future for ReadTask {
close = true;
} else {
new_bytes += n;
if new_bytes < hw {
if new_bytes <= hw {
continue;
}
}
@ -129,7 +129,7 @@ enum IoWriteState {
enum Shutdown {
None,
Flushed,
Stopping,
Stopping(u16),
}
/// Write io task
@ -230,7 +230,7 @@ impl Future for WriteTask {
// shutdown WRITE side
match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) {
Poll::Ready(Ok(_)) => {
*st = Shutdown::Stopping;
*st = Shutdown::Stopping(0);
continue;
}
Poll::Ready(Err(e)) => {
@ -243,7 +243,7 @@ impl Future for WriteTask {
_ => (),
}
}
Shutdown::Stopping => {
Shutdown::Stopping(ref mut count) => {
// read until 0 or err
let mut buf = [0u8; 512];
let mut io = this.io.borrow_mut();
@ -257,7 +257,16 @@ impl Future for WriteTask {
log::trace!("write task is stopped");
return Poll::Ready(());
}
Poll::Pending => break,
Poll::Pending => {
*count += read_buf.filled().len() as u16;
if *count > 8196 {
log::trace!(
"write task is stopped, too much input"
);
return Poll::Ready(());
}
break;
}
_ => (),
}
}
@ -502,7 +511,7 @@ mod unixstream {
close = true;
} else {
new_bytes += n;
if new_bytes < hw {
if new_bytes <= hw {
continue;
}
}
@ -639,7 +648,7 @@ mod unixstream {
match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx)
{
Poll::Ready(Ok(_)) => {
*st = Shutdown::Stopping;
*st = Shutdown::Stopping(0);
continue;
}
Poll::Ready(Err(e)) => {
@ -652,7 +661,7 @@ mod unixstream {
_ => (),
}
}
Shutdown::Stopping => {
Shutdown::Stopping(ref mut count) => {
// read until 0 or err
let mut buf = [0u8; 512];
let mut io = this.io.borrow_mut();
@ -666,7 +675,16 @@ mod unixstream {
log::trace!("write task is stopped");
return Poll::Ready(());
}
Poll::Pending => break,
Poll::Pending => {
*count += read_buf.filled().len() as u16;
if *count > 8196 {
log::trace!(
"write task is stopped, too much input"
);
return Poll::Ready(());
}
break;
}
_ => (),
}
}