cleanup rt api

This commit is contained in:
Nikolay Kim 2020-04-15 13:04:33 +06:00
parent ee124716db
commit 12a18436c9
12 changed files with 134 additions and 120 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.1] - 2020-04-15
* Api cleanup
## [0.1.0] - 2020-03-31
* Remove support to spawn futures with stopped runtime

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.1.0"
version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]

View file

@ -8,7 +8,7 @@ use std::{fmt, thread};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot::{channel, Canceled, Sender};
use futures::{future, Future, FutureExt, Stream};
use futures::{Future, Stream};
use tokio::task::LocalSet;
use super::runtime::Runtime;
@ -28,17 +28,6 @@ pub(super) enum ArbiterCommand {
ExecuteFn(Box<dyn FnExec>),
}
impl fmt::Debug for ArbiterCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"),
ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
ArbiterCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"),
}
}
}
#[derive(Debug)]
/// Arbiters provide an asynchronous execution environment for actors, functions
/// and futures. When an Arbiter is created, it spawns a new OS thread, and
/// hosts an event loop. Some Arbiter functions execute on the current thread.
@ -47,6 +36,12 @@ pub struct Arbiter {
thread_handle: Option<thread::JoinHandle<()>>,
}
impl fmt::Debug for Arbiter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Arbiter")
}
}
impl Clone for Arbiter {
fn clone(&self) -> Self {
Self::with_sender(self.sender.clone())
@ -139,30 +134,6 @@ impl Arbiter {
}
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// Panics if arbiter is not started.
#[inline]
pub fn spawn<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
tokio::task::spawn_local(future);
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
pub fn spawn_fn<F, R>(f: F)
where
F: FnOnce() -> R + 'static,
R: Future<Output = ()> + 'static,
{
Arbiter::spawn(future::lazy(|_| f()).flatten())
}
/// Send a future to the Arbiter's thread, and spawn it.
pub fn send<F>(&self, future: F)
where
@ -173,19 +144,6 @@ impl Arbiter {
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
}
/// Send a function to the Arbiter's thread, and execute it. Any result from the function
/// is discarded.
pub fn exec_fn<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let _ = self
.sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
f();
})));
}
/// Send a function to the Arbiter's thread. This function will be executed asynchronously.
/// A future is created, and when resolved will contain the result of the function sent
/// to the Arbiters thread.
@ -205,7 +163,20 @@ impl Arbiter {
rx
}
/// Set item to arbiter storage
/// Send a function to the Arbiter's thread, and execute it. Any result from the function
/// is discarded.
pub fn exec_fn<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let _ = self
.sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
f();
})));
}
/// Set item to current arbiter's storage
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| {
cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item))

View file

