Add io tags for logging (#267)

This commit is contained in:
Nikolay Kim 2023-12-12 18:18:52 +06:00 committed by GitHub
parent 8ee296a399
commit 6adeadfff0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 258 additions and 70 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.3.3] - 2023-12-12
* Add io tag support
## [0.3.2] - 2023-10-02
* Add `Clone` impl for `Connect<T>` type

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-connect"
version = "0.3.2"
version = "0.3.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntexwork connect utils for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -34,13 +34,13 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"]
async-std = ["ntex-rt/async-std", "ntex-async-std"]
[dependencies]
ntex-service = "1.2.6"
ntex-bytes = "0.1.19"
ntex-http = "0.1.10"
ntex-io = "0.3.3"
ntex-service = "1.2.7"
ntex-bytes = "0.1.21"
ntex-http = "0.1.11"
ntex-io = "0.3.15"
ntex-rt = "0.4.7"
ntex-tls = "0.3.1"
ntex-util = "0.3.2"
ntex-tls = "0.3.3"
ntex-util = "0.3.4"
log = "0.4"
thiserror = "1.0"

View file

@ -56,7 +56,7 @@ impl<T: Address> Connector<T> {
let openssl = self.openssl.clone();
let io = conn.await?;
trace!("SSL Handshake start for: {:?}", host);
trace!("{}SSL Handshake start for: {:?}", io.tag(), host);
match openssl.configure() {
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e).into()),
@ -64,13 +64,14 @@ impl<T: Address> Connector<T> {
let ssl = config
.into_ssl(&host)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let tag = io.tag();
match IoSslConnector::new(ssl).create(io).await {
Ok(io) => {
trace!("SSL Handshake success: {:?}", host);
trace!("{}SSL Handshake success: {:?}", tag, host);
Ok(io)
}
Err(e) => {
trace!("SSL Handshake error: {:?}", e);
trace!("{}SSL Handshake error: {:?}", tag, e);
Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into())
}
}

View file

@ -64,19 +64,20 @@ impl<T: Address + 'static> Connector<T> {
let connector = self.inner.clone();
let io = conn.await?;
trace!("SSL Handshake start for: {:?}", host);
trace!("{}SSL Handshake start for: {:?}", io.tag(), host);
let tag = io.tag();
let host = ServerName::try_from(host.as_str())
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))?;
let connector = connector.server_name(host.clone());
match connector.create(io).await {
Ok(io) => {
trace!("TLS Handshake success: {:?}", &host);
trace!("{}TLS Handshake success: {:?}", tag, &host);
Ok(io)
}
Err(e) => {
trace!("TLS Handshake error: {:?}", e);
trace!("{}TLS Handshake error: {:?}", tag, e);
Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into())
}
}

View file

