diff --git a/Cargo.toml b/Cargo.toml index 7897b095..fcacfd41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ ntex-tls = { path = "ntex-tls" } ntex-macros = { path = "ntex-macros" } ntex-util = { path = "ntex-util" } +ntex-compio = { path = "ntex-compio" } ntex-glommio = { path = "ntex-glommio" } ntex-tokio = { path = "ntex-tokio" } ntex-async-std = { path = "ntex-async-std" } diff --git a/ntex-compio/CHANGES.md b/ntex-compio/CHANGES.md new file mode 100644 index 00000000..c6a373d4 --- /dev/null +++ b/ntex-compio/CHANGES.md @@ -0,0 +1,5 @@ +# Changes + +## [0.1.0] - 2024-08-29 + +* Initial release diff --git a/ntex-compio/Cargo.toml b/ntex-compio/Cargo.toml new file mode 100644 index 00000000..a6ab12a7 --- /dev/null +++ b/ntex-compio/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "ntex-compio" +version = "0.1.0" +authors = ["ntex contributors "] +description = "compio runtime intergration for ntex framework" +keywords = ["network", "framework", "async", "futures"] +homepage = "https://ntex.rs" +repository = "https://github.com/ntex-rs/ntex.git" +documentation = "https://docs.rs/ntex-compio/" +categories = ["network-programming", "asynchronous"] +license = "MIT OR Apache-2.0" +edition = "2021" +rust-version = "1.75" + +[lib] +name = "ntex_compio" +path = "src/lib.rs" + +[dependencies] +ntex-bytes = "0.1" +ntex-io = "2.3" +ntex-util = "2" +log = "0.4" +compio-net = "0.4.1" +compio = { version = "0.11.0", features = ["macros", "io", "runtime"] } diff --git a/ntex-compio/LICENSE-APACHE b/ntex-compio/LICENSE-APACHE new file mode 120000 index 00000000..965b606f --- /dev/null +++ b/ntex-compio/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/ntex-compio/LICENSE-MIT b/ntex-compio/LICENSE-MIT new file mode 120000 index 00000000..76219eb7 --- /dev/null +++ b/ntex-compio/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/ntex-compio/src/io.rs b/ntex-compio/src/io.rs new file mode 100644 index 00000000..8a940475 --- /dev/null +++ b/ntex-compio/src/io.rs @@ -0,0 +1,242 @@ +use std::{any, io}; + +use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit}; +use compio::io::{AsyncRead, AsyncWrite}; +use compio::net::TcpStream; +use ntex_bytes::{Buf, BufMut, BytesVec}; +use ntex_io::{ + types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, +}; +use ntex_util::{future::select, future::Either, time::sleep}; + +impl IoStream for crate::TcpStream { + fn start(self, read: ReadContext, write: WriteContext) -> Option> { + let mut io = self.0.clone(); + compio::runtime::spawn(async move { + run(&mut io, &read, write).await; + + let res = io.close().await; + log::debug!("{} Stream is closed, {:?}", read.tag(), res); + }) + .detach(); + + Some(Box::new(HandleWrapper(self.0))) + } +} + +#[cfg(unix)] +impl IoStream for crate::UnixStream { + fn start(self, read: ReadContext, write: WriteContext) -> Option> { + let mut io = self.0; + compio::runtime::spawn(async move { + run(&mut io, &read, write).await; + + let res = io.close().await; + log::debug!("{} Stream is closed, {:?}", read.tag(), res); + }) + .detach(); + + None + } +} + +struct HandleWrapper(TcpStream); + +impl Handle for HandleWrapper { + fn query(&self, id: any::TypeId) -> Option> { + if id == any::TypeId::of::() { + if let Ok(addr) = self.0.peer_addr() { + return Some(Box::new(types::PeerAddr(addr))); + } + } + None + } +} + +struct CompioBuf(BytesVec); + +unsafe impl IoBuf for CompioBuf { + #[inline] + fn as_buf_ptr(&self) -> *const u8 { + self.0.chunk().as_ptr() + } + + #[inline] + fn buf_len(&self) -> usize { + self.0.len() + } + + #[inline] + fn buf_capacity(&self) -> usize { + self.0.remaining_mut() + } +} + +unsafe impl IoBufMut for CompioBuf { + fn as_buf_mut_ptr(&mut self) -> *mut u8 { + self.0.chunk_mut().as_mut_ptr() + } +} + +impl SetBufInit for CompioBuf { + unsafe fn set_buf_init(&mut self, len: usize) { + self.0.set_len(len + self.0.len()); + } +} + +async fn run( + io: &mut T, + read: &ReadContext, + write: WriteContext, +) { + let mut wr_io = io.clone(); + let wr_task = compio::runtime::spawn(async move { + write_task(&mut wr_io, &write).await; + log::debug!("{} Write task is stopped", write.tag()); + }); + + read_task(io, read).await; + log::debug!("{} Read task is stopped", read.tag()); + + if !wr_task.is_finished() { + let _ = wr_task.await; + } +} + +/// Read io task +async fn read_task(io: &mut T, state: &ReadContext) { + loop { + match state.ready().await { + ReadStatus::Ready => { + let result = state + .with_buf_async(|buf| async { + let BufResult(result, buf) = + match select(io.read(CompioBuf(buf)), state.wait_for_close()) + .await + { + Either::Left(res) => res, + Either::Right(_) => return (Default::default(), Ok(1)), + }; + + match result { + Ok(n) => { + if n == 0 { + log::trace!( + "{}: Tcp stream is disconnected", + state.tag() + ); + } + (buf.0, Ok(n)) + } + Err(err) => { + log::trace!( + "{}: Read task failed on io {:?}", + state.tag(), + err + ); + (buf.0, Err(err)) + } + } + }) + .await; + + if result.is_ready() { + break; + } + } + ReadStatus::Terminate => { + log::trace!("{}: Read task is instructed to shutdown", state.tag()); + break; + } + } + } +} + +/// Write io task +async fn write_task(mut io: T, state: &WriteContext) { + let mut delay = None; + + loop { + let result = if let Some(ref mut sleep) = delay { + let result = match select(sleep, state.ready()).await { + Either::Left(_) => { + state.close(Some(io::Error::new( + io::ErrorKind::TimedOut, + "Operation timedout", + ))); + return; + } + Either::Right(res) => res, + }; + delay = None; + result + } else { + state.ready().await + }; + + match result { + WriteStatus::Ready => { + // write io stream + match write(&mut io, state).await { + Ok(()) => continue, + Err(e) => { + state.close(Some(e)); + } + } + } + WriteStatus::Timeout(time) => { + log::trace!("{}: Initiate timeout delay for {:?}", state.tag(), time); + delay = Some(sleep(time)); + continue; + } + WriteStatus::Shutdown(time) => { + log::trace!("{}: Write task is instructed to shutdown", state.tag()); + + let fut = async { + write(&mut io, state).await?; + io.flush().await?; + io.shutdown().await?; + Ok(()) + }; + match select(sleep(time), fut).await { + Either::Left(_) => state.close(None), + Either::Right(res) => state.close(res.err()), + } + } + WriteStatus::Terminate => { + log::trace!("{}: Write task is instructed to terminate", state.tag()); + state.close(io.shutdown().await.err()); + } + } + break; + } +} + +// write to io stream +async fn write(io: &mut T, state: &WriteContext) -> io::Result<()> { + state + .with_buf_async(|buf| async { + let mut buf = CompioBuf(buf); + loop { + let BufResult(result, buf1) = io.write(buf).await; + buf = buf1; + + match result { + Ok(size) => { + if buf.0.len() == size { + return io.flush().await; + } + if size == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + )); + } + buf.0.advance(size); + } + Err(e) => return Err(e), + } + } + }) + .await +} diff --git a/ntex-compio/src/lib.rs b/ntex-compio/src/lib.rs new file mode 100644 index 00000000..4d034824 --- /dev/null +++ b/ntex-compio/src/lib.rs @@ -0,0 +1,61 @@ +use std::{io::Result, net, net::SocketAddr}; + +use ntex_bytes::PoolRef; +use ntex_io::Io; + +mod io; + +/// Tcp stream wrapper for compio TcpStream +struct TcpStream(compio::net::TcpStream); + +#[cfg(unix)] +/// Tcp stream wrapper for compio UnixStream +struct UnixStream(compio::net::UnixStream); + +/// Opens a TCP connection to a remote host. +pub async fn tcp_connect(addr: SocketAddr) -> Result { + let sock = compio::net::TcpStream::connect(addr).await?; + Ok(Io::new(TcpStream(sock))) +} + +/// Opens a TCP connection to a remote host and use specified memory pool. +pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result { + let sock = compio::net::TcpStream::connect(addr).await?; + Ok(Io::with_memory_pool(TcpStream(sock), pool)) +} + +#[cfg(unix)] +/// Opens a unix stream connection. +pub async fn unix_connect<'a, P>(addr: P) -> Result +where + P: AsRef + 'a, +{ + let sock = compio::net::UnixStream::connect(addr).await?; + Ok(Io::new(UnixStream(sock))) +} + +#[cfg(unix)] +/// Opens a unix stream connection and specified memory pool. +pub async fn unix_connect_in<'a, P>(addr: P, pool: PoolRef) -> Result +where + P: AsRef + 'a, +{ + let sock = compio::net::UnixStream::connect(addr).await?; + Ok(Io::with_memory_pool(UnixStream(sock), pool)) +} + +/// Convert std TcpStream to tokio's TcpStream +pub fn from_tcp_stream(stream: net::TcpStream) -> Result { + stream.set_nodelay(true)?; + Ok(Io::new(TcpStream(compio::net::TcpStream::from_std( + stream, + )?))) +} + +#[cfg(unix)] +/// Convert std UnixStream to tokio's UnixStream +pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { + Ok(Io::new(UnixStream(compio::net::UnixStream::from_std( + stream, + )?))) +}