Use updated Service trait (#453)

This commit is contained in:
Nikolay Kim 2024-11-03 21:27:34 +05:00 committed by GitHub
parent c303d02f89
commit d004234f22
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 43 additions and 28 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [2.8.0] - 2024-11-04
* Use updated Service trait
## [2.7.1] - 2024-10-15
* Disconnect on error from service readiness check

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "2.7.1"
version = "2.8.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -19,7 +19,7 @@ path = "src/lib.rs"
ntex-codec = "0.6"
ntex-bytes = "0.1"
ntex-util = "2.5"
ntex-service = "3"
ntex-service = "3.3"
bitflags = "2"
log = "0.4"

View file

@ -446,30 +446,38 @@ where
}
}
fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
// check for errors
Poll::Ready(if let Some(err) = self.shared.error.take() {
log::trace!(
"{}: Error occured, stopping dispatcher",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
fn check_error(&mut self) -> PollService<U> {
// check for errors
if let Some(err) = self.shared.error.take() {
log::trace!(
"{}: Error occured, stopping dispatcher",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
match err {
DispatcherError::Encoder(err) => {
PollService::Item(DispatchItem::EncoderError(err))
}
DispatcherError::Service(err) => {
self.error = Some(err);
PollService::Continue
}
}
} else {
PollService::Ready
})
match err {
DispatcherError::Encoder(err) => {
PollService::Item(DispatchItem::EncoderError(err))
}
DispatcherError::Service(err) => {
self.error = Some(err);
PollService::Continue
}
}
} else {
PollService::Ready
}
}
fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
// check service readiness
if self.shared.service.poll_not_ready(cx).is_pending() {
return Poll::Ready(self.check_error());
}
// wait until service becomes ready
match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(self.check_error()),
// pause io read task
Poll::Pending => {
log::trace!(
@ -850,13 +858,15 @@ mod tests {
impl Service<DispatchItem<BytesCodec>> for Srv {
type Response = Option<Response<BytesCodec>>;
type Error = ();
type Error = &'static str;
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.0.set(self.0.get() + 1);
Err(())
Err("test")
}
async fn not_ready(&self) {}
async fn call(
&self,
_: DispatchItem<BytesCodec>,
@ -868,7 +878,8 @@ mod tests {
let (disp, state) = Dispatcher::debug(server, BytesCodec, Srv(counter.clone()));
spawn(async move {
let _ = disp.await;
let res = disp.await;
assert_eq!(res, Err("test"));
});
state