From 575e534428bb0b4a78675da8e64f9f03869b0de7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 20 Feb 2022 18:31:49 +0600 Subject: [PATCH] Upgrade to glommio 0.7 (#104) --- ntex-glommio/CHANGES.md | 4 ++ ntex-glommio/Cargo.toml | 12 +++--- ntex-glommio/src/io.rs | 9 ++--- ntex-glommio/src/lib.rs | 12 +++--- ntex-glommio/src/signals.rs | 3 +- ntex-rt/CHANGES.md | 4 ++ ntex-rt/Cargo.toml | 11 ++---- ntex-rt/src/lib.rs | 68 +++++++++------------------------- ntex/src/http/h1/dispatcher.rs | 1 + ntex/tests/connect.rs | 1 + 10 files changed, 45 insertions(+), 80 deletions(-) diff --git a/ntex-glommio/CHANGES.md b/ntex-glommio/CHANGES.md index 3124d5db..9d9d555f 100644 --- a/ntex-glommio/CHANGES.md +++ b/ntex-glommio/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.2] - 2022-02-20 + +* Upgrade to glommio 0.7 + ## [0.1.1] - 2022-01-30 * Update to ntex-io 0.1.7 diff --git a/ntex-glommio/Cargo.toml b/ntex-glommio/Cargo.toml index f772e724..5791fc97 100644 --- a/ntex-glommio/Cargo.toml +++ b/ntex-glommio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-glommio" -version = "0.1.1" +version = "0.1.2" authors = ["ntex contributors "] description = "glommio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,14 +16,12 @@ name = "ntex_glommio" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.11" -ntex-io = "0.1.7" -ntex-util = "0.1.13" +ntex-bytes = "0.1.14" +ntex-io = "0.1.8" +ntex-util = "0.1.16" async-oneshot = "0.5.0" futures-lite = "1.12" -futures-channel = "0.3" log = "0.4" -pin-project-lite = "0.2" [target.'cfg(target_os = "linux")'.dependencies] -glommio = "0.6" +glommio = "0.7" diff --git a/ntex-glommio/src/io.rs b/ntex-glommio/src/io.rs index 62ce7a2f..08510317 100644 --- a/ntex-glommio/src/io.rs +++ b/ntex-glommio/src/io.rs @@ -3,7 +3,6 @@ use std::{any, future::Future, io, pin::Pin}; use futures_lite::future::FutureExt; use futures_lite::io::{AsyncRead, AsyncWrite}; -use glommio::Task; use ntex_bytes::{Buf, BufMut, BytesVec}; use ntex_io::{ types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, @@ -14,16 +13,16 @@ use crate::net_impl::{TcpStream, UnixStream}; impl IoStream for TcpStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { - Task::local(ReadTask::new(self.clone(), read)).detach(); - Task::local(WriteTask::new(self.clone(), write)).detach(); + glommio::spawn_local(ReadTask::new(self.clone(), read)).detach(); + glommio::spawn_local(WriteTask::new(self.clone(), write)).detach(); Some(Box::new(self)) } } impl IoStream for UnixStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { - Task::local(UnixReadTask::new(self.clone(), read)).detach(); - Task::local(UnixWriteTask::new(self, write)).detach(); + glommio::spawn_local(UnixReadTask::new(self.clone(), read)).detach(); + glommio::spawn_local(UnixWriteTask::new(self, write)).detach(); None } } diff --git a/ntex-glommio/src/lib.rs b/ntex-glommio/src/lib.rs index 81a976a7..8c8885b5 100644 --- a/ntex-glommio/src/lib.rs +++ b/ntex-glommio/src/lib.rs @@ -14,8 +14,6 @@ mod net_impl { use ntex_bytes::PoolRef; use ntex_io::Io; - pub type JoinError = futures_channel::oneshot::Canceled; - #[derive(Clone)] pub(crate) struct TcpStream(pub(crate) Rc>); @@ -80,11 +78,11 @@ mod net_impl { /// Convert std UnixStream to glommio's UnixStream pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result { stream.set_nonblocking(true)?; - // Ok(Io::new(UnixStream::new(From::from(stream)))) - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Cannot creat glommio UnixStream from std type", - )) + unsafe { + Ok(Io::new(UnixStream::new( + glommio::net::UnixStream::from_raw_fd(stream.into_raw_fd()), + ))) + } } } diff --git a/ntex-glommio/src/signals.rs b/ntex-glommio/src/signals.rs index dcb39685..305a8b7d 100644 --- a/ntex-glommio/src/signals.rs +++ b/ntex-glommio/src/signals.rs @@ -1,7 +1,6 @@ use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; use async_oneshot as oneshot; -use glommio::Task; thread_local! { static SRUN: RefCell = RefCell::new(false); @@ -27,7 +26,7 @@ pub enum Signal { /// after each signal. pub fn signal() -> Option> { if !SRUN.with(|v| *v.borrow()) { - Task::local(Signals::new()).detach(); + glommio::spawn_local(Signals::new()).detach(); } SHANDLERS.with(|handlers| { let (tx, rx) = oneshot::oneshot(); diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index 5f2db117..b12ff186 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.4] - 2022-02-20 + +* Upgrade to glommio 0.7 + ## [0.4.3] - 2022-01-17 * Add glommio runtime support diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index d0c84491..8bb9f6e8 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.4.3" +version = "0.4.4" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -19,7 +19,7 @@ path = "src/lib.rs" default = [] # glommio support -glommio = ["glomm-io", "threadpool", "parking_lot", "once_cell", "num_cpus", "futures-channel"] +glommio = ["glomm-io", "futures-channel"] # tokio support tokio = ["tok-io"] @@ -32,15 +32,10 @@ async-oneshot = "0.5.0" async-channel = "1.6.1" futures-core = "0.3" log = "0.4" -pin-project-lite = "0.2" tok-io = { version = "1", package = "tokio", default-features = false, features = ["rt", "net"], optional = true } async_std = { version = "1", package = "async-std", optional = true } [target.'cfg(target_os = "linux")'.dependencies] -glomm-io = { version = "0.6", package = "glommio", optional = true } -threadpool = { version = "1.8.1", optional = true } -parking_lot = { version = "0.11.2", optional = true } -once_cell = { version = "1.9.0", optional = true } -num_cpus = { version = "1.13", optional = true } +glomm-io = { version = "0.7", package = "glommio", optional = true } futures-channel = { version = "0.3", optional = true } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 30c52567..ea3970e1 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -12,11 +12,10 @@ pub use self::system::System; mod glommio { use std::{future::Future, pin::Pin, task::Context, task::Poll}; - use futures_channel::oneshot::{self, Canceled}; - use glomm_io::{task, Task}; - use once_cell::sync::Lazy; - use parking_lot::Mutex; - use threadpool::ThreadPool; + use futures_channel::oneshot::Canceled; + use glomm_io::task; + + pub type JoinError = Canceled; /// Runs the provided future, blocking the current thread until the future /// completes. @@ -42,8 +41,8 @@ mod glommio { { JoinHandle { fut: Either::Left( - Task::local(async move { - let _ = Task::<()>::later().await; + glomm_io::spawn_local(async move { + glomm_io::executor().yield_now().await; f.await }) .detach(), @@ -67,30 +66,15 @@ mod glommio { spawn(async move { f().await }) } - /// Env variable for default cpu pool size. - const ENV_CPU_POOL_VAR: &str = "THREADPOOL"; - - static DEFAULT_POOL: Lazy> = Lazy::new(|| { - let num = std::env::var(ENV_CPU_POOL_VAR) - .map_err(|_| ()) - .and_then(|val| { - val.parse().map_err(|_| { - log::warn!("Can not parse {} value, using default", ENV_CPU_POOL_VAR,) - }) - }) - .unwrap_or_else(|_| num_cpus::get() * 5); - Mutex::new( - threadpool::Builder::new() - .thread_name("ntex".to_owned()) - .num_threads(num) - .build(), - ) - }); - - thread_local! { - static POOL: ThreadPool = { - DEFAULT_POOL.lock().clone() - }; + pub fn spawn_blocking(f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let fut = glomm_io::executor().spawn_blocking(f); + JoinHandle { + fut: Either::Right(Box::pin(async move { Ok(fut.await) })), + } } enum Either { @@ -101,7 +85,8 @@ mod glommio { /// Blocking operation completion future. It resolves with results /// of blocking function execution. pub struct JoinHandle { - fut: Either, oneshot::Receiver>, + fut: + Either, Pin>>>>, } impl Future for JoinHandle { @@ -117,25 +102,6 @@ mod glommio { } } } - - pub fn spawn_blocking(f: F) -> JoinHandle - where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - POOL.with(|pool| { - pool.execute(move || { - if !tx.is_canceled() { - let _ = tx.send(f()); - } - }) - }); - - JoinHandle { - fut: Either::Right(rx), - } - } } #[cfg(feature = "tokio")] diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 225efcf9..7a5cbc9b 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -770,6 +770,7 @@ mod tests { decoder.decode(buf).unwrap().unwrap() } + #[cfg(feature = "tokio")] #[crate::rt_test] async fn test_on_request() { let (client, server) = Io::create(); diff --git a/ntex/tests/connect.rs b/ntex/tests/connect.rs index 0ab43649..9703234a 100644 --- a/ntex/tests/connect.rs +++ b/ntex/tests/connect.rs @@ -23,6 +23,7 @@ fn ssl_acceptor() -> tls_openssl::ssl::SslAcceptor { #[cfg(feature = "rustls")] use tls_rustls::ServerConfig; + #[cfg(feature = "rustls")] fn tls_acceptor() -> Arc { use rustls_pemfile::{certs, pkcs8_private_keys};