diff --git a/ntex-compio/CHANGES.md b/ntex-compio/CHANGES.md index c6a373d4..46374448 100644 --- a/ntex-compio/CHANGES.md +++ b/ntex-compio/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.1] - 2024-09-05 + +* Tune write task + ## [0.1.0] - 2024-08-29 * Initial release diff --git a/ntex-compio/Cargo.toml b/ntex-compio/Cargo.toml index a6ab12a7..533311a7 100644 --- a/ntex-compio/Cargo.toml +++ b/ntex-compio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-compio" -version = "0.1.0" +version = "0.1.1" authors = ["ntex contributors "] description = "compio runtime intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-compio/src/io.rs b/ntex-compio/src/io.rs index 8a940475..aaabbe7d 100644 --- a/ntex-compio/src/io.rs +++ b/ntex-compio/src/io.rs @@ -15,8 +15,10 @@ impl IoStream for crate::TcpStream { compio::runtime::spawn(async move { run(&mut io, &read, write).await; - let res = io.close().await; - log::debug!("{} Stream is closed, {:?}", read.tag(), res); + match io.close().await { + Ok(_) => log::debug!("{} Stream is closed", read.tag()), + Err(e) => log::error!("{} Stream is closed, {:?}", read.tag(), e), + } }) .detach(); @@ -31,8 +33,10 @@ impl IoStream for crate::UnixStream { compio::runtime::spawn(async move { run(&mut io, &read, write).await; - let res = io.close().await; - log::debug!("{} Stream is closed, {:?}", read.tag(), res); + match io.close().await { + Ok(_) => log::debug!("{} Unix stream is closed", read.tag()), + Err(e) => log::error!("{} Unix stream is closed, {:?}", read.tag(), e), + } }) .detach(); @@ -221,21 +225,23 @@ async fn write(io: &mut T, state: &WriteContext) -> io::Result<() let BufResult(result, buf1) = io.write(buf).await; buf = buf1; - match result { + return match result { + Ok(0) => Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + )), Ok(size) => { if buf.0.len() == size { - return io.flush().await; + // return io.flush().await; + state.memory_pool().release_write_buf(buf.0); + Ok(()) + } else { + buf.0.advance(size); + continue; } - if size == 0 { - return Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - )); - } - buf.0.advance(size); } - Err(e) => return Err(e), - } + Err(e) => Err(e), + }; } }) .await diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index b8490723..4f296504 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.3.1] - 2024-09-05 + +* Tune async io tasks support + ## [2.3.0] - 2024-08-28 * Extend io task contexts, for "compio" runtime compatibility diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index c66aca7f..be05d6b4 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.3.0" +version = "2.3.1" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 11c2df78..e8e25332 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -1,7 +1,6 @@ use std::{future::poll_fn, future::Future, io, task::Context, task::Poll}; use ntex_bytes::{BufMut, BytesVec, PoolRef}; -use ntex_util::task; use crate::{Flags, IoRef, ReadStatus, WriteStatus}; @@ -157,11 +156,11 @@ impl ReadContext { { let inner = &self.0 .0; - // we already pushed new data to read buffer, - // we have to wait for dispatcher to read data from buffer - if inner.flags.get().is_read_buf_ready() { - task::yield_to().await; - } + // // we already pushed new data to read buffer, + // // we have to wait for dispatcher to read data from buffer + // if inner.flags.get().is_read_buf_ready() { + // ntex_util::task::yield_to().await; + // } let mut buf = if inner.flags.get().is_read_buf_ready() { // read buffer is still not read by dispatcher @@ -175,9 +174,9 @@ impl ReadContext { }; // make sure we've got room - let remaining = buf.remaining_mut(); let (hw, lw) = self.0.memory_pool().read_params().unpack(); - if remaining < lw { + let remaining = buf.remaining_mut(); + if remaining <= lw { buf.reserve(hw - remaining); } let total = buf.len();