allow to replace signals impl

This commit is contained in:
Nikolay Kim 2021-12-18 17:41:57 +06:00
parent 66524a89a8
commit 087078da51
12 changed files with 146 additions and 169 deletions

View file

@ -7,9 +7,6 @@ use async_channel::{unbounded, Receiver, Sender};
use async_oneshot as oneshot;
use ntex_util::Stream;
//use tok_io::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
//use tok_io::sync::oneshot::{channel, error::RecvError, Sender};
use crate::{system::System, Runtime};
thread_local!(

View file

@ -14,20 +14,6 @@ mod tokio;
#[cfg(feature = "tokio")]
pub use self::tokio::*;
/// Asynchronous signal handling
pub mod signal {
#[cfg(unix)]
pub mod unix {
pub use tok_io::signal::unix::*;
}
pub use tok_io::signal::ctrl_c;
}
/// Task management.
pub mod task {
pub use tok_io::task::{spawn_blocking, yield_now, JoinError, JoinHandle};
}
pub trait Runtime {
/// Spawn a future onto the single-threaded runtime.
fn spawn(&self, future: Pin<Box<dyn Future<Output = ()>>>);
@ -36,3 +22,16 @@ pub trait Runtime {
/// completes.
fn block_on(&self, f: Pin<Box<dyn Future<Output = ()>>>);
}
/// Different types of process signals
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum Signal {
/// SIGHUP
Hup,
/// SIGINT
Int,
/// SIGTERM
Term,
/// SIGQUIT
Quit,
}

View file

@ -1,11 +1,15 @@
use std::{future::Future, io, net, net::SocketAddr, path::Path, pin::Pin};
use std::future::Future;
use std::task::{Context, Poll};
use std::{cell::RefCell, io, mem, net, net::SocketAddr, path::Path, pin::Pin, rc::Rc};
use async_oneshot as oneshot;
use ntex_bytes::PoolRef;
use ntex_io::Io;
use ntex_util::future::lazy;
pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
use tok_io::{runtime, task::LocalSet};
use crate::Runtime;
use crate::{Runtime, Signal};
/// Create new single-threaded tokio runtime.
pub fn create_runtime() -> Box<dyn Runtime> {
@ -97,6 +101,26 @@ where
})
}
thread_local! {
static SRUN: RefCell<bool> = RefCell::new(false);
static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default();
}
/// Register signal handler.
///
/// Signals are handled by oneshots, you have to re-register
/// after each signal.
pub fn signal() -> Option<oneshot::Receiver<Signal>> {
if !SRUN.with(|v| *v.borrow()) {
spawn(Signals::new());
}
SHANDLERS.with(|handlers| {
let (tx, rx) = oneshot::oneshot();
handlers.borrow_mut().push(tx);
Some(rx)
})
}
/// Single-threaded tokio runtime.
#[derive(Debug)]
struct TokioRuntime {
@ -132,3 +156,84 @@ impl Runtime for TokioRuntime {
self.local.block_on(&self.rt, f);
}
}
struct Signals {
#[cfg(not(unix))]
signal: Pin<Box<dyn Future<Output = io::Result<()>>>>,
#[cfg(unix)]
signals: Vec<(Signal, tok_io::signal::unix::Signal)>,
}
impl Signals {
pub(super) fn new() -> Signals {
SRUN.with(|h| *h.borrow_mut() = true);
#[cfg(not(unix))]
{
Signals {
signal: Box::pin(tok_io::signal::ctrl_c()),
}
}
#[cfg(unix)]
{
use tok_io::signal::unix;
let sig_map = [
(unix::SignalKind::interrupt(), Signal::Int),
(unix::SignalKind::hangup(), Signal::Hup),
(unix::SignalKind::terminate(), Signal::Term),
(unix::SignalKind::quit(), Signal::Quit),
];
let mut signals = Vec::new();
for (kind, sig) in sig_map.iter() {
match unix::signal(*kind) {
Ok(stream) => signals.push((*sig, stream)),
Err(e) => log::error!(
"Cannot initialize stream handler for {:?} err: {}",
sig,
e
),
}
}
Signals { signals }
}
}
}
impl Drop for Signals {
fn drop(&mut self) {
SRUN.with(|h| *h.borrow_mut() = false);
}
}
impl Future for Signals {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))]
{
if self.signal.as_mut().poll(cx).is_ready() {
let handlers = SHANDLERS.with(|h| mem::take(&mut *h.borrow_mut()));
for mut sender in handlers {
let _ = sender.send(Signal::Int);
}
}
Poll::Pending
}
#[cfg(unix)]
{
for (sig, fut) in self.signals.iter_mut() {
if Pin::new(fut).poll_recv(cx).is_ready() {
let handlers = SHANDLERS.with(|h| mem::take(&mut *h.borrow_mut()));
for mut sender in handlers {
let _ = sender.send(*sig);
}
}
}
Poll::Pending
}
}
}