@ -49,14 +49,16 @@ impl Builder {
/// Create new System.
///
/// This method panics if it can not create tokio runtime
pub fn build(self) -> SystemRunner {
pub fn finish(self) -> SystemRunner {
self.create_runtime(|| {})
}
/// Create new System that can run asynchronously.
/// This method could be used to run ntex system in existing tokio
/// runtime.
///
/// This method panics if it cannot start the system arbiter
pub(super) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner {
pub fn finish_with(self, local: &LocalSet) -> AsyncSystemRunner {
self.create_async_runtime(local)
}
@ -115,7 +117,7 @@ impl Builder {
}
#[derive(Debug)]
pub(super) struct AsyncSystemRunner {
pub struct AsyncSystemRunner {
stop: Receiver<i32>,
system: System,
}
@ -123,9 +125,7 @@ pub(super) struct AsyncSystemRunner {
impl AsyncSystemRunner {
/// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called.
pub(super) fn run_nonblocking(
self,
) -> impl Future<Output = Result<(), io::Error>> + Send {
pub fn run(self) -> impl Future<Output = Result<(), io::Error>> + Send {
let AsyncSystemRunner { stop, .. } = self;
// run loop
@ -198,3 +198,50 @@ impl SystemRunner {
self.rt.block_on(lazy(|_| f()))
}
}
#[cfg(test)]
mod tests {
use std::sync::mpsc;
use std::thread;
use super::*;
#[test]
fn test_async() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let local = tokio::task::LocalSet::new();
let runner = crate::System::build()
.stop_on_panic(true)
.finish_with(&local);
tx.send(System::current()).unwrap();
let _ = rt.block_on(local.run_until(runner.run()));
});
let mut s = System::new("test");
let sys = rx.recv().unwrap();
let id = sys.id();
let (tx, rx) = mpsc::channel();
sys.arbiter().exec_fn(move || {
let _ = tx.send(System::current().id());
});
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
let id2 = s
.block_on(sys.arbiter().exec(|| System::current().id()))
.unwrap();
assert_eq!(id, id2);
let (tx, rx) = mpsc::channel();
sys.arbiter().send(Box::pin(async move {
let _ = tx.send(System::current().id());
}));
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
}
}

View file

@ -1,4 +1,6 @@
//! A runtime implementation that runs everything on the current thread.
use futures::future::{self, Future, FutureExt};
mod arbiter;
mod builder;
mod runtime;
@ -15,21 +17,34 @@ pub use ntex_rt_macros::{rt_main as main, rt_test as test};
#[doc(hidden)]
pub use actix_threadpool as blocking;
/// Spawns a future on the current arbiter.
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F)
pub fn spawn<F>(f: F) -> tokio::task::JoinHandle<F::Output>
where
F: futures::Future<Output = ()> + 'static,
F: futures::Future + 'static,
{
if !System::is_set() {
panic!("System is not running");
tokio::task::spawn_local(f)
}
Arbiter::spawn(f);
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
///
/// # Panics
///
/// This function panics if ntex system is not running.
pub fn spawn_fn<F, R>(f: F) -> tokio::task::JoinHandle<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
{
tokio::task::spawn_local(future::lazy(|_| f()).flatten())
}
/// Asynchronous signal handling

View file

@ -1,10 +1,8 @@
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::channel::mpsc::UnboundedSender;
use tokio::task::LocalSet;
use super::arbiter::{Arbiter, SystemCommand};
use super::builder::{Builder, SystemRunner};
@ -45,7 +43,7 @@ impl System {
///
/// This allows to customize the runtime. See struct level docs on
/// `Builder` for more information.
pub fn builder() -> Builder {
pub fn build() -> Builder {
Builder::new()
}
@ -54,21 +52,7 @@ impl System {
///
/// This method panics if it can not create tokio runtime
pub fn new<T: Into<String>>(name: T) -> SystemRunner {
Self::builder().name(name).build()
}
#[allow(clippy::new_ret_no_self)]
/// Create new system using provided tokio Handle.
///
/// This method panics if it can not spawn system arbiter
pub fn run_in_tokio<T: Into<String>>(
name: T,
local: &LocalSet,
) -> impl Future<Output = io::Result<()>> {
Self::builder()
.name(name)
.build_async(local)
.run_nonblocking()
Self::build().name(name).finish()
}
/// Get current running system.
@ -79,11 +63,6 @@ impl System {
})
}
/// Set current running system.
pub(super) fn is_set() -> bool {
CURRENT.with(|cell| cell.borrow().is_some())
}
/// Set current running system.
#[doc(hidden)]
pub fn set_current(sys: System) {
@ -92,17 +71,6 @@ impl System {
})
}
/// Execute function with system reference.
pub fn with_current<F, R>(f: F) -> R
where
F: FnOnce(&System) -> R,
{
CURRENT.with(|cell| match *cell.borrow() {
Some(ref sys) => f(sys),
None => panic!("System is not running"),
})
}
/// System id
pub fn id(&self) -> usize {
self.id
@ -140,6 +108,6 @@ impl System {
where
F: FnOnce() + 'static,
{
Self::builder().run(f)
Builder::new().run(f)
}
}

View file

@ -1,11 +1,13 @@
# Changes
## [0.1.11] - 2020-04-xx
## [0.1.11] - 2020-04-15
* ntex::web: Allow to add multiple routes at once
* ntex::web: Add `App::with_config` method, simplifies app service factory.
* ntex::web: Fix error type for Either responder
## [0.1.10] - 2020-04-13
* ntex::channel: mpsc::Sender::close() must close receiver

View file

@ -283,7 +283,7 @@ impl Spawn for Handle {
where
F: Future<Output = Result<(), ProtoError>> + Send + 'static,
{
crate::rt::Arbiter::spawn(future.map(|_| ()));
crate::rt::spawn(future.map(|_| ()));
}
}

View file

@ -332,7 +332,7 @@ where
{
if let Some(timeout) = self.disconnect_timeout {
if let ConnectionType::H1(io) = conn.io {
crate::rt::spawn(CloseConnection::new(io, timeout))
crate::rt::spawn(CloseConnection::new(io, timeout));
}
}
} else {
@ -346,7 +346,7 @@ where
if let ConnectionType::H1(io) = io {
crate::rt::spawn(CloseConnection::new(
io, timeout,
))
));
}
}
continue;
@ -378,7 +378,7 @@ where
self.acquired -= 1;
if let Some(timeout) = self.disconnect_timeout {
if let ConnectionType::H1(io) = io {
crate::rt::spawn(CloseConnection::new(io, timeout))
crate::rt::spawn(CloseConnection::new(io, timeout));
}
}
self.check_availibility();
@ -527,7 +527,7 @@ where
h2: None,
rx: Some(rx),
inner: Some(inner),
})
});
}
}

