Allow replacing the async runtime

Nikolay Kim 2021-12-18 11:46:11 +06:00
parent aa5f6e4b55
commit b8a8e98c1c
26 changed files with 529 additions and 604 deletions


@ -18,25 +18,26 @@ path = "src/lib.rs"
[features] [features]
default = ["tokio"] default = ["tokio"]
# tokio support # tokio traits support
tokio = ["tok-io"] tokio-traits = ["tok-io/net"]
# tokio runtime support
tokio = ["tok-io/net"]
[dependencies] [dependencies]
bitflags = "1.3" bitflags = "1.3"
fxhash = "0.2.1" fxhash = "0.2.1"
ntex-codec = "0.5.1" ntex-codec = "0.6.0"
ntex-bytes = "0.1.7" ntex-bytes = "0.1.7"
ntex-util = "0.1.2" ntex-util = "0.1.2"
ntex-service = "0.2.1" ntex-service = "0.2.1"
log = "0.4" log = "0.4"
pin-project-lite = "0.2" pin-project-lite = "0.2"
tok-io = { version = "1", package = "tokio", default-features = false, features = ["net"], optional = true } tok-io = { version = "1", package = "tokio", default-features = false, optional = true }
backtrace = "*"
[dev-dependencies] [dev-dependencies]
ntex = "0.5.0-b.0" ntex = "0.5.0-b.0"
futures = "0.3" futures = "0.3"
rand = "0.8" rand = "0.8"
env_logger = "0.9" env_logger = "0.9"
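The old single `tokio` feature is split in two: `tokio-traits` pulls in only the tokio `AsyncRead`/`AsyncWrite` integration, while `tokio` enables full runtime support. A minimal sketch of how a downstream crate might opt into just the trait integration (the version number is illustrative):

```toml
[dependencies]
# Hypothetical downstream Cargo.toml: use only the tokio trait integration,
# leaving runtime selection to some other crate.
ntex-io = { version = "0.1", default-features = false, features = ["tokio-traits"] }
```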


@ -1,15 +1,13 @@
//! Framed transport dispatcher //! Framed transport dispatcher
use std::{ use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time};
cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll, time,
};
use ntex_bytes::Pool; use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
use ntex_service::{IntoService, Service}; use ntex_service::{IntoService, Service};
use ntex_util::future::Either;
use ntex_util::time::{now, Seconds}; use ntex_util::time::{now, Seconds};
use ntex_util::{future::Either, spawn};
use super::{DispatchItem, IoBoxed, ReadRef, Timer, WriteRef}; use super::{rt::spawn, DispatchItem, IoBoxed, ReadRef, Timer, WriteRef};
type Response<U> = <U as Encoder>::Item; type Response<U> = <U as Encoder>::Item;
@ -178,7 +176,7 @@ where
} }
} }
impl<S, U> Future for Dispatcher<S, U> impl<S, U> future::Future for Dispatcher<S, U>
where where
S: Service<Request = DispatchItem<U>, Response = Option<Response<U>>> + 'static, S: Service<Request = DispatchItem<U>, Response = Option<Response<U>>> + 'static,
U: Decoder + Encoder + 'static, U: Decoder + Encoder + 'static,


@ -1,17 +1,19 @@
use std::{any::Any, any::TypeId, fmt, future::Future, io, task::Context, task::Poll}; use std::{any::Any, any::TypeId, fmt, future::Future, io, task::Context, task::Poll};
pub mod testing; pub mod testing;
pub mod types;
mod dispatcher; mod dispatcher;
mod filter; mod filter;
mod state; mod state;
mod tasks; mod tasks;
mod time; mod time;
pub mod types;
mod utils; mod utils;
#[cfg(feature = "tokio")] #[cfg(any(feature = "tokio-traits", feature = "tokio"))]
mod tokio_impl; mod tokio_impl;
#[cfg(any(feature = "tokio"))]
mod tokio_rt;
use ntex_bytes::BytesMut; use ntex_bytes::BytesMut;
use ntex_codec::{Decoder, Encoder}; use ntex_codec::{Decoder, Encoder};
@ -128,6 +130,13 @@ where
} }
} }
pub mod rt {
//! async runtime helpers
#[cfg(feature = "tokio")]
pub use crate::tokio_rt::*;
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
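The new `rt` module gives the rest of the crate a single, feature-gated spawn entry point: with the `tokio` feature it re-exports the helpers from `tokio_rt`. A minimal usage sketch, assuming the `tokio` feature is enabled and a current-thread tokio runtime with a `LocalSet` is driving the task (as `ntex-rt` sets up):

```rust
use ntex_io::rt::spawn;

async fn demo() {
    // spawn() resolves to tokio's spawn_local under the "tokio" feature,
    // so the future stays on the current thread.
    let handle = spawn(async { 40 + 2 });
    assert_eq!(handle.await.unwrap(), 42);
}
```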


@ -37,7 +37,7 @@ pub struct IoTest {
} }
bitflags::bitflags! { bitflags::bitflags! {
struct Flags: u8 { struct IoTestFlags: u8 {
const FLUSHED = 0b0000_0001; const FLUSHED = 0b0000_0001;
const CLOSED = 0b0000_0010; const CLOSED = 0b0000_0010;
} }
@ -61,35 +61,35 @@ struct State {
struct Channel { struct Channel {
buf: BytesMut, buf: BytesMut,
buf_cap: usize, buf_cap: usize,
flags: Flags, flags: IoTestFlags,
waker: AtomicWaker, waker: AtomicWaker,
read: IoState, read: IoTestState,
write: IoState, write: IoTestState,
} }
impl Channel { impl Channel {
fn is_closed(&self) -> bool { fn is_closed(&self) -> bool {
self.flags.contains(Flags::CLOSED) self.flags.contains(IoTestFlags::CLOSED)
} }
} }
impl Default for Flags { impl Default for IoTestFlags {
fn default() -> Self { fn default() -> Self {
Flags::empty() IoTestFlags::empty()
} }
} }
#[derive(Debug)] #[derive(Debug)]
enum IoState { enum IoTestState {
Ok, Ok,
Pending, Pending,
Close, Close,
Err(io::Error), Err(io::Error),
} }
impl Default for IoState { impl Default for IoTestState {
fn default() -> Self { fn default() -> Self {
IoState::Ok IoTestState::Ok
} }
} }
@ -139,19 +139,19 @@ impl IoTest {
/// Set read to Pending state /// Set read to Pending state
pub fn read_pending(&self) { pub fn read_pending(&self) {
self.remote.lock().unwrap().borrow_mut().read = IoState::Pending; self.remote.lock().unwrap().borrow_mut().read = IoTestState::Pending;
} }
/// Set read to error /// Set read to error
pub fn read_error(&self, err: io::Error) { pub fn read_error(&self, err: io::Error) {
let channel = self.remote.lock().unwrap(); let channel = self.remote.lock().unwrap();
channel.borrow_mut().read = IoState::Err(err); channel.borrow_mut().read = IoTestState::Err(err);
channel.borrow().waker.wake(); channel.borrow().waker.wake();
} }
/// Set write error on remote side /// Set write error on remote side
pub fn write_error(&self, err: io::Error) { pub fn write_error(&self, err: io::Error) {
self.local.lock().unwrap().borrow_mut().write = IoState::Err(err); self.local.lock().unwrap().borrow_mut().write = IoTestState::Err(err);
self.remote.lock().unwrap().borrow().waker.wake(); self.remote.lock().unwrap().borrow().waker.wake();
} }
@ -180,7 +180,7 @@ impl IoTest {
{ {
let guard = self.remote.lock().unwrap(); let guard = self.remote.lock().unwrap();
let mut remote = guard.borrow_mut(); let mut remote = guard.borrow_mut();
remote.read = IoState::Close; remote.read = IoTestState::Close;
remote.waker.wake(); remote.waker.wake();
log::trace!("close remote socket"); log::trace!("close remote socket");
} }
@ -256,13 +256,13 @@ impl IoTest {
} }
match mem::take(&mut ch.read) { match mem::take(&mut ch.read) {
IoState::Ok => Poll::Pending, IoTestState::Ok => Poll::Pending,
IoState::Close => { IoTestState::Close => {
ch.read = IoState::Close; ch.read = IoTestState::Close;
Poll::Ready(Ok(0)) Poll::Ready(Ok(0))
} }
IoState::Pending => Poll::Pending, IoTestState::Pending => Poll::Pending,
IoState::Err(e) => Poll::Ready(Err(e)), IoTestState::Err(e) => Poll::Ready(Err(e)),
} }
} }
@ -275,12 +275,12 @@ impl IoTest {
let mut ch = guard.borrow_mut(); let mut ch = guard.borrow_mut();
match mem::take(&mut ch.write) { match mem::take(&mut ch.write) {
IoState::Ok => { IoTestState::Ok => {
let cap = cmp::min(buf.len(), ch.buf_cap); let cap = cmp::min(buf.len(), ch.buf_cap);
if cap > 0 { if cap > 0 {
ch.buf.extend(&buf[..cap]); ch.buf.extend(&buf[..cap]);
ch.buf_cap -= cap; ch.buf_cap -= cap;
ch.flags.remove(Flags::FLUSHED); ch.flags.remove(IoTestFlags::FLUSHED);
ch.waker.wake(); ch.waker.wake();
Poll::Ready(Ok(cap)) Poll::Ready(Ok(cap))
} else { } else {
@ -297,8 +297,8 @@ impl IoTest {
Poll::Pending Poll::Pending
} }
} }
IoState::Close => Poll::Ready(Ok(0)), IoTestState::Close => Poll::Ready(Ok(0)),
IoState::Pending => { IoTestState::Pending => {
*self *self
.local .local
.lock() .lock()
@ -311,7 +311,7 @@ impl IoTest {
.borrow_mut() = Some(cx.waker().clone()); .borrow_mut() = Some(cx.waker().clone());
Poll::Pending Poll::Pending
} }
IoState::Err(e) => Poll::Ready(Err(e)), IoTestState::Err(e) => Poll::Ready(Err(e)),
} }
} }
} }
@ -346,125 +346,15 @@ impl Drop for IoTest {
} }
} }
#[cfg(feature = "tokio")]
mod tokio {
use std::task::{Context, Poll};
use std::{cmp, io, mem, pin::Pin};
use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf};
use super::{Flags, IoState, IoTest};
impl AsyncRead for IoTest {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let guard = self.local.lock().unwrap();
let mut ch = guard.borrow_mut();
*ch.waker.0.lock().unwrap().borrow_mut() = Some(cx.waker().clone());
if !ch.buf.is_empty() {
let size = std::cmp::min(ch.buf.len(), buf.remaining());
let b = ch.buf.split_to(size);
buf.put_slice(&b);
return Poll::Ready(Ok(()));
}
match mem::take(&mut ch.read) {
IoState::Ok => Poll::Pending,
IoState::Close => {
ch.read = IoState::Close;
Poll::Ready(Ok(()))
}
IoState::Pending => Poll::Pending,
IoState::Err(e) => Poll::Ready(Err(e)),
}
}
}
impl AsyncWrite for IoTest {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let guard = self.remote.lock().unwrap();
let mut ch = guard.borrow_mut();
match mem::take(&mut ch.write) {
IoState::Ok => {
let cap = cmp::min(buf.len(), ch.buf_cap);
if cap > 0 {
ch.buf.extend(&buf[..cap]);
ch.buf_cap -= cap;
ch.flags.remove(Flags::FLUSHED);
ch.waker.wake();
Poll::Ready(Ok(cap))
} else {
*self
.local
.lock()
.unwrap()
.borrow_mut()
.waker
.0
.lock()
.unwrap()
.borrow_mut() = Some(cx.waker().clone());
Poll::Pending
}
}
IoState::Close => Poll::Ready(Ok(0)),
IoState::Pending => {
*self
.local
.lock()
.unwrap()
.borrow_mut()
.waker
.0
.lock()
.unwrap()
.borrow_mut() = Some(cx.waker().clone());
Poll::Pending
}
IoState::Err(e) => Poll::Ready(Err(e)),
}
}
fn poll_flush(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.local
.lock()
.unwrap()
.borrow_mut()
.flags
.insert(Flags::CLOSED);
Poll::Ready(Ok(()))
}
}
}
impl IoStream for IoTest { impl IoStream for IoTest {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> { fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let io = Rc::new(self); let io = Rc::new(self);
ntex_util::spawn(ReadTask { crate::rt::spawn(ReadTask {
io: io.clone(), io: io.clone(),
state: read, state: read,
}); });
ntex_util::spawn(WriteTask { crate::rt::spawn(WriteTask {
io: io.clone(), io: io.clone(),
state: write, state: write,
st: IoWriteState::Processing(None), st: IoWriteState::Processing(None),
@ -615,7 +505,7 @@ impl Future for WriteTask {
.unwrap() .unwrap()
.borrow_mut() .borrow_mut()
.flags .flags
.insert(Flags::CLOSED); .insert(IoTestFlags::CLOSED);
this.state.close(None); this.state.close(None);
Poll::Ready(()) Poll::Ready(())
} }
@ -652,7 +542,7 @@ impl Future for WriteTask {
.unwrap() .unwrap()
.borrow_mut() .borrow_mut()
.flags .flags
.insert(Flags::CLOSED); .insert(IoTestFlags::CLOSED);
*st = Shutdown::Stopping; *st = Shutdown::Stopping;
continue; continue;
} }
@ -752,6 +642,113 @@ pub(super) fn flush_io(
} }
} }
#[cfg(any(feature = "tokio", feature = "tokio-traits"))]
mod tokio_impl {
use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf};
use super::*;
impl AsyncRead for IoTest {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let guard = self.local.lock().unwrap();
let mut ch = guard.borrow_mut();
*ch.waker.0.lock().unwrap().borrow_mut() = Some(cx.waker().clone());
if !ch.buf.is_empty() {
let size = std::cmp::min(ch.buf.len(), buf.remaining());
let b = ch.buf.split_to(size);
buf.put_slice(&b);
return Poll::Ready(Ok(()));
}
match mem::take(&mut ch.read) {
IoTestState::Ok => Poll::Pending,
IoTestState::Close => {
ch.read = IoTestState::Close;
Poll::Ready(Ok(()))
}
IoTestState::Pending => Poll::Pending,
IoTestState::Err(e) => Poll::Ready(Err(e)),
}
}
}
impl AsyncWrite for IoTest {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let guard = self.remote.lock().unwrap();
let mut ch = guard.borrow_mut();
match mem::take(&mut ch.write) {
IoTestState::Ok => {
let cap = cmp::min(buf.len(), ch.buf_cap);
if cap > 0 {
ch.buf.extend(&buf[..cap]);
ch.buf_cap -= cap;
ch.flags.remove(IoTestFlags::FLUSHED);
ch.waker.wake();
Poll::Ready(Ok(cap))
} else {
*self
.local
.lock()
.unwrap()
.borrow_mut()
.waker
.0
.lock()
.unwrap()
.borrow_mut() = Some(cx.waker().clone());
Poll::Pending
}
}
IoTestState::Close => Poll::Ready(Ok(0)),
IoTestState::Pending => {
*self
.local
.lock()
.unwrap()
.borrow_mut()
.waker
.0
.lock()
.unwrap()
.borrow_mut() = Some(cx.waker().clone());
Poll::Pending
}
IoTestState::Err(e) => Poll::Ready(Err(e)),
}
}
fn poll_flush(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.local
.lock()
.unwrap()
.borrow_mut()
.flags
.insert(IoTestFlags::CLOSED);
Poll::Ready(Ok(()))
}
}
}
#[cfg(test)] #[cfg(test)]
#[allow(clippy::redundant_clone)] #[allow(clippy::redundant_clone)]
mod tests { mod tests {


@ -2,10 +2,10 @@ use std::{
cell::RefCell, collections::BTreeMap, collections::HashSet, rc::Rc, time::Instant, cell::RefCell, collections::BTreeMap, collections::HashSet, rc::Rc, time::Instant,
}; };
use ntex_util::spawn;
use ntex_util::time::{now, sleep, Millis}; use ntex_util::time::{now, sleep, Millis};
use super::state::{IoRef, IoStateInner}; use crate::rt::spawn;
use crate::state::{IoRef, IoStateInner};
pub struct Timer(Rc<RefCell<Inner>>); pub struct Timer(Rc<RefCell<Inner>>);


@ -1,12 +1,12 @@
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{any, cell::RefCell, cmp, future::Future, io, pin::Pin, rc::Rc}; use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc};
use ntex_bytes::{Buf, BufMut}; use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_util::time::{sleep, Sleep}; use ntex_util::time::{sleep, Sleep};
use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf}; use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf};
use tok_io::net::TcpStream; use tok_io::net::TcpStream;
use super::{ use crate::{
types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, WriteContext, types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, WriteContext,
WriteReadiness, WriteReadiness,
}; };
@ -71,7 +71,7 @@ impl Future for ReadTask {
buf.reserve(hw - remaining); buf.reserve(hw - remaining);
} }
match ntex_codec::poll_read_buf(Pin::new(&mut *io), cx, &mut buf) { match poll_read_buf(Pin::new(&mut *io), cx, &mut buf) {
Poll::Pending => break, Poll::Pending => break,
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => {
if n == 0 { if n == 0 {
@ -505,8 +505,7 @@ mod unixstream {
buf.reserve(hw - remaining); buf.reserve(hw - remaining);
} }
match ntex_codec::poll_read_buf(Pin::new(&mut *io), cx, &mut buf) match poll_read_buf(Pin::new(&mut *io), cx, &mut buf) {
{
Poll::Pending => break, Poll::Pending => break,
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => {
if n == 0 { if n == 0 {
@ -699,3 +698,35 @@ mod unixstream {
} }
} }
} }
pub fn poll_read_buf<T: AsyncRead>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesMut,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}
let n = {
let dst =
unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [mem::MaybeUninit<u8>]) };
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
if io.poll_read(cx, &mut buf)?.is_pending() {
return Poll::Pending;
}
// Ensure the pointer does not change from under us
assert_eq!(ptr, buf.filled().as_ptr());
buf.filled().len()
};
// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
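`poll_read_buf` replaces the `ntex_codec::poll_read_buf` call sites above: it reads straight into the uninitialized tail of a `BytesMut`, so callers are expected to reserve spare capacity first and interpret `Ok(0)` as either a full buffer or EOF. A hedged caller sketch living next to the function above (the wrapper name, threshold, and buffer sizes are illustrative; `tok_io` is the renamed tokio dependency used inside these crates):

```rust
use std::{io, pin::Pin, task::{Context, Poll}};

use ntex_bytes::{BufMut, BytesMut};
use tok_io::io::AsyncRead;

// Hypothetical wrapper: make sure there is room to read into, then delegate
// to poll_read_buf (defined above). Ok(0) with spare capacity means EOF.
fn poll_read_once<T: AsyncRead>(
    io: Pin<&mut T>,
    cx: &mut Context<'_>,
    buf: &mut BytesMut,
) -> Poll<io::Result<usize>> {
    if buf.remaining_mut() < 1024 {
        buf.reserve(4096);
    }
    poll_read_buf(io, cx, buf)
}
```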

ntex-io/src/tokio_rt.rs (new file, 37 lines)

@ -0,0 +1,37 @@
//! async net providers
use ntex_util::future::lazy;
use std::future::Future;
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
where
F: Future + 'static,
{
tok_io::task::spawn_local(f)
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move {
let r = lazy(|_| f()).await;
r.await
})
}
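`spawn_fn` defers the closure until the spawned task is first polled, then awaits whatever future the closure returns. A small usage sketch, assuming the `tokio` feature of `ntex-io` and a running ntex system with a tokio `LocalSet`:

```rust
async fn demo() {
    // The closure runs lazily on the spawned task; its future's output
    // comes back through the JoinHandle.
    let handle = ntex_io::rt::spawn_fn(|| async { String::from("done") });
    assert_eq!(handle.await.unwrap(), "done");
}
```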


@ -1,6 +1,6 @@
[package] [package]
name = "ntex-rt" name = "ntex-rt"
version = "0.3.2" version = "0.4.0-b.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime" description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -15,7 +15,17 @@ edition = "2018"
name = "ntex_rt" name = "ntex_rt"
path = "src/lib.rs" path = "src/lib.rs"
[features]
default = ["tokio"]
# tokio support
tokio = ["tok-io", "ntex-io/tokio"]
[dependencies] [dependencies]
ntex-bytes = "0.1.7"
ntex-io = "0.1.0"
ntex-util = "0.1.2" ntex-util = "0.1.2"
log = "0.4"
pin-project-lite = "0.2" pin-project-lite = "0.2"
tokio = { version = "1", default-features = false, features = ["rt", "net", "time", "signal", "sync"] }
tok-io = { version = "1", package = "tokio", default-features = false, features = ["rt", "signal", "sync"], optional = true }


@ -3,12 +3,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread}; use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tok_io::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::{channel, error::RecvError, Sender}; use tok_io::sync::oneshot::{channel, error::RecvError, Sender};
use tokio::task::LocalSet;
use super::runtime::Runtime; use crate::{system::System, Runtime};
use super::system::System;
thread_local!( thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None); static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
@ -38,27 +36,27 @@ impl fmt::Debug for Arbiter {
} }
} }
impl Default for Arbiter {
fn default() -> Arbiter {
Arbiter::new()
}
}
impl Clone for Arbiter { impl Clone for Arbiter {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self::with_sender(self.sender.clone()) Self::with_sender(self.sender.clone())
} }
} }
impl Default for Arbiter {
fn default() -> Self {
Self::new()
}
}
impl Arbiter { impl Arbiter {
pub(super) fn new_system(local: &LocalSet) -> Self { pub(super) fn new_system(rt: &Box<dyn Runtime>) -> Self {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let arb = Arbiter::with_sender(tx); let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(ArbiterController { stop: None, rx }); rt.spawn(Box::pin(ArbiterController { stop: None, rx }));
arb arb
} }
@ -89,7 +87,7 @@ impl Arbiter {
let handle = thread::Builder::new() let handle = thread::Builder::new()
.name(name.clone()) .name(name.clone())
.spawn(move || { .spawn(move || {
let rt = Runtime::new().expect("Cannot create Runtime"); let rt = crate::create_runtime();
let arb = Arbiter::with_sender(arb_tx); let arb = Arbiter::with_sender(arb_tx);
let (stop, stop_rx) = channel(); let (stop, stop_rx) = channel();
@ -98,10 +96,10 @@ impl Arbiter {
System::set_current(sys); System::set_current(sys);
// start arbiter controller // start arbiter controller
rt.spawn(ArbiterController { rt.spawn(Box::pin(ArbiterController {
stop: Some(stop), stop: Some(stop),
rx: arb_rx, rx: arb_rx,
}); }));
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
// register arbiter // register arbiter
@ -110,7 +108,9 @@ impl Arbiter {
.send(SystemCommand::RegisterArbiter(id, arb)); .send(SystemCommand::RegisterArbiter(id, arb));
// run loop // run loop
let _ = rt.block_on(stop_rx); rt.block_on(Box::pin(async move {
let _ = stop_rx.await;
}));
// unregister arbiter // unregister arbiter
let _ = System::current() let _ = System::current()
@ -265,7 +265,7 @@ impl Future for ArbiterController {
return Poll::Ready(()); return Poll::Ready(());
} }
ArbiterCommand::Execute(fut) => { ArbiterCommand::Execute(fut) => {
tokio::task::spawn_local(fut); tok_io::task::spawn(fut);
} }
ArbiterCommand::ExecuteFn(f) => { ArbiterCommand::ExecuteFn(f) => {
f.call_box(); f.call_box();


@ -1,13 +1,11 @@
use std::{borrow::Cow, future::Future, io}; use std::{cell::RefCell, future::Future, io, rc::Rc};
use ntex_util::future::lazy; use ntex_util::future::lazy;
use tokio::sync::mpsc::unbounded_channel; use tok_io::sync::mpsc::unbounded_channel;
use tokio::sync::oneshot::{channel, Receiver}; use tok_io::sync::oneshot::{channel, Receiver};
use tokio::task::LocalSet;
use super::arbiter::{Arbiter, SystemArbiter}; use crate::arbiter::{Arbiter, SystemArbiter};
use super::runtime::Runtime; use crate::{create_runtime, Runtime, System};
use super::system::System;
/// Builder struct for a ntex runtime. /// Builder struct for a ntex runtime.
/// ///
@ -16,8 +14,7 @@ use super::system::System;
/// run a function in its context. /// run a function in its context.
pub struct Builder { pub struct Builder {
/// Name of the System. Defaults to "ntex" if unset. /// Name of the System. Defaults to "ntex" if unset.
name: Cow<'static, str>, name: String,
/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
stop_on_panic: bool, stop_on_panic: bool,
} }
@ -25,14 +22,14 @@ pub struct Builder {
impl Builder { impl Builder {
pub(super) fn new() -> Self { pub(super) fn new() -> Self {
Builder { Builder {
name: Cow::Borrowed("ntex"), name: "ntex".into(),
stop_on_panic: false, stop_on_panic: false,
} }
} }
/// Sets the name of the System. /// Sets the name of the System.
pub fn name<T: Into<String>>(mut self, name: T) -> Self { pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
self.name = Cow::Owned(name.into()); self.name = name.as_ref().into();
self self
} }
@ -52,15 +49,6 @@ impl Builder {
self.create_runtime(|| {}) self.create_runtime(|| {})
} }
/// Create new System that can run asynchronously.
/// This method could be used to run ntex system in existing tokio
/// runtime.
///
/// This method panics if it cannot start the system arbiter
pub fn finish_with(self, local: &LocalSet) -> AsyncSystemRunner {
self.create_async_runtime(local)
}
/// This function will start tokio runtime and will finish once the /// This function will start tokio runtime and will finish once the
/// `System::stop()` message get called. /// `System::stop()` message get called.
/// Function `f` get called within tokio runtime context. /// Function `f` get called within tokio runtime context.
@ -71,25 +59,6 @@ impl Builder {
self.create_runtime(f).run() self.create_runtime(f).run()
} }
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded_channel();
let _system = System::construct(
sys_sender,
Arbiter::new_system(local),
self.stop_on_panic,
);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
// start the system arbiter
let _ = local.spawn_local(arb);
AsyncSystemRunner { stop, _system }
}
fn create_runtime<F>(self, f: F) -> SystemRunner fn create_runtime<F>(self, f: F) -> SystemRunner
where where
F: FnOnce() + 'static, F: FnOnce() + 'static,
@ -97,65 +66,26 @@ impl Builder {
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded_channel(); let (sys_sender, sys_receiver) = unbounded_channel();
let rt = Runtime::new().unwrap(); let rt = create_runtime();
// set ntex-util spawn fn
ntex_util::set_spawn_fn(|fut| {
tokio::task::spawn_local(fut);
});
// system arbiter // system arbiter
let _system = System::construct( let _system =
sys_sender, System::construct(sys_sender, Arbiter::new_system(&rt), self.stop_on_panic);
Arbiter::new_system(rt.local()),
self.stop_on_panic,
);
let arb = SystemArbiter::new(stop_tx, sys_receiver); let arb = SystemArbiter::new(stop_tx, sys_receiver);
rt.spawn(arb); rt.spawn(Box::pin(arb));
// init system arbiter and run configuration method // init system arbiter and run configuration method
rt.block_on(lazy(move |_| f())); let runner = SystemRunner { rt, stop, _system };
runner.block_on(lazy(move |_| f()));
SystemRunner { rt, stop, _system } runner
}
}
#[derive(Debug)]
pub struct AsyncSystemRunner {
stop: Receiver<i32>,
_system: System,
}
impl AsyncSystemRunner {
/// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called.
pub fn run(self) -> impl Future<Output = Result<(), io::Error>> + Send {
let AsyncSystemRunner { stop, .. } = self;
// run loop
async move {
match stop.await {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
} }
} }
/// Helper object that runs System's event loop /// Helper object that runs System's event loop
#[must_use = "SystemRunner must be run"] #[must_use = "SystemRunner must be run"]
#[derive(Debug)]
pub struct SystemRunner { pub struct SystemRunner {
rt: Runtime, rt: Box<dyn Runtime>,
stop: Receiver<i32>, stop: Receiver<i32>,
_system: System, _system: System,
} }
@ -167,7 +97,7 @@ impl SystemRunner {
let SystemRunner { rt, stop, .. } = self; let SystemRunner { rt, stop, .. } = self;
// run loop // run loop
match rt.block_on(stop) { match block_on(&rt, stop).take() {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -184,23 +114,48 @@ impl SystemRunner {
/// Execute a future and wait for result. /// Execute a future and wait for result.
#[inline] #[inline]
pub fn block_on<F, O>(&mut self, fut: F) -> O pub fn block_on<F, R>(&self, fut: F) -> R
where where
F: Future<Output = O>, F: Future<Output = R> + 'static,
R: 'static,
{ {
self.rt.block_on(fut) block_on(&self.rt, fut).take()
} }
/// Execute a function with enabled executor. /// Execute a function with enabled executor.
#[inline] #[inline]
pub fn exec<F, R>(&mut self, f: F) -> R pub fn exec<F, R>(&self, f: F) -> R
where where
F: FnOnce() -> R, F: FnOnce() -> R + 'static,
R: 'static,
{ {
self.rt.block_on(lazy(|_| f())) self.block_on(lazy(|_| f()))
} }
} }
pub struct BlockResult<T>(Rc<RefCell<Option<T>>>);
impl<T> BlockResult<T> {
pub fn take(self) -> T {
self.0.borrow_mut().take().unwrap()
}
}
#[inline]
fn block_on<F, R>(rt: &Box<dyn Runtime>, fut: F) -> BlockResult<R>
where
F: Future<Output = R> + 'static,
R: 'static,
{
let result = Rc::new(RefCell::new(None));
let result_inner = result.clone();
rt.block_on(Box::pin(async move {
let r = fut.await;
*result_inner.borrow_mut() = Some(r);
}));
BlockResult(result)
}
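Because `Runtime::block_on` only accepts a boxed future with `Output = ()`, the typed result is smuggled out through an `Rc<RefCell<Option<R>>>` and taken back out of the `BlockResult`. A standalone sketch of the same trick, using `futures::executor::block_on` as a stand-in for the boxed-future runtime (names are illustrative, not part of this commit):

```rust
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc};

// Stand-in for `dyn Runtime::block_on`: it can only drive `Output = ()`.
fn drive(fut: Pin<Box<dyn Future<Output = ()>>>) {
    futures::executor::block_on(fut);
}

fn block_on_typed<F, R>(fut: F) -> R
where
    F: Future<Output = R> + 'static,
    R: 'static,
{
    let slot = Rc::new(RefCell::new(None));
    let inner = slot.clone();
    // Wrap the typed future so its output lands in the shared slot.
    drive(Box::pin(async move {
        *inner.borrow_mut() = Some(fut.await);
    }));
    slot.borrow_mut().take().expect("future ran to completion")
}

// block_on_typed(async { 2 + 2 }) == 4
```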
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::mpsc; use std::sync::mpsc;
@ -213,10 +168,10 @@ mod tests {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tok_io::runtime::Builder::new_current_thread()
.build() .build()
.unwrap(); .unwrap();
let local = tokio::task::LocalSet::new(); let local = tok_io::task::LocalSet::new();
let runner = crate::System::build() let runner = crate::System::build()
.stop_on_panic(true) .stop_on_panic(true)


@ -1,82 +1,38 @@
//! A runtime implementation that runs everything on the current thread. //! A runtime implementation that runs everything on the current thread.
use ntex_util::future::lazy; use std::{future::Future, pin::Pin};
use std::future::Future;
mod arbiter; mod arbiter;
mod builder; mod builder;
mod runtime;
mod system; mod system;
mod time;
#[doc(hidden)]
pub mod time_driver {
pub use super::time::*;
}
pub use self::arbiter::Arbiter; pub use self::arbiter::Arbiter;
pub use self::builder::{Builder, SystemRunner}; pub use self::builder::{Builder, SystemRunner};
pub use self::runtime::Runtime;
pub use self::system::System; pub use self::system::System;
/// Spawn a future on the current thread. This does not create a new Arbiter #[cfg(feature = "tokio")]
/// or Arbiter address, it is simply a helper for spawning futures on the current mod tokio;
/// thread. #[cfg(feature = "tokio")]
/// pub use self::tokio::*;
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> self::task::JoinHandle<F::Output>
where
F: Future + 'static,
{
tokio::task::spawn_local(f)
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> tokio::task::JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
tokio::task::spawn_local(async move {
let r = lazy(|_| f()).await;
r.await
})
}
/// Asynchronous signal handling /// Asynchronous signal handling
pub mod signal { pub mod signal {
#[cfg(unix)] #[cfg(unix)]
pub mod unix { pub mod unix {
pub use tokio::signal::unix::*; pub use tok_io::signal::unix::*;
} }
pub use tokio::signal::ctrl_c; pub use tok_io::signal::ctrl_c;
}
/// TCP/UDP/Unix bindings
pub mod net {
pub use tokio::net::UdpSocket;
pub use tokio::net::{TcpListener, TcpStream};
#[cfg(unix)]
pub mod unix {
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
}
#[cfg(unix)]
pub use self::unix::*;
} }
/// Task management. /// Task management.
pub mod task { pub mod task {
pub use tokio::task::{spawn_blocking, yield_now, JoinError, JoinHandle}; pub use tok_io::task::{spawn_blocking, yield_now, JoinError, JoinHandle};
}
pub trait Runtime {
/// Spawn a future onto the single-threaded runtime.
fn spawn(&self, future: Pin<Box<dyn Future<Output = ()>>>);
/// Runs the provided future, blocking the current thread until the future
/// completes.
fn block_on(&self, f: Pin<Box<dyn Future<Output = ()>>>);
} }
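The new `Runtime` trait is the seam that makes the async runtime replaceable: an arbiter only needs something that can spawn and block on boxed, non-`Send` futures. A hypothetical sketch (not part of this commit, and not wired into `create_runtime`) of implementing the trait on top of `futures::executor::LocalPool` instead of tokio:

```rust
use std::{cell::RefCell, future::Future, pin::Pin};

use futures::executor::{LocalPool, LocalSpawner};
use futures::task::LocalSpawnExt;
use ntex_rt::Runtime;

struct LocalPoolRuntime {
    pool: RefCell<LocalPool>,
    spawner: LocalSpawner,
}

impl LocalPoolRuntime {
    fn new() -> Self {
        let pool = LocalPool::new();
        let spawner = pool.spawner();
        Self { pool: RefCell::new(pool), spawner }
    }
}

impl Runtime for LocalPoolRuntime {
    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()>>>) {
        // spawn_local accepts non-Send futures, matching the trait's contract.
        self.spawner.spawn_local(future).expect("executor not shut down");
    }

    fn block_on(&self, f: Pin<Box<dyn Future<Output = ()>>>) {
        // run_until drives `f` to completion, also running locally spawned tasks.
        self.pool.borrow_mut().run_until(f);
    }
}
```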


@ -1,99 +0,0 @@
use std::future::Future;
use std::io;
use tokio::{runtime, task::LocalSet};
/// Single-threaded runtime provides a way to start reactor
/// and runtime on the current thread.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
#[derive(Debug)]
pub struct Runtime {
local: LocalSet,
rt: runtime::Runtime,
}
impl Runtime {
#[allow(clippy::new_ret_no_self)]
/// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> {
let rt = runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()?;
Ok(Runtime {
rt,
local: LocalSet::new(),
})
}
pub(super) fn local(&self) -> &LocalSet {
&self.local
}
/// Spawn a future onto the single-threaded runtime.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```rust,ignore
/// # use futures::{future, Future, Stream};
/// use ntex_rt::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let mut rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|_| {
/// println!("running on the runtime");
/// }));
/// # }
/// # pub fn main() {}
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&self, future: F) -> &Self
where
F: Future<Output = ()> + 'static,
{
self.local.spawn_local(future);
self
}
/// Runs the provided future, blocking the current thread until the future
/// completes.
///
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
///
/// Note that this function will **also** execute any spawned futures on the
/// current thread, but will **not** block until these other spawned futures
/// have completed. Once the function returns, any uncompleted futures
/// remain pending in the `Runtime` instance. These futures will not run
/// until `block_on` or `run` is called again.
///
/// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`.
pub fn block_on<F>(&self, f: F) -> F::Output
where
F: Future,
{
// set ntex-util spawn fn
ntex_util::set_spawn_fn(|fut| {
crate::spawn(fut);
});
self.local.block_on(&self.rt, f)
}
}


@ -1,6 +1,5 @@
use std::sync::atomic::{AtomicUsize, Ordering}; use std::{cell::RefCell, io, sync::atomic::AtomicUsize, sync::atomic::Ordering};
use std::{cell::RefCell, io}; use tok_io::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::UnboundedSender;
use super::arbiter::{Arbiter, SystemCommand}; use super::arbiter::{Arbiter, SystemCommand};
use super::builder::{Builder, SystemRunner}; use super::builder::{Builder, SystemRunner};
@ -49,7 +48,7 @@ impl System {
/// Create new system. /// Create new system.
/// ///
/// This method panics if it can not create tokio runtime /// This method panics if it can not create tokio runtime
pub fn new<T: Into<String>>(name: T) -> SystemRunner { pub fn new(name: &str) -> SystemRunner {
Self::build().name(name).finish() Self::build().name(name).finish()
} }


@ -1,88 +0,0 @@
/// Utilities for tracking time.
use std::{future::Future, pin::Pin, task, task::Poll, time::Duration, time::Instant};
use tokio::time;
pub use tokio::time::{interval, Interval};
pub use tokio::time::{timeout, Timeout};
/// Waits until `deadline` is reached.
///
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
/// operates at millisecond granularity and should not be used for tasks that
/// require high-resolution timers.
///
/// # Cancellation
///
/// Canceling a sleep instance is done by dropping the returned future. No additional
/// cleanup work is required.
#[inline]
pub fn sleep_until(deadline: Instant) -> Sleep {
Sleep {
inner: time::sleep_until(deadline.into()),
}
}
/// Waits until `duration` has elapsed.
///
/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous
/// analog to `std::thread::sleep`.
pub fn sleep(duration: Duration) -> Sleep {
Sleep {
inner: time::sleep(duration),
}
}
#[doc(hidden)]
/// Creates new [`Interval`] that yields with interval of `period` with the
/// first tick completing at `start`. The default `MissedTickBehavior` is
/// `Burst`, but this can be configured
/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
#[inline]
pub fn interval_at(start: Instant, period: Duration) -> Interval {
time::interval_at(start.into(), period)
}
pin_project_lite::pin_project! {
/// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
#[derive(Debug)]
pub struct Sleep {
#[pin]
inner: time::Sleep,
}
}
impl Sleep {
/// Returns the instant at which the future will complete.
#[inline]
pub fn deadline(&self) -> Instant {
self.inner.deadline().into_std()
}
/// Returns `true` if `Sleep` has elapsed.
///
/// A `Sleep` instance is elapsed when the requested duration has elapsed.
#[inline]
pub fn is_elapsed(&self) -> bool {
self.inner.is_elapsed()
}
/// Resets the `Sleep` instance to a new deadline.
///
/// Calling this function allows changing the instant at which the `Sleep`
/// future completes without having to create new associated state.
///
/// This function can be called both before and after the future has
/// completed.
#[inline]
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
self.project().inner.reset(deadline.into());
}
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

ntex-rt/src/tokio.rs (new file, 134 lines)

@ -0,0 +1,134 @@
use std::{future::Future, io, net, net::SocketAddr, path::Path, pin::Pin};
use ntex_bytes::PoolRef;
use ntex_io::Io;
use ntex_util::future::lazy;
use tok_io::{runtime, task::LocalSet};
use crate::Runtime;
/// Create new single-threaded tokio runtime.
pub fn create_runtime() -> Box<dyn Runtime> {
Box::new(TokioRuntime::new().unwrap())
}
/// Opens a TCP connection to a remote host.
pub fn tcp_connect(
addr: SocketAddr,
) -> Pin<Box<dyn Future<Output = Result<Io, io::Error>>>> {
Box::pin(async move {
let sock = tok_io::net::TcpStream::connect(addr).await?;
sock.set_nodelay(true)?;
Ok(Io::new(sock))
})
}
/// Opens a TCP connection to a remote host and use specified memory pool.
pub fn tcp_connect_in(
addr: SocketAddr,
pool: PoolRef,
) -> Pin<Box<dyn Future<Output = Result<Io, io::Error>>>> {
Box::pin(async move {
let sock = tok_io::net::TcpStream::connect(addr).await?;
sock.set_nodelay(true)?;
Ok(Io::with_memory_pool(sock, pool))
})
}
#[cfg(unix)]
/// Opens a unix stream connection.
pub fn unix_connect<P>(addr: P) -> Pin<Box<dyn Future<Output = Result<Io, io::Error>>>>
where
P: AsRef<Path> + 'static,
{
Box::pin(async move {
let sock = tok_io::net::UnixStream::connect(addr).await?;
Ok(Io::new(sock))
})
}
/// Convert std TcpStream to tokio's TcpStream
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io, io::Error> {
stream.set_nonblocking(true)?;
stream.set_nodelay(true)?;
Ok(Io::new(tok_io::net::TcpStream::from_std(stream)?))
}
#[cfg(unix)]
/// Convert std UnixStream to tokio's UnixStream
pub fn from_unix_stream(
stream: std::os::unix::net::UnixStream,
) -> Result<Io, io::Error> {
stream.set_nonblocking(true)?;
Ok(Io::new(tok_io::net::UnixStream::from_std(stream)?))
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
where
F: Future + 'static,
{
tok_io::task::spawn_local(f)
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
spawn(async move {
let r = lazy(|_| f()).await;
r.await
})
}
/// Single-threaded tokio runtime.
#[derive(Debug)]
struct TokioRuntime {
local: LocalSet,
rt: runtime::Runtime,
}
impl TokioRuntime {
/// Returns a new runtime initialized with default configuration values.
fn new() -> io::Result<Self> {
let rt = runtime::Builder::new_current_thread().enable_io().build()?;
Ok(Self {
rt,
local: LocalSet::new(),
})
}
}
impl Runtime for TokioRuntime {
/// Spawn a future onto the single-threaded runtime.
fn spawn(&self, future: Pin<Box<dyn Future<Output = ()>>>) {
self.local.spawn_local(future);
}
/// Runs the provided future, blocking the current thread until the future
/// completes.
fn block_on(&self, f: Pin<Box<dyn Future<Output = ()>>>) {
// set ntex-util spawn fn
ntex_util::set_spawn_fn(|fut| {
tok_io::task::spawn_local(fut);
});
self.local.block_on(&self.rt, f);
}
}
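Putting the pieces together, `create_runtime`, `tcp_connect`, and the spawn helpers are what `ntex-rt` re-exports when its `tokio` feature is on. A usage sketch assuming that feature and a listener on the (purely illustrative) address:

```rust
fn main() -> std::io::Result<()> {
    // System::new builds the tokio-backed runtime via create_runtime().
    let runner = ntex_rt::System::new("tcp-connect-demo");
    runner.block_on(async {
        let addr = "127.0.0.1:8080".parse().unwrap(); // illustrative address
        let _io = ntex_rt::tcp_connect(addr).await?;
        println!("connected with TCP_NODELAY set");
        Ok(())
    })
}
```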


@ -6,7 +6,7 @@
* Move ntex::time to ntex-util crate * Move ntex::time to ntex-util crate
* Replace mio with poller for accept loop * Replace mio with polling for accept loop
## [0.4.13] - 2021-12-07 ## [0.4.13] - 2021-12-07


@ -43,16 +43,17 @@ http-framework = ["h2", "http", "httparse",
"httpdate", "encoding_rs", "mime", "percent-encoding", "serde_json", "serde_urlencoded"] "httpdate", "encoding_rs", "mime", "percent-encoding", "serde_json", "serde_urlencoded"]
[dependencies] [dependencies]
ntex-codec = "0.5.1" ntex-codec = "0.6.0"
ntex-rt = "0.3.2"
ntex-router = "0.5.1" ntex-router = "0.5.1"
ntex-service = "0.2.1" ntex-service = "0.2.1"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "0.1.2" ntex-util = "0.1.2"
ntex-bytes = "0.1.7" ntex-bytes = "0.1.7"
ntex-io = { version = "0.1", features = ["tokio"] } ntex-io = "0.1"
ntex-tls = "0.1" ntex-tls = "0.1"
ntex-rt = { version = "0.4.0-b.0", default-features = false, features = ["tokio"] }
base64 = "0.13" base64 = "0.13"
bitflags = "1.3" bitflags = "1.3"
derive_more = "0.99.14" derive_more = "0.99.14"


@ -141,7 +141,7 @@ mod tests {
let factory = Connector::new(config).clone(); let factory = Connector::new(config).clone();
let srv = factory.new_service(()).await.unwrap(); let srv = factory.new_service(()).await.unwrap();
let result = srv let _result = srv
.call(Connect::new("www.rust-lang.org").set_addr(Some(server.addr()))) .call(Connect::new("www.rust-lang.org").set_addr(Some(server.addr())))
.await; .await;


@ -1,8 +1,8 @@
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin}; use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin};
use crate::io::Io; use crate::io::{types, Io};
use crate::rt::net::TcpStream; use crate::rt::tcp_connect_in;
use crate::service::{Service, ServiceFactory}; use crate::service::{Service, ServiceFactory};
use crate::util::{Either, PoolId, PoolRef, Ready}; use crate::util::{Either, PoolId, PoolRef, Ready};
@ -128,7 +128,7 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
if let Some(addr) = addr { if let Some(addr) = addr {
self.state = ConnectState::Connect(TcpConnectorResponse::new( self.state = ConnectState::Connect(TcpConnectorResponse::new(
req, port, addr, req, port, addr, self.pool,
)); ));
self.poll(cx) self.poll(cx)
} else if let Some(addr) = req.addr() { } else if let Some(addr) = req.addr() {
@ -136,6 +136,7 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
req, req,
addr.port(), addr.port(),
Either::Left(addr), Either::Left(addr),
self.pool,
)); ));
self.poll(cx) self.poll(cx)
} else { } else {
@ -144,12 +145,7 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
} }
} }
}, },
ConnectState::Connect(ref mut fut) => match Pin::new(fut).poll(cx)? { ConnectState::Connect(ref mut fut) => Pin::new(fut).poll(cx),
Poll::Pending => Poll::Pending,
Poll::Ready(stream) => {
Poll::Ready(Ok(Io::with_memory_pool(stream, self.pool)))
}
},
} }
} }
} }
@ -159,7 +155,8 @@ struct TcpConnectorResponse<T> {
req: Option<T>, req: Option<T>,
port: u16, port: u16,
addrs: Option<VecDeque<SocketAddr>>, addrs: Option<VecDeque<SocketAddr>>,
stream: Option<Pin<Box<dyn Future<Output = Result<TcpStream, io::Error>>>>>, stream: Option<Pin<Box<dyn Future<Output = Result<Io, io::Error>>>>>,
pool: PoolRef,
} }
impl<T: Address> TcpConnectorResponse<T> { impl<T: Address> TcpConnectorResponse<T> {
@ -167,6 +164,7 @@ impl<T: Address> TcpConnectorResponse<T> {
req: T, req: T,
port: u16, port: u16,
addr: Either<SocketAddr, VecDeque<SocketAddr>>, addr: Either<SocketAddr, VecDeque<SocketAddr>>,
pool: PoolRef,
) -> TcpConnectorResponse<T> { ) -> TcpConnectorResponse<T> {
trace!( trace!(
"TCP connector - connecting to {:?} port:{}", "TCP connector - connecting to {:?} port:{}",
@ -177,13 +175,15 @@ impl<T: Address> TcpConnectorResponse<T> {
match addr { match addr {
Either::Left(addr) => TcpConnectorResponse { Either::Left(addr) => TcpConnectorResponse {
req: Some(req), req: Some(req),
port,
addrs: None, addrs: None,
stream: Some(Box::pin(TcpStream::connect(addr))), stream: Some(tcp_connect_in(addr, pool)),
pool,
port,
}, },
Either::Right(addrs) => TcpConnectorResponse { Either::Right(addrs) => TcpConnectorResponse {
req: Some(req),
port, port,
pool,
req: Some(req),
addrs: Some(addrs), addrs: Some(addrs),
stream: None, stream: None,
}, },
@ -202,7 +202,7 @@ impl<T: Address> TcpConnectorResponse<T> {
} }
impl<T: Address> Future for TcpConnectorResponse<T> { impl<T: Address> Future for TcpConnectorResponse<T> {
type Output = Result<TcpStream, ConnectError>; type Output = Result<Io, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
@ -212,16 +212,11 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
if let Some(new) = this.stream.as_mut() { if let Some(new) = this.stream.as_mut() {
match new.as_mut().poll(cx) { match new.as_mut().poll(cx) {
Poll::Ready(Ok(sock)) => { Poll::Ready(Ok(sock)) => {
if let Err(err) = sock.set_nodelay(true) {
if !this.can_continue(&err) {
return Poll::Ready(Err(err.into()));
}
}
let req = this.req.take().unwrap(); let req = this.req.take().unwrap();
trace!( trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}", "TCP connector - successfully connected to connecting to {:?} - {:?}",
req.host(), sock.peer_addr() req.host(),
sock.query::<types::PeerAddr>().get()
); );
return Poll::Ready(Ok(sock)); return Poll::Ready(Ok(sock));
} }
@ -236,7 +231,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
// try to connect // try to connect
let addr = this.addrs.as_mut().unwrap().pop_front().unwrap(); let addr = this.addrs.as_mut().unwrap().pop_front().unwrap();
this.stream = Some(Box::pin(TcpStream::connect(addr))); this.stream = Some(tcp_connect_in(addr, this.pool));
} }
} }
} }


@ -214,7 +214,7 @@ pub fn server<F: StreamServiceFactory>(factory: F) -> TestServer {
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let mut sys = System::new("test-server"); let sys = System::new("test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();


@ -553,9 +553,9 @@ mod tests {
fn start(tx: mpsc::Sender<(Server, net::SocketAddr)>) -> thread::JoinHandle<()> { fn start(tx: mpsc::Sender<(Server, net::SocketAddr)>) -> thread::JoinHandle<()> {
thread::spawn(move || { thread::spawn(move || {
let mut sys = crate::rt::System::new("test"); let sys = crate::rt::System::new("test");
let addr = TestServer::unused_addr(); let addr = TestServer::unused_addr();
let srv = sys.exec(|| { let srv = sys.exec(move || {
crate::server::build() crate::server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()


@ -1,6 +1,6 @@
use std::{convert::TryFrom, fmt, io, net}; use std::{convert::TryFrom, fmt, io, net};
use crate::{io::Io, rt::net::TcpStream}; use crate::{io::Io, rt};
pub(crate) enum Listener { pub(crate) enum Listener {
Tcp(net::TcpListener), Tcp(net::TcpListener),
@ -145,17 +145,9 @@ impl TryFrom<Stream> for Io {
fn try_from(sock: Stream) -> Result<Self, Self::Error> { fn try_from(sock: Stream) -> Result<Self, Self::Error> {
match sock { match sock {
Stream::Tcp(stream) => { Stream::Tcp(stream) => rt::from_tcp_stream(stream),
stream.set_nonblocking(true)?;
stream.set_nodelay(true)?;
Ok(Io::new(TcpStream::from_std(stream)?))
}
#[cfg(unix)] #[cfg(unix)]
Stream::Uds(stream) => { Stream::Uds(stream) => rt::from_unix_stream(stream),
use crate::rt::net::UnixStream;
stream.set_nonblocking(true)?;
Ok(Io::new(UnixStream::from_std(stream)?))
}
} }
} }
} }


@ -3,7 +3,8 @@ use std::{io, net, sync::mpsc, thread};
use socket2::{Domain, SockAddr, Socket, Type}; use socket2::{Domain, SockAddr, Socket, Type};
use crate::rt::{net::TcpStream, System}; use crate::io::Io;
use crate::rt::{tcp_connect, System};
use crate::server::{Server, ServerBuilder, StreamServiceFactory}; use crate::server::{Server, ServerBuilder, StreamServiceFactory};
/// Start test server /// Start test server
@ -42,7 +43,7 @@ pub fn test_server<F: StreamServiceFactory>(factory: F) -> TestServer {
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let mut sys = System::new("ntex-test-server"); let sys = System::new("ntex-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
@ -73,7 +74,7 @@ where
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let mut sys = System::new("ntex-test-server"); let sys = System::new("ntex-test-server");
sys.exec(|| { sys.exec(|| {
factory(Server::build()) factory(Server::build())
@ -105,9 +106,9 @@ impl TestServer {
self.addr self.addr
} }
/// Connect to server, return TcpStream /// Connect to server, return Io
pub async fn connect(&self) -> io::Result<TcpStream> { pub async fn connect(&self) -> io::Result<Io> {
TcpStream::connect(self.addr).await tcp_connect(self.addr).await
} }
/// Stop http server /// Stop http server


@ -611,7 +611,7 @@ where
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let mut sys = System::new("ntex-test-server"); let sys = System::new("ntex-test-server");
let cfg = cfg.clone(); let cfg = cfg.clone();
let factory = factory.clone(); let factory = factory.clone();
@ -619,7 +619,7 @@ where
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
let srv = sys.exec(|| { let srv = sys.exec(move || {
let builder = Server::build().workers(1).disable_signals(); let builder = Server::build().workers(1).disable_signals();
match cfg.stream { match cfg.stream {


@ -16,8 +16,8 @@ fn test_bind() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let srv = sys.exec(|| { let srv = sys.exec(move || {
Server::build() Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
@ -42,9 +42,9 @@ fn test_listen() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let lst = net::TcpListener::bind(addr).unwrap(); let lst = net::TcpListener::bind(addr).unwrap();
sys.exec(|| { sys.exec(move || {
Server::build() Server::build()
.disable_signals() .disable_signals()
.workers(1) .workers(1)
@ -70,8 +70,8 @@ fn test_start() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let srv = sys.exec(|| { let srv = sys.exec(move || {
Server::build() Server::build()
.backlog(100) .backlog(100)
.disable_signals() .disable_signals()
@ -140,8 +140,8 @@ fn test_on_worker_start() {
let h = thread::spawn(move || { let h = thread::spawn(move || {
let num = num2.clone(); let num = num2.clone();
let num2 = num2.clone(); let num2 = num2.clone();
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let srv = sys.exec(|| { let srv = sys.exec(move || {
Server::build() Server::build()
.disable_signals() .disable_signals()
.configure(move |cfg| { .configure(move |cfg| {
@ -196,7 +196,7 @@ fn test_panic_in_worker() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let counter = counter2.clone(); let counter = counter2.clone();
let srv = sys.exec(move || { let srv = sys.exec(move || {
let counter = counter.clone(); let counter = counter.clone();
@ -215,7 +215,7 @@ fn test_panic_in_worker() {
.start() .start()
}); });
let _ = tx.send((srv.clone(), ntex::rt::System::current())); let _ = tx.send((srv.clone(), ntex::rt::System::current()));
sys.exec(move || ntex_rt::spawn(srv.map(|_| ()))); sys.exec(move || ntex::rt::spawn(srv.map(|_| ())));
let _ = sys.run(); let _ = sys.run();
}); });
let (_, sys) = rx.recv().unwrap(); let (_, sys) = rx.recv().unwrap();


@ -4,7 +4,7 @@ use std::{sync::mpsc, thread, time::Duration};
use tls_openssl::ssl::SslAcceptorBuilder; use tls_openssl::ssl::SslAcceptorBuilder;
use ntex::web::{self, App, HttpResponse, HttpServer}; use ntex::web::{self, App, HttpResponse, HttpServer};
use ntex::{io::Io, server::TestServer, time::Seconds}; use ntex::{rt, server::TestServer, time::Seconds};
#[cfg(unix)] #[cfg(unix)]
#[ntex::test] #[ntex::test]
@ -13,9 +13,9 @@ async fn test_run() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let srv = sys.exec(|| { let srv = sys.exec(move || {
HttpServer::new(|| { HttpServer::new(|| {
App::new().service( App::new().service(
web::resource("/") web::resource("/")
@ -104,10 +104,10 @@ async fn test_openssl() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let builder = ssl_acceptor().unwrap(); let builder = ssl_acceptor().unwrap();
let srv = sys.exec(|| { let srv = sys.exec(move || {
HttpServer::new(|| { HttpServer::new(|| {
App::new().service(web::resource("/").route(web::to( App::new().service(web::resource("/").route(web::to(
|req: HttpRequest| async move { |req: HttpRequest| async move {
@ -157,7 +157,7 @@ async fn test_rustls() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
// load ssl keys // load ssl keys
let cert_file = &mut BufReader::new(File::open("./tests/cert.pem").unwrap()); let cert_file = &mut BufReader::new(File::open("./tests/cert.pem").unwrap());
@ -174,7 +174,7 @@ async fn test_rustls() {
.with_single_cert(cert_chain, keys) .with_single_cert(cert_chain, keys)
.unwrap(); .unwrap();
let srv = sys.exec(|| { let srv = sys.exec(move || {
HttpServer::new(|| { HttpServer::new(|| {
App::new().service(web::resource("/").route(web::to( App::new().service(web::resource("/").route(web::to(
|req: HttpRequest| async move { |req: HttpRequest| async move {
@ -215,9 +215,9 @@ async fn test_bind_uds() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let srv = sys.exec(|| { let srv = sys.exec(move || {
HttpServer::new(|| { HttpServer::new(|| {
App::new().service( App::new().service(
web::resource("/") web::resource("/")
@ -244,9 +244,7 @@ async fn test_bind_uds() {
.connector( .connector(
client::Connector::default() client::Connector::default()
.connector(ntex::service::fn_service(|_| async { .connector(ntex::service::fn_service(|_| async {
let stream = Ok(rt::unix_connect("/tmp/uds-test").await?)
ntex::rt::net::UnixStream::connect("/tmp/uds-test").await?;
Ok(Io::new(stream))
})) }))
.finish(), .finish(),
) )
@ -267,7 +265,7 @@ async fn test_listen_uds() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let mut sys = ntex::rt::System::new("test"); let sys = ntex::rt::System::new("test");
let srv = sys.exec(|| { let srv = sys.exec(|| {
let lst = std::os::unix::net::UnixListener::bind("/tmp/uds-test2").unwrap(); let lst = std::os::unix::net::UnixListener::bind("/tmp/uds-test2").unwrap();
@ -298,9 +296,7 @@ async fn test_listen_uds() {
.connector( .connector(
client::Connector::default() client::Connector::default()
.connector(ntex::service::fn_service(|_| async { .connector(ntex::service::fn_service(|_| async {
let stream = Ok(rt::unix_connect("/tmp/uds-test2").await?)
ntex::rt::net::UnixStream::connect("/tmp/uds-test2").await?;
Ok(Io::new(stream))
})) }))
.finish(), .finish(),
) )