diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 5c495ff7..ea80abf1 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -2,6 +2,10 @@ ## [0.1.0-b.10] - 2021-12-30 +* IoRef::close() method initiates io stream shutdown + +* IoRef::force_close() method terminates io stream + * Cleanup Filter trait, removed closed,want_read,want_shutdown methods * Cleanup internal flags on io error diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 0f9b33b6..669fba23 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -133,6 +133,7 @@ impl IoState { self.read_task.wake(); self.write_task.wake(); self.dispatch_task.wake(); + self.shutdown_filters(); } } @@ -153,7 +154,19 @@ impl IoState { Poll::Ready(Err(err)) => { self.io_stopped(Some(err)); } - Poll::Pending => (), + Poll::Pending => { + let flags = self.flags.get(); + // check read buffer, if buffer is not consumed it is unlikely + // that filter will properly complete shutdown + if flags.contains(Flags::RD_PAUSED) + || flags.contains(Flags::RD_BUF_FULL | Flags::RD_READY) + { + self.read_task.wake(); + self.write_task.wake(); + self.dispatch_task.wake(); + self.insert_flags(Flags::IO_STOPPING); + } + } } } } diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 84f36b2c..59024b86 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -77,19 +77,25 @@ impl IoRef { #[inline] /// Gracefully close connection /// - /// First stop dispatcher, then dispatcher stops io tasks + /// Notify dispatcher and initiate io stream shutdown process pub fn close(&self) { self.0.insert_flags(Flags::DSP_STOP); - self.0.dispatch_task.wake(); + self.0.init_shutdown(None); } #[inline] /// Force close connection /// - /// Dispatcher does not wait for uncompleted responses, but flushes io buffers. + /// Dispatcher does not wait for uncompleted responses. Io stream get terminated + /// without any graceful period. pub fn force_close(&self) { log::trace!("force close io stream object"); - self.0.insert_flags(Flags::DSP_STOP | Flags::IO_STOPPING); + self.0.insert_flags( + Flags::DSP_STOP + | Flags::IO_STOPPED + | Flags::IO_STOPPING + | Flags::IO_STOPPING_FILTERS, + ); self.0.read_task.wake(); self.0.write_task.wake(); self.0.dispatch_task.wake();