Depend on individual compio packages (#479)

This commit is contained in:
Nikolay Kim 2024-12-02 15:00:50 +05:00 committed by GitHub
parent a7666e4881
commit b5a4a3cb5b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 40 additions and 30 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.4] - 2024-12-01
* Depend on individual compio packages
## [0.2.3] - 2024-11-27
* Disable default features

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-compio"
version = "0.2.3"
version = "0.2.4"
authors = ["ntex contributors <team@ntex.rs>"]
description = "compio runtime intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -22,4 +22,9 @@ ntex-io = "2.5"
ntex-util = "2"
ntex-rt = "0.4"
log = "0.4"
compio = { version = "0.13.0", features = ["macros", "runtime", "io", "io-uring", "polling"], default-features = false }
compio-buf = "0.5"
compio-io = "0.5"
compio-net = "0.6"
compio-driver = "0.6"
compio-runtime = { version = "0.6", features = ["io-uring", "polling", "event"] }

View file

@ -1,15 +1,15 @@
use std::{any, io};
use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::TcpStream;
use compio_buf::{BufResult, IoBuf, IoBufMut, SetBufInit};
use compio_io::{AsyncRead, AsyncWrite};
use compio_net::TcpStream;
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf};
impl IoStream for crate::TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0.clone();
compio::runtime::spawn(async move { run(io.clone(), &read, write).await }).detach();
compio_runtime::spawn(async move { run(io.clone(), &read, write).await }).detach();
Some(Box::new(HandleWrapper(self.0)))
}
@ -18,7 +18,7 @@ impl IoStream for crate::TcpStream {
#[cfg(unix)]
impl IoStream for crate::UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
compio::runtime::spawn(async move { run(self.0.clone(), &read, write).await })
compio_runtime::spawn(async move { run(self.0.clone(), &read, write).await })
.detach();
None
@ -75,7 +75,7 @@ async fn run<T: AsyncRead + AsyncWrite + Clone + 'static>(
write: WriteContext,
) {
let mut wr_io = WriteIo(io.clone());
let wr_task = compio::runtime::spawn(async move {
let wr_task = compio_runtime::spawn(async move {
write.handle(&mut wr_io).await;
log::debug!("{} Write task is stopped", write.tag());
});

View file

@ -6,21 +6,21 @@ use ntex_io::Io;
mod io;
/// Tcp stream wrapper for compio TcpStream
struct TcpStream(compio::net::TcpStream);
struct TcpStream(compio_net::TcpStream);
#[cfg(unix)]
/// Tcp stream wrapper for compio UnixStream
struct UnixStream(compio::net::UnixStream);
struct UnixStream(compio_net::UnixStream);
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(addr: SocketAddr) -> Result<Io> {
let sock = compio::net::TcpStream::connect(addr).await?;
let sock = compio_net::TcpStream::connect(addr).await?;
Ok(Io::new(TcpStream(sock)))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result<Io> {
let sock = compio::net::TcpStream::connect(addr).await?;
let sock = compio_net::TcpStream::connect(addr).await?;
Ok(Io::with_memory_pool(TcpStream(sock), pool))
}
@ -30,7 +30,7 @@ pub async fn unix_connect<'a, P>(addr: P) -> Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
let sock = compio::net::UnixStream::connect(addr).await?;
let sock = compio_net::UnixStream::connect(addr).await?;
Ok(Io::new(UnixStream(sock)))
}
@ -40,22 +40,20 @@ pub async fn unix_connect_in<'a, P>(addr: P, pool: PoolRef) -> Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
let sock = compio::net::UnixStream::connect(addr).await?;
let sock = compio_net::UnixStream::connect(addr).await?;
Ok(Io::with_memory_pool(UnixStream(sock), pool))
}
/// Convert std TcpStream to tokio's TcpStream
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io> {
stream.set_nodelay(true)?;
Ok(Io::new(TcpStream(compio::net::TcpStream::from_std(
stream,
)?)))
Ok(Io::new(TcpStream(compio_net::TcpStream::from_std(stream)?)))
}
#[cfg(unix)]
/// Convert std UnixStream to tokio's UnixStream
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
Ok(Io::new(UnixStream(compio::net::UnixStream::from_std(
Ok(Io::new(UnixStream(compio_net::UnixStream::from_std(
stream,
)?)))
}

View file

@ -39,7 +39,7 @@ ntex-rt = "0.4.21"
ntex-util = "2.5"
ntex-tokio = { version = "0.5.3", optional = true }
ntex-compio = { version = "0.2.2", optional = true }
ntex-compio = { version = "0.2.4", optional = true }
ntex-glommio = { version = "0.5.2", optional = true }
ntex-async-std = { version = "0.5.1", optional = true }

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.22] - 2024-12-01
* Depend on individual compio packages
## [0.4.21] - 2024-11-25
* Update to compio 0.13

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.4.21"
version = "0.4.22"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -27,7 +27,7 @@ glommio = ["glomm-io", "futures-channel"]
tokio = ["tok-io"]
# compio support
compio = ["comp-io"]
compio = ["compio-driver", "compio-runtime"]
# async-std support
async-std = ["async_std/unstable"]
@ -39,9 +39,8 @@ log = "0.4"
oneshot = "0.1"
async_std = { version = "1", package = "async-std", optional = true }
comp-io = { version = "0.13", package = "compio", default-features = false, features = [
"runtime"
], optional = true }
compio-driver = { version = "0.6", optional = true }
compio-runtime = { version = "0.6", optional = true }
tok-io = { version = "1", package = "tokio", default-features = false, features = [
"rt",
"net",

View file

@ -127,14 +127,14 @@ mod compio {
use std::task::{ready, Context, Poll};
use std::{fmt, future::poll_fn, future::Future, pin::Pin};
use comp_io::runtime::Runtime;
use compio_runtime::Runtime;
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
log::info!(
"Starting compio runtime, driver {:?}",
comp_io::driver::DriverType::current()
compio_driver::DriverType::current()
);
let rt = Runtime::new().unwrap();
rt.block_on(fut);
@ -151,7 +151,7 @@ mod compio {
T: Send + 'static,
{
JoinHandle {
fut: Some(comp_io::runtime::spawn_blocking(f)),
fut: Some(compio_runtime::spawn_blocking(f)),
}
}
@ -168,7 +168,7 @@ mod compio {
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
let fut = comp_io::runtime::spawn(async move {
let fut = compio_runtime::spawn(async move {
if let Some(ptr) = ptr {
let mut f = std::pin::pin!(f);
let result = poll_fn(|ctx| {
@ -216,7 +216,7 @@ mod compio {
impl std::error::Error for JoinError {}
pub struct JoinHandle<T> {
fut: Option<comp_io::runtime::JoinHandle<T>>,
fut: Option<compio_runtime::JoinHandle<T>>,
}
impl<T> JoinHandle<T> {

View file

@ -70,7 +70,7 @@ ntex-util = "2.5"
ntex-bytes = "0.1.27"
ntex-server = "2.5"
ntex-h2 = "1.4"
ntex-rt = "0.4.21"
ntex-rt = "0.4.22"
ntex-io = "2.8"
ntex-net = "2.4"
ntex-tls = "2.3"