diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 50a5671a..0ee12871 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.8] - 2023-01-30 + +* Check for nested io operations + ## [0.2.7] - 2023-01-29 * Refactor buffer api diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index b0a0e80c..601e008f 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.2.7" +version = "0.2.8" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/buf.rs b/ntex-io/src/buf.rs index 2723d310..e14720ac 100644 --- a/ntex-io/src/buf.rs +++ b/ntex-io/src/buf.rs @@ -152,6 +152,13 @@ impl Stack { } let result = f(rb.as_mut().unwrap()); + + // check nested updates + if item.0.take().is_some() { + log::error!("Nested read io operation is detected"); + io.force_close(); + } + if let Some(b) = rb { if b.is_empty() { io.memory_pool().release_read_buf(b); @@ -191,6 +198,13 @@ impl Stack { let mut wb = item.1.take(); let result = f(&mut wb); + + // check nested updates + if item.1.take().is_some() { + log::error!("Nested write io operation is detected"); + io.force_close(); + } + if let Some(b) = wb { if b.is_empty() { io.memory_pool().release_write_buf(b); diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 95e518c4..d6258891 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -407,13 +407,10 @@ mod tests { use ntex_bytes::{Bytes, PoolId, PoolRef}; use ntex_codec::BytesCodec; - use ntex_util::future::Ready; - use ntex_util::time::{sleep, Millis}; - - use crate::testing::IoTest; - use crate::{io::Flags, Io, IoRef, IoStream}; + use ntex_util::{future::Ready, time::sleep, time::Millis, time::Seconds}; use super::*; + use crate::{io::Flags, testing::IoTest, Io, IoRef, IoStream}; pub(crate) struct State(IoRef); @@ -705,6 +702,55 @@ mod tests { assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1, 2]); } + #[ntex::test] + async fn test_disconnect_during_read_backpressure() { + env_logger::init(); + let (client, server) = IoTest::create(); + client.remote_buffer_cap(0); + + let (disp, state) = Dispatcher::debug( + server, + BytesCodec, + ntex_util::services::inflight::InFlightService::new( + 1, + ntex_service::fn_service(move |msg: DispatchItem| async move { + match msg { + DispatchItem::Item(_) => { + sleep(Millis(500)).await; + return Ok::<_, ()>(None); + } + _ => (), + } + Ok(None) + }), + ), + ); + let disp = disp.keepalive_timeout(Seconds::ZERO); + let pool = PoolId::P10.pool_ref(); + pool.set_read_params(1024, 512); + state.set_memory_pool(pool); + + let (tx, rx) = ntex::channel::oneshot::channel(); + ntex::rt::spawn(async move { + let _ = disp.await; + let _ = tx.send(()); + }); + + let bytes = rand::thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .take(1024) + .map(char::from) + .collect::(); + client.write(bytes.clone()); + sleep(Millis(25)).await; + client.write(bytes); + sleep(Millis(25)).await; + + // close read side + state.close(); + let _ = rx.recv().await; + } + #[ntex::test] async fn test_keepalive() { let (client, server) = IoTest::create(); diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index 7ebe2729..7536d1fb 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -76,6 +76,9 @@ impl Filter for Base { if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) { Poll::Ready(ReadStatus::Terminate) + } else if flags.intersects(Flags::IO_STOPPING_FILTERS) { + self.0 .0.read_task.register(cx.waker()); + Poll::Ready(ReadStatus::Ready) } else if flags.intersects(Flags::RD_PAUSED | Flags::RD_BUF_FULL) { self.0 .0.read_task.register(cx.waker()); Poll::Pending diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 5be0d46e..50256a10 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -350,8 +350,10 @@ impl Io { #[inline] /// Pause read task pub fn pause(&self) { - self.0 .0.read_task.wake(); - self.0 .0.insert_flags(Flags::RD_PAUSED); + if !self.0.flags().contains(Flags::RD_PAUSED) { + self.0 .0.read_task.wake(); + self.0 .0.insert_flags(Flags::RD_PAUSED); + } } #[inline] diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 66205de0..da21cff9 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -196,6 +196,8 @@ impl IoRef { log::debug!("start keep-alive timeout {:?}", timeout); self.0.insert_flags(Flags::KEEPALIVE); self.0.keepalive.set(timer::register(timeout, self)); + } else { + self.0.remove_flags(Flags::KEEPALIVE); } } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index ef4c3c46..3c6255c4 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -58,7 +58,7 @@ ntex-util = "0.2.0" ntex-bytes = "0.1.19" ntex-h2 = "0.2.1" ntex-rt = "0.4.7" -ntex-io = "0.2.7" +ntex-io = "0.2.8" ntex-tls = "0.2.4" ntex-tokio = { version = "0.2.1", optional = true } ntex-glommio = { version = "0.2.1", optional = true }