Better io tags handling (#269)

This commit is contained in:
Nikolay Kim 2023-12-14 15:42:19 +06:00 committed by GitHub
parent 6adeadfff0
commit fb1d2a268d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 108 additions and 60 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.3.4] - 2023-12-14
* Better io tag handling
## [0.3.3] - 2023-12-12
* Add io tag support

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-connect"
version = "0.3.3"
version = "0.3.4"
authors = ["ntex contributors <team@ntex.rs>"]
description = "network connect utils for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -37,7 +37,7 @@ async-std = ["ntex-rt/async-std", "ntex-async-std"]
ntex-service = "1.2.7"
ntex-bytes = "0.1.21"
ntex-http = "0.1.11"
ntex-io = "0.3.15"
ntex-io = "0.3.16"
ntex-rt = "0.4.7"
ntex-tls = "0.3.3"
ntex-util = "0.3.4"

View file

@ -56,7 +56,7 @@ impl<T: Address> Connector<T> {
let openssl = self.openssl.clone();
let io = conn.await?;
trace!("{}SSL Handshake start for: {:?}", io.tag(), host);
trace!("{}: SSL Handshake start for: {:?}", io.tag(), host);
match openssl.configure() {
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e).into()),
@ -67,11 +67,11 @@ impl<T: Address> Connector<T> {
let tag = io.tag();
match IoSslConnector::new(ssl).create(io).await {
Ok(io) => {
trace!("{}SSL Handshake success: {:?}", tag, host);
trace!("{}: SSL Handshake success: {:?}", tag, host);
Ok(io)
}
Err(e) => {
trace!("{}SSL Handshake error: {:?}", tag, e);
trace!("{}: SSL Handshake error: {:?}", tag, e);
Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into())
}
}

View file

@ -64,7 +64,7 @@ impl<T: Address + 'static> Connector<T> {
let connector = self.inner.clone();
let io = conn.await?;
trace!("{}SSL Handshake start for: {:?}", io.tag(), host);
trace!("{}: SSL Handshake start for: {:?}", io.tag(), host);
let tag = io.tag();
let host = ServerName::try_from(host.as_str())
@ -73,11 +73,11 @@ impl<T: Address + 'static> Connector<T> {
match connector.create(io).await {
Ok(io) => {
trace!("{}TLS Handshake success: {:?}", tag, &host);
trace!("{}: TLS Handshake success: {:?}", tag, &host);
Ok(io)
}
Err(e) => {
trace!("{}TLS Handshake error: {:?}", tag, e);
trace!("{}: TLS Handshake error: {:?}", tag, e);
Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into())
}
}

View file

@ -169,7 +169,7 @@ impl<'f, T: Address> Future for ConnectServiceResponse<'f, T> {
));
self.poll(cx)
} else {
error!("{}TCP connector: got unresolved address", self.tag);
error!("{}: TCP connector: got unresolved address", self.tag);
Poll::Ready(Err(ConnectError::Unresolved))
}
}

View file

