diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 21ddae28..8389d61d 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,10 +1,16 @@ # Changes -## [2.6.1] - 2024-10-17 +## [2.7.0] - 2024-10-10 + +* Do not check readiness for dispatcher call + +* Handle service readiness errors during shutdown in dispatcher + +## [2.6.1] - 2024-10-07 * Return error for Io::poll_read_ready() if io is closed. -## [2.6.0] - 2024-10-17 +## [2.6.0] - 2024-10-07 * Return error for IoRef::write(), IoRef::with_write_buf(), Io::poll_flush() if io is closed. diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 75302c19..8ab6d0a2 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.6.1" +version = "2.7.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -16,8 +16,8 @@ name = "ntex_io" path = "src/lib.rs" [dependencies] -ntex-codec = "0.6.2" -ntex-bytes = "0.1.24" +ntex-codec = "0.6" +ntex-bytes = "0.1" ntex-util = "2.3" ntex-service = "3" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 92299813..f4737e80 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -366,7 +366,11 @@ where // service may relay on poll_ready for response results if !slf.flags.contains(Flags::READY_ERR) { - let _ = slf.shared.service.poll_ready(cx); + if let Poll::Ready(res) = slf.shared.service.poll_ready(cx) { + if res.is_err() { + slf.flags.insert(Flags::READY_ERR); + } + } } if slf.shared.inflight.get() == 0 { @@ -422,7 +426,7 @@ where U: Decoder + Encoder + 'static, { fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem) { - let mut fut = self.shared.service.call(item); + let mut fut = self.shared.service.call_nowait(item); self.shared.inflight.set(self.shared.inflight.get() + 1); // optimize first call @@ -434,7 +438,7 @@ where } } else { let shared = self.shared.clone(); - let _ = spawn(async move { + spawn(async move { let result = fut.await; shared.handle_result(result, &shared.io, true); }); @@ -468,7 +472,7 @@ where // pause io read task Poll::Pending => { log::trace!( - "{}: Service is not ready, register dispatch task", + "{}: Service is not ready, register dispatcher", self.shared.io.tag() ); @@ -610,8 +614,8 @@ mod tests { use ntex_bytes::{Bytes, BytesMut, PoolId, PoolRef}; use ntex_codec::BytesCodec; - use ntex_service::ServiceCtx; - use ntex_util::{time::sleep, time::Millis}; + use ntex_service::{chain, fn_service, ServiceCtx}; + use ntex_util::{channel::condition, time::sleep, time::Millis}; use rand::Rng; use super::*; @@ -862,13 +866,14 @@ mod tests { } let (disp, state) = Dispatcher::debug(server, BytesCodec, Srv(counter.clone())); + spawn(async move { + let _ = disp.await; + }); + state .io() .encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &BytesCodec) .unwrap(); - spawn(async move { - let _ = disp.await; - }); // buffer should be flushed client.remote_buffer_cap(1024); diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index db0da28f..a5b563cd 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -246,15 +246,19 @@ impl<'a, S: ?Sized, F: Future> Future for ReadyCall<'a, S, F> { // SAFETY: `fut` never moves let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) }; match result { - task::Poll::Pending => self.ctx.waiters.register(self.ctx.idx, cx), + task::Poll::Pending => { + self.ctx.waiters.register(self.ctx.idx, cx); + task::Poll::Pending + } task::Poll::Ready(res) => { self.completed = true; self.ctx.waiters.notify(); - return task::Poll::Ready(res); + task::Poll::Ready(res) } } + } else { + task::Poll::Pending } - task::Poll::Pending } } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 9a6dcd3c..a38680ff 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -71,7 +71,7 @@ ntex-bytes = "0.1.27" ntex-server = "2.4" ntex-h2 = "1.1" ntex-rt = "0.4.18" -ntex-io = "2.6" +ntex-io = "2.7" ntex-net = "2.4" ntex-tls = "2.1"