mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 13:27:39 +03:00
Stop write task if io is closed
This commit is contained in:
parent
3edb54ffdf
commit
8f9601d421
10 changed files with 59 additions and 25 deletions
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-io"
|
name = "ntex-io"
|
||||||
version = "2.3.1"
|
version = "2.4.0"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "Utilities for encoding and decoding frames"
|
description = "Utilities for encoding and decoding frames"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
|
|
@ -97,25 +97,26 @@ impl Filter for Base {
|
||||||
|
|
||||||
if flags.contains(Flags::IO_STOPPED) {
|
if flags.contains(Flags::IO_STOPPED) {
|
||||||
Poll::Ready(WriteStatus::Terminate)
|
Poll::Ready(WriteStatus::Terminate)
|
||||||
} else if flags.intersects(Flags::IO_STOPPING) {
|
|
||||||
Poll::Ready(WriteStatus::Shutdown(
|
|
||||||
self.0 .0.disconnect_timeout.get().into(),
|
|
||||||
))
|
|
||||||
} else if flags.contains(Flags::IO_STOPPING_FILTERS)
|
|
||||||
&& !flags.contains(Flags::IO_FILTERS_TIMEOUT)
|
|
||||||
{
|
|
||||||
flags.insert(Flags::IO_FILTERS_TIMEOUT);
|
|
||||||
self.0.set_flags(flags);
|
|
||||||
self.0 .0.write_task.register(cx.waker());
|
|
||||||
Poll::Ready(WriteStatus::Timeout(
|
|
||||||
self.0 .0.disconnect_timeout.get().into(),
|
|
||||||
))
|
|
||||||
} else if flags.intersects(Flags::WR_PAUSED) {
|
|
||||||
self.0 .0.write_task.register(cx.waker());
|
|
||||||
Poll::Pending
|
|
||||||
} else {
|
} else {
|
||||||
self.0 .0.write_task.register(cx.waker());
|
self.0 .0.write_task.register(cx.waker());
|
||||||
Poll::Ready(WriteStatus::Ready)
|
|
||||||
|
if flags.intersects(Flags::IO_STOPPING) {
|
||||||
|
Poll::Ready(WriteStatus::Shutdown(
|
||||||
|
self.0 .0.disconnect_timeout.get().into(),
|
||||||
|
))
|
||||||
|
} else if flags.contains(Flags::IO_STOPPING_FILTERS)
|
||||||
|
&& !flags.contains(Flags::IO_FILTERS_TIMEOUT)
|
||||||
|
{
|
||||||
|
flags.insert(Flags::IO_FILTERS_TIMEOUT);
|
||||||
|
self.0.set_flags(flags);
|
||||||
|
Poll::Ready(WriteStatus::Timeout(
|
||||||
|
self.0 .0.disconnect_timeout.get().into(),
|
||||||
|
))
|
||||||
|
} else if flags.intersects(Flags::WR_PAUSED) {
|
||||||
|
Poll::Pending
|
||||||
|
} else {
|
||||||
|
Poll::Ready(WriteStatus::Ready)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,10 @@ impl IoRef {
|
||||||
.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED)
|
.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn is_io_closed(&self) -> bool {
|
||||||
|
self.0.flags.get().intersects(Flags::IO_STOPPED)
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Check if write back-pressure is enabled
|
/// Check if write back-pressure is enabled
|
||||||
pub fn is_wr_backpressure(&self) -> bool {
|
pub fn is_wr_backpressure(&self) -> bool {
|
||||||
|
|
|
@ -296,6 +296,17 @@ impl WriteContext {
|
||||||
self.0.filter().poll_write_ready(cx)
|
self.0.filter().poll_write_ready(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
/// Check if io is closed
|
||||||
|
pub fn poll_close(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
if self.0.is_io_closed() {
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
self.0 .0.write_task.register(cx.waker());
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Get write buffer
|
/// Get write buffer
|
||||||
pub fn with_buf<F>(&self, f: F) -> Poll<io::Result<()>>
|
pub fn with_buf<F>(&self, f: F) -> Poll<io::Result<()>>
|
||||||
where
|
where
|
||||||
|
|
|
@ -34,11 +34,11 @@ async-std = ["ntex-rt/async-std", "ntex-async-std"]
|
||||||
ntex-service = "3"
|
ntex-service = "3"
|
||||||
ntex-bytes = "0.1"
|
ntex-bytes = "0.1"
|
||||||
ntex-http = "0.1"
|
ntex-http = "0.1"
|
||||||
ntex-io = "2.3"
|
ntex-io = "2.4"
|
||||||
ntex-rt = "0.4.14"
|
ntex-rt = "0.4.14"
|
||||||
ntex-util = "2"
|
ntex-util = "2"
|
||||||
|
|
||||||
ntex-tokio = { version = "0.5", optional = true }
|
ntex-tokio = { version = "0.5.1", optional = true }
|
||||||
ntex-compio = { version = "0.1", optional = true }
|
ntex-compio = { version = "0.1", optional = true }
|
||||||
ntex-glommio = { version = "0.5", optional = true }
|
ntex-glommio = { version = "0.5", optional = true }
|
||||||
ntex-async-std = { version = "0.5", optional = true }
|
ntex-async-std = { version = "0.5", optional = true }
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.5.1] - 2024-09-06
|
||||||
|
|
||||||
|
* Stop write task if io is closed
|
||||||
|
|
||||||
## [0.4.0] - 2024-01-09
|
## [0.4.0] - 2024-01-09
|
||||||
|
|
||||||
* Log io tags
|
* Log io tags
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-tokio"
|
name = "ntex-tokio"
|
||||||
version = "0.5.0"
|
version = "0.5.1"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "tokio intergration for ntex framework"
|
description = "tokio intergration for ntex framework"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@ -17,7 +17,7 @@ path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
ntex-bytes = "0.1"
|
ntex-bytes = "0.1"
|
||||||
ntex-io = "2.0"
|
ntex-io = "2.4"
|
||||||
ntex-util = "2.0"
|
ntex-util = "2"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] }
|
tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] }
|
||||||
|
|
|
@ -137,6 +137,10 @@ impl Future for WriteTask {
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.as_mut().get_mut();
|
let this = self.as_mut().get_mut();
|
||||||
|
|
||||||
|
if this.state.poll_close(cx).is_ready() {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
|
|
||||||
match this.st {
|
match this.st {
|
||||||
IoWriteState::Processing(ref mut delay) => {
|
IoWriteState::Processing(ref mut delay) => {
|
||||||
match ready!(this.state.poll_ready(cx)) {
|
match ready!(this.state.poll_ready(cx)) {
|
||||||
|
@ -215,6 +219,9 @@ impl Future for WriteTask {
|
||||||
// close WRITE side and wait for disconnect on read side.
|
// close WRITE side and wait for disconnect on read side.
|
||||||
// use disconnect timeout, otherwise it could hang forever.
|
// use disconnect timeout, otherwise it could hang forever.
|
||||||
loop {
|
loop {
|
||||||
|
if this.state.poll_close(cx).is_ready() {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
match st {
|
match st {
|
||||||
Shutdown::None => {
|
Shutdown::None => {
|
||||||
// flush write buffer
|
// flush write buffer
|
||||||
|
@ -564,6 +571,10 @@ mod unixstream {
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.as_mut().get_mut();
|
let this = self.as_mut().get_mut();
|
||||||
|
|
||||||
|
if this.state.poll_close(cx).is_ready() {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
|
|
||||||
match this.st {
|
match this.st {
|
||||||
IoWriteState::Processing(ref mut delay) => {
|
IoWriteState::Processing(ref mut delay) => {
|
||||||
match this.state.poll_ready(cx) {
|
match this.state.poll_ready(cx) {
|
||||||
|
@ -630,6 +641,9 @@ mod unixstream {
|
||||||
// close WRITE side and wait for disconnect on read side.
|
// close WRITE side and wait for disconnect on read side.
|
||||||
// use disconnect timeout, otherwise it could hang forever.
|
// use disconnect timeout, otherwise it could hang forever.
|
||||||
loop {
|
loop {
|
||||||
|
if this.state.poll_close(cx).is_ready() {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
match st {
|
match st {
|
||||||
Shutdown::None => {
|
Shutdown::None => {
|
||||||
// flush write buffer
|
// flush write buffer
|
||||||
|
|
|
@ -71,7 +71,7 @@ ntex-bytes = "0.1.27"
|
||||||
ntex-server = "2.3"
|
ntex-server = "2.3"
|
||||||
ntex-h2 = "1.1"
|
ntex-h2 = "1.1"
|
||||||
ntex-rt = "0.4.15"
|
ntex-rt = "0.4.15"
|
||||||
ntex-io = "2.3"
|
ntex-io = "2.4"
|
||||||
ntex-net = "2.1"
|
ntex-net = "2.1"
|
||||||
ntex-tls = "2.1"
|
ntex-tls = "2.1"
|
||||||
|
|
||||||
|
|
|
@ -85,7 +85,7 @@ where
|
||||||
///
|
///
|
||||||
/// To disable timeout set value to 0.
|
/// To disable timeout set value to 0.
|
||||||
///
|
///
|
||||||
/// By default disconnect timeout is set to 3 seconds.
|
/// By default disconnect timeout is set to 1 seconds.
|
||||||
pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self {
|
pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self {
|
||||||
self.config.disconnect_timeout(timeout);
|
self.config.disconnect_timeout(timeout);
|
||||||
self
|
self
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue