From b8a8e98c1c98bbfa72b837eb01bfe8fcd4a0d0e8 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 18 Dec 2021 11:46:11 +0600 Subject: [PATCH] allow to replace async runtime --- ntex-io/Cargo.toml | 15 +- ntex-io/src/dispatcher.rs | 10 +- ntex-io/src/lib.rs | 13 +- ntex-io/src/testing.rs | 273 +++++++++++++++++------------------ ntex-io/src/time.rs | 4 +- ntex-io/src/tokio_impl.rs | 43 +++++- ntex-io/src/tokio_rt.rs | 37 +++++ ntex-rt/Cargo.toml | 14 +- ntex-rt/src/arbiter.rs | 36 ++--- ntex-rt/src/builder.rs | 147 +++++++------------ ntex-rt/src/lib.rs | 78 +++------- ntex-rt/src/runtime.rs | 99 ------------- ntex-rt/src/system.rs | 7 +- ntex-rt/src/time.rs | 88 ----------- ntex-rt/src/tokio.rs | 134 +++++++++++++++++ ntex/CHANGES.md | 2 +- ntex/Cargo.toml | 7 +- ntex/src/connect/rustls.rs | 2 +- ntex/src/connect/service.rs | 39 +++-- ntex/src/http/test.rs | 2 +- ntex/src/server/builder.rs | 4 +- ntex/src/server/socket.rs | 14 +- ntex/src/server/test.rs | 13 +- ntex/src/web/test.rs | 4 +- ntex/tests/server.rs | 20 +-- ntex/tests/web_httpserver.rs | 28 ++-- 26 files changed, 529 insertions(+), 604 deletions(-) create mode 100644 ntex-io/src/tokio_rt.rs delete mode 100644 ntex-rt/src/runtime.rs delete mode 100644 ntex-rt/src/time.rs create mode 100644 ntex-rt/src/tokio.rs diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index bf8b4b56..d6101fac 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -18,25 +18,26 @@ path = "src/lib.rs" [features] default = ["tokio"] -# tokio support -tokio = ["tok-io"] +# tokio traits support +tokio-traits = ["tok-io/net"] + +# tokio runtime support +tokio = ["tok-io/net"] [dependencies] bitflags = "1.3" fxhash = "0.2.1" -ntex-codec = "0.5.1" +ntex-codec = "0.6.0" ntex-bytes = "0.1.7" ntex-util = "0.1.2" ntex-service = "0.2.1" log = "0.4" pin-project-lite = "0.2" -tok-io = { version = "1", package = "tokio", default-features = false, features = ["net"], optional = true } - -backtrace = "*" +tok-io = { version = "1", package = "tokio", default-features = false, optional = true } [dev-dependencies] ntex = "0.5.0-b.0" futures = "0.3" rand = "0.8" -env_logger = "0.9" \ No newline at end of file +env_logger = "0.9" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 53d10dd7..e52af0d7 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -1,15 +1,13 @@ //! Framed transport dispatcher -use std::{ - cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll, time, -}; +use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time}; use ntex_bytes::Pool; use ntex_codec::{Decoder, Encoder}; use ntex_service::{IntoService, Service}; +use ntex_util::future::Either; use ntex_util::time::{now, Seconds}; -use ntex_util::{future::Either, spawn}; -use super::{DispatchItem, IoBoxed, ReadRef, Timer, WriteRef}; +use super::{rt::spawn, DispatchItem, IoBoxed, ReadRef, Timer, WriteRef}; type Response = ::Item; @@ -178,7 +176,7 @@ where } } -impl Future for Dispatcher +impl future::Future for Dispatcher where S: Service, Response = Option>> + 'static, U: Decoder + Encoder + 'static, diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index 4cb888bb..54bbfc2c 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -1,17 +1,19 @@ use std::{any::Any, any::TypeId, fmt, future::Future, io, task::Context, task::Poll}; pub mod testing; +pub mod types; mod dispatcher; mod filter; mod state; mod tasks; mod time; -pub mod types; mod utils; -#[cfg(feature = "tokio")] +#[cfg(any(feature = "tokio-traits", feature = "tokio"))] mod tokio_impl; +#[cfg(any(feature = "tokio"))] +mod tokio_rt; use ntex_bytes::BytesMut; use ntex_codec::{Decoder, Encoder}; @@ -128,6 +130,13 @@ where } } +pub mod rt { + //! async runtime helpers + + #[cfg(feature = "tokio")] + pub use crate::tokio_rt::*; +} + #[cfg(test)] mod tests { use super::*; diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index 9bc1a8e9..29b77168 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -37,7 +37,7 @@ pub struct IoTest { } bitflags::bitflags! { - struct Flags: u8 { + struct IoTestFlags: u8 { const FLUSHED = 0b0000_0001; const CLOSED = 0b0000_0010; } @@ -61,35 +61,35 @@ struct State { struct Channel { buf: BytesMut, buf_cap: usize, - flags: Flags, + flags: IoTestFlags, waker: AtomicWaker, - read: IoState, - write: IoState, + read: IoTestState, + write: IoTestState, } impl Channel { fn is_closed(&self) -> bool { - self.flags.contains(Flags::CLOSED) + self.flags.contains(IoTestFlags::CLOSED) } } -impl Default for Flags { +impl Default for IoTestFlags { fn default() -> Self { - Flags::empty() + IoTestFlags::empty() } } #[derive(Debug)] -enum IoState { +enum IoTestState { Ok, Pending, Close, Err(io::Error), } -impl Default for IoState { +impl Default for IoTestState { fn default() -> Self { - IoState::Ok + IoTestState::Ok } } @@ -139,19 +139,19 @@ impl IoTest { /// Set read to Pending state pub fn read_pending(&self) { - self.remote.lock().unwrap().borrow_mut().read = IoState::Pending; + self.remote.lock().unwrap().borrow_mut().read = IoTestState::Pending; } /// Set read to error pub fn read_error(&self, err: io::Error) { let channel = self.remote.lock().unwrap(); - channel.borrow_mut().read = IoState::Err(err); + channel.borrow_mut().read = IoTestState::Err(err); channel.borrow().waker.wake(); } /// Set write error on remote side pub fn write_error(&self, err: io::Error) { - self.local.lock().unwrap().borrow_mut().write = IoState::Err(err); + self.local.lock().unwrap().borrow_mut().write = IoTestState::Err(err); self.remote.lock().unwrap().borrow().waker.wake(); } @@ -180,7 +180,7 @@ impl IoTest { { let guard = self.remote.lock().unwrap(); let mut remote = guard.borrow_mut(); - remote.read = IoState::Close; + remote.read = IoTestState::Close; remote.waker.wake(); log::trace!("close remote socket"); } @@ -256,13 +256,13 @@ impl IoTest { } match mem::take(&mut ch.read) { - IoState::Ok => Poll::Pending, - IoState::Close => { - ch.read = IoState::Close; + IoTestState::Ok => Poll::Pending, + IoTestState::Close => { + ch.read = IoTestState::Close; Poll::Ready(Ok(0)) } - IoState::Pending => Poll::Pending, - IoState::Err(e) => Poll::Ready(Err(e)), + IoTestState::Pending => Poll::Pending, + IoTestState::Err(e) => Poll::Ready(Err(e)), } } @@ -275,12 +275,12 @@ impl IoTest { let mut ch = guard.borrow_mut(); match mem::take(&mut ch.write) { - IoState::Ok => { + IoTestState::Ok => { let cap = cmp::min(buf.len(), ch.buf_cap); if cap > 0 { ch.buf.extend(&buf[..cap]); ch.buf_cap -= cap; - ch.flags.remove(Flags::FLUSHED); + ch.flags.remove(IoTestFlags::FLUSHED); ch.waker.wake(); Poll::Ready(Ok(cap)) } else { @@ -297,8 +297,8 @@ impl IoTest { Poll::Pending } } - IoState::Close => Poll::Ready(Ok(0)), - IoState::Pending => { + IoTestState::Close => Poll::Ready(Ok(0)), + IoTestState::Pending => { *self .local .lock() @@ -311,7 +311,7 @@ impl IoTest { .borrow_mut() = Some(cx.waker().clone()); Poll::Pending } - IoState::Err(e) => Poll::Ready(Err(e)), + IoTestState::Err(e) => Poll::Ready(Err(e)), } } } @@ -346,125 +346,15 @@ impl Drop for IoTest { } } -#[cfg(feature = "tokio")] -mod tokio { - use std::task::{Context, Poll}; - use std::{cmp, io, mem, pin::Pin}; - - use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf}; - - use super::{Flags, IoState, IoTest}; - - impl AsyncRead for IoTest { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let guard = self.local.lock().unwrap(); - let mut ch = guard.borrow_mut(); - *ch.waker.0.lock().unwrap().borrow_mut() = Some(cx.waker().clone()); - - if !ch.buf.is_empty() { - let size = std::cmp::min(ch.buf.len(), buf.remaining()); - let b = ch.buf.split_to(size); - buf.put_slice(&b); - return Poll::Ready(Ok(())); - } - - match mem::take(&mut ch.read) { - IoState::Ok => Poll::Pending, - IoState::Close => { - ch.read = IoState::Close; - Poll::Ready(Ok(())) - } - IoState::Pending => Poll::Pending, - IoState::Err(e) => Poll::Ready(Err(e)), - } - } - } - - impl AsyncWrite for IoTest { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let guard = self.remote.lock().unwrap(); - let mut ch = guard.borrow_mut(); - - match mem::take(&mut ch.write) { - IoState::Ok => { - let cap = cmp::min(buf.len(), ch.buf_cap); - if cap > 0 { - ch.buf.extend(&buf[..cap]); - ch.buf_cap -= cap; - ch.flags.remove(Flags::FLUSHED); - ch.waker.wake(); - Poll::Ready(Ok(cap)) - } else { - *self - .local - .lock() - .unwrap() - .borrow_mut() - .waker - .0 - .lock() - .unwrap() - .borrow_mut() = Some(cx.waker().clone()); - Poll::Pending - } - } - IoState::Close => Poll::Ready(Ok(0)), - IoState::Pending => { - *self - .local - .lock() - .unwrap() - .borrow_mut() - .waker - .0 - .lock() - .unwrap() - .borrow_mut() = Some(cx.waker().clone()); - Poll::Pending - } - IoState::Err(e) => Poll::Ready(Err(e)), - } - } - - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - self.local - .lock() - .unwrap() - .borrow_mut() - .flags - .insert(Flags::CLOSED); - Poll::Ready(Ok(())) - } - } -} - impl IoStream for IoTest { fn start(self, read: ReadContext, write: WriteContext) -> Option> { let io = Rc::new(self); - ntex_util::spawn(ReadTask { + crate::rt::spawn(ReadTask { io: io.clone(), state: read, }); - ntex_util::spawn(WriteTask { + crate::rt::spawn(WriteTask { io: io.clone(), state: write, st: IoWriteState::Processing(None), @@ -615,7 +505,7 @@ impl Future for WriteTask { .unwrap() .borrow_mut() .flags - .insert(Flags::CLOSED); + .insert(IoTestFlags::CLOSED); this.state.close(None); Poll::Ready(()) } @@ -652,7 +542,7 @@ impl Future for WriteTask { .unwrap() .borrow_mut() .flags - .insert(Flags::CLOSED); + .insert(IoTestFlags::CLOSED); *st = Shutdown::Stopping; continue; } @@ -752,6 +642,113 @@ pub(super) fn flush_io( } } +#[cfg(any(feature = "tokio", feature = "tokio-traits"))] +mod tokio_impl { + use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf}; + + use super::*; + + impl AsyncRead for IoTest { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let guard = self.local.lock().unwrap(); + let mut ch = guard.borrow_mut(); + *ch.waker.0.lock().unwrap().borrow_mut() = Some(cx.waker().clone()); + + if !ch.buf.is_empty() { + let size = std::cmp::min(ch.buf.len(), buf.remaining()); + let b = ch.buf.split_to(size); + buf.put_slice(&b); + return Poll::Ready(Ok(())); + } + + match mem::take(&mut ch.read) { + IoTestState::Ok => Poll::Pending, + IoTestState::Close => { + ch.read = IoTestState::Close; + Poll::Ready(Ok(())) + } + IoTestState::Pending => Poll::Pending, + IoTestState::Err(e) => Poll::Ready(Err(e)), + } + } + } + + impl AsyncWrite for IoTest { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let guard = self.remote.lock().unwrap(); + let mut ch = guard.borrow_mut(); + + match mem::take(&mut ch.write) { + IoTestState::Ok => { + let cap = cmp::min(buf.len(), ch.buf_cap); + if cap > 0 { + ch.buf.extend(&buf[..cap]); + ch.buf_cap -= cap; + ch.flags.remove(IoTestFlags::FLUSHED); + ch.waker.wake(); + Poll::Ready(Ok(cap)) + } else { + *self + .local + .lock() + .unwrap() + .borrow_mut() + .waker + .0 + .lock() + .unwrap() + .borrow_mut() = Some(cx.waker().clone()); + Poll::Pending + } + } + IoTestState::Close => Poll::Ready(Ok(0)), + IoTestState::Pending => { + *self + .local + .lock() + .unwrap() + .borrow_mut() + .waker + .0 + .lock() + .unwrap() + .borrow_mut() = Some(cx.waker().clone()); + Poll::Pending + } + IoTestState::Err(e) => Poll::Ready(Err(e)), + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + self.local + .lock() + .unwrap() + .borrow_mut() + .flags + .insert(IoTestFlags::CLOSED); + Poll::Ready(Ok(())) + } + } +} + #[cfg(test)] #[allow(clippy::redundant_clone)] mod tests { diff --git a/ntex-io/src/time.rs b/ntex-io/src/time.rs index de6e6d65..95511722 100644 --- a/ntex-io/src/time.rs +++ b/ntex-io/src/time.rs @@ -2,10 +2,10 @@ use std::{ cell::RefCell, collections::BTreeMap, collections::HashSet, rc::Rc, time::Instant, }; -use ntex_util::spawn; use ntex_util::time::{now, sleep, Millis}; -use super::state::{IoRef, IoStateInner}; +use crate::rt::spawn; +use crate::state::{IoRef, IoStateInner}; pub struct Timer(Rc>); diff --git a/ntex-io/src/tokio_impl.rs b/ntex-io/src/tokio_impl.rs index 54005908..6471569b 100644 --- a/ntex-io/src/tokio_impl.rs +++ b/ntex-io/src/tokio_impl.rs @@ -1,12 +1,12 @@ use std::task::{Context, Poll}; -use std::{any, cell::RefCell, cmp, future::Future, io, pin::Pin, rc::Rc}; +use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc}; -use ntex_bytes::{Buf, BufMut}; +use ntex_bytes::{Buf, BufMut, BytesMut}; use ntex_util::time::{sleep, Sleep}; use tok_io::io::{AsyncRead, AsyncWrite, ReadBuf}; use tok_io::net::TcpStream; -use super::{ +use crate::{ types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, WriteContext, WriteReadiness, }; @@ -71,7 +71,7 @@ impl Future for ReadTask { buf.reserve(hw - remaining); } - match ntex_codec::poll_read_buf(Pin::new(&mut *io), cx, &mut buf) { + match poll_read_buf(Pin::new(&mut *io), cx, &mut buf) { Poll::Pending => break, Poll::Ready(Ok(n)) => { if n == 0 { @@ -505,8 +505,7 @@ mod unixstream { buf.reserve(hw - remaining); } - match ntex_codec::poll_read_buf(Pin::new(&mut *io), cx, &mut buf) - { + match poll_read_buf(Pin::new(&mut *io), cx, &mut buf) { Poll::Pending => break, Poll::Ready(Ok(n)) => { if n == 0 { @@ -699,3 +698,35 @@ mod unixstream { } } } + +pub fn poll_read_buf( + io: Pin<&mut T>, + cx: &mut Context<'_>, + buf: &mut BytesMut, +) -> Poll> { + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let n = { + let dst = + unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [mem::MaybeUninit]) }; + let mut buf = ReadBuf::uninit(dst); + let ptr = buf.filled().as_ptr(); + if io.poll_read(cx, &mut buf)?.is_pending() { + return Poll::Pending; + } + + // Ensure the pointer does not change from under us + assert_eq!(ptr, buf.filled().as_ptr()); + buf.filled().len() + }; + + // Safety: This is guaranteed to be the number of initialized (and read) + // bytes due to the invariants provided by `ReadBuf::filled`. + unsafe { + buf.advance_mut(n); + } + + Poll::Ready(Ok(n)) +} diff --git a/ntex-io/src/tokio_rt.rs b/ntex-io/src/tokio_rt.rs new file mode 100644 index 00000000..7ff07a9f --- /dev/null +++ b/ntex-io/src/tokio_rt.rs @@ -0,0 +1,37 @@ +//! async net providers +use ntex_util::future::lazy; +use std::future::Future; + +/// Spawn a future on the current thread. This does not create a new Arbiter +/// or Arbiter address, it is simply a helper for spawning futures on the current +/// thread. +/// +/// # Panics +/// +/// This function panics if ntex system is not running. +#[inline] +pub fn spawn(f: F) -> tok_io::task::JoinHandle +where + F: Future + 'static, +{ + tok_io::task::spawn_local(f) +} + +/// Executes a future on the current thread. This does not create a new Arbiter +/// or Arbiter address, it is simply a helper for executing futures on the current +/// thread. +/// +/// # Panics +/// +/// This function panics if ntex system is not running. +#[inline] +pub fn spawn_fn(f: F) -> tok_io::task::JoinHandle +where + F: FnOnce() -> R + 'static, + R: Future + 'static, +{ + spawn(async move { + let r = lazy(|_| f()).await; + r.await + }) +} diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index a54b9620..dc599ca8 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.3.2" +version = "0.4.0-b.0" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -15,7 +15,17 @@ edition = "2018" name = "ntex_rt" path = "src/lib.rs" +[features] +default = ["tokio"] + +# tokio support +tokio = ["tok-io", "ntex-io/tokio"] + [dependencies] +ntex-bytes = "0.1.7" +ntex-io = "0.1.0" ntex-util = "0.1.2" +log = "0.4" pin-project-lite = "0.2" -tokio = { version = "1", default-features = false, features = ["rt", "net", "time", "signal", "sync"] } + +tok-io = { version = "1", package = "tokio", default-features = false, features = ["rt", "signal", "sync"], optional = true } diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index e8eea0bd..c2d2dd45 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -3,12 +3,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot::{channel, error::RecvError, Sender}; -use tokio::task::LocalSet; +use tok_io::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tok_io::sync::oneshot::{channel, error::RecvError, Sender}; -use super::runtime::Runtime; -use super::system::System; +use crate::{system::System, Runtime}; thread_local!( static ADDR: RefCell> = RefCell::new(None); @@ -38,27 +36,27 @@ impl fmt::Debug for Arbiter { } } +impl Default for Arbiter { + fn default() -> Arbiter { + Arbiter::new() + } +} + impl Clone for Arbiter { fn clone(&self) -> Self { Self::with_sender(self.sender.clone()) } } -impl Default for Arbiter { - fn default() -> Self { - Self::new() - } -} - impl Arbiter { - pub(super) fn new_system(local: &LocalSet) -> Self { + pub(super) fn new_system(rt: &Box) -> Self { let (tx, rx) = unbounded_channel(); let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); STORAGE.with(|cell| cell.borrow_mut().clear()); - local.spawn_local(ArbiterController { stop: None, rx }); + rt.spawn(Box::pin(ArbiterController { stop: None, rx })); arb } @@ -89,7 +87,7 @@ impl Arbiter { let handle = thread::Builder::new() .name(name.clone()) .spawn(move || { - let rt = Runtime::new().expect("Cannot create Runtime"); + let rt = crate::create_runtime(); let arb = Arbiter::with_sender(arb_tx); let (stop, stop_rx) = channel(); @@ -98,10 +96,10 @@ impl Arbiter { System::set_current(sys); // start arbiter controller - rt.spawn(ArbiterController { + rt.spawn(Box::pin(ArbiterController { stop: Some(stop), rx: arb_rx, - }); + })); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); // register arbiter @@ -110,7 +108,9 @@ impl Arbiter { .send(SystemCommand::RegisterArbiter(id, arb)); // run loop - let _ = rt.block_on(stop_rx); + rt.block_on(Box::pin(async move { + let _ = stop_rx.await; + })); // unregister arbiter let _ = System::current() @@ -265,7 +265,7 @@ impl Future for ArbiterController { return Poll::Ready(()); } ArbiterCommand::Execute(fut) => { - tokio::task::spawn_local(fut); + tok_io::task::spawn(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); diff --git a/ntex-rt/src/builder.rs b/ntex-rt/src/builder.rs index f76c05a8..7f2e0fb6 100644 --- a/ntex-rt/src/builder.rs +++ b/ntex-rt/src/builder.rs @@ -1,13 +1,11 @@ -use std::{borrow::Cow, future::Future, io}; +use std::{cell::RefCell, future::Future, io, rc::Rc}; use ntex_util::future::lazy; -use tokio::sync::mpsc::unbounded_channel; -use tokio::sync::oneshot::{channel, Receiver}; -use tokio::task::LocalSet; +use tok_io::sync::mpsc::unbounded_channel; +use tok_io::sync::oneshot::{channel, Receiver}; -use super::arbiter::{Arbiter, SystemArbiter}; -use super::runtime::Runtime; -use super::system::System; +use crate::arbiter::{Arbiter, SystemArbiter}; +use crate::{create_runtime, Runtime, System}; /// Builder struct for a ntex runtime. /// @@ -16,8 +14,7 @@ use super::system::System; /// run a function in its context. pub struct Builder { /// Name of the System. Defaults to "ntex" if unset. - name: Cow<'static, str>, - + name: String, /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. stop_on_panic: bool, } @@ -25,14 +22,14 @@ pub struct Builder { impl Builder { pub(super) fn new() -> Self { Builder { - name: Cow::Borrowed("ntex"), + name: "ntex".into(), stop_on_panic: false, } } /// Sets the name of the System. - pub fn name>(mut self, name: T) -> Self { - self.name = Cow::Owned(name.into()); + pub fn name>(mut self, name: N) -> Self { + self.name = name.as_ref().into(); self } @@ -52,15 +49,6 @@ impl Builder { self.create_runtime(|| {}) } - /// Create new System that can run asynchronously. - /// This method could be used to run ntex system in existing tokio - /// runtime. - /// - /// This method panics if it cannot start the system arbiter - pub fn finish_with(self, local: &LocalSet) -> AsyncSystemRunner { - self.create_async_runtime(local) - } - /// This function will start tokio runtime and will finish once the /// `System::stop()` message get called. /// Function `f` get called within tokio runtime context. @@ -71,25 +59,6 @@ impl Builder { self.create_runtime(f).run() } - fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { - let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded_channel(); - - let _system = System::construct( - sys_sender, - Arbiter::new_system(local), - self.stop_on_panic, - ); - - // system arbiter - let arb = SystemArbiter::new(stop_tx, sys_receiver); - - // start the system arbiter - let _ = local.spawn_local(arb); - - AsyncSystemRunner { stop, _system } - } - fn create_runtime(self, f: F) -> SystemRunner where F: FnOnce() + 'static, @@ -97,65 +66,26 @@ impl Builder { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded_channel(); - let rt = Runtime::new().unwrap(); - - // set ntex-util spawn fn - ntex_util::set_spawn_fn(|fut| { - tokio::task::spawn_local(fut); - }); + let rt = create_runtime(); // system arbiter - let _system = System::construct( - sys_sender, - Arbiter::new_system(rt.local()), - self.stop_on_panic, - ); + let _system = + System::construct(sys_sender, Arbiter::new_system(&rt), self.stop_on_panic); let arb = SystemArbiter::new(stop_tx, sys_receiver); - rt.spawn(arb); + rt.spawn(Box::pin(arb)); // init system arbiter and run configuration method - rt.block_on(lazy(move |_| f())); + let runner = SystemRunner { rt, stop, _system }; + runner.block_on(lazy(move |_| f())); - SystemRunner { rt, stop, _system } - } -} - -#[derive(Debug)] -pub struct AsyncSystemRunner { - stop: Receiver, - _system: System, -} - -impl AsyncSystemRunner { - /// This function will start event loop and returns a future that - /// resolves once the `System::stop()` function is called. - pub fn run(self) -> impl Future> + Send { - let AsyncSystemRunner { stop, .. } = self; - - // run loop - async move { - match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - } - } + runner } } /// Helper object that runs System's event loop #[must_use = "SystemRunner must be run"] -#[derive(Debug)] pub struct SystemRunner { - rt: Runtime, + rt: Box, stop: Receiver, _system: System, } @@ -167,7 +97,7 @@ impl SystemRunner { let SystemRunner { rt, stop, .. } = self; // run loop - match rt.block_on(stop) { + match block_on(&rt, stop).take() { Ok(code) => { if code != 0 { Err(io::Error::new( @@ -184,23 +114,48 @@ impl SystemRunner { /// Execute a future and wait for result. #[inline] - pub fn block_on(&mut self, fut: F) -> O + pub fn block_on(&self, fut: F) -> R where - F: Future, + F: Future + 'static, + R: 'static, { - self.rt.block_on(fut) + block_on(&self.rt, fut).take() } /// Execute a function with enabled executor. #[inline] - pub fn exec(&mut self, f: F) -> R + pub fn exec(&self, f: F) -> R where - F: FnOnce() -> R, + F: FnOnce() -> R + 'static, + R: 'static, { - self.rt.block_on(lazy(|_| f())) + self.block_on(lazy(|_| f())) } } +pub struct BlockResult(Rc>>); + +impl BlockResult { + pub fn take(self) -> T { + self.0.borrow_mut().take().unwrap() + } +} + +#[inline] +fn block_on(rt: &Box, fut: F) -> BlockResult +where + F: Future + 'static, + R: 'static, +{ + let result = Rc::new(RefCell::new(None)); + let result_inner = result.clone(); + rt.block_on(Box::pin(async move { + let r = fut.await; + *result_inner.borrow_mut() = Some(r); + })); + BlockResult(result) +} + #[cfg(test)] mod tests { use std::sync::mpsc; @@ -213,10 +168,10 @@ mod tests { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() + let rt = tok_io::runtime::Builder::new_current_thread() .build() .unwrap(); - let local = tokio::task::LocalSet::new(); + let local = tok_io::task::LocalSet::new(); let runner = crate::System::build() .stop_on_panic(true) diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 3a65f6d9..f8bdad93 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -1,82 +1,38 @@ //! A runtime implementation that runs everything on the current thread. -use ntex_util::future::lazy; -use std::future::Future; +use std::{future::Future, pin::Pin}; mod arbiter; mod builder; -mod runtime; mod system; -mod time; - -#[doc(hidden)] -pub mod time_driver { - pub use super::time::*; -} - pub use self::arbiter::Arbiter; pub use self::builder::{Builder, SystemRunner}; -pub use self::runtime::Runtime; pub use self::system::System; -/// Spawn a future on the current thread. This does not create a new Arbiter -/// or Arbiter address, it is simply a helper for spawning futures on the current -/// thread. -/// -/// # Panics -/// -/// This function panics if ntex system is not running. -#[inline] -pub fn spawn(f: F) -> self::task::JoinHandle -where - F: Future + 'static, -{ - tokio::task::spawn_local(f) -} - -/// Executes a future on the current thread. This does not create a new Arbiter -/// or Arbiter address, it is simply a helper for executing futures on the current -/// thread. -/// -/// # Panics -/// -/// This function panics if ntex system is not running. -#[inline] -pub fn spawn_fn(f: F) -> tokio::task::JoinHandle -where - F: FnOnce() -> R + 'static, - R: Future + 'static, -{ - tokio::task::spawn_local(async move { - let r = lazy(|_| f()).await; - r.await - }) -} +#[cfg(feature = "tokio")] +mod tokio; +#[cfg(feature = "tokio")] +pub use self::tokio::*; /// Asynchronous signal handling pub mod signal { #[cfg(unix)] pub mod unix { - pub use tokio::signal::unix::*; + pub use tok_io::signal::unix::*; } - pub use tokio::signal::ctrl_c; -} - -/// TCP/UDP/Unix bindings -pub mod net { - pub use tokio::net::UdpSocket; - pub use tokio::net::{TcpListener, TcpStream}; - - #[cfg(unix)] - pub mod unix { - pub use tokio::net::{UnixDatagram, UnixListener, UnixStream}; - } - - #[cfg(unix)] - pub use self::unix::*; + pub use tok_io::signal::ctrl_c; } /// Task management. pub mod task { - pub use tokio::task::{spawn_blocking, yield_now, JoinError, JoinHandle}; + pub use tok_io::task::{spawn_blocking, yield_now, JoinError, JoinHandle}; +} + +pub trait Runtime { + /// Spawn a future onto the single-threaded runtime. + fn spawn(&self, future: Pin>>); + + /// Runs the provided future, blocking the current thread until the future + /// completes. + fn block_on(&self, f: Pin>>); } diff --git a/ntex-rt/src/runtime.rs b/ntex-rt/src/runtime.rs deleted file mode 100644 index 364ea299..00000000 --- a/ntex-rt/src/runtime.rs +++ /dev/null @@ -1,99 +0,0 @@ -use std::future::Future; -use std::io; -use tokio::{runtime, task::LocalSet}; - -/// Single-threaded runtime provides a way to start reactor -/// and runtime on the current thread. -/// -/// See [module level][mod] documentation for more details. -/// -/// [mod]: index.html -#[derive(Debug)] -pub struct Runtime { - local: LocalSet, - rt: runtime::Runtime, -} - -impl Runtime { - #[allow(clippy::new_ret_no_self)] - /// Returns a new runtime initialized with default configuration values. - pub fn new() -> io::Result { - let rt = runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build()?; - - Ok(Runtime { - rt, - local: LocalSet::new(), - }) - } - - pub(super) fn local(&self) -> &LocalSet { - &self.local - } - - /// Spawn a future onto the single-threaded runtime. - /// - /// See [module level][mod] documentation for more details. - /// - /// [mod]: index.html - /// - /// # Examples - /// - /// ```rust,ignore - /// # use futures::{future, Future, Stream}; - /// use ntex_rt::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); - /// - /// // Spawn a future onto the runtime - /// rt.spawn(future::lazy(|_| { - /// println!("running on the runtime"); - /// })); - /// # } - /// # pub fn main() {} - /// ``` - /// - /// # Panics - /// - /// This function panics if the spawn fails. Failure occurs if the executor - /// is currently at capacity and is unable to spawn a new future. - pub fn spawn(&self, future: F) -> &Self - where - F: Future + 'static, - { - self.local.spawn_local(future); - self - } - - /// Runs the provided future, blocking the current thread until the future - /// completes. - /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. - /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. Once the function returns, any uncompleted futures - /// remain pending in the `Runtime` instance. These futures will not run - /// until `block_on` or `run` is called again. - /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution by calling `block_on` or `run`. - pub fn block_on(&self, f: F) -> F::Output - where - F: Future, - { - // set ntex-util spawn fn - ntex_util::set_spawn_fn(|fut| { - crate::spawn(fut); - }); - - self.local.block_on(&self.rt, f) - } -} diff --git a/ntex-rt/src/system.rs b/ntex-rt/src/system.rs index 6f8cddd0..343dfe50 100644 --- a/ntex-rt/src/system.rs +++ b/ntex-rt/src/system.rs @@ -1,6 +1,5 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::{cell::RefCell, io}; -use tokio::sync::mpsc::UnboundedSender; +use std::{cell::RefCell, io, sync::atomic::AtomicUsize, sync::atomic::Ordering}; +use tok_io::sync::mpsc::UnboundedSender; use super::arbiter::{Arbiter, SystemCommand}; use super::builder::{Builder, SystemRunner}; @@ -49,7 +48,7 @@ impl System { /// Create new system. /// /// This method panics if it can not create tokio runtime - pub fn new>(name: T) -> SystemRunner { + pub fn new(name: &str) -> SystemRunner { Self::build().name(name).finish() } diff --git a/ntex-rt/src/time.rs b/ntex-rt/src/time.rs deleted file mode 100644 index 5f755566..00000000 --- a/ntex-rt/src/time.rs +++ /dev/null @@ -1,88 +0,0 @@ -/// Utilities for tracking time. -use std::{future::Future, pin::Pin, task, task::Poll, time::Duration, time::Instant}; -use tokio::time; - -pub use tokio::time::{interval, Interval}; -pub use tokio::time::{timeout, Timeout}; - -/// Waits until `deadline` is reached. -/// -/// No work is performed while awaiting on the sleep future to complete. `Sleep` -/// operates at millisecond granularity and should not be used for tasks that -/// require high-resolution timers. -/// -/// # Cancellation -/// -/// Canceling a sleep instance is done by dropping the returned future. No additional -/// cleanup work is required. -#[inline] -pub fn sleep_until(deadline: Instant) -> Sleep { - Sleep { - inner: time::sleep_until(deadline.into()), - } -} - -/// Waits until `duration` has elapsed. -/// -/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous -/// analog to `std::thread::sleep`. -pub fn sleep(duration: Duration) -> Sleep { - Sleep { - inner: time::sleep(duration), - } -} - -#[doc(hidden)] -/// Creates new [`Interval`] that yields with interval of `period` with the -/// first tick completing at `start`. The default `MissedTickBehavior` is -/// `Burst`, but this can be configured -/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior). -#[inline] -pub fn interval_at(start: Instant, period: Duration) -> Interval { - time::interval_at(start.into(), period) -} - -pin_project_lite::pin_project! { - /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). - #[derive(Debug)] - pub struct Sleep { - #[pin] - inner: time::Sleep, - } -} - -impl Sleep { - /// Returns the instant at which the future will complete. - #[inline] - pub fn deadline(&self) -> Instant { - self.inner.deadline().into_std() - } - - /// Returns `true` if `Sleep` has elapsed. - /// - /// A `Sleep` instance is elapsed when the requested duration has elapsed. - #[inline] - pub fn is_elapsed(&self) -> bool { - self.inner.is_elapsed() - } - - /// Resets the `Sleep` instance to a new deadline. - /// - /// Calling this function allows changing the instant at which the `Sleep` - /// future completes without having to create new associated state. - /// - /// This function can be called both before and after the future has - /// completed. - #[inline] - pub fn reset(self: Pin<&mut Self>, deadline: Instant) { - self.project().inner.reset(deadline.into()); - } -} - -impl Future for Sleep { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - self.project().inner.poll(cx) - } -} diff --git a/ntex-rt/src/tokio.rs b/ntex-rt/src/tokio.rs new file mode 100644 index 00000000..bb9f0d63 --- /dev/null +++ b/ntex-rt/src/tokio.rs @@ -0,0 +1,134 @@ +use std::{future::Future, io, net, net::SocketAddr, path::Path, pin::Pin}; + +use ntex_bytes::PoolRef; +use ntex_io::Io; +use ntex_util::future::lazy; +use tok_io::{runtime, task::LocalSet}; + +use crate::Runtime; + +/// Create new single-threaded tokio runtime. +pub fn create_runtime() -> Box { + Box::new(TokioRuntime::new().unwrap()) +} + +/// Opens a TCP connection to a remote host. +pub fn tcp_connect( + addr: SocketAddr, +) -> Pin>>> { + Box::pin(async move { + let sock = tok_io::net::TcpStream::connect(addr).await?; + sock.set_nodelay(true)?; + Ok(Io::new(sock)) + }) +} + +/// Opens a TCP connection to a remote host and use specified memory pool. +pub fn tcp_connect_in( + addr: SocketAddr, + pool: PoolRef, +) -> Pin>>> { + Box::pin(async move { + let sock = tok_io::net::TcpStream::connect(addr).await?; + sock.set_nodelay(true)?; + Ok(Io::with_memory_pool(sock, pool)) + }) +} + +#[cfg(unix)] +/// Opens a unix stream connection. +pub fn unix_connect

