Drop glommio support (#511)

This commit is contained in:
Nikolay Kim 2025-03-09 18:19:34 +05:00 committed by GitHub
parent 4c1bc3249b
commit 59ffd17b91
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 31 additions and 608 deletions

View file

@ -36,7 +36,7 @@ jobs:
run: cargo llvm-cov --no-report --all --no-default-features --features="ntex/default-rt,ntex/cookie,ntex/url,ntex/compress,ntex/openssl,ntex/rustls,ntex/ws,ntex/brotli"
- name: Generate coverage report
run: cargo llvm-cov report --lcov --output-path lcov.info --ignore-filename-regex="ntex-compio|ntex-tokio|ntex-glommio|ntex-async-std"
run: cargo llvm-cov report --lcov --output-path lcov.info --ignore-filename-regex="ntex-compio|ntex-tokio|ntex-async-std"
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4

View file

@ -19,7 +19,6 @@ members = [
"ntex-async-std",
"ntex-compio",
"ntex-glommio",
"ntex-tokio",
]
@ -49,7 +48,6 @@ ntex-macros = { path = "ntex-macros" }
ntex-util = { path = "ntex-util" }
ntex-compio = { path = "ntex-compio" }
ntex-glommio = { path = "ntex-glommio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-async-std = { path = "ntex-async-std" }

View file

@ -1,57 +0,0 @@
# Changes
## [0.5.2] - 2024-09-xx
* Update to glommio v0.9
## [0.4.0] - 2024-01-09
* Release
## [0.4.0-b.0] - 2024-01-07
* Use "async fn" in trait for Service definition
## [0.3.1] - 2023-11-22
* Replace async-oneshot with oneshot
## [0.3.0] - 2023-06-22
* Release v0.3.0
## [0.3.0-beta.0] - 2023-06-16
* Migrate to ntex-service 1.2
## [0.2.4] - 2023-05-30
* Fix borrow mut panic #204
## [0.2.3] - 2023-04-11
* Chore upgrade glommio to 0.8
## [0.2.2] - 2023-01-26
* Update io api usage
## [0.2.0] - 2023-01-04
* Release
## [0.2.0-beta.0] - 2022-12-28
* Migrate to ntex-service 1.0
## [0.1.2] - 2022-02-20
* Upgrade to glommio 0.7
## [0.1.1] - 2022-01-30
* Update to ntex-io 0.1.7
## [0.1.0] - 2022-01-17
* Initial release

View file

@ -1,27 +0,0 @@
[package]
name = "ntex-glommio"
version = "0.5.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://ntex.rs"
repository = "https://github.com/ntex-rs/ntex.git"
documentation = "https://docs.rs/ntex-rt-glommio/"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2021"
[lib]
name = "ntex_glommio"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1"
ntex-io = "2.5"
ntex-util = "2.0"
futures-lite = "2.2"
log = "0.4"
oneshot = { version = "0.1", default-features = false, features = ["async"] }
[target.'cfg(target_os = "linux")'.dependencies]
glommio = "0.9"

View file

@ -1 +0,0 @@
../LICENSE-APACHE

View file

@ -1 +0,0 @@
../LICENSE-MIT

View file

@ -1,205 +0,0 @@
use std::{any, future::poll_fn, io, pin::Pin, task::ready, task::Context, task::Poll};
use futures_lite::io::{AsyncRead, AsyncWrite};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf};
use crate::net_impl::{TcpStream, UnixStream};
impl IoStream for TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let mut rio = Read(self.clone());
glommio::spawn_local(async move { read.handle(&mut rio).await }).detach();
let mut wio = Write(self.clone());
glommio::spawn_local(async move { write.handle(&mut wio).await }).detach();
Some(Box::new(self))
}
}
impl IoStream for UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let mut rio = UnixRead(self.clone());
glommio::spawn_local(async move {
read.handle(&mut rio).await;
})
.detach();
let mut wio = UnixWrite(self);
glommio::spawn_local(async move { write.handle(&mut wio).await }).detach();
None
}
}
impl Handle for TcpStream {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() {
if let Ok(addr) = self.0.borrow().peer_addr() {
return Some(Box::new(types::PeerAddr(addr)));
}
}
None
}
}
/// Read io task
struct Read(TcpStream);
impl ntex_io::AsyncRead for Read {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut io = self.0 .0.borrow_mut();
poll_read_buf(Pin::new(&mut *io), cx, &mut buf)
})
.await;
(buf, result)
}
}
struct Write(TcpStream);
impl ntex_io::AsyncWrite for Write {
#[inline]
async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> {
poll_fn(|cx| {
if let Some(mut b) = buf.take() {
let result = flush_io(&mut *self.0 .0.borrow_mut(), &mut b, cx);
buf.set(b);
result
} else {
Poll::Ready(Ok(()))
}
})
.await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
poll_fn(|cx| Pin::new(&mut *self.0 .0.borrow_mut()).poll_close(cx)).await
}
}
struct UnixRead(UnixStream);
impl ntex_io::AsyncRead for UnixRead {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut io = self.0 .0.borrow_mut();
poll_read_buf(Pin::new(&mut *io), cx, &mut buf)
})
.await;
(buf, result)
}
}
struct UnixWrite(UnixStream);
impl ntex_io::AsyncWrite for UnixWrite {
#[inline]
async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> {
poll_fn(|cx| {
if let Some(mut b) = buf.take() {
let result = flush_io(&mut *self.0 .0.borrow_mut(), &mut b, cx);
buf.set(b);
result
} else {
Poll::Ready(Ok(()))
}
})
.await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
poll_fn(|cx| Pin::new(&mut *self.0 .0.borrow_mut()).poll_close(cx)).await
}
}
/// Flush write buffer to underlying I/O stream.
pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
io: &mut T,
buf: &mut BytesVec,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
let len = buf.len();
if len != 0 {
// log::trace!("flushing framed transport: {:?}", buf.len());
let mut written = 0;
let result = loop {
break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)))
} else {
written += n;
if written == len {
buf.clear();
Poll::Ready(Ok(()))
} else {
continue;
}
}
}
Poll::Pending => {
// remove written data
buf.advance(written);
Poll::Pending
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
Poll::Ready(Err(e))
}
};
};
// log::trace!("flushed {} bytes", written);
// flush
if written > 0 {
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
Poll::Ready(Err(e))
}
}
} else {
result
}
} else {
Poll::Ready(Ok(()))
}
}
pub fn poll_read_buf<T: AsyncRead>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
let dst = unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [u8]) };
let n = ready!(io.poll_read(cx, dst))?;
// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by Read::poll_read() api
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}

