diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index f6385f46..30631a7e 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.1.0] - 2024-06-27 + +* Shutdown service on error and on worker shutdown + ## [2.0.0] - 2024-05-28 * Use async fn for Service::ready() and Service::shutdown() diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index 0fc585d2..4c3dfd84 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.0.0" +version = "2.1.0" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,10 +17,10 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1" -ntex-net = "2.0" -ntex-service = "3.0" +ntex-net = "2" +ntex-service = "3" ntex-rt = "0.4" -ntex-util = "2.0" +ntex-util = "2" async-channel = "2" async-broadcast = "0.7" diff --git a/ntex-server/src/lib.rs b/ntex-server/src/lib.rs index 79cb4b1b..34246abb 100644 --- a/ntex-server/src/lib.rs +++ b/ntex-server/src/lib.rs @@ -1,8 +1,7 @@ -#![deny(rust_2018_idioms, unreachable_pub)] +#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)] #![allow(clippy::let_underscore_future)] use ntex_service::ServiceFactory; -use ntex_util::time::Millis; mod manager; pub mod net; @@ -13,10 +12,8 @@ mod wrk; pub use self::pool::WorkerPool; pub use self::server::Server; -pub use self::wrk::{Worker, WorkerStatus, WorkerStop}; - -#[doc(hidden)] pub use self::signals::{signal, Signal}; +pub use self::wrk::{Worker, WorkerStatus, WorkerStop}; /// Worker id #[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -30,23 +27,11 @@ impl WorkerId { } } -#[non_exhaustive] -#[derive(Debug)] -/// Worker message -pub enum WorkerMessage { - /// New item received - New(T), - /// Graceful shutdown in millis - Shutdown(Millis), - /// Force shutdown - ForceShutdown, -} - #[allow(async_fn_in_trait)] /// Worker service factory. pub trait ServerConfiguration: Send + Clone + 'static { type Item: Send + 'static; - type Factory: ServiceFactory> + 'static; + type Factory: ServiceFactory + 'static; /// Create service factory for handling `WorkerMessage` messages. async fn create(&self) -> Result; diff --git a/ntex-server/src/net/builder.rs b/ntex-server/src/net/builder.rs index 91daae77..8232a24a 100644 --- a/ntex-server/src/net/builder.rs +++ b/ntex-server/src/net/builder.rs @@ -13,7 +13,10 @@ use super::config::{Config, ServiceConfig}; use super::factory::{self, FactoryServiceType, OnWorkerStart, OnWorkerStartWrapper}; use super::{socket::Listener, Connection, ServerStatus, StreamServer, Token}; -/// Server builder +/// Streaming service builder +/// +/// This type can be used to construct an instance of `net streaming server` through a +/// builder-like pattern. pub struct ServerBuilder { token: Token, backlog: i32, @@ -30,6 +33,18 @@ impl Default for ServerBuilder { } } +impl fmt::Debug for ServerBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ServerBuilder") + .field("token", &self.token) + .field("backlog", &self.backlog) + .field("sockets", &self.sockets) + .field("accept", &self.accept) + .field("worker-pool", &self.pool) + .finish() + } +} + impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { @@ -383,4 +398,10 @@ mod tests { let addrs: Vec = Vec::new(); assert!(bind_addr(&addrs[..], 10).is_err()); } + + #[test] + fn test_debug() { + let builder = ServerBuilder::default(); + assert!(format!("{:?}", builder).contains("ServerBuilder")); + } } diff --git a/ntex-server/src/net/config.rs b/ntex-server/src/net/config.rs index 72a30c6f..95332781 100644 --- a/ntex-server/src/net/config.rs +++ b/ntex-server/src/net/config.rs @@ -40,9 +40,10 @@ impl Config { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ServiceConfig(pub(super) Rc>); +#[derive(Debug)] struct Socket { name: String, sockets: Vec<(Token, Listener, &'static str)>, @@ -55,6 +56,16 @@ pub(super) struct ServiceConfigInner { backlog: i32, } +impl fmt::Debug for ServiceConfigInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ServiceConfigInner") + .field("token", &self.token) + .field("backlog", &self.backlog) + .field("sockets", &self.sockets) + .finish() + } +} + impl ServiceConfig { pub(super) fn new(token: Token, backlog: i32) -> Self { ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner { diff --git a/ntex-server/src/net/counter.rs b/ntex-server/src/net/counter.rs index 5d92f173..cc6d14b8 100644 --- a/ntex-server/src/net/counter.rs +++ b/ntex-server/src/net/counter.rs @@ -2,11 +2,13 @@ use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll}; use ntex_util::task::LocalWaker; +#[derive(Debug)] /// Simple counter with ability to notify task on reaching specific number /// /// Counter could be cloned, total count is shared across all clones. pub(super) struct Counter(Rc); +#[derive(Debug)] struct CounterInner { count: Cell, capacity: usize, diff --git a/ntex-server/src/net/factory.rs b/ntex-server/src/net/factory.rs index 3cfa070b..54960c1e 100644 --- a/ntex-server/src/net/factory.rs +++ b/ntex-server/src/net/factory.rs @@ -10,6 +10,7 @@ use super::{Config, Token}; pub(super) type BoxServerService = boxed::BoxServiceFactory<(), Io, (), (), ()>; pub(crate) type FactoryServiceType = Box; +#[derive(Debug)] pub(crate) struct NetService { pub(crate) tokens: Vec<(Token, &'static str)>, pub(crate) factory: BoxServerService, diff --git a/ntex-server/src/net/mod.rs b/ntex-server/src/net/mod.rs index 098effa5..4a4d8b82 100644 --- a/ntex-server/src/net/mod.rs +++ b/ntex-server/src/net/mod.rs @@ -13,7 +13,7 @@ mod test; pub use self::accept::{AcceptLoop, AcceptNotify, AcceptorCommand}; pub use self::builder::{bind_addr, create_tcp_listener, ServerBuilder}; pub use self::config::{Config, ServiceConfig, ServiceRuntime}; -pub use self::service::{ServerMessage, StreamServer}; +pub use self::service::StreamServer; pub use self::socket::{Connection, Stream}; pub use self::test::{build_test_server, test_server, TestServer}; diff --git a/ntex-server/src/net/service.rs b/ntex-server/src/net/service.rs index 52cabc6a..a3cef6ea 100644 --- a/ntex-server/src/net/service.rs +++ b/ntex-server/src/net/service.rs @@ -1,19 +1,20 @@ +use std::fmt; + use ntex_bytes::{Pool, PoolRef}; use ntex_net::Io; use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory}; -use ntex_util::HashMap; +use ntex_util::{future::join_all, HashMap}; -use crate::{ServerConfiguration, WorkerMessage}; +use crate::ServerConfiguration; use super::accept::{AcceptNotify, AcceptorCommand}; use super::counter::Counter; use super::factory::{FactoryServiceType, NetService, OnWorkerStart}; use super::{socket::Connection, Token, MAX_CONNS_COUNTER}; -pub type ServerMessage = WorkerMessage; - pub(super) type BoxService = boxed::BoxService; +/// Net streaming server pub struct StreamServer { notify: AcceptNotify, services: Vec, @@ -34,6 +35,14 @@ impl StreamServer { } } +impl fmt::Debug for StreamServer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StreamServer") + .field("services", &self.services.len()) + .finish() + } +} + /// Worker service factory. impl ServerConfiguration for StreamServer { type Item = Connection; @@ -88,11 +97,12 @@ impl Clone for StreamServer { } } +#[derive(Debug)] pub struct StreamService { services: Vec, } -impl ServiceFactory for StreamService { +impl ServiceFactory for StreamService { type Response = (); type Error = (); type Service = StreamServiceImpl; @@ -130,13 +140,14 @@ impl ServiceFactory for StreamService { } } +#[derive(Debug)] pub struct StreamServiceImpl { tokens: HashMap, services: Vec, conns: Counter, } -impl Service for StreamServiceImpl { +impl Service for StreamServiceImpl { type Response = (); type Error = (); @@ -161,35 +172,28 @@ impl Service for StreamServiceImpl { } async fn shutdown(&self) { - for svc in &self.services { - svc.shutdown().await; - } + let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await; log::info!( "Worker service shutdown, {} connections", super::num_connections() ); } - async fn call(&self, req: ServerMessage, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { - match req { - ServerMessage::New(con) => { - if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) { - let stream: Io<_> = con.io.try_into().map_err(|e| { - log::error!("Cannot convert to an async io stream: {}", e); - })?; + async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { + if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) { + let stream: Io<_> = con.io.try_into().map_err(|e| { + log::error!("Cannot convert to an async io stream: {}", e); + })?; - stream.set_tag(tag); - stream.set_memory_pool(*pool); - let guard = self.conns.get(); - let _ = ctx.call(&self.services[*idx], stream).await; - drop(guard); - Ok(()) - } else { - log::error!("Cannot get handler service for connection: {:?}", con); - Err(()) - } - } - _ => Ok(()), + stream.set_tag(tag); + stream.set_memory_pool(*pool); + let guard = self.conns.get(); + let _ = ctx.call(&self.services[*idx], stream).await; + drop(guard); + Ok(()) + } else { + log::error!("Cannot get handler service for connection: {:?}", con); + Err(()) } } } diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index b0e1fed5..f39f79a3 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -6,13 +6,13 @@ use async_broadcast::{self as bus, broadcast}; use async_channel::{unbounded, Receiver, Sender}; use ntex_rt::{spawn, Arbiter}; -use ntex_service::{Pipeline, PipelineBinding, ServiceFactory}; +use ntex_service::{Pipeline, PipelineBinding, Service, ServiceFactory}; use ntex_util::future::{select, stream_recv, Either, Stream}; use ntex_util::time::{sleep, timeout_checked, Millis}; -use crate::{ServerConfiguration, WorkerId, WorkerMessage}; +use crate::{ServerConfiguration, WorkerId}; -const STOP_TIMEOUT: Millis = Millis::ONE_SEC; +const STOP_TIMEOUT: Millis = Millis(5000); #[derive(Debug)] /// Shutdown worker @@ -232,7 +232,7 @@ impl WorkerAvailabilityTx { /// Service worker /// /// Worker accepts message via unbounded channel and starts processing. -struct WorkerSt>> { +struct WorkerSt> { id: WorkerId, rx: Pin>>, stop: Pin>>, @@ -240,19 +240,17 @@ struct WorkerSt>> { availability: WorkerAvailabilityTx, } -async fn run_worker( - mut svc: PipelineBinding>, - mut wrk: WorkerSt, -) where +async fn run_worker(mut svc: PipelineBinding, mut wrk: WorkerSt) +where T: Send + 'static, - F: ServiceFactory> + 'static, + F: ServiceFactory + 'static, { loop { let fut = poll_fn(|cx| { ready!(svc.poll_ready(cx)?); if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) { - let fut = svc.call(WorkerMessage::New(item)); + let fut = svc.call(item); let _ = spawn(async move { let _ = fut.await; }); @@ -263,28 +261,27 @@ async fn run_worker( match select(fut, stream_recv(&mut wrk.stop)).await { Either::Left(Ok(())) => continue, Either::Left(Err(_)) => { + let _ = ntex_rt::spawn(async move { + svc.shutdown().await; + }); wrk.availability.set(false); } Either::Right(Some(Shutdown { timeout, result })) => { wrk.availability.set(false); - if timeout.is_zero() { - let fut = svc.call(WorkerMessage::ForceShutdown); - let _ = spawn(async move { - let _ = fut.await; - }); - sleep(STOP_TIMEOUT).await; + let timeout = if timeout.is_zero() { + STOP_TIMEOUT } else { - let fut = svc.call(WorkerMessage::Shutdown(timeout)); - let res = timeout_checked(timeout, fut).await; - let _ = result.send(res.is_ok()); + timeout }; - svc.shutdown().await; - log::info!("Stopping worker {:?}", wrk.id); + stop_svc(wrk.id, svc, timeout, Some(result)).await; + return; + } + Either::Right(None) => { + stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await; return; } - Either::Right(None) => return, } loop { @@ -294,29 +291,40 @@ async fn run_worker( svc = Pipeline::new(service).bind(); break; } - Either::Left(Err(_)) => sleep(STOP_TIMEOUT).await, + Either::Left(Err(_)) => sleep(Millis::ONE_SEC).await, Either::Right(_) => return, } } } } +async fn stop_svc( + id: WorkerId, + svc: PipelineBinding, + timeout: Millis, + result: Option>, +) where + T: Send + 'static, + F: Service + 'static, +{ + let res = timeout_checked(timeout, svc.shutdown()).await; + if let Some(result) = result { + let _ = result.send(res.is_ok()); + } + + log::info!("Worker {:?} has been stopped", id); +} + async fn create( id: WorkerId, rx: Receiver, stop: Receiver, factory: Result, availability: WorkerAvailabilityTx, -) -> Result< - ( - PipelineBinding>, - WorkerSt, - ), - (), -> +) -> Result<(PipelineBinding, WorkerSt), ()> where T: Send + 'static, - F: ServiceFactory> + 'static, + F: ServiceFactory + 'static, { availability.set(false); let factory = factory?; diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index 0c9c1172..4e4a7ded 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -83,6 +83,8 @@ pub mod server { //! General purpose tcp server pub use ntex_server::net::*; + pub use ntex_server::{signal, Signal}; + #[cfg(feature = "openssl")] pub use ntex_tls::openssl;