diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 3ddfa4a4..ff95c38e 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.0.2] - 2024-03-31 + +* Add IoRef::is_wr_backpressure() method + ## [1.0.1] - 2024-02-05 * Add IoBoxed::take() method diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index aa77ff42..a75780cf 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "1.0.1" +version = "1.0.2" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,10 +18,10 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6.2" ntex-bytes = "0.1.24" -ntex-util = "1.0.0" -ntex-service = "2.0.0" +ntex-util = "1.0" +ntex-service = "2.0" -bitflags = "2.4" +bitflags = "2" log = "0.4" pin-project-lite = "0.2" diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index a25700d3..b06048cf 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -134,12 +134,16 @@ impl Filter for Base { s.with_write_destination(io, |buf| { if let Some(buf) = buf { let len = buf.len(); - if len > 0 && self.0.flags().contains(Flags::WR_PAUSED) { + let flags = self.0.flags(); + if len > 0 && flags.contains(Flags::WR_PAUSED) { self.0 .0.remove_flags(Flags::WR_PAUSED); self.0 .0.write_task.wake(); } - if len >= self.0.memory_pool().write_params_high() { + if len >= self.0.memory_pool().write_params_high() + && !flags.contains(Flags::WR_BACKPRESSURE) + { self.0 .0.insert_flags(Flags::WR_BACKPRESSURE); + self.0 .0.dispatch_task.wake(); } } }); diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index f553d62e..f938137c 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -41,6 +41,12 @@ impl IoRef { .intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) } + #[inline] + /// Check if write back-pressure is enabled + pub fn is_wr_backpressure(&self) -> bool { + self.0.flags.get().contains(Flags::WR_BACKPRESSURE) + } + #[inline] /// Wake dispatcher task pub fn wake(&self) {