View file

@ -1,90 +0,0 @@
#[cfg(target_os = "linux")]
mod io;
#[cfg(target_os = "linux")]
mod signals;
#[cfg(target_os = "linux")]
pub use self::signals::{signal, Signal};
#[cfg(target_os = "linux")]
mod net_impl {
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::{cell::RefCell, io::Result, net, net::SocketAddr, rc::Rc};
use ntex_bytes::PoolRef;
use ntex_io::Io;
#[derive(Clone)]
pub(crate) struct TcpStream(pub(crate) Rc<RefCell<glommio::net::TcpStream>>);
impl TcpStream {
fn new(io: glommio::net::TcpStream) -> Self {
Self(Rc::new(RefCell::new(io)))
}
}
#[derive(Clone)]
pub(crate) struct UnixStream(pub(crate) Rc<RefCell<glommio::net::UnixStream>>);
impl UnixStream {
fn new(io: glommio::net::UnixStream) -> Self {
Self(Rc::new(RefCell::new(io)))
}
}
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(addr: SocketAddr) -> Result<Io> {
let sock = glommio::net::TcpStream::connect(addr).await?;
sock.set_nodelay(true)?;
Ok(Io::new(TcpStream::new(sock)))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result<Io> {
let sock = glommio::net::TcpStream::connect(addr).await?;
sock.set_nodelay(true)?;
Ok(Io::with_memory_pool(TcpStream::new(sock), pool))
}
/// Opens a unix stream connection.
pub async fn unix_connect<P>(addr: P) -> Result<Io>
where
P: AsRef<std::path::Path>,
{
let sock = glommio::net::UnixStream::connect(addr).await?;
Ok(Io::new(UnixStream::new(sock)))
}
/// Opens a unix stream connection and specified memory pool.
pub async fn unix_connect_in<P>(addr: P, pool: PoolRef) -> Result<Io>
where
P: AsRef<std::path::Path>,
{
let sock = glommio::net::UnixStream::connect(addr).await?;
Ok(Io::with_memory_pool(UnixStream::new(sock), pool))
}
/// Convert std TcpStream to glommio's TcpStream
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io> {
stream.set_nonblocking(true)?;
stream.set_nodelay(true)?;
unsafe {
Ok(Io::new(TcpStream::new(
glommio::net::TcpStream::from_raw_fd(stream.into_raw_fd()),
)))
}
}
/// Convert std UnixStream to glommio's UnixStream
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
stream.set_nonblocking(true)?;
unsafe {
Ok(Io::new(UnixStream::new(
glommio::net::UnixStream::from_raw_fd(stream.into_raw_fd()),
)))
}
}
}
#[cfg(target_os = "linux")]
pub use self::net_impl::*;

