Allow to set io tag for web server (#426)

This commit is contained in:
Nikolay Kim 2024-09-24 13:44:47 +05:00 committed by GitHub
parent b50aa31fdc
commit 302e7951d6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 51 additions and 18 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-server" name = "ntex-server"
version = "2.3.0" version = "2.4.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Server for ntex framework" description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -3,7 +3,8 @@ use std::{cell::Cell, cell::RefCell, collections::VecDeque, rc::Rc, sync::Arc};
use async_channel::{unbounded, Receiver, Sender}; use async_channel::{unbounded, Receiver, Sender};
use ntex_rt::System; use ntex_rt::System;
use ntex_util::{future::join_all, time::sleep, time::Millis}; use ntex_util::future::join_all;
use ntex_util::time::{sleep, timeout, Millis};
use crate::server::ServerShared; use crate::server::ServerShared;
use crate::signals::Signal; use crate::signals::Signal;
@ -238,16 +239,13 @@ impl<F: ServerConfiguration> HandleCmdState<F> {
// stop workers // stop workers
if !self.workers.is_empty() { if !self.workers.is_empty() {
let timeout = self.mgr.0.cfg.shutdown_timeout; let to = self.mgr.0.cfg.shutdown_timeout;
if graceful && !timeout.is_zero() { if graceful && !to.is_zero() {
let futs: Vec<_> = self let futs: Vec<_> =
.workers self.workers.iter().map(|worker| worker.stop(to)).collect();
.iter()
.map(|worker| worker.stop(timeout))
.collect();
let _ = join_all(futs).await; let _ = timeout(to, join_all(futs)).await;
} else { } else {
self.workers.iter().for_each(|worker| { self.workers.iter().for_each(|worker| {
let _ = worker.stop(Millis::ZERO); let _ = worker.stop(Millis::ZERO);

View file

@ -288,7 +288,7 @@ impl ServerBuilder {
Ok(self) Ok(self)
} }
/// Add new service to the server. /// Set io tag for named service.
pub fn set_tag<N: AsRef<str>>(mut self, name: N, tag: &'static str) -> Self { pub fn set_tag<N: AsRef<str>>(mut self, name: N, tag: &'static str) -> Self {
let mut token = None; let mut token = None;
for sock in &self.sockets { for sock in &self.sockets {

View file

@ -16,12 +16,14 @@ pub struct Config(Rc<InnerServiceConfig>);
#[derive(Debug)] #[derive(Debug)]
pub(super) struct InnerServiceConfig { pub(super) struct InnerServiceConfig {
pub(super) pool: Cell<PoolId>, pub(super) pool: Cell<PoolId>,
pub(super) tag: Cell<Option<&'static str>>,
} }
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Self(Rc::new(InnerServiceConfig { Self(Rc::new(InnerServiceConfig {
pool: Cell::new(PoolId::DEFAULT), pool: Cell::new(PoolId::DEFAULT),
tag: Cell::new(None),
})) }))
} }
} }
@ -35,9 +37,19 @@ impl Config {
self self
} }
/// Set io tag for the service.
pub fn tag(&self, tag: &'static str) -> &Self {
self.0.tag.set(Some(tag));
self
}
pub(super) fn get_pool_id(&self) -> PoolId { pub(super) fn get_pool_id(&self) -> PoolId {
self.0.pool.get() self.0.pool.get()
} }
pub(super) fn get_tag(&self) -> Option<&'static str> {
self.0.tag.get()
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View file

@ -91,20 +91,24 @@ where
fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> { fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
let cfg = Config::default(); let cfg = Config::default();
let pool = cfg.get_pool_id();
let name = self.name.clone(); let name = self.name.clone();
let tokens = self.tokens.clone(); let mut tokens = self.tokens.clone();
let factory_fut = (self.factory)(cfg); let factory_fut = (self.factory)(cfg.clone());
Box::pin(async move { Box::pin(async move {
let factory = factory_fut.await.map_err(|_| { let factory = factory_fut.await.map_err(|_| {
log::error!("Cannot create {:?} service", name); log::error!("Cannot create {:?} service", name);
})?; })?;
if let Some(tag) = cfg.get_tag() {
for item in &mut tokens {
item.1 = tag;
}
}
Ok(vec![NetService { Ok(vec![NetService {
tokens, tokens,
factory, factory,
pool, pool: cfg.get_pool_id(),
}]) }])
}) })
} }

View file

@ -12,7 +12,7 @@ use ntex_util::time::{sleep, timeout_checked, Millis};
use crate::{ServerConfiguration, WorkerId}; use crate::{ServerConfiguration, WorkerId};
const STOP_TIMEOUT: Millis = Millis(5000); const STOP_TIMEOUT: Millis = Millis(3000);
#[derive(Debug)] #[derive(Debug)]
/// Shutdown worker /// Shutdown worker
@ -284,6 +284,7 @@ where
} }
} }
// re-create service
loop { loop {
match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await { match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
Either::Left(Ok(service)) => { Either::Left(Ok(service)) => {

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.5.0] - 2024-09-24
* Allow to set io tag for web server
## [2.4.0] - 2024-09-05 ## [2.4.0] - 2024-09-05
* Add experimental `compio` runtime support * Add experimental `compio` runtime support

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "2.4.1" version = "2.5.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -68,7 +68,7 @@ ntex-service = "3.0"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "2" ntex-util = "2"
ntex-bytes = "0.1.27" ntex-bytes = "0.1.27"
ntex-server = "2.3" ntex-server = "2.4"
ntex-h2 = "1.1" ntex-h2 = "1.1"
ntex-rt = "0.4.17" ntex-rt = "0.4.17"
ntex-io = "2.5" ntex-io = "2.5"

View file

@ -40,6 +40,7 @@ async fn main() -> std::io::Result<()> {
.bind("0.0.0.0:8081")? .bind("0.0.0.0:8081")?
.workers(4) .workers(4)
.keep_alive(http::KeepAlive::Disabled) .keep_alive(http::KeepAlive::Disabled)
.tag("MY-SERVER")
.run() .run()
.await .await
} }

View file

@ -21,6 +21,7 @@ struct Config {
ssl_handshake_timeout: Seconds, ssl_handshake_timeout: Seconds,
headers_read_rate: Option<ReadRate>, headers_read_rate: Option<ReadRate>,
payload_read_rate: Option<ReadRate>, payload_read_rate: Option<ReadRate>,
tag: &'static str,
pool: PoolId, pool: PoolId,
} }
@ -107,6 +108,7 @@ where
max_timeout: Seconds(13), max_timeout: Seconds(13),
}), }),
payload_read_rate: None, payload_read_rate: None,
tag: "WEB",
pool: PoolId::P0, pool: PoolId::P0,
})), })),
backlog: 1024, backlog: 1024,
@ -308,6 +310,12 @@ where
self self
} }
/// Set io tag for web server
pub fn tag(self, tag: &'static str) -> Self {
self.config.lock().unwrap().tag = tag;
self
}
/// Set memory pool. /// Set memory pool.
/// ///
/// Use specified memory pool for memory allocations. /// Use specified memory pool for memory allocations.
@ -334,6 +342,7 @@ where
addr, addr,
c.host.clone().unwrap_or_else(|| format!("{}", addr)), c.host.clone().unwrap_or_else(|| format!("{}", addr)),
); );
r.tag(c.tag);
r.memory_pool(c.pool); r.memory_pool(c.pool);
HttpService::build_with_config(c.into_cfg()) HttpService::build_with_config(c.into_cfg())
@ -373,6 +382,7 @@ where
addr, addr,
c.host.clone().unwrap_or_else(|| format!("{}", addr)), c.host.clone().unwrap_or_else(|| format!("{}", addr)),
); );
r.tag(c.tag);
r.memory_pool(c.pool); r.memory_pool(c.pool);
HttpService::build_with_config(c.into_cfg()) HttpService::build_with_config(c.into_cfg())
@ -414,6 +424,7 @@ where
addr, addr,
c.host.clone().unwrap_or_else(|| format!("{}", addr)), c.host.clone().unwrap_or_else(|| format!("{}", addr)),
); );
r.tag(c.tag);
r.memory_pool(c.pool); r.memory_pool(c.pool);
HttpService::build_with_config(c.into_cfg()) HttpService::build_with_config(c.into_cfg())
@ -522,6 +533,7 @@ where
socket_addr, socket_addr,
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
); );
r.tag(c.tag);
r.memory_pool(c.pool); r.memory_pool(c.pool);
HttpService::build_with_config(c.into_cfg()) HttpService::build_with_config(c.into_cfg())
@ -553,6 +565,7 @@ where
socket_addr, socket_addr,
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
); );
r.tag(c.tag);
r.memory_pool(c.pool); r.memory_pool(c.pool);
HttpService::build_with_config(c.into_cfg()) HttpService::build_with_config(c.into_cfg())