mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 13:27:39 +03:00
Merge pull request #72 from ntex-rs/rt-customization
Allow to replace async runtime
This commit is contained in:
commit
151c8841c5
34 changed files with 705 additions and 800 deletions
|
@ -18,25 +18,26 @@ path = "src/lib.rs"
|
|||
[features]
|
||||
default = ["tokio"]
|
||||
|
||||
# tokio support
|
||||
tokio = ["tok-io"]
|
||||
# tokio traits support
|
||||
tokio-traits = ["tok-io/net"]
|
||||
|
||||
# tokio runtime support
|
||||
tokio = ["tok-io/net"]
|
||||
|
||||
[dependencies]
|
||||
bitflags = "1.3"
|
||||
fxhash = "0.2.1"
|
||||
ntex-codec = "0.5.1"
|
||||
ntex-codec = "0.6.0"
|
||||
ntex-bytes = "0.1.7"
|
||||
ntex-util = "0.1.2"
|
||||
ntex-service = "0.2.1"
|
||||
log = "0.4"
|
||||
pin-project-lite = "0.2"
|
||||
|
||||
tok-io = { version = "1", package = "tokio", default-features = false, features = ["net"], optional = true }
|
||||
|
||||
backtrace = "*"
|
||||
tok-io = { version = "1", package = "tokio", default-features = false, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
ntex = "0.5.0-b.0"
|
||||
futures = "0.3"
|
||||
rand = "0.8"
|
||||
env_logger = "0.9"
|
||||
env_logger = "0.9"
|
||||
|
|
|
@ -1,15 +1,13 @@
|
|||
//! Framed transport dispatcher
|
||||
use std::{
|
||||
cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll, time,
|
||||
};
|
||||
use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time};
|
||||
|
||||
use ntex_bytes::Pool;
|
||||
use ntex_codec::{Decoder, Encoder};
|
||||
use ntex_service::{IntoService, Service};
|
||||
use ntex_util::future::Either;
|
||||
use ntex_util::time::{now, Seconds};
|
||||
use ntex_util::{future::Either, spawn};
|
||||
|
||||
use super::{DispatchItem, IoBoxed, ReadRef, Timer, WriteRef};
|
||||
use super::{rt::spawn, DispatchItem, IoBoxed, ReadRef, Timer, WriteRef};
|
||||
|
||||
type Response<U> = <U as Encoder>::Item;
|
||||
|
||||
|
@ -178,7 +176,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, U> Future for Dispatcher<S, U>
|
||||
impl<S, U> future::Future for Dispatcher<S, U>
|
||||
where
|
||||
S: Service<Request = DispatchItem<U>, Response = Option<Response<U>>> + 'static,
|
||||
U: Decoder + Encoder + 'static,
|
||||
|
|
|
@ -1,17 +1,19 @@
|
|||
use std::{any::Any, any::TypeId, fmt, future::Future, io, task::Context, task::Poll};
|
||||
|
||||
pub mod testing;
|
||||
pub mod types;
|
||||
|
||||
mod dispatcher;
|
||||
mod filter;
|
||||
mod state;
|
||||
mod tasks;
|
||||
mod time;
|
||||
pub mod types;
|
||||
mod utils;
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
#[cfg(any(feature = "tokio-traits", feature = "tokio"))]
|
||||
mod tokio_impl;
|
||||
#[cfg(any(feature = "tokio"))]
|
||||
mod tokio_rt;
|
||||
|
||||
use ntex_bytes::BytesMut;
|
||||
use ntex_codec::{Decoder, Encoder};
|
||||
|
@ -128,6 +130,13 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub mod rt {
|
||||
//! async runtime helpers
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use crate::tokio_rt::*;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
@ -37,7 +37,7 @@ pub struct IoTest {
|
|||
}
|
||||
|
||||
bitflags::bitflags! {
|
||||
struct Flags: u8 {
|
||||
struct IoTestFlags: u8 {
|
||||
const FLUSHED = 0b0000_0001;
|
||||
const CLOSED = 0b0000_0010;
|
||||
}
|
||||
|
@ -61,35 +61,35 @@ struct State {
|
|||
struct Channel {
|
||||
buf: BytesMut,
|
||||
buf_cap: usize,
|
||||
flags: Flags,
|
||||
flags: IoTestFlags,
|
||||
waker: AtomicWaker,
|
||||
read: IoState,
|
||||
write: IoState,
|
||||
read: IoTestState,
|
||||
write: IoTestState,
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
fn is_closed(&self) -> bool {
|
||||
self.flags.contains(Flags::CLOSED)
|
||||
self.flags.contains(IoTestFlags::CLOSED)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Flags {
|
||||
impl Default for IoTestFlags {
|
||||
fn default() -> Self {
|
||||
Flags::empty()
|
||||
IoTestFlags::empty()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum IoState {
|
||||
enum IoTestState {
|
||||
Ok,
|
||||
Pending,
|
||||
Close,
|
||||
Err(io::Error),
|
||||
}
|
||||
|
||||
impl Default for IoState {
|
||||
impl Default for IoTestState {
|
||||
fn default() -> Self {
|
||||
IoState::Ok
|
||||
IoTestState::Ok
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,19 +139,19 @@ impl IoTest {
|
|||
|
||||
/// Set read to Pending state
|
||||
pub fn read_pending(&self) {
|
||||
self.remote.lock().unwrap().borrow_mut().read = IoState::Pending;
|
||||
self.remote.lock().unwrap().borrow_mut().read = IoTestState::Pending;
|
||||
}
|
||||
|
||||
/// Set read to error
|
||||
pub fn read_error(&self, err: io::Error) {
|
||||
let channel = self.remote.lock().unwrap();
|
||||
channel.borrow_mut().read = IoState::Err(err);
|
||||
channel.borrow_mut().read = IoTestState::Err(err);
|
||||
channel.borrow().waker.wake();
|
||||
}
|
||||
|
||||
/// Set write error on remote side
|
||||
pub fn write_error(&self, err: io::Error) {
|
||||
self.local.lock().unwrap().borrow_mut().write = IoState::Err(err);
|
||||
self.local.lock().unwrap().borrow_mut().write = IoTestState::Err(err);
|
||||
self.remote.lock().unwrap().borrow().waker.wake();
|
||||
}
|
||||
|
||||
|
@ -180,7 +180,7 @@ impl IoTest {
|
|||
{
|
||||
let guard = self.remote.lock().unwrap();
|
||||
let mut remote = guard.borrow_mut();
|
||||
remote.read = IoState::Close;
|
||||
remote.read = IoTestState::Close;
|
||||
remote.waker.wake();
|
||||
log::trace!("close remote socket");
|
||||
}
|
||||
|
@ -256,13 +256,13 @@ impl IoTest {
|
|||
}
|
||||
|
||||
match mem::take(&mut ch.read) {
|
||||
IoState::Ok => Poll::Pending,
|
||||
IoState::Close => {
|
||||
ch.read = IoState::Close;
|
||||
IoTestState::Ok => Poll::Pending,
|
||||
IoTestState::Close => {
|
||||
ch.read = IoTestState::Close;
|
||||
Poll::Ready(Ok(0))
|
||||
}
|
||||
IoState::Pending => Poll::Pending,
|
||||
IoState::Err(e) => Poll::Ready(Err(e)),
|
||||
IoTestState::Pending => Poll::Pending,
|
||||
IoTestState::Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -275,12 +275,12 @@ impl IoTest {
|
|||
let mut ch = guard.borrow_mut();
|
||||
|
||||
match mem::take(&mut ch.write) {
|
||||
IoState::Ok => {
|
||||
IoTestState::Ok => {
|
||||
let cap = cmp::min(buf.len(), ch.buf_cap);
|
||||
if cap > 0 {
|
||||
ch.buf.extend(&buf[..cap]);
|
||||
ch.buf_cap -= cap;
|
||||
ch.flags.remove(Flags::FLUSHED);
|
||||
ch.flags.remove(IoTestFlags::FLUSHED);
|
||||
ch.waker.wake();
|
||||
Poll::Ready(Ok(cap))
|
||||
} else {
|
||||
|
@ -297,8 +297,8 @@ impl IoTest {
|
|||
Poll::Pending
|
||||
}
|
||||
}
|
||||
IoState::Close => Poll::Ready(Ok(0)),
|
||||
IoState::Pending => {
|
||||
IoTestState::Close => Poll::Ready(Ok(0)),
|
||||
IoTestState::Pending => {
|
||||
*self
|
||||
.local
|
||||
.lock()
|
||||
|
@ -311,7 +311,7 @@ impl IoTest {
|
|||
.borrow_mut() = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
IoState::Err(e) => Poll::Ready(Err(e)),
|
||||
IoTestState::Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -346,125 +346,15 @@ impl Drop for IoTest {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
mod tokio {
|
||||
use std::task::{Context, Poll};
|
||||
use std::{cmp, io, mem, pin::Pin};
|
||||
|
||||
use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
|
||||
use super::{Flags, IoState, IoTest};
|
||||
|
||||
impl AsyncRead for IoTest {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
let guard = self.local.lock().unwrap();
|
||||
let mut ch = guard.borrow_mut();
|
||||
*ch.waker.0.lock().unwrap().borrow_mut() = Some(cx.waker().clone());
|
||||
|
||||
if !ch.buf.is_empty() {
|
||||
let size = std::cmp::min(ch.buf.len(), buf.remaining());
|
||||
let b = ch.buf.split_to(size);
|
||||
buf.put_slice(&b);
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
|
||||
match mem::take(&mut ch.read) {
|
||||
IoState::Ok => Poll::Pending,
|
||||
IoState::Close => {
|
||||
ch.read = IoState::Close;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
IoState::Pending => Poll::Pending,
|
||||
IoState::Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for IoTest {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let guard = self.remote.lock().unwrap();
|
||||
let mut ch = guard.borrow_mut();
|
||||
|
||||
match mem::take(&mut ch.write) {
|
||||
IoState::Ok => {
|
||||
let cap = cmp::min(buf.len(), ch.buf_cap);
|
||||
if cap > 0 {
|
||||
ch.buf.extend(&buf[..cap]);
|
||||
ch.buf_cap -= cap;
|
||||
ch.flags.remove(Flags::FLUSHED);
|
||||
ch.waker.wake();
|
||||
Poll::Ready(Ok(cap))
|
||||
} else {
|
||||
*self
|
||||
.local
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.waker
|
||||
.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut() = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
IoState::Close => Poll::Ready(Ok(0)),
|
||||
IoState::Pending => {
|
||||
*self
|
||||
.local
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.waker
|
||||
.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut() = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
IoState::Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
self.local
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.flags
|
||||
.insert(Flags::CLOSED);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IoStream for IoTest {
|
||||
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
|
||||
let io = Rc::new(self);
|
||||
|
||||
ntex_util::spawn(ReadTask {
|
||||
crate::rt::spawn(ReadTask {
|
||||
io: io.clone(),
|
||||
state: read,
|
||||
});
|
||||
ntex_util::spawn(WriteTask {
|
||||
crate::rt::spawn(WriteTask {
|
||||
io: io.clone(),
|
||||
state: write,
|
||||
st: IoWriteState::Processing(None),
|
||||
|
@ -615,7 +505,7 @@ impl Future for WriteTask {
|
|||
.unwrap()
|
||||
.borrow_mut()
|
||||
.flags
|
||||
.insert(Flags::CLOSED);
|
||||
.insert(IoTestFlags::CLOSED);
|
||||
this.state.close(None);
|
||||
Poll::Ready(())
|
||||
}
|
||||
|
@ -652,7 +542,7 @@ impl Future for WriteTask {
|
|||
.unwrap()
|
||||
.borrow_mut()
|
||||
.flags
|
||||
.insert(Flags::CLOSED);
|
||||
.insert(IoTestFlags::CLOSED);
|
||||
*st = Shutdown::Stopping;
|
||||
continue;
|
||||
}
|
||||
|
@ -752,6 +642,113 @@ pub(super) fn flush_io(
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "tokio", feature = "tokio-traits"))]
|
||||
mod tokio_impl {
|
||||
use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
|
||||
use super::*;
|
||||
|
||||
impl AsyncRead for IoTest {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
let guard = self.local.lock().unwrap();
|
||||
let mut ch = guard.borrow_mut();
|
||||
*ch.waker.0.lock().unwrap().borrow_mut() = Some(cx.waker().clone());
|
||||
|
||||
if !ch.buf.is_empty() {
|
||||
let size = std::cmp::min(ch.buf.len(), buf.remaining());
|
||||
let b = ch.buf.split_to(size);
|
||||
buf.put_slice(&b);
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
|
||||
match mem::take(&mut ch.read) {
|
||||
IoTestState::Ok => Poll::Pending,
|
||||
IoTestState::Close => {
|
||||
ch.read = IoTestState::Close;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
IoTestState::Pending => Poll::Pending,
|
||||
IoTestState::Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for IoTest {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let guard = self.remote.lock().unwrap();
|
||||
let mut ch = guard.borrow_mut();
|
||||
|
||||
match mem::take(&mut ch.write) {
|
||||
IoTestState::Ok => {
|
||||
let cap = cmp::min(buf.len(), ch.buf_cap);
|
||||
if cap > 0 {
|
||||
ch.buf.extend(&buf[..cap]);
|
||||
ch.buf_cap -= cap;
|
||||
ch.flags.remove(IoTestFlags::FLUSHED);
|
||||
ch.waker.wake();
|
||||
Poll::Ready(Ok(cap))
|
||||
} else {
|
||||
*self
|
||||
.local
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.waker
|
||||
.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut() = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
IoTestState::Close => Poll::Ready(Ok(0)),
|
||||
IoTestState::Pending => {
|
||||
*self
|
||||
.local
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.waker
|
||||
.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut() = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
IoTestState::Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
self.local
|
||||
.lock()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.flags
|
||||
.insert(IoTestFlags::CLOSED);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::redundant_clone)]
|
||||
mod tests {
|
||||
|
|
|
@ -2,10 +2,10 @@ use std::{
|
|||
cell::RefCell, collections::BTreeMap, collections::HashSet, rc::Rc, time::Instant,
|
||||
};
|
||||
|
||||
use ntex_util::spawn;
|
||||
use ntex_util::time::{now, sleep, Millis};
|
||||
|
||||
use super::state::{IoRef, IoStateInner};
|
||||
use crate::rt::spawn;
|
||||
use crate::state::{IoRef, IoStateInner};
|
||||
|
||||
pub struct Timer(Rc<RefCell<Inner>>);
|
||||
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
use std::task::{Context, Poll};
|
||||
use std::{any, cell::RefCell, cmp, future::Future, io, pin::Pin, rc::Rc};
|
||||
use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc};
|
||||
|
||||
use ntex_bytes::{Buf, BufMut};
|
||||
use ntex_bytes::{Buf, BufMut, BytesMut};
|
||||
use ntex_util::time::{sleep, Sleep};
|
||||
use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tok_io::net::TcpStream;
|
||||
|
||||
use super::{
|
||||
use crate::{
|
||||
types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, WriteContext,
|
||||
WriteReadiness,
|
||||
};
|
||||
|
@ -71,7 +71,7 @@ impl Future for ReadTask {
|
|||
buf.reserve(hw - remaining);
|
||||
}
|
||||
|
||||
match ntex_codec::poll_read_buf(Pin::new(&mut *io), cx, &mut buf) {
|
||||
match poll_read_buf(Pin::new(&mut *io), cx, &mut buf) {
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(Ok(n)) => {
|
||||
if n == 0 {
|
||||
|
@ -505,8 +505,7 @@ mod unixstream {
|
|||
buf.reserve(hw - remaining);
|
||||
}
|
||||
|
||||
match ntex_codec::poll_read_buf(Pin::new(&mut *io), cx, &mut buf)
|
||||
{
|
||||
match poll_read_buf(Pin::new(&mut *io), cx, &mut buf) {
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(Ok(n)) => {
|
||||
if n == 0 {
|
||||
|
@ -699,3 +698,35 @@ mod unixstream {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll_read_buf<T: AsyncRead>(
|
||||
io: Pin<&mut T>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut BytesMut,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
if !buf.has_remaining_mut() {
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
|
||||
let n = {
|
||||
let dst =
|
||||
unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [mem::MaybeUninit<u8>]) };
|
||||
let mut buf = ReadBuf::uninit(dst);
|
||||
let ptr = buf.filled().as_ptr();
|
||||
if io.poll_read(cx, &mut buf)?.is_pending() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
// Ensure the pointer does not change from under us
|
||||
assert_eq!(ptr, buf.filled().as_ptr());
|
||||
buf.filled().len()
|
||||
};
|
||||
|
||||
// Safety: This is guaranteed to be the number of initialized (and read)
|
||||
// bytes due to the invariants provided by `ReadBuf::filled`.
|
||||
unsafe {
|
||||
buf.advance_mut(n);
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
|
|
37
ntex-io/src/tokio_rt.rs
Normal file
37
ntex-io/src/tokio_rt.rs
Normal file
|
@ -0,0 +1,37 @@
|
|||
//! async net providers
|
||||
use ntex_util::future::lazy;
|
||||
use std::future::Future;
|
||||
|
||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
|
||||
where
|
||||
F: Future + 'static,
|
||||
{
|
||||
tok_io::task::spawn_local(f)
|
||||
}
|
||||
|
||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future + 'static,
|
||||
{
|
||||
spawn(async move {
|
||||
let r = lazy(|_| f()).await;
|
||||
r.await
|
||||
})
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-rt"
|
||||
version = "0.3.2"
|
||||
version = "0.4.0-b.0"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "ntex runtime"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@ -15,7 +15,19 @@ edition = "2018"
|
|||
name = "ntex_rt"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[features]
|
||||
default = ["tokio"]
|
||||
|
||||
# tokio support
|
||||
tokio = ["tok-io", "ntex-io/tokio"]
|
||||
|
||||
[dependencies]
|
||||
ntex-bytes = "0.1.7"
|
||||
ntex-io = "0.1.0"
|
||||
ntex-util = "0.1.2"
|
||||
async-oneshot = "0.5.0"
|
||||
async-channel = "1.6.1"
|
||||
log = "0.4"
|
||||
pin-project-lite = "0.2"
|
||||
tokio = { version = "1", default-features = false, features = ["rt", "net", "time", "signal", "sync"] }
|
||||
|
||||
tok-io = { version = "1", package = "tokio", default-features = false, features = ["rt", "net", "signal"], optional = true }
|
||||
|
|
|
@ -3,12 +3,11 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
|||
use std::task::{Context, Poll};
|
||||
use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
|
||||
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::oneshot::{channel, error::RecvError, Sender};
|
||||
use tokio::task::LocalSet;
|
||||
use async_channel::{unbounded, Receiver, Sender};
|
||||
use async_oneshot as oneshot;
|
||||
use ntex_util::Stream;
|
||||
|
||||
use super::runtime::Runtime;
|
||||
use super::system::System;
|
||||
use crate::{system::System, Runtime};
|
||||
|
||||
thread_local!(
|
||||
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
|
||||
|
@ -28,7 +27,7 @@ pub(super) enum ArbiterCommand {
|
|||
/// and futures. When an Arbiter is created, it spawns a new OS thread, and
|
||||
/// hosts an event loop. Some Arbiter functions execute on the current thread.
|
||||
pub struct Arbiter {
|
||||
sender: UnboundedSender<ArbiterCommand>,
|
||||
sender: Sender<ArbiterCommand>,
|
||||
thread_handle: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
|
@ -38,27 +37,28 @@ impl fmt::Debug for Arbiter {
|
|||
}
|
||||
}
|
||||
|
||||
impl Default for Arbiter {
|
||||
fn default() -> Arbiter {
|
||||
Arbiter::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Arbiter {
|
||||
fn clone(&self) -> Self {
|
||||
Self::with_sender(self.sender.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Arbiter {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Arbiter {
|
||||
pub(super) fn new_system(local: &LocalSet) -> Self {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
#[allow(clippy::borrowed_box)]
|
||||
pub(super) fn new_system(rt: &Box<dyn Runtime>) -> Self {
|
||||
let (tx, rx) = unbounded();
|
||||
|
||||
let arb = Arbiter::with_sender(tx);
|
||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||
|
||||
local.spawn_local(ArbiterController { stop: None, rx });
|
||||
rt.spawn(Box::pin(ArbiterController { stop: None, rx }));
|
||||
|
||||
arb
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ impl Arbiter {
|
|||
|
||||
/// Stop arbiter from continuing it's event loop.
|
||||
pub fn stop(&self) {
|
||||
let _ = self.sender.send(ArbiterCommand::Stop);
|
||||
let _ = self.sender.try_send(ArbiterCommand::Stop);
|
||||
}
|
||||
|
||||
/// Spawn new thread and run event loop in spawned thread.
|
||||
|
@ -83,39 +83,41 @@ impl Arbiter {
|
|||
let id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||
let name = format!("ntex-rt:worker:{}", id);
|
||||
let sys = System::current();
|
||||
let (arb_tx, arb_rx) = unbounded_channel();
|
||||
let (arb_tx, arb_rx) = unbounded();
|
||||
let arb_tx2 = arb_tx.clone();
|
||||
|
||||
let handle = thread::Builder::new()
|
||||
.name(name.clone())
|
||||
.spawn(move || {
|
||||
let rt = Runtime::new().expect("Cannot create Runtime");
|
||||
let rt = crate::create_runtime();
|
||||
let arb = Arbiter::with_sender(arb_tx);
|
||||
|
||||
let (stop, stop_rx) = channel();
|
||||
let (stop, stop_rx) = oneshot::oneshot();
|
||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||
|
||||
System::set_current(sys);
|
||||
|
||||
// start arbiter controller
|
||||
rt.spawn(ArbiterController {
|
||||
rt.spawn(Box::pin(ArbiterController {
|
||||
stop: Some(stop),
|
||||
rx: arb_rx,
|
||||
});
|
||||
}));
|
||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
||||
|
||||
// register arbiter
|
||||
let _ = System::current()
|
||||
.sys()
|
||||
.send(SystemCommand::RegisterArbiter(id, arb));
|
||||
.try_send(SystemCommand::RegisterArbiter(id, arb));
|
||||
|
||||
// run loop
|
||||
let _ = rt.block_on(stop_rx);
|
||||
rt.block_on(Box::pin(async move {
|
||||
let _ = stop_rx.await;
|
||||
}));
|
||||
|
||||
// unregister arbiter
|
||||
let _ = System::current()
|
||||
.sys()
|
||||
.send(SystemCommand::UnregisterArbiter(id));
|
||||
.try_send(SystemCommand::UnregisterArbiter(id));
|
||||
})
|
||||
.unwrap_or_else(|err| {
|
||||
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
|
||||
|
@ -132,21 +134,23 @@ impl Arbiter {
|
|||
where
|
||||
F: Future<Output = ()> + Send + Unpin + 'static,
|
||||
{
|
||||
let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future)));
|
||||
let _ = self
|
||||
.sender
|
||||
.try_send(ArbiterCommand::Execute(Box::new(future)));
|
||||
}
|
||||
|
||||
/// Send a function to the Arbiter's thread. This function will be executed asynchronously.
|
||||
/// A future is created, and when resolved will contain the result of the function sent
|
||||
/// to the Arbiters thread.
|
||||
pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, RecvError>>
|
||||
pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, oneshot::Closed>>
|
||||
where
|
||||
F: FnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
R: Sync + Send + 'static,
|
||||
{
|
||||
let (tx, rx) = channel();
|
||||
let (mut tx, rx) = oneshot::oneshot();
|
||||
let _ = self
|
||||
.sender
|
||||
.send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||
.try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||
if !tx.is_closed() {
|
||||
let _ = tx.send(f());
|
||||
}
|
||||
|
@ -162,7 +166,7 @@ impl Arbiter {
|
|||
{
|
||||
let _ = self
|
||||
.sender
|
||||
.send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||
.try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||
f();
|
||||
})));
|
||||
}
|
||||
|
@ -215,7 +219,7 @@ impl Arbiter {
|
|||
})
|
||||
}
|
||||
|
||||
fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
|
||||
fn with_sender(sender: Sender<ArbiterCommand>) -> Self {
|
||||
Self {
|
||||
sender,
|
||||
thread_handle: None,
|
||||
|
@ -233,8 +237,8 @@ impl Arbiter {
|
|||
}
|
||||
|
||||
struct ArbiterController {
|
||||
stop: Option<Sender<i32>>,
|
||||
rx: UnboundedReceiver<ArbiterCommand>,
|
||||
stop: Option<oneshot::Sender<i32>>,
|
||||
rx: Receiver<ArbiterCommand>,
|
||||
}
|
||||
|
||||
impl Drop for ArbiterController {
|
||||
|
@ -255,17 +259,17 @@ impl Future for ArbiterController {
|
|||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
match Pin::new(&mut self.rx).poll_recv(cx) {
|
||||
match Pin::new(&mut self.rx).poll_next(cx) {
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Ready(Some(item)) => match item {
|
||||
ArbiterCommand::Stop => {
|
||||
if let Some(stop) = self.stop.take() {
|
||||
if let Some(mut stop) = self.stop.take() {
|
||||
let _ = stop.send(0);
|
||||
};
|
||||
return Poll::Ready(());
|
||||
}
|
||||
ArbiterCommand::Execute(fut) => {
|
||||
tokio::task::spawn_local(fut);
|
||||
tok_io::task::spawn(fut);
|
||||
}
|
||||
ArbiterCommand::ExecuteFn(f) => {
|
||||
f.call_box();
|
||||
|
@ -286,15 +290,15 @@ pub(super) enum SystemCommand {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct SystemArbiter {
|
||||
stop: Option<Sender<i32>>,
|
||||
commands: UnboundedReceiver<SystemCommand>,
|
||||
stop: Option<oneshot::Sender<i32>>,
|
||||
commands: Receiver<SystemCommand>,
|
||||
arbiters: HashMap<usize, Arbiter>,
|
||||
}
|
||||
|
||||
impl SystemArbiter {
|
||||
pub(super) fn new(
|
||||
stop: Sender<i32>,
|
||||
commands: UnboundedReceiver<SystemCommand>,
|
||||
stop: oneshot::Sender<i32>,
|
||||
commands: Receiver<SystemCommand>,
|
||||
) -> Self {
|
||||
SystemArbiter {
|
||||
commands,
|
||||
|
@ -309,7 +313,7 @@ impl Future for SystemArbiter {
|
|||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
match Pin::new(&mut self.commands).poll_recv(cx) {
|
||||
match Pin::new(&mut self.commands).poll_next(cx) {
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Ready(Some(cmd)) => match cmd {
|
||||
SystemCommand::Exit(code) => {
|
||||
|
@ -318,7 +322,7 @@ impl Future for SystemArbiter {
|
|||
arb.stop();
|
||||
}
|
||||
// stop event loop
|
||||
if let Some(stop) = self.stop.take() {
|
||||
if let Some(mut stop) = self.stop.take() {
|
||||
let _ = stop.send(code);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
use std::{borrow::Cow, future::Future, io};
|
||||
use std::{cell::RefCell, future::Future, io, rc::Rc};
|
||||
|
||||
use async_channel::unbounded;
|
||||
use async_oneshot as oneshot;
|
||||
use ntex_util::future::lazy;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
use tokio::sync::oneshot::{channel, Receiver};
|
||||
use tokio::task::LocalSet;
|
||||
|
||||
use super::arbiter::{Arbiter, SystemArbiter};
|
||||
use super::runtime::Runtime;
|
||||
use super::system::System;
|
||||
use crate::arbiter::{Arbiter, SystemArbiter};
|
||||
use crate::{create_runtime, Runtime, System};
|
||||
|
||||
/// Builder struct for a ntex runtime.
|
||||
///
|
||||
|
@ -16,8 +14,7 @@ use super::system::System;
|
|||
/// run a function in its context.
|
||||
pub struct Builder {
|
||||
/// Name of the System. Defaults to "ntex" if unset.
|
||||
name: Cow<'static, str>,
|
||||
|
||||
name: String,
|
||||
/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
|
||||
stop_on_panic: bool,
|
||||
}
|
||||
|
@ -25,14 +22,14 @@ pub struct Builder {
|
|||
impl Builder {
|
||||
pub(super) fn new() -> Self {
|
||||
Builder {
|
||||
name: Cow::Borrowed("ntex"),
|
||||
name: "ntex".into(),
|
||||
stop_on_panic: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the name of the System.
|
||||
pub fn name<T: Into<String>>(mut self, name: T) -> Self {
|
||||
self.name = Cow::Owned(name.into());
|
||||
pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
|
||||
self.name = name.as_ref().into();
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -52,15 +49,6 @@ impl Builder {
|
|||
self.create_runtime(|| {})
|
||||
}
|
||||
|
||||
/// Create new System that can run asynchronously.
|
||||
/// This method could be used to run ntex system in existing tokio
|
||||
/// runtime.
|
||||
///
|
||||
/// This method panics if it cannot start the system arbiter
|
||||
pub fn finish_with(self, local: &LocalSet) -> AsyncSystemRunner {
|
||||
self.create_async_runtime(local)
|
||||
}
|
||||
|
||||
/// This function will start tokio runtime and will finish once the
|
||||
/// `System::stop()` message get called.
|
||||
/// Function `f` get called within tokio runtime context.
|
||||
|
@ -71,92 +59,34 @@ impl Builder {
|
|||
self.create_runtime(f).run()
|
||||
}
|
||||
|
||||
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
|
||||
let (stop_tx, stop) = channel();
|
||||
let (sys_sender, sys_receiver) = unbounded_channel();
|
||||
|
||||
let _system = System::construct(
|
||||
sys_sender,
|
||||
Arbiter::new_system(local),
|
||||
self.stop_on_panic,
|
||||
);
|
||||
|
||||
// system arbiter
|
||||
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
||||
|
||||
// start the system arbiter
|
||||
let _ = local.spawn_local(arb);
|
||||
|
||||
AsyncSystemRunner { stop, _system }
|
||||
}
|
||||
|
||||
fn create_runtime<F>(self, f: F) -> SystemRunner
|
||||
where
|
||||
F: FnOnce() + 'static,
|
||||
{
|
||||
let (stop_tx, stop) = channel();
|
||||
let (sys_sender, sys_receiver) = unbounded_channel();
|
||||
let (stop_tx, stop) = oneshot::oneshot();
|
||||
let (sys_sender, sys_receiver) = unbounded();
|
||||
|
||||
let rt = Runtime::new().unwrap();
|
||||
|
||||
// set ntex-util spawn fn
|
||||
ntex_util::set_spawn_fn(|fut| {
|
||||
tokio::task::spawn_local(fut);
|
||||
});
|
||||
let rt = create_runtime();
|
||||
|
||||
// system arbiter
|
||||
let _system = System::construct(
|
||||
sys_sender,
|
||||
Arbiter::new_system(rt.local()),
|
||||
self.stop_on_panic,
|
||||
);
|
||||
let _system =
|
||||
System::construct(sys_sender, Arbiter::new_system(&rt), self.stop_on_panic);
|
||||
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
||||
rt.spawn(arb);
|
||||
rt.spawn(Box::pin(arb));
|
||||
|
||||
// init system arbiter and run configuration method
|
||||
rt.block_on(lazy(move |_| f()));
|
||||
let runner = SystemRunner { rt, stop, _system };
|
||||
runner.block_on(lazy(move |_| f()));
|
||||
|
||||
SystemRunner { rt, stop, _system }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AsyncSystemRunner {
|
||||
stop: Receiver<i32>,
|
||||
_system: System,
|
||||
}
|
||||
|
||||
impl AsyncSystemRunner {
|
||||
/// This function will start event loop and returns a future that
|
||||
/// resolves once the `System::stop()` function is called.
|
||||
pub fn run(self) -> impl Future<Output = Result<(), io::Error>> + Send {
|
||||
let AsyncSystemRunner { stop, .. } = self;
|
||||
|
||||
// run loop
|
||||
async move {
|
||||
match stop.await {
|
||||
Ok(code) => {
|
||||
if code != 0 {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("Non-zero exit code: {}", code),
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||
}
|
||||
}
|
||||
runner
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper object that runs System's event loop
|
||||
#[must_use = "SystemRunner must be run"]
|
||||
#[derive(Debug)]
|
||||
pub struct SystemRunner {
|
||||
rt: Runtime,
|
||||
stop: Receiver<i32>,
|
||||
rt: Box<dyn Runtime>,
|
||||
stop: oneshot::Receiver<i32>,
|
||||
_system: System,
|
||||
}
|
||||
|
||||
|
@ -167,7 +97,7 @@ impl SystemRunner {
|
|||
let SystemRunner { rt, stop, .. } = self;
|
||||
|
||||
// run loop
|
||||
match rt.block_on(stop) {
|
||||
match block_on(&rt, stop).take() {
|
||||
Ok(code) => {
|
||||
if code != 0 {
|
||||
Err(io::Error::new(
|
||||
|
@ -178,29 +108,55 @@ impl SystemRunner {
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a future and wait for result.
|
||||
#[inline]
|
||||
pub fn block_on<F, O>(&mut self, fut: F) -> O
|
||||
pub fn block_on<F, R>(&self, fut: F) -> R
|
||||
where
|
||||
F: Future<Output = O>,
|
||||
F: Future<Output = R> + 'static,
|
||||
R: 'static,
|
||||
{
|
||||
self.rt.block_on(fut)
|
||||
block_on(&self.rt, fut).take()
|
||||
}
|
||||
|
||||
/// Execute a function with enabled executor.
|
||||
#[inline]
|
||||
pub fn exec<F, R>(&mut self, f: F) -> R
|
||||
pub fn exec<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce() -> R,
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: 'static,
|
||||
{
|
||||
self.rt.block_on(lazy(|_| f()))
|
||||
self.block_on(lazy(|_| f()))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BlockResult<T>(Rc<RefCell<Option<T>>>);
|
||||
|
||||
impl<T> BlockResult<T> {
|
||||
pub fn take(self) -> T {
|
||||
self.0.borrow_mut().take().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(clippy::borrowed_box)]
|
||||
fn block_on<F, R>(rt: &Box<dyn Runtime>, fut: F) -> BlockResult<R>
|
||||
where
|
||||
F: Future<Output = R> + 'static,
|
||||
R: 'static,
|
||||
{
|
||||
let result = Rc::new(RefCell::new(None));
|
||||
let result_inner = result.clone();
|
||||
rt.block_on(Box::pin(async move {
|
||||
let r = fut.await;
|
||||
*result_inner.borrow_mut() = Some(r);
|
||||
}));
|
||||
BlockResult(result)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::mpsc;
|
||||
|
@ -213,19 +169,12 @@ mod tests {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.build()
|
||||
.unwrap();
|
||||
let local = tokio::task::LocalSet::new();
|
||||
|
||||
let runner = crate::System::build()
|
||||
.stop_on_panic(true)
|
||||
.finish_with(&local);
|
||||
let runner = crate::System::build().stop_on_panic(true).finish();
|
||||
|
||||
tx.send(System::current()).unwrap();
|
||||
let _ = rt.block_on(local.run_until(runner.run()));
|
||||
let _ = runner.run();
|
||||
});
|
||||
let mut s = System::new("test");
|
||||
let s = System::new("test");
|
||||
|
||||
let sys = rx.recv().unwrap();
|
||||
let id = sys.id();
|
||||
|
|
|
@ -1,82 +1,37 @@
|
|||
//! A runtime implementation that runs everything on the current thread.
|
||||
use ntex_util::future::lazy;
|
||||
use std::future::Future;
|
||||
use std::{future::Future, pin::Pin};
|
||||
|
||||
mod arbiter;
|
||||
mod builder;
|
||||
mod runtime;
|
||||
mod system;
|
||||
|
||||
mod time;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub mod time_driver {
|
||||
pub use super::time::*;
|
||||
}
|
||||
|
||||
pub use self::arbiter::Arbiter;
|
||||
pub use self::builder::{Builder, SystemRunner};
|
||||
pub use self::runtime::Runtime;
|
||||
pub use self::system::System;
|
||||
|
||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn<F>(f: F) -> self::task::JoinHandle<F::Output>
|
||||
where
|
||||
F: Future + 'static,
|
||||
{
|
||||
tokio::task::spawn_local(f)
|
||||
#[cfg(feature = "tokio")]
|
||||
mod tokio;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use self::tokio::*;
|
||||
|
||||
pub trait Runtime {
|
||||
/// Spawn a future onto the single-threaded runtime.
|
||||
fn spawn(&self, future: Pin<Box<dyn Future<Output = ()>>>);
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
fn block_on(&self, f: Pin<Box<dyn Future<Output = ()>>>);
|
||||
}
|
||||
|
||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn_fn<F, R>(f: F) -> tokio::task::JoinHandle<R::Output>
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future + 'static,
|
||||
{
|
||||
tokio::task::spawn_local(async move {
|
||||
let r = lazy(|_| f()).await;
|
||||
r.await
|
||||
})
|
||||
}
|
||||
|
||||
/// Asynchronous signal handling
|
||||
pub mod signal {
|
||||
#[cfg(unix)]
|
||||
pub mod unix {
|
||||
pub use tokio::signal::unix::*;
|
||||
}
|
||||
pub use tokio::signal::ctrl_c;
|
||||
}
|
||||
|
||||
/// TCP/UDP/Unix bindings
|
||||
pub mod net {
|
||||
pub use tokio::net::UdpSocket;
|
||||
pub use tokio::net::{TcpListener, TcpStream};
|
||||
|
||||
#[cfg(unix)]
|
||||
pub mod unix {
|
||||
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub use self::unix::*;
|
||||
}
|
||||
|
||||
/// Task management.
|
||||
pub mod task {
|
||||
pub use tokio::task::{spawn_blocking, yield_now, JoinError, JoinHandle};
|
||||
/// Different types of process signals
|
||||
#[derive(PartialEq, Clone, Copy, Debug)]
|
||||
pub enum Signal {
|
||||
/// SIGHUP
|
||||
Hup,
|
||||
/// SIGINT
|
||||
Int,
|
||||
/// SIGTERM
|
||||
Term,
|
||||
/// SIGQUIT
|
||||
Quit,
|
||||
}
|
||||
|
|
|
@ -1,99 +0,0 @@
|
|||
use std::future::Future;
|
||||
use std::io;
|
||||
use tokio::{runtime, task::LocalSet};
|
||||
|
||||
/// Single-threaded runtime provides a way to start reactor
|
||||
/// and runtime on the current thread.
|
||||
///
|
||||
/// See [module level][mod] documentation for more details.
|
||||
///
|
||||
/// [mod]: index.html
|
||||
#[derive(Debug)]
|
||||
pub struct Runtime {
|
||||
local: LocalSet,
|
||||
rt: runtime::Runtime,
|
||||
}
|
||||
|
||||
impl Runtime {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
/// Returns a new runtime initialized with default configuration values.
|
||||
pub fn new() -> io::Result<Runtime> {
|
||||
let rt = runtime::Builder::new_current_thread()
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.build()?;
|
||||
|
||||
Ok(Runtime {
|
||||
rt,
|
||||
local: LocalSet::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn local(&self) -> &LocalSet {
|
||||
&self.local
|
||||
}
|
||||
|
||||
/// Spawn a future onto the single-threaded runtime.
|
||||
///
|
||||
/// See [module level][mod] documentation for more details.
|
||||
///
|
||||
/// [mod]: index.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// # use futures::{future, Future, Stream};
|
||||
/// use ntex_rt::Runtime;
|
||||
///
|
||||
/// # fn dox() {
|
||||
/// // Create the runtime
|
||||
/// let mut rt = Runtime::new().unwrap();
|
||||
///
|
||||
/// // Spawn a future onto the runtime
|
||||
/// rt.spawn(future::lazy(|_| {
|
||||
/// println!("running on the runtime");
|
||||
/// }));
|
||||
/// # }
|
||||
/// # pub fn main() {}
|
||||
/// ```
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if the spawn fails. Failure occurs if the executor
|
||||
/// is currently at capacity and is unable to spawn a new future.
|
||||
pub fn spawn<F>(&self, future: F) -> &Self
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
self.local.spawn_local(future);
|
||||
self
|
||||
}
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
///
|
||||
/// This function can be used to synchronously block the current thread
|
||||
/// until the provided `future` has resolved either successfully or with an
|
||||
/// error. The result of the future is then returned from this function
|
||||
/// call.
|
||||
///
|
||||
/// Note that this function will **also** execute any spawned futures on the
|
||||
/// current thread, but will **not** block until these other spawned futures
|
||||
/// have completed. Once the function returns, any uncompleted futures
|
||||
/// remain pending in the `Runtime` instance. These futures will not run
|
||||
/// until `block_on` or `run` is called again.
|
||||
///
|
||||
/// The caller is responsible for ensuring that other spawned futures
|
||||
/// complete execution by calling `block_on` or `run`.
|
||||
pub fn block_on<F>(&self, f: F) -> F::Output
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
// set ntex-util spawn fn
|
||||
ntex_util::set_spawn_fn(|fut| {
|
||||
crate::spawn(fut);
|
||||
});
|
||||
|
||||
self.local.block_on(&self.rt, f)
|
||||
}
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::{cell::RefCell, io};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use async_channel::Sender;
|
||||
use std::{cell::RefCell, io, sync::atomic::AtomicUsize, sync::atomic::Ordering};
|
||||
|
||||
use super::arbiter::{Arbiter, SystemCommand};
|
||||
use super::builder::{Builder, SystemRunner};
|
||||
|
@ -11,7 +10,7 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
|||
#[derive(Clone, Debug)]
|
||||
pub struct System {
|
||||
id: usize,
|
||||
sys: UnboundedSender<SystemCommand>,
|
||||
sys: Sender<SystemCommand>,
|
||||
arbiter: Arbiter,
|
||||
stop_on_panic: bool,
|
||||
}
|
||||
|
@ -23,7 +22,7 @@ thread_local!(
|
|||
impl System {
|
||||
/// Constructs new system and sets it as current
|
||||
pub(super) fn construct(
|
||||
sys: UnboundedSender<SystemCommand>,
|
||||
sys: Sender<SystemCommand>,
|
||||
arbiter: Arbiter,
|
||||
stop_on_panic: bool,
|
||||
) -> Self {
|
||||
|
@ -49,7 +48,7 @@ impl System {
|
|||
/// Create new system.
|
||||
///
|
||||
/// This method panics if it can not create tokio runtime
|
||||
pub fn new<T: Into<String>>(name: T) -> SystemRunner {
|
||||
pub fn new(name: &str) -> SystemRunner {
|
||||
Self::build().name(name).finish()
|
||||
}
|
||||
|
||||
|
@ -81,10 +80,10 @@ impl System {
|
|||
|
||||
/// Stop the system with a particular exit code.
|
||||
pub fn stop_with_code(&self, code: i32) {
|
||||
let _ = self.sys.send(SystemCommand::Exit(code));
|
||||
let _ = self.sys.try_send(SystemCommand::Exit(code));
|
||||
}
|
||||
|
||||
pub(super) fn sys(&self) -> &UnboundedSender<SystemCommand> {
|
||||
pub(super) fn sys(&self) -> &Sender<SystemCommand> {
|
||||
&self.sys
|
||||
}
|
||||
|
||||
|
|
|
@ -1,88 +0,0 @@
|
|||
/// Utilities for tracking time.
|
||||
use std::{future::Future, pin::Pin, task, task::Poll, time::Duration, time::Instant};
|
||||
use tokio::time;
|
||||
|
||||
pub use tokio::time::{interval, Interval};
|
||||
pub use tokio::time::{timeout, Timeout};
|
||||
|
||||
/// Waits until `deadline` is reached.
|
||||
///
|
||||
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
|
||||
/// operates at millisecond granularity and should not be used for tasks that
|
||||
/// require high-resolution timers.
|
||||
///
|
||||
/// # Cancellation
|
||||
///
|
||||
/// Canceling a sleep instance is done by dropping the returned future. No additional
|
||||
/// cleanup work is required.
|
||||
#[inline]
|
||||
pub fn sleep_until(deadline: Instant) -> Sleep {
|
||||
Sleep {
|
||||
inner: time::sleep_until(deadline.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits until `duration` has elapsed.
|
||||
///
|
||||
/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous
|
||||
/// analog to `std::thread::sleep`.
|
||||
pub fn sleep(duration: Duration) -> Sleep {
|
||||
Sleep {
|
||||
inner: time::sleep(duration),
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
/// Creates new [`Interval`] that yields with interval of `period` with the
|
||||
/// first tick completing at `start`. The default `MissedTickBehavior` is
|
||||
/// `Burst`, but this can be configured
|
||||
/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
|
||||
#[inline]
|
||||
pub fn interval_at(start: Instant, period: Duration) -> Interval {
|
||||
time::interval_at(start.into(), period)
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
/// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
|
||||
#[derive(Debug)]
|
||||
pub struct Sleep {
|
||||
#[pin]
|
||||
inner: time::Sleep,
|
||||
}
|
||||
}
|
||||
|
||||
impl Sleep {
|
||||
/// Returns the instant at which the future will complete.
|
||||
#[inline]
|
||||
pub fn deadline(&self) -> Instant {
|
||||
self.inner.deadline().into_std()
|
||||
}
|
||||
|
||||
/// Returns `true` if `Sleep` has elapsed.
|
||||
///
|
||||
/// A `Sleep` instance is elapsed when the requested duration has elapsed.
|
||||
#[inline]
|
||||
pub fn is_elapsed(&self) -> bool {
|
||||
self.inner.is_elapsed()
|
||||
}
|
||||
|
||||
/// Resets the `Sleep` instance to a new deadline.
|
||||
///
|
||||
/// Calling this function allows changing the instant at which the `Sleep`
|
||||
/// future completes without having to create new associated state.
|
||||
///
|
||||
/// This function can be called both before and after the future has
|
||||
/// completed.
|
||||
#[inline]
|
||||
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
|
||||
self.project().inner.reset(deadline.into());
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Sleep {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
self.project().inner.poll(cx)
|
||||
}
|
||||
}
|
239
ntex-rt/src/tokio.rs
Normal file
239
ntex-rt/src/tokio.rs
Normal file
|
@ -0,0 +1,239 @@
|
|||
use std::future::Future;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{cell::RefCell, io, mem, net, net::SocketAddr, path::Path, pin::Pin, rc::Rc};
|
||||
|
||||
use async_oneshot as oneshot;
|
||||
use ntex_bytes::PoolRef;
|
||||
use ntex_io::Io;
|
||||
use ntex_util::future::lazy;
|
||||
pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
|
||||
use tok_io::{runtime, task::LocalSet};
|
||||
|
||||
use crate::{Runtime, Signal};
|
||||
|
||||
/// Create new single-threaded tokio runtime.
|
||||
pub fn create_runtime() -> Box<dyn Runtime> {
|
||||
Box::new(TokioRuntime::new().unwrap())
|
||||
}
|
||||
|
||||
/// Opens a TCP connection to a remote host.
|
||||
pub fn tcp_connect(
|
||||
addr: SocketAddr,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Io, io::Error>>>> {
|
||||
Box::pin(async move {
|
||||
let sock = tok_io::net::TcpStream::connect(addr).await?;
|
||||
sock.set_nodelay(true)?;
|
||||
Ok(Io::new(sock))
|
||||
})
|
||||
}
|
||||
|
||||
/// Opens a TCP connection to a remote host and use specified memory pool.
|
||||
pub fn tcp_connect_in(
|
||||
addr: SocketAddr,
|
||||
pool: PoolRef,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Io, io::Error>>>> {
|
||||
Box::pin(async move {
|
||||
let sock = tok_io::net::TcpStream::connect(addr).await?;
|
||||
sock.set_nodelay(true)?;
|
||||
Ok(Io::with_memory_pool(sock, pool))
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
/// Opens a unix stream connection.
|
||||
pub fn unix_connect<P>(addr: P) -> Pin<Box<dyn Future<Output = Result<Io, io::Error>>>>
|
||||
where
|
||||
P: AsRef<Path> + 'static,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let sock = tok_io::net::UnixStream::connect(addr).await?;
|
||||
Ok(Io::new(sock))
|
||||
})
|
||||
}
|
||||
|
||||
/// Convert std TcpStream to tokio's TcpStream
|
||||
pub fn from_tcp_stream(stream: net::TcpStream) -> Result<Io, io::Error> {
|
||||
stream.set_nonblocking(true)?;
|
||||
stream.set_nodelay(true)?;
|
||||
Ok(Io::new(tok_io::net::TcpStream::from_std(stream)?))
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
/// Convert std UnixStream to tokio's UnixStream
|
||||
pub fn from_unix_stream(
|
||||
stream: std::os::unix::net::UnixStream,
|
||||
) -> Result<Io, io::Error> {
|
||||
stream.set_nonblocking(true)?;
|
||||
Ok(Io::new(tok_io::net::UnixStream::from_std(stream)?))
|
||||
}
|
||||
|
||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
|
||||
where
|
||||
F: Future + 'static,
|
||||
{
|
||||
tok_io::task::spawn_local(f)
|
||||
}
|
||||
|
||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if ntex system is not running.
|
||||
#[inline]
|
||||
pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future + 'static,
|
||||
{
|
||||
spawn(async move {
|
||||
let r = lazy(|_| f()).await;
|
||||
r.await
|
||||
})
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static SRUN: RefCell<bool> = RefCell::new(false);
|
||||
static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default();
|
||||
}
|
||||
|
||||
/// Register signal handler.
|
||||
///
|
||||
/// Signals are handled by oneshots, you have to re-register
|
||||
/// after each signal.
|
||||
pub fn signal() -> Option<oneshot::Receiver<Signal>> {
|
||||
if !SRUN.with(|v| *v.borrow()) {
|
||||
spawn(Signals::new());
|
||||
}
|
||||
SHANDLERS.with(|handlers| {
|
||||
let (tx, rx) = oneshot::oneshot();
|
||||
handlers.borrow_mut().push(tx);
|
||||
Some(rx)
|
||||
})
|
||||
}
|
||||
|
||||
/// Single-threaded tokio runtime.
|
||||
#[derive(Debug)]
|
||||
struct TokioRuntime {
|
||||
local: LocalSet,
|
||||
rt: runtime::Runtime,
|
||||
}
|
||||
impl TokioRuntime {
|
||||
/// Returns a new runtime initialized with default configuration values.
|
||||
fn new() -> io::Result<Self> {
|
||||
let rt = runtime::Builder::new_current_thread().enable_io().build()?;
|
||||
|
||||
Ok(Self {
|
||||
rt,
|
||||
local: LocalSet::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Runtime for TokioRuntime {
|
||||
/// Spawn a future onto the single-threaded runtime.
|
||||
fn spawn(&self, future: Pin<Box<dyn Future<Output = ()>>>) {
|
||||
self.local.spawn_local(future);
|
||||
}
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future
|
||||
/// completes.
|
||||
fn block_on(&self, f: Pin<Box<dyn Future<Output = ()>>>) {
|
||||
// set ntex-util spawn fn
|
||||
ntex_util::set_spawn_fn(|fut| {
|
||||
tok_io::task::spawn_local(fut);
|
||||
});
|
||||
|
||||
self.local.block_on(&self.rt, f);
|
||||
}
|
||||
}
|
||||
|
||||
struct Signals {
|
||||
#[cfg(not(unix))]
|
||||
signal: Pin<Box<dyn Future<Output = io::Result<()>>>>,
|
||||
#[cfg(unix)]
|
||||
signals: Vec<(Signal, tok_io::signal::unix::Signal)>,
|
||||
}
|
||||
|
||||
impl Signals {
|
||||
pub(super) fn new() -> Signals {
|
||||
SRUN.with(|h| *h.borrow_mut() = true);
|
||||
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
Signals {
|
||||
signal: Box::pin(tok_io::signal::ctrl_c()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use tok_io::signal::unix;
|
||||
|
||||
let sig_map = [
|
||||
(unix::SignalKind::interrupt(), Signal::Int),
|
||||
(unix::SignalKind::hangup(), Signal::Hup),
|
||||
(unix::SignalKind::terminate(), Signal::Term),
|
||||
(unix::SignalKind::quit(), Signal::Quit),
|
||||
];
|
||||
|
||||
let mut signals = Vec::new();
|
||||
for (kind, sig) in sig_map.iter() {
|
||||
match unix::signal(*kind) {
|
||||
Ok(stream) => signals.push((*sig, stream)),
|
||||
Err(e) => log::error!(
|
||||
"Cannot initialize stream handler for {:?} err: {}",
|
||||
sig,
|
||||
e
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
Signals { signals }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Signals {
|
||||
fn drop(&mut self) {
|
||||
SRUN.with(|h| *h.borrow_mut() = false);
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Signals {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
if self.signal.as_mut().poll(cx).is_ready() {
|
||||
let handlers = SHANDLERS.with(|h| mem::take(&mut *h.borrow_mut()));
|
||||
for mut sender in handlers {
|
||||
let _ = sender.send(Signal::Int);
|
||||
}
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
#[cfg(unix)]
|
||||
{
|
||||
for (sig, fut) in self.signals.iter_mut() {
|
||||
if Pin::new(fut).poll_recv(cx).is_ready() {
|
||||
let handlers = SHANDLERS.with(|h| mem::take(&mut *h.borrow_mut()));
|
||||
for mut sender in handlers {
|
||||
let _ = sender.send(*sig);
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,6 +26,6 @@ pin-project-lite = "0.2.6"
|
|||
|
||||
[dev-dependencies]
|
||||
ntex = "0.5.0-b.0"
|
||||
ntex-rt = "0.3.2"
|
||||
ntex-rt = "0.4.0-b.0"
|
||||
ntex-macros = "0.1.3"
|
||||
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
* Move ntex::time to ntex-util crate
|
||||
|
||||
* Replace mio with poller for accept loop
|
||||
* Replace mio with polling for accept loop
|
||||
|
||||
## [0.4.13] - 2021-12-07
|
||||
|
||||
|
|
|
@ -43,16 +43,17 @@ http-framework = ["h2", "http", "httparse",
|
|||
"httpdate", "encoding_rs", "mime", "percent-encoding", "serde_json", "serde_urlencoded"]
|
||||
|
||||
[dependencies]
|
||||
ntex-codec = "0.5.1"
|
||||
ntex-rt = "0.3.2"
|
||||
ntex-codec = "0.6.0"
|
||||
ntex-router = "0.5.1"
|
||||
ntex-service = "0.2.1"
|
||||
ntex-macros = "0.1.3"
|
||||
ntex-util = "0.1.2"
|
||||
ntex-bytes = "0.1.7"
|
||||
ntex-io = { version = "0.1", features = ["tokio"] }
|
||||
ntex-io = "0.1"
|
||||
ntex-tls = "0.1"
|
||||
|
||||
ntex-rt = { version = "0.4.0-b.0", default-features = false, features = ["tokio"] }
|
||||
|
||||
base64 = "0.13"
|
||||
bitflags = "1.3"
|
||||
derive_more = "0.99.14"
|
||||
|
|
|
@ -41,7 +41,7 @@ impl<T: Address> Resolver<T> {
|
|||
format!("{}:{}", req.host(), req.port())
|
||||
};
|
||||
|
||||
let fut = crate::rt::task::spawn_blocking(move || {
|
||||
let fut = crate::rt::spawn_blocking(move || {
|
||||
net::ToSocketAddrs::to_socket_addrs(&host)
|
||||
});
|
||||
|
||||
|
|
|
@ -141,7 +141,7 @@ mod tests {
|
|||
let factory = Connector::new(config).clone();
|
||||
|
||||
let srv = factory.new_service(()).await.unwrap();
|
||||
let result = srv
|
||||
let _result = srv
|
||||
.call(Connect::new("www.rust-lang.org").set_addr(Some(server.addr())))
|
||||
.await;
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
use std::task::{Context, Poll};
|
||||
use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin};
|
||||
|
||||
use crate::io::Io;
|
||||
use crate::rt::net::TcpStream;
|
||||
use crate::io::{types, Io};
|
||||
use crate::rt::tcp_connect_in;
|
||||
use crate::service::{Service, ServiceFactory};
|
||||
use crate::util::{Either, PoolId, PoolRef, Ready};
|
||||
|
||||
|
@ -128,7 +128,7 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
|
|||
|
||||
if let Some(addr) = addr {
|
||||
self.state = ConnectState::Connect(TcpConnectorResponse::new(
|
||||
req, port, addr,
|
||||
req, port, addr, self.pool,
|
||||
));
|
||||
self.poll(cx)
|
||||
} else if let Some(addr) = req.addr() {
|
||||
|
@ -136,6 +136,7 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
|
|||
req,
|
||||
addr.port(),
|
||||
Either::Left(addr),
|
||||
self.pool,
|
||||
));
|
||||
self.poll(cx)
|
||||
} else {
|
||||
|
@ -144,12 +145,7 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
|
|||
}
|
||||
}
|
||||
},
|
||||
ConnectState::Connect(ref mut fut) => match Pin::new(fut).poll(cx)? {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(stream) => {
|
||||
Poll::Ready(Ok(Io::with_memory_pool(stream, self.pool)))
|
||||
}
|
||||
},
|
||||
ConnectState::Connect(ref mut fut) => Pin::new(fut).poll(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -159,7 +155,8 @@ struct TcpConnectorResponse<T> {
|
|||
req: Option<T>,
|
||||
port: u16,
|
||||
addrs: Option<VecDeque<SocketAddr>>,
|
||||
stream: Option<Pin<Box<dyn Future<Output = Result<TcpStream, io::Error>>>>>,
|
||||
stream: Option<Pin<Box<dyn Future<Output = Result<Io, io::Error>>>>>,
|
||||
pool: PoolRef,
|
||||
}
|
||||
|
||||
impl<T: Address> TcpConnectorResponse<T> {
|
||||
|
@ -167,6 +164,7 @@ impl<T: Address> TcpConnectorResponse<T> {
|
|||
req: T,
|
||||
port: u16,
|
||||
addr: Either<SocketAddr, VecDeque<SocketAddr>>,
|
||||
pool: PoolRef,
|
||||
) -> TcpConnectorResponse<T> {
|
||||
trace!(
|
||||
"TCP connector - connecting to {:?} port:{}",
|
||||
|
@ -177,13 +175,15 @@ impl<T: Address> TcpConnectorResponse<T> {
|
|||
match addr {
|
||||
Either::Left(addr) => TcpConnectorResponse {
|
||||
req: Some(req),
|
||||
port,
|
||||
addrs: None,
|
||||
stream: Some(Box::pin(TcpStream::connect(addr))),
|
||||
stream: Some(tcp_connect_in(addr, pool)),
|
||||
pool,
|
||||
port,
|
||||
},
|
||||
Either::Right(addrs) => TcpConnectorResponse {
|
||||
req: Some(req),
|
||||
port,
|
||||
pool,
|
||||
req: Some(req),
|
||||
addrs: Some(addrs),
|
||||
stream: None,
|
||||
},
|
||||
|
@ -202,7 +202,7 @@ impl<T: Address> TcpConnectorResponse<T> {
|
|||
}
|
||||
|
||||
impl<T: Address> Future for TcpConnectorResponse<T> {
|
||||
type Output = Result<TcpStream, ConnectError>;
|
||||
type Output = Result<Io, ConnectError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
@ -212,16 +212,11 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
|
|||
if let Some(new) = this.stream.as_mut() {
|
||||
match new.as_mut().poll(cx) {
|
||||
Poll::Ready(Ok(sock)) => {
|
||||
if let Err(err) = sock.set_nodelay(true) {
|
||||
if !this.can_continue(&err) {
|
||||
return Poll::Ready(Err(err.into()));
|
||||
}
|
||||
}
|
||||
|
||||
let req = this.req.take().unwrap();
|
||||
trace!(
|
||||
"TCP connector - successfully connected to connecting to {:?} - {:?}",
|
||||
req.host(), sock.peer_addr()
|
||||
req.host(),
|
||||
sock.query::<types::PeerAddr>().get()
|
||||
);
|
||||
return Poll::Ready(Ok(sock));
|
||||
}
|
||||
|
@ -236,7 +231,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
|
|||
|
||||
// try to connect
|
||||
let addr = this.addrs.as_mut().unwrap().pop_front().unwrap();
|
||||
this.stream = Some(Box::pin(TcpStream::connect(addr)));
|
||||
this.stream = Some(tcp_connect_in(addr, this.pool));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use flate2::write::{GzDecoder, ZlibDecoder};
|
|||
use super::Writer;
|
||||
use crate::http::error::PayloadError;
|
||||
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
|
||||
use crate::rt::task::{spawn_blocking, JoinHandle};
|
||||
use crate::rt::{spawn_blocking, JoinHandle};
|
||||
use crate::{util::Bytes, Stream};
|
||||
|
||||
const INPLACE: usize = 2049;
|
||||
|
|
|
@ -7,7 +7,7 @@ use flate2::write::{GzEncoder, ZlibEncoder};
|
|||
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
|
||||
use crate::http::header::{ContentEncoding, HeaderValue, CONTENT_ENCODING};
|
||||
use crate::http::{ResponseHead, StatusCode};
|
||||
use crate::rt::task::{spawn_blocking, JoinHandle};
|
||||
use crate::rt::{spawn_blocking, JoinHandle};
|
||||
use crate::util::Bytes;
|
||||
|
||||
use super::Writer;
|
||||
|
|
|
@ -9,7 +9,6 @@ pub use http::Error as HttpError;
|
|||
|
||||
use crate::http::body::Body;
|
||||
use crate::http::response::Response;
|
||||
use crate::rt::task::JoinError;
|
||||
use crate::util::{BytesMut, Either};
|
||||
|
||||
/// Error that can be converted to `Response`
|
||||
|
@ -244,8 +243,8 @@ pub enum BlockingError<E: fmt::Debug> {
|
|||
|
||||
impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
|
||||
|
||||
impl From<JoinError> for PayloadError {
|
||||
fn from(_: JoinError) -> Self {
|
||||
impl From<crate::rt::JoinError> for PayloadError {
|
||||
fn from(_: crate::rt::JoinError) -> Self {
|
||||
PayloadError::Io(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Operation is canceled",
|
||||
|
|
|
@ -214,7 +214,7 @@ pub fn server<F: StreamServiceFactory>(factory: F) -> TestServer {
|
|||
|
||||
// run server in separate thread
|
||||
thread::spawn(move || {
|
||||
let mut sys = System::new("test-server");
|
||||
let sys = System::new("test-server");
|
||||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
let local_addr = tcp.local_addr().unwrap();
|
||||
|
||||
|
|
|
@ -8,13 +8,12 @@ use futures_core::Stream;
|
|||
use log::{error, info};
|
||||
use socket2::{Domain, SockAddr, Socket, Type};
|
||||
|
||||
use crate::rt::{spawn, System};
|
||||
use crate::rt::{spawn, Signal, System};
|
||||
use crate::{time::sleep, time::Millis, util::join_all, util::PoolId};
|
||||
|
||||
use super::accept::{AcceptLoop, AcceptNotify, Command};
|
||||
use super::config::{ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime};
|
||||
use super::service::{Factory, InternalServiceFactory, StreamServiceFactory};
|
||||
use super::signals::{Signal, Signals};
|
||||
use super::socket::Listener;
|
||||
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
|
||||
use super::{Server, ServerCommand, ServerStatus, Token};
|
||||
|
@ -332,7 +331,7 @@ impl ServerBuilder {
|
|||
|
||||
// handle signals
|
||||
if !self.no_signals {
|
||||
spawn(Signals::new(self.server.clone()));
|
||||
spawn(signals(self.server.clone()));
|
||||
}
|
||||
|
||||
// start http server actor
|
||||
|
@ -489,6 +488,21 @@ impl Future for ServerBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
async fn signals(srv: Server) {
|
||||
loop {
|
||||
if let Some(rx) = crate::rt::signal() {
|
||||
if let Ok(sig) = rx.await {
|
||||
srv.signal(sig);
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
log::info!("Signals are not supported by current runtime");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn bind_addr<S: net::ToSocketAddrs>(
|
||||
addr: S,
|
||||
backlog: i32,
|
||||
|
@ -543,51 +557,6 @@ pub(crate) fn create_tcp_listener(
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::server::{signals, Server, TestServer};
|
||||
use crate::service::fn_service;
|
||||
|
||||
#[cfg(unix)]
|
||||
#[crate::rt_test]
|
||||
async fn test_signals() {
|
||||
use std::{net, sync::mpsc, thread};
|
||||
|
||||
fn start(tx: mpsc::Sender<(Server, net::SocketAddr)>) -> thread::JoinHandle<()> {
|
||||
thread::spawn(move || {
|
||||
let mut sys = crate::rt::System::new("test");
|
||||
let addr = TestServer::unused_addr();
|
||||
let srv = sys.exec(|| {
|
||||
crate::server::build()
|
||||
.workers(1)
|
||||
.disable_signals()
|
||||
.bind("test", addr, move || {
|
||||
fn_service(|_| async { Ok::<_, ()>(()) })
|
||||
})
|
||||
.unwrap()
|
||||
.start()
|
||||
});
|
||||
let _ = tx.send((srv, addr));
|
||||
let _ = sys.run();
|
||||
})
|
||||
}
|
||||
|
||||
for sig in &[
|
||||
signals::Signal::Int,
|
||||
signals::Signal::Term,
|
||||
signals::Signal::Quit,
|
||||
] {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let h = start(tx);
|
||||
let (srv, addr) = rx.recv().unwrap();
|
||||
|
||||
crate::time::sleep(Millis(300)).await;
|
||||
assert!(net::TcpStream::connect(addr).is_ok());
|
||||
|
||||
srv.signal(*sig);
|
||||
crate::time::sleep(Millis(300)).await;
|
||||
assert!(net::TcpStream::connect(addr).is_err());
|
||||
let _ = h.join();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bind_addr() {
|
||||
|
|
|
@ -8,7 +8,6 @@ mod accept;
|
|||
mod builder;
|
||||
mod config;
|
||||
mod service;
|
||||
mod signals;
|
||||
mod socket;
|
||||
mod test;
|
||||
mod worker;
|
||||
|
@ -57,7 +56,7 @@ enum ServerCommand {
|
|||
WorkerFaulted(usize),
|
||||
Pause(oneshot::Sender<()>),
|
||||
Resume(oneshot::Sender<()>),
|
||||
Signal(signals::Signal),
|
||||
Signal(crate::rt::Signal),
|
||||
/// Whether to try and shut down gracefully
|
||||
Stop {
|
||||
graceful: bool,
|
||||
|
@ -81,7 +80,7 @@ impl Server {
|
|||
ServerBuilder::default()
|
||||
}
|
||||
|
||||
fn signal(&self, sig: signals::Signal) {
|
||||
fn signal(&self, sig: crate::rt::Signal) {
|
||||
let _ = self.0.try_send(ServerCommand::Signal(sig));
|
||||
}
|
||||
|
||||
|
|
|
@ -1,91 +0,0 @@
|
|||
use std::{future::Future, pin::Pin, task::Context, task::Poll};
|
||||
|
||||
use crate::server::Server;
|
||||
|
||||
/// Different types of process signals
|
||||
#[allow(dead_code)]
|
||||
#[derive(PartialEq, Clone, Copy, Debug)]
|
||||
pub(crate) enum Signal {
|
||||
/// SIGHUP
|
||||
Hup,
|
||||
/// SIGINT
|
||||
Int,
|
||||
/// SIGTERM
|
||||
Term,
|
||||
/// SIGQUIT
|
||||
Quit,
|
||||
}
|
||||
|
||||
pub(super) struct Signals {
|
||||
srv: Server,
|
||||
#[cfg(not(unix))]
|
||||
signal: Pin<Box<dyn Future<Output = std::io::Result<()>>>>,
|
||||
#[cfg(unix)]
|
||||
signals: Vec<(Signal, crate::rt::signal::unix::Signal)>,
|
||||
}
|
||||
|
||||
impl Signals {
|
||||
pub(super) fn new(srv: Server) -> Signals {
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
Signals {
|
||||
srv,
|
||||
signal: Box::pin(crate::rt::signal::ctrl_c()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use crate::rt::signal::unix;
|
||||
|
||||
let sig_map = [
|
||||
(unix::SignalKind::interrupt(), Signal::Int),
|
||||
(unix::SignalKind::hangup(), Signal::Hup),
|
||||
(unix::SignalKind::terminate(), Signal::Term),
|
||||
(unix::SignalKind::quit(), Signal::Quit),
|
||||
];
|
||||
|
||||
let mut signals = Vec::new();
|
||||
for (kind, sig) in sig_map.iter() {
|
||||
match unix::signal(*kind) {
|
||||
Ok(stream) => signals.push((*sig, stream)),
|
||||
Err(e) => log::error!(
|
||||
"Cannot initialize stream handler for {:?} err: {}",
|
||||
sig,
|
||||
e
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
Signals { srv, signals }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Signals {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
#[cfg(not(unix))]
|
||||
match self.signal.as_mut().poll(cx) {
|
||||
Poll::Ready(_) => {
|
||||
self.srv.signal(Signal::Int);
|
||||
Poll::Ready(())
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut sigs = Vec::new();
|
||||
for (sig, fut) in self.signals.iter_mut() {
|
||||
if Pin::new(fut).poll_recv(cx).is_ready() {
|
||||
sigs.push(*sig)
|
||||
}
|
||||
}
|
||||
for sig in sigs {
|
||||
self.srv.signal(sig);
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
use std::{convert::TryFrom, fmt, io, net};
|
||||
|
||||
use crate::{io::Io, rt::net::TcpStream};
|
||||
use crate::{io::Io, rt};
|
||||
|
||||
pub(crate) enum Listener {
|
||||
Tcp(net::TcpListener),
|
||||
|
@ -145,17 +145,9 @@ impl TryFrom<Stream> for Io {
|
|||
|
||||
fn try_from(sock: Stream) -> Result<Self, Self::Error> {
|
||||
match sock {
|
||||
Stream::Tcp(stream) => {
|
||||
stream.set_nonblocking(true)?;
|
||||
stream.set_nodelay(true)?;
|
||||
Ok(Io::new(TcpStream::from_std(stream)?))
|
||||
}
|
||||
Stream::Tcp(stream) => rt::from_tcp_stream(stream),
|
||||
#[cfg(unix)]
|
||||
Stream::Uds(stream) => {
|
||||
use crate::rt::net::UnixStream;
|
||||
stream.set_nonblocking(true)?;
|
||||
Ok(Io::new(UnixStream::from_std(stream)?))
|
||||
}
|
||||
Stream::Uds(stream) => rt::from_unix_stream(stream),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,8 @@ use std::{io, net, sync::mpsc, thread};
|
|||
|
||||
use socket2::{Domain, SockAddr, Socket, Type};
|
||||
|
||||
use crate::rt::{net::TcpStream, System};
|
||||
use crate::io::Io;
|
||||
use crate::rt::{tcp_connect, System};
|
||||
use crate::server::{Server, ServerBuilder, StreamServiceFactory};
|
||||
|
||||
/// Start test server
|
||||
|
@ -42,7 +43,7 @@ pub fn test_server<F: StreamServiceFactory>(factory: F) -> TestServer {
|
|||
|
||||
// run server in separate thread
|
||||
thread::spawn(move || {
|
||||
let mut sys = System::new("ntex-test-server");
|
||||
let sys = System::new("ntex-test-server");
|
||||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
let local_addr = tcp.local_addr().unwrap();
|
||||
|
||||
|
@ -73,7 +74,7 @@ where
|
|||
|
||||
// run server in separate thread
|
||||
thread::spawn(move || {
|
||||
let mut sys = System::new("ntex-test-server");
|
||||
let sys = System::new("ntex-test-server");
|
||||
|
||||
sys.exec(|| {
|
||||
factory(Server::build())
|
||||
|
@ -105,9 +106,9 @@ impl TestServer {
|
|||
self.addr
|
||||
}
|
||||
|
||||
/// Connect to server, return TcpStream
|
||||
pub async fn connect(&self) -> io::Result<TcpStream> {
|
||||
TcpStream::connect(self.addr).await
|
||||
/// Connect to server, return Io
|
||||
pub async fn connect(&self) -> io::Result<Io> {
|
||||
tcp_connect(self.addr).await
|
||||
}
|
||||
|
||||
/// Stop http server
|
||||
|
|
|
@ -611,7 +611,7 @@ where
|
|||
|
||||
// run server in separate thread
|
||||
thread::spawn(move || {
|
||||
let mut sys = System::new("ntex-test-server");
|
||||
let sys = System::new("ntex-test-server");
|
||||
|
||||
let cfg = cfg.clone();
|
||||
let factory = factory.clone();
|
||||
|
@ -619,7 +619,7 @@ where
|
|||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
let local_addr = tcp.local_addr().unwrap();
|
||||
|
||||
let srv = sys.exec(|| {
|
||||
let srv = sys.exec(move || {
|
||||
let builder = Server::build().workers(1).disable_signals();
|
||||
|
||||
match cfg.stream {
|
||||
|
|
|
@ -261,7 +261,7 @@ where
|
|||
I: Send + 'static,
|
||||
E: Send + std::fmt::Debug + 'static,
|
||||
{
|
||||
match ntex_rt::task::spawn_blocking(f).await {
|
||||
match crate::rt::spawn_blocking(f).await {
|
||||
Ok(res) => res.map_err(BlockingError::Error),
|
||||
Err(_) => Err(BlockingError::Canceled),
|
||||
}
|
||||
|
|
|
@ -16,8 +16,8 @@ fn test_bind() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let srv = sys.exec(|| {
|
||||
let sys = ntex::rt::System::new("test");
|
||||
let srv = sys.exec(move || {
|
||||
Server::build()
|
||||
.workers(1)
|
||||
.disable_signals()
|
||||
|
@ -42,9 +42,9 @@ fn test_listen() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let sys = ntex::rt::System::new("test");
|
||||
let lst = net::TcpListener::bind(addr).unwrap();
|
||||
sys.exec(|| {
|
||||
sys.exec(move || {
|
||||
Server::build()
|
||||
.disable_signals()
|
||||
.workers(1)
|
||||
|
@ -70,8 +70,8 @@ fn test_start() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let srv = sys.exec(|| {
|
||||
let sys = ntex::rt::System::new("test");
|
||||
let srv = sys.exec(move || {
|
||||
Server::build()
|
||||
.backlog(100)
|
||||
.disable_signals()
|
||||
|
@ -140,8 +140,8 @@ fn test_on_worker_start() {
|
|||
let h = thread::spawn(move || {
|
||||
let num = num2.clone();
|
||||
let num2 = num2.clone();
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let srv = sys.exec(|| {
|
||||
let sys = ntex::rt::System::new("test");
|
||||
let srv = sys.exec(move || {
|
||||
Server::build()
|
||||
.disable_signals()
|
||||
.configure(move |cfg| {
|
||||
|
@ -196,7 +196,7 @@ fn test_panic_in_worker() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let sys = ntex::rt::System::new("test");
|
||||
let counter = counter2.clone();
|
||||
let srv = sys.exec(move || {
|
||||
let counter = counter.clone();
|
||||
|
@ -215,7 +215,7 @@ fn test_panic_in_worker() {
|
|||
.start()
|
||||
});
|
||||
let _ = tx.send((srv.clone(), ntex::rt::System::current()));
|
||||
sys.exec(move || ntex_rt::spawn(srv.map(|_| ())));
|
||||
sys.exec(move || ntex::rt::spawn(srv.map(|_| ())));
|
||||
let _ = sys.run();
|
||||
});
|
||||
let (_, sys) = rx.recv().unwrap();
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::{sync::mpsc, thread, time::Duration};
|
|||
use tls_openssl::ssl::SslAcceptorBuilder;
|
||||
|
||||
use ntex::web::{self, App, HttpResponse, HttpServer};
|
||||
use ntex::{io::Io, server::TestServer, time::Seconds};
|
||||
use ntex::{rt, server::TestServer, time::Seconds};
|
||||
|
||||
#[cfg(unix)]
|
||||
#[ntex::test]
|
||||
|
@ -13,9 +13,9 @@ async fn test_run() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let sys = ntex::rt::System::new("test");
|
||||
|
||||
let srv = sys.exec(|| {
|
||||
let srv = sys.exec(move || {
|
||||
HttpServer::new(|| {
|
||||
App::new().service(
|
||||
web::resource("/")
|
||||
|
@ -104,10 +104,10 @@ async fn test_openssl() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let sys = ntex::rt::System::new("test");
|
||||
let builder = ssl_acceptor().unwrap();
|
||||
|
||||
let srv = sys.exec(|| {
|
||||
let srv = sys.exec(move || {
|
||||
HttpServer::new(|| {
|
||||
App::new().service(web::resource("/").route(web::to(
|
||||
|req: HttpRequest| async move {
|
||||
|
@ -157,7 +157,7 @@ async fn test_rustls() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let sys = ntex::rt::System::new("test");
|
||||
|
||||
// load ssl keys
|
||||
let cert_file = &mut BufReader::new(File::open("./tests/cert.pem").unwrap());
|
||||
|
@ -174,7 +174,7 @@ async fn test_rustls() {
|
|||
.with_single_cert(cert_chain, keys)
|
||||
.unwrap();
|
||||
|
||||
let srv = sys.exec(|| {
|
||||
let srv = sys.exec(move || {
|
||||
HttpServer::new(|| {
|
||||
App::new().service(web::resource("/").route(web::to(
|
||||
|req: HttpRequest| async move {
|
||||
|
@ -215,9 +215,9 @@ async fn test_bind_uds() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let sys = ntex::rt::System::new("test");
|
||||
|
||||
let srv = sys.exec(|| {
|
||||
let srv = sys.exec(move || {
|
||||
HttpServer::new(|| {
|
||||
App::new().service(
|
||||
web::resource("/")
|
||||
|
@ -244,9 +244,7 @@ async fn test_bind_uds() {
|
|||
.connector(
|
||||
client::Connector::default()
|
||||
.connector(ntex::service::fn_service(|_| async {
|
||||
let stream =
|
||||
ntex::rt::net::UnixStream::connect("/tmp/uds-test").await?;
|
||||
Ok(Io::new(stream))
|
||||
Ok(rt::unix_connect("/tmp/uds-test").await?)
|
||||
}))
|
||||
.finish(),
|
||||
)
|
||||
|
@ -267,7 +265,7 @@ async fn test_listen_uds() {
|
|||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut sys = ntex::rt::System::new("test");
|
||||
let sys = ntex::rt::System::new("test");
|
||||
|
||||
let srv = sys.exec(|| {
|
||||
let lst = std::os::unix::net::UnixListener::bind("/tmp/uds-test2").unwrap();
|
||||
|
@ -298,9 +296,7 @@ async fn test_listen_uds() {
|
|||
.connector(
|
||||
client::Connector::default()
|
||||
.connector(ntex::service::fn_service(|_| async {
|
||||
let stream =
|
||||
ntex::rt::net::UnixStream::connect("/tmp/uds-test2").await?;
|
||||
Ok(Io::new(stream))
|
||||
Ok(rt::unix_connect("/tmp/uds-test2").await?)
|
||||
}))
|
||||
.finish(),
|
||||
)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue