Use updated Service trait

This commit is contained in:
Nikolay Kim 2024-11-03 22:16:03 +05:00
parent d004234f22
commit 5c64b13a8a
6 changed files with 71 additions and 28 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.5.0] - 2024-11-04
* Use updated Service trait
## [2.3.0] - 2024-07-16 ## [2.3.0] - 2024-07-16
* Add Server to TestServer * Add Server to TestServer

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-server" name = "ntex-server"
version = "2.4.0" version = "2.5.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Server for ntex framework" description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -18,7 +18,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-bytes = "0.1" ntex-bytes = "0.1"
ntex-net = "2" ntex-net = "2"
ntex-service = "3" ntex-service = "3.3"
ntex-rt = "0.4" ntex-rt = "0.4"
ntex-util = "2" ntex-util = "2"

View file

@ -1,4 +1,4 @@
use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll}; use std::{cell::Cell, future::poll_fn, rc::Rc, task};
use ntex_util::task::LocalWaker; use ntex_util::task::LocalWaker;
@ -30,14 +30,29 @@ impl Counter {
CounterGuard::new(self.0.clone()) CounterGuard::new(self.0.clone())
} }
pub(crate) fn is_available(&self) -> bool {
self.0.count.get() < self.0.capacity
}
/// Check if counter is not at capacity. If counter at capacity /// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task. /// it registers notification for current task.
pub(super) async fn available(&self) { pub(crate) async fn available(&self) {
poll_fn(|cx| { poll_fn(|cx| {
if self.0.available(cx) { if self.0.available(cx) {
Poll::Ready(()) task::Poll::Ready(())
} else { } else {
Poll::Pending task::Poll::Pending
}
})
.await
}
pub(crate) async fn unavailable(&self) {
poll_fn(|cx| {
if self.0.available(cx) {
task::Poll::Pending
} else {
task::Poll::Ready(())
} }
}) })
.await .await
@ -72,7 +87,11 @@ impl Drop for CounterGuard {
impl CounterInner { impl CounterInner {
fn inc(&self) { fn inc(&self) {
self.count.set(self.count.get() + 1); let num = self.count.get() + 1;
self.count.set(num);
if num == self.capacity {
self.task.wake();
}
} }
fn dec(&self) { fn dec(&self) {
@ -84,11 +103,7 @@ impl CounterInner {
} }
fn available(&self, cx: &mut task::Context<'_>) -> bool { fn available(&self, cx: &mut task::Context<'_>) -> bool {
if self.count.get() < self.capacity { self.task.register(cx.waker());
true self.count.get() < self.capacity
} else {
self.task.register(cx.waker());
false
}
} }
} }

View file

@ -148,6 +148,7 @@ where
type Response = (); type Response = ();
type Error = (); type Error = ();
ntex_service::forward_notready!(inner);
ntex_service::forward_shutdown!(inner); ntex_service::forward_shutdown!(inner);
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {

View file

@ -1,4 +1,4 @@
use std::fmt; use std::{fmt, future::poll_fn, future::Future, pin::Pin, task::Poll};
use ntex_bytes::{Pool, PoolRef}; use ntex_bytes::{Pool, PoolRef};
use ntex_net::Io; use ntex_net::Io;
@ -152,25 +152,48 @@ impl Service<Connection> for StreamServiceImpl {
type Error = (); type Error = ();
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.conns.available().await; if !self.conns.is_available() {
self.conns.available().await;
}
for (idx, svc) in self.services.iter().enumerate() { for (idx, svc) in self.services.iter().enumerate() {
match ctx.ready(svc).await { if let Err(_) = ctx.ready(svc).await {
Ok(()) => (), for (idx_, tag, _, _) in self.tokens.values() {
Err(_) => { if idx == *idx_ {
for (idx_, tag, _, _) in self.tokens.values() { log::error!("{}: Service readiness has failed", tag);
if idx == *idx_ { break;
log::error!("{}: Service readiness has failed", tag);
break;
}
} }
return Err(());
} }
return Err(());
} }
} }
Ok(()) Ok(())
} }
#[inline]
async fn not_ready(&self) {
let mut futs: Vec<_> = self
.services
.iter()
.map(|s| Box::pin(s.not_ready()))
.collect();
if self.conns.is_available() {
ntex_util::future::select(
self.conns.unavailable(),
poll_fn(move |cx| {
for f in &mut futs {
if Pin::new(f).poll(cx).is_ready() {
return Poll::Ready(());
}
}
Poll::Pending
}),
)
.await;
}
}
async fn shutdown(&self) { async fn shutdown(&self) {
let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await; let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
log::info!( log::info!(

View file

@ -64,14 +64,14 @@ brotli = ["dep:brotli2"]
ntex-codec = "0.6.2" ntex-codec = "0.6.2"
ntex-http = "0.1.12" ntex-http = "0.1.12"
ntex-router = "0.5.3" ntex-router = "0.5.3"
ntex-service = "3.1" ntex-service = "3.3"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "2" ntex-util = "2.5"
ntex-bytes = "0.1.27" ntex-bytes = "0.1.27"
ntex-server = "2.4" ntex-server = "2.5"
ntex-h2 = "1.2" ntex-h2 = "1.2"
ntex-rt = "0.4.19" ntex-rt = "0.4.19"
ntex-io = "2.7" ntex-io = "2.8"
ntex-net = "2.4" ntex-net = "2.4"
ntex-tls = "2.1" ntex-tls = "2.1"