From 6adeadfff00a8ccc845dc647433de90693244351 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 12 Dec 2023 18:18:52 +0600 Subject: [PATCH] Add io tags for logging (#267) --- ntex-connect/CHANGES.md | 4 +++ ntex-connect/Cargo.toml | 14 ++++---- ntex-connect/src/openssl.rs | 7 ++-- ntex-connect/src/rustls.rs | 7 ++-- ntex-connect/src/service.rs | 43 +++++++++++++++++++++---- ntex-io/CHANGES.md | 2 ++ ntex-io/src/dispatcher.rs | 64 +++++++++++++++++++++++++++++-------- ntex-io/src/io.rs | 19 ++++++++--- ntex-io/src/ioref.rs | 36 ++++++++++++++++----- ntex-io/src/tasks.rs | 5 +-- ntex/CHANGES.md | 4 +++ ntex/Cargo.toml | 6 ++-- ntex/src/server/builder.rs | 34 +++++++++++++++++--- ntex/src/server/config.rs | 52 +++++++++++++++++++++++------- ntex/src/server/service.rs | 28 +++++++++++++--- ntex/src/server/test.rs | 1 + ntex/src/server/worker.rs | 2 ++ 17 files changed, 258 insertions(+), 70 deletions(-) diff --git a/ntex-connect/CHANGES.md b/ntex-connect/CHANGES.md index 1c3b2d2d..fed99578 100644 --- a/ntex-connect/CHANGES.md +++ b/ntex-connect/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.3] - 2023-12-12 + +* Add io tag support + ## [0.3.2] - 2023-10-02 * Add `Clone` impl for `Connect` type diff --git a/ntex-connect/Cargo.toml b/ntex-connect/Cargo.toml index 8e5c43e5..7fe004c8 100644 --- a/ntex-connect/Cargo.toml +++ b/ntex-connect/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-connect" -version = "0.3.2" +version = "0.3.3" authors = ["ntex contributors "] description = "ntexwork connect utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -34,13 +34,13 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"] async-std = ["ntex-rt/async-std", "ntex-async-std"] [dependencies] -ntex-service = "1.2.6" -ntex-bytes = "0.1.19" -ntex-http = "0.1.10" -ntex-io = "0.3.3" +ntex-service = "1.2.7" +ntex-bytes = "0.1.21" +ntex-http = "0.1.11" +ntex-io = "0.3.15" ntex-rt = "0.4.7" -ntex-tls = "0.3.1" -ntex-util = "0.3.2" +ntex-tls = "0.3.3" +ntex-util = "0.3.4" log = "0.4" thiserror = "1.0" diff --git a/ntex-connect/src/openssl.rs b/ntex-connect/src/openssl.rs index eb52e77d..3197da08 100644 --- a/ntex-connect/src/openssl.rs +++ b/ntex-connect/src/openssl.rs @@ -56,7 +56,7 @@ impl Connector { let openssl = self.openssl.clone(); let io = conn.await?; - trace!("SSL Handshake start for: {:?}", host); + trace!("{}SSL Handshake start for: {:?}", io.tag(), host); match openssl.configure() { Err(e) => Err(io::Error::new(io::ErrorKind::Other, e).into()), @@ -64,13 +64,14 @@ impl Connector { let ssl = config .into_ssl(&host) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let tag = io.tag(); match IoSslConnector::new(ssl).create(io).await { Ok(io) => { - trace!("SSL Handshake success: {:?}", host); + trace!("{}SSL Handshake success: {:?}", tag, host); Ok(io) } Err(e) => { - trace!("SSL Handshake error: {:?}", e); + trace!("{}SSL Handshake error: {:?}", tag, e); Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into()) } } diff --git a/ntex-connect/src/rustls.rs b/ntex-connect/src/rustls.rs index 90144afe..b57801bf 100644 --- a/ntex-connect/src/rustls.rs +++ b/ntex-connect/src/rustls.rs @@ -64,19 +64,20 @@ impl Connector { let connector = self.inner.clone(); let io = conn.await?; - trace!("SSL Handshake start for: {:?}", host); + trace!("{}SSL Handshake start for: {:?}", io.tag(), host); + let tag = io.tag(); let host = ServerName::try_from(host.as_str()) .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))?; let connector = connector.server_name(host.clone()); match connector.create(io).await { Ok(io) => { - trace!("TLS Handshake success: {:?}", &host); + trace!("{}TLS Handshake success: {:?}", tag, &host); Ok(io) } Err(e) => { - trace!("TLS Handshake error: {:?}", e); + trace!("{}TLS Handshake error: {:?}", tag, e); Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into()) } } diff --git a/ntex-connect/src/service.rs b/ntex-connect/src/service.rs index 9ae25b13..6c879461 100644 --- a/ntex-connect/src/service.rs +++ b/ntex-connect/src/service.rs @@ -12,6 +12,7 @@ use crate::{net::tcp_connect_in, Address, Connect, ConnectError, Resolver}; pub struct Connector { resolver: Resolver, pool: PoolRef, + tag: &'static str, } impl Connector { @@ -20,10 +21,11 @@ impl Connector { Connector { resolver: Resolver::new(), pool: PoolId::P0.pool_ref(), + tag: "", } } - /// Set memory pool. + /// Set memory pool /// /// Use specified memory pool for memory allocations. By default P0 /// memory pool is used. @@ -31,6 +33,14 @@ impl Connector { self.pool = id.pool_ref(); self } + + /// Set io tag + /// + /// Set tag to opened io object. + pub fn tag(mut self, tag: &'static str) -> Self { + self.tag = tag; + self + } } impl Connector { @@ -41,6 +51,7 @@ impl Connector { { ConnectServiceResponse { state: ConnectState::Resolve(Box::pin(self.resolver.lookup(message.into()))), + tag: self.tag, pool: self.pool, } .await @@ -57,6 +68,7 @@ impl Clone for Connector { fn clone(&self) -> Self { Connector { resolver: self.resolver.clone(), + tag: self.tag, pool: self.pool, } } @@ -65,6 +77,7 @@ impl Clone for Connector { impl fmt::Debug for Connector { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connector") + .field("tagr", &self.tag) .field("resolver", &self.resolver) .field("memory_pool", &self.pool) .finish() @@ -91,7 +104,11 @@ impl Service> for Connector { #[inline] fn call<'a>(&'a self, req: Connect, _: ServiceCtx<'a, Self>) -> Self::Future<'a> { - ConnectServiceResponse::new(Box::pin(self.resolver.lookup(req))) + ConnectServiceResponse { + state: ConnectState::Resolve(Box::pin(self.resolver.lookup(req))), + pool: PoolId::P0.pool_ref(), + tag: self.tag, + } } } @@ -104,6 +121,7 @@ enum ConnectState<'f, T: Address> { pub struct ConnectServiceResponse<'f, T: Address> { state: ConnectState<'f, T>, pool: PoolRef, + tag: &'static str, } impl<'f, T: Address> ConnectServiceResponse<'f, T> { @@ -111,6 +129,7 @@ impl<'f, T: Address> ConnectServiceResponse<'f, T> { Self { state: ConnectState::Resolve(fut), pool: PoolId::P0.pool_ref(), + tag: "", } } } @@ -118,6 +137,7 @@ impl<'f, T: Address> ConnectServiceResponse<'f, T> { impl<'f, T: Address> fmt::Debug for ConnectServiceResponse<'f, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ConnectServiceResponse") + .field("tag", &self.tag) .field("pool", &self.pool) .finish() } @@ -136,7 +156,7 @@ impl<'f, T: Address> Future for ConnectServiceResponse<'f, T> { if let Some(addr) = addr { self.state = ConnectState::Connect(TcpConnectorResponse::new( - req, port, addr, self.pool, + req, port, addr, self.tag, self.pool, )); self.poll(cx) } else if let Some(addr) = req.addr() { @@ -144,11 +164,12 @@ impl<'f, T: Address> Future for ConnectServiceResponse<'f, T> { req, addr.port(), Either::Left(addr), + self.tag, self.pool, )); self.poll(cx) } else { - error!("TCP connector: got unresolved address"); + error!("{}TCP connector: got unresolved address", self.tag); Poll::Ready(Err(ConnectError::Unresolved)) } } @@ -165,6 +186,7 @@ struct TcpConnectorResponse { addrs: Option>, #[allow(clippy::type_complexity)] stream: Option>>, + tag: &'static str, pool: PoolRef, } @@ -173,10 +195,12 @@ impl TcpConnectorResponse { req: T, port: u16, addr: Either>, + tag: &'static str, pool: PoolRef, ) -> TcpConnectorResponse { trace!( - "TCP connector - connecting to {:?} addr:{:?} port:{}", + "{}TCP connector - connecting to {:?} addr:{:?} port:{}", + tag, req.host(), addr, port @@ -187,10 +211,12 @@ impl TcpConnectorResponse { req: Some(req), addrs: None, stream: Some(Box::pin(tcp_connect_in(addr, pool))), + tag, pool, port, }, Either::Right(addrs) => TcpConnectorResponse { + tag, port, pool, req: Some(req), @@ -202,7 +228,8 @@ impl TcpConnectorResponse { fn can_continue(&self, err: &io::Error) -> bool { trace!( - "TCP connector - failed to connect to {:?} port: {} err: {:?}", + "{}TCP connector - failed to connect to {:?} port: {} err: {:?}", + self.tag, self.req.as_ref().unwrap().host(), self.port, err @@ -224,10 +251,12 @@ impl Future for TcpConnectorResponse { Poll::Ready(Ok(sock)) => { let req = this.req.take().unwrap(); trace!( - "TCP connector - successfully connected to connecting to {:?} - {:?}", + "{}TCP connector - successfully connected to connecting to {:?} - {:?}", + this.tag, req.host(), sock.query::().get() ); + sock.set_tag(this.tag); return Poll::Ready(Ok(sock)); } Poll::Pending => return Poll::Pending, diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 5125cc26..cec955c5 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -2,6 +2,8 @@ ## [0.3.15] - 2023-12-12 +* Add io tags for logging + * Stop dispatcher timers on memory pool pause ## [0.3.14] - 2023-12-10 diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index dc1e0167..332ea1d2 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -337,7 +337,10 @@ where } } Err(RecvError::Stop) => { - log::trace!("dispatcher is instructed to stop"); + log::trace!( + "{}Dispatcher is instructed to stop", + slf.shared.io.tag() + ); slf.st = DispatcherState::Stop; continue; } @@ -348,7 +351,8 @@ where } Err(RecvError::Decoder(err)) => { log::trace!( - "decoder error, stopping dispatcher: {:?}", + "{}Decoder error, stopping dispatcher: {:?}", + slf.shared.io.tag(), err ); slf.st = DispatcherState::Stop; @@ -356,7 +360,8 @@ where } Err(RecvError::PeerGone(err)) => { log::trace!( - "peer is gone, stopping dispatcher: {:?}", + "{}Peer is gone, stopping dispatcher: {:?}", + slf.shared.io.tag(), err ); slf.st = DispatcherState::Stop; @@ -437,7 +442,10 @@ where // shutdown service DispatcherState::Shutdown => { return if slf.shared.service.poll_shutdown(cx).is_ready() { - log::trace!("service shutdown is completed, stop"); + log::trace!( + "{}Service shutdown is completed, stop", + slf.shared.io.tag() + ); Poll::Ready(if let Some(err) = slf.error.take() { Err(err) @@ -463,7 +471,10 @@ where Poll::Ready(Ok(_)) => { // check for errors Poll::Ready(if let Some(err) = self.shared.error.take() { - log::trace!("error occured, stopping dispatcher"); + log::trace!( + "{}Error occured, stopping dispatcher", + self.shared.io.tag() + ); self.st = DispatcherState::Stop; match err { @@ -481,7 +492,10 @@ where } // pause io read task Poll::Pending => { - log::trace!("service is not ready, register dispatch task"); + log::trace!( + "{}Service is not ready, register dispatch task", + self.shared.io.tag() + ); // remove all timers self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT); @@ -489,18 +503,25 @@ where match ready!(self.shared.io.poll_read_pause(cx)) { IoStatusUpdate::KeepAlive => { - log::trace!("keep-alive error, stopping dispatcher during pause"); + log::trace!( + "{}Keep-alive error, stopping dispatcher during pause", + self.shared.io.tag() + ); self.st = DispatcherState::Stop; Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout)) } IoStatusUpdate::Stop => { - log::trace!("dispatcher is instructed to stop during pause"); + log::trace!( + "{}Dispatcher is instructed to stop during pause", + self.shared.io.tag() + ); self.st = DispatcherState::Stop; Poll::Ready(PollService::Continue) } IoStatusUpdate::PeerGone(err) => { log::trace!( - "peer is gone during pause, stopping dispatcher: {:?}", + "{}Peer is gone during pause, stopping dispatcher: {:?}", + self.shared.io.tag(), err ); self.st = DispatcherState::Stop; @@ -511,7 +532,10 @@ where } // handle service readiness error Poll::Ready(Err(err)) => { - log::trace!("service readiness check failed, stopping"); + log::trace!( + "{}Service readiness check failed, stopping", + self.shared.io.tag() + ); self.st = DispatcherState::Stop; self.error = Some(err); self.flags.insert(Flags::READY_ERR); @@ -534,7 +558,8 @@ where && !self.flags.contains(Flags::KA_TIMEOUT) { log::debug!( - "Start keep-alive timer {:?}", + "{}Start keep-alive timer {:?}", + self.shared.io.tag(), self.cfg.keepalive_timeout_secs() ); self.flags.insert(Flags::KA_TIMEOUT); @@ -573,17 +598,27 @@ where } if max.is_zero() || !self.read_max_timeout.is_zero() { - log::trace!("Frame read rate {:?}, extend timer", total); + log::trace!( + "{}Frame read rate {:?}, extend timer", + self.shared.io.tag(), + total + ); self.shared.io.start_timer_secs(timeout); return Ok(()); } - log::trace!("Max payload timeout has been reached"); + log::trace!( + "{}Max payload timeout has been reached", + self.shared.io.tag() + ); } return Err(DispatchItem::ReadTimeout); } } - log::trace!("Keep-alive error, stopping dispatcher"); + log::trace!( + "{}Keep-alive error, stopping dispatcher", + self.shared.io.tag() + ); Err(DispatchItem::KeepAliveTimeout) } } @@ -676,6 +711,7 @@ mod tests { let state = Io::new(io); let pool = state.memory_pool().pool(); state.set_disconnect_timeout(cfg.disconnect_timeout()); + state.set_tag("DBG: "); let flags = if cfg.keepalive_timeout_secs().is_zero() { super::Flags::empty() diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 23345eb0..4406a64a 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -66,6 +66,7 @@ pub(crate) struct IoState { pub(super) filter: Cell<&'static dyn Filter>, pub(super) handle: Cell>>, pub(super) timeout: Cell, + pub(super) tag: Cell<&'static str>, #[allow(clippy::box_collection)] pub(super) on_disconnect: Cell>>>, } @@ -89,7 +90,7 @@ impl IoState { } pub(super) fn notify_timeout(&self) { - log::trace!("timeout, notify dispatcher"); + log::trace!("{}Timeout, notify dispatcher", self.tag.get()); let mut flags = self.flags.get(); if !flags.contains(Flags::DSP_TIMEOUT) { @@ -128,7 +129,11 @@ impl IoState { .get() .intersects(Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS) { - log::trace!("initiate io shutdown {:?}", self.flags.get()); + log::trace!( + "{}Initiate io shutdown {:?}", + self.tag.get(), + self.flags.get() + ); self.insert_flags(Flags::IO_STOPPING_FILTERS); self.read_task.wake(); } @@ -198,6 +203,7 @@ impl Io { handle: Cell::new(None), timeout: Cell::new(TimerHandle::default()), on_disconnect: Cell::new(None), + tag: Cell::new(""), }); let filter = Box::new(Base::new(IoRef(inner.clone()))); @@ -254,6 +260,7 @@ impl Io { handle: Cell::new(None), timeout: Cell::new(TimerHandle::default()), on_disconnect: Cell::new(None), + tag: Cell::new(""), }); let state = mem::replace(&mut self.0, IoRef(inner)); @@ -544,7 +551,10 @@ impl Io { match self.poll_read_ready(cx) { Poll::Pending | Poll::Ready(Ok(Some(()))) => { if log::log_enabled!(log::Level::Debug) && decoded.remains != 0 { - log::debug!("not enough data to decode next frame"); + log::debug!( + "{}Not enough data to decode next frame", + self.tag() + ); } Ok(decoded) } @@ -692,7 +702,8 @@ impl Drop for Io { if !self.0.flags().contains(Flags::IO_STOPPED) && self.1.is_set() { log::trace!( - "io is dropped, force stopping io streams {:?}", + "{}Io is dropped, force stopping io streams {:?}", + self.tag(), self.0.flags() ); diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 2881c080..2dd5db25 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -62,7 +62,7 @@ impl IoRef { /// Dispatcher does not wait for uncompleted responses. Io stream get terminated /// without any graceful period. pub fn force_close(&self) { - log::trace!("force close io stream object"); + log::trace!("{}Force close io stream object", self.tag()); self.0.insert_flags( Flags::DSP_STOP | Flags::IO_STOPPED @@ -83,7 +83,11 @@ impl IoRef { .get() .intersects(Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS) { - log::trace!("initiate io shutdown {:?}", self.0.flags.get()); + log::trace!( + "{}Initiate io shutdown {:?}", + self.tag(), + self.0.flags.get() + ); self.0.insert_flags(Flags::IO_STOPPING_FILTERS); self.0.read_task.wake(); } @@ -117,14 +121,18 @@ impl IoRef { // in that case mark io as failed .map_or_else( |err| { - log::trace!("Got io error while encoding, error: {:?}", err); + log::trace!( + "{}Got io error while encoding, error: {:?}", + self.tag(), + err + ); self.0.io_stopped(Some(err)); Ok(()) }, |item| item, ) } else { - log::trace!("Io is closed/closing, skip frame encoding"); + log::trace!("{}Io is closed/closing, skip frame encoding", self.tag()); Ok(()) } } @@ -240,20 +248,20 @@ impl IoRef { if cur_hnd.is_set() { let hnd = timer::update(cur_hnd, timeout, self); if hnd != cur_hnd { - log::debug!("update timer {:?}", timeout); + log::debug!("{}Update timer {:?}", self.tag(), timeout); self.0.timeout.set(hnd); } hnd } else { - log::debug!("start timer {:?}", timeout); + log::debug!("{}Start timer {:?}", self.tag(), timeout); let hnd = timer::register(timeout, self); self.0.timeout.set(hnd); hnd } } else { if cur_hnd.is_set() { - timer::unregister(cur_hnd, self); self.0.timeout.set(timer::TimerHandle::ZERO); + timer::unregister(cur_hnd, self); } timer::TimerHandle::ZERO } @@ -264,7 +272,7 @@ impl IoRef { pub fn stop_timer(&self) { let hnd = self.0.timeout.get(); if hnd.is_set() { - log::debug!("unregister timer"); + log::debug!("{}Stop timer", self.tag()); self.0.timeout.set(timer::TimerHandle::ZERO); timer::unregister(hnd, self) } @@ -286,6 +294,18 @@ impl IoRef { self.stop_timer() } + #[inline] + /// Get tag + pub fn tag(&self) -> &'static str { + self.0.tag.get() + } + + #[inline] + /// Set tag + pub fn set_tag(&self, tag: &'static str) { + self.0.tag.set(tag) + } + #[inline] /// Notify when io stream get disconnected pub fn on_disconnect(&self) -> OnDisconnect { diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index e2bbff07..b201f095 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -46,7 +46,8 @@ impl ReadContext { // dest buffer has new data, wake up dispatcher if inner.buffer.read_destination_size() >= hw { log::trace!( - "io read buffer is too large {}, enable read back-pressure", + "{}Io read buffer is too large {}, enable read back-pressure", + self.0.tag(), total ); inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); @@ -61,7 +62,7 @@ impl ReadContext { inner.read_task.wake(); } } - log::trace!("new {} bytes available, wakeup dispatcher", nbytes); + log::trace!("{}New {} bytes available, wakeup dispatcher", self.0.tag(), nbytes); inner.dispatch_task.wake(); } else { if nbytes >= hw { diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index db937cc9..141fb073 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.14] - 2023-12-12 + +* Add io tag support for server + ## [0.7.13] - 2023-11-29 * Refactor h1 timers diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index c8c58dd7..3937d583 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.7.13" +version = "0.7.14" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -49,7 +49,7 @@ async-std = ["ntex-rt/async-std", "ntex-async-std", "ntex-connect/async-std"] [dependencies] ntex-codec = "0.6.2" -ntex-connect = "0.3.2" +ntex-connect = "0.3.3" ntex-http = "0.1.11" ntex-router = "0.5.2" ntex-service = "1.2.7" @@ -59,7 +59,7 @@ ntex-bytes = "0.1.21" ntex-h2 = "0.4.4" ntex-rt = "0.4.11" ntex-io = "0.3.15" -ntex-tls = "0.3.2" +ntex-tls = "0.3.3" ntex-tokio = { version = "0.3.1", optional = true } ntex-glommio = { version = "0.3.1", optional = true } ntex-async-std = { version = "0.3.2", optional = true } diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index 21dbb0d3..cb23ee11 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -156,9 +156,9 @@ impl ServerBuilder { let mut cfg = cfg.0.borrow_mut(); let mut srv = ConfiguredService::new(cfg.apply.take().unwrap()); - for (name, lst) in mem::take(&mut cfg.services) { + for (name, lst, tag) in mem::take(&mut cfg.services) { let token = self.token.next(); - srv.stream(token, name.clone(), lst.local_addr()?); + srv.stream(token, name.clone(), lst.local_addr()?, tag); self.sockets.push((token, name, Listener::from_tcp(lst))); } self.services.push(Box::new(srv)); @@ -184,9 +184,9 @@ impl ServerBuilder { let mut cfg = inner.borrow_mut(); let mut srv = ConfiguredService::new(cfg.apply.take().unwrap()); - for (name, lst) in mem::take(&mut cfg.services) { + for (name, lst, tag) in mem::take(&mut cfg.services) { let token = self.token.next(); - srv.stream(token, name.clone(), lst.local_addr()?); + srv.stream(token, name.clone(), lst.local_addr()?, tag); self.sockets.push((token, name, Listener::from_tcp(lst))); } self.services.push(Box::new(srv)); @@ -234,6 +234,7 @@ impl ServerBuilder { token, factory.clone(), lst.local_addr()?, + "", )); self.sockets .push((token, name.as_ref().to_string(), Listener::from_tcp(lst))); @@ -287,6 +288,7 @@ impl ServerBuilder { token, factory, addr, + "", )); self.sockets .push((token, name.as_ref().to_string(), Listener::from_uds(lst))); @@ -310,12 +312,36 @@ impl ServerBuilder { token, factory, lst.local_addr()?, + "", )); self.sockets .push((token, name.as_ref().to_string(), Listener::from_tcp(lst))); Ok(self) } + /// Add new service to the server. + pub fn set_tag>(mut self, name: N, tag: &'static str) -> Self { + let mut token = None; + for sock in &self.sockets { + if &sock.1 == name.as_ref() { + token = Some(sock.0); + break; + } + } + + if let Some(token) = token { + for svc in &mut self.services { + if svc.name(token) == name.as_ref() { + svc.set_tag(token, tag); + } + } + } else { + panic!("Cannot find service by name {:?}", name.as_ref()); + } + + self + } + /// Starts processing incoming connections and return server controller. pub fn run(mut self) -> Server { if self.sockets.is_empty() { diff --git a/ntex/src/server/config.rs b/ntex/src/server/config.rs index 8f9c8303..fb3dead4 100644 --- a/ntex/src/server/config.rs +++ b/ntex/src/server/config.rs @@ -46,7 +46,7 @@ pub struct ServiceConfig(pub(super) Rc>); #[derive(Debug)] pub(super) struct ServiceConfigInner { - pub(super) services: Vec<(String, net::TcpListener)>, + pub(super) services: Vec<(String, net::TcpListener, &'static str)>, pub(super) apply: Option>, pub(super) threads: usize, pub(super) backlog: i32, @@ -97,7 +97,18 @@ impl ServiceConfig { _t: marker::PhantomData, })); } - inner.services.push((name.as_ref().to_string(), lst)); + inner.services.push((name.as_ref().to_string(), lst, "")); + } + self + } + + /// Set io tag for configured service. + pub fn set_tag>(&self, name: N, tag: &'static str) -> &Self { + let mut inner = self.0.borrow_mut(); + for svc in &mut inner.services { + if svc.0 == name.as_ref() { + svc.2 = tag; + } } self } @@ -124,7 +135,7 @@ impl ServiceConfig { pub(super) struct ConfiguredService { rt: Box, names: HashMap, - topics: HashMap, + topics: HashMap, services: Vec, } @@ -138,9 +149,15 @@ impl ConfiguredService { } } - pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) { + pub(super) fn stream( + &mut self, + token: Token, + name: String, + addr: net::SocketAddr, + tag: &'static str, + ) { self.names.insert(token, (name.clone(), addr)); - self.topics.insert(name, token); + self.topics.insert(name, (token, tag)); self.services.push(token); } } @@ -150,6 +167,14 @@ impl InternalServiceFactory for ConfiguredService { &self.names[&token].0 } + fn set_tag(&mut self, token: Token, tag: &'static str) { + for item in self.topics.values_mut() { + if item.0 == token { + item.1 = tag; + } + } + } + fn clone_factory(&self) -> Box { Box::new(Self { rt: self.rt.clone(), @@ -199,6 +224,7 @@ impl InternalServiceFactory for ConfiguredService { error!("Service {:?} is not configured", name); Ready::<_, ()>::Ok(()) }), + "UNKNOWN", PoolId::P0, )), )); @@ -261,7 +287,7 @@ fn not_configured() { pub struct ServiceRuntime(Rc>); struct ServiceRuntimeInner { - names: HashMap, + names: HashMap, services: HashMap, onstart: Vec>, } @@ -278,7 +304,7 @@ impl fmt::Debug for ServiceRuntime { } impl ServiceRuntime { - fn new(names: HashMap) -> Self { + fn new(names: HashMap) -> Self { ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner { names, services: HashMap::default(), @@ -288,8 +314,8 @@ impl ServiceRuntime { fn validate(&self) { let inner = self.0.as_ref().borrow(); - for (name, token) in &inner.names { - if !inner.services.contains_key(token) { + for (name, item) in &inner.names { + if !inner.services.contains_key(&item.0) { error!("Service {:?} is not configured", name); } } @@ -321,11 +347,13 @@ impl ServiceRuntime { T::InitError: fmt::Debug, { let mut inner = self.0.borrow_mut(); - if let Some(token) = inner.names.get(name) { + if let Some((token, tag)) = inner.names.get(name) { let token = *token; + let tag = *tag; inner.services.insert( token, boxed::factory(ServiceFactory { + tag, pool, inner: service.into_factory(), }), @@ -354,6 +382,7 @@ type BoxServiceFactory = service::boxed::BoxServiceFactory< struct ServiceFactory { inner: T, + tag: &'static str, pool: PoolId, } @@ -371,11 +400,12 @@ where type Future<'f> = BoxFuture<'f, Result> where Self: 'f; fn create(&self, _: ()) -> Self::Future<'_> { + let tag = self.tag; let pool = self.pool; let fut = self.inner.create(()); Box::pin(async move { match fut.await { - Ok(s) => Ok(boxed::service(StreamService::new(s, pool))), + Ok(s) => Ok(boxed::service(StreamService::new(s, tag, pool))), Err(e) => { error!("Cannot construct service: {:?}", e); Err(()) diff --git a/ntex/src/server/service.rs b/ntex/src/server/service.rs index e9a8d72e..248dffda 100644 --- a/ntex/src/server/service.rs +++ b/ntex/src/server/service.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, rc::Rc, task::Context, task::Poll}; use log::error; use crate::service::{boxed, Service, ServiceCtx, ServiceFactory}; -use crate::util::{BoxFuture, Pool, PoolId}; +use crate::util::{BoxFuture, Pool, PoolId, PoolRef}; use crate::{io::Io, time::Millis}; use super::{counter::CounterGuard, socket::Stream, Config, Token}; @@ -27,6 +27,8 @@ pub(super) trait StreamServiceFactory: Send + Clone + 'static { pub(super) trait InternalServiceFactory: Send { fn name(&self, token: Token) -> &str; + fn set_tag(&mut self, token: Token, tag: &'static str); + fn clone_factory(&self) -> Box; fn create(&self) -> BoxFuture<'static, Result, ()>>; @@ -38,13 +40,17 @@ pub(super) type BoxedServerService = #[derive(Clone)] pub(super) struct StreamService { service: Rc, + tag: &'static str, pool: Pool, + pool_ref: PoolRef, } impl StreamService { - pub(crate) fn new(service: T, pid: PoolId) -> Self { + pub(crate) fn new(service: T, tag: &'static str, pid: PoolId) -> Self { StreamService { + tag, pool: pid.pool(), + pool_ref: pid.pool_ref(), service: Rc::new(service), } } @@ -85,7 +91,8 @@ where if let Ok(stream) = stream { let stream: Io<_> = stream; - stream.set_memory_pool(self.pool.pool_ref()); + stream.set_tag(self.tag); + stream.set_memory_pool(self.pool_ref); let _ = ctx.call(self.service.as_ref(), stream).await; drop(guard); Ok(()) @@ -101,6 +108,7 @@ where pub(super) struct Factory { name: String, + tag: &'static str, inner: F, token: Token, addr: SocketAddr, @@ -115,12 +123,14 @@ where token: Token, inner: F, addr: SocketAddr, + tag: &'static str, ) -> Box { Box::new(Self { name, token, inner, addr, + tag, }) } } @@ -133,17 +143,23 @@ where &self.name } + fn set_tag(&mut self, _: Token, tag: &'static str) { + self.tag = tag; + } + fn clone_factory(&self) -> Box { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), token: self.token, addr: self.addr, + tag: self.tag, }) } fn create(&self) -> BoxFuture<'static, Result, ()>> { let token = self.token; + let tag = self.tag; let cfg = Config::default(); let pool = cfg.get_pool_id(); let factory = self.inner.create(cfg); @@ -151,7 +167,7 @@ where Box::pin(async move { match factory.create(()).await { Ok(inner) => { - let service = boxed::service(StreamService::new(inner, pool)); + let service = boxed::service(StreamService::new(inner, tag, pool)); Ok(vec![(token, service)]) } Err(_) => Err(()), @@ -165,6 +181,10 @@ impl InternalServiceFactory for Box { self.as_ref().name(token) } + fn set_tag(&mut self, token: Token, tag: &'static str) { + self.as_mut().set_tag(token, tag); + } + fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } diff --git a/ntex/src/server/test.rs b/ntex/src/server/test.rs index a0a0ac20..1267ab06 100644 --- a/ntex/src/server/test.rs +++ b/ntex/src/server/test.rs @@ -55,6 +55,7 @@ where sys.run(|| { Server::build() .listen("test", tcp, move |_| factory())? + .set_tag("test", "TEST: ") .workers(1) .disable_signals() .run(); diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index b144b39b..8a56407e 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -606,6 +606,7 @@ mod tests { Token(0), move |_| f.clone(), "127.0.0.1:8080".parse().unwrap(), + "TEST", )], avail.clone(), Millis(5_000), @@ -678,6 +679,7 @@ mod tests { Token(0), move |_| f.clone(), "127.0.0.1:8080".parse().unwrap(), + "TEST", )], avail.clone(), Millis(5_000),