diff --git a/Cargo.toml b/Cargo.toml index 45d44daf..b0846dac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ members = [ "ntex-http", "ntex-router", "ntex-rt", - "ntex-runtime", "ntex-net", "ntex-server", "ntex-service", @@ -17,9 +16,9 @@ members = [ "ntex-macros", "ntex-util", - "ntex-async-std", "ntex-compio", "ntex-tokio", + "ntex-runtime", ] [workspace.package] @@ -40,7 +39,6 @@ ntex-net = { path = "ntex-net" } ntex-http = { path = "ntex-http" } ntex-router = { path = "ntex-router" } ntex-rt = { path = "ntex-rt" } -ntex-runtime = { path = "ntex-runtime" } ntex-server = { path = "ntex-server" } ntex-service = { path = "ntex-service" } ntex-tls = { path = "ntex-tls" } @@ -49,7 +47,7 @@ ntex-util = { path = "ntex-util" } ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } -ntex-async-std = { path = "ntex-async-std" } +ntex-runtime = { path = "ntex-runtime" } [workspace.dependencies] async-task = "4.5.0" diff --git a/ntex-async-std/CHANGES.md b/ntex-async-std/CHANGES.md deleted file mode 100644 index 53ba72ac..00000000 --- a/ntex-async-std/CHANGES.md +++ /dev/null @@ -1,45 +0,0 @@ -# Changes - -## [0.4.0] - 2024-01-09 - -* Release - -## [0.4.0-b.0] - 2024-01-07 - -* Use "async fn" in trait for Service definition - -## [0.3.2] - 2023-11-22 - -* Replace async-oneshot with oneshot - -## [0.3.1] - 2023-11-12 - -* Optimize io read task - -## [0.3.0] - 2023-06-22 - -* Release v0.3.0 - -## [0.3.0-beta.0] - 2023-06-16 - -* Migrate to ntex-service 1.2 - -## [0.2.2] - 2023-01-26 - -* Update io api usage - -## [0.2.0] - 2023-01-04 - -* Release - -## [0.2.0-beta.0] - 2022-12-28 - -* Migrate to ntex-service 1.0 - -## [0.1.1] - 2022-01-30 - -* Update to ntex-io 0.1.7 - -## [0.1.0] - 2022-01-03 - -* Initial release diff --git a/ntex-async-std/Cargo.toml b/ntex-async-std/Cargo.toml deleted file mode 100644 index 21d9b93b..00000000 --- a/ntex-async-std/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -[package] -name = "ntex-async-std" -version = "0.5.1" -authors = ["ntex contributors "] -description = "async-std intergration for ntex framework" -keywords = ["network", "framework", "async", "futures"] -homepage = "https://ntex.rs" -repository = "https://github.com/ntex-rs/ntex.git" -documentation = "https://docs.rs/ntex-rt-async-std/" -categories = ["network-programming", "asynchronous"] -license = "MIT OR Apache-2.0" -edition = "2021" - -[lib] -name = "ntex_async_std" -path = "src/lib.rs" - -[dependencies] -ntex-bytes = "0.1" -ntex-io = "2.5" -ntex-util = "2.0" -log = "0.4" -async-std = { version = "1", features = ["unstable"] } -oneshot = { version = "0.1", default-features = false, features = ["async"] } diff --git a/ntex-async-std/LICENSE-APACHE b/ntex-async-std/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/ntex-async-std/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/ntex-async-std/LICENSE-MIT b/ntex-async-std/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/ntex-async-std/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/ntex-async-std/src/io.rs b/ntex-async-std/src/io.rs deleted file mode 100644 index 7180aeae..00000000 --- a/ntex-async-std/src/io.rs +++ /dev/null @@ -1,220 +0,0 @@ -use std::{ - any, cell::RefCell, future::poll_fn, io, pin::Pin, task::ready, task::Context, - task::Poll, -}; - -use async_std::io::{Read as ARead, Write as AWrite}; -use ntex_bytes::{Buf, BufMut, BytesVec}; -use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf}; - -use crate::TcpStream; - -impl IoStream for TcpStream { - fn start(self, read: ReadContext, write: WriteContext) -> Option> { - let mut rio = Read(RefCell::new(self.clone())); - async_std::task::spawn_local(async move { - read.handle(&mut rio).await; - }); - let mut wio = Write(RefCell::new(self.clone())); - async_std::task::spawn_local(async move { - write.handle(&mut wio).await; - }); - Some(Box::new(self)) - } -} - -impl Handle for TcpStream { - fn query(&self, id: any::TypeId) -> Option> { - if id == any::TypeId::of::() { - if let Ok(addr) = self.0.peer_addr() { - return Some(Box::new(types::PeerAddr(addr))); - } - } - None - } -} - -/// Read io task -struct Read(RefCell); - -impl ntex_io::AsyncRead for Read { - async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { - // read data from socket - let result = poll_fn(|cx| { - let mut io = self.0.borrow_mut(); - poll_read_buf(Pin::new(&mut io.0), cx, &mut buf) - }) - .await; - (buf, result) - } -} - -struct Write(RefCell); - -impl ntex_io::AsyncWrite for Write { - #[inline] - async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { - poll_fn(|cx| { - if let Some(mut b) = buf.take() { - let result = flush_io(&mut self.0.borrow_mut().0, &mut b, cx); - buf.set(b); - result - } else { - Poll::Ready(Ok(())) - } - }) - .await - } - - #[inline] - async fn flush(&mut self) -> io::Result<()> { - Ok(()) - } - - #[inline] - async fn shutdown(&mut self) -> io::Result<()> { - self.0.borrow().0.shutdown(std::net::Shutdown::Both) - } -} - -/// Flush write buffer to underlying I/O stream. -pub(super) fn flush_io( - io: &mut T, - buf: &mut BytesVec, - cx: &mut Context<'_>, -) -> Poll> { - let len = buf.len(); - - if len != 0 { - // log::trace!("flushing framed transport: {:?}", buf.len()); - - let mut written = 0; - let result = loop { - break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { - Poll::Ready(Ok(n)) => { - if n == 0 { - log::trace!("Disconnected during flush, written {}", written); - Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ))) - } else { - written += n; - if written == len { - buf.clear(); - Poll::Ready(Ok(())) - } else { - continue; - } - } - } - Poll::Pending => { - // remove written data - buf.advance(written); - Poll::Pending - } - Poll::Ready(Err(e)) => { - log::trace!("Error during flush: {}", e); - Poll::Ready(Err(e)) - } - }; - }; - // log::trace!("flushed {} bytes", written); - - // flush - if written > 0 { - match Pin::new(&mut *io).poll_flush(cx) { - Poll::Ready(Ok(_)) => result, - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => { - log::trace!("error during flush: {}", e); - Poll::Ready(Err(e)) - } - } - } else { - result - } - } else { - Poll::Ready(Ok(())) - } -} - -pub fn poll_read_buf( - io: Pin<&mut T>, - cx: &mut Context<'_>, - buf: &mut BytesVec, -) -> Poll> { - let dst = unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [u8]) }; - let n = ready!(io.poll_read(cx, dst))?; - - // Safety: This is guaranteed to be the number of initialized (and read) - // bytes due to the invariants provided by Read::poll_read() api - unsafe { - buf.advance_mut(n); - } - - Poll::Ready(Ok(n)) -} - -#[cfg(unix)] -mod unixstream { - use super::*; - use crate::UnixStream; - - impl IoStream for UnixStream { - fn start(self, read: ReadContext, write: WriteContext) -> Option> { - let mut rio = Read(RefCell::new(self.clone())); - async_std::task::spawn_local(async move { - read.handle(&mut rio).await; - }); - let mut wio = Write(RefCell::new(self)); - async_std::task::spawn_local(async move { - write.handle(&mut wio).await; - }); - None - } - } - - /// Read io task - struct Read(RefCell); - - impl ntex_io::AsyncRead for Read { - async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { - // read data from socket - let result = poll_fn(|cx| { - let mut io = self.0.borrow_mut(); - poll_read_buf(Pin::new(&mut io.0), cx, &mut buf) - }) - .await; - (buf, result) - } - } - - struct Write(RefCell); - - impl ntex_io::AsyncWrite for Write { - #[inline] - async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { - poll_fn(|cx| { - if let Some(mut b) = buf.take() { - let result = flush_io(&mut self.0.borrow_mut().0, &mut b, cx); - buf.set(b); - result - } else { - Poll::Ready(Ok(())) - } - }) - .await - } - - #[inline] - async fn flush(&mut self) -> io::Result<()> { - Ok(()) - } - - #[inline] - async fn shutdown(&mut self) -> io::Result<()> { - self.0.borrow().0.shutdown(std::net::Shutdown::Both) - } - } -} diff --git a/ntex-async-std/src/lib.rs b/ntex-async-std/src/lib.rs deleted file mode 100644 index e347d282..00000000 --- a/ntex-async-std/src/lib.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{io::Result, net, net::SocketAddr}; - -use ntex_bytes::PoolRef; -use ntex_io::Io; - -mod io; -mod signals; - -pub use self::signals::{signal, Signal}; - -#[derive(Clone)] -struct TcpStream(async_std::net::TcpStream); - -#[cfg(unix)] -#[derive(Clone)] -struct UnixStream(async_std::os::unix::net::UnixStream); - -/// Opens a TCP connection to a remote host. -pub async fn tcp_connect(addr: SocketAddr) -> Result { - let sock = async_std::net::TcpStream::connect(addr).await?; - sock.set_nodelay(true)?; - Ok(Io::new(TcpStream(sock))) -} - -/// Opens a TCP connection to a remote host and use specified memory pool. -pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result { - let sock = async_std::net::TcpStream::connect(addr).await?; - sock.set_nodelay(true)?; - Ok(Io::with_memory_pool(TcpStream(sock), pool)) -} - -#[cfg(unix)] -/// Opens a unix stream connection. -pub async fn unix_connect

