Remove neon::net dep (#523)

This commit is contained in:
Nikolay Kim 2025-03-14 15:13:31 +05:00 committed by GitHub
parent 9a8a2b3216
commit 81eaf88752
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 179 additions and 183 deletions

View file

@ -40,7 +40,7 @@ ntex-util = "2.5"
ntex-tokio = { version = "0.5.3", optional = true }
ntex-compio = { version = "0.2.4", optional = true }
ntex-neon = { version = "0.1.1", optional = true }
ntex-neon = { version = "0.1.3", optional = true }
bitflags = { workspace = true }
cfg-if = { workspace = true }

86
ntex-net/src/helpers.rs Normal file
View file

@ -0,0 +1,86 @@
use std::{io, net::SocketAddr, os::fd::FromRawFd, path::Path};
use ntex_neon::syscall;
use ntex_util::channel::oneshot::channel;
use socket2::{Protocol, SockAddr, Socket, Type};
pub(crate) fn pool_io_err<T, E>(result: std::result::Result<T, E>) -> io::Result<T> {
result.map_err(|_| io::Error::new(io::ErrorKind::Other, "Thread pool panic"))
}
pub(crate) async fn connect(addr: SocketAddr) -> io::Result<Socket> {
let addr = SockAddr::from(addr);
let domain = addr.domain().into();
connect_inner(addr, domain, Type::STREAM.into(), Protocol::TCP.into()).await
}
pub(crate) async fn connect_unix(path: impl AsRef<Path>) -> io::Result<Socket> {
let addr = SockAddr::unix(path)?;
connect_inner(addr, socket2::Domain::UNIX.into(), Type::STREAM.into(), 0).await
}
async fn connect_inner(
addr: SockAddr,
domain: i32,
socket_type: i32,
protocol: i32,
) -> io::Result<Socket> {
#[allow(unused_mut)]
let mut ty = socket_type;
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
))]
{
ty |= libc::SOCK_CLOEXEC;
}
let fd = ntex_rt::spawn_blocking(move || syscall!(libc::socket(domain, ty, protocol)))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.and_then(pool_io_err)?;
let (sender, rx) = channel();
crate::rt_impl::connect::ConnectOps::current().connect(fd, addr, sender)?;
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
.and_then(|item| item)?;
Ok(unsafe { Socket::from_raw_fd(fd) })
}
pub(crate) fn prep_socket(sock: Socket) -> io::Result<Socket> {
#[cfg(not(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "espidf",
target_os = "vita",
)))]
sock.set_cloexec(true)?;
#[cfg(any(
target_os = "ios",
target_os = "macos",
target_os = "tvos",
target_os = "watchos",
))]
sock.set_nosigpipe(true)?;
sock.set_nonblocking(true)?;
Ok(sock)
}

View file

@ -27,3 +27,6 @@ cfg_if::cfg_if! {
pub use self::compat::*;
}
}
#[cfg(all(unix, feature = "neon"))]
mod helpers;

View file

@ -1,67 +1,11 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::{cell::RefCell, collections::VecDeque, io, path::Path, rc::Rc, task::Poll};
use std::os::fd::{AsRawFd, RawFd};
use std::{cell::RefCell, collections::VecDeque, io, rc::Rc, task::Poll};
use ntex_neon::driver::op::{Handler, Interest};
use ntex_neon::driver::{AsRawFd, DriverApi, RawFd};
use ntex_neon::net::{Socket, TcpStream, UnixStream};
use ntex_neon::driver::{DriverApi, Handler, Interest};
use ntex_neon::{syscall, Runtime};
use ntex_util::channel::oneshot::{channel, Sender};
use ntex_util::channel::oneshot::Sender;
use slab::Slab;
use socket2::{Protocol, SockAddr, Type};
pub(crate) async fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
let addr = SockAddr::from(addr);
let socket = if cfg!(windows) {
let bind_addr = if addr.is_ipv4() {
SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
} else if addr.is_ipv6() {
SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))
} else {
return Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"Unsupported address domain.",
));
};
Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?
} else {
Socket::new(addr.domain(), Type::STREAM, Some(Protocol::TCP)).await?
};
let (sender, rx) = channel();
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender)?;
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
.and_then(|item| item)?;
Ok(TcpStream::from_socket(socket))
}
pub(crate) async fn connect_unix(path: impl AsRef<Path>) -> io::Result<UnixStream> {
let addr = SockAddr::unix(path)?;
#[cfg(windows)]
let socket = {
let new_addr = empty_unix_socket();
Socket::bind(&new_addr, Type::STREAM, None).await?
};
#[cfg(unix)]
let socket = {
use socket2::Domain;
Socket::new(Domain::UNIX, Type::STREAM, None).await?
};
let (sender, rx) = channel();
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender)?;
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
.and_then(|item| item)?;
Ok(UnixStream::from_socket(socket))
}
use socket2::SockAddr;
#[derive(Clone)]
pub(crate) struct ConnectOps(Rc<ConnectOpsInner>);

