From 450332144d27f7ba4f768c5dbe901557bdc9cff8 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 17 Jan 2022 01:03:15 +0600 Subject: [PATCH] Glommio runtime support (#94) * add glommio runtime support * optional compilation for glommio --- .github/workflows/linux.yml | 18 +- Cargo.toml | 2 + ntex-async-std/Cargo.toml | 2 +- ntex-glommio/CHANGES.md | 5 + ntex-glommio/Cargo.toml | 30 ++ ntex-glommio/LICENSE | 1 + ntex-glommio/src/io.rs | 633 +++++++++++++++++++++++++++++++++ ntex-glommio/src/lib.rs | 92 +++++ ntex-glommio/src/signals.rs | 53 +++ ntex-rt/CHANGES.md | 4 + ntex-rt/Cargo.toml | 13 +- ntex-rt/src/lib.rs | 157 +++++++- ntex/CHANGES.md | 2 + ntex/Cargo.toml | 4 + ntex/src/http/h1/dispatcher.rs | 2 - ntex/src/lib.rs | 13 +- ntex/src/web/types/data.rs | 1 + ntex/tests/web_httpserver.rs | 2 + 18 files changed, 1024 insertions(+), 10 deletions(-) create mode 100644 ntex-glommio/CHANGES.md create mode 100644 ntex-glommio/Cargo.toml create mode 120000 ntex-glommio/LICENSE create mode 100644 ntex-glommio/src/io.rs create mode 100644 ntex-glommio/src/lib.rs create mode 100644 ntex-glommio/src/signals.rs diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index e41aa109..9bf5841d 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -63,6 +63,12 @@ jobs: cd ntex cargo test --no-default-features --no-fail-fast --features="async-std,cookie,url,compress,openssl,rustls" --lib -- --test-threads 1 + - name: Run glommio tests + timeout-minutes: 40 + run: | + cd ntex + sudo -E env PATH="$PATH" bash -c "ulimit -l 512 && ulimit -a && cargo test --no-default-features --no-fail-fast --features=\"glommio,cookie,url,compress,openssl,rustls\"" + - name: Install tarpaulin if: matrix.version == '1.56.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') continue-on-error: true @@ -82,12 +88,20 @@ jobs: cd ntex cargo tarpaulin --out Xml --output-dir=.. --no-default-features --features="async-std,cookie,url,compress,openssl,rustls" --lib + - name: Generate coverage report (glommio) + if: matrix.version == '1.56.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') + continue-on-error: true + run: | + cd ntex + sudo -E env PATH="$PATH" bash -c "ulimit -l 512 && ulimit -a && cargo tarpaulin --out Xml --no-default-features --features=\"glommio,cookie,url,compress,openssl,rustls\" --lib" + - name: Upload to Codecov if: matrix.version == '1.56.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') continue-on-error: true - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v2 with: - file: cobertura.xml + files: cobertura.xml, ./ntex/cobertura.xml + verbose: true - name: Install cargo-cache continue-on-error: true diff --git a/Cargo.toml b/Cargo.toml index c7256328..6fb1fc01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "ntex-tls", "ntex-macros", "ntex-util", + "ntex-glommio", "ntex-tokio", "ntex-async-std", ] @@ -26,5 +27,6 @@ ntex-tls = { path = "ntex-tls" } ntex-macros = { path = "ntex-macros" } ntex-util = { path = "ntex-util" } +ntex-glommio = { path = "ntex-glommio" } ntex-tokio = { path = "ntex-tokio" } ntex-async-std = { path = "ntex-async-std" } diff --git a/ntex-async-std/Cargo.toml b/ntex-async-std/Cargo.toml index 0f88ba3c..2b42cc25 100644 --- a/ntex-async-std/Cargo.toml +++ b/ntex-async-std/Cargo.toml @@ -6,7 +6,7 @@ description = "async-std intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] homepage = "https://ntex.rs" repository = "https://github.com/ntex-rs/ntex.git" -documentation = "https://docs.rs/ntex-rt-tokio/" +documentation = "https://docs.rs/ntex-rt-async-std/" categories = ["network-programming", "asynchronous"] license = "MIT" edition = "2018" diff --git a/ntex-glommio/CHANGES.md b/ntex-glommio/CHANGES.md new file mode 100644 index 00000000..38a43584 --- /dev/null +++ b/ntex-glommio/CHANGES.md @@ -0,0 +1,5 @@ +# Changes + +## [0.1.0] - 2022-01-03 + +* Initial release diff --git a/ntex-glommio/Cargo.toml b/ntex-glommio/Cargo.toml new file mode 100644 index 00000000..b1eb16c3 --- /dev/null +++ b/ntex-glommio/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "ntex-glommio" +version = "0.1.0" +authors = ["ntex contributors "] +description = "glommio intergration for ntex framework" +keywords = ["network", "framework", "async", "futures"] +homepage = "https://ntex.rs" +repository = "https://github.com/ntex-rs/ntex.git" +documentation = "https://docs.rs/ntex-rt-glommio/" +categories = ["network-programming", "asynchronous"] +license = "MIT" +edition = "2018" + +[lib] +name = "ntex_glommio" +path = "src/lib.rs" + +[dependencies] +ntex-bytes = "0.1.9" +ntex-io = "0.1.4" +ntex-util = "0.1.9" +async-oneshot = "0.5.0" +futures-lite = "1.12" +futures-channel = "0.3" +derive_more = "0.99" +log = "0.4" +pin-project-lite = "0.2" + +[target.'cfg(target_os = "linux")'.dependencies] +glommio = "0.6" diff --git a/ntex-glommio/LICENSE b/ntex-glommio/LICENSE new file mode 120000 index 00000000..ea5b6064 --- /dev/null +++ b/ntex-glommio/LICENSE @@ -0,0 +1 @@ +../LICENSE \ No newline at end of file diff --git a/ntex-glommio/src/io.rs b/ntex-glommio/src/io.rs new file mode 100644 index 00000000..7855d0b1 --- /dev/null +++ b/ntex-glommio/src/io.rs @@ -0,0 +1,633 @@ +use std::task::{Context, Poll}; +use std::{any, future::Future, io, pin::Pin}; + +use futures_lite::future::FutureExt; +use futures_lite::io::{AsyncRead, AsyncWrite}; +use glommio::Task; +use ntex_bytes::{Buf, BufMut, BytesMut}; +use ntex_io::{ + types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, +}; +use ntex_util::{ready, time::sleep, time::Sleep}; + +use crate::net_impl::{TcpStream, UnixStream}; + +impl IoStream for TcpStream { + fn start(self, read: ReadContext, write: WriteContext) -> Option> { + Task::local(ReadTask::new(self.clone(), read)).detach(); + Task::local(WriteTask::new(self.clone(), write)).detach(); + Some(Box::new(self)) + } +} + +impl IoStream for UnixStream { + fn start(self, read: ReadContext, write: WriteContext) -> Option> { + Task::local(UnixReadTask::new(self.clone(), read)).detach(); + Task::local(UnixWriteTask::new(self, write)).detach(); + None + } +} + +impl Handle for TcpStream { + fn query(&self, id: any::TypeId) -> Option> { + if id == any::TypeId::of::() { + if let Ok(addr) = self.0.borrow().peer_addr() { + return Some(Box::new(types::PeerAddr(addr))); + } + } + None + } +} + +/// Read io task +struct ReadTask { + io: TcpStream, + state: ReadContext, +} + +impl ReadTask { + /// Create new read io task + fn new(io: TcpStream, state: ReadContext) -> Self { + Self { io, state } + } +} + +impl Future for ReadTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut(); + + loop { + match ready!(this.state.poll_ready(cx)) { + ReadStatus::Ready => { + let pool = this.state.memory_pool(); + let mut buf = this.state.get_read_buf(); + let (hw, lw) = pool.read_params().unpack(); + + // read data from socket + let mut new_bytes = 0; + let mut close = false; + let mut pending = false; + loop { + // make sure we've got room + let remaining = buf.remaining_mut(); + if remaining < lw { + buf.reserve(hw - remaining); + } + + match poll_read_buf( + Pin::new(&mut *this.io.0.borrow_mut()), + cx, + &mut buf, + ) { + Poll::Pending => { + pending = true; + break; + } + Poll::Ready(Ok(n)) => { + if n == 0 { + log::trace!("glommio stream is disconnected"); + close = true; + } else { + new_bytes += n; + if new_bytes <= hw { + continue; + } + } + break; + } + Poll::Ready(Err(err)) => { + log::trace!("read task failed on io {:?}", err); + let _ = this.state.release_read_buf(buf, new_bytes); + this.state.close(Some(err)); + return Poll::Ready(()); + } + } + } + + if new_bytes == 0 && close { + this.state.close(None); + return Poll::Ready(()); + } + this.state.release_read_buf(buf, new_bytes); + return if close { + this.state.close(None); + Poll::Ready(()) + } else if pending { + Poll::Pending + } else { + continue; + }; + } + ReadStatus::Terminate => { + log::trace!("read task is instructed to shutdown"); + return Poll::Ready(()); + } + } + } + } +} + +enum IoWriteState { + Processing(Option), + Shutdown(Sleep, Shutdown), +} + +enum Shutdown { + None(Pin>>>), + Stopping(u16), +} + +/// Write io task +struct WriteTask { + st: IoWriteState, + io: TcpStream, + state: WriteContext, +} + +impl WriteTask { + /// Create new write io task + fn new(io: TcpStream, state: WriteContext) -> Self { + Self { + io, + state, + st: IoWriteState::Processing(None), + } + } +} + +impl Future for WriteTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().get_mut(); + + match this.st { + IoWriteState::Processing(ref mut delay) => { + match this.state.poll_ready(cx) { + Poll::Ready(WriteStatus::Ready) => { + if let Some(delay) = delay { + if delay.poll_elapsed(cx).is_ready() { + this.state.close(Some(io::Error::new( + io::ErrorKind::TimedOut, + "Operation timedout", + ))); + return Poll::Ready(()); + } + } + + // flush framed instance + match flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx) { + Poll::Pending | Poll::Ready(true) => Poll::Pending, + Poll::Ready(false) => Poll::Ready(()), + } + } + Poll::Ready(WriteStatus::Timeout(time)) => { + log::trace!("initiate timeout delay for {:?}", time); + if delay.is_none() { + *delay = Some(sleep(time)); + } + self.poll(cx) + } + Poll::Ready(WriteStatus::Shutdown(time)) => { + log::trace!("write task is instructed to shutdown"); + + let timeout = if let Some(delay) = delay.take() { + delay + } else { + sleep(time) + }; + + let io = this.io.clone(); + let fut = Box::pin(async move { + io.0.borrow().shutdown(std::net::Shutdown::Write).await + }); + this.st = IoWriteState::Shutdown(timeout, Shutdown::None(fut)); + self.poll(cx) + } + Poll::Ready(WriteStatus::Terminate) => { + log::trace!("write task is instructed to terminate"); + + let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx); + this.state.close(None); + Poll::Ready(()) + } + Poll::Pending => Poll::Pending, + } + } + IoWriteState::Shutdown(ref mut delay, ref mut st) => { + // close WRITE side and wait for disconnect on read side. + // use disconnect timeout, otherwise it could hang forever. + loop { + match st { + Shutdown::None(ref mut fut) => { + // flush write buffer + let flush_result = + flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx); + match flush_result { + Poll::Ready(true) => { + if ready!(fut.poll(cx)).is_err() { + this.state.close(None); + return Poll::Ready(()); + } + *st = Shutdown::Stopping(0); + continue; + } + Poll::Ready(false) => { + log::trace!( + "write task is closed with err during flush" + ); + this.state.close(None); + return Poll::Ready(()); + } + _ => (), + } + } + Shutdown::Stopping(ref mut count) => { + // read until 0 or err + let mut buf = [0u8; 512]; + let io = &mut this.io; + loop { + match Pin::new(&mut *io.0.borrow_mut()) + .poll_read(cx, &mut buf) + { + Poll::Ready(Err(e)) => { + log::trace!("write task is stopped"); + this.state.close(Some(e)); + return Poll::Ready(()); + } + Poll::Ready(Ok(0)) => { + log::trace!("glommio socket is disconnected"); + this.state.close(None); + return Poll::Ready(()); + } + Poll::Ready(Ok(n)) => { + *count += n as u16; + if *count > 4096 { + log::trace!( + "write task is stopped, too much input" + ); + this.state.close(None); + return Poll::Ready(()); + } + } + Poll::Pending => break, + } + } + } + } + + // disconnect timeout + if delay.poll_elapsed(cx).is_pending() { + return Poll::Pending; + } + log::trace!("write task is stopped after delay"); + this.state.close(None); + let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx); + return Poll::Ready(()); + } + } + } + } +} + +/// Flush write buffer to underlying I/O stream. +pub(super) fn flush_io( + io: &mut T, + state: &WriteContext, + cx: &mut Context<'_>, +) -> Poll { + let mut buf = if let Some(buf) = state.get_write_buf() { + buf + } else { + return Poll::Ready(true); + }; + let len = buf.len(); + let pool = state.memory_pool(); + + if len != 0 { + // log::trace!("flushing framed transport: {:?}", buf.len()); + + let mut written = 0; + while written < len { + match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { + Poll::Pending => break, + Poll::Ready(Ok(n)) => { + if n == 0 { + log::trace!("Disconnected during flush, written {}", written); + pool.release_write_buf(buf); + state.close(Some(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))); + return Poll::Ready(false); + } else { + written += n + } + } + Poll::Ready(Err(e)) => { + log::trace!("Error during flush: {}", e); + pool.release_write_buf(buf); + state.close(Some(e)); + return Poll::Ready(false); + } + } + } + log::trace!("flushed {} bytes", written); + + // remove written data + let result = if written == len { + buf.clear(); + if let Err(e) = state.release_write_buf(buf) { + state.close(Some(e)); + return Poll::Ready(false); + } + Poll::Ready(true) + } else { + buf.advance(written); + if let Err(e) = state.release_write_buf(buf) { + state.close(Some(e)); + return Poll::Ready(false); + } + Poll::Pending + }; + + // flush + match Pin::new(&mut *io).poll_flush(cx) { + Poll::Ready(Ok(_)) => result, + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + log::trace!("error during flush: {}", e); + state.close(Some(e)); + Poll::Ready(false) + } + } + } else { + Poll::Ready(true) + } +} + +pub fn poll_read_buf( + io: Pin<&mut T>, + cx: &mut Context<'_>, + buf: &mut BytesMut, +) -> Poll> { + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let dst = unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [u8]) }; + let n = ready!(io.poll_read(cx, dst))?; + + // Safety: This is guaranteed to be the number of initialized (and read) + // bytes due to the invariants provided by Read::poll_read() api + unsafe { + buf.advance_mut(n); + } + + Poll::Ready(Ok(n)) +} + +/// Read io task +struct UnixReadTask { + io: UnixStream, + state: ReadContext, +} + +impl UnixReadTask { + /// Create new read io task + fn new(io: UnixStream, state: ReadContext) -> Self { + Self { io, state } + } +} + +impl Future for UnixReadTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut(); + + loop { + match ready!(this.state.poll_ready(cx)) { + ReadStatus::Ready => { + let pool = this.state.memory_pool(); + let mut buf = this.state.get_read_buf(); + let (hw, lw) = pool.read_params().unpack(); + + // read data from socket + let mut new_bytes = 0; + let mut close = false; + let mut pending = false; + loop { + // make sure we've got room + let remaining = buf.remaining_mut(); + if remaining < lw { + buf.reserve(hw - remaining); + } + + match poll_read_buf( + Pin::new(&mut *this.io.0.borrow_mut()), + cx, + &mut buf, + ) { + Poll::Pending => { + pending = true; + break; + } + Poll::Ready(Ok(n)) => { + if n == 0 { + log::trace!("glommio stream is disconnected"); + close = true; + } else { + new_bytes += n; + if new_bytes <= hw { + continue; + } + } + break; + } + Poll::Ready(Err(err)) => { + log::trace!("read task failed on io {:?}", err); + let _ = this.state.release_read_buf(buf, new_bytes); + this.state.close(Some(err)); + return Poll::Ready(()); + } + } + } + + if new_bytes == 0 && close { + this.state.close(None); + return Poll::Ready(()); + } + this.state.release_read_buf(buf, new_bytes); + return if close { + this.state.close(None); + Poll::Ready(()) + } else if pending { + Poll::Pending + } else { + continue; + }; + } + ReadStatus::Terminate => { + log::trace!("read task is instructed to shutdown"); + return Poll::Ready(()); + } + } + } + } +} + +/// Write io task +struct UnixWriteTask { + st: IoWriteState, + io: UnixStream, + state: WriteContext, +} + +impl UnixWriteTask { + /// Create new write io task + fn new(io: UnixStream, state: WriteContext) -> Self { + Self { + io, + state, + st: IoWriteState::Processing(None), + } + } +} + +impl Future for UnixWriteTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().get_mut(); + + match this.st { + IoWriteState::Processing(ref mut delay) => { + match this.state.poll_ready(cx) { + Poll::Ready(WriteStatus::Ready) => { + if let Some(delay) = delay { + if delay.poll_elapsed(cx).is_ready() { + this.state.close(Some(io::Error::new( + io::ErrorKind::TimedOut, + "Operation timedout", + ))); + return Poll::Ready(()); + } + } + + // flush framed instance + match flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx) { + Poll::Pending | Poll::Ready(true) => Poll::Pending, + Poll::Ready(false) => Poll::Ready(()), + } + } + Poll::Ready(WriteStatus::Timeout(time)) => { + log::trace!("initiate timeout delay for {:?}", time); + if delay.is_none() { + *delay = Some(sleep(time)); + } + self.poll(cx) + } + Poll::Ready(WriteStatus::Shutdown(time)) => { + log::trace!("write task is instructed to shutdown"); + + let timeout = if let Some(delay) = delay.take() { + delay + } else { + sleep(time) + }; + + let io = this.io.clone(); + let fut = Box::pin(async move { + io.0.borrow().shutdown(std::net::Shutdown::Write).await + }); + this.st = IoWriteState::Shutdown(timeout, Shutdown::None(fut)); + self.poll(cx) + } + Poll::Ready(WriteStatus::Terminate) => { + log::trace!("write task is instructed to terminate"); + + let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx); + this.state.close(None); + Poll::Ready(()) + } + Poll::Pending => Poll::Pending, + } + } + IoWriteState::Shutdown(ref mut delay, ref mut st) => { + // close WRITE side and wait for disconnect on read side. + // use disconnect timeout, otherwise it could hang forever. + loop { + match st { + Shutdown::None(ref mut fut) => { + // flush write buffer + let flush_result = + flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx); + match flush_result { + Poll::Ready(true) => { + if ready!(fut.poll(cx)).is_err() { + this.state.close(None); + return Poll::Ready(()); + } + *st = Shutdown::Stopping(0); + continue; + } + Poll::Ready(false) => { + log::trace!( + "write task is closed with err during flush" + ); + this.state.close(None); + return Poll::Ready(()); + } + _ => (), + } + } + Shutdown::Stopping(ref mut count) => { + // read until 0 or err + let mut buf = [0u8; 512]; + let io = &mut this.io; + loop { + match Pin::new(&mut *io.0.borrow_mut()) + .poll_read(cx, &mut buf) + { + Poll::Ready(Err(e)) => { + log::trace!("write task is stopped"); + this.state.close(Some(e)); + return Poll::Ready(()); + } + Poll::Ready(Ok(0)) => { + log::trace!("glommio unix socket is disconnected"); + this.state.close(None); + return Poll::Ready(()); + } + Poll::Ready(Ok(n)) => { + *count += n as u16; + if *count > 4096 { + log::trace!( + "write task is stopped, too much input" + ); + this.state.close(None); + return Poll::Ready(()); + } + } + Poll::Pending => break, + } + } + } + } + + // disconnect timeout + if delay.poll_elapsed(cx).is_pending() { + return Poll::Pending; + } + log::trace!("write task is stopped after delay"); + this.state.close(None); + let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx); + return Poll::Ready(()); + } + } + } + } +} diff --git a/ntex-glommio/src/lib.rs b/ntex-glommio/src/lib.rs new file mode 100644 index 00000000..81a976a7 --- /dev/null +++ b/ntex-glommio/src/lib.rs @@ -0,0 +1,92 @@ +#[cfg(target_os = "linux")] +mod io; +#[cfg(target_os = "linux")] +mod signals; + +#[cfg(target_os = "linux")] +pub use self::signals::{signal, Signal}; + +#[cfg(target_os = "linux")] +mod net_impl { + use std::os::unix::io::{FromRawFd, IntoRawFd}; + use std::{cell::RefCell, io::Result, net, net::SocketAddr, rc::Rc}; + + use ntex_bytes::PoolRef; + use ntex_io::Io; + + pub type JoinError = futures_channel::oneshot::Canceled; + + #[derive(Clone)] + pub(crate) struct TcpStream(pub(crate) Rc>); + + impl TcpStream { + fn new(io: glommio::net::TcpStream) -> Self { + Self(Rc::new(RefCell::new(io))) + } + } + + #[derive(Clone)] + pub(crate) struct UnixStream(pub(crate) Rc>); + + impl UnixStream { + fn new(io: glommio::net::UnixStream) -> Self { + Self(Rc::new(RefCell::new(io))) + } + } + + /// Opens a TCP connection to a remote host. + pub async fn tcp_connect(addr: SocketAddr) -> Result { + let sock = glommio::net::TcpStream::connect(addr).await?; + sock.set_nodelay(true)?; + Ok(Io::new(TcpStream::new(sock))) + } + + /// Opens a TCP connection to a remote host and use specified memory pool. + pub async fn tcp_connect_in(addr: SocketAddr, pool: PoolRef) -> Result { + let sock = glommio::net::TcpStream::connect(addr).await?; + sock.set_nodelay(true)?; + Ok(Io::with_memory_pool(TcpStream::new(sock), pool)) + } + + /// Opens a unix stream connection. + pub async fn unix_connect

(addr: P) -> Result + where + P: AsRef, + { + let sock = glommio::net::UnixStream::connect(addr).await?; + Ok(Io::new(UnixStream::new(sock))) + } + + /// Opens a unix stream connection and specified memory pool. + pub async fn unix_connect_in

(addr: P, pool: PoolRef) -> Result + where + P: AsRef, + { + let sock = glommio::net::UnixStream::connect(addr).await?; + Ok(Io::with_memory_pool(UnixStream::new(sock), pool)) + } + + /// Convert std TcpStream to glommio's TcpStream + pub fn from_tcp_stream(stream: net::TcpStream) -> Result { + stream.set_nonblocking(true)?; + stream.set_nodelay(true)?; + unsafe { + Ok(Io::new(TcpStream::new( + glommio::net::TcpStream::from_raw_fd(stream.into_raw_fd()), + ))) + } + } + + /// Convert std UnixStream to glommio's UnixStream + pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { + stream.set_nonblocking(true)?; + // Ok(Io::new(UnixStream::new(From::from(stream)))) + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Cannot creat glommio UnixStream from std type", + )) + } +} + +#[cfg(target_os = "linux")] +pub use self::net_impl::*; diff --git a/ntex-glommio/src/signals.rs b/ntex-glommio/src/signals.rs new file mode 100644 index 00000000..dcb39685 --- /dev/null +++ b/ntex-glommio/src/signals.rs @@ -0,0 +1,53 @@ +use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; + +use async_oneshot as oneshot; +use glommio::Task; + +thread_local! { + static SRUN: RefCell = RefCell::new(false); + static SHANDLERS: Rc>>> = Default::default(); +} + +/// Different types of process signals +#[derive(PartialEq, Clone, Copy, Debug)] +pub enum Signal { + /// SIGHUP + Hup, + /// SIGINT + Int, + /// SIGTERM + Term, + /// SIGQUIT + Quit, +} + +/// Register signal handler. +/// +/// Signals are handled by oneshots, you have to re-register +/// after each signal. +pub fn signal() -> Option> { + if !SRUN.with(|v| *v.borrow()) { + Task::local(Signals::new()).detach(); + } + SHANDLERS.with(|handlers| { + let (tx, rx) = oneshot::oneshot(); + handlers.borrow_mut().push(tx); + Some(rx) + }) +} + +struct Signals {} + +impl Signals { + pub(super) fn new() -> Signals { + Self {} + } +} + +impl Future for Signals { + type Output = (); + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready(()) + } +} diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index f765bf69..c043c071 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.3] - 2022-01-xx + +* Add glommio runtime support + ## [0.4.2] - 2022-01-11 * Enable all features for tokio runtime diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index 88ce2f05..d59c946b 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.4.2" +version = "0.4.3" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -18,6 +18,9 @@ path = "src/lib.rs" [features] default = [] +# glommio support +glommio = ["glomm-io", "threadpool", "parking_lot", "once_cell", "num_cpus", "futures-channel"] + # tokio support tokio = ["tok-io"] @@ -34,3 +37,11 @@ pin-project-lite = "0.2" tok-io = { version = "1", package = "tokio", default-features = false, features = ["rt", "net"], optional = true } async_std = { version = "1", package = "async-std", optional = true } + +[target.'cfg(target_os = "linux")'.dependencies] +glomm-io = { version = "0.6", package = "glommio", optional = true } +threadpool = { version = "1.8.1", optional = true } +parking_lot = { version = "0.11.2", optional = true } +once_cell = { version = "1.9.0", optional = true } +num_cpus = { version = "1.13", optional = true } +futures-channel = { version = "0.3", optional = true } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index d717bf9e..e7c87df8 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -9,6 +9,137 @@ pub use self::arbiter::Arbiter; pub use self::builder::{Builder, SystemRunner}; pub use self::system::System; +#[allow(dead_code)] +#[cfg(all(feature = "glommio", target_os = "linux"))] +mod glommio { + use std::{future::Future, pin::Pin, task::Context, task::Poll}; + + use futures_channel::oneshot::{self, Canceled}; + use glomm_io::{task, Task}; + use once_cell::sync::Lazy; + use parking_lot::Mutex; + use threadpool::ThreadPool; + + /// Runs the provided future, blocking the current thread until the future + /// completes. + pub fn block_on>(fut: F) { + let ex = glomm_io::LocalExecutor::default(); + ex.run(async move { + let _ = fut.await; + }) + } + + /// Spawn a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for spawning futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn(f: F) -> JoinHandle + where + F: Future + 'static, + F::Output: 'static, + { + JoinHandle { + fut: Either::Left( + Task::local(async move { + let _ = Task::<()>::later().await; + f.await + }) + .detach(), + ), + } + } + + /// Executes a future on the current thread. This does not create a new Arbiter + /// or Arbiter address, it is simply a helper for executing futures on the current + /// thread. + /// + /// # Panics + /// + /// This function panics if ntex system is not running. + #[inline] + pub fn spawn_fn(f: F) -> JoinHandle + where + F: FnOnce() -> R + 'static, + R: Future + 'static, + { + spawn(async move { f().await }) + } + + /// Env variable for default cpu pool size. + const ENV_CPU_POOL_VAR: &str = "THREADPOOL"; + + static DEFAULT_POOL: Lazy> = Lazy::new(|| { + let num = std::env::var(ENV_CPU_POOL_VAR) + .map_err(|_| ()) + .and_then(|val| { + val.parse().map_err(|_| { + log::warn!("Can not parse {} value, using default", ENV_CPU_POOL_VAR,) + }) + }) + .unwrap_or_else(|_| num_cpus::get() * 5); + Mutex::new( + threadpool::Builder::new() + .thread_name("ntex".to_owned()) + .num_threads(num) + .build(), + ) + }); + + thread_local! { + static POOL: ThreadPool = { + DEFAULT_POOL.lock().clone() + }; + } + + enum Either { + Left(T1), + Right(T2), + } + + /// Blocking operation completion future. It resolves with results + /// of blocking function execution. + pub struct JoinHandle { + fut: Either, oneshot::Receiver>, + } + + impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.fut { + Either::Left(ref mut f) => match Pin::new(f).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(res) => Poll::Ready(res.ok_or(Canceled)), + }, + Either::Right(ref mut f) => Pin::new(f).poll(cx), + } + } + } + + pub fn spawn_blocking(f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + POOL.with(|pool| { + pool.execute(move || { + if !tx.is_canceled() { + let _ = tx.send(f()); + } + }) + }); + + JoinHandle { + fut: Either::Right(rx), + } + } +} + #[cfg(feature = "tokio")] mod tokio { use std::future::Future; @@ -136,17 +267,37 @@ mod asyncstd { #[cfg(feature = "tokio")] pub use self::tokio::*; -#[cfg(all(not(feature = "tokio"), feature = "async-std"))] +#[cfg(all( + not(feature = "tokio"), + not(feature = "glommio"), + feature = "async-std", + target_os = "linux" +))] pub use self::asyncstd::*; +#[cfg(all( + not(feature = "tokio"), + not(feature = "async-std"), + feature = "glommio" +))] +pub use self::glommio::*; + /// Runs the provided future, blocking the current thread until the future /// completes. -#[cfg(all(not(feature = "tokio"), not(feature = "async-std")))] +#[cfg(all( + not(feature = "tokio"), + not(feature = "async-std"), + not(feature = "glommio") +))] pub fn block_on>(_: F) { panic!("async runtime is not configured"); } -#[cfg(all(not(feature = "tokio"), not(feature = "async-std")))] +#[cfg(all( + not(feature = "tokio"), + not(feature = "async-std"), + not(feature = "glommio") +))] pub fn spawn(_: F) -> std::pin::Pin>> where F: std::future::Future + 'static, diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 8b52c599..8e20e14a 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -2,6 +2,8 @@ ## [0.5.10] - 2022-01-xx +* rt: Add glommio runtime support + * http: Use Io::take() method for http/1 dispatcher ## [0.5.9] - 2022-01-12 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 6cbb437a..ca5bc2d5 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -41,6 +41,9 @@ url = ["url-pkg"] # tokio runtime tokio = ["ntex-rt/tokio"] +# glommio runtime +glommio = ["ntex-rt/glommio", "ntex-glommio"] + # async-std runtime async-std = ["ntex-rt/async-std", "ntex-async-std"] @@ -55,6 +58,7 @@ ntex-tls = "0.1.2" ntex-rt = "0.4.1" ntex-io = "0.1.4" ntex-tokio = "0.1.2" +ntex-glommio = { version = "0.1.0", optional = true } ntex-async-std = { version = "0.1.0", optional = true } base64 = "0.13" diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 7ac50b85..f50277ab 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -896,8 +896,6 @@ mod tests { .set_write_params(15 * 1024, 1024); h1.inner .io - .as_ref() - .unwrap() .set_memory_pool(crate::util::PoolId::P0.pool_ref()); let mut decoder = ClientCodec::default(); diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index 69d7b040..ffc1f8c6 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -64,8 +64,19 @@ pub mod rt { #[cfg(feature = "tokio")] pub use ntex_tokio::*; - #[cfg(all(not(feature = "tokio"), feature = "async-std"))] + #[cfg(all( + feature = "async-std", + not(feature = "tokio"), + not(feature = "glommio") + ))] pub use ntex_async_std::*; + + #[cfg(all( + feature = "glommio", + not(feature = "tokio"), + not(feature = "async-std") + ))] + pub use ntex_glommio::*; } pub mod service { diff --git a/ntex/src/web/types/data.rs b/ntex/src/web/types/data.rs index 32ca4825..0b20de2b 100644 --- a/ntex/src/web/types/data.rs +++ b/ntex/src/web/types/data.rs @@ -231,6 +231,7 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); } + #[cfg(not(feature = "glommio"))] #[crate::rt_test] async fn test_data_drop() { struct TestData(Arc); diff --git a/ntex/tests/web_httpserver.rs b/ntex/tests/web_httpserver.rs index 9e211ae1..6d423e84 100644 --- a/ntex/tests/web_httpserver.rs +++ b/ntex/tests/web_httpserver.rs @@ -204,6 +204,7 @@ async fn test_rustls() { sys.stop(); } +#[cfg(not(feature = "glommio"))] #[ntex::test] #[cfg(unix)] async fn test_bind_uds() { @@ -253,6 +254,7 @@ async fn test_bind_uds() { sys.stop(); } +#[cfg(not(feature = "glommio"))] #[ntex::test] #[cfg(unix)] async fn test_listen_uds() {