diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index f2ae2db4..56bcca1b 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.8.0] - 2024-11-04 + +* Use updated Service trait + ## [2.7.1] - 2024-10-15 * Disconnect on error from service readiness check diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index f9012d53..b4f00de0 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.7.1" +version = "2.8.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -19,7 +19,7 @@ path = "src/lib.rs" ntex-codec = "0.6" ntex-bytes = "0.1" ntex-util = "2.5" -ntex-service = "3" +ntex-service = "3.3" bitflags = "2" log = "0.4" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 4cd49ea3..03c73934 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -446,30 +446,38 @@ where } } - fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.shared.service.poll_ready(cx) { - Poll::Ready(Ok(_)) => { - // check for errors - Poll::Ready(if let Some(err) = self.shared.error.take() { - log::trace!( - "{}: Error occured, stopping dispatcher", - self.shared.io.tag() - ); - self.st = DispatcherState::Stop; + fn check_error(&mut self) -> PollService { + // check for errors + if let Some(err) = self.shared.error.take() { + log::trace!( + "{}: Error occured, stopping dispatcher", + self.shared.io.tag() + ); + self.st = DispatcherState::Stop; - match err { - DispatcherError::Encoder(err) => { - PollService::Item(DispatchItem::EncoderError(err)) - } - DispatcherError::Service(err) => { - self.error = Some(err); - PollService::Continue - } - } - } else { - PollService::Ready - }) + match err { + DispatcherError::Encoder(err) => { + PollService::Item(DispatchItem::EncoderError(err)) + } + DispatcherError::Service(err) => { + self.error = Some(err); + PollService::Continue + } } + } else { + PollService::Ready + } + } + + fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll> { + // check service readiness + if self.shared.service.poll_not_ready(cx).is_pending() { + return Poll::Ready(self.check_error()); + } + + // wait until service becomes ready + match self.shared.service.poll_ready(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(self.check_error()), // pause io read task Poll::Pending => { log::trace!( @@ -850,13 +858,15 @@ mod tests { impl Service> for Srv { type Response = Option>; - type Error = (); + type Error = &'static str; - async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), ()> { + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { self.0.set(self.0.get() + 1); - Err(()) + Err("test") } + async fn not_ready(&self) {} + async fn call( &self, _: DispatchItem, @@ -868,7 +878,8 @@ mod tests { let (disp, state) = Dispatcher::debug(server, BytesCodec, Srv(counter.clone())); spawn(async move { - let _ = disp.await; + let res = disp.await; + assert_eq!(res, Err("test")); }); state