Check service readiness once per decoded item

This commit is contained in:
Nikolay Kim 2024-11-10 14:33:19 +05:00
parent 0a376457f1
commit e7c7526908
3 changed files with 21 additions and 7 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.8.3] - 2024-11-10
* Check service readiness once per decoded item
## [2.8.2] - 2024-11-05 ## [2.8.2] - 2024-11-05
* Do not rely on not_ready(), always check service readiness * Do not rely on not_ready(), always check service readiness

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "2.8.2" version = "2.8.3"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -126,11 +126,11 @@ pin_project_lite::pin_project! {
bitflags::bitflags! { bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 { struct Flags: u8 {
const READY_ERR = 0b00001; const READY_ERR = 0b000001;
const IO_ERR = 0b00010; const IO_ERR = 0b000010;
const KA_ENABLED = 0b00100; const KA_ENABLED = 0b000100;
const KA_TIMEOUT = 0b01000; const KA_TIMEOUT = 0b001000;
const READ_TIMEOUT = 0b10000; const READ_TIMEOUT = 0b010000;
const READY = 0b100000; const READY = 0b100000;
} }
} }
@ -342,6 +342,7 @@ where
PollService::Continue => continue, PollService::Continue => continue,
}; };
slf.flags.remove(Flags::READY);
slf.call_service(cx, item); slf.call_service(cx, item);
} }
// handle write back-pressure // handle write back-pressure
@ -471,9 +472,18 @@ where
} }
fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> { fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
if self.flags.contains(Flags::READY) {
if self.shared.service.poll_not_ready(cx).is_ready() {
self.flags.remove(Flags::READY);
} else {
return Poll::Ready(self.check_error());
}
}
// wait until service becomes ready // wait until service becomes ready
match self.shared.service.poll_ready(cx) { match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => { Poll::Ready(Ok(_)) => {
self.flags.insert(Flags::READY);
let _ = self.shared.service.poll_not_ready(cx); let _ = self.shared.service.poll_not_ready(cx);
Poll::Ready(self.check_error()) Poll::Ready(self.check_error())
} }