View file

@ -1,50 +0,0 @@
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
thread_local! {
static SRUN: RefCell<bool> = const { RefCell::new(false) };
static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default();
}
/// Different types of process signals
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum Signal {
/// SIGHUP
Hup,
/// SIGINT
Int,
/// SIGTERM
Term,
/// SIGQUIT
Quit,
}
/// Register signal handler.
///
/// Signals are handled by oneshots, you have to re-register
/// after each signal.
pub fn signal() -> Option<oneshot::Receiver<Signal>> {
if !SRUN.with(|v| *v.borrow()) {
glommio::spawn_local(Signals::new()).detach();
}
SHANDLERS.with(|handlers| {
let (tx, rx) = oneshot::channel();
handlers.borrow_mut().push(tx);
Some(rx)
})
}
struct Signals {}
impl Signals {
pub(super) fn new() -> Signals {
Self {}
}
}
impl Future for Signals {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(())
}
}

View file

@ -1,5 +1,11 @@
# Changes
## [2.5.0] - 2025-03-10
* Add ntex-runtime support
* Drop glommio support
## [2.4.0] - 2024-09-25
* Update to glommio v0.9

View file

@ -24,9 +24,6 @@ tokio = ["ntex-rt/tokio", "ntex-tokio"]
# compio runtime
compio = ["ntex-rt/compio", "ntex-compio"]
# glommio runtime
glommio = ["ntex-rt/glommio", "ntex-glommio"]
# async-std runtime
async-std = ["ntex-rt/async-std", "ntex-async-std"]
@ -43,7 +40,6 @@ ntex-util = "2.5"
ntex-tokio = { version = "0.5.3", optional = true }
ntex-compio = { version = "0.2.4", optional = true }
ntex-glommio = { version = "0.5.2", optional = true }
ntex-async-std = { version = "0.5.1", optional = true }
ntex-runtime = { version = "0.1.0", optional = true }

View file

@ -10,8 +10,7 @@ pub use ntex_tokio::{from_unix_stream, unix_connect, unix_connect_in};
feature = "default-rt",
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "glommio")
not(feature = "compio")
))]
pub use crate::rt::{
from_tcp_stream, from_unix_stream, tcp_connect, tcp_connect_in, unix_connect,
@ -22,8 +21,7 @@ pub use crate::rt::{
feature = "compio",
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "default-rt"),
not(feature = "glommio")
not(feature = "default-rt")
))]
pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in};
@ -32,8 +30,7 @@ pub use ntex_compio::{from_tcp_stream, tcp_connect, tcp_connect_in};
feature = "compio",
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "default-rt"),
not(feature = "glommio")
not(feature = "default-rt")
))]
pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in};
@ -41,8 +38,7 @@ pub use ntex_compio::{from_unix_stream, unix_connect, unix_connect_in};
feature = "async-std",
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "default-rt"),
not(feature = "glommio")
not(feature = "default-rt")
))]
pub use ntex_async_std::{from_tcp_stream, tcp_connect, tcp_connect_in};
@ -51,36 +47,15 @@ pub use ntex_async_std::{from_tcp_stream, tcp_connect, tcp_connect_in};
feature = "async-std",
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "default-rt"),
not(feature = "glommio")
not(feature = "default-rt")
))]
pub use ntex_async_std::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
feature = "glommio",
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "default-rt"),
not(feature = "async-std")
))]
pub use ntex_glommio::{from_tcp_stream, tcp_connect, tcp_connect_in};
#[cfg(all(
unix,
feature = "glommio",
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "default-rt"),
not(feature = "async-std")
))]
pub use ntex_glommio::{from_unix_stream, unix_connect, unix_connect_in};
#[cfg(all(
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "async-std"),
not(feature = "default-rt"),
not(feature = "glommio")
not(feature = "default-rt")
))]
mod no_rt {
use ntex_io::Io;
@ -150,7 +125,6 @@ mod no_rt {
not(feature = "tokio"),
not(feature = "compio"),
not(feature = "async-std"),
not(feature = "default-rt"),
not(feature = "glommio")
not(feature = "default-rt")
))]
pub use no_rt::*;

View file

@ -13,7 +13,6 @@ pub use self::compat::*;
feature = "default-rt",
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "glommio")
not(feature = "compio")
))]
mod rt;

