diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 9c32c3ac..283f4643 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.6.0] - 2024-10-17 + +* Return error for IoRef::write(), IoRef::with_write_buf(), Io::poll_flush() if io is closed. + ## [2.5.0] - 2024-09-10 * Refactor async io support diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 5c088194..1155b177 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.5.0" +version = "2.6.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 17e8ffb0..a4284124 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -540,7 +540,9 @@ impl Io { let flags = self.flags(); if flags.is_stopped() { - Poll::Ready(self.error().map(Err).unwrap_or(Ok(()))) + Poll::Ready(self.error().map(Err).unwrap_or_else(|| { + Err(io::Error::new(io::ErrorKind::Other, "Disconnected")) + })) } else { let st = self.st(); let len = st.buffer.write_destination_size(); diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index c905a8cf..054a1906 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -170,13 +170,7 @@ impl IoRef { #[inline] /// Write bytes to a buffer and wake up write task pub fn write(&self, src: &[u8]) -> io::Result<()> { - let flags = self.0.flags.get(); - - if !flags.intersects(Flags::IO_STOPPING) { - self.with_write_buf(|buf| buf.extend_from_slice(src)) - } else { - Ok(()) - } + self.with_write_buf(|buf| buf.extend_from_slice(src)) } #[inline] @@ -196,9 +190,13 @@ impl IoRef { where F: FnOnce(&mut BytesVec) -> R, { - let result = self.0.buffer.with_write_source(self, f); - self.0.filter().process_write_buf(self, &self.0.buffer, 0)?; - Ok(result) + if self.0.flags.get().contains(Flags::IO_STOPPED) { + Err(io::Error::new(io::ErrorKind::Other, "Disconnected")) + } else { + let result = self.0.buffer.with_write_source(self, f); + self.0.filter().process_write_buf(self, &self.0.buffer, 0)?; + Ok(result) + } } #[inline] @@ -450,6 +448,19 @@ mod tests { assert_eq!(waiter.await, ()); } + #[ntex::test] + async fn write_to_closed_io() { + let (client, server) = IoTest::create(); + let state = Io::new(server); + client.close().await; + + assert!(state.is_closed()); + assert!(state.write(TEXT.as_bytes()).is_err()); + assert!(state + .with_write_buf(|buf| buf.extend_from_slice(BIN)) + .is_err()); + } + #[derive(Debug)] struct Counter { idx: usize,