Return error for IoRef::write(), IoRef::with_write_buf(), Io::poll_flush() if io is closed

This commit is contained in:
Nikolay Kim 2024-10-07 20:10:40 +02:00
parent effce6915f
commit 21f84dafd5
4 changed files with 29 additions and 12 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.6.0] - 2024-10-17
* Return error for IoRef::write(), IoRef::with_write_buf(), Io::poll_flush() if io is closed.
## [2.5.0] - 2024-09-10 ## [2.5.0] - 2024-09-10
* Refactor async io support * Refactor async io support

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "2.5.0" version = "2.6.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -540,7 +540,9 @@ impl<F> Io<F> {
let flags = self.flags(); let flags = self.flags();
if flags.is_stopped() { if flags.is_stopped() {
Poll::Ready(self.error().map(Err).unwrap_or(Ok(()))) Poll::Ready(self.error().map(Err).unwrap_or_else(|| {
Err(io::Error::new(io::ErrorKind::Other, "Disconnected"))
}))
} else { } else {
let st = self.st(); let st = self.st();
let len = st.buffer.write_destination_size(); let len = st.buffer.write_destination_size();

View file

@ -170,13 +170,7 @@ impl IoRef {
#[inline] #[inline]
/// Write bytes to a buffer and wake up write task /// Write bytes to a buffer and wake up write task
pub fn write(&self, src: &[u8]) -> io::Result<()> { pub fn write(&self, src: &[u8]) -> io::Result<()> {
let flags = self.0.flags.get(); self.with_write_buf(|buf| buf.extend_from_slice(src))
if !flags.intersects(Flags::IO_STOPPING) {
self.with_write_buf(|buf| buf.extend_from_slice(src))
} else {
Ok(())
}
} }
#[inline] #[inline]
@ -196,9 +190,13 @@ impl IoRef {
where where
F: FnOnce(&mut BytesVec) -> R, F: FnOnce(&mut BytesVec) -> R,
{ {
let result = self.0.buffer.with_write_source(self, f); if self.0.flags.get().contains(Flags::IO_STOPPED) {
self.0.filter().process_write_buf(self, &self.0.buffer, 0)?; Err(io::Error::new(io::ErrorKind::Other, "Disconnected"))
Ok(result) } else {
let result = self.0.buffer.with_write_source(self, f);
self.0.filter().process_write_buf(self, &self.0.buffer, 0)?;
Ok(result)
}
} }
#[inline] #[inline]
@ -450,6 +448,19 @@ mod tests {
assert_eq!(waiter.await, ()); assert_eq!(waiter.await, ());
} }
#[ntex::test]
async fn write_to_closed_io() {
let (client, server) = IoTest::create();
let state = Io::new(server);
client.close().await;
assert!(state.is_closed());
assert!(state.write(TEXT.as_bytes()).is_err());
assert!(state
.with_write_buf(|buf| buf.extend_from_slice(BIN))
.is_err());
}
#[derive(Debug)] #[derive(Debug)]
struct Counter { struct Counter {
idx: usize, idx: usize,