Prepare io release

This commit is contained in:
Nikolay Kim 2024-01-09 21:10:26 +06:00
parent c864b10e62
commit 587b248b57
13 changed files with 121 additions and 56 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.0] - 2024-01-09
* Log io tags
## [0.4.0-b.0] - 2024-01-07
* Use "async fn" in trait for Service definition

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-tokio"
version = "0.4.0-b.0"
version = "0.4.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "tokio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -17,8 +17,8 @@ path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.21"
ntex-io = "1.0.0-b.1"
ntex-util = "1.0.0-b.1"
ntex-io = "1.0.0"
ntex-util = "1.0.0"
log = "0.4"
pin-project-lite = "0.2"
tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] }

View file

@ -69,7 +69,10 @@ impl Future for ReadTask {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("tcp stream is disconnected");
log::trace!(
"{}: Tcp stream is disconnected",
this.state.tag()
);
Poll::Ready(Ok(()))
} else if buf.len() < hw {
continue;
@ -78,7 +81,11 @@ impl Future for ReadTask {
}
}
Poll::Ready(Err(err)) => {
log::trace!("read task failed on io {:?}", err);
log::trace!(
"{}: Read task failed on io {:?}",
this.state.tag(),
err
);
Poll::Ready(Err(err))
}
};
@ -86,7 +93,7 @@ impl Future for ReadTask {
})
}
ReadStatus::Terminate => {
log::trace!("read task is instructed to shutdown");
log::trace!("{}: Read task is instructed to shutdown", this.state.tag());
Poll::Ready(())
}
}
@ -148,7 +155,8 @@ impl Future for WriteTask {
match ready!(this.state.with_buf(|buf| flush_io(
&mut *this.io.borrow_mut(),
buf,
cx
cx,
&this.state
))) {
Ok(()) => Poll::Pending,
Err(e) => {
@ -158,14 +166,21 @@ impl Future for WriteTask {
}
}
WriteStatus::Timeout(time) => {
log::trace!("initiate timeout delay for {:?}", time);
log::trace!(
"{}: Initiate timeout delay for {:?}",
this.state.tag(),
time
);
if delay.is_none() {
*delay = Some(sleep(time));
}
self.poll(cx)
}
WriteStatus::Shutdown(time) => {
log::trace!("write task is instructed to shutdown");
log::trace!(
"{}: Write task is instructed to shutdown",
this.state.tag()
);
let timeout = if let Some(delay) = delay.take() {
delay
@ -177,7 +192,10 @@ impl Future for WriteTask {
self.poll(cx)
}
WriteStatus::Terminate => {
log::trace!("write task is instructed to terminate");
log::trace!(
"{}: Write task is instructed to terminate",
this.state.tag()
);
if !matches!(
this.io.borrow().linger(),
@ -201,14 +219,17 @@ impl Future for WriteTask {
Shutdown::None => {
// flush write buffer
let mut io = this.io.borrow_mut();
match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) {
match this
.state
.with_buf(|buf| flush_io(&mut *io, buf, cx, &this.state))
{
Poll::Ready(Ok(())) => {
*st = Shutdown::Flushed;
continue;
}
Poll::Ready(Err(err)) => {
log::trace!(
"write task is closed with err during flush, {:?}",
"{}: Write task is closed with err during flush, {:?}", this.state.tag(),
err
);
this.state.close(Some(err));
@ -226,7 +247,8 @@ impl Future for WriteTask {
}
Poll::Ready(Err(e)) => {
log::trace!(
"write task is closed with err during shutdown"
"{}: Write task is closed with err during shutdown",
this.state.tag()
);
this.state.close(Some(e));
return Poll::Ready(());
@ -246,13 +268,16 @@ impl Future for WriteTask {
if read_buf.filled().is_empty() =>
{
this.state.close(None);
log::trace!("tokio write task is stopped");
log::trace!(
"{}: Tokio write task is stopped",
this.state.tag()
);
return Poll::Ready(());
}
Poll::Pending => {
*count += read_buf.filled().len() as u16;
if *count > 4096 {
log::trace!("tokio write task is stopped, too much input");
log::trace!("{}: Tokio write task is stopped, too much input", this.state.tag());
this.state.close(None);
return Poll::Ready(());
}
@ -268,7 +293,7 @@ impl Future for WriteTask {
if delay.poll_elapsed(cx).is_pending() {
return Poll::Pending;
}
log::trace!("write task is stopped after delay");
log::trace!("{}: Write task is stopped after delay", this.state.tag());
this.state.close(None);
return Poll::Ready(());
}
@ -282,19 +307,24 @@ pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
io: &mut T,
buf: &mut Option<BytesVec>,
cx: &mut Context<'_>,
st: &WriteContext,
) -> Poll<io::Result<()>> {
if let Some(buf) = buf {
let len = buf.len();
if len != 0 {
// log::trace!("flushing framed transport: {:?}", buf.len());
// log::trace!("{}: Flushing framed transport: {:?}", st.tag(), buf.len());
let mut written = 0;
let result = loop {
break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
log::trace!(
"{}: Disconnected during flush, written {}",
st.tag(),
written
);
Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
@ -315,12 +345,12 @@ pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
Poll::Pending
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
log::trace!("{}: Error during flush: {}", st.tag(), e);
Poll::Ready(Err(e))
}
};
};
// log::trace!("flushed {} bytes", written);
// log::trace!("{}: flushed {} bytes", st.tag(), written);
// flush
return if written > 0 {
@ -328,7 +358,7 @@ pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
log::trace!("{}: Error during flush: {}", st.tag(), e);
Poll::Ready(Err(e))
}
}
@ -476,7 +506,10 @@ mod unixstream {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("tokio unix stream is disconnected");
log::trace!(
"{}: Tokio unix stream is disconnected",
this.state.tag()
);
Poll::Ready(Ok(()))
} else if buf.len() < hw {
continue;
@ -485,14 +518,21 @@ mod unixstream {
}
}
Poll::Ready(Err(err)) => {
log::trace!("unix stream read task failed {:?}", err);
log::trace!(
"{}: Unix stream read task failed {:?}",
this.state.tag(),
err
);
Poll::Ready(Err(err))
}
};
}
}
ReadStatus::Terminate => {
log::trace!("read task is instructed to shutdown");
log::trace!(
"{}: Read task is instructed to shutdown",
this.state.tag()
);
Poll::Ready(Ok(()))
}
}
@ -542,7 +582,8 @@ mod unixstream {
match ready!(this.state.with_buf(|buf| flush_io(
&mut *this.io.borrow_mut(),
buf,
cx
cx,
&this.state
))) {
Ok(()) => Poll::Pending,
Err(e) => {
@ -558,7 +599,10 @@ mod unixstream {
self.poll(cx)
}
Poll::Ready(WriteStatus::Shutdown(time)) => {
log::trace!("write task is instructed to shutdown");
log::trace!(
"{}: Write task is instructed to shutdown",
this.state.tag()
);
let timeout = if let Some(delay) = delay.take() {
delay
@ -570,7 +614,10 @@ mod unixstream {
self.poll(cx)
}
Poll::Ready(WriteStatus::Terminate) => {
log::trace!("write task is instructed to terminate");
log::trace!(
"{}: Write task is instructed to terminate",
this.state.tag()
);
let _ = Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx);
this.state.close(None);
@ -587,15 +634,16 @@ mod unixstream {
Shutdown::None => {
// flush write buffer
let mut io = this.io.borrow_mut();
match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx))
{
match this.state.with_buf(|buf| {
flush_io(&mut *io, buf, cx, &this.state)
}) {
Poll::Ready(Ok(())) => {
*st = Shutdown::Flushed;
continue;
}
Poll::Ready(Err(err)) => {
log::trace!(
"write task is closed with err during flush, {:?}",
"{}: Write task is closed with err during flush, {:?}", this.state.tag(),
err
);
this.state.close(Some(err));
@ -614,7 +662,7 @@ mod unixstream {
}
Poll::Ready(Err(e)) => {
log::trace!(
"write task is closed with err during shutdown"
"{}: Write task is closed with err during shutdown", this.state.tag()
);
this.state.close(Some(e));
return Poll::Ready(());
@ -634,14 +682,17 @@ mod unixstream {
if read_buf.filled().is_empty() =>
{
this.state.close(None);
log::trace!("write task is stopped");
log::trace!(
"{}: Write task is stopped",
this.state.tag()
);
return Poll::Ready(());
}
Poll::Pending => {
*count += read_buf.filled().len() as u16;
if *count > 4096 {
log::trace!(
"write task is stopped, too much input"
"{}: Write task is stopped, too much input", this.state.tag()
);
this.state.close(None);
return Poll::Ready(());
@ -658,7 +709,10 @@ mod unixstream {
if delay.poll_elapsed(cx).is_pending() {
return Poll::Pending;
}
log::trace!("write task is stopped after delay");
log::trace!(
"{}: Write task is stopped after delay",
this.state.tag()
);
this.state.close(None);
return Poll::Ready(());
}