View file

@ -978,9 +978,8 @@ mod tests {
stream,
None,
None,
)
.map(|_| ()),
)
),
);
}
fn load(decoder: &mut ClientCodec, buf: &mut BytesMut) -> ResponseHead {

View file

@ -390,7 +390,7 @@ impl ServerBuilder {
}
ready(())
}),
)
);
} else {
// we need to stop system if server was spawned
if self.exit {

View file

@ -916,7 +916,7 @@ impl TestServer {
}
pub async fn load_body<S>(
&mut self,
&self,
mut response: ClientResponse<S>,
) -> Result<Bytes, PayloadError>
where
@ -927,7 +927,7 @@ impl TestServer {
/// Connect to websocket server at a given path
pub async fn ws_at(
&mut self,
&self,
path: &str,
) -> Result<Framed<impl AsyncRead + AsyncWrite, crate::ws::Codec>, WsClientError>
{
@ -938,7 +938,7 @@ impl TestServer {
/// Connect to a websocket server
pub async fn ws(
&mut self,
&self,
) -> Result<Framed<impl AsyncRead + AsyncWrite, crate::ws::Codec>, WsClientError>
{
self.ws_at("/").await
@ -1169,7 +1169,7 @@ mod tests {
async fn test_test_methods() {
let srv = server(|| {
App::new().service(
web::resource("/index.html").route((
web::resource("/").route((
web::route()
.method(Method::PUT)
.to(|| async { HttpResponse::Ok() }),
@ -1184,15 +1184,23 @@ mod tests {
.to(|| async { HttpResponse::Ok() }),
)),
)
})
.await;
});
assert_eq!(srv.put().send().await.unwrap().status(), StatusCode::OK);
assert_eq!(srv.patch().send().await.unwrap().status(), StatusCode::OK);
assert_eq!(srv.delete().send().await.unwrap().status(), StatusCode::OK);
assert_eq!(srv.options().send().await.unwrap().status(), StatusCode::OK);
assert_eq!(srv.put("/").send().await.unwrap().status(), StatusCode::OK);
assert_eq!(
srv.patch("/").send().await.unwrap().status(),
StatusCode::OK
);
assert_eq!(
srv.delete("/").send().await.unwrap().status(),
StatusCode::OK
);
assert_eq!(
srv.options("/").send().await.unwrap().status(),
StatusCode::OK
);
let res = srv.put().send().await.unwrap();
let res = srv.put("").send().await.unwrap();
assert_eq!(srv.load_body(res).await.unwrap(), Bytes::new());
}
}