Shutdown service on error and on worker shutdown

This commit is contained in:
Nikolay Kim 2024-06-27 18:33:19 +05:00
parent e3dd4b3908
commit bfacc2e2c4
11 changed files with 114 additions and 84 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [2.1.0] - 2024-06-27
* Shutdown service on error and on worker shutdown
## [2.0.0] - 2024-05-28
* Use async fn for Service::ready() and Service::shutdown()

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-server"
version = "2.0.0"
version = "2.1.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -17,10 +17,10 @@ path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1"
ntex-net = "2.0"
ntex-service = "3.0"
ntex-net = "2"
ntex-service = "3"
ntex-rt = "0.4"
ntex-util = "2.0"
ntex-util = "2"
async-channel = "2"
async-broadcast = "0.7"

View file

@ -1,8 +1,7 @@
#![deny(rust_2018_idioms, unreachable_pub)]
#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
#![allow(clippy::let_underscore_future)]
use ntex_service::ServiceFactory;
use ntex_util::time::Millis;
mod manager;
pub mod net;
@ -13,10 +12,8 @@ mod wrk;
pub use self::pool::WorkerPool;
pub use self::server::Server;
pub use self::wrk::{Worker, WorkerStatus, WorkerStop};
#[doc(hidden)]
pub use self::signals::{signal, Signal};
pub use self::wrk::{Worker, WorkerStatus, WorkerStop};
/// Worker id
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
@ -30,23 +27,11 @@ impl WorkerId {
}
}
#[non_exhaustive]
#[derive(Debug)]
/// Worker message
pub enum WorkerMessage<T> {
/// New item received
New(T),
/// Graceful shutdown in millis
Shutdown(Millis),
/// Force shutdown
ForceShutdown,
}
#[allow(async_fn_in_trait)]
/// Worker service factory.
pub trait ServerConfiguration: Send + Clone + 'static {
type Item: Send + 'static;
type Factory: ServiceFactory<WorkerMessage<Self::Item>> + 'static;
type Factory: ServiceFactory<Self::Item> + 'static;
/// Create service factory for handling `WorkerMessage<T>` messages.
async fn create(&self) -> Result<Self::Factory, ()>;

View file

@ -13,7 +13,10 @@ use super::config::{Config, ServiceConfig};
use super::factory::{self, FactoryServiceType, OnWorkerStart, OnWorkerStartWrapper};
use super::{socket::Listener, Connection, ServerStatus, StreamServer, Token};
/// Server builder
/// Streaming service builder
///
/// This type can be used to construct an instance of `net streaming server` through a
/// builder-like pattern.
pub struct ServerBuilder {
token: Token,
backlog: i32,
@ -30,6 +33,16 @@ impl Default for ServerBuilder {
}
}
impl fmt::Debug for ServerBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ServerBuilder")
.field("token", &self.token)
.field("backlog", &self.backlog)
.field("sockets", &self.sockets)
.finish()
}
}
impl ServerBuilder {
/// Create new Server builder instance
pub fn new() -> ServerBuilder {

View file

@ -40,9 +40,10 @@ impl Config {
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
#[derive(Debug)]
struct Socket {
name: String,
sockets: Vec<(Token, Listener, &'static str)>,
@ -55,6 +56,16 @@ pub(super) struct ServiceConfigInner {
backlog: i32,
}
impl fmt::Debug for ServiceConfigInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ServiceConfigInner")
.field("token", &self.token)
.field("backlog", &self.backlog)
.field("sockets", &self.sockets)
.finish()
}
}
impl ServiceConfig {
pub(super) fn new(token: Token, backlog: i32) -> Self {
ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {

View file

@ -2,11 +2,13 @@ use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll};
use ntex_util::task::LocalWaker;
#[derive(Debug)]
/// Simple counter with ability to notify task on reaching specific number
///
/// Counter could be cloned, total count is shared across all clones.
pub(super) struct Counter(Rc<CounterInner>);
#[derive(Debug)]
struct CounterInner {
count: Cell<usize>,
capacity: usize,

View file

@ -10,6 +10,7 @@ use super::{Config, Token};
pub(super) type BoxServerService = boxed::BoxServiceFactory<(), Io, (), (), ()>;
pub(crate) type FactoryServiceType = Box<dyn FactoryService>;
#[derive(Debug)]
pub(crate) struct NetService {
pub(crate) tokens: Vec<(Token, &'static str)>,
pub(crate) factory: BoxServerService,

View file

@ -13,7 +13,7 @@ mod test;
pub use self::accept::{AcceptLoop, AcceptNotify, AcceptorCommand};
pub use self::builder::{bind_addr, create_tcp_listener, ServerBuilder};
pub use self::config::{Config, ServiceConfig, ServiceRuntime};
pub use self::service::{ServerMessage, StreamServer};
pub use self::service::StreamServer;
pub use self::socket::{Connection, Stream};
pub use self::test::{build_test_server, test_server, TestServer};

View file

@ -1,19 +1,20 @@
use std::fmt;
use ntex_bytes::{Pool, PoolRef};
use ntex_net::Io;
use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
use ntex_util::HashMap;
use ntex_util::{future::join_all, HashMap};
use crate::{ServerConfiguration, WorkerMessage};
use crate::ServerConfiguration;
use super::accept::{AcceptNotify, AcceptorCommand};
use super::counter::Counter;
use super::factory::{FactoryServiceType, NetService, OnWorkerStart};
use super::{socket::Connection, Token, MAX_CONNS_COUNTER};
pub type ServerMessage = WorkerMessage<Connection>;
pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
/// Net streaming server
pub struct StreamServer {
notify: AcceptNotify,
services: Vec<FactoryServiceType>,
@ -34,6 +35,14 @@ impl StreamServer {
}
}
impl fmt::Debug for StreamServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StreamServer")
.field("services", &self.services.len())
.finish()
}
}
/// Worker service factory.
impl ServerConfiguration for StreamServer {
type Item = Connection;
@ -88,11 +97,12 @@ impl Clone for StreamServer {
}
}
#[derive(Debug)]
pub struct StreamService {
services: Vec<NetService>,
}
impl ServiceFactory<ServerMessage> for StreamService {
impl ServiceFactory<Connection> for StreamService {
type Response = ();
type Error = ();
type Service = StreamServiceImpl;
@ -130,13 +140,14 @@ impl ServiceFactory<ServerMessage> for StreamService {
}
}
#[derive(Debug)]
pub struct StreamServiceImpl {
tokens: HashMap<Token, (usize, &'static str, Pool, PoolRef)>,
services: Vec<BoxService>,
conns: Counter,
}
impl Service<ServerMessage> for StreamServiceImpl {
impl Service<Connection> for StreamServiceImpl {
type Response = ();
type Error = ();
@ -161,35 +172,28 @@ impl Service<ServerMessage> for StreamServiceImpl {
}
async fn shutdown(&self) {
for svc in &self.services {
svc.shutdown().await;
}
let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
log::info!(
"Worker service shutdown, {} connections",
super::num_connections()
);
}
async fn call(&self, req: ServerMessage, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
match req {
ServerMessage::New(con) => {
if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) {
let stream: Io<_> = con.io.try_into().map_err(|e| {
log::error!("Cannot convert to an async io stream: {}", e);
})?;
async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) {
let stream: Io<_> = con.io.try_into().map_err(|e| {
log::error!("Cannot convert to an async io stream: {}", e);
})?;
stream.set_tag(tag);
stream.set_memory_pool(*pool);
let guard = self.conns.get();
let _ = ctx.call(&self.services[*idx], stream).await;
drop(guard);
Ok(())
} else {
log::error!("Cannot get handler service for connection: {:?}", con);
Err(())
}
}
_ => Ok(()),
stream.set_tag(tag);
stream.set_memory_pool(*pool);
let guard = self.conns.get();
let _ = ctx.call(&self.services[*idx], stream).await;
drop(guard);
Ok(())
} else {
log::error!("Cannot get handler service for connection: {:?}", con);
Err(())
}
}
}

View file

@ -6,13 +6,13 @@ use async_broadcast::{self as bus, broadcast};
use async_channel::{unbounded, Receiver, Sender};
use ntex_rt::{spawn, Arbiter};
use ntex_service::{Pipeline, PipelineBinding, ServiceFactory};
use ntex_service::{Pipeline, PipelineBinding, Service, ServiceFactory};
use ntex_util::future::{select, stream_recv, Either, Stream};
use ntex_util::time::{sleep, timeout_checked, Millis};
use crate::{ServerConfiguration, WorkerId, WorkerMessage};
use crate::{ServerConfiguration, WorkerId};
const STOP_TIMEOUT: Millis = Millis::ONE_SEC;
const STOP_TIMEOUT: Millis = Millis(5000);
#[derive(Debug)]
/// Shutdown worker
@ -232,7 +232,7 @@ impl WorkerAvailabilityTx {
/// Service worker
///
/// Worker accepts message via unbounded channel and starts processing.
struct WorkerSt<T, F: ServiceFactory<WorkerMessage<T>>> {
struct WorkerSt<T, F: ServiceFactory<T>> {
id: WorkerId,
rx: Pin<Box<dyn Stream<Item = T>>>,
stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
@ -240,19 +240,17 @@ struct WorkerSt<T, F: ServiceFactory<WorkerMessage<T>>> {
availability: WorkerAvailabilityTx,
}
async fn run_worker<T, F>(
mut svc: PipelineBinding<F::Service, WorkerMessage<T>>,
mut wrk: WorkerSt<T, F>,
) where
async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, T>, mut wrk: WorkerSt<T, F>)
where
T: Send + 'static,
F: ServiceFactory<WorkerMessage<T>> + 'static,
F: ServiceFactory<T> + 'static,
{
loop {
let fut = poll_fn(|cx| {
ready!(svc.poll_ready(cx)?);
if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) {
let fut = svc.call(WorkerMessage::New(item));
let fut = svc.call(item);
let _ = spawn(async move {
let _ = fut.await;
});
@ -263,28 +261,27 @@ async fn run_worker<T, F>(
match select(fut, stream_recv(&mut wrk.stop)).await {
Either::Left(Ok(())) => continue,
Either::Left(Err(_)) => {
let _ = ntex_rt::spawn(async move {
svc.shutdown().await;
});
wrk.availability.set(false);
}
Either::Right(Some(Shutdown { timeout, result })) => {
wrk.availability.set(false);
if timeout.is_zero() {
let fut = svc.call(WorkerMessage::ForceShutdown);
let _ = spawn(async move {
let _ = fut.await;
});
sleep(STOP_TIMEOUT).await;
let timeout = if timeout.is_zero() {
STOP_TIMEOUT
} else {
let fut = svc.call(WorkerMessage::Shutdown(timeout));
let res = timeout_checked(timeout, fut).await;
let _ = result.send(res.is_ok());
timeout
};
svc.shutdown().await;
log::info!("Stopping worker {:?}", wrk.id);
stop_svc(wrk.id, svc, timeout, Some(result)).await;
return;
}
Either::Right(None) => {
stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await;
return;
}
Either::Right(None) => return,
}
loop {
@ -294,29 +291,40 @@ async fn run_worker<T, F>(
svc = Pipeline::new(service).bind();
break;
}
Either::Left(Err(_)) => sleep(STOP_TIMEOUT).await,
Either::Left(Err(_)) => sleep(Millis::ONE_SEC).await,
Either::Right(_) => return,
}
}
}
}
async fn stop_svc<T, F>(
id: WorkerId,
svc: PipelineBinding<F, T>,
timeout: Millis,
result: Option<oneshot::Sender<bool>>,
) where
T: Send + 'static,
F: Service<T> + 'static,
{
let res = timeout_checked(timeout, svc.shutdown()).await;
if let Some(result) = result {
let _ = result.send(res.is_ok());
}
log::info!("Worker {:?} has been stopped", id);
}
async fn create<T, F>(
id: WorkerId,
rx: Receiver<T>,
stop: Receiver<Shutdown>,
factory: Result<F, ()>,
availability: WorkerAvailabilityTx,
) -> Result<
(
PipelineBinding<F::Service, WorkerMessage<T>>,
WorkerSt<T, F>,
),
(),
>
) -> Result<(PipelineBinding<F::Service, T>, WorkerSt<T, F>), ()>
where
T: Send + 'static,
F: ServiceFactory<WorkerMessage<T>> + 'static,
F: ServiceFactory<T> + 'static,
{
availability.set(false);
let factory = factory?;

View file

@ -83,6 +83,8 @@ pub mod server {
//! General purpose tcp server
pub use ntex_server::net::*;
pub use ntex_server::{signal, Signal};
#[cfg(feature = "openssl")]
pub use ntex_tls::openssl;