From 0c61a78d60ad8884c9b1f2123799f4c6eb61e37f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 31 Jan 2023 00:53:36 +0600 Subject: [PATCH] Better handling service readiness (#171) * Better handling service readiness --- ntex-io/CHANGES.md | 6 ++++++ ntex-io/Cargo.toml | 2 +- ntex-io/src/dispatcher.rs | 34 ++++++++++++++++++++++++++-------- ntex-io/src/io.rs | 13 +++++++++++++ ntex/Cargo.toml | 2 +- 5 files changed, 47 insertions(+), 10 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 0ee12871..bcb44341 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.2.9] - 2023-01-31 + +* Register Dispatcher waker when service is not ready + +* Add Io::poll_read_pause() method, pauses read task and check io status + ## [0.2.8] - 2023-01-30 * Check for nested io operations diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 601e008f..69514a7e 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.2.8" +version = "0.2.9" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index d6258891..e6bc6134 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -71,7 +71,7 @@ enum DispatcherError { enum PollService { Item(DispatchItem), - ServiceError, + Continue, Ready, } @@ -244,7 +244,7 @@ where } } PollService::Item(item) => item, - PollService::ServiceError => continue, + PollService::Continue => continue, }; // call service @@ -269,7 +269,7 @@ where } } PollService::Item(item) => item, - PollService::ServiceError => continue, + PollService::Continue => continue, }; // call service @@ -357,7 +357,7 @@ where } DispatcherError::Service(err) => { self.error.set(Some(err)); - PollService::ServiceError + PollService::Continue } } } else { @@ -367,8 +367,27 @@ where // pause io read task Poll::Pending => { log::trace!("service is not ready, register dispatch task"); - io.pause(); - Poll::Pending + match ready!(io.poll_read_pause(cx)) { + IoStatusUpdate::KeepAlive => { + log::trace!("keep-alive error, stopping dispatcher during pause"); + self.st.set(DispatcherState::Stop); + Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout)) + } + IoStatusUpdate::Stop => { + log::trace!("dispatcher is instructed to stop during pause"); + self.st.set(DispatcherState::Stop); + Poll::Ready(PollService::Continue) + } + IoStatusUpdate::PeerGone(err) => { + log::trace!( + "peer is gone during pause, stopping dispatcher: {:?}", + err + ); + self.st.set(DispatcherState::Stop); + Poll::Ready(PollService::Item(DispatchItem::Disconnect(err))) + } + IoStatusUpdate::WriteBackpressure => Poll::Pending, + } } // handle service readiness error Poll::Ready(Err(err)) => { @@ -376,7 +395,7 @@ where self.st.set(DispatcherState::Stop); self.error.set(Some(err)); self.insert_flags(Flags::READY_ERR); - Poll::Ready(PollService::ServiceError) + Poll::Ready(PollService::Continue) } } } @@ -704,7 +723,6 @@ mod tests { #[ntex::test] async fn test_disconnect_during_read_backpressure() { - env_logger::init(); let (client, server) = IoTest::create(); client.remote_buffer_cap(0); diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 50256a10..f84e1ab0 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -536,6 +536,19 @@ impl Io { } } + #[inline] + /// Pause read task + /// + /// Returns status updates + pub fn poll_read_pause(&self, cx: &mut Context<'_>) -> Poll { + self.pause(); + let result = self.poll_status_update(cx); + if !result.is_pending() { + self.0 .0.dispatch_task.register(cx.waker()); + } + result + } + #[inline] /// Wait for status updates pub fn poll_status_update(&self, cx: &mut Context<'_>) -> Poll { diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 3c6255c4..a896043b 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -58,7 +58,7 @@ ntex-util = "0.2.0" ntex-bytes = "0.1.19" ntex-h2 = "0.2.1" ntex-rt = "0.4.7" -ntex-io = "0.2.8" +ntex-io = "0.2.9" ntex-tls = "0.2.4" ntex-tokio = { version = "0.2.1", optional = true } ntex-glommio = { version = "0.2.1", optional = true }