Do not check readiness for dispatcher call

This commit is contained in:
Nikolay Kim 2024-10-10 18:57:24 +05:00
parent c670b51983
commit d101fbc668
5 changed files with 33 additions and 18 deletions

View file

@ -1,10 +1,16 @@
# Changes # Changes
## [2.6.1] - 2024-10-17 ## [2.7.0] - 2024-10-10
* Do not check readiness for dispatcher call
* Handle service readiness errors during shutdown in dispatcher
## [2.6.1] - 2024-10-07
* Return error for Io::poll_read_ready() if io is closed. * Return error for Io::poll_read_ready() if io is closed.
## [2.6.0] - 2024-10-17 ## [2.6.0] - 2024-10-07
* Return error for IoRef::write(), IoRef::with_write_buf(), Io::poll_flush() if io is closed. * Return error for IoRef::write(), IoRef::with_write_buf(), Io::poll_flush() if io is closed.

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "2.6.1" version = "2.7.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -16,8 +16,8 @@ name = "ntex_io"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
ntex-codec = "0.6.2" ntex-codec = "0.6"
ntex-bytes = "0.1.24" ntex-bytes = "0.1"
ntex-util = "2.3" ntex-util = "2.3"
ntex-service = "3" ntex-service = "3"

View file

@ -366,7 +366,11 @@ where
// service may relay on poll_ready for response results // service may relay on poll_ready for response results
if !slf.flags.contains(Flags::READY_ERR) { if !slf.flags.contains(Flags::READY_ERR) {
let _ = slf.shared.service.poll_ready(cx); if let Poll::Ready(res) = slf.shared.service.poll_ready(cx) {
if res.is_err() {
slf.flags.insert(Flags::READY_ERR);
}
}
} }
if slf.shared.inflight.get() == 0 { if slf.shared.inflight.get() == 0 {
@ -422,7 +426,7 @@ where
U: Decoder + Encoder + 'static, U: Decoder + Encoder + 'static,
{ {
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) { fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
let mut fut = self.shared.service.call(item); let mut fut = self.shared.service.call_nowait(item);
self.shared.inflight.set(self.shared.inflight.get() + 1); self.shared.inflight.set(self.shared.inflight.get() + 1);
// optimize first call // optimize first call
@ -434,7 +438,7 @@ where
} }
} else { } else {
let shared = self.shared.clone(); let shared = self.shared.clone();
let _ = spawn(async move { spawn(async move {
let result = fut.await; let result = fut.await;
shared.handle_result(result, &shared.io, true); shared.handle_result(result, &shared.io, true);
}); });
@ -468,7 +472,7 @@ where
// pause io read task // pause io read task
Poll::Pending => { Poll::Pending => {
log::trace!( log::trace!(
"{}: Service is not ready, register dispatch task", "{}: Service is not ready, register dispatcher",
self.shared.io.tag() self.shared.io.tag()
); );
@ -610,8 +614,8 @@ mod tests {
use ntex_bytes::{Bytes, BytesMut, PoolId, PoolRef}; use ntex_bytes::{Bytes, BytesMut, PoolId, PoolRef};
use ntex_codec::BytesCodec; use ntex_codec::BytesCodec;
use ntex_service::ServiceCtx; use ntex_service::{chain, fn_service, ServiceCtx};
use ntex_util::{time::sleep, time::Millis}; use ntex_util::{channel::condition, time::sleep, time::Millis};
use rand::Rng; use rand::Rng;
use super::*; use super::*;
@ -862,13 +866,14 @@ mod tests {
} }
let (disp, state) = Dispatcher::debug(server, BytesCodec, Srv(counter.clone())); let (disp, state) = Dispatcher::debug(server, BytesCodec, Srv(counter.clone()));
spawn(async move {
let _ = disp.await;
});
state state
.io() .io()
.encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &BytesCodec) .encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &BytesCodec)
.unwrap(); .unwrap();
spawn(async move {
let _ = disp.await;
});
// buffer should be flushed // buffer should be flushed
client.remote_buffer_cap(1024); client.remote_buffer_cap(1024);

View file

@ -246,15 +246,19 @@ impl<'a, S: ?Sized, F: Future> Future for ReadyCall<'a, S, F> {
// SAFETY: `fut` never moves // SAFETY: `fut` never moves
let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) }; let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) };
match result { match result {
task::Poll::Pending => self.ctx.waiters.register(self.ctx.idx, cx), task::Poll::Pending => {
self.ctx.waiters.register(self.ctx.idx, cx);
task::Poll::Pending
}
task::Poll::Ready(res) => { task::Poll::Ready(res) => {
self.completed = true; self.completed = true;
self.ctx.waiters.notify(); self.ctx.waiters.notify();
return task::Poll::Ready(res); task::Poll::Ready(res)
} }
} }
} else {
task::Poll::Pending
} }
task::Poll::Pending
} }
} }

View file

@ -71,7 +71,7 @@ ntex-bytes = "0.1.27"
ntex-server = "2.4" ntex-server = "2.4"
ntex-h2 = "1.1" ntex-h2 = "1.1"
ntex-rt = "0.4.18" ntex-rt = "0.4.18"
ntex-io = "2.6" ntex-io = "2.7"
ntex-net = "2.4" ntex-net = "2.4"
ntex-tls = "2.1" ntex-tls = "2.1"