diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index b22886eb..d9f4a63a 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.7] - 2023-11-12 + +* Handle io flush during write back-pressure + ## [0.3.6] - 2023-11-11 * Add support for frame read timeout diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 4290dc4c..7dce0d56 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.6" +version = "0.3.7" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6.2" -ntex-bytes = "0.1.20" +ntex-bytes = "0.1.21" ntex-util = "0.3.4" ntex-service = "1.2.7" @@ -29,4 +29,4 @@ pin-project-lite = "0.2" rand = "0.8" env_logger = "0.10" -ntex = { version = "0.7.0", features = ["tokio"] } +ntex = { version = "0.7", features = ["tokio"] } diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 84b8e5d7..5a4c34b7 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -31,7 +31,7 @@ impl Default for DispatcherConfig { fn default() -> Self { DispatcherConfig(Rc::new(DispatcherConfigInner { keepalive_timeout: Cell::new(Seconds(30).into()), - disconnect_timeout: Cell::new(Seconds(1).into()), + disconnect_timeout: Cell::new(Seconds(1)), frame_read_rate: Cell::new(0), frame_read_enabled: Cell::new(false), frame_read_timeout: Cell::new(Seconds::ZERO.into()), @@ -92,8 +92,9 @@ impl DispatcherConfig { /// Set read rate parameters for single frame. /// - /// Set max timeout for reading single frame. If the client sends data, - /// increase the timeout by 1 second for every. + /// Set max timeout for reading single frame. If the client + /// sends `rate` amount of data, increase the timeout by 1 second for every. + /// But no more than `max_timeout` timeout. /// /// By default frame read rate is disabled. pub fn set_frame_read_rate( @@ -320,8 +321,7 @@ where loop { match slf.st { DispatcherState::Processing => { - let srv = ready!(slf.poll_service(cx)); - let item = match srv { + let item = match ready!(slf.poll_service(cx)) { PollService::Ready => { // decode incoming bytes if buffer is ready match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) { @@ -386,11 +386,12 @@ where DispatcherState::Backpressure => { let item = match ready!(slf.poll_service(cx)) { PollService::Ready => { - if slf.shared.io.poll_flush(cx, false).is_ready() { + if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false)) { + slf.st = DispatcherState::Stop; + DispatchItem::Disconnect(Some(err)) + } else { slf.st = DispatcherState::Processing; DispatchItem::WBackPressureDisabled - } else { - return Poll::Pending; } } PollService::Item(item) => item, diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index 970b5c74..a25700d3 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -78,15 +78,15 @@ impl Filter for Base { if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) { Poll::Ready(ReadStatus::Terminate) - } else if flags.intersects(Flags::IO_STOPPING_FILTERS) { - self.0 .0.read_task.register(cx.waker()); - Poll::Ready(ReadStatus::Ready) - } else if flags.intersects(Flags::RD_PAUSED | Flags::RD_BUF_FULL) { - self.0 .0.read_task.register(cx.waker()); - Poll::Pending } else { self.0 .0.read_task.register(cx.waker()); - Poll::Ready(ReadStatus::Ready) + if flags.intersects(Flags::IO_STOPPING_FILTERS) { + Poll::Ready(ReadStatus::Ready) + } else if flags.intersects(Flags::RD_PAUSED | Flags::RD_BUF_FULL) { + Poll::Pending + } else { + Poll::Ready(ReadStatus::Ready) + } } } diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 8c6edf17..e2bbff07 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -38,13 +38,13 @@ impl ReadContext { // handle buffer changes if nbytes > 0 { - let buf_full = nbytes >= hw; let filter = self.0.filter(); let _ = filter .process_read_buf(&self.0, &inner.buffer, 0, nbytes) .and_then(|status| { if status.nbytes > 0 { - if buf_full || inner.buffer.read_destination_size() >= hw { + // dest buffer has new data, wake up dispatcher + if inner.buffer.read_destination_size() >= hw { log::trace!( "io read buffer is too large {}, enable read back-pressure", total @@ -52,19 +52,30 @@ impl ReadContext { inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); } else { inner.insert_flags(Flags::RD_READY); + + if nbytes >= hw { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } } log::trace!("new {} bytes available, wakeup dispatcher", nbytes); inner.dispatch_task.wake(); - } else if buf_full { - // read task is paused because of read back-pressure - // but there is no new data in top most read buffer - // so we need to wake up read task to read more data - // otherwise read task would sleep forever - inner.read_task.wake(); - } else if inner.flags.get().contains(Flags::RD_FORCE_READY) { - // in case of "force read" we must wake up dispatch task - // if we read any data from source - inner.dispatch_task.wake(); + } else { + if nbytes >= hw { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } + if inner.flags.get().contains(Flags::RD_FORCE_READY) { + // in case of "force read" we must wake up dispatch task + // if we read any data from source + inner.dispatch_task.wake(); + } } // while reading, filter wrote some data @@ -78,8 +89,8 @@ impl ReadContext { }) .map_err(|err| { inner.dispatch_task.wake(); - inner.insert_flags(Flags::RD_READY); inner.io_stopped(Some(err)); + inner.insert_flags(Flags::RD_READY); }); } diff --git a/ntex-tokio/src/io.rs b/ntex-tokio/src/io.rs index 982e215c..8a167a46 100644 --- a/ntex-tokio/src/io.rs +++ b/ntex-tokio/src/io.rs @@ -132,8 +132,8 @@ impl Future for WriteTask { match this.st { IoWriteState::Processing(ref mut delay) => { - match this.state.poll_ready(cx) { - Poll::Ready(WriteStatus::Ready) => { + match ready!(this.state.poll_ready(cx)) { + WriteStatus::Ready => { if let Some(delay) = delay { if delay.poll_elapsed(cx).is_ready() { this.state.close(Some(io::Error::new( @@ -157,14 +157,14 @@ impl Future for WriteTask { } } } - Poll::Ready(WriteStatus::Timeout(time)) => { + WriteStatus::Timeout(time) => { log::trace!("initiate timeout delay for {:?}", time); if delay.is_none() { *delay = Some(sleep(time)); } self.poll(cx) } - Poll::Ready(WriteStatus::Shutdown(time)) => { + WriteStatus::Shutdown(time) => { log::trace!("write task is instructed to shutdown"); let timeout = if let Some(delay) = delay.take() { @@ -176,7 +176,7 @@ impl Future for WriteTask { this.st = IoWriteState::Shutdown(timeout, Shutdown::None); self.poll(cx) } - Poll::Ready(WriteStatus::Terminate) => { + WriteStatus::Terminate => { log::trace!("write task is instructed to terminate"); if !matches!( @@ -191,7 +191,6 @@ impl Future for WriteTask { this.state.close(None); Poll::Ready(()) } - Poll::Pending => Poll::Pending, } } IoWriteState::Shutdown(ref mut delay, ref mut st) => { diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 3fbe6926..5bb49710 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -58,7 +58,7 @@ ntex-util = "0.3.4" ntex-bytes = "0.1.21" ntex-h2 = "0.4.4" ntex-rt = "0.4.10" -ntex-io = "0.3.6" +ntex-io = "0.3.7" ntex-tls = "0.3.2" ntex-tokio = { version = "0.3.1", optional = true } ntex-glommio = { version = "0.3.0", optional = true }