View file

@ -1,7 +1,7 @@
use std::os::fd::{AsRawFd, RawFd};
use std::{cell::Cell, collections::VecDeque, io, rc::Rc, task, task::Poll};
use ntex_neon::driver::op::{close_socket, Handler, Interest};
use ntex_neon::driver::{AsRawFd, DriverApi, RawFd};
use ntex_neon::driver::{DriverApi, Handler, Interest};
use ntex_neon::{syscall, Runtime};
use slab::Slab;
@ -211,7 +211,11 @@ impl<T> StreamCtl<T> {
self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd));
if let Some(io) = io {
std::mem::forget(io);
close_socket(fd).await?;
ntex_rt::spawn_blocking(move || syscall!(libc::close(fd)))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.and_then(crate::helpers::pool_io_err)?;
}
Ok(())
}

View file

@ -3,7 +3,8 @@ use std::{any, future::poll_fn, task::Poll};
use ntex_io::{
types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_neon::{net::TcpStream, spawn};
use ntex_rt::spawn;
use socket2::Socket;
use super::driver::{StreamCtl, StreamOps};
@ -13,7 +14,7 @@ impl IoStream for super::TcpStream {
let context = read.context();
let ctl = StreamOps::current().register(io, context.clone());
let ctl2 = ctl.clone();
spawn(async move { run(ctl, context).await }).detach();
spawn(async move { run(ctl, context).await });
Some(Box::new(HandleWrapper(ctl2)))
}
@ -24,19 +25,19 @@ impl IoStream for super::UnixStream {
let io = self.0;
let context = read.context();
let ctl = StreamOps::current().register(io, context.clone());
spawn(async move { run(ctl, context).await }).detach();
spawn(async move { run(ctl, context).await });
None
}
}
struct HandleWrapper(StreamCtl<TcpStream>);
struct HandleWrapper(StreamCtl<Socket>);
impl Handle for HandleWrapper {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() {
let addr = self.0.with_io(|io| io.and_then(|io| io.peer_addr().ok()));
if let Some(addr) = addr {
if let Some(addr) = addr.and_then(|addr| addr.as_socket()) {
return Some(Box::new(types::PeerAddr(addr)));
}
}

View file

@ -2,27 +2,31 @@ use std::{io::Result, net, net::SocketAddr};
use ntex_bytes::PoolRef;
use ntex_io::Io;
use socket2::Socket;
mod connect;
pub(crate) mod connect;
mod driver;
mod io;
/// Tcp stream wrapper for neon TcpStream
struct TcpStream(ntex_neon::net::TcpStream);
struct TcpStream(socket2::Socket);
/// Tcp stream wrapper for neon UnixStream
struct UnixStream(ntex_neon::net::UnixStream);
struct UnixStream(socket2::Socket);
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(addr: SocketAddr) -> Result<Io> {
let sock = connect::connect(addr).await?;
Ok(Io::new(TcpStream(sock)))
let sock = crate::helpers::connect(addr).await?;
Ok(Io::new(TcpStream(crate::helpers::prep_socket(sock)?)))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result<Io> {
let sock = connect::connect(addr).await?;
Ok(Io::with_memory_pool(TcpStream(sock), pool))
let sock = crate::helpers::connect(addr).await?;
Ok(Io::with_memory_pool(
TcpStream(crate::helpers::prep_socket(sock)?),
pool,
))
}
/// Opens a unix stream connection.
@ -30,8 +34,8 @@ pub async fn unix_connect<'a, P>(addr: P) -> Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
let sock = connect::connect_unix(addr).await?;
Ok(Io::new(UnixStream(sock)))
let sock = crate::helpers::connect_unix(addr).await?;
Ok(Io::new(UnixStream(crate::helpers::prep_socket(sock)?)))
}
/// Opens a unix stream connection and specified memory pool.
@ -39,21 +43,24 @@ pub async fn unix_connect_in<'a, P>(addr: P, pool: PoolRef) -> Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
let sock = connect::connect_unix(addr).await?;
Ok(Io::with_memory_pool(UnixStream(sock), pool))
let sock = crate::helpers::connect_unix(addr).await?;
Ok(Io::with_memory_pool(
UnixStream(crate::helpers::prep_socket(sock)?),
pool,
))
}
/// Convert std TcpStream to tokio's TcpStream
/// Convert std TcpStream to TcpStream
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io> {
stream.set_nodelay(true)?;
Ok(Io::new(TcpStream(ntex_neon::net::TcpStream::from_std(
stream,
Ok(Io::new(TcpStream(crate::helpers::prep_socket(
Socket::from(stream),
)?)))
}
/// Convert std UnixStream to tokio's UnixStream
/// Convert std UnixStream to UnixStream
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
Ok(Io::new(UnixStream(ntex_neon::net::UnixStream::from_std(
stream,
Ok(Io::new(UnixStream(crate::helpers::prep_socket(
Socket::from(stream),
)?)))
}

View file

@ -1,68 +1,10 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::{cell::RefCell, io, path::Path, rc::Rc};
use std::{cell::RefCell, io, os::fd::RawFd, rc::Rc};
use io_uring::{opcode, types::Fd};
use ntex_neon::driver::op::Handler;
use ntex_neon::driver::{AsRawFd, DriverApi, RawFd};
use ntex_neon::net::{Socket, TcpStream, UnixStream};
use ntex_neon::Runtime;
use ntex_util::channel::oneshot::{channel, Sender};
use ntex_neon::{driver::DriverApi, driver::Handler, Runtime};
use ntex_util::channel::oneshot::Sender;
use slab::Slab;
use socket2::{Protocol, SockAddr, Type};
pub(crate) async fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
let addr = SockAddr::from(addr);
let socket = if cfg!(windows) {
let bind_addr = if addr.is_ipv4() {
SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
} else if addr.is_ipv6() {
SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))
} else {
return Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"Unsupported address domain.",
));
};
Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?
} else {
Socket::new(addr.domain(), Type::STREAM, Some(Protocol::TCP)).await?
};
let (sender, rx) = channel();
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender);
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
.and_then(|item| item)?;
Ok(TcpStream::from_socket(socket))
}
pub(crate) async fn connect_unix(path: impl AsRef<Path>) -> io::Result<UnixStream> {
let addr = SockAddr::unix(path)?;
#[cfg(windows)]
let socket = {
let new_addr = empty_unix_socket();
Socket::bind(&new_addr, Type::STREAM, None).await?
};
#[cfg(unix)]
let socket = {
use socket2::Domain;
Socket::new(Domain::UNIX, Type::STREAM, None).await?
};
let (sender, rx) = channel();
ConnectOps::current().connect(socket.as_raw_fd(), addr, sender);
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::Other, "IO Driver is gone"))
.and_then(|item| item)?;
Ok(UnixStream::from_socket(socket))
}
use socket2::SockAddr;
#[derive(Clone)]
pub(crate) struct ConnectOps(Rc<ConnectOpsInner>);
@ -104,14 +46,14 @@ impl ConnectOps {
fd: RawFd,
addr: SockAddr,
sender: Sender<io::Result<()>>,
) -> usize {
) -> io::Result<()> {
let id = self.0.ops.borrow_mut().insert(sender);
self.0.api.submit(
id as u32,
opcode::Connect::new(Fd(fd), addr.as_ptr(), addr.len()).build(),
);
id
Ok(())
}
}

View file

@ -1,8 +1,7 @@
use std::{cell::RefCell, fmt, io, mem, num::NonZeroU32, rc::Rc, task::Poll};
use std::{cell::RefCell, fmt, io, mem, num::NonZeroU32, os, rc::Rc, task::Poll};
use io_uring::{opcode, squeue::Entry, types::Fd};
use ntex_neon::driver::{op::Handler, AsRawFd, DriverApi};
use ntex_neon::Runtime;
use ntex_neon::{driver::DriverApi, driver::Handler, Runtime};
use ntex_util::channel::oneshot;
use slab::Slab;
@ -57,7 +56,7 @@ struct StreamOpsStorage<T> {
streams: Slab<StreamItem<T>>,
}
impl<T: AsRawFd + 'static> StreamOps<T> {
impl<T: os::fd::AsRawFd + 'static> StreamOps<T> {
pub(crate) fn current() -> Self {
Runtime::value(|rt| {
let mut inner = None;

View file

@ -3,7 +3,8 @@ use std::{any, future::poll_fn, task::Poll};
use ntex_io::{
types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_neon::{net::TcpStream, spawn};
use ntex_rt::spawn;
use socket2::Socket;
use super::driver::{StreamCtl, StreamOps};
@ -13,7 +14,7 @@ impl IoStream for super::TcpStream {
let context = read.context();
let ctl = StreamOps::current().register(io, context.clone());
let ctl2 = ctl.clone();
spawn(async move { run(ctl, context).await }).detach();
spawn(async move { run(ctl, context).await });
Some(Box::new(HandleWrapper(ctl2)))
}
@ -24,19 +25,19 @@ impl IoStream for super::UnixStream {
let io = self.0;
let context = read.context();
let ctl = StreamOps::current().register(io, context.clone());
spawn(async move { run(ctl, context).await }).detach();
spawn(async move { run(ctl, context).await });
None
}
}
struct HandleWrapper(StreamCtl<TcpStream>);
struct HandleWrapper(StreamCtl<Socket>);
impl Handle for HandleWrapper {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() {
let addr = self.0.with_io(|io| io.and_then(|io| io.peer_addr().ok()));
if let Some(addr) = addr {
if let Some(addr) = addr.and_then(|addr| addr.as_socket()) {
return Some(Box::new(types::PeerAddr(addr)));
}
}

View file

@ -2,27 +2,31 @@ use std::{io::Result, net, net::SocketAddr};
use ntex_bytes::PoolRef;
use ntex_io::Io;
use socket2::Socket;
mod connect;
pub(crate) mod connect;
mod driver;
mod io;
/// Tcp stream wrapper for neon TcpStream
struct TcpStream(ntex_neon::net::TcpStream);
struct TcpStream(Socket);
/// Tcp stream wrapper for neon UnixStream
struct UnixStream(ntex_neon::net::UnixStream);
struct UnixStream(Socket);
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(addr: SocketAddr) -> Result<Io> {
let sock = connect::connect(addr).await?;
Ok(Io::new(TcpStream(sock)))
let sock = crate::helpers::connect(addr).await?;
Ok(Io::new(TcpStream(crate::helpers::prep_socket(sock)?)))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result<Io> {
let sock = connect::connect(addr).await?;
Ok(Io::with_memory_pool(TcpStream(sock), pool))
let sock = crate::helpers::connect(addr).await?;
Ok(Io::with_memory_pool(
TcpStream(crate::helpers::prep_socket(sock)?),
pool,
))
}
/// Opens a unix stream connection.
@ -30,8 +34,8 @@ pub async fn unix_connect<'a, P>(addr: P) -> Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
let sock = connect::connect_unix(addr).await?;
Ok(Io::new(UnixStream(sock)))
let sock = crate::helpers::connect_unix(addr).await?;
Ok(Io::new(UnixStream(crate::helpers::prep_socket(sock)?)))
}
/// Opens a unix stream connection and specified memory pool.
@ -39,21 +43,24 @@ pub async fn unix_connect_in<'a, P>(addr: P, pool: PoolRef) -> Result<Io>
where
P: AsRef<std::path::Path> + 'a,
{
let sock = connect::connect_unix(addr).await?;
Ok(Io::with_memory_pool(UnixStream(sock), pool))
let sock = crate::helpers::connect_unix(addr).await?;
Ok(Io::with_memory_pool(
UnixStream(crate::helpers::prep_socket(sock)?),
pool,
))
}
/// Convert std TcpStream to tokio's TcpStream
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io> {
stream.set_nodelay(true)?;
Ok(Io::new(TcpStream(ntex_neon::net::TcpStream::from_std(
stream,
Ok(Io::new(TcpStream(crate::helpers::prep_socket(
Socket::from(stream),
)?)))
}
/// Convert std UnixStream to tokio's UnixStream
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
Ok(Io::new(UnixStream(ntex_neon::net::UnixStream::from_std(
stream,
Ok(Io::new(UnixStream(crate::helpers::prep_socket(
Socket::from(stream),
)?)))
}

View file

@ -8,10 +8,10 @@ use coo_kie::{Cookie, CookieJar};
use crate::io::Filter;
use crate::io::Io;
use crate::server::Server;
use crate::service::ServiceFactory;
#[cfg(feature = "ws")]
use crate::ws::{error::WsClientError, WsClient, WsConnection};
use crate::{rt::System, service::ServiceFactory};
use crate::{time::Millis, time::Seconds, util::Bytes};
use crate::{rt::System, time::Millis, time::Seconds, util::Bytes};
use super::client::{Client, ClientRequest, ClientResponse, Connector};
use super::error::{HttpError, PayloadError};
@ -248,12 +248,13 @@ where
Ok(())
})
});
// wait for server
if std::env::var("GITHUB_ACTIONS") == Ok("true".to_string()) {
thread::sleep(std::time::Duration::from_millis(150));
}
let (system, server, addr) = rx.recv().unwrap();
// wait for server
thread::sleep(std::time::Duration::from_millis(50));
TestServer {
addr,
system,

View file

@ -701,12 +701,13 @@ where
Ok(())
})
});
// wait for server
if std::env::var("GITHUB_ACTIONS") == Ok("true".to_string()) {
thread::sleep(std::time::Duration::from_millis(150));
}
let (system, server, addr) = rx.recv().unwrap();
// wait for server
thread::sleep(std::time::Duration::from_millis(50));
let client = {
let connector = {
#[cfg(feature = "openssl")]