mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 21:37:58 +03:00
Tune compio integration (#412)
This commit is contained in:
parent
432791356c
commit
8a3a8f1df8
6 changed files with 38 additions and 25 deletions
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.1.1] - 2024-09-05
|
||||||
|
|
||||||
|
* Tune write task
|
||||||
|
|
||||||
## [0.1.0] - 2024-08-29
|
## [0.1.0] - 2024-08-29
|
||||||
|
|
||||||
* Initial release
|
* Initial release
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-compio"
|
name = "ntex-compio"
|
||||||
version = "0.1.0"
|
version = "0.1.1"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "compio runtime intergration for ntex framework"
|
description = "compio runtime intergration for ntex framework"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
|
|
@ -15,8 +15,10 @@ impl IoStream for crate::TcpStream {
|
||||||
compio::runtime::spawn(async move {
|
compio::runtime::spawn(async move {
|
||||||
run(&mut io, &read, write).await;
|
run(&mut io, &read, write).await;
|
||||||
|
|
||||||
let res = io.close().await;
|
match io.close().await {
|
||||||
log::debug!("{} Stream is closed, {:?}", read.tag(), res);
|
Ok(_) => log::debug!("{} Stream is closed", read.tag()),
|
||||||
|
Err(e) => log::error!("{} Stream is closed, {:?}", read.tag(), e),
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.detach();
|
.detach();
|
||||||
|
|
||||||
|
@ -31,8 +33,10 @@ impl IoStream for crate::UnixStream {
|
||||||
compio::runtime::spawn(async move {
|
compio::runtime::spawn(async move {
|
||||||
run(&mut io, &read, write).await;
|
run(&mut io, &read, write).await;
|
||||||
|
|
||||||
let res = io.close().await;
|
match io.close().await {
|
||||||
log::debug!("{} Stream is closed, {:?}", read.tag(), res);
|
Ok(_) => log::debug!("{} Unix stream is closed", read.tag()),
|
||||||
|
Err(e) => log::error!("{} Unix stream is closed, {:?}", read.tag(), e),
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.detach();
|
.detach();
|
||||||
|
|
||||||
|
@ -221,21 +225,23 @@ async fn write<T: AsyncWrite>(io: &mut T, state: &WriteContext) -> io::Result<()
|
||||||
let BufResult(result, buf1) = io.write(buf).await;
|
let BufResult(result, buf1) = io.write(buf).await;
|
||||||
buf = buf1;
|
buf = buf1;
|
||||||
|
|
||||||
match result {
|
return match result {
|
||||||
|
Ok(0) => Err(io::Error::new(
|
||||||
|
io::ErrorKind::WriteZero,
|
||||||
|
"failed to write frame to transport",
|
||||||
|
)),
|
||||||
Ok(size) => {
|
Ok(size) => {
|
||||||
if buf.0.len() == size {
|
if buf.0.len() == size {
|
||||||
return io.flush().await;
|
// return io.flush().await;
|
||||||
|
state.memory_pool().release_write_buf(buf.0);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
buf.0.advance(size);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
if size == 0 {
|
|
||||||
return Err(io::Error::new(
|
|
||||||
io::ErrorKind::WriteZero,
|
|
||||||
"failed to write frame to transport",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
buf.0.advance(size);
|
|
||||||
}
|
}
|
||||||
Err(e) => return Err(e),
|
Err(e) => Err(e),
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.3.1] - 2024-09-05
|
||||||
|
|
||||||
|
* Tune async io tasks support
|
||||||
|
|
||||||
## [2.3.0] - 2024-08-28
|
## [2.3.0] - 2024-08-28
|
||||||
|
|
||||||
* Extend io task contexts, for "compio" runtime compatibility
|
* Extend io task contexts, for "compio" runtime compatibility
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ntex-io"
|
name = "ntex-io"
|
||||||
version = "2.3.0"
|
version = "2.3.1"
|
||||||
authors = ["ntex contributors <team@ntex.rs>"]
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
description = "Utilities for encoding and decoding frames"
|
description = "Utilities for encoding and decoding frames"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use std::{future::poll_fn, future::Future, io, task::Context, task::Poll};
|
use std::{future::poll_fn, future::Future, io, task::Context, task::Poll};
|
||||||
|
|
||||||
use ntex_bytes::{BufMut, BytesVec, PoolRef};
|
use ntex_bytes::{BufMut, BytesVec, PoolRef};
|
||||||
use ntex_util::task;
|
|
||||||
|
|
||||||
use crate::{Flags, IoRef, ReadStatus, WriteStatus};
|
use crate::{Flags, IoRef, ReadStatus, WriteStatus};
|
||||||
|
|
||||||
|
@ -157,11 +156,11 @@ impl ReadContext {
|
||||||
{
|
{
|
||||||
let inner = &self.0 .0;
|
let inner = &self.0 .0;
|
||||||
|
|
||||||
// we already pushed new data to read buffer,
|
// // we already pushed new data to read buffer,
|
||||||
// we have to wait for dispatcher to read data from buffer
|
// // we have to wait for dispatcher to read data from buffer
|
||||||
if inner.flags.get().is_read_buf_ready() {
|
// if inner.flags.get().is_read_buf_ready() {
|
||||||
task::yield_to().await;
|
// ntex_util::task::yield_to().await;
|
||||||
}
|
// }
|
||||||
|
|
||||||
let mut buf = if inner.flags.get().is_read_buf_ready() {
|
let mut buf = if inner.flags.get().is_read_buf_ready() {
|
||||||
// read buffer is still not read by dispatcher
|
// read buffer is still not read by dispatcher
|
||||||
|
@ -175,9 +174,9 @@ impl ReadContext {
|
||||||
};
|
};
|
||||||
|
|
||||||
// make sure we've got room
|
// make sure we've got room
|
||||||
let remaining = buf.remaining_mut();
|
|
||||||
let (hw, lw) = self.0.memory_pool().read_params().unpack();
|
let (hw, lw) = self.0.memory_pool().read_params().unpack();
|
||||||
if remaining < lw {
|
let remaining = buf.remaining_mut();
|
||||||
|
if remaining <= lw {
|
||||||
buf.reserve(hw - remaining);
|
buf.reserve(hw - remaining);
|
||||||
}
|
}
|
||||||
let total = buf.len();
|
let total = buf.len();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue