Upgrade to glommio 0.7 (#104)

This commit is contained in:
Nikolay Kim 2022-02-20 18:31:49 +06:00
parent 569c7969bc
commit 575e534428
10 changed files with 45 additions and 80 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.2] - 2022-02-20
* Upgrade to glommio 0.7
## [0.1.1] - 2022-01-30
* Update to ntex-io 0.1.7

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-glommio"
version = "0.1.1"
version = "0.1.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,14 +16,12 @@ name = "ntex_glommio"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.11"
ntex-io = "0.1.7"
ntex-util = "0.1.13"
ntex-bytes = "0.1.14"
ntex-io = "0.1.8"
ntex-util = "0.1.16"
async-oneshot = "0.5.0"
futures-lite = "1.12"
futures-channel = "0.3"
log = "0.4"
pin-project-lite = "0.2"
[target.'cfg(target_os = "linux")'.dependencies]
glommio = "0.6"
glommio = "0.7"

View file

@ -3,7 +3,6 @@ use std::{any, future::Future, io, pin::Pin};
use futures_lite::future::FutureExt;
use futures_lite::io::{AsyncRead, AsyncWrite};
use glommio::Task;
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
@ -14,16 +13,16 @@ use crate::net_impl::{TcpStream, UnixStream};
impl IoStream for TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
Task::local(ReadTask::new(self.clone(), read)).detach();
Task::local(WriteTask::new(self.clone(), write)).detach();
glommio::spawn_local(ReadTask::new(self.clone(), read)).detach();
glommio::spawn_local(WriteTask::new(self.clone(), write)).detach();
Some(Box::new(self))
}
}
impl IoStream for UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
Task::local(UnixReadTask::new(self.clone(), read)).detach();
Task::local(UnixWriteTask::new(self, write)).detach();
glommio::spawn_local(UnixReadTask::new(self.clone(), read)).detach();
glommio::spawn_local(UnixWriteTask::new(self, write)).detach();
None
}
}

View file

@ -14,8 +14,6 @@ mod net_impl {
use ntex_bytes::PoolRef;
use ntex_io::Io;
pub type JoinError = futures_channel::oneshot::Canceled;
#[derive(Clone)]
pub(crate) struct TcpStream(pub(crate) Rc<RefCell<glommio::net::TcpStream>>);
@ -80,11 +78,11 @@ mod net_impl {
/// Convert std UnixStream to glommio's UnixStream
pub fn from_unix_stream(stream: std::os::unix::net::UnixStream) -> Result<Io> {
stream.set_nonblocking(true)?;
// Ok(Io::new(UnixStream::new(From::from(stream))))
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Cannot creat glommio UnixStream from std type",
))
unsafe {
Ok(Io::new(UnixStream::new(
glommio::net::UnixStream::from_raw_fd(stream.into_raw_fd()),
)))
}
}
}

View file

@ -1,7 +1,6 @@
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
use async_oneshot as oneshot;
use glommio::Task;
thread_local! {
static SRUN: RefCell<bool> = RefCell::new(false);
@ -27,7 +26,7 @@ pub enum Signal {
/// after each signal.
pub fn signal() -> Option<oneshot::Receiver<Signal>> {
if !SRUN.with(|v| *v.borrow()) {
Task::local(Signals::new()).detach();
glommio::spawn_local(Signals::new()).detach();
}
SHANDLERS.with(|handlers| {
let (tx, rx) = oneshot::oneshot();

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.4] - 2022-02-20
* Upgrade to glommio 0.7
## [0.4.3] - 2022-01-17
* Add glommio runtime support

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.4.3"
version = "0.4.4"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -19,7 +19,7 @@ path = "src/lib.rs"
default = []
# glommio support
glommio = ["glomm-io", "threadpool", "parking_lot", "once_cell", "num_cpus", "futures-channel"]
glommio = ["glomm-io", "futures-channel"]
# tokio support
tokio = ["tok-io"]
@ -32,15 +32,10 @@ async-oneshot = "0.5.0"
async-channel = "1.6.1"
futures-core = "0.3"
log = "0.4"
pin-project-lite = "0.2"
tok-io = { version = "1", package = "tokio", default-features = false, features = ["rt", "net"], optional = true }
async_std = { version = "1", package = "async-std", optional = true }
[target.'cfg(target_os = "linux")'.dependencies]
glomm-io = { version = "0.6", package = "glommio", optional = true }
threadpool = { version = "1.8.1", optional = true }
parking_lot = { version = "0.11.2", optional = true }
once_cell = { version = "1.9.0", optional = true }
num_cpus = { version = "1.13", optional = true }
glomm-io = { version = "0.7", package = "glommio", optional = true }
futures-channel = { version = "0.3", optional = true }

View file

@ -12,11 +12,10 @@ pub use self::system::System;
mod glommio {
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use futures_channel::oneshot::{self, Canceled};
use glomm_io::{task, Task};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use threadpool::ThreadPool;
use futures_channel::oneshot::Canceled;
use glomm_io::task;
pub type JoinError = Canceled;
/// Runs the provided future, blocking the current thread until the future
/// completes.
@ -42,8 +41,8 @@ mod glommio {
{
JoinHandle {
fut: Either::Left(
Task::local(async move {
let _ = Task::<()>::later().await;
glomm_io::spawn_local(async move {
glomm_io::executor().yield_now().await;
f.await
})
.detach(),
@ -67,30 +66,15 @@ mod glommio {
spawn(async move { f().await })
}
/// Env variable for default cpu pool size.
const ENV_CPU_POOL_VAR: &str = "THREADPOOL";
static DEFAULT_POOL: Lazy<Mutex<ThreadPool>> = Lazy::new(|| {
let num = std::env::var(ENV_CPU_POOL_VAR)
.map_err(|_| ())
.and_then(|val| {
val.parse().map_err(|_| {
log::warn!("Can not parse {} value, using default", ENV_CPU_POOL_VAR,)
})
})
.unwrap_or_else(|_| num_cpus::get() * 5);
Mutex::new(
threadpool::Builder::new()
.thread_name("ntex".to_owned())
.num_threads(num)
.build(),
)
});
thread_local! {
static POOL: ThreadPool = {
DEFAULT_POOL.lock().clone()
};
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = glomm_io::executor().spawn_blocking(f);
JoinHandle {
fut: Either::Right(Box::pin(async move { Ok(fut.await) })),
}
}
enum Either<T1, T2> {
@ -101,7 +85,8 @@ mod glommio {
/// Blocking operation completion future. It resolves with results
/// of blocking function execution.
pub struct JoinHandle<T> {
fut: Either<task::JoinHandle<T>, oneshot::Receiver<T>>,
fut:
Either<task::JoinHandle<T>, Pin<Box<dyn Future<Output = Result<T, Canceled>>>>>,
}
impl<T> Future for JoinHandle<T> {
@ -117,25 +102,6 @@ mod glommio {
}
}
}
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = oneshot::channel();
POOL.with(|pool| {
pool.execute(move || {
if !tx.is_canceled() {
let _ = tx.send(f());
}
})
});
JoinHandle {
fut: Either::Right(rx),
}
}
}
#[cfg(feature = "tokio")]

View file

@ -770,6 +770,7 @@ mod tests {
decoder.decode(buf).unwrap().unwrap()
}
#[cfg(feature = "tokio")]
#[crate::rt_test]
async fn test_on_request() {
let (client, server) = Io::create();

View file

@ -23,6 +23,7 @@ fn ssl_acceptor() -> tls_openssl::ssl::SslAcceptor {
#[cfg(feature = "rustls")]
use tls_rustls::ServerConfig;
#[cfg(feature = "rustls")]
fn tls_acceptor() -> Arc<ServerConfig> {
use rustls_pemfile::{certs, pkcs8_private_keys};