Glommio runtime support (#94)

* add glommio runtime support

* optional compilation for glommio
This commit is contained in:
Nikolay Kim 2022-01-17 01:03:15 +06:00 committed by GitHub
parent 429073f9ff
commit 450332144d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 1024 additions and 10 deletions

View file

@ -63,6 +63,12 @@ jobs:
cd ntex
cargo test --no-default-features --no-fail-fast --features="async-std,cookie,url,compress,openssl,rustls" --lib -- --test-threads 1
- name: Run glommio tests
timeout-minutes: 40
run: |
cd ntex
sudo -E env PATH="$PATH" bash -c "ulimit -l 512 && ulimit -a && cargo test --no-default-features --no-fail-fast --features=\"glommio,cookie,url,compress,openssl,rustls\""
- name: Install tarpaulin
if: matrix.version == '1.56.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
@ -82,12 +88,20 @@ jobs:
cd ntex
cargo tarpaulin --out Xml --output-dir=.. --no-default-features --features="async-std,cookie,url,compress,openssl,rustls" --lib
- name: Generate coverage report (glommio)
if: matrix.version == '1.56.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cd ntex
sudo -E env PATH="$PATH" bash -c "ulimit -l 512 && ulimit -a && cargo tarpaulin --out Xml --no-default-features --features=\"glommio,cookie,url,compress,openssl,rustls\" --lib"
- name: Upload to Codecov
if: matrix.version == '1.56.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v2
with:
file: cobertura.xml
files: cobertura.xml, ./ntex/cobertura.xml
verbose: true
- name: Install cargo-cache
continue-on-error: true

View file

@ -10,6 +10,7 @@ members = [
"ntex-tls",
"ntex-macros",
"ntex-util",
"ntex-glommio",
"ntex-tokio",
"ntex-async-std",
]
@ -26,5 +27,6 @@ ntex-tls = { path = "ntex-tls" }
ntex-macros = { path = "ntex-macros" }
ntex-util = { path = "ntex-util" }
ntex-glommio = { path = "ntex-glommio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-async-std = { path = "ntex-async-std" }

View file

@ -6,7 +6,7 @@ description = "async-std intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://ntex.rs"
repository = "https://github.com/ntex-rs/ntex.git"
documentation = "https://docs.rs/ntex-rt-tokio/"
documentation = "https://docs.rs/ntex-rt-async-std/"
categories = ["network-programming", "asynchronous"]
license = "MIT"
edition = "2018"

5
ntex-glommio/CHANGES.md Normal file
View file

@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2022-01-03
* Initial release

30
ntex-glommio/Cargo.toml Normal file
View file

@ -0,0 +1,30 @@
[package]
name = "ntex-glommio"
version = "0.1.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://ntex.rs"
repository = "https://github.com/ntex-rs/ntex.git"
documentation = "https://docs.rs/ntex-rt-glommio/"
categories = ["network-programming", "asynchronous"]
license = "MIT"
edition = "2018"
[lib]
name = "ntex_glommio"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.9"
ntex-io = "0.1.4"
ntex-util = "0.1.9"
async-oneshot = "0.5.0"
futures-lite = "1.12"
futures-channel = "0.3"
derive_more = "0.99"
log = "0.4"
pin-project-lite = "0.2"
[target.'cfg(target_os = "linux")'.dependencies]
glommio = "0.6"

1
ntex-glommio/LICENSE Symbolic link
View file

@ -0,0 +1 @@
../LICENSE

633
ntex-glommio/src/io.rs Normal file
View file

@ -0,0 +1,633 @@
use std::task::{Context, Poll};
use std::{any, future::Future, io, pin::Pin};
use futures_lite::future::FutureExt;
use futures_lite::io::{AsyncRead, AsyncWrite};
use glommio::Task;
use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_io::{
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_util::{ready, time::sleep, time::Sleep};
use crate::net_impl::{TcpStream, UnixStream};
impl IoStream for TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
Task::local(ReadTask::new(self.clone(), read)).detach();
Task::local(WriteTask::new(self.clone(), write)).detach();
Some(Box::new(self))
}
}
impl IoStream for UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
Task::local(UnixReadTask::new(self.clone(), read)).detach();
Task::local(UnixWriteTask::new(self, write)).detach();
None
}
}
impl Handle for TcpStream {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() {
if let Ok(addr) = self.0.borrow().peer_addr() {
return Some(Box::new(types::PeerAddr(addr)));
}
}
None
}
}
/// Read io task
struct ReadTask {
io: TcpStream,
state: ReadContext,
}
impl ReadTask {
/// Create new read io task
fn new(io: TcpStream, state: ReadContext) -> Self {
Self { io, state }
}
}
impl Future for ReadTask {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut();
loop {
match ready!(this.state.poll_ready(cx)) {
ReadStatus::Ready => {
let pool = this.state.memory_pool();
let mut buf = this.state.get_read_buf();
let (hw, lw) = pool.read_params().unpack();
// read data from socket
let mut new_bytes = 0;
let mut close = false;
let mut pending = false;
loop {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
match poll_read_buf(
Pin::new(&mut *this.io.0.borrow_mut()),
cx,
&mut buf,
) {
Poll::Pending => {
pending = true;
break;
}
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("glommio stream is disconnected");
close = true;
} else {
new_bytes += n;
if new_bytes <= hw {
continue;
}
}
break;
}
Poll::Ready(Err(err)) => {
log::trace!("read task failed on io {:?}", err);
let _ = this.state.release_read_buf(buf, new_bytes);
this.state.close(Some(err));
return Poll::Ready(());
}
}
}
if new_bytes == 0 && close {
this.state.close(None);
return Poll::Ready(());
}
this.state.release_read_buf(buf, new_bytes);
return if close {
this.state.close(None);
Poll::Ready(())
} else if pending {
Poll::Pending
} else {
continue;
};
}
ReadStatus::Terminate => {
log::trace!("read task is instructed to shutdown");
return Poll::Ready(());
}
}
}
}
}
enum IoWriteState {
Processing(Option<Sleep>),
Shutdown(Sleep, Shutdown),
}
enum Shutdown {
None(Pin<Box<dyn Future<Output = glommio::Result<(), ()>>>>),
Stopping(u16),
}
/// Write io task
struct WriteTask {
st: IoWriteState,
io: TcpStream,
state: WriteContext,
}
impl WriteTask {
/// Create new write io task
fn new(io: TcpStream, state: WriteContext) -> Self {
Self {
io,
state,
st: IoWriteState::Processing(None),
}
}
}
impl Future for WriteTask {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().get_mut();
match this.st {
IoWriteState::Processing(ref mut delay) => {
match this.state.poll_ready(cx) {
Poll::Ready(WriteStatus::Ready) => {
if let Some(delay) = delay {
if delay.poll_elapsed(cx).is_ready() {
this.state.close(Some(io::Error::new(
io::ErrorKind::TimedOut,
"Operation timedout",
)));
return Poll::Ready(());
}
}
// flush framed instance
match flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx) {
Poll::Pending | Poll::Ready(true) => Poll::Pending,
Poll::Ready(false) => Poll::Ready(()),
}
}
Poll::Ready(WriteStatus::Timeout(time)) => {
log::trace!("initiate timeout delay for {:?}", time);
if delay.is_none() {
*delay = Some(sleep(time));
}
self.poll(cx)
}
Poll::Ready(WriteStatus::Shutdown(time)) => {
log::trace!("write task is instructed to shutdown");
let timeout = if let Some(delay) = delay.take() {
delay
} else {
sleep(time)
};
let io = this.io.clone();
let fut = Box::pin(async move {
io.0.borrow().shutdown(std::net::Shutdown::Write).await
});
this.st = IoWriteState::Shutdown(timeout, Shutdown::None(fut));
self.poll(cx)
}
Poll::Ready(WriteStatus::Terminate) => {
log::trace!("write task is instructed to terminate");
let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx);
this.state.close(None);
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
}
IoWriteState::Shutdown(ref mut delay, ref mut st) => {
// close WRITE side and wait for disconnect on read side.
// use disconnect timeout, otherwise it could hang forever.
loop {
match st {
Shutdown::None(ref mut fut) => {
// flush write buffer
let flush_result =
flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx);
match flush_result {
Poll::Ready(true) => {
if ready!(fut.poll(cx)).is_err() {
this.state.close(None);
return Poll::Ready(());
}
*st = Shutdown::Stopping(0);
continue;
}
Poll::Ready(false) => {
log::trace!(
"write task is closed with err during flush"
);
this.state.close(None);
return Poll::Ready(());
}
_ => (),
}
}
Shutdown::Stopping(ref mut count) => {
// read until 0 or err
let mut buf = [0u8; 512];
let io = &mut this.io;
loop {
match Pin::new(&mut *io.0.borrow_mut())
.poll_read(cx, &mut buf)
{
Poll::Ready(Err(e)) => {
log::trace!("write task is stopped");
this.state.close(Some(e));
return Poll::Ready(());
}
Poll::Ready(Ok(0)) => {
log::trace!("glommio socket is disconnected");
this.state.close(None);
return Poll::Ready(());
}
Poll::Ready(Ok(n)) => {
*count += n as u16;
if *count > 4096 {
log::trace!(
"write task is stopped, too much input"
);
this.state.close(None);
return Poll::Ready(());
}
}
Poll::Pending => break,
}
}
}
}
// disconnect timeout
if delay.poll_elapsed(cx).is_pending() {
return Poll::Pending;
}
log::trace!("write task is stopped after delay");
this.state.close(None);
let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx);
return Poll::Ready(());
}
}
}
}
}
/// Flush write buffer to underlying I/O stream.
pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
io: &mut T,
state: &WriteContext,
cx: &mut Context<'_>,
) -> Poll<bool> {
let mut buf = if let Some(buf) = state.get_write_buf() {
buf
} else {
return Poll::Ready(true);
};
let len = buf.len();
let pool = state.memory_pool();
if len != 0 {
// log::trace!("flushing framed transport: {:?}", buf.len());
let mut written = 0;
while written < len {
match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
pool.release_write_buf(buf);
state.close(Some(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)));
return Poll::Ready(false);
} else {
written += n
}
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
pool.release_write_buf(buf);
state.close(Some(e));
return Poll::Ready(false);
}
}
}
log::trace!("flushed {} bytes", written);
// remove written data
let result = if written == len {
buf.clear();
if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
return Poll::Ready(false);
}
Poll::Ready(true)
} else {
buf.advance(written);
if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
return Poll::Ready(false);
}
Poll::Pending
};
// flush
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
state.close(Some(e));
Poll::Ready(false)
}
}
} else {
Poll::Ready(true)
}
}
pub fn poll_read_buf<T: AsyncRead>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesMut,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}
let dst = unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [u8]) };
let n = ready!(io.poll_read(cx, dst))?;
// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by Read::poll_read() api
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
/// Read io task
struct UnixReadTask {
io: UnixStream,
state: ReadContext,
}
impl UnixReadTask {
/// Create new read io task
fn new(io: UnixStream, state: ReadContext) -> Self {
Self { io, state }
}
}
impl Future for UnixReadTask {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut();
loop {
match ready!(this.state.poll_ready(cx)) {
ReadStatus::Ready => {
let pool = this.state.memory_pool();
let mut buf = this.state.get_read_buf();
let (hw, lw) = pool.read_params().unpack();
// read data from socket
let mut new_bytes = 0;
let mut close = false;
let mut pending = false;
loop {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
match poll_read_buf(
Pin::new(&mut *this.io.0.borrow_mut()),
cx,
&mut buf,
) {
Poll::Pending => {
pending = true;
break;
}
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("glommio stream is disconnected");
close = true;
} else {
new_bytes += n;
if new_bytes <= hw {
continue;
}
}
break;
}
Poll::Ready(Err(err)) => {
log::trace!("read task failed on io {:?}", err);
let _ = this.state.release_read_buf(buf, new_bytes);
this.state.close(Some(err));
return Poll::Ready(());
}
}
}
if new_bytes == 0 && close {
this.state.close(None);
return Poll::Ready(());
}
this.state.release_read_buf(buf, new_bytes);
return if close {
this.state.close(None);
Poll::Ready(())
} else if pending {
Poll::Pending
} else {
continue;
};
}
ReadStatus::Terminate => {
log::trace!("read task is instructed to shutdown");
return Poll::Ready(());
}
}
}
}
}
/// Write io task
struct UnixWriteTask {
st: IoWriteState,
io: UnixStream,
state: WriteContext,
}
impl UnixWriteTask {
/// Create new write io task
fn new(io: UnixStream, state: WriteContext) -> Self {
Self {
io,
state,
st: IoWriteState::Processing(None),
}
}
}
impl Future for UnixWriteTask {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().get_mut();
match this.st {
IoWriteState::Processing(ref mut delay) => {
match this.state.poll_ready(cx) {
Poll::Ready(WriteStatus::Ready) => {
if let Some(delay) = delay {
if delay.poll_elapsed(cx).is_ready() {
this.state.close(Some(io::Error::new(
io::ErrorKind::TimedOut,
"Operation timedout",
)));
return Poll::Ready(());
}
}
// flush framed instance
match flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx) {
Poll::Pending | Poll::Ready(true) => Poll::Pending,
Poll::Ready(false) => Poll::Ready(()),
}
}
Poll::Ready(WriteStatus::Timeout(time)) => {
log::trace!("initiate timeout delay for {:?}", time);
if delay.is_none() {
*delay = Some(sleep(time));
}
self.poll(cx)
}
Poll::Ready(WriteStatus::Shutdown(time)) => {
log::trace!("write task is instructed to shutdown");
let timeout = if let Some(delay) = delay.take() {
delay
} else {
sleep(time)
};
let io = this.io.clone();
let fut = Box::pin(async move {
io.0.borrow().shutdown(std::net::Shutdown::Write).await
});
this.st = IoWriteState::Shutdown(timeout, Shutdown::None(fut));
self.poll(cx)
}
Poll::Ready(WriteStatus::Terminate) => {
log::trace!("write task is instructed to terminate");
let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx);
this.state.close(None);
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
}
IoWriteState::Shutdown(ref mut delay, ref mut st) => {
// close WRITE side and wait for disconnect on read side.
// use disconnect timeout, otherwise it could hang forever.
loop {
match st {
Shutdown::None(ref mut fut) => {
// flush write buffer
let flush_result =
flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx);
match flush_result {
Poll::Ready(true) => {
if ready!(fut.poll(cx)).is_err() {
this.state.close(None);
return Poll::Ready(());
}
*st = Shutdown::Stopping(0);
continue;
}
Poll::Ready(false) => {
log::trace!(
"write task is closed with err during flush"
);
this.state.close(None);
return Poll::Ready(());
}
_ => (),
}
}
Shutdown::Stopping(ref mut count) => {
// read until 0 or err
let mut buf = [0u8; 512];
let io = &mut this.io;
loop {
match Pin::new(&mut *io.0.borrow_mut())
.poll_read(cx, &mut buf)
{
Poll::Ready(Err(e)) => {
log::trace!("write task is stopped");
this.state.close(Some(e));
return Poll::Ready(());
}
Poll::Ready(Ok(0)) => {
log::trace!("glommio unix socket is disconnected");
this.state.close(None);
return Poll::Ready(());
}
Poll::Ready(Ok(n)) => {
*count += n as u16;
if *count > 4096 {
log::trace!(
"write task is stopped, too much input"
);
this.state.close(None);
return Poll::Ready(());
}
}
Poll::Pending => break,
}
}
}
}
// disconnect timeout
if delay.poll_elapsed(cx).is_pending() {
return Poll::Pending;
}
log::trace!("write task is stopped after delay");
this.state.close(None);
let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx);
return Poll::Ready(());
}
}
}
}
}

92
ntex-glommio/src/lib.rs Normal file
View file

@ -0,0 +1,92 @@
#[cfg(target_os = "linux")]
mod io;
#[cfg(target_os = "linux")]
mod signals;
#[cfg(target_os = "linux")]
pub use self::signals::{signal, Signal};
#[cfg(target_os = "linux")]
mod net_impl {
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::{cell::RefCell, io::Result, net, net::SocketAddr, rc::Rc};
use ntex_bytes::PoolRef;
use ntex_io::Io;
pub type JoinError = futures_channel::oneshot::Canceled;
#[derive(Clone)]
pub(crate) struct TcpStream(pub(crate) Rc<RefCell<glommio::net::TcpStream>>);
impl TcpStream {
fn new(io: glommio::net::TcpStream) -> Self {
Self(Rc::new(RefCell::new(io)))
}
}
#[derive(Clone)]
pub(crate) struct UnixStream(pub(crate) Rc<RefCell<glommio::net::UnixStream>>);
impl UnixStream {
fn new(io: glommio::net::UnixStream) -> Self {
Self(Rc::new(RefCell::new(io)))
}
}
/// Opens a TCP connection to a remote host.
pub async fn tcp_connect(addr: SocketAddr) -> Result<Io> {
let sock = glommio::net::TcpStream::connect(addr).await?;
sock.set_nodelay(true)?;
Ok(Io::new(TcpStream::new(sock)))
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result<Io> {
let sock = glommio::net::TcpStream::connect(addr).await?;
sock.set_nodelay(true)?;
Ok(Io::with_memory_pool(TcpStream::new(sock), pool))
}
/// Opens a unix stream connection.
pub async fn unix_connect<P>(addr: P) -> Result<Io>
where
P: AsRef<std::path::Path>,
{
let sock = glommio::net::UnixStream::connect(addr).await?;
Ok(Io::new(UnixStream::new(sock)))
}
/// Opens a unix stream connection and specified memory pool.
pub async fn unix_connect_in<P>(addr: P, pool: PoolRef) -> Result<Io>
where
P: AsRef<std::path::Path>,
{
let sock = glommio::net::UnixStream::connect(addr).await?;
Ok(Io::with_memory_pool(UnixStream::new(sock), pool))
}
/// Convert std TcpStream to glommio's TcpStream
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io> {
stream.set_nonblocking(true)?;
stream.set_nodelay(true)?;
unsafe {
Ok(Io::new(TcpStream::new(
glommio::net::TcpStream::from_raw_fd(stream.into_raw_fd()),
)))
}
}
/// Convert std UnixStream to glommio's UnixStream
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
stream.set_nonblocking(true)?;
// Ok(Io::new(UnixStream::new(From::from(stream))))
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Cannot creat glommio UnixStream from std type",
))
}
}
#[cfg(target_os = "linux")]
pub use self::net_impl::*;