View file

@ -26,6 +26,6 @@ pin-project-lite = "0.2.6"
[dev-dependencies]
ntex = "0.5.0-b.0"
ntex-rt = "0.3.2"
ntex-rt = "0.4.0-b.0"
ntex-macros = "0.1.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }

View file

@ -41,7 +41,7 @@ impl<T: Address> Resolver<T> {
format!("{}:{}", req.host(), req.port())
};
let fut = crate::rt::task::spawn_blocking(move || {
let fut = crate::rt::spawn_blocking(move || {
net::ToSocketAddrs::to_socket_addrs(&host)
});

View file

@ -6,7 +6,7 @@ use flate2::write::{GzDecoder, ZlibDecoder};
use super::Writer;
use crate::http::error::PayloadError;
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
use crate::rt::task::{spawn_blocking, JoinHandle};
use crate::rt::{spawn_blocking, JoinHandle};
use crate::{util::Bytes, Stream};
const INPLACE: usize = 2049;

View file

@ -7,7 +7,7 @@ use flate2::write::{GzEncoder, ZlibEncoder};
use crate::http::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::{ContentEncoding, HeaderValue, CONTENT_ENCODING};
use crate::http::{ResponseHead, StatusCode};
use crate::rt::task::{spawn_blocking, JoinHandle};
use crate::rt::{spawn_blocking, JoinHandle};
use crate::util::Bytes;
use super::Writer;

View file

@ -9,7 +9,6 @@ pub use http::Error as HttpError;
use crate::http::body::Body;
use crate::http::response::Response;
use crate::rt::task::JoinError;
use crate::util::{BytesMut, Either};
/// Error that can be converted to `Response`
@ -244,8 +243,8 @@ pub enum BlockingError<E: fmt::Debug> {
impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
impl From<JoinError> for PayloadError {
fn from(_: JoinError) -> Self {
impl From<crate::rt::JoinError> for PayloadError {
fn from(_: crate::rt::JoinError) -> Self {
PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Operation is canceled",

View file

@ -8,13 +8,12 @@ use futures_core::Stream;
use log::{error, info};
use socket2::{Domain, SockAddr, Socket, Type};
use crate::rt::{spawn, System};
use crate::rt::{spawn, Signal, System};
use crate::{time::sleep, time::Millis, util::join_all, util::PoolId};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime};
use super::service::{Factory, InternalServiceFactory, StreamServiceFactory};
use super::signals::{Signal, Signals};
use super::socket::Listener;
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::{Server, ServerCommand, ServerStatus, Token};
@ -332,7 +331,7 @@ impl ServerBuilder {
// handle signals
if !self.no_signals {
spawn(Signals::new(self.server.clone()));
spawn(signals(self.server.clone()));
}
// start http server actor
@ -489,6 +488,21 @@ impl Future for ServerBuilder {
}
}
async fn signals(srv: Server) {
loop {
if let Some(rx) = crate::rt::signal() {
if let Ok(sig) = rx.await {
srv.signal(sig);
} else {
return;
}
} else {
log::info!("Signals are not supported by current runtime");
return;
}
}
}
pub(super) fn bind_addr<S: net::ToSocketAddrs>(
addr: S,
backlog: i32,
@ -543,51 +557,6 @@ pub(crate) fn create_tcp_listener(
#[cfg(test)]
mod tests {
use super::*;
use crate::server::{signals, Server, TestServer};
use crate::service::fn_service;
#[cfg(unix)]
#[crate::rt_test]
async fn test_signals() {
use std::{net, sync::mpsc, thread};
fn start(tx: mpsc::Sender<(Server, net::SocketAddr)>) -> thread::JoinHandle<()> {
thread::spawn(move || {
let sys = crate::rt::System::new("test");
let addr = TestServer::unused_addr();
let srv = sys.exec(move || {
crate::server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})
.unwrap()
.start()
});
let _ = tx.send((srv, addr));
let _ = sys.run();
})
}
for sig in &[
signals::Signal::Int,
signals::Signal::Term,
signals::Signal::Quit,
] {
let (tx, rx) = mpsc::channel();
let h = start(tx);
let (srv, addr) = rx.recv().unwrap();
crate::time::sleep(Millis(300)).await;
assert!(net::TcpStream::connect(addr).is_ok());
srv.signal(*sig);
crate::time::sleep(Millis(300)).await;
assert!(net::TcpStream::connect(addr).is_err());
let _ = h.join();
}
}
#[test]
fn test_bind_addr() {

View file

@ -8,7 +8,6 @@ mod accept;
mod builder;
mod config;
mod service;
mod signals;
mod socket;
mod test;
mod worker;
@ -57,7 +56,7 @@ enum ServerCommand {
WorkerFaulted(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(signals::Signal),
Signal(crate::rt::Signal),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
@ -81,7 +80,7 @@ impl Server {
ServerBuilder::default()
}
fn signal(&self, sig: signals::Signal) {
fn signal(&self, sig: crate::rt::Signal) {
let _ = self.0.try_send(ServerCommand::Signal(sig));
}

View file

@ -1,91 +0,0 @@
use std::{future::Future, pin::Pin, task::Context, task::Poll};
use crate::server::Server;
/// Different types of process signals
#[allow(dead_code)]
#[derive(PartialEq, Clone, Copy, Debug)]
pub(crate) enum Signal {
/// SIGHUP
Hup,
/// SIGINT
Int,
/// SIGTERM
Term,
/// SIGQUIT
Quit,
}
pub(super) struct Signals {
srv: Server,
#[cfg(not(unix))]
signal: Pin<Box<dyn Future<Output = std::io::Result<()>>>>,
#[cfg(unix)]
signals: Vec<(Signal, crate::rt::signal::unix::Signal)>,
}
impl Signals {
pub(super) fn new(srv: Server) -> Signals {
#[cfg(not(unix))]
{
Signals {
srv,
signal: Box::pin(crate::rt::signal::ctrl_c()),
}
}
#[cfg(unix)]
{
use crate::rt::signal::unix;
let sig_map = [
(unix::SignalKind::interrupt(), Signal::Int),
(unix::SignalKind::hangup(), Signal::Hup),
(unix::SignalKind::terminate(), Signal::Term),
(unix::SignalKind::quit(), Signal::Quit),
];
let mut signals = Vec::new();
for (kind, sig) in sig_map.iter() {
match unix::signal(*kind) {
Ok(stream) => signals.push((*sig, stream)),
Err(e) => log::error!(
"Cannot initialize stream handler for {:?} err: {}",
sig,
e
),
}
}
Signals { srv, signals }
}
}
}
impl Future for Signals {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))]
match self.signal.as_mut().poll(cx) {
Poll::Ready(_) => {
self.srv.signal(Signal::Int);
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
#[cfg(unix)]
{
let mut sigs = Vec::new();
for (sig, fut) in self.signals.iter_mut() {
if Pin::new(fut).poll_recv(cx).is_ready() {
sigs.push(*sig)
}
}
for sig in sigs {
self.srv.signal(sig);
}
Poll::Pending
}
}
}

View file

@ -261,7 +261,7 @@ where
I: Send + 'static,
E: Send + std::fmt::Debug + 'static,
{
match ntex_rt::task::spawn_blocking(f).await {
match crate::rt::spawn_blocking(f).await {
Ok(res) => res.map_err(BlockingError::Error),
Err(_) => Err(BlockingError::Canceled),
}