Handle io flush during write back-pressure (#246)

This commit is contained in:
Nikolay Kim 2023-11-12 21:19:33 +06:00 committed by GitHub
parent 2a19b7f995
commit c6b26123ca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 53 additions and 38 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.3.7] - 2023-11-12
* Handle io flush during write back-pressure
## [0.3.6] - 2023-11-11 ## [0.3.6] - 2023-11-11
* Add support for frame read timeout * Add support for frame read timeout

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "0.3.6" version = "0.3.7"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-codec = "0.6.2" ntex-codec = "0.6.2"
ntex-bytes = "0.1.20" ntex-bytes = "0.1.21"
ntex-util = "0.3.4" ntex-util = "0.3.4"
ntex-service = "1.2.7" ntex-service = "1.2.7"
@ -29,4 +29,4 @@ pin-project-lite = "0.2"
rand = "0.8" rand = "0.8"
env_logger = "0.10" env_logger = "0.10"
ntex = { version = "0.7.0", features = ["tokio"] } ntex = { version = "0.7", features = ["tokio"] }

View file

@ -31,7 +31,7 @@ impl Default for DispatcherConfig {
fn default() -> Self { fn default() -> Self {
DispatcherConfig(Rc::new(DispatcherConfigInner { DispatcherConfig(Rc::new(DispatcherConfigInner {
keepalive_timeout: Cell::new(Seconds(30).into()), keepalive_timeout: Cell::new(Seconds(30).into()),
disconnect_timeout: Cell::new(Seconds(1).into()), disconnect_timeout: Cell::new(Seconds(1)),
frame_read_rate: Cell::new(0), frame_read_rate: Cell::new(0),
frame_read_enabled: Cell::new(false), frame_read_enabled: Cell::new(false),
frame_read_timeout: Cell::new(Seconds::ZERO.into()), frame_read_timeout: Cell::new(Seconds::ZERO.into()),
@ -92,8 +92,9 @@ impl DispatcherConfig {
/// Set read rate parameters for single frame. /// Set read rate parameters for single frame.
/// ///
/// Set max timeout for reading single frame. If the client sends data, /// Set max timeout for reading single frame. If the client
/// increase the timeout by 1 second for every. /// sends `rate` amount of data, increase the timeout by 1 second for every.
/// But no more than `max_timeout` timeout.
/// ///
/// By default frame read rate is disabled. /// By default frame read rate is disabled.
pub fn set_frame_read_rate( pub fn set_frame_read_rate(
@ -320,8 +321,7 @@ where
loop { loop {
match slf.st { match slf.st {
DispatcherState::Processing => { DispatcherState::Processing => {
let srv = ready!(slf.poll_service(cx)); let item = match ready!(slf.poll_service(cx)) {
let item = match srv {
PollService::Ready => { PollService::Ready => {
// decode incoming bytes if buffer is ready // decode incoming bytes if buffer is ready
match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) { match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) {
@ -386,11 +386,12 @@ where
DispatcherState::Backpressure => { DispatcherState::Backpressure => {
let item = match ready!(slf.poll_service(cx)) { let item = match ready!(slf.poll_service(cx)) {
PollService::Ready => { PollService::Ready => {
if slf.shared.io.poll_flush(cx, false).is_ready() { if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false)) {
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(Some(err))
} else {
slf.st = DispatcherState::Processing; slf.st = DispatcherState::Processing;
DispatchItem::WBackPressureDisabled DispatchItem::WBackPressureDisabled
} else {
return Poll::Pending;
} }
} }
PollService::Item(item) => item, PollService::Item(item) => item,

View file

@ -78,15 +78,15 @@ impl Filter for Base {
if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) { if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
Poll::Ready(ReadStatus::Terminate) Poll::Ready(ReadStatus::Terminate)
} else if flags.intersects(Flags::IO_STOPPING_FILTERS) {
self.0 .0.read_task.register(cx.waker());
Poll::Ready(ReadStatus::Ready)
} else if flags.intersects(Flags::RD_PAUSED | Flags::RD_BUF_FULL) {
self.0 .0.read_task.register(cx.waker());
Poll::Pending
} else { } else {
self.0 .0.read_task.register(cx.waker()); self.0 .0.read_task.register(cx.waker());
Poll::Ready(ReadStatus::Ready) if flags.intersects(Flags::IO_STOPPING_FILTERS) {
Poll::Ready(ReadStatus::Ready)
} else if flags.intersects(Flags::RD_PAUSED | Flags::RD_BUF_FULL) {
Poll::Pending
} else {
Poll::Ready(ReadStatus::Ready)
}
} }
} }

View file

@ -38,13 +38,13 @@ impl ReadContext {
// handle buffer changes // handle buffer changes
if nbytes > 0 { if nbytes > 0 {
let buf_full = nbytes >= hw;
let filter = self.0.filter(); let filter = self.0.filter();
let _ = filter let _ = filter
.process_read_buf(&self.0, &inner.buffer, 0, nbytes) .process_read_buf(&self.0, &inner.buffer, 0, nbytes)
.and_then(|status| { .and_then(|status| {
if status.nbytes > 0 { if status.nbytes > 0 {
if buf_full || inner.buffer.read_destination_size() >= hw { // dest buffer has new data, wake up dispatcher
if inner.buffer.read_destination_size() >= hw {
log::trace!( log::trace!(
"io read buffer is too large {}, enable read back-pressure", "io read buffer is too large {}, enable read back-pressure",
total total
@ -52,19 +52,30 @@ impl ReadContext {
inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
} else { } else {
inner.insert_flags(Flags::RD_READY); inner.insert_flags(Flags::RD_READY);
if nbytes >= hw {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
} }
log::trace!("new {} bytes available, wakeup dispatcher", nbytes); log::trace!("new {} bytes available, wakeup dispatcher", nbytes);
inner.dispatch_task.wake(); inner.dispatch_task.wake();
} else if buf_full { } else {
// read task is paused because of read back-pressure if nbytes >= hw {
// but there is no new data in top most read buffer // read task is paused because of read back-pressure
// so we need to wake up read task to read more data // but there is no new data in top most read buffer
// otherwise read task would sleep forever // so we need to wake up read task to read more data
inner.read_task.wake(); // otherwise read task would sleep forever
} else if inner.flags.get().contains(Flags::RD_FORCE_READY) { inner.read_task.wake();
// in case of "force read" we must wake up dispatch task }
// if we read any data from source if inner.flags.get().contains(Flags::RD_FORCE_READY) {
inner.dispatch_task.wake(); // in case of "force read" we must wake up dispatch task
// if we read any data from source
inner.dispatch_task.wake();
}
} }
// while reading, filter wrote some data // while reading, filter wrote some data
@ -78,8 +89,8 @@ impl ReadContext {
}) })
.map_err(|err| { .map_err(|err| {
inner.dispatch_task.wake(); inner.dispatch_task.wake();
inner.insert_flags(Flags::RD_READY);
inner.io_stopped(Some(err)); inner.io_stopped(Some(err));
inner.insert_flags(Flags::RD_READY);
}); });
} }

View file

@ -132,8 +132,8 @@ impl Future for WriteTask {
match this.st { match this.st {
IoWriteState::Processing(ref mut delay) => { IoWriteState::Processing(ref mut delay) => {
match this.state.poll_ready(cx) { match ready!(this.state.poll_ready(cx)) {
Poll::Ready(WriteStatus::Ready) => { WriteStatus::Ready => {
if let Some(delay) = delay { if let Some(delay) = delay {
if delay.poll_elapsed(cx).is_ready() { if delay.poll_elapsed(cx).is_ready() {
this.state.close(Some(io::Error::new( this.state.close(Some(io::Error::new(
@ -157,14 +157,14 @@ impl Future for WriteTask {
} }
} }
} }
Poll::Ready(WriteStatus::Timeout(time)) => { WriteStatus::Timeout(time) => {
log::trace!("initiate timeout delay for {:?}", time); log::trace!("initiate timeout delay for {:?}", time);
if delay.is_none() { if delay.is_none() {
*delay = Some(sleep(time)); *delay = Some(sleep(time));
} }
self.poll(cx) self.poll(cx)
} }
Poll::Ready(WriteStatus::Shutdown(time)) => { WriteStatus::Shutdown(time) => {
log::trace!("write task is instructed to shutdown"); log::trace!("write task is instructed to shutdown");
let timeout = if let Some(delay) = delay.take() { let timeout = if let Some(delay) = delay.take() {
@ -176,7 +176,7 @@ impl Future for WriteTask {
this.st = IoWriteState::Shutdown(timeout, Shutdown::None); this.st = IoWriteState::Shutdown(timeout, Shutdown::None);
self.poll(cx) self.poll(cx)
} }
Poll::Ready(WriteStatus::Terminate) => { WriteStatus::Terminate => {
log::trace!("write task is instructed to terminate"); log::trace!("write task is instructed to terminate");
if !matches!( if !matches!(
@ -191,7 +191,6 @@ impl Future for WriteTask {
this.state.close(None); this.state.close(None);
Poll::Ready(()) Poll::Ready(())
} }
Poll::Pending => Poll::Pending,
} }
} }
IoWriteState::Shutdown(ref mut delay, ref mut st) => { IoWriteState::Shutdown(ref mut delay, ref mut st) => {

View file

@ -58,7 +58,7 @@ ntex-util = "0.3.4"
ntex-bytes = "0.1.21" ntex-bytes = "0.1.21"
ntex-h2 = "0.4.4" ntex-h2 = "0.4.4"
ntex-rt = "0.4.10" ntex-rt = "0.4.10"
ntex-io = "0.3.6" ntex-io = "0.3.7"
ntex-tls = "0.3.2" ntex-tls = "0.3.2"
ntex-tokio = { version = "0.3.1", optional = true } ntex-tokio = { version = "0.3.1", optional = true }
ntex-glommio = { version = "0.3.0", optional = true } ntex-glommio = { version = "0.3.0", optional = true }