View file

@ -0,0 +1,53 @@
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
use async_oneshot as oneshot;
use glommio::Task;
thread_local! {
static SRUN: RefCell<bool> = RefCell::new(false);
static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default();
}
/// Different types of process signals
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum Signal {
/// SIGHUP
Hup,
/// SIGINT
Int,
/// SIGTERM
Term,
/// SIGQUIT
Quit,
}
/// Register signal handler.
///
/// Signals are handled by oneshots, you have to re-register
/// after each signal.
pub fn signal() -> Option<oneshot::Receiver<Signal>> {
if !SRUN.with(|v| *v.borrow()) {
Task::local(Signals::new()).detach();
}
SHANDLERS.with(|handlers| {
let (tx, rx) = oneshot::oneshot();
handlers.borrow_mut().push(tx);
Some(rx)
})
}
struct Signals {}
impl Signals {
pub(super) fn new() -> Signals {
Self {}
}
}
impl Future for Signals {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(())
}
}

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.3] - 2022-01-xx
* Add glommio runtime support
## [0.4.2] - 2022-01-11
* Enable all features for tokio runtime

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.4.2"
version = "0.4.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -18,6 +18,9 @@ path = "src/lib.rs"
[features]
default = []
# glommio support
glommio = ["glomm-io", "threadpool", "parking_lot", "once_cell", "num_cpus", "futures-channel"]
# tokio support
tokio = ["tok-io"]
@ -34,3 +37,11 @@ pin-project-lite = "0.2"
tok-io = { version = "1", package = "tokio", default-features = false, features = ["rt", "net"], optional = true }
async_std = { version = "1", package = "async-std", optional = true }
[target.'cfg(target_os = "linux")'.dependencies]
glomm-io = { version = "0.6", package = "glommio", optional = true }
threadpool = { version = "1.8.1", optional = true }
parking_lot = { version = "0.11.2", optional = true }
once_cell = { version = "1.9.0", optional = true }
num_cpus = { version = "1.13", optional = true }
futures-channel = { version = "0.3", optional = true }

