mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 04:47:39 +03:00
Compio net subsystem integration (#406)
This commit is contained in:
parent
75c892742c
commit
20011a9120
7 changed files with 336 additions and 0 deletions
|
@ -34,6 +34,7 @@ ntex-tls = { path = "ntex-tls" }
|
||||||
ntex-macros = { path = "ntex-macros" }
|
ntex-macros = { path = "ntex-macros" }
|
||||||
ntex-util = { path = "ntex-util" }
|
ntex-util = { path = "ntex-util" }
|
||||||
|
|
||||||
|
ntex-compio = { path = "ntex-compio" }
|
||||||
ntex-glommio = { path = "ntex-glommio" }
|
ntex-glommio = { path = "ntex-glommio" }
|
||||||
ntex-tokio = { path = "ntex-tokio" }
|
ntex-tokio = { path = "ntex-tokio" }
|
||||||
ntex-async-std = { path = "ntex-async-std" }
|
ntex-async-std = { path = "ntex-async-std" }
|
||||||
|
|
5
ntex-compio/CHANGES.md
Normal file
5
ntex-compio/CHANGES.md
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
# Changes
|
||||||
|
|
||||||
|
## [0.1.0] - 2024-08-29
|
||||||
|
|
||||||
|
* Initial release
|
25
ntex-compio/Cargo.toml
Normal file
25
ntex-compio/Cargo.toml
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
[package]
|
||||||
|
name = "ntex-compio"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["ntex contributors <team@ntex.rs>"]
|
||||||
|
description = "compio runtime intergration for ntex framework"
|
||||||
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
homepage = "https://ntex.rs"
|
||||||
|
repository = "https://github.com/ntex-rs/ntex.git"
|
||||||
|
documentation = "https://docs.rs/ntex-compio/"
|
||||||
|
categories = ["network-programming", "asynchronous"]
|
||||||
|
license = "MIT OR Apache-2.0"
|
||||||
|
edition = "2021"
|
||||||
|
rust-version = "1.75"
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
name = "ntex_compio"
|
||||||
|
path = "src/lib.rs"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
ntex-bytes = "0.1"
|
||||||
|
ntex-io = "2.3"
|
||||||
|
ntex-util = "2"
|
||||||
|
log = "0.4"
|
||||||
|
compio-net = "0.4.1"
|
||||||
|
compio = { version = "0.11.0", features = ["macros", "io", "runtime"] }
|
1
ntex-compio/LICENSE-APACHE
Symbolic link
1
ntex-compio/LICENSE-APACHE
Symbolic link
|
@ -0,0 +1 @@
|
||||||
|
../LICENSE-APACHE
|
1
ntex-compio/LICENSE-MIT
Symbolic link
1
ntex-compio/LICENSE-MIT
Symbolic link
|
@ -0,0 +1 @@
|
||||||
|
../LICENSE-MIT
|
242
ntex-compio/src/io.rs
Normal file
242
ntex-compio/src/io.rs
Normal file
|
@ -0,0 +1,242 @@
|
||||||
|
use std::{any, io};
|
||||||
|
|
||||||
|
use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit};
|
||||||
|
use compio::io::{AsyncRead, AsyncWrite};
|
||||||
|
use compio::net::TcpStream;
|
||||||
|
use ntex_bytes::{Buf, BufMut, BytesVec};
|
||||||
|
use ntex_io::{
|
||||||
|
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
|
||||||
|
};
|
||||||
|
use ntex_util::{future::select, future::Either, time::sleep};
|
||||||
|
|
||||||
|
impl IoStream for crate::TcpStream {
|
||||||
|
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
|
||||||
|
let mut io = self.0.clone();
|
||||||
|
compio::runtime::spawn(async move {
|
||||||
|
run(&mut io, &read, write).await;
|
||||||
|
|
||||||
|
let res = io.close().await;
|
||||||
|
log::debug!("{} Stream is closed, {:?}", read.tag(), res);
|
||||||
|
})
|
||||||
|
.detach();
|
||||||
|
|
||||||
|
Some(Box::new(HandleWrapper(self.0)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
impl IoStream for crate::UnixStream {
|
||||||
|
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
|
||||||
|
let mut io = self.0;
|
||||||
|
compio::runtime::spawn(async move {
|
||||||
|
run(&mut io, &read, write).await;
|
||||||
|
|
||||||
|
let res = io.close().await;
|
||||||
|
log::debug!("{} Stream is closed, {:?}", read.tag(), res);
|
||||||
|
})
|
||||||
|
.detach();
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct HandleWrapper(TcpStream);
|
||||||
|
|
||||||
|
impl Handle for HandleWrapper {
|
||||||
|
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
|
||||||
|
if id == any::TypeId::of::<types::PeerAddr>() {
|
||||||
|
if let Ok(addr) = self.0.peer_addr() {
|
||||||
|
return Some(Box::new(types::PeerAddr(addr)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct CompioBuf(BytesVec);
|
||||||
|
|
||||||
|
unsafe impl IoBuf for CompioBuf {
|
||||||
|
#[inline]
|
||||||
|
fn as_buf_ptr(&self) -> *const u8 {
|
||||||
|
self.0.chunk().as_ptr()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn buf_len(&self) -> usize {
|
||||||
|
self.0.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn buf_capacity(&self) -> usize {
|
||||||
|
self.0.remaining_mut()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl IoBufMut for CompioBuf {
|
||||||
|
fn as_buf_mut_ptr(&mut self) -> *mut u8 {
|
||||||
|
self.0.chunk_mut().as_mut_ptr()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SetBufInit for CompioBuf {
|
||||||
|
unsafe fn set_buf_init(&mut self, len: usize) {
|
||||||
|
self.0.set_len(len + self.0.len());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run<T: AsyncRead + AsyncWrite + Clone + 'static>(
|
||||||
|
io: &mut T,
|
||||||
|
read: &ReadContext,
|
||||||
|
write: WriteContext,
|
||||||
|
) {
|
||||||
|
let mut wr_io = io.clone();
|
||||||
|
let wr_task = compio::runtime::spawn(async move {
|
||||||
|
write_task(&mut wr_io, &write).await;
|
||||||
|
log::debug!("{} Write task is stopped", write.tag());
|
||||||
|
});
|
||||||
|
|
||||||
|
read_task(io, read).await;
|
||||||
|
log::debug!("{} Read task is stopped", read.tag());
|
||||||
|
|
||||||
|
if !wr_task.is_finished() {
|
||||||
|
let _ = wr_task.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read io task
|
||||||
|
async fn read_task<T: AsyncRead>(io: &mut T, state: &ReadContext) {
|
||||||
|
loop {
|
||||||
|
match state.ready().await {
|
||||||
|
ReadStatus::Ready => {
|
||||||
|
let result = state
|
||||||
|
.with_buf_async(|buf| async {
|
||||||
|
let BufResult(result, buf) =
|
||||||
|
match select(io.read(CompioBuf(buf)), state.wait_for_close())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Either::Left(res) => res,
|
||||||
|
Either::Right(_) => return (Default::default(), Ok(1)),
|
||||||
|
};
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(n) => {
|
||||||
|
if n == 0 {
|
||||||
|
log::trace!(
|
||||||
|
"{}: Tcp stream is disconnected",
|
||||||
|
state.tag()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
(buf.0, Ok(n))
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::trace!(
|
||||||
|
"{}: Read task failed on io {:?}",
|
||||||
|
state.tag(),
|
||||||
|
err
|
||||||
|
);
|
||||||
|
(buf.0, Err(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if result.is_ready() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReadStatus::Terminate => {
|
||||||
|
log::trace!("{}: Read task is instructed to shutdown", state.tag());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write io task
|
||||||
|
async fn write_task<T: AsyncWrite>(mut io: T, state: &WriteContext) {
|
||||||
|
let mut delay = None;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let result = if let Some(ref mut sleep) = delay {
|
||||||
|
let result = match select(sleep, state.ready()).await {
|
||||||
|
Either::Left(_) => {
|
||||||
|
state.close(Some(io::Error::new(
|
||||||
|
io::ErrorKind::TimedOut,
|
||||||
|
"Operation timedout",
|
||||||
|
)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Either::Right(res) => res,
|
||||||
|
};
|
||||||
|
delay = None;
|
||||||
|
result
|
||||||
|
} else {
|
||||||
|
state.ready().await
|
||||||
|
};
|
||||||
|
|
||||||
|
match result {
|
||||||
|
WriteStatus::Ready => {
|
||||||
|
// write io stream
|
||||||
|
match write(&mut io, state).await {
|
||||||
|
Ok(()) => continue,
|
||||||
|
Err(e) => {
|
||||||
|
state.close(Some(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WriteStatus::Timeout(time) => {
|
||||||
|
log::trace!("{}: Initiate timeout delay for {:?}", state.tag(), time);
|
||||||
|
delay = Some(sleep(time));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
WriteStatus::Shutdown(time) => {
|
||||||
|
log::trace!("{}: Write task is instructed to shutdown", state.tag());
|
||||||
|
|
||||||
|
let fut = async {
|
||||||
|
write(&mut io, state).await?;
|
||||||
|
io.flush().await?;
|
||||||
|
io.shutdown().await?;
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
match select(sleep(time), fut).await {
|
||||||
|
Either::Left(_) => state.close(None),
|
||||||
|
Either::Right(res) => state.close(res.err()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WriteStatus::Terminate => {
|
||||||
|
log::trace!("{}: Write task is instructed to terminate", state.tag());
|
||||||
|
state.close(io.shutdown().await.err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// write to io stream
|
||||||
|
async fn write<T: AsyncWrite>(io: &mut T, state: &WriteContext) -> io::Result<()> {
|
||||||
|
state
|
||||||
|
.with_buf_async(|buf| async {
|
||||||
|
let mut buf = CompioBuf(buf);
|
||||||
|
loop {
|
||||||
|
let BufResult(result, buf1) = io.write(buf).await;
|
||||||
|
buf = buf1;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(size) => {
|
||||||
|
if buf.0.len() == size {
|
||||||
|
return io.flush().await;
|
||||||
|
}
|
||||||
|
if size == 0 {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::WriteZero,
|
||||||
|
"failed to write frame to transport",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
buf.0.advance(size);
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
61
ntex-compio/src/lib.rs
Normal file
61
ntex-compio/src/lib.rs
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
use std::{io::Result, net, net::SocketAddr};
|
||||||
|
|
||||||
|
use ntex_bytes::PoolRef;
|
||||||
|
use ntex_io::Io;
|
||||||
|
|
||||||
|
mod io;
|
||||||
|
|
||||||
|
/// Tcp stream wrapper for compio TcpStream
|
||||||
|
struct TcpStream(compio::net::TcpStream);
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
/// Tcp stream wrapper for compio UnixStream
|
||||||
|
struct UnixStream(compio::net::UnixStream);
|
||||||
|
|
||||||
|
/// Opens a TCP connection to a remote host.
|
||||||
|
pub async fn tcp_connect(addr: SocketAddr) -> Result<Io> {
|
||||||
|
let sock = compio::net::TcpStream::connect(addr).await?;
|
||||||
|
Ok(Io::new(TcpStream(sock)))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Opens a TCP connection to a remote host and use specified memory pool.
|
||||||
|
pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result<Io> {
|
||||||
|
let sock = compio::net::TcpStream::connect(addr).await?;
|
||||||
|
Ok(Io::with_memory_pool(TcpStream(sock), pool))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
/// Opens a unix stream connection.
|
||||||
|
pub async fn unix_connect<'a, P>(addr: P) -> Result<Io>
|
||||||
|
where
|
||||||
|
P: AsRef<std::path::Path> + 'a,
|
||||||
|
{
|
||||||
|
let sock = compio::net::UnixStream::connect(addr).await?;
|
||||||
|
Ok(Io::new(UnixStream(sock)))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
/// Opens a unix stream connection and specified memory pool.
|
||||||
|
pub async fn unix_connect_in<'a, P>(addr: P, pool: PoolRef) -> Result<Io>
|
||||||
|
where
|
||||||
|
P: AsRef<std::path::Path> + 'a,
|
||||||
|
{
|
||||||
|
let sock = compio::net::UnixStream::connect(addr).await?;
|
||||||
|
Ok(Io::with_memory_pool(UnixStream(sock), pool))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert std TcpStream to tokio's TcpStream
|
||||||
|
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io> {
|
||||||
|
stream.set_nodelay(true)?;
|
||||||
|
Ok(Io::new(TcpStream(compio::net::TcpStream::from_std(
|
||||||
|
stream,
|
||||||
|
)?)))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
/// Convert std UnixStream to tokio's UnixStream
|
||||||
|
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
|
||||||
|
Ok(Io::new(UnixStream(compio::net::UnixStream::from_std(
|
||||||
|
stream,
|
||||||
|
)?)))
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue