Drop async-std support (#512)

This commit is contained in:
Nikolay Kim 2025-03-09 18:53:47 +05:00 committed by GitHub
parent 59ffd17b91
commit 8ffa646af7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 6 additions and 547 deletions

View file

@ -9,7 +9,6 @@ members = [
"ntex-http",
"ntex-router",
"ntex-rt",
"ntex-runtime",
"ntex-net",
"ntex-server",
"ntex-service",
@ -17,9 +16,9 @@ members = [
"ntex-macros",
"ntex-util",
"ntex-async-std",
"ntex-compio",
"ntex-tokio",
"ntex-runtime",
]
[workspace.package]
@ -40,7 +39,6 @@ ntex-net = { path = "ntex-net" }
ntex-http = { path = "ntex-http" }
ntex-router = { path = "ntex-router" }
ntex-rt = { path = "ntex-rt" }
ntex-runtime = { path = "ntex-runtime" }
ntex-server = { path = "ntex-server" }
ntex-service = { path = "ntex-service" }
ntex-tls = { path = "ntex-tls" }
@ -49,7 +47,7 @@ ntex-util = { path = "ntex-util" }
ntex-compio = { path = "ntex-compio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-async-std = { path = "ntex-async-std" }
ntex-runtime = { path = "ntex-runtime" }
[workspace.dependencies]
async-task = "4.5.0"

View file

@ -1,45 +0,0 @@
# Changes
## [0.4.0] - 2024-01-09
* Release
## [0.4.0-b.0] - 2024-01-07
* Use "async fn" in trait for Service definition
## [0.3.2] - 2023-11-22
* Replace async-oneshot with oneshot
## [0.3.1] - 2023-11-12
* Optimize io read task
## [0.3.0] - 2023-06-22
* Release v0.3.0
## [0.3.0-beta.0] - 2023-06-16
* Migrate to ntex-service 1.2
## [0.2.2] - 2023-01-26
* Update io api usage
## [0.2.0] - 2023-01-04
* Release
## [0.2.0-beta.0] - 2022-12-28
* Migrate to ntex-service 1.0
## [0.1.1] - 2022-01-30
* Update to ntex-io 0.1.7
## [0.1.0] - 2022-01-03
* Initial release

View file

@ -1,24 +0,0 @@
[package]
name = "ntex-async-std"
version = "0.5.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "async-std intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://ntex.rs"
repository = "https://github.com/ntex-rs/ntex.git"
documentation = "https://docs.rs/ntex-rt-async-std/"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2021"
[lib]
name = "ntex_async_std"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1"
ntex-io = "2.5"
ntex-util = "2.0"
log = "0.4"
async-std = { version = "1", features = ["unstable"] }
oneshot = { version = "0.1", default-features = false, features = ["async"] }

View file

@ -1 +0,0 @@
../LICENSE-APACHE

View file

@ -1 +0,0 @@
../LICENSE-MIT

View file

@ -1,220 +0,0 @@
use std::{
any, cell::RefCell, future::poll_fn, io, pin::Pin, task::ready, task::Context,
task::Poll,
};
use async_std::io::{Read as ARead, Write as AWrite};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf};
use crate::TcpStream;
impl IoStream for TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let mut rio = Read(RefCell::new(self.clone()));
async_std::task::spawn_local(async move {
read.handle(&mut rio).await;
});
let mut wio = Write(RefCell::new(self.clone()));
async_std::task::spawn_local(async move {
write.handle(&mut wio).await;
});
Some(Box::new(self))
}
}
impl Handle for TcpStream {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() {
if let Ok(addr) = self.0.peer_addr() {
return Some(Box::new(types::PeerAddr(addr)));
}
}
None
}
}
/// Read io task
struct Read(RefCell<TcpStream>);
impl ntex_io::AsyncRead for Read {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut io = self.0.borrow_mut();
poll_read_buf(Pin::new(&mut io.0), cx, &mut buf)
})
.await;
(buf, result)
}
}
struct Write(RefCell<TcpStream>);
impl ntex_io::AsyncWrite for Write {
#[inline]
async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> {
poll_fn(|cx| {
if let Some(mut b) = buf.take() {
let result = flush_io(&mut self.0.borrow_mut().0, &mut b, cx);
buf.set(b);
result
} else {
Poll::Ready(Ok(()))
}
})
.await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
self.0.borrow().0.shutdown(std::net::Shutdown::Both)
}
}
/// Flush write buffer to underlying I/O stream.
pub(super) fn flush_io<T: ARead + AWrite + Unpin>(
io: &mut T,
buf: &mut BytesVec,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
let len = buf.len();
if len != 0 {
// log::trace!("flushing framed transport: {:?}", buf.len());
let mut written = 0;
let result = loop {
break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)))
} else {
written += n;
if written == len {
buf.clear();
Poll::Ready(Ok(()))
} else {
continue;
}
}
}
Poll::Pending => {
// remove written data
buf.advance(written);
Poll::Pending
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
Poll::Ready(Err(e))
}
};
};
// log::trace!("flushed {} bytes", written);
// flush
if written > 0 {
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
Poll::Ready(Err(e))
}
}
} else {
result
}
} else {
Poll::Ready(Ok(()))
}
}
pub fn poll_read_buf<T: ARead>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
let dst = unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [u8]) };
let n = ready!(io.poll_read(cx, dst))?;
// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by Read::poll_read() api
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
#[cfg(unix)]
mod unixstream {
use super::*;
use crate::UnixStream;
impl IoStream for UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let mut rio = Read(RefCell::new(self.clone()));
async_std::task::spawn_local(async move {
read.handle(&mut rio).await;
});
let mut wio = Write(RefCell::new(self));
async_std::task::spawn_local(async move {
write.handle(&mut wio).await;
});
None
}
}
/// Read io task
struct Read(RefCell<UnixStream>);
impl ntex_io::AsyncRead for Read {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut io = self.0.borrow_mut();
poll_read_buf(Pin::new(&mut io.0), cx, &mut buf)
})
.await;
(buf, result)
}
}
struct Write(RefCell<UnixStream>);
impl ntex_io::AsyncWrite for Write {
#[inline]
async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> {
poll_fn(|cx| {
if let Some(mut b) = buf.take() {
let result = flush_io(&mut self.0.borrow_mut().0, &mut b, cx);
buf.set(b);
result
} else {
Poll::Ready(Ok(()))
}
})
.await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
self.0.borrow().0.shutdown(std::net::Shutdown::Both)
}
}
}

View file

@ -1,64 +0,0 @@
use std::{io::Result, net, net::SocketAddr};
use ntex_bytes::PoolRef;
use ntex_io::Io;
mod io;
mod signals;
pub use self::signals::{signal, Signal};
#[derive(Clone)]
struct TcpStream(async_std::net::TcpStream);
#[cfg(unix)]
#[derive(Clone)]
struct UnixStream(async_std::os::unix::net::UnixStream);
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(addr: SocketAddr) -> Result<Io> {
let sock = async_std::net::TcpStream::connect(addr).await?;
sock.set_nodelay(true)?;
Ok(Io::new(TcpStream(sock)))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result<Io> {
let sock = async_std::net::TcpStream::connect(addr).await?;
sock.set_nodelay(true)?;
Ok(Io::with_memory_pool(TcpStream(sock), pool))
}
#[cfg(unix)]
/// Opens a unix stream connection.
pub async fn unix_connect<P>(addr: P) -> Result<Io>
where
P: AsRef<async_std::path::Path>,
{
let sock = async_std::os::unix::net::UnixStream::connect(addr).await?;
Ok(Io::new(UnixStream(sock)))
}
#[cfg(unix)]
/// Opens a unix stream connection and specified memory pool.
pub async fn unix_connect_in<P>(addr: P, pool: PoolRef) -> Result<Io>
where
P: AsRef<async_std::path::Path>,
{
let sock = async_std::os::unix::net::UnixStream::connect(addr).await?;
Ok(Io::with_memory_pool(UnixStream(sock), pool))
}
/// Convert std TcpStream to async-std's TcpStream
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io> {
stream.set_nonblocking(true)?;
stream.set_nodelay(true)?;
Ok(Io::new(TcpStream(async_std::net::TcpStream::from(stream))))
}
#[cfg(unix)]
/// Convert std UnixStream to async-std's UnixStream
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
stream.set_nonblocking(true)?;
Ok(Io::new(UnixStream(From::from(stream))))
}

View file

@ -1,50 +0,0 @@
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
thread_local! {
static SRUN: RefCell<bool> = const { RefCell::new(false) };
static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default();
}
/// Different types of process signals
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum Signal {
/// SIGHUP
Hup,
/// SIGINT
Int,
/// SIGTERM
Term,
/// SIGQUIT
Quit,
}
/// Register signal handler.
///
/// Signals are handled by oneshots, you have to re-register
/// after each signal.
pub fn signal() -> Option<oneshot::Receiver<Signal>> {
if !SRUN.with(|v| *v.borrow()) {
async_std::task::spawn_local(Signals::new());
}
SHANDLERS.with(|handlers| {
let (tx, rx) = oneshot::channel();
handlers.borrow_mut().push(tx);
Some(rx)
})
}
struct Signals {}
impl Signals {
pub(super) fn new() -> Signals {
Self {}
}
}
impl Future for Signals {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(())
}
}

View file

@ -6,6 +6,8 @@
* Drop glommio support
* Drop async-std support
## [2.4.0] - 2024-09-25
* Update to glommio v0.9

View file

@ -24,9 +24,6 @@ tokio = ["ntex-rt/tokio", "ntex-tokio"]
# compio runtime
compio = ["ntex-rt/compio", "ntex-compio"]
# async-std runtime
async-std = ["ntex-rt/async-std", "ntex-async-std"]
# default ntex runtime
default-rt = ["ntex-rt/default-rt", "ntex-runtime", "ntex-iodriver", "slab", "socket2"]
@ -40,8 +37,6 @@ ntex-util = "2.5"
ntex-tokio = { version = "0.5.3", optional = true }
ntex-compio = { version = "0.2.4", optional = true }
ntex-async-std = { version = "0.5.1", optional = true }
ntex-runtime = { version = "0.1.0", optional = true }
ntex-iodriver = { version = "0.1.0", optional = true }

View file

@ -9,7 +9,6 @@ pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
feature = "default-rt",
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio")
))]
pub use crate::rt::{
@ -20,7 +19,6 @@ pub use crate::rt::{
#[cfg(all(
feature = "compio",
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "default-rt")
))]
pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in};
@ -29,34 +27,15 @@ pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in};
unix,
feature = "compio",
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "default-rt")
))]
pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
feature = "async-std",
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "default-rt")
))]
pub use ntex_async_std::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(
unix,
feature = "async-std",
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "default-rt")
))]
pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "async-std"),
not(feature = "default-rt")
))]
mod no_rt {
use ntex_io::Io;
@ -124,7 +103,6 @@ mod no_rt {
#[cfg(all(
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "async-std"),
not(feature = "default-rt")
))]
pub use no_rt::*;

View file

@ -12,7 +12,6 @@ pub use self::compat::*;
#[cfg(all(
feature = "default-rt",
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio")
))]
mod rt;

View file

@ -6,6 +6,8 @@
* Drop glommio support
* Drop async-std support
## [0.4.24] - 2025-01-03
* Relax runtime requirements

View file

@ -29,16 +29,12 @@ compio = ["compio-driver", "compio-runtime"]
# default ntex runtime
default-rt = ["ntex-runtime", "ntex-iodriver"]
# async-std support
async-std = ["async_std/unstable"]
[dependencies]
async-channel = "2"
futures-core = "0.3"
log = "0.4"
oneshot = "0.1"
async_std = { version = "1", package = "async-std", optional = true }
compio-driver = { version = "0.6", optional = true }
compio-runtime = { version = "0.6", optional = true }
tok-io = { version = "1", package = "tokio", default-features = false, features = [

View file

@ -7,7 +7,6 @@ fn main() {
let _ = match key.as_ref() {
"CARGO_FEATURE_COMPIO" => features.insert("compio"),
"CARGO_FEATURE_TOKIO" => features.insert("tokio"),
"CARGO_FEATURE_ASYNC_STD" => features.insert("async-std"),
"CARGO_FEATURE_DEFAULT_RT" => features.insert("default-rt"),
_ => false,
};

View file

@ -402,112 +402,9 @@ mod default_rt {
}
}
#[allow(dead_code)]
#[cfg(feature = "async-std")]
mod asyncstd {
use std::future::{poll_fn, Future};
use std::{fmt, pin::Pin, task::ready, task::Context, task::Poll};
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
async_std::task::block_on(fut);
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
JoinHandle {
fut: async_std::task::spawn_local(async move {
if let Some(ptr) = ptr {
let mut f = unsafe { Pin::new_unchecked(&mut f) };
let result = poll_fn(|ctx| {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else {
f.await
}
}),
}
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move { f().await })
}
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated
/// to blocking tasks. This is useful to prevent long-running synchronous
/// operations from blocking the main futures executor.
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
JoinHandle {
fut: async_std::task::spawn_blocking(f),
}
}
#[derive(Debug, Copy, Clone)]
pub struct JoinError;
impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "JoinError")
}
}
impl std::error::Error for JoinError {}
pub struct JoinHandle<T> {
fut: async_std::task::JoinHandle<T>,
}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx))))
}
}
}
#[cfg(feature = "tokio")]
pub use self::tokio::*;
#[cfg(feature = "async-std")]
pub use self::asyncstd::*;
#[cfg(feature = "compio")]
pub use self::compio::*;
@ -517,7 +414,6 @@ pub use self::default_rt::*;
#[allow(dead_code)]
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "default-rt")
))]
@ -581,7 +477,6 @@ mod no_rt {
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "default-rt")
))]