Fix error handing for nested filters

This commit is contained in:
Nikolay Kim 2021-12-28 00:08:35 +06:00
parent 8b3a1bc474
commit 89f758f8c4
15 changed files with 71 additions and 75 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.0-b.8] - 2021-12-28
* Fix error handing for nested filters
## [0.1.0-b.7] - 2021-12-27
* Do not swallow decoded read bytes in case of filter error

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.1.0-b.7"
version = "0.1.0-b.8"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -27,7 +27,7 @@ tokio = ["tok-io/net", "tok-io/rt"]
[dependencies]
ntex-codec = "0.6.0"
ntex-bytes = "0.1.8"
ntex-util = "0.1.4"
ntex-util = "0.1.5"
ntex-service = "0.3.0-b.0"
bitflags = "1.3"

View file

@ -45,19 +45,13 @@ impl Filter for Base {
}
#[inline]
fn want_shutdown(&self) {
let mut flags = self.0.flags();
if !flags.intersects(Flags::IO_ERR | Flags::IO_SHUTDOWN) {
flags.insert(Flags::IO_SHUTDOWN);
self.0.set_flags(flags);
self.0 .0.read_task.wake();
self.0 .0.write_task.wake();
}
fn want_shutdown(&self, err: Option<io::Error>) {
self.0 .0.init_shutdown(err);
}
#[inline]
fn poll_shutdown(&self) -> Poll<io::Result<()>> {
self.want_shutdown();
self.want_shutdown(None);
Poll::Ready(Ok(()))
}
@ -156,7 +150,7 @@ impl Filter for NullFilter {
fn want_read(&self) {}
fn want_shutdown(&self) {}
fn want_shutdown(&self, _: Option<io::Error>) {}
fn poll_shutdown(&self) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))

View file

@ -126,46 +126,37 @@ impl IoState {
#[inline]
/// Gracefully shutdown read and write io tasks
pub(super) fn init_shutdown(&self, cx: Option<&mut Context<'_>>) {
pub(super) fn init_shutdown(&self, err: Option<io::Error>) {
let flags = self.flags.get();
if !flags.intersects(Flags::IO_ERR | Flags::IO_SHUTDOWN | Flags::IO_FILTERS) {
log::trace!("initiate io shutdown {:?}", flags);
log::trace!("initiate io shutdown {:?} {:?}", flags, err);
self.insert_flags(Flags::IO_FILTERS);
if let Err(err) = self.shutdown_filters() {
self.error.set(Some(err));
}
self.read_task.wake();
self.write_task.wake();
if let Some(cx) = cx {
self.dispatch_task.register(cx.waker());
if let Some(err) = err {
self.error.set(Some(err));
}
}
}
#[inline]
pub(super) fn shutdown_filters(&self) -> io::Result<()> {
pub(super) fn shutdown_filters(&self) {
let mut flags = self.flags.get();
if !flags.intersects(Flags::IO_ERR | Flags::IO_SHUTDOWN) {
let result = match self.filter.get().poll_shutdown() {
Poll::Pending => return Ok(()),
match self.filter.get().poll_shutdown() {
Poll::Pending => return,
Poll::Ready(Ok(())) => {
flags.insert(Flags::IO_SHUTDOWN);
Ok(())
}
Poll::Ready(Err(err)) => {
flags.insert(Flags::IO_ERR);
self.dispatch_task.wake();
Err(err)
self.error.set(Some(err));
}
};
}
self.flags.set(flags);
self.read_task.wake();
self.write_task.wake();
result
} else {
Ok(())
self.dispatch_task.wake();
}
}
@ -630,7 +621,7 @@ impl<F> Io<F> {
Poll::Ready(Ok(()))
} else {
if !flags.contains(Flags::IO_FILTERS) {
self.0 .0.init_shutdown(Some(cx));
self.0 .0.init_shutdown(None);
}
if let Some(err) = self.0 .0.error.take() {

View file

@ -55,7 +55,7 @@ pub trait Filter: 'static {
fn want_read(&self);
/// Filter wants gracefully shutdown io stream
fn want_shutdown(&self);
fn want_shutdown(&self, err: Option<sio::Error>);
fn poll_shutdown(&self) -> Poll<sio::Result<()>>;

View file

@ -31,19 +31,16 @@ impl ReadContext {
}
#[inline]
pub fn release_read_buf(&self, buf: BytesMut, nbytes: usize) -> Result<(), io::Error> {
pub fn release_read_buf(&self, buf: BytesMut, nbytes: usize) {
if buf.is_empty() {
self.0.memory_pool().release_read_buf(buf);
Ok(())
} else {
let filter = self.0.filter();
let mut dst = self.0 .0.read_buf.take();
let result = self.0.filter().release_read_buf(buf, &mut dst, nbytes);
let result = filter.release_read_buf(buf, &mut dst, nbytes);
let nbytes = result.as_ref().map(|i| *i).unwrap_or(0);
if let Some(dst) = dst {
if self.0.flags().contains(Flags::IO_FILTERS) {
self.0 .0.shutdown_filters()?;
}
if nbytes > 0 {
if dst.len() > self.0.memory_pool().read_params().high as usize {
log::trace!(
@ -66,11 +63,13 @@ impl ReadContext {
if let Err(err) = result {
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
Err(err)
} else {
Ok(())
filter.want_shutdown(Some(err));
}
}
if self.0.flags().contains(Flags::IO_FILTERS) {
self.0 .0.shutdown_filters();
}
}
}
@ -122,7 +121,7 @@ impl WriteContext {
}
if flags.contains(Flags::IO_FILTERS) {
self.0 .0.shutdown_filters()?;
self.0 .0.shutdown_filters();
}
Ok(())
}

View file

@ -100,10 +100,8 @@ impl Future for ReadTask {
this.state.close(None);
return Poll::Ready(());
}
return if let Err(e) = this.state.release_read_buf(buf, new_bytes) {
this.state.close(Some(e));
Poll::Ready(())
} else if close {
this.state.release_read_buf(buf, new_bytes);
return if close {
this.state.close(None);
Poll::Ready(())
} else if pending {
@ -525,10 +523,8 @@ mod unixstream {
this.state.close(None);
return Poll::Ready(());
}
return if let Err(e) = this.state.release_read_buf(buf, new_bytes) {
this.state.close(Some(e));
Poll::Ready(())
} else if close {
this.state.release_read_buf(buf, new_bytes);
return if close {
this.state.close(None);
Poll::Ready(())
} else if pending {