(addr: P) -> Pin>>> +where + P: AsRef + 'static, +{ + Box::pin(async move { + let sock = tok_io::net::UnixStream::connect(addr).await?; + Ok(Io::new(sock)) + }) +} + +/// Convert std TcpStream to tokio's TcpStream +pub fn from_tcp_stream(stream: net::TcpStream) -> Result { + stream.set_nonblocking(true)?; + stream.set_nodelay(true)?; + Ok(Io::new(tok_io::net::TcpStream::from_std(stream)?)) +} + +#[cfg(unix)] +/// Convert std UnixStream to tokio's UnixStream +pub fn from_unix_stream( + stream: std::os::unix::net::UnixStream, +) -> Result { + stream.set_nonblocking(true)?; + Ok(Io::new(tok_io::net::UnixStream::from_std(stream)?)) +} + +/// Spawn a future on the current thread. This does not create a new Arbiter +/// or Arbiter address, it is simply a helper for spawning futures on the current +/// thread. +/// +/// # Panics +/// +/// This function panics if ntex system is not running. +#[inline] +pub fn spawn(f: F) -> tok_io::task::JoinHandle +where + F: Future + 'static, +{ + tok_io::task::spawn_local(f) +} + +/// Executes a future on the current thread. This does not create a new Arbiter +/// or Arbiter address, it is simply a helper for executing futures on the current +/// thread. +/// +/// # Panics +/// +/// This function panics if ntex system is not running. +#[inline] +pub fn spawn_fn(f: F) -> tok_io::task::JoinHandle +where + F: FnOnce() -> R + 'static, + R: Future + 'static, +{ + spawn(async move { + let r = lazy(|_| f()).await; + r.await + }) +} + +/// Single-threaded tokio runtime. +#[derive(Debug)] +struct TokioRuntime { + local: LocalSet, + rt: runtime::Runtime, +} +impl TokioRuntime { + /// Returns a new runtime initialized with default configuration values. + fn new() -> io::Result { + let rt = runtime::Builder::new_current_thread().enable_io().build()?; + + Ok(Self { + rt, + local: LocalSet::new(), + }) + } +} + +impl Runtime for TokioRuntime { + /// Spawn a future onto the single-threaded runtime. + fn spawn(&self, future: Pin>>) { + self.local.spawn_local(future); + } + + /// Runs the provided future, blocking the current thread until the future + /// completes. + fn block_on(&self, f: Pin>>) { + // set ntex-util spawn fn + ntex_util::set_spawn_fn(|fut| { + tok_io::task::spawn_local(fut); + }); + + self.local.block_on(&self.rt, f); + } +} diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index a293cf50..3d327bfb 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -6,7 +6,7 @@ * Move ntex::time to ntex-util crate -* Replace mio with poller for accept loop +* Replace mio with polling for accept loop ## [0.4.13] - 2021-12-07 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 17dabe74..c6b0b349 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -43,16 +43,17 @@ http-framework = ["h2", "http", "httparse", "httpdate", "encoding_rs", "mime", "percent-encoding", "serde_json", "serde_urlencoded"] [dependencies] -ntex-codec = "0.5.1" -ntex-rt = "0.3.2" +ntex-codec = "0.6.0" ntex-router = "0.5.1" ntex-service = "0.2.1" ntex-macros = "0.1.3" ntex-util = "0.1.2" ntex-bytes = "0.1.7" -ntex-io = { version = "0.1", features = ["tokio"] } +ntex-io = "0.1" ntex-tls = "0.1" +ntex-rt = { version = "0.4.0-b.0", default-features = false, features = ["tokio"] } + base64 = "0.13" bitflags = "1.3" derive_more = "0.99.14" diff --git a/ntex/src/connect/rustls.rs b/ntex/src/connect/rustls.rs index e832b5e9..24af0037 100644 --- a/ntex/src/connect/rustls.rs +++ b/ntex/src/connect/rustls.rs @@ -141,7 +141,7 @@ mod tests { let factory = Connector::new(config).clone(); let srv = factory.new_service(()).await.unwrap(); - let result = srv + let _result = srv .call(Connect::new("www.rust-lang.org").set_addr(Some(server.addr()))) .await; diff --git a/ntex/src/connect/service.rs b/ntex/src/connect/service.rs index 38060033..9b47e393 100644 --- a/ntex/src/connect/service.rs +++ b/ntex/src/connect/service.rs @@ -1,8 +1,8 @@ use std::task::{Context, Poll}; use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin}; -use crate::io::Io; -use crate::rt::net::TcpStream; +use crate::io::{types, Io}; +use crate::rt::tcp_connect_in; use crate::service::{Service, ServiceFactory}; use crate::util::{Either, PoolId, PoolRef, Ready}; @@ -128,7 +128,7 @@ impl Future for ConnectServiceResponse { if let Some(addr) = addr { self.state = ConnectState::Connect(TcpConnectorResponse::new( - req, port, addr, + req, port, addr, self.pool, )); self.poll(cx) } else if let Some(addr) = req.addr() { @@ -136,6 +136,7 @@ impl Future for ConnectServiceResponse { req, addr.port(), Either::Left(addr), + self.pool, )); self.poll(cx) } else { @@ -144,12 +145,7 @@ impl Future for ConnectServiceResponse { } } }, - ConnectState::Connect(ref mut fut) => match Pin::new(fut).poll(cx)? { - Poll::Pending => Poll::Pending, - Poll::Ready(stream) => { - Poll::Ready(Ok(Io::with_memory_pool(stream, self.pool))) - } - }, + ConnectState::Connect(ref mut fut) => Pin::new(fut).poll(cx), } } } @@ -159,7 +155,8 @@ struct TcpConnectorResponse { req: Option, port: u16, addrs: Option>, - stream: Option>>>>, + stream: Option>>>>, + pool: PoolRef, } impl TcpConnectorResponse { @@ -167,6 +164,7 @@ impl TcpConnectorResponse { req: T, port: u16, addr: Either>, + pool: PoolRef, ) -> TcpConnectorResponse { trace!( "TCP connector - connecting to {:?} port:{}", @@ -177,13 +175,15 @@ impl TcpConnectorResponse { match addr { Either::Left(addr) => TcpConnectorResponse { req: Some(req), - port, addrs: None, - stream: Some(Box::pin(TcpStream::connect(addr))), + stream: Some(tcp_connect_in(addr, pool)), + pool, + port, }, Either::Right(addrs) => TcpConnectorResponse { - req: Some(req), port, + pool, + req: Some(req), addrs: Some(addrs), stream: None, }, @@ -202,7 +202,7 @@ impl TcpConnectorResponse { } impl Future for TcpConnectorResponse { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -212,16 +212,11 @@ impl Future for TcpConnectorResponse { if let Some(new) = this.stream.as_mut() { match new.as_mut().poll(cx) { Poll::Ready(Ok(sock)) => { - if let Err(err) = sock.set_nodelay(true) { - if !this.can_continue(&err) { - return Poll::Ready(Err(err.into())); - } - } - let req = this.req.take().unwrap(); trace!( "TCP connector - successfully connected to connecting to {:?} - {:?}", - req.host(), sock.peer_addr() + req.host(), + sock.query::().get() ); return Poll::Ready(Ok(sock)); } @@ -236,7 +231,7 @@ impl Future for TcpConnectorResponse { // try to connect let addr = this.addrs.as_mut().unwrap().pop_front().unwrap(); - this.stream = Some(Box::pin(TcpStream::connect(addr))); + this.stream = Some(tcp_connect_in(addr, this.pool)); } } } diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index e70917af..2293f911 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -214,7 +214,7 @@ pub fn server(factory: F) -> TestServer { // run server in separate thread thread::spawn(move || { - let mut sys = System::new("test-server"); + let sys = System::new("test-server"); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index ac5a5acb..2acf6087 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -553,9 +553,9 @@ mod tests { fn start(tx: mpsc::Sender<(Server, net::SocketAddr)>) -> thread::JoinHandle<()> { thread::spawn(move || { - let mut sys = crate::rt::System::new("test"); + let sys = crate::rt::System::new("test"); let addr = TestServer::unused_addr(); - let srv = sys.exec(|| { + let srv = sys.exec(move || { crate::server::build() .workers(1) .disable_signals() diff --git a/ntex/src/server/socket.rs b/ntex/src/server/socket.rs index e34250f1..29b7ef90 100644 --- a/ntex/src/server/socket.rs +++ b/ntex/src/server/socket.rs @@ -1,6 +1,6 @@ use std::{convert::TryFrom, fmt, io, net}; -use crate::{io::Io, rt::net::TcpStream}; +use crate::{io::Io, rt}; pub(crate) enum Listener { Tcp(net::TcpListener), @@ -145,17 +145,9 @@ impl TryFrom for Io { fn try_from(sock: Stream) -> Result { match sock { - Stream::Tcp(stream) => { - stream.set_nonblocking(true)?; - stream.set_nodelay(true)?; - Ok(Io::new(TcpStream::from_std(stream)?)) - } + Stream::Tcp(stream) => rt::from_tcp_stream(stream), #[cfg(unix)] - Stream::Uds(stream) => { - use crate::rt::net::UnixStream; - stream.set_nonblocking(true)?; - Ok(Io::new(UnixStream::from_std(stream)?)) - } + Stream::Uds(stream) => rt::from_unix_stream(stream), } } } diff --git a/ntex/src/server/test.rs b/ntex/src/server/test.rs index 87eed26d..22101ad8 100644 --- a/ntex/src/server/test.rs +++ b/ntex/src/server/test.rs @@ -3,7 +3,8 @@ use std::{io, net, sync::mpsc, thread}; use socket2::{Domain, SockAddr, Socket, Type}; -use crate::rt::{net::TcpStream, System}; +use crate::io::Io; +use crate::rt::{tcp_connect, System}; use crate::server::{Server, ServerBuilder, StreamServiceFactory}; /// Start test server @@ -42,7 +43,7 @@ pub fn test_server(factory: F) -> TestServer { // run server in separate thread thread::spawn(move || { - let mut sys = System::new("ntex-test-server"); + let sys = System::new("ntex-test-server"); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); @@ -73,7 +74,7 @@ where // run server in separate thread thread::spawn(move || { - let mut sys = System::new("ntex-test-server"); + let sys = System::new("ntex-test-server"); sys.exec(|| { factory(Server::build()) @@ -105,9 +106,9 @@ impl TestServer { self.addr } - /// Connect to server, return TcpStream - pub async fn connect(&self) -> io::Result { - TcpStream::connect(self.addr).await + /// Connect to server, return Io + pub async fn connect(&self) -> io::Result { + tcp_connect(self.addr).await } /// Stop http server diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 7e4b9850..85085275 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -611,7 +611,7 @@ where // run server in separate thread thread::spawn(move || { - let mut sys = System::new("ntex-test-server"); + let sys = System::new("ntex-test-server"); let cfg = cfg.clone(); let factory = factory.clone(); @@ -619,7 +619,7 @@ where let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - let srv = sys.exec(|| { + let srv = sys.exec(move || { let builder = Server::build().workers(1).disable_signals(); match cfg.stream { diff --git a/ntex/tests/server.rs b/ntex/tests/server.rs index 964f8ecb..0e9e471e 100644 --- a/ntex/tests/server.rs +++ b/ntex/tests/server.rs @@ -16,8 +16,8 @@ fn test_bind() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); - let srv = sys.exec(|| { + let sys = ntex::rt::System::new("test"); + let srv = sys.exec(move || { Server::build() .workers(1) .disable_signals() @@ -42,9 +42,9 @@ fn test_listen() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); + let sys = ntex::rt::System::new("test"); let lst = net::TcpListener::bind(addr).unwrap(); - sys.exec(|| { + sys.exec(move || { Server::build() .disable_signals() .workers(1) @@ -70,8 +70,8 @@ fn test_start() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); - let srv = sys.exec(|| { + let sys = ntex::rt::System::new("test"); + let srv = sys.exec(move || { Server::build() .backlog(100) .disable_signals() @@ -140,8 +140,8 @@ fn test_on_worker_start() { let h = thread::spawn(move || { let num = num2.clone(); let num2 = num2.clone(); - let mut sys = ntex::rt::System::new("test"); - let srv = sys.exec(|| { + let sys = ntex::rt::System::new("test"); + let srv = sys.exec(move || { Server::build() .disable_signals() .configure(move |cfg| { @@ -196,7 +196,7 @@ fn test_panic_in_worker() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); + let sys = ntex::rt::System::new("test"); let counter = counter2.clone(); let srv = sys.exec(move || { let counter = counter.clone(); @@ -215,7 +215,7 @@ fn test_panic_in_worker() { .start() }); let _ = tx.send((srv.clone(), ntex::rt::System::current())); - sys.exec(move || ntex_rt::spawn(srv.map(|_| ()))); + sys.exec(move || ntex::rt::spawn(srv.map(|_| ()))); let _ = sys.run(); }); let (_, sys) = rx.recv().unwrap(); diff --git a/ntex/tests/web_httpserver.rs b/ntex/tests/web_httpserver.rs index b8f4bbda..9fe387fd 100644 --- a/ntex/tests/web_httpserver.rs +++ b/ntex/tests/web_httpserver.rs @@ -4,7 +4,7 @@ use std::{sync::mpsc, thread, time::Duration}; use tls_openssl::ssl::SslAcceptorBuilder; use ntex::web::{self, App, HttpResponse, HttpServer}; -use ntex::{io::Io, server::TestServer, time::Seconds}; +use ntex::{rt, server::TestServer, time::Seconds}; #[cfg(unix)] #[ntex::test] @@ -13,9 +13,9 @@ async fn test_run() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); + let sys = ntex::rt::System::new("test"); - let srv = sys.exec(|| { + let srv = sys.exec(move || { HttpServer::new(|| { App::new().service( web::resource("/") @@ -104,10 +104,10 @@ async fn test_openssl() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); + let sys = ntex::rt::System::new("test"); let builder = ssl_acceptor().unwrap(); - let srv = sys.exec(|| { + let srv = sys.exec(move || { HttpServer::new(|| { App::new().service(web::resource("/").route(web::to( |req: HttpRequest| async move { @@ -157,7 +157,7 @@ async fn test_rustls() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); + let sys = ntex::rt::System::new("test"); // load ssl keys let cert_file = &mut BufReader::new(File::open("./tests/cert.pem").unwrap()); @@ -174,7 +174,7 @@ async fn test_rustls() { .with_single_cert(cert_chain, keys) .unwrap(); - let srv = sys.exec(|| { + let srv = sys.exec(move || { HttpServer::new(|| { App::new().service(web::resource("/").route(web::to( |req: HttpRequest| async move { @@ -215,9 +215,9 @@ async fn test_bind_uds() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); + let sys = ntex::rt::System::new("test"); - let srv = sys.exec(|| { + let srv = sys.exec(move || { HttpServer::new(|| { App::new().service( web::resource("/") @@ -244,9 +244,7 @@ async fn test_bind_uds() { .connector( client::Connector::default() .connector(ntex::service::fn_service(|_| async { - let stream = - ntex::rt::net::UnixStream::connect("/tmp/uds-test").await?; - Ok(Io::new(stream)) + Ok(rt::unix_connect("/tmp/uds-test").await?) })) .finish(), ) @@ -267,7 +265,7 @@ async fn test_listen_uds() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let mut sys = ntex::rt::System::new("test"); + let sys = ntex::rt::System::new("test"); let srv = sys.exec(|| { let lst = std::os::unix::net::UnixListener::bind("/tmp/uds-test2").unwrap(); @@ -298,9 +296,7 @@ async fn test_listen_uds() { .connector( client::Connector::default() .connector(ntex::service::fn_service(|_| async { - let stream = - ntex::rt::net::UnixStream::connect("/tmp/uds-test2").await?; - Ok(Io::new(stream)) + Ok(rt::unix_connect("/tmp/uds-test2").await?) })) .finish(), )