View file

@ -9,6 +9,137 @@ pub use self::arbiter::Arbiter;
pub use self::builder::{Builder, SystemRunner};
pub use self::system::System;
#[allow(dead_code)]
#[cfg(all(feature = "glommio", target_os = "linux"))]
mod glommio {
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use futures_channel::oneshot::{self, Canceled};
use glomm_io::{task, Task};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use threadpool::ThreadPool;
/// Runs the provided future, blocking the current thread until the future
/// completes.
pub fn block_on<F: Future<Output = ()>>(fut: F) {
let ex = glomm_io::LocalExecutor::default();
ex.run(async move {
let _ = fut.await;
})
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
JoinHandle {
fut: Either::Left(
Task::local(async move {
let _ = Task::<()>::later().await;
f.await
})
.detach(),
),
}
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move { f().await })
}
/// Env variable for default cpu pool size.
const ENV_CPU_POOL_VAR: &str = "THREADPOOL";
static DEFAULT_POOL: Lazy<Mutex<ThreadPool>> = Lazy::new(|| {
let num = std::env::var(ENV_CPU_POOL_VAR)
.map_err(|_| ())
.and_then(|val| {
val.parse().map_err(|_| {
log::warn!("Can not parse {} value, using default", ENV_CPU_POOL_VAR,)
})
})
.unwrap_or_else(|_| num_cpus::get() * 5);
Mutex::new(
threadpool::Builder::new()
.thread_name("ntex".to_owned())
.num_threads(num)
.build(),
)
});
thread_local! {
static POOL: ThreadPool = {
DEFAULT_POOL.lock().clone()
};
}
enum Either<T1, T2> {
Left(T1),
Right(T2),
}
/// Blocking operation completion future. It resolves with results
/// of blocking function execution.
pub struct JoinHandle<T> {
fut: Either<task::JoinHandle<T>, oneshot::Receiver<T>>,
}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, Canceled>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.fut {
Either::Left(ref mut f) => match Pin::new(f).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(res) => Poll::Ready(res.ok_or(Canceled)),
},
Either::Right(ref mut f) => Pin::new(f).poll(cx),
}
}
}
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = oneshot::channel();
POOL.with(|pool| {
pool.execute(move || {
if !tx.is_canceled() {
let _ = tx.send(f());
}
})
});
JoinHandle {
fut: Either::Right(rx),
}
}
}
#[cfg(feature = "tokio")]
mod tokio {
use std::future::Future;
@ -136,17 +267,37 @@ mod asyncstd {
#[cfg(feature = "tokio")]
pub use self::tokio::*;
#[cfg(all(not(feature = "tokio"), feature = "async-std"))]
#[cfg(all(
not(feature = "tokio"),
not(feature = "glommio"),
feature = "async-std",
target_os = "linux"
))]
pub use self::asyncstd::*;
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
feature = "glommio"
))]
pub use self::glommio::*;
/// Runs the provided future, blocking the current thread until the future
/// completes.
#[cfg(all(not(feature = "tokio"), not(feature = "async-std")))]
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "glommio")
))]
pub fn block_on<F: std::future::Future<Output = ()>>(_: F) {
panic!("async runtime is not configured");
}
#[cfg(all(not(feature = "tokio"), not(feature = "async-std")))]
#[cfg(all(
not(feature = "tokio"),
not(feature = "async-std"),
not(feature = "glommio")
))]
pub fn spawn<F>(_: F) -> std::pin::Pin<Box<dyn std::future::Future<Output = F::Output>>>
where
F: std::future::Future + 'static,

View file

@ -2,6 +2,8 @@
## [0.5.10] - 2022-01-xx
* rt: Add glommio runtime support
* http: Use Io::take() method for http/1 dispatcher
## [0.5.9] - 2022-01-12

View file

@ -41,6 +41,9 @@ url = ["url-pkg"]
# tokio runtime
tokio = ["ntex-rt/tokio"]
# glommio runtime
glommio = ["ntex-rt/glommio", "ntex-glommio"]
# async-std runtime
async-std = ["ntex-rt/async-std", "ntex-async-std"]
@ -55,6 +58,7 @@ ntex-tls = "0.1.2"
ntex-rt = "0.4.1"
ntex-io = "0.1.4"
ntex-tokio = "0.1.2"
ntex-glommio = { version = "0.1.0", optional = true }
ntex-async-std = { version = "0.1.0", optional = true }
base64 = "0.13"

View file

@ -896,8 +896,6 @@ mod tests {
.set_write_params(15 * 1024, 1024);
h1.inner
.io
.as_ref()
.unwrap()
.set_memory_pool(crate::util::PoolId::P0.pool_ref());
let mut decoder = ClientCodec::default();

View file

@ -64,8 +64,19 @@ pub mod rt {
#[cfg(feature = "tokio")]
pub use ntex_tokio::*;
#[cfg(all(not(feature = "tokio"), feature = "async-std"))]
#[cfg(all(
feature = "async-std",
not(feature = "tokio"),
not(feature = "glommio")
))]
pub use ntex_async_std::*;
#[cfg(all(
feature = "glommio",
not(feature = "tokio"),
not(feature = "async-std")
))]
pub use ntex_glommio::*;
}
pub mod service {

View file

@ -231,6 +231,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
}
#[cfg(not(feature = "glommio"))]
#[crate::rt_test]
async fn test_data_drop() {
struct TestData(Arc<AtomicUsize>);

View file

@ -204,6 +204,7 @@ async fn test_rustls() {
sys.stop();
}
#[cfg(not(feature = "glommio"))]
#[ntex::test]
#[cfg(unix)]
async fn test_bind_uds() {
@ -253,6 +254,7 @@ async fn test_bind_uds() {
sys.stop();
}
#[cfg(not(feature = "glommio"))]
#[ntex::test]
#[cfg(unix)]
async fn test_listen_uds() {