@ -1,5 +1,9 @@
# Changes
## [0.3.16] - 2023-12-14
* Better io tags handling
## [0.3.15] - 2023-12-12
* Add io tags for logging

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.3.15"
version = "0.3.16"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -338,7 +338,7 @@ where
}
Err(RecvError::Stop) => {
log::trace!(
"{}Dispatcher is instructed to stop",
"{}: Dispatcher is instructed to stop",
slf.shared.io.tag()
);
slf.st = DispatcherState::Stop;
@ -351,7 +351,7 @@ where
}
Err(RecvError::Decoder(err)) => {
log::trace!(
"{}Decoder error, stopping dispatcher: {:?}",
"{}: Decoder error, stopping dispatcher: {:?}",
slf.shared.io.tag(),
err
);
@ -360,7 +360,7 @@ where
}
Err(RecvError::PeerGone(err)) => {
log::trace!(
"{}Peer is gone, stopping dispatcher: {:?}",
"{}: Peer is gone, stopping dispatcher: {:?}",
slf.shared.io.tag(),
err
);
@ -443,7 +443,7 @@ where
DispatcherState::Shutdown => {
return if slf.shared.service.poll_shutdown(cx).is_ready() {
log::trace!(
"{}Service shutdown is completed, stop",
"{}: Service shutdown is completed, stop",
slf.shared.io.tag()
);
@ -472,7 +472,7 @@ where
// check for errors
Poll::Ready(if let Some(err) = self.shared.error.take() {
log::trace!(
"{}Error occured, stopping dispatcher",
"{}: Error occured, stopping dispatcher",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
@ -493,7 +493,7 @@ where
// pause io read task
Poll::Pending => {
log::trace!(
"{}Service is not ready, register dispatch task",
"{}: Service is not ready, register dispatch task",
self.shared.io.tag()
);
@ -504,7 +504,7 @@ where
match ready!(self.shared.io.poll_read_pause(cx)) {
IoStatusUpdate::KeepAlive => {
log::trace!(
"{}Keep-alive error, stopping dispatcher during pause",
"{}: Keep-alive error, stopping dispatcher during pause",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
@ -512,7 +512,7 @@ where
}
IoStatusUpdate::Stop => {
log::trace!(
"{}Dispatcher is instructed to stop during pause",
"{}: Dispatcher is instructed to stop during pause",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
@ -520,7 +520,7 @@ where
}
IoStatusUpdate::PeerGone(err) => {
log::trace!(
"{}Peer is gone during pause, stopping dispatcher: {:?}",
"{}: Peer is gone during pause, stopping dispatcher: {:?}",
self.shared.io.tag(),
err
);
@ -533,7 +533,7 @@ where
// handle service readiness error
Poll::Ready(Err(err)) => {
log::trace!(
"{}Service readiness check failed, stopping",
"{}: Service readiness check failed, stopping",
self.shared.io.tag()
);
self.st = DispatcherState::Stop;
@ -558,7 +558,7 @@ where
&& !self.flags.contains(Flags::KA_TIMEOUT)
{
log::debug!(
"{}Start keep-alive timer {:?}",
"{}: Start keep-alive timer {:?}",
self.shared.io.tag(),
self.cfg.keepalive_timeout_secs()
);
@ -599,7 +599,7 @@ where
if max.is_zero() || !self.read_max_timeout.is_zero() {
log::trace!(
"{}Frame read rate {:?}, extend timer",
"{}: Frame read rate {:?}, extend timer",
self.shared.io.tag(),
total
);
@ -607,7 +607,7 @@ where
return Ok(());
}
log::trace!(
"{}Max payload timeout has been reached",
"{}: Max payload timeout has been reached",
self.shared.io.tag()
);
}
@ -616,7 +616,7 @@ where
}
log::trace!(
"{}Keep-alive error, stopping dispatcher",
"{}: Keep-alive error, stopping dispatcher",
self.shared.io.tag()
);
Err(DispatchItem::KeepAliveTimeout)

View file

@ -71,6 +71,8 @@ pub(crate) struct IoState {
pub(super) on_disconnect: Cell<Option<Box<Vec<LocalWaker>>>>,
}
const DEFAULT_TAG: &str = "IO";
impl IoState {
pub(super) fn insert_flags(&self, f: Flags) {
let mut flags = self.flags.get();
@ -90,7 +92,7 @@ impl IoState {
}
pub(super) fn notify_timeout(&self) {
log::trace!("{}Timeout, notify dispatcher", self.tag.get());
log::trace!("{}: Timeout, notify dispatcher", self.tag.get());
let mut flags = self.flags.get();
if !flags.contains(Flags::DSP_TIMEOUT) {
@ -130,7 +132,7 @@ impl IoState {
.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS)
{
log::trace!(
"{}Initiate io shutdown {:?}",
"{}: Initiate io shutdown {:?}",
self.tag.get(),
self.flags.get()
);
@ -203,7 +205,7 @@ impl Io {
handle: Cell::new(None),
timeout: Cell::new(TimerHandle::default()),
on_disconnect: Cell::new(None),
tag: Cell::new(""),
tag: Cell::new(DEFAULT_TAG),
});
let filter = Box::new(Base::new(IoRef(inner.clone())));
@ -260,7 +262,7 @@ impl<F> Io<F> {
handle: Cell::new(None),
timeout: Cell::new(TimerHandle::default()),
on_disconnect: Cell::new(None),
tag: Cell::new(""),
tag: Cell::new(DEFAULT_TAG),
});
let state = mem::replace(&mut self.0, IoRef(inner));
@ -552,7 +554,7 @@ impl<F> Io<F> {
Poll::Pending | Poll::Ready(Ok(Some(()))) => {
if log::log_enabled!(log::Level::Debug) && decoded.remains != 0 {
log::debug!(
"{}Not enough data to decode next frame",
"{}: Not enough data to decode next frame",
self.tag()
);
}
@ -702,7 +704,7 @@ impl<F> Drop for Io<F> {
if !self.0.flags().contains(Flags::IO_STOPPED) && self.1.is_set() {
log::trace!(
"{}Io is dropped, force stopping io streams {:?}",
"{}: Io is dropped, force stopping io streams {:?}",
self.tag(),
self.0.flags()
);

View file

@ -62,7 +62,7 @@ impl IoRef {
/// Dispatcher does not wait for uncompleted responses. Io stream get terminated
/// without any graceful period.
pub fn force_close(&self) {
log::trace!("{}Force close io stream object", self.tag());
log::trace!("{}: Force close io stream object", self.tag());
self.0.insert_flags(
Flags::DSP_STOP
| Flags::IO_STOPPED
@ -84,7 +84,7 @@ impl IoRef {
.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS)
{
log::trace!(
"{}Initiate io shutdown {:?}",
"{}: Initiate io shutdown {:?}",
self.tag(),
self.0.flags.get()
);
@ -122,7 +122,7 @@ impl IoRef {
.map_or_else(
|err| {
log::trace!(
"{}Got io error while encoding, error: {:?}",
"{}: Got io error while encoding, error: {:?}",
self.tag(),
err
);
@ -132,7 +132,7 @@ impl IoRef {
|item| item,
)
} else {
log::trace!("{}Io is closed/closing, skip frame encoding", self.tag());
log::trace!("{}: Io is closed/closing, skip frame encoding", self.tag());
Ok(())
}
}
@ -248,12 +248,12 @@ impl IoRef {
if cur_hnd.is_set() {
let hnd = timer::update(cur_hnd, timeout, self);
if hnd != cur_hnd {
log::debug!("{}Update timer {:?}", self.tag(), timeout);
log::debug!("{}: Update timer {:?}", self.tag(), timeout);
self.0.timeout.set(hnd);
}
hnd
} else {
log::debug!("{}Start timer {:?}", self.tag(), timeout);
log::debug!("{}: Start timer {:?}", self.tag(), timeout);
let hnd = timer::register(timeout, self);
self.0.timeout.set(hnd);
hnd
@ -272,7 +272,7 @@ impl IoRef {
pub fn stop_timer(&self) {
let hnd = self.0.timeout.get();
if hnd.is_set() {
log::debug!("{}Stop timer", self.tag());
log::debug!("{}: Stop timer", self.tag());
self.0.timeout.set(timer::TimerHandle::ZERO);
timer::unregister(hnd, self)
}

View file

@ -46,7 +46,7 @@ impl ReadContext {
// dest buffer has new data, wake up dispatcher
if inner.buffer.read_destination_size() >= hw {
log::trace!(
"{}Io read buffer is too large {}, enable read back-pressure",
"{}: Io read buffer is too large {}, enable read back-pressure",
self.0.tag(),
total
);
@ -62,7 +62,7 @@ impl ReadContext {
inner.read_task.wake();
}
}
log::trace!("{}New {} bytes available, wakeup dispatcher", self.0.tag(), nbytes);
log::trace!("{}: New {} bytes available, wakeup dispatcher", self.0.tag(), nbytes);
inner.dispatch_task.wake();
} else {
if nbytes >= hw {

View file

@ -1,5 +1,9 @@
# Changes
## [0.7.15] - 2023-12-14
* Better io tags handling
## [0.7.14] - 2023-12-12
* Add io tag support for server

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.7.14"
version = "0.7.15"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -49,7 +49,7 @@ async-std = ["ntex-rt/async-std", "ntex-async-std", "ntex-connect/async-std"]
[dependencies]
ntex-codec = "0.6.2"
ntex-connect = "0.3.3"
ntex-connect = "0.3.4"
ntex-http = "0.1.11"
ntex-router = "0.5.2"
ntex-service = "1.2.7"
@ -58,7 +58,7 @@ ntex-util = "0.3.4"
ntex-bytes = "0.1.21"
ntex-h2 = "0.4.4"
ntex-rt = "0.4.11"
ntex-io = "0.3.15"
ntex-io = "0.3.16"
ntex-tls = "0.3.3"
ntex-tokio = { version = "0.3.1", optional = true }
ntex-glommio = { version = "0.3.1", optional = true }

View file

@ -250,7 +250,8 @@ where
});
if result.is_err() {
log::error!(
"Expect handler returned error: {:?}",
"{}: Expect handler returned error: {:?}",
this.inner.io.tag(),
result.err().unwrap()
);
*this.st = State::Stop;
@ -385,7 +386,11 @@ where
let io = this.inner.io.take();
let req = req.take().unwrap();
log::trace!("switching to upgrade service for {:?}", req);
log::trace!(
"{}: Switching to upgrade service for {:?}",
this.inner.io.tag(),
req
);
// Handle UPGRADE request
let config = this.inner.config.clone();
@ -494,7 +499,7 @@ where
cx: &mut Context<'_>,
call_state: &mut std::pin::Pin<&mut CallState<S, X>>,
) -> Poll<State<B>> {
log::trace!("Trying to read http message");
log::trace!("{}: Trying to read http message", self.io.tag());
loop {
let result = match self.io.poll_recv_decode(&self.codec, cx) {
@ -515,7 +520,12 @@ where
// decode incoming bytes stream
return match result {
Ok((mut req, pl)) => {
log::trace!("Http message is received: {:?} and payload {:?}", req, pl);
log::trace!(
"{}: Http message is received: {:?} and payload {:?}",
self.io.tag(),
req,
pl
);
// configure request payload
let upgrade = match pl {
@ -541,7 +551,7 @@ where
if upgrade {
// Handle UPGRADE request
log::trace!("Prepare io for upgrade handler");
log::trace!("{}: Prepare io for upgrade handler", self.io.tag(),);
Poll::Ready(State::Upgrade(Some(req)))
} else {
if req.upgrade() {
@ -563,7 +573,7 @@ where
}
Err(RecvError::WriteBackpressure) => {
if let Err(err) = ready!(self.io.poll_flush(cx, false)) {
log::trace!("Peer is gone with {:?}", err);
log::trace!("{}: Peer is gone with {:?}", self.io.tag(), err);
self.error = Some(DispatchError::PeerGone(Some(err)));
Poll::Ready(State::Stop)
} else {
@ -572,24 +582,24 @@ where
}
Err(RecvError::Decoder(err)) => {
// Malformed requests, respond with 400
log::trace!("Malformed request: {:?}", err);
log::trace!("{}: Malformed request: {:?}", self.io.tag(), err);
let (res, body) = Response::BadRequest().finish().into_parts();
self.error = Some(DispatchError::Parse(err));
Poll::Ready(self.send_response(res, body.into_body()))
}
Err(RecvError::PeerGone(err)) => {
log::trace!("Peer is gone with {:?}", err);
log::trace!("{}: Peer is gone with {:?}", self.io.tag(), err);
self.error = Some(DispatchError::PeerGone(err));
Poll::Ready(State::Stop)
}
Err(RecvError::Stop) => {
log::trace!("Dispatcher is instructed to stop");
log::trace!("{}: Dispatcher is instructed to stop", self.io.tag());
Poll::Ready(State::Stop)
}
Err(RecvError::KeepAlive) => {
if self.flags.contains(Flags::READ_HDRS_TIMEOUT) {
if let Err(err) = self.handle_timeout() {
log::trace!("Slow request timeout");
log::trace!("{}: Slow request timeout", self.io.tag());
let (req, body) =
Response::RequestTimeout().finish().into_parts();
let _ = self.send_response(req, body.into_body());
@ -598,7 +608,10 @@ where
continue;
}
} else {
log::trace!("Keep-alive timeout, close connection");
log::trace!(
"{}: Keep-alive timeout, close connection",
self.io.tag()
);
}
Poll::Ready(State::Stop)
}
@ -607,7 +620,12 @@ where
}
fn send_response(&mut self, msg: Response<()>, body: ResponseBody<B>) -> State<B> {
trace!("Sending response: {:?} body: {:?}", msg, body.size());
trace!(
"{}: Sending response: {:?} body: {:?}",
self.io.tag(),
msg,
body.size()
);
// we don't need to process responses if socket is disconnected
// but we still want to handle requests with app service
// so we skip response processing for dropped connection
@ -649,7 +667,7 @@ where
) -> Option<State<B>> {
match item {
Some(Ok(item)) => {
trace!("Got response chunk: {:?}", item.len());
trace!("{}: Got response chunk: {:?}", self.io.tag(), item.len());
match self.io.encode(Message::Chunk(Some(item)), &self.codec) {
Ok(_) => None,
Err(err) => {
@ -659,7 +677,7 @@ where
}
}
None => {
trace!("Response payload eof {:?}", self.flags);
trace!("{}: Response payload eof {:?}", self.io.tag(), self.flags);
if let Err(err) = self.io.encode(Message::Chunk(None), &self.codec) {
self.error = Some(DispatchError::Encode(err));
Some(State::Stop)
@ -672,7 +690,11 @@ where
}
}
Some(Err(e)) => {
trace!("Error during response body poll: {:?}", e);
trace!(
"{}: Error during response body poll: {:?}",
self.io.tag(),
e
);
self.error = Some(DispatchError::ResponsePayload(e));
Some(State::Stop)
}
@ -859,7 +881,11 @@ where
// start timer for next period
if cfg.max_timeout.is_zero() || !self.read_max_timeout.is_zero() {
log::trace!("Bytes read rate {:?}, extend timer", total);
log::trace!(
"{}: Bytes read rate {:?}, extend timer",
self.io.tag(),
total
);
self.io.start_timer_secs(cfg.timeout);
return Ok(());
}
@ -900,7 +926,11 @@ where
// no new data, start keep-alive timer
if self.codec.keepalive() {
if !self.flags.contains(Flags::READ_KA_TIMEOUT) {
log::debug!("Start keep-alive timer {:?}", self.config.keep_alive);
log::debug!(
"{}: Start keep-alive timer {:?}",
self.io.tag(),
self.config.keep_alive
);
self.flags.insert(Flags::READ_KA_TIMEOUT);
if self.config.keep_alive_enabled() {
self.io.start_timer_secs(self.config.keep_alive);
@ -911,7 +941,11 @@ where
return Some(State::Stop);
}
} else if let Some(ref cfg) = self.config.headers_read_rate {
log::debug!("Start headers read timer {:?}", cfg.timeout);
log::debug!(
"{}: Start headers read timer {:?}",
self.io.tag(),
cfg.timeout
);
// we got new data but not enough to parse single frame
// start read timer