View file

@ -1,5 +1,11 @@
# Changes
## [0.4.25] - 2025-03-10
* Add "ntex-runtime" support
* Drop glommio support
## [0.4.24] - 2025-01-03
* Relax runtime requirements

View file

@ -20,9 +20,6 @@ path = "src/lib.rs"
[features]
default = []
# glommio support
glommio = ["glomm-io", "futures-channel"]
# tokio support
tokio = ["tok-io"]
@ -51,7 +48,3 @@ tok-io = { version = "1", package = "tokio", default-features = false, features
ntex-runtime = { version = "0.1", optional = true }
ntex-iodriver = { version = "0.1", optional = true }
[target.'cfg(target_os = "linux")'.dependencies]
glomm-io = { version = "0.9", package = "glommio", optional = true }
futures-channel = { version = "0.3", optional = true }

View file

@ -7,7 +7,6 @@ fn main() {
let _ = match key.as_ref() {
"CARGO_FEATURE_COMPIO" => features.insert("compio"),
"CARGO_FEATURE_TOKIO" => features.insert("tokio"),
"CARGO_FEATURE_GLOMMIO" => features.insert("glommio"),
"CARGO_FEATURE_ASYNC_STD" => features.insert("async-std"),
"CARGO_FEATURE_DEFAULT_RT" => features.insert("default-rt"),
_ => false,

View file

@ -502,129 +502,12 @@ mod asyncstd {
}
}
#[allow(dead_code)]
#[cfg(all(feature = "glommio", target_os = "linux"))]
mod glommio {
use std::future::{poll_fn, Future};
use std::{pin::Pin, task::Context, task::Poll};
use futures_channel::oneshot::Canceled;
use glomm_io::task;
pub type JoinError = Canceled;
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
let ex = glomm_io::LocalExecutor::default();
ex.run(async move {
let _ = fut.await;
})
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
JoinHandle {
fut: Either::Left(
glomm_io::spawn_local(async move {
if let Some(ptr) = ptr {
glomm_io::executor().yield_now().await;
let mut f = unsafe { Pin::new_unchecked(&mut f) };
let result = poll_fn(|ctx| {
let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
let result = f.as_mut().poll(ctx);
crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
result
})
.await;
crate::CB.with(|cb| (cb.borrow().3)(ptr));
result
} else {
glomm_io::executor().yield_now().await;
f.await
}
})
.detach(),
),
}
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move { f().await })
}
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = glomm_io::executor().spawn_blocking(f);
JoinHandle {
fut: Either::Right(Box::pin(async move { Ok(fut.await) })),
}
}
enum Either<T1, T2> {
Left(T1),
Right(T2),
}
/// Blocking operation completion future. It resolves with results
/// of blocking function execution.
#[allow(clippy::type_complexity)]
pub struct JoinHandle<T> {
fut:
Either<task::JoinHandle<T>, Pin<Box<dyn Future<Output = Result<T, Canceled>>>>>,
}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, Canceled>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.fut {
Either::Left(ref mut f) => match Pin::new(f).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(res) => Poll::Ready(res.ok_or(Canceled)),
},
Either::Right(ref mut f) => Pin::new(f).poll(cx),
}
}
}
}
#[cfg(feature = "tokio")]
pub use self::tokio::*;
#[cfg(feature = "async-std")]
pub use self::asyncstd::*;
#[cfg(feature = "glommio")]
pub use self::glommio::*;
#[cfg(feature = "compio")]
pub use self::compio::*;
@ -636,8 +519,7 @@ pub use self::default_rt::*;
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "default-rt"),
not(feature = "glommio")
not(feature = "default-rt")
))]
mod no_rt {
use std::task::{Context, Poll};
@ -701,7 +583,6 @@ mod no_rt {
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "compio"),
not(feature = "default-rt"),
not(feature = "glommio")
not(feature = "default-rt")
))]
pub use self::no_rt::*;

View file

@ -1,5 +1,13 @@
# Changes
## [2.12.0] - 2025-03-10
* Add "ntex-runtime" support
* Drop glommio support
* Drop async-std support
## [2.11.0] - 2025-01-31
* Cpu affinity support for server

View file

@ -45,12 +45,6 @@ url = ["url-pkg"]
# tokio runtime
tokio = ["ntex-net/tokio"]
# glommio runtime
glommio = ["ntex-net/glommio"]
# async-std runtime
async-std = ["ntex-net/async-std"]
# compio runtime
compio = ["ntex-net/compio"]