Better handling service readiness (#171)

* Better handling service readiness
This commit is contained in:
Nikolay Kim 2023-01-31 00:53:36 +06:00 committed by GitHub
parent 7f3efca56b
commit 0c61a78d60
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 10 deletions

View file

@ -1,5 +1,11 @@
# Changes
## [0.2.9] - 2023-01-31
* Register Dispatcher waker when service is not ready
* Add Io::poll_read_pause() method, pauses read task and check io status
## [0.2.8] - 2023-01-30
* Check for nested io operations

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.2.8"
version = "0.2.9"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -71,7 +71,7 @@ enum DispatcherError<S, U> {
enum PollService<U: Encoder + Decoder> {
Item(DispatchItem<U>),
ServiceError,
Continue,
Ready,
}
@ -244,7 +244,7 @@ where
}
}
PollService::Item(item) => item,
PollService::ServiceError => continue,
PollService::Continue => continue,
};
// call service
@ -269,7 +269,7 @@ where
}
}
PollService::Item(item) => item,
PollService::ServiceError => continue,
PollService::Continue => continue,
};
// call service
@ -357,7 +357,7 @@ where
}
DispatcherError::Service(err) => {
self.error.set(Some(err));
PollService::ServiceError
PollService::Continue
}
}
} else {
@ -367,8 +367,27 @@ where
// pause io read task
Poll::Pending => {
log::trace!("service is not ready, register dispatch task");
io.pause();
Poll::Pending
match ready!(io.poll_read_pause(cx)) {
IoStatusUpdate::KeepAlive => {
log::trace!("keep-alive error, stopping dispatcher during pause");
self.st.set(DispatcherState::Stop);
Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout))
}
IoStatusUpdate::Stop => {
log::trace!("dispatcher is instructed to stop during pause");
self.st.set(DispatcherState::Stop);
Poll::Ready(PollService::Continue)
}
IoStatusUpdate::PeerGone(err) => {
log::trace!(
"peer is gone during pause, stopping dispatcher: {:?}",
err
);
self.st.set(DispatcherState::Stop);
Poll::Ready(PollService::Item(DispatchItem::Disconnect(err)))
}
IoStatusUpdate::WriteBackpressure => Poll::Pending,
}
}
// handle service readiness error
Poll::Ready(Err(err)) => {
@ -376,7 +395,7 @@ where
self.st.set(DispatcherState::Stop);
self.error.set(Some(err));
self.insert_flags(Flags::READY_ERR);
Poll::Ready(PollService::ServiceError)
Poll::Ready(PollService::Continue)
}
}
}
@ -704,7 +723,6 @@ mod tests {
#[ntex::test]
async fn test_disconnect_during_read_backpressure() {
env_logger::init();
let (client, server) = IoTest::create();
client.remote_buffer_cap(0);

View file

@ -536,6 +536,19 @@ impl<F> Io<F> {
}
}
#[inline]
/// Pause read task
///
/// Returns status updates
pub fn poll_read_pause(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
self.pause();
let result = self.poll_status_update(cx);
if !result.is_pending() {
self.0 .0.dispatch_task.register(cx.waker());
}
result
}
#[inline]
/// Wait for status updates
pub fn poll_status_update(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {

View file

@ -58,7 +58,7 @@ ntex-util = "0.2.0"
ntex-bytes = "0.1.19"
ntex-h2 = "0.2.1"
ntex-rt = "0.4.7"
ntex-io = "0.2.8"
ntex-io = "0.2.9"
ntex-tls = "0.2.4"
ntex-tokio = { version = "0.2.1", optional = true }
ntex-glommio = { version = "0.2.1", optional = true }