(addr: P) -> Result -where - P: AsRef, -{ - let sock = async_std::os::unix::net::UnixStream::connect(addr).await?; - Ok(Io::new(UnixStream(sock))) -} - -#[cfg(unix)] -/// Opens a unix stream connection and specified memory pool. -pub async fn unix_connect_in

(addr: P, pool: PoolRef) -> Result -where - P: AsRef, -{ - let sock = async_std::os::unix::net::UnixStream::connect(addr).await?; - Ok(Io::with_memory_pool(UnixStream(sock), pool)) -} - -/// Convert std TcpStream to async-std's TcpStream -pub fn from_tcp_stream(stream: net::TcpStream) -> Result { - stream.set_nonblocking(true)?; - stream.set_nodelay(true)?; - Ok(Io::new(TcpStream(async_std::net::TcpStream::from(stream)))) -} - -#[cfg(unix)] -/// Convert std UnixStream to async-std's UnixStream -pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { - stream.set_nonblocking(true)?; - Ok(Io::new(UnixStream(From::from(stream)))) -} diff --git a/ntex-async-std/src/signals.rs b/ntex-async-std/src/signals.rs deleted file mode 100644 index d90135ad..00000000 --- a/ntex-async-std/src/signals.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; - -thread_local! { - static SRUN: RefCell = const { RefCell::new(false) }; - static SHANDLERS: Rc>>> = Default::default(); -} - -/// Different types of process signals -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum Signal { - /// SIGHUP - Hup, - /// SIGINT - Int, - /// SIGTERM - Term, - /// SIGQUIT - Quit, -} - -/// Register signal handler. -/// -/// Signals are handled by oneshots, you have to re-register -/// after each signal. -pub fn signal() -> Option> { - if !SRUN.with(|v| *v.borrow()) { - async_std::task::spawn_local(Signals::new()); - } - SHANDLERS.with(|handlers| { - let (tx, rx) = oneshot::channel(); - handlers.borrow_mut().push(tx); - Some(rx) - }) -} - -struct Signals {} - -impl Signals { - pub(super) fn new() -> Signals { - Self {} - } -} - -impl Future for Signals { - type Output = (); - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - Poll::Ready(()) - } -} diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 8afe3ad4..461cc180 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -6,6 +6,8 @@ * Drop glommio support +* Drop async-std support + ## [2.4.0] - 2024-09-25 * Update to glommio v0.9 diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 266d0a84..47155fca 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -24,9 +24,6 @@ tokio = ["ntex-rt/tokio", "ntex-tokio"] # compio runtime compio = ["ntex-rt/compio", "ntex-compio"] -# async-std runtime -async-std = ["ntex-rt/async-std", "ntex-async-std"] - # default ntex runtime default-rt = ["ntex-rt/default-rt", "ntex-runtime", "ntex-iodriver", "slab", "socket2"] @@ -40,8 +37,6 @@ ntex-util = "2.5" ntex-tokio = { version = "0.5.3", optional = true } ntex-compio = { version = "0.2.4", optional = true } -ntex-async-std = { version = "0.5.1", optional = true } - ntex-runtime = { version = "0.1.0", optional = true } ntex-iodriver = { version = "0.1.0", optional = true } diff --git a/ntex-net/src/compat.rs b/ntex-net/src/compat.rs index 0092c172..e48e9ca2 100644 --- a/ntex-net/src/compat.rs +++ b/ntex-net/src/compat.rs @@ -9,7 +9,6 @@ pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in}; #[cfg(all( feature = "default-rt", not(feature = "tokio"), - not(feature = "async-std"), not(feature = "compio") ))] pub use crate::rt::{ @@ -20,7 +19,6 @@ pub use crate::rt::{ #[cfg(all( feature = "compio", not(feature = "tokio"), - not(feature = "async-std"), not(feature = "default-rt") ))] pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in}; @@ -29,34 +27,15 @@ pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in}; unix, feature = "compio", not(feature = "tokio"), - not(feature = "async-std"), not(feature = "default-rt") ))] pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in}; #[cfg(all( - feature = "async-std", not(feature = "tokio"), not(feature = "compio"), not(feature = "default-rt") ))] -pub use ntex_async_std::{from_tcp_stream, tcp_connect, tcp_connect_in}; - -#[cfg(all( - unix, - feature = "async-std", - not(feature = "tokio"), - not(feature = "compio"), - not(feature = "default-rt") -))] -pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in}; - -#[cfg(all( - not(feature = "tokio"), - not(feature = "compio"), - not(feature = "async-std"), - not(feature = "default-rt") -))] mod no_rt { use ntex_io::Io; @@ -124,7 +103,6 @@ mod no_rt { #[cfg(all( not(feature = "tokio"), not(feature = "compio"), - not(feature = "async-std"), not(feature = "default-rt") ))] pub use no_rt::*; diff --git a/ntex-net/src/lib.rs b/ntex-net/src/lib.rs index 39199f4b..83ca23f3 100644 --- a/ntex-net/src/lib.rs +++ b/ntex-net/src/lib.rs @@ -12,7 +12,6 @@ pub use self::compat::*; #[cfg(all( feature = "default-rt", not(feature = "tokio"), - not(feature = "async-std"), not(feature = "compio") ))] mod rt; diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index 0ecdc9ac..415bea7e 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -6,6 +6,8 @@ * Drop glommio support +* Drop async-std support + ## [0.4.24] - 2025-01-03 * Relax runtime requirements diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 613bb157..1b58367f 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -29,16 +29,12 @@ compio = ["compio-driver", "compio-runtime"] # default ntex runtime default-rt = ["ntex-runtime", "ntex-iodriver"] -# async-std support -async-std = ["async_std/unstable"] - [dependencies] async-channel = "2" futures-core = "0.3" log = "0.4" oneshot = "0.1" -async_std = { version = "1", package = "async-std", optional = true } compio-driver = { version = "0.6", optional = true } compio-runtime = { version = "0.6", optional = true } tok-io = { version = "1", package = "tokio", default-features = false, features = [ diff --git a/ntex-rt/build.rs b/ntex-rt/build.rs index 81f6c0a9..1e866507 100644 --- a/ntex-rt/build.rs +++ b/ntex-rt/build.rs @@ -7,7 +7,6 @@ fn main() { let _ = match key.as_ref() { "CARGO_FEATURE_COMPIO" => features.insert("compio"), "CARGO_FEATURE_TOKIO" => features.insert("tokio"), - "CARGO_FEATURE_ASYNC_STD" => features.insert("async-std"), "CARGO_FEATURE_DEFAULT_RT" => features.insert("default-rt"), _ => false, }; diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 38446396..3089dcec 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -402,112 +402,9 @@ mod default_rt { } } -#[allow(dead_code)] -#[cfg(feature = "async-std")] -mod asyncstd { - use std::future::{poll_fn, Future}; - use std::{fmt, pin::Pin, task::ready, task::Context, task::Poll}; - - /// Runs the provided future, blocking the current thread until the future - /// completes. - pub fn block_on>(fut: F) { - async_std::task::block_on(fut); - } - - /// Spawn a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for spawning futures on the current - /// thread. - /// - /// # Panics - /// - /// This function panics if ntex system is not running. - #[inline] - pub fn spawn(mut f: F) -> JoinHandle - where - F: Future + 'static, - { - let ptr = crate::CB.with(|cb| (cb.borrow().0)()); - JoinHandle { - fut: async_std::task::spawn_local(async move { - if let Some(ptr) = ptr { - let mut f = unsafe { Pin::new_unchecked(&mut f) }; - let result = poll_fn(|ctx| { - let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr)); - let result = f.as_mut().poll(ctx); - crate::CB.with(|cb| (cb.borrow().2)(new_ptr)); - result - }) - .await; - crate::CB.with(|cb| (cb.borrow().3)(ptr)); - result - } else { - f.await - } - }), - } - } - - /// Executes a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for executing futures on the current - /// thread. - /// - /// # Panics - /// - /// This function panics if ntex system is not running. - #[inline] - pub fn spawn_fn(f: F) -> JoinHandle - where - F: FnOnce() -> R + 'static, - R: Future + 'static, - { - spawn(async move { f().await }) - } - - /// Spawns a blocking task. - /// - /// The task will be spawned onto a thread pool specifically dedicated - /// to blocking tasks. This is useful to prevent long-running synchronous - /// operations from blocking the main futures executor. - pub fn spawn_blocking(f: F) -> JoinHandle - where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, - { - JoinHandle { - fut: async_std::task::spawn_blocking(f), - } - } - - #[derive(Debug, Copy, Clone)] - pub struct JoinError; - - impl fmt::Display for JoinError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "JoinError") - } - } - - impl std::error::Error for JoinError {} - - pub struct JoinHandle { - fut: async_std::task::JoinHandle, - } - - impl Future for JoinHandle { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx)))) - } - } -} - #[cfg(feature = "tokio")] pub use self::tokio::*; -#[cfg(feature = "async-std")] -pub use self::asyncstd::*; - #[cfg(feature = "compio")] pub use self::compio::*; @@ -517,7 +414,6 @@ pub use self::default_rt::*; #[allow(dead_code)] #[cfg(all( not(feature = "tokio"), - not(feature = "async-std"), not(feature = "compio"), not(feature = "default-rt") ))] @@ -581,7 +477,6 @@ mod no_rt { #[cfg(all( not(feature = "tokio"), - not(feature = "async-std"), not(feature = "compio"), not(feature = "default-rt") ))]