@ -12,6 +12,7 @@ use crate::{net::tcp_connect_in, Address, Connect, ConnectError, Resolver};
pub struct Connector<T> {
resolver: Resolver<T>,
pool: PoolRef,
tag: &'static str,
}
impl<T> Connector<T> {
@ -20,10 +21,11 @@ impl<T> Connector<T> {
Connector {
resolver: Resolver::new(),
pool: PoolId::P0.pool_ref(),
tag: "",
}
}
/// Set memory pool.
/// Set memory pool
///
/// Use specified memory pool for memory allocations. By default P0
/// memory pool is used.
@ -31,6 +33,14 @@ impl<T> Connector<T> {
self.pool = id.pool_ref();
self
}
/// Set io tag
///
/// Set tag to opened io object.
pub fn tag(mut self, tag: &'static str) -> Self {
self.tag = tag;
self
}
}
impl<T: Address> Connector<T> {
@ -41,6 +51,7 @@ impl<T: Address> Connector<T> {
{
ConnectServiceResponse {
state: ConnectState::Resolve(Box::pin(self.resolver.lookup(message.into()))),
tag: self.tag,
pool: self.pool,
}
.await
@ -57,6 +68,7 @@ impl<T> Clone for Connector<T> {
fn clone(&self) -> Self {
Connector {
resolver: self.resolver.clone(),
tag: self.tag,
pool: self.pool,
}
}
@ -65,6 +77,7 @@ impl<T> Clone for Connector<T> {
impl<T> fmt::Debug for Connector<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connector")
.field("tagr", &self.tag)
.field("resolver", &self.resolver)
.field("memory_pool", &self.pool)
.finish()
@ -91,7 +104,11 @@ impl<T: Address> Service<Connect<T>> for Connector<T> {
#[inline]
fn call<'a>(&'a self, req: Connect<T>, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
ConnectServiceResponse::new(Box::pin(self.resolver.lookup(req)))
ConnectServiceResponse {
state: ConnectState::Resolve(Box::pin(self.resolver.lookup(req))),
pool: PoolId::P0.pool_ref(),
tag: self.tag,
}
}
}
@ -104,6 +121,7 @@ enum ConnectState<'f, T: Address> {
pub struct ConnectServiceResponse<'f, T: Address> {
state: ConnectState<'f, T>,
pool: PoolRef,
tag: &'static str,
}
impl<'f, T: Address> ConnectServiceResponse<'f, T> {
@ -111,6 +129,7 @@ impl<'f, T: Address> ConnectServiceResponse<'f, T> {
Self {
state: ConnectState::Resolve(fut),
pool: PoolId::P0.pool_ref(),
tag: "",
}
}
}
@ -118,6 +137,7 @@ impl<'f, T: Address> ConnectServiceResponse<'f, T> {
impl<'f, T: Address> fmt::Debug for ConnectServiceResponse<'f, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnectServiceResponse")
.field("tag", &self.tag)
.field("pool", &self.pool)
.finish()
}
@ -136,7 +156,7 @@ impl<'f, T: Address> Future for ConnectServiceResponse<'f, T> {
if let Some(addr) = addr {
self.state = ConnectState::Connect(TcpConnectorResponse::new(
req, port, addr, self.pool,
req, port, addr, self.tag, self.pool,
));
self.poll(cx)
} else if let Some(addr) = req.addr() {
@ -144,11 +164,12 @@ impl<'f, T: Address> Future for ConnectServiceResponse<'f, T> {
req,
addr.port(),
Either::Left(addr),
self.tag,
self.pool,
));
self.poll(cx)
} else {
error!("TCP connector: got unresolved address");
error!("{}TCP connector: got unresolved address", self.tag);
Poll::Ready(Err(ConnectError::Unresolved))
}
}
@ -165,6 +186,7 @@ struct TcpConnectorResponse<T> {
addrs: Option<VecDeque<SocketAddr>>,
#[allow(clippy::type_complexity)]
stream: Option<BoxFuture<'static, Result<Io, io::Error>>>,
tag: &'static str,
pool: PoolRef,
}
@ -173,10 +195,12 @@ impl<T: Address> TcpConnectorResponse<T> {
req: T,
port: u16,
addr: Either<SocketAddr, VecDeque<SocketAddr>>,
tag: &'static str,
pool: PoolRef,
) -> TcpConnectorResponse<T> {
trace!(
"TCP connector - connecting to {:?} addr:{:?} port:{}",
"{}TCP connector - connecting to {:?} addr:{:?} port:{}",
tag,
req.host(),
addr,
port
@ -187,10 +211,12 @@ impl<T: Address> TcpConnectorResponse<T> {
req: Some(req),
addrs: None,
stream: Some(Box::pin(tcp_connect_in(addr, pool))),
tag,
pool,
port,
},
Either::Right(addrs) => TcpConnectorResponse {
tag,
port,
pool,
req: Some(req),
@ -202,7 +228,8 @@ impl<T: Address> TcpConnectorResponse<T> {
fn can_continue(&self, err: &io::Error) -> bool {
trace!(
"TCP connector - failed to connect to {:?} port: {} err: {:?}",
"{}TCP connector - failed to connect to {:?} port: {} err: {:?}",
self.tag,
self.req.as_ref().unwrap().host(),
self.port,
err
@ -224,10 +251,12 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
Poll::Ready(Ok(sock)) => {
let req = this.req.take().unwrap();
trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}",
"{}TCP connector - successfully connected to connecting to {:?} - {:?}",
this.tag,
req.host(),
sock.query::<types::PeerAddr>().get()
);
sock.set_tag(this.tag);
return Poll::Ready(Ok(sock));
}
Poll::Pending => return Poll::Pending,

View file

@ -2,6 +2,8 @@
## [0.3.15] - 2023-12-12
* Add io tags for logging
* Stop dispatcher timers on memory pool pause
## [0.3.14] - 2023-12-10

View file

@ -337,7 +337,10 @@ where
}
}
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
log::trace!(
"{}Dispatcher is instructed to stop",
slf.shared.io.tag()
);
slf.st = DispatcherState::Stop;
continue;
}
@ -348,7 +351,8 @@ where
}
Err(RecvError::Decoder(err)) => {
log::trace!(
"decoder error, stopping dispatcher: {:?}",
"{}Decoder error, stopping dispatcher: {:?}",
slf.shared.io.tag(),
err
);
slf.st = DispatcherState::Stop;
@ -356,7 +360,8 @@ where
}
Err(RecvError::PeerGone(err)) => {
log::trace!(
"peer is gone, stopping dispatcher: {:?}",
"{}Peer is gone, stopping dispatcher: {:?}",
slf.shared.io.tag(),
err
);
slf.st = DispatcherState::Stop;
@ -437,7 +442,10 @@ where
// shutdown service
DispatcherState::Shutdown => {
return if slf.shared.service.poll_shutdown(cx).is_ready() {
log::trace!("service shutdown is completed, stop");
log::trace!(
"{}Service shutdown is completed, stop",
slf.shared.io.tag()
);
Poll::Ready(if let Some(err) = slf.error.take() {
Err(err)
@ -463,7 +471,10 @@ where
Poll::Ready(Ok(_)) => {
// check for errors
Poll::Ready(if let Some(err) = self.shared.error.take() {
log::trace!("error occured, stopping dispatcher");
log::trace!(
"{}Error occured, stopping dispatcher",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
match err {
@ -481,7 +492,10 @@ where
}
// pause io read task
Poll::Pending => {
log::trace!("service is not ready, register dispatch task");
log::trace!(
"{}Service is not ready, register dispatch task",
self.shared.io.tag()
);
// remove all timers
self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
@ -489,18 +503,25 @@ where
match ready!(self.shared.io.poll_read_pause(cx)) {
IoStatusUpdate::KeepAlive => {
log::trace!("keep-alive error, stopping dispatcher during pause");
log::trace!(
"{}Keep-alive error, stopping dispatcher during pause",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout))
}
IoStatusUpdate::Stop => {
log::trace!("dispatcher is instructed to stop during pause");
log::trace!(
"{}Dispatcher is instructed to stop during pause",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
Poll::Ready(PollService::Continue)
}
IoStatusUpdate::PeerGone(err) => {
log::trace!(
"peer is gone during pause, stopping dispatcher: {:?}",
"{}Peer is gone during pause, stopping dispatcher: {:?}",
self.shared.io.tag(),
err
);
self.st = DispatcherState::Stop;
@ -511,7 +532,10 @@ where
}
// handle service readiness error
Poll::Ready(Err(err)) => {
log::trace!("service readiness check failed, stopping");
log::trace!(
"{}Service readiness check failed, stopping",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
self.error = Some(err);
self.flags.insert(Flags::READY_ERR);
@ -534,7 +558,8 @@ where
&& !self.flags.contains(Flags::KA_TIMEOUT)
{
log::debug!(
"Start keep-alive timer {:?}",
"{}Start keep-alive timer {:?}",
self.shared.io.tag(),
self.cfg.keepalive_timeout_secs()
);
self.flags.insert(Flags::KA_TIMEOUT);
@ -573,17 +598,27 @@ where
}
if max.is_zero() || !self.read_max_timeout.is_zero() {
log::trace!("Frame read rate {:?}, extend timer", total);
log::trace!(
"{}Frame read rate {:?}, extend timer",
self.shared.io.tag(),
total
);
self.shared.io.start_timer_secs(timeout);
return Ok(());
}
log::trace!("Max payload timeout has been reached");
log::trace!(
"{}Max payload timeout has been reached",
self.shared.io.tag()
);
}
return Err(DispatchItem::ReadTimeout);
}
}
log::trace!("Keep-alive error, stopping dispatcher");
log::trace!(
"{}Keep-alive error, stopping dispatcher",
self.shared.io.tag()
);
Err(DispatchItem::KeepAliveTimeout)
}
}
@ -676,6 +711,7 @@ mod tests {
let state = Io::new(io);
let pool = state.memory_pool().pool();
state.set_disconnect_timeout(cfg.disconnect_timeout());
state.set_tag("DBG: ");
let flags = if cfg.keepalive_timeout_secs().is_zero() {
super::Flags::empty()

View file

@ -66,6 +66,7 @@ pub(crate) struct IoState {
pub(super) filter: Cell<&'static dyn Filter>,
pub(super) handle: Cell<Option<Box<dyn Handle>>>,
pub(super) timeout: Cell<TimerHandle>,
pub(super) tag: Cell<&'static str>,
#[allow(clippy::box_collection)]
pub(super) on_disconnect: Cell<Option<Box<Vec<LocalWaker>>>>,
}
@ -89,7 +90,7 @@ impl IoState {
}
pub(super) fn notify_timeout(&self) {
log::trace!("timeout, notify dispatcher");
log::trace!("{}Timeout, notify dispatcher", self.tag.get());
let mut flags = self.flags.get();
if !flags.contains(Flags::DSP_TIMEOUT) {
@ -128,7 +129,11 @@ impl IoState {
.get()
.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS)
{
log::trace!("initiate io shutdown {:?}", self.flags.get());
log::trace!(
"{}Initiate io shutdown {:?}",
self.tag.get(),
self.flags.get()
);
self.insert_flags(Flags::IO_STOPPING_FILTERS);
self.read_task.wake();
}
@ -198,6 +203,7 @@ impl Io {
handle: Cell::new(None),
timeout: Cell::new(TimerHandle::default()),
on_disconnect: Cell::new(None),
tag: Cell::new(""),
});
let filter = Box::new(Base::new(IoRef(inner.clone())));
@ -254,6 +260,7 @@ impl<F> Io<F> {
handle: Cell::new(None),
timeout: Cell::new(TimerHandle::default()),
on_disconnect: Cell::new(None),
tag: Cell::new(""),
});
let state = mem::replace(&mut self.0, IoRef(inner));
@ -544,7 +551,10 @@ impl<F> Io<F> {
match self.poll_read_ready(cx) {
Poll::Pending | Poll::Ready(Ok(Some(()))) => {
if log::log_enabled!(log::Level::Debug) && decoded.remains != 0 {
log::debug!("not enough data to decode next frame");
log::debug!(
"{}Not enough data to decode next frame",
self.tag()
);
}
Ok(decoded)
}
@ -692,7 +702,8 @@ impl<F> Drop for Io<F> {
if !self.0.flags().contains(Flags::IO_STOPPED) && self.1.is_set() {
log::trace!(
"io is dropped, force stopping io streams {:?}",
"{}Io is dropped, force stopping io streams {:?}",
self.tag(),
self.0.flags()
);

View file

@ -62,7 +62,7 @@ impl IoRef {
/// Dispatcher does not wait for uncompleted responses. Io stream get terminated
/// without any graceful period.
pub fn force_close(&self) {
log::trace!("force close io stream object");
log::trace!("{}Force close io stream object", self.tag());
self.0.insert_flags(
Flags::DSP_STOP
| Flags::IO_STOPPED
@ -83,7 +83,11 @@ impl IoRef {
.get()
.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS)
{
log::trace!("initiate io shutdown {:?}", self.0.flags.get());
log::trace!(
"{}Initiate io shutdown {:?}",
self.tag(),
self.0.flags.get()
);
self.0.insert_flags(Flags::IO_STOPPING_FILTERS);
self.0.read_task.wake();
}
@ -117,14 +121,18 @@ impl IoRef {
// in that case mark io as failed
.map_or_else(
|err| {
log::trace!("Got io error while encoding, error: {:?}", err);
log::trace!(
"{}Got io error while encoding, error: {:?}",
self.tag(),
err
);
self.0.io_stopped(Some(err));
Ok(())
},
|item| item,
)
} else {
log::trace!("Io is closed/closing, skip frame encoding");
log::trace!("{}Io is closed/closing, skip frame encoding", self.tag());
Ok(())
}
}
@ -240,20 +248,20 @@ impl IoRef {
if cur_hnd.is_set() {
let hnd = timer::update(cur_hnd, timeout, self);
if hnd != cur_hnd {
log::debug!("update timer {:?}", timeout);
log::debug!("{}Update timer {:?}", self.tag(), timeout);
self.0.timeout.set(hnd);
}
hnd
} else {
log::debug!("start timer {:?}", timeout);
log::debug!("{}Start timer {:?}", self.tag(), timeout);
let hnd = timer::register(timeout, self);
self.0.timeout.set(hnd);
hnd
}
} else {
if cur_hnd.is_set() {
timer::unregister(cur_hnd, self);
self.0.timeout.set(timer::TimerHandle::ZERO);
timer::unregister(cur_hnd, self);
}
timer::TimerHandle::ZERO
}
@ -264,7 +272,7 @@ impl IoRef {
pub fn stop_timer(&self) {
let hnd = self.0.timeout.get();
if hnd.is_set() {
log::debug!("unregister timer");
log::debug!("{}Stop timer", self.tag());
self.0.timeout.set(timer::TimerHandle::ZERO);
timer::unregister(hnd, self)
}
@ -286,6 +294,18 @@ impl IoRef {
self.stop_timer()
}
#[inline]
/// Get tag
pub fn tag(&self) -> &'static str {
self.0.tag.get()
}
#[inline]
/// Set tag
pub fn set_tag(&self, tag: &'static str) {
self.0.tag.set(tag)
}
#[inline]
/// Notify when io stream get disconnected
pub fn on_disconnect(&self) -> OnDisconnect {

View file

@ -46,7 +46,8 @@ impl ReadContext {
// dest buffer has new data, wake up dispatcher
if inner.buffer.read_destination_size() >= hw {
log::trace!(
"io read buffer is too large {}, enable read back-pressure",
"{}Io read buffer is too large {}, enable read back-pressure",
self.0.tag(),
total
);
inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
@ -61,7 +62,7 @@ impl ReadContext {
inner.read_task.wake();
}
}
log::trace!("new {} bytes available, wakeup dispatcher", nbytes);
log::trace!("{}New {} bytes available, wakeup dispatcher", self.0.tag(), nbytes);
inner.dispatch_task.wake();
} else {
if nbytes >= hw {

View file

@ -1,5 +1,9 @@
# Changes
## [0.7.14] - 2023-12-12
* Add io tag support for server
## [0.7.13] - 2023-11-29
* Refactor h1 timers

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.7.13"
version = "0.7.14"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -49,7 +49,7 @@ async-std = ["ntex-rt/async-std", "ntex-async-std", "ntex-connect/async-std"]
[dependencies]
ntex-codec = "0.6.2"
ntex-connect = "0.3.2"
ntex-connect = "0.3.3"
ntex-http = "0.1.11"
ntex-router = "0.5.2"
ntex-service = "1.2.7"
@ -59,7 +59,7 @@ ntex-bytes = "0.1.21"
ntex-h2 = "0.4.4"
ntex-rt = "0.4.11"
ntex-io = "0.3.15"
ntex-tls = "0.3.2"
ntex-tls = "0.3.3"
ntex-tokio = { version = "0.3.1", optional = true }
ntex-glommio = { version = "0.3.1", optional = true }
ntex-async-std = { version = "0.3.2", optional = true }

View file

@ -156,9 +156,9 @@ impl ServerBuilder {
let mut cfg = cfg.0.borrow_mut();
let mut srv = ConfiguredService::new(cfg.apply.take().unwrap());
for (name, lst) in mem::take(&mut cfg.services) {
for (name, lst, tag) in mem::take(&mut cfg.services) {
let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?);
srv.stream(token, name.clone(), lst.local_addr()?, tag);
self.sockets.push((token, name, Listener::from_tcp(lst)));
}
self.services.push(Box::new(srv));
@ -184,9 +184,9 @@ impl ServerBuilder {
let mut cfg = inner.borrow_mut();
let mut srv = ConfiguredService::new(cfg.apply.take().unwrap());
for (name, lst) in mem::take(&mut cfg.services) {
for (name, lst, tag) in mem::take(&mut cfg.services) {
let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?);
srv.stream(token, name.clone(), lst.local_addr()?, tag);
self.sockets.push((token, name, Listener::from_tcp(lst)));
}
self.services.push(Box::new(srv));
@ -234,6 +234,7 @@ impl ServerBuilder {
token,
factory.clone(),
lst.local_addr()?,
"",
));
self.sockets
.push((token, name.as_ref().to_string(), Listener::from_tcp(lst)));
@ -287,6 +288,7 @@ impl ServerBuilder {
token,
factory,
addr,
"",
));
self.sockets
.push((token, name.as_ref().to_string(), Listener::from_uds(lst)));
@ -310,12 +312,36 @@ impl ServerBuilder {
token,
factory,
lst.local_addr()?,
"",
));
self.sockets
.push((token, name.as_ref().to_string(), Listener::from_tcp(lst)));
Ok(self)
}
/// Add new service to the server.
pub fn set_tag<N: AsRef<str>>(mut self, name: N, tag: &'static str) -> Self {
let mut token = None;
for sock in &self.sockets {
if &sock.1 == name.as_ref() {
token = Some(sock.0);
break;
}
}
if let Some(token) = token {
for svc in &mut self.services {
if svc.name(token) == name.as_ref() {
svc.set_tag(token, tag);
}
}
} else {
panic!("Cannot find service by name {:?}", name.as_ref());
}
self
}
/// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> Server {
if self.sockets.is_empty() {

View file

@ -46,7 +46,7 @@ pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
#[derive(Debug)]
pub(super) struct ServiceConfigInner {
pub(super) services: Vec<(String, net::TcpListener)>,
pub(super) services: Vec<(String, net::TcpListener, &'static str)>,
pub(super) apply: Option<Box<dyn ServiceRuntimeConfiguration + Send>>,
pub(super) threads: usize,
pub(super) backlog: i32,
@ -97,7 +97,18 @@ impl ServiceConfig {
_t: marker::PhantomData,
}));
}
inner.services.push((name.as_ref().to_string(), lst));
inner.services.push((name.as_ref().to_string(), lst, ""));
}
self
}
/// Set io tag for configured service.
pub fn set_tag<N: AsRef<str>>(&self, name: N, tag: &'static str) -> &Self {
let mut inner = self.0.borrow_mut();
for svc in &mut inner.services {
if svc.0 == name.as_ref() {
svc.2 = tag;
}
}
self
}
@ -124,7 +135,7 @@ impl ServiceConfig {
pub(super) struct ConfiguredService {
rt: Box<dyn ServiceRuntimeConfiguration + Send>,
names: HashMap<Token, (String, net::SocketAddr)>,
topics: HashMap<String, Token>,
topics: HashMap<String, (Token, &'static str)>,
services: Vec<Token>,
}
@ -138,9 +149,15 @@ impl ConfiguredService {
}
}
pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) {
pub(super) fn stream(
&mut self,
token: Token,
name: String,
addr: net::SocketAddr,
tag: &'static str,
) {
self.names.insert(token, (name.clone(), addr));
self.topics.insert(name, token);
self.topics.insert(name, (token, tag));
self.services.push(token);
}
}
@ -150,6 +167,14 @@ impl InternalServiceFactory for ConfiguredService {
&self.names[&token].0
}
fn set_tag(&mut self, token: Token, tag: &'static str) {
for item in self.topics.values_mut() {
if item.0 == token {
item.1 = tag;
}
}
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
rt: self.rt.clone(),
@ -199,6 +224,7 @@ impl InternalServiceFactory for ConfiguredService {
error!("Service {:?} is not configured", name);
Ready::<_, ()>::Ok(())
}),
"UNKNOWN",
PoolId::P0,
)),
));
@ -261,7 +287,7 @@ fn not_configured() {
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
struct ServiceRuntimeInner {
names: HashMap<String, Token>,
names: HashMap<String, (Token, &'static str)>,
services: HashMap<Token, BoxServiceFactory>,
onstart: Vec<BoxFuture<'static, ()>>,
}
@ -278,7 +304,7 @@ impl fmt::Debug for ServiceRuntime {
}
impl ServiceRuntime {
fn new(names: HashMap<String, Token>) -> Self {
fn new(names: HashMap<String, (Token, &'static str)>) -> Self {
ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
names,
services: HashMap::default(),
@ -288,8 +314,8 @@ impl ServiceRuntime {
fn validate(&self) {
let inner = self.0.as_ref().borrow();
for (name, token) in &inner.names {
if !inner.services.contains_key(token) {
for (name, item) in &inner.names {
if !inner.services.contains_key(&item.0) {
error!("Service {:?} is not configured", name);
}
}
@ -321,11 +347,13 @@ impl ServiceRuntime {
T::InitError: fmt::Debug,
{
let mut inner = self.0.borrow_mut();
if let Some(token) = inner.names.get(name) {
if let Some((token, tag)) = inner.names.get(name) {
let token = *token;
let tag = *tag;
inner.services.insert(
token,
boxed::factory(ServiceFactory {
tag,
pool,
inner: service.into_factory(),
}),
@ -354,6 +382,7 @@ type BoxServiceFactory = service::boxed::BoxServiceFactory<
struct ServiceFactory<T> {
inner: T,
tag: &'static str,
pool: PoolId,
}
@ -371,11 +400,12 @@ where
type Future<'f> = BoxFuture<'f, Result<BoxedServerService, ()>> where Self: 'f;
fn create(&self, _: ()) -> Self::Future<'_> {
let tag = self.tag;
let pool = self.pool;
let fut = self.inner.create(());
Box::pin(async move {
match fut.await {
Ok(s) => Ok(boxed::service(StreamService::new(s, pool))),
Ok(s) => Ok(boxed::service(StreamService::new(s, tag, pool))),
Err(e) => {
error!("Cannot construct service: {:?}", e);
Err(())

View file

@ -3,7 +3,7 @@ use std::{net::SocketAddr, rc::Rc, task::Context, task::Poll};
use log::error;
use crate::service::{boxed, Service, ServiceCtx, ServiceFactory};
use crate::util::{BoxFuture, Pool, PoolId};
use crate::util::{BoxFuture, Pool, PoolId, PoolRef};
use crate::{io::Io, time::Millis};
use super::{counter::CounterGuard, socket::Stream, Config, Token};
@ -27,6 +27,8 @@ pub(super) trait StreamServiceFactory: Send + Clone + 'static {
pub(super) trait InternalServiceFactory: Send {
fn name(&self, token: Token) -> &str;
fn set_tag(&mut self, token: Token, tag: &'static str);
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
@ -38,13 +40,17 @@ pub(super) type BoxedServerService =
#[derive(Clone)]
pub(super) struct StreamService<T> {
service: Rc<T>,
tag: &'static str,
pool: Pool,
pool_ref: PoolRef,
}
impl<T> StreamService<T> {
pub(crate) fn new(service: T, pid: PoolId) -> Self {
pub(crate) fn new(service: T, tag: &'static str, pid: PoolId) -> Self {
StreamService {
tag,
pool: pid.pool(),
pool_ref: pid.pool_ref(),
service: Rc::new(service),
}
}
@ -85,7 +91,8 @@ where
if let Ok(stream) = stream {
let stream: Io<_> = stream;
stream.set_memory_pool(self.pool.pool_ref());
stream.set_tag(self.tag);
stream.set_memory_pool(self.pool_ref);
let _ = ctx.call(self.service.as_ref(), stream).await;
drop(guard);
Ok(())
@ -101,6 +108,7 @@ where
pub(super) struct Factory<F: StreamServiceFactory> {
name: String,
tag: &'static str,
inner: F,
token: Token,
addr: SocketAddr,
@ -115,12 +123,14 @@ where
token: Token,
inner: F,
addr: SocketAddr,
tag: &'static str,
) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
name,
token,
inner,
addr,
tag,
})
}
}
@ -133,17 +143,23 @@ where
&self.name
}
fn set_tag(&mut self, _: Token, tag: &'static str) {
self.tag = tag;
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
token: self.token,
addr: self.addr,
tag: self.tag,
})
}
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
let token = self.token;
let tag = self.tag;
let cfg = Config::default();
let pool = cfg.get_pool_id();
let factory = self.inner.create(cfg);
@ -151,7 +167,7 @@ where
Box::pin(async move {
match factory.create(()).await {
Ok(inner) => {
let service = boxed::service(StreamService::new(inner, pool));
let service = boxed::service(StreamService::new(inner, tag, pool));
Ok(vec![(token, service)])
}
Err(_) => Err(()),
@ -165,6 +181,10 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
self.as_ref().name(token)
}
fn set_tag(&mut self, token: Token, tag: &'static str) {
self.as_mut().set_tag(token, tag);
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
self.as_ref().clone_factory()
}

View file

@ -55,6 +55,7 @@ where
sys.run(|| {
Server::build()
.listen("test", tcp, move |_| factory())?
.set_tag("test", "TEST: ")
.workers(1)
.disable_signals()
.run();

View file

@ -606,6 +606,7 @@ mod tests {
Token(0),
move |_| f.clone(),
"127.0.0.1:8080".parse().unwrap(),
"TEST",
)],
avail.clone(),
Millis(5_000),
@ -678,6 +679,7 @@ mod tests {
Token(0),
move |_| f.clone(),
"127.0.0.1:8080".parse().unwrap(),
"TEST",
)],
avail.clone(),
Millis(5_000),