diff --git a/Cargo.toml b/Cargo.toml index 06c3e044..5ff728e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "ntex-http", "ntex-router", "ntex-rt", + "ntex-server", "ntex-service", "ntex-tls", "ntex-macros", @@ -27,6 +28,7 @@ ntex-io = { path = "ntex-io" } ntex-http = { path = "ntex-http" } ntex-router = { path = "ntex-router" } ntex-rt = { path = "ntex-rt" } +ntex-server = { path = "ntex-server" } ntex-service = { path = "ntex-service" } ntex-tls = { path = "ntex-tls" } ntex-macros = { path = "ntex-macros" } diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index f81a49be..58f3ae8e 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -1,7 +1,7 @@ #![allow(clippy::let_underscore_future)] use std::any::{Any, TypeId}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread}; use async_channel::{unbounded, Receiver, Sender}; @@ -320,10 +320,17 @@ impl Future for SystemArbiter { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.commands).poll_next(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(cmd)) => match cmd { + let cmd = ready!(Pin::new(&mut self.commands).poll_next(cx)); + log::debug!("Received system command: {:?}", cmd); + match cmd { + None => { + log::debug!("System stopped"); + return Poll::Ready(()); + } + Some(cmd) => match cmd { SystemCommand::Exit(code) => { + log::debug!("Stopping system with {} code", code); + // stop arbiters for arb in self.arbiters.values() { arb.stop(); @@ -340,7 +347,6 @@ impl Future for SystemArbiter { self.arbiters.remove(&name); } }, - Poll::Pending => return Poll::Pending, } } } diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md new file mode 100644 index 00000000..255226c7 --- /dev/null +++ b/ntex-server/CHANGES.md @@ -0,0 +1,5 @@ +# Changes + +## [0.1.0] - 2024-03-xx + +* Release diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml new file mode 100644 index 00000000..5b4e09ae --- /dev/null +++ b/ntex-server/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "ntex-server" +version = "0.1.0" +authors = ["ntex contributors "] +description = "Server for ntex framework" +keywords = ["network", "framework", "async", "futures"] +homepage = "https://ntex.rs" +repository = "https://github.com/ntex-rs/ntex.git" +documentation = "https://docs.rs/ntex-server/" +categories = ["network-programming", "asynchronous"] +license = "MIT OR Apache-2.0" +edition = "2021" + +[lib] +name = "ntex_server" +path = "src/lib.rs" + +[dependencies] +ntex-bytes = "0.1" +ntex-service = "2.0" +ntex-rt = "0.4" +ntex-util = "1.0" + +async-channel = "2.2" +async-broadcast = "0.7" +log = "0.4" +oneshot = { version = "0.1", default-features = false, features = ["async"] } + +[dev-dependencies] +ntex = { version = "1", features = ["tokio"] } +ntex-macros = "0.1.3" + +[target.'cfg(target_family = "unix")'.dependencies] +signal-hook = { version = "0.3", features=["iterator"] } + +[target.'cfg(target_family = "windows")'.dependencies] +ctrlc = "3.4" diff --git a/ntex-server/LICENSE-APACHE b/ntex-server/LICENSE-APACHE new file mode 120000 index 00000000..965b606f --- /dev/null +++ b/ntex-server/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/ntex-server/LICENSE-MIT b/ntex-server/LICENSE-MIT new file mode 120000 index 00000000..76219eb7 --- /dev/null +++ b/ntex-server/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/ntex-server/src/lib.rs b/ntex-server/src/lib.rs new file mode 100644 index 00000000..3083d788 --- /dev/null +++ b/ntex-server/src/lib.rs @@ -0,0 +1,61 @@ +#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)] +#![allow(clippy::let_underscore_future)] + +use ntex_service::ServiceFactory; +use ntex_util::time::Millis; + +mod manager; +mod pool; +mod server; +mod signals; +mod wrk; + +pub use self::pool::WorkerPool; +pub use self::server::Server; +pub use self::wrk::{Worker, WorkerStatus, WorkerStop}; + +/// Worker id +#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct WorkerId(usize); + +impl WorkerId { + pub(self) fn next(&mut self) -> WorkerId { + let id = WorkerId(self.0); + self.0 += 1; + id + } +} + +#[non_exhaustive] +#[derive(Debug)] +/// Worker message +pub enum WorkerMessage { + /// New item received + New(T), + /// Graceful shutdown in millis + Shutdown(Millis), + /// Force shutdown + ForceShutdown, +} + +#[allow(async_fn_in_trait)] +/// Worker service factory. +pub trait ServerConfiguration: Send + Clone + 'static { + type Item: Send + 'static; + type Factory: ServiceFactory> + 'static; + + /// Create service factory for handling `WorkerMessage` messages. + async fn create(&self) -> Result; + + /// Server is paused. + fn paused(&self) {} + + /// Server is resumed. + fn resumed(&self) {} + + /// Server is stopped + fn terminate(&self) {} + + /// Server is stopped + async fn stop(&self) {} +} diff --git a/ntex-server/src/manager.rs b/ntex-server/src/manager.rs new file mode 100644 index 00000000..9811b42a --- /dev/null +++ b/ntex-server/src/manager.rs @@ -0,0 +1,341 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::{cell::Cell, cell::RefCell, collections::VecDeque, rc::Rc, sync::Arc}; + +use async_channel::{unbounded, Receiver, Sender}; +use ntex_rt::System; +use ntex_util::{future::join_all, time::sleep, time::Millis}; + +use crate::server::ServerShared; +use crate::signals::Signal; +use crate::{Server, ServerConfiguration, Worker, WorkerId, WorkerPool, WorkerStatus}; + +const STOP_DELAY: Millis = Millis(500); +const RESTART_DELAY: Millis = Millis(250); + +#[derive(Clone)] +pub(crate) struct ServerManager(Rc>); + +#[derive(Debug)] +pub(crate) enum ServerCommand { + Item(T), + Pause(oneshot::Sender<()>), + Resume(oneshot::Sender<()>), + Signal(Signal), + Stop { + graceful: bool, + completion: Option>, + }, + NotifyStopped(oneshot::Sender<()>), + Worker(Update), +} + +#[derive(Debug)] +pub(crate) enum Update { + Available(Worker), + Unavailable(Worker), +} + +struct Inner { + id: Cell, + factory: F, + cfg: WorkerPool, + shared: Arc, + stopping: Cell, + stop_notify: RefCell>>, + cmd: Sender>, +} + +impl ServerManager { + pub(crate) fn start(cfg: WorkerPool, factory: F) -> Server { + log::info!("Starting {} workers", cfg.num); + + let (tx, rx) = unbounded(); + + let no_signals = cfg.no_signals; + let shared = Arc::new(ServerShared { + paused: AtomicBool::new(true), + }); + let mgr = ServerManager(Rc::new(Inner { + cfg, + factory, + id: Cell::new(WorkerId::default()), + shared: shared.clone(), + stopping: Cell::new(false), + stop_notify: RefCell::new(Vec::new()), + cmd: tx.clone(), + })); + + // handle cmd + let _ = ntex_rt::spawn(handle_cmd(mgr.clone(), rx)); + + // start workers + for _ in 0..mgr.0.cfg.num { + start_worker(mgr.clone()); + } + + let srv = Server::new(tx, shared); + + // handle signals + if !no_signals { + crate::signals::start(srv.clone()); + } + + srv + } + + pub(crate) fn factory(&self) -> F { + self.0.factory.clone() + } + + pub(crate) fn next_id(&self) -> WorkerId { + let mut id = self.0.id.get(); + let next_id = id.next(); + self.0.id.set(id); + next_id + } + + pub(crate) fn pause(&self) { + self.0.shared.paused.store(true, Ordering::Release); + self.0.factory.paused(); + } + + pub(crate) fn resume(&self) { + self.0.shared.paused.store(false, Ordering::Release); + self.0.factory.resumed(); + } + + fn available(&self, wrk: Worker) { + let _ = self + .0 + .cmd + .try_send(ServerCommand::Worker(Update::Available(wrk))); + } + + fn unavailable(&self, wrk: Worker) { + let _ = self + .0 + .cmd + .try_send(ServerCommand::Worker(Update::Unavailable(wrk))); + } + + fn add_stop_notify(&self, tx: oneshot::Sender<()>) { + self.0.stop_notify.borrow_mut().push(tx); + } + + fn stopping(&self) -> bool { + self.0.stopping.get() + } +} + +fn start_worker(mgr: ServerManager) { + let _ = ntex_rt::spawn(async move { + let id = mgr.next_id(); + let mut wrk = Worker::start(id, mgr.factory()); + + loop { + match wrk.status() { + WorkerStatus::Available => mgr.available(wrk.clone()), + WorkerStatus::Unavailable => mgr.unavailable(wrk.clone()), + WorkerStatus::Failed => { + mgr.unavailable(wrk); + sleep(RESTART_DELAY).await; + if !mgr.stopping() { + wrk = Worker::start(id, mgr.factory()); + } else { + return; + } + } + } + wrk.wait_for_status().await; + } + }); +} + +struct HandleCmdState { + next: usize, + backlog: VecDeque, + workers: Vec>, + mgr: ServerManager, +} + +impl HandleCmdState { + fn new(mgr: ServerManager) -> Self { + Self { + next: 0, + backlog: VecDeque::new(), + workers: Vec::with_capacity(mgr.0.cfg.num), + mgr, + } + } + + fn process(&mut self, mut item: F::Item) { + loop { + if !self.workers.is_empty() { + if self.next > self.workers.len() { + self.next = self.workers.len() - 1; + } + match self.workers[self.next].send(item) { + Ok(()) => { + self.next = (self.next + 1) % self.workers.len(); + break; + } + Err(i) => { + if !self.mgr.0.stopping.get() { + log::trace!("Worker failed while processing item"); + } + item = i; + self.workers.remove(self.next); + } + } + } else { + if !self.mgr.0.stopping.get() { + log::error!("No workers"); + self.backlog.push_back(item); + self.mgr.pause(); + } + break; + } + } + } + + fn update_workers(&mut self, upd: Update) { + match upd { + Update::Available(worker) => { + self.workers.push(worker); + if self.workers.len() == 1 { + self.mgr.resume(); + } else { + self.workers.sort(); + } + } + Update::Unavailable(worker) => { + if let Ok(idx) = self.workers.binary_search(&worker) { + self.workers.remove(idx); + } + if self.workers.is_empty() { + self.mgr.pause(); + } + } + } + // handle backlog + if !self.backlog.is_empty() && !self.workers.is_empty() { + while let Some(item) = self.backlog.pop_front() { + // handle worker failure + if let Err(item) = self.workers[0].send(item) { + self.backlog.push_back(item); + self.workers.remove(0); + break; + } + } + } + } + + async fn stop(&mut self, graceful: bool, completion: Option>) { + self.mgr.0.stopping.set(true); + + // stop server + self.mgr.0.factory.stop().await; + + // stop workers + if !self.workers.is_empty() { + let timeout = self.mgr.0.cfg.shutdown_timeout; + + if graceful && !timeout.is_zero() { + let futs: Vec<_> = self + .workers + .iter() + .map(|worker| worker.stop(timeout)) + .collect(); + + let _ = join_all(futs).await; + } else { + self.workers.iter().for_each(|worker| { + let _ = worker.stop(Millis::ZERO); + }); + } + } + + // notify sender + if let Some(tx) = completion { + let _ = tx.send(()); + } + + let notify = std::mem::take(&mut *self.mgr.0.stop_notify.borrow_mut()); + for tx in notify { + let _ = tx.send(()); + } + + // stop system if server was spawned + if self.mgr.0.cfg.stop_runtime { + sleep(STOP_DELAY).await; + System::current().stop(); + } + } +} + +struct DropHandle(ServerManager); + +impl Drop for DropHandle { + fn drop(&mut self) { + self.0 .0.stopping.set(true); + self.0 .0.factory.terminate(); + } +} + +async fn handle_cmd( + mgr: ServerManager, + rx: Receiver>, +) { + let _drop_hnd = DropHandle(mgr.clone()); + let mut state = HandleCmdState::new(mgr); + + loop { + let item = if let Ok(item) = rx.recv().await { + item + } else { + return; + }; + match item { + ServerCommand::Item(item) => state.process(item), + ServerCommand::Worker(upd) => state.update_workers(upd), + ServerCommand::Pause(tx) => { + state.mgr.pause(); + let _ = tx.send(()); + } + ServerCommand::Resume(tx) => { + state.mgr.resume(); + let _ = tx.send(()); + } + ServerCommand::NotifyStopped(tx) => state.mgr.add_stop_notify(tx), + ServerCommand::Stop { + graceful, + completion, + } => { + state.stop(graceful, completion).await; + return; + } + ServerCommand::Signal(sig) => { + // Signals support + // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop ntex system + match sig { + Signal::Int => { + log::info!("SIGINT received, exiting"); + state.stop(false, None).await; + return; + } + Signal::Term => { + log::info!("SIGTERM received, stopping"); + state.stop(true, None).await; + return; + } + Signal::Quit => { + log::info!("SIGQUIT received, exiting"); + state.stop(false, None).await; + return; + } + _ => (), + } + } + } + } +} diff --git a/ntex-server/src/pool.rs b/ntex-server/src/pool.rs new file mode 100644 index 00000000..229ea8ba --- /dev/null +++ b/ntex-server/src/pool.rs @@ -0,0 +1,75 @@ +use ntex_util::time::Millis; + +use crate::{Server, ServerConfiguration}; + +const DEFAULT_SHUTDOWN_TIMEOUT: Millis = Millis::from_secs(30); + +#[derive(Debug, Copy, Clone)] +/// Server builder +pub struct WorkerPool { + pub(crate) num: usize, + pub(crate) no_signals: bool, + pub(crate) stop_runtime: bool, + pub(crate) shutdown_timeout: Millis, +} + +impl Default for WorkerPool { + fn default() -> Self { + Self::new() + } +} + +impl WorkerPool { + /// Create new Server builder instance + pub fn new() -> Self { + WorkerPool { + num: std::thread::available_parallelism() + .map_or(2, std::num::NonZeroUsize::get), + no_signals: false, + stop_runtime: false, + shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT, + } + } + + /// Set number of workers to start. + /// + /// By default server uses number of available logical cpu as workers + /// count. + pub fn workers(mut self, num: usize) -> Self { + self.num = num; + self + } + + /// Stop current ntex runtime when manager get dropped. + /// + /// By default "stop runtime" is disabled. + pub fn stop_runtime(mut self) -> Self { + self.stop_runtime = true; + self + } + + /// Disable signal handling. + /// + /// By default signal handling is enabled. + pub fn disable_signals(mut self) -> Self { + self.no_signals = true; + self + } + + /// Timeout for graceful workers shutdown. + /// + /// After receiving a stop signal, workers have this much time to finish + /// serving requests. Workers still alive after the timeout are force + /// dropped. + /// + /// By default shutdown timeout sets to 30 seconds. + pub fn shutdown_timeout>(mut self, timeout: T) -> Self { + self.shutdown_timeout = timeout.into(); + self + } + + /// Starts processing incoming items and return server controller. + pub fn run(self, factory: F) -> Server { + crate::manager::ServerManager::start(self, factory) + } +} diff --git a/ntex-server/src/server.rs b/ntex-server/src/server.rs new file mode 100644 index 00000000..9d011f61 --- /dev/null +++ b/ntex-server/src/server.rs @@ -0,0 +1,113 @@ +use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc}; +use std::task::{ready, Context, Poll}; +use std::{future::Future, io, pin::Pin}; + +use async_channel::Sender; + +use crate::{manager::ServerCommand, signals::Signal}; + +#[derive(Debug)] +pub(crate) struct ServerShared { + pub(crate) paused: AtomicBool, +} + +/// Server controller +#[derive(Debug)] +pub struct Server { + shared: Arc, + cmd: Sender>, + stop: Option>, +} + +impl Server { + pub(crate) fn new(cmd: Sender>, shared: Arc) -> Self { + Server { + cmd, + shared, + stop: None, + } + } + + pub(crate) fn signal(&self, sig: Signal) { + let _ = self.cmd.try_send(ServerCommand::Signal(sig)); + } + + /// Send item to worker pool + pub fn process(&mut self, item: T) -> Result<(), T> { + if self.shared.paused.load(Ordering::Acquire) { + Err(item) + } else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) { + match e.into_inner() { + ServerCommand::Item(item) => Err(item), + _ => panic!(), + } + } else { + Ok(()) + } + } + + /// Pause accepting incoming connections + /// + /// If socket contains some pending connection, they might be dropped. + /// All opened connection remains active. + pub fn pause(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd.try_send(ServerCommand::Pause(tx)); + async move { + let _ = rx.await; + } + } + + /// Resume accepting incoming connections + pub fn resume(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd.try_send(ServerCommand::Resume(tx)); + async move { + let _ = rx.await; + } + } + + /// Stop incoming connection processing, stop all workers and exit. + /// + /// If server starts with `spawn()` method, then spawned thread get terminated. + pub fn stop(&self, graceful: bool) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd.try_send(ServerCommand::Stop { + graceful, + completion: Some(tx), + }); + async move { + let _ = rx.await; + } + } +} + +impl Clone for Server { + fn clone(&self) -> Self { + Self { + cmd: self.cmd.clone(), + shared: self.shared.clone(), + stop: None, + } + } +} + +impl Future for Server { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + if this.stop.is_none() { + let (tx, rx) = oneshot::channel(); + if this.cmd.try_send(ServerCommand::NotifyStopped(tx)).is_err() { + return Poll::Ready(Ok(())); + } + this.stop = Some(rx); + } + + let _ = ready!(Pin::new(this.stop.as_mut().unwrap()).poll(cx)); + + Poll::Ready(Ok(())) + } +} diff --git a/ntex-server/src/signals.rs b/ntex-server/src/signals.rs new file mode 100644 index 00000000..14f6b8cd --- /dev/null +++ b/ntex-server/src/signals.rs @@ -0,0 +1,70 @@ +use std::thread; + +use crate::server::Server; + +/// Different types of process signals +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub(crate) enum Signal { + /// SIGHUP + Hup, + /// SIGINT + Int, + /// SIGTERM + Term, + /// SIGQUIT + Quit, +} + +#[cfg(target_family = "unix")] +/// Register signal handler. +/// +/// Signals are handled by oneshots, you have to re-register +/// after each signal. +pub(crate) fn start(srv: Server) { + let _ = thread::Builder::new() + .name("ntex-server signals".to_string()) + .spawn(move || { + use signal_hook::consts::signal::*; + use signal_hook::iterator::Signals; + + let sigs = vec![SIGHUP, SIGINT, SIGTERM, SIGQUIT]; + let mut signals = match Signals::new(sigs) { + Ok(signals) => signals, + Err(e) => { + log::error!("Cannot initialize signals handler: {}", e); + return; + } + }; + for info in &mut signals { + match info { + SIGHUP => srv.signal(Signal::Hup), + SIGTERM => srv.signal(Signal::Term), + SIGINT => { + srv.signal(Signal::Int); + return; + } + SIGQUIT => { + srv.signal(Signal::Quit); + return; + } + _ => {} + } + } + }); +} + +#[cfg(target_family = "windows")] +/// Register signal handler. +/// +/// Signals are handled by oneshots, you have to re-register +/// after each signal. +pub(crate) fn start(srv: Server) { + let _ = thread::Builder::new() + .name("ntex-server signals".to_string()) + .spawn(move || { + ctrlc::set_handler(move || { + srv.signal(Signal::Int); + }) + .expect("Error setting Ctrl-C handler"); + }); +} diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs new file mode 100644 index 00000000..b208aa92 --- /dev/null +++ b/ntex-server/src/wrk.rs @@ -0,0 +1,341 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::task::{ready, Context, Poll}; +use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc}; + +use async_broadcast::{self as bus, broadcast}; +use async_channel::{unbounded, Receiver, Sender}; + +use ntex_rt::{spawn, Arbiter}; +use ntex_service::{Pipeline, ServiceFactory}; +use ntex_util::future::{select, stream_recv, Either, Stream}; +use ntex_util::time::{sleep, timeout_checked, Millis}; + +use crate::{ServerConfiguration, WorkerId, WorkerMessage}; + +const STOP_TIMEOUT: Millis = Millis::ONE_SEC; + +#[derive(Debug)] +/// Shutdown worker +struct Shutdown { + timeout: Millis, + result: oneshot::Sender, +} + +#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +/// Worker status +pub enum WorkerStatus { + Available, + #[default] + Unavailable, + Failed, +} + +#[derive(Debug)] +/// Server worker +/// +/// Worker accepts message via unbounded channel and starts processing. +pub struct Worker { + id: WorkerId, + tx1: Sender, + tx2: Sender, + avail: WorkerAvailability, + failed: Arc, +} + +impl cmp::Ord for Worker { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.id.cmp(&other.id) + } +} + +impl cmp::PartialOrd for Worker { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.id.cmp(&other.id)) + } +} + +impl hash::Hash for Worker { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} + +impl Eq for Worker {} + +impl PartialEq for Worker { + fn eq(&self, other: &Worker) -> bool { + self.id == other.id + } +} + +#[derive(Debug)] +/// Stop worker process +/// +/// Stop future resolves when worker completes processing +/// incoming items and stop arbiter +pub struct WorkerStop(oneshot::Receiver); + +impl Worker { + /// Start worker. + pub fn start(id: WorkerId, cfg: F) -> Worker + where + T: Send + 'static, + F: ServerConfiguration, + { + let (tx1, rx1) = unbounded(); + let (tx2, rx2) = unbounded(); + let (avail, avail_tx) = WorkerAvailability::create(); + + Arbiter::default().exec_fn(move || { + let _ = spawn(async move { + log::info!("Starting worker {:?}", id); + + log::debug!("Creating server instance in {:?} worker", id); + let factory = cfg.create().await; + log::debug!("Server instance has been created in {:?} worker", id); + + match create(id, rx1, rx2, factory, avail_tx).await { + Ok((svc, wrk)) => { + run_worker(svc, wrk).await; + } + Err(e) => { + log::error!("Cannot start worker: {:?}", e); + } + } + Arbiter::current().stop(); + }); + }); + + Worker { + id, + tx1, + tx2, + avail, + failed: Arc::new(AtomicBool::new(false)), + } + } + + /// Worker id. + pub fn id(&self) -> WorkerId { + self.id + } + + /// Send message to the worker. + /// + /// Returns `Ok` if message got accepted by the worker. + /// Otherwise return message back as `Err` + pub fn send(&self, msg: T) -> Result<(), T> { + self.tx1.try_send(msg).map_err(|msg| msg.into_inner()) + } + + /// Check worker status. + pub fn status(&self) -> WorkerStatus { + if self.failed.load(Ordering::Acquire) { + WorkerStatus::Failed + } else if self.avail.available() { + WorkerStatus::Available + } else { + WorkerStatus::Unavailable + } + } + + /// Wait for worker status updates + pub async fn wait_for_status(&mut self) -> WorkerStatus { + if self.failed.load(Ordering::Acquire) { + WorkerStatus::Failed + } else { + // cleanup updates + while self.avail.notify.try_recv().is_ok() {} + + if self.avail.notify.recv_direct().await.is_err() { + self.failed.store(true, Ordering::Release); + } + self.status() + } + } + + /// Stop worker. + /// + /// If timeout value is zero, force shutdown worker + pub fn stop(&self, timeout: Millis) -> WorkerStop { + let (result, rx) = oneshot::channel(); + let _ = self.tx2.try_send(Shutdown { timeout, result }); + WorkerStop(rx) + } +} + +impl Clone for Worker { + fn clone(&self) -> Self { + Worker { + id: self.id, + tx1: self.tx1.clone(), + tx2: self.tx2.clone(), + avail: self.avail.clone(), + failed: self.failed.clone(), + } + } +} + +impl Future for WorkerStop { + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match ready!(Pin::new(&mut self.0).poll(cx)) { + Ok(res) => Poll::Ready(res), + Err(_) => Poll::Ready(true), + } + } +} + +#[derive(Debug, Clone)] +struct WorkerAvailability { + notify: bus::Receiver<()>, + available: Arc, +} + +#[derive(Debug, Clone)] +struct WorkerAvailabilityTx { + notify: bus::Sender<()>, + available: Arc, +} + +impl WorkerAvailability { + fn create() -> (Self, WorkerAvailabilityTx) { + let (mut tx, rx) = broadcast(16); + tx.set_overflow(true); + + let avail = WorkerAvailability { + notify: rx, + available: Arc::new(AtomicBool::new(false)), + }; + let avail_tx = WorkerAvailabilityTx { + notify: tx, + available: avail.available.clone(), + }; + (avail, avail_tx) + } + + fn available(&self) -> bool { + self.available.load(Ordering::Acquire) + } +} + +impl WorkerAvailabilityTx { + fn set(&self, val: bool) { + let old = self.available.swap(val, Ordering::Release); + if !old && val { + let _ = self.notify.try_broadcast(()); + } + } +} + +/// Service worker +/// +/// Worker accepts message via unbounded channel and starts processing. +struct WorkerSt>> { + id: WorkerId, + rx: Pin>>, + stop: Pin>>, + factory: F, + availability: WorkerAvailabilityTx, +} + +async fn run_worker(mut svc: Pipeline, mut wrk: WorkerSt) +where + T: Send + 'static, + F: ServiceFactory> + 'static, +{ + loop { + let fut = poll_fn(|cx| { + ready!(svc.poll_ready(cx)?); + + if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) { + let fut = svc.call_static(WorkerMessage::New(item)); + let _ = spawn(async move { + let _ = fut.await; + }); + } + Poll::Ready(Ok::<(), F::Error>(())) + }); + + match select(fut, stream_recv(&mut wrk.stop)).await { + Either::Left(Ok(())) => continue, + Either::Left(Err(_)) => { + wrk.availability.set(false); + } + Either::Right(Some(Shutdown { timeout, result })) => { + wrk.availability.set(false); + + if timeout.is_zero() { + let fut = svc.call_static(WorkerMessage::ForceShutdown); + let _ = spawn(async move { + let _ = fut.await; + }); + sleep(STOP_TIMEOUT).await; + } else { + let fut = svc.call_static(WorkerMessage::Shutdown(timeout)); + let res = timeout_checked(timeout, fut).await; + let _ = result.send(res.is_ok()); + }; + poll_fn(|cx| svc.poll_shutdown(cx)).await; + + log::info!("Stopping worker {:?}", wrk.id); + return; + } + Either::Right(None) => return, + } + + loop { + match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await { + Either::Left(Ok(service)) => { + wrk.availability.set(true); + svc = Pipeline::new(service); + break; + } + Either::Left(Err(_)) => sleep(STOP_TIMEOUT).await, + Either::Right(_) => return, + } + } + } +} + +async fn create( + id: WorkerId, + rx: Receiver, + stop: Receiver, + factory: Result, + availability: WorkerAvailabilityTx, +) -> Result<(Pipeline, WorkerSt), ()> +where + T: Send + 'static, + F: ServiceFactory> + 'static, +{ + availability.set(false); + let factory = factory?; + + let rx = Box::pin(rx); + let mut stop = Box::pin(stop); + + let svc = match select(factory.create(()), stream_recv(&mut stop)).await { + Either::Left(Ok(svc)) => Pipeline::new(svc), + Either::Left(Err(_)) => return Err(()), + Either::Right(Some(Shutdown { result, .. })) => { + log::trace!("Shutdown uninitialized worker"); + let _ = result.send(false); + return Err(()); + } + Either::Right(None) => return Err(()), + }; + availability.set(true); + + Ok(( + svc, + WorkerSt { + id, + factory, + availability, + rx: Box::pin(rx), + stop: Box::pin(stop), + }, + )) +} diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index 1ad72e4f..3bb1b193 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.2] - 2024-03-20 + +* Add boxed rc service factory + ## [2.0.1] - 2024-02-07 * Add fmt::Debug impl for PipelineCall diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index 64a64b70..d6d3ec17 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.1.0] - 2024-03-xx + +* Added server worker's management utils + ## [1.0.1] - 2024-01-19 * Allow to lock readiness for Condition diff --git a/ntex-util/src/channel/mpsc.rs b/ntex-util/src/channel/mpsc.rs index b5cbc865..695637c2 100644 --- a/ntex-util/src/channel/mpsc.rs +++ b/ntex-util/src/channel/mpsc.rs @@ -133,7 +133,7 @@ pub struct WeakSender { } impl WeakSender { - /// Upgrade to Sender + /// Upgrade to `Sender` pub fn upgrade(&self) -> Option> { self.shared.upgrade().map(|shared| Sender { shared }) } diff --git a/ntex-util/src/lib.rs b/ntex-util/src/lib.rs index 5fecad45..4ea95c53 100644 --- a/ntex-util/src/lib.rs +++ b/ntex-util/src/lib.rs @@ -1,13 +1,16 @@ //! Utilities for ntex framework #![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)] +#[doc(hidden)] +pub use std::task::ready; + pub mod channel; pub mod future; pub mod services; pub mod task; pub mod time; -pub use futures_core::{ready, Stream}; +pub use futures_core::Stream; pub use futures_sink::Sink; pub use ntex_rt::spawn; diff --git a/ntex-util/src/time/mod.rs b/ntex-util/src/time/mod.rs index 34f6c4b9..8356397a 100644 --- a/ntex-util/src/time/mod.rs +++ b/ntex-util/src/time/mod.rs @@ -64,7 +64,7 @@ where TimeoutChecked::new_with_delay(future, dur.into()) } -/// Future returned by [`sleep`](sleep). +/// Future returned by [`sleep`]. /// /// # Examples /// @@ -126,7 +126,7 @@ impl Future for Sleep { } } -/// Future returned by [`deadline`](deadline). +/// Future returned by [`deadline`]. /// /// # Examples /// diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index ee94deb4..ada05a3d 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [1.2.0] - 2024-03-xx + +* Refactor server workers management + +* Move ntex::server to separate crate + ## [1.1.2] - 2024-03-12 * Update ntex-h2 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 408219f5..21274864 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "1.1.2" +version = "1.2.0" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -56,6 +56,7 @@ ntex-service = "2.0.1" ntex-macros = "0.1.3" ntex-util = "1.0.1" ntex-bytes = "0.1.24" +ntex-server = "0.1.0" ntex-h2 = "0.5.1" ntex-rt = "0.4.11" ntex-io = "1.0.1" @@ -64,7 +65,7 @@ ntex-tokio = { version = "0.4.0", optional = true } ntex-glommio = { version = "0.4.0", optional = true } ntex-async-std = { version = "0.4.0", optional = true } -async-channel = "2.1" +async-channel = "2.2" base64 = "0.22" bitflags = "2.4" log = "0.4" diff --git a/ntex/examples/echo.rs b/ntex/examples/echo.rs index abba358e..8138e3f0 100644 --- a/ntex/examples/echo.rs +++ b/ntex/examples/echo.rs @@ -4,14 +4,14 @@ use futures_util::StreamExt; use log::info; use ntex::http::header::HeaderValue; use ntex::http::{HttpService, Request, Response}; -use ntex::{server::Server, time::Seconds, util::BytesMut}; +use ntex::{time::Seconds, util::BytesMut}; #[ntex::main] async fn main() -> io::Result<()> { env::set_var("RUST_LOG", "echo=info"); env_logger::init(); - Server::build() + ntex::server::build() .bind("echo", "127.0.0.1:8080", |_| { HttpService::build() .headers_read_rate(Seconds(1), Seconds(5), 128) diff --git a/ntex/examples/echo2.rs b/ntex/examples/echo2.rs index ce2f6af0..219b983f 100644 --- a/ntex/examples/echo2.rs +++ b/ntex/examples/echo2.rs @@ -3,7 +3,7 @@ use std::{env, io}; use futures_util::StreamExt; use log::info; use ntex::http::{header::HeaderValue, HttpService, Request, Response}; -use ntex::{server::Server, util::BytesMut}; +use ntex::util::BytesMut; async fn handle_request(mut req: Request) -> Result { let mut body = BytesMut::new(); @@ -22,7 +22,7 @@ async fn main() -> io::Result<()> { env::set_var("RUST_LOG", "trace"); env_logger::init(); - Server::build() + ntex::server::build() .bind("echo", "127.0.0.1:8080", |_| { HttpService::build().h2(handle_request) })? diff --git a/ntex/examples/hello-world.rs b/ntex/examples/hello-world.rs index 3b1d7a0d..b6a255e9 100644 --- a/ntex/examples/hello-world.rs +++ b/ntex/examples/hello-world.rs @@ -3,14 +3,14 @@ use std::{env, io}; use log::info; use ntex::http::header::HeaderValue; use ntex::http::{HttpService, Response}; -use ntex::{server::Server, time::Seconds, util::Ready}; +use ntex::{time::Seconds, util::Ready}; #[ntex::main] async fn main() -> io::Result<()> { env::set_var("RUST_LOG", "ntex=trace,hello_world=info"); env_logger::init(); - Server::build() + ntex::server::build() .bind("hello-world", "127.0.0.1:8080", |_| { HttpService::build() .headers_read_rate(Seconds(1), Seconds(3), 128) diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index 2f42ed73..90a62520 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -73,7 +73,7 @@ impl Connector { let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); let _ = ssl .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| error!("Cannot set ALPN protocol: {:?}", e)); + .map_err(|e| log::error!("Cannot set ALPN protocol: {:?}", e)); ssl.set_verify(tls_openssl::ssl::SslVerifyMode::NONE); diff --git a/ntex/src/http/client/h2proto.rs b/ntex/src/http/client/h2proto.rs index 25e29e9f..23520fa2 100644 --- a/ntex/src/http/client/h2proto.rs +++ b/ntex/src/http/client/h2proto.rs @@ -21,7 +21,7 @@ pub(super) async fn send_request( where B: MessageBody, { - trace!("Sending client request: {:?} {:?}", head, body.size()); + log::trace!("Sending client request: {:?} {:?}", head, body.size()); let length = body.size(); let eof = if head.as_ref().method == Method::HEAD { true diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 96209755..799e582c 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -125,7 +125,7 @@ where req: Connect, _: ServiceCtx<'_, Self>, ) -> Result { - trace!("Get connection for {:?}", req.uri); + log::trace!("Get connection for {:?}", req.uri); let inner = self.inner.clone(); let waiters = self.waiters.clone(); @@ -140,7 +140,7 @@ where match result { // use existing connection Acquire::Acquired(io, created) => { - trace!("Use existing {:?} connection for {:?}", io, req.uri); + log::trace!("Use existing {:?} connection for {:?}", io, req.uri); Ok(Connection::new( io, created, @@ -149,7 +149,7 @@ where } // open new tcp connection Acquire::Available => { - trace!("Connecting to {:?}", req.uri); + log::trace!("Connecting to {:?}", req.uri); let uri = req.uri.clone(); let (tx, rx) = waiters.borrow_mut().pool.channel(); OpenConnection::spawn(key, tx, uri, inner, &self.connector, req); @@ -161,7 +161,7 @@ where } // pool is full, wait Acquire::NotAvailable => { - trace!( + log::trace!( "Pool is full, waiting for available connections for {:?}", req.uri ); @@ -217,7 +217,7 @@ impl Waiters { let (req, tx) = waiters.front().unwrap(); // check if waiter is still alive if tx.is_canceled() { - trace!("Waiter for {:?} is gone, remove waiter", req.uri); + log::trace!("Waiter for {:?} is gone, remove waiter", req.uri); waiters.pop_front(); continue; }; @@ -338,7 +338,7 @@ where while let Some((req, tx)) = waiters.front() { // is waiter still alive if tx.is_canceled() { - trace!("Waiter for {:?} is gone, cleanup", req.uri); + log::trace!("Waiter for {:?} is gone, cleanup", req.uri); cleanup = true; waiters.pop_front(); continue; @@ -348,7 +348,7 @@ where match result { Acquire::NotAvailable => break, Acquire::Acquired(io, created) => { - trace!( + log::trace!( "Use existing {:?} connection for {:?}, wake up waiter", io, req.uri @@ -362,7 +362,7 @@ where ))); } Acquire::Available => { - trace!("Connecting to {:?} and wake up waiter", req.uri); + log::trace!("Connecting to {:?} and wake up waiter", req.uri); cleanup = true; let (connect, tx) = waiters.pop_front().unwrap(); let uri = connect.uri.clone(); @@ -446,7 +446,7 @@ where // open tcp connection match ready!(this.fut.poll(cx)) { Err(err) => { - trace!( + log::trace!( "Failed to open client connection for {:?} with error {:?}", &this.key.authority, err diff --git a/ntex/src/http/encoding/encoder.rs b/ntex/src/http/encoding/encoder.rs index 09516b66..7678a553 100644 --- a/ntex/src/http/encoding/encoder.rs +++ b/ntex/src/http/encoding/encoder.rs @@ -248,21 +248,21 @@ impl ContentEncoder { ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) { Ok(_) => Ok(()), Err(err) => { - trace!("Error decoding br encoding: {}", err); + log::trace!("Error decoding br encoding: {}", err); Err(err) } }, ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) { Ok(_) => Ok(()), Err(err) => { - trace!("Error decoding gzip encoding: {}", err); + log::trace!("Error decoding gzip encoding: {}", err); Err(err) } }, ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) { Ok(_) => Ok(()), Err(err) => { - trace!("Error decoding deflate encoding: {}", err); + log::trace!("Error decoding deflate encoding: {}", err); Err(err) } }, diff --git a/ntex/src/http/error.rs b/ntex/src/http/error.rs index ff87df7e..85642d6c 100644 --- a/ntex/src/http/error.rs +++ b/ntex/src/http/error.rs @@ -39,9 +39,9 @@ impl From for Response { fn from(err: T) -> Response { let resp = err.error_response(); if resp.head().status == StatusCode::INTERNAL_SERVER_ERROR { - error!("Internal Server Error: {:?}", err); + log::error!("Internal Server Error: {:?}", err); } else { - debug!("Error in response: {:?}", err); + log::debug!("Error in response: {:?}", err); } resp } diff --git a/ntex/src/http/h1/decoder.rs b/ntex/src/http/h1/decoder.rs index a8dc4a13..8ebe4cbf 100644 --- a/ntex/src/http/h1/decoder.rs +++ b/ntex/src/http/h1/decoder.rs @@ -258,7 +258,7 @@ impl MessageType for Request { } httparse::Status::Partial => { if src.len() >= MAX_BUFFER_SIZE { - trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); + log::trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); return Err(DecodeError::TooLarge(src.len())); } return Ok(None); @@ -274,7 +274,7 @@ impl MessageType for Request { // disallow HTTP/1.0 POST requests that do not contain a Content-Length headers // see https://datatracker.ietf.org/doc/html/rfc1945#section-7.2.2 if ver == Version::HTTP_10 && method == Method::POST && length.is_none() { - debug!("no Content-Length specified for HTTP/1.0 POST request"); + log::debug!("no Content-Length specified for HTTP/1.0 POST request"); return Err(DecodeError::Header); } diff --git a/ntex/src/http/h2/service.rs b/ntex/src/http/h2/service.rs index 6ce47baf..cdfaaeb3 100644 --- a/ntex/src/http/h2/service.rs +++ b/ntex/src/http/h2/service.rs @@ -425,7 +425,7 @@ where } } Some(Err(e)) => { - error!("Response payload stream error: {:?}", e); + log::error!("Response payload stream error: {:?}", e); return Err(e.into()); } } diff --git a/ntex/src/http/response.rs b/ntex/src/http/response.rs index c2706a0c..faea582c 100644 --- a/ntex/src/http/response.rs +++ b/ntex/src/http/response.rs @@ -734,7 +734,7 @@ impl fmt::Debug for ResponseBuilder { fn log_error>(err: T) -> HttpError { let e = err.into(); - error!("Error in ResponseBuilder {}", e); + log::error!("Error in ResponseBuilder {}", e); e } diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index d23d6aa5..a7ea10a3 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -6,7 +6,7 @@ use coo_kie::{Cookie, CookieJar}; use crate::io::{Filter, Io}; use crate::ws::{error::WsClientError, WsClient, WsConnection}; -use crate::{rt::System, server::Server, service::ServiceFactory}; +use crate::{rt::System, service::ServiceFactory}; use crate::{time::Millis, time::Seconds, util::Bytes}; use super::client::{Client, ClientRequest, ClientResponse, Connector}; @@ -222,7 +222,7 @@ fn parts(parts: &mut Option) -> &mut Inner { pub fn server(factory: F) -> TestServer where F: Fn() -> R + Send + Clone + 'static, - R: ServiceFactory, + R: ServiceFactory + 'static, { let (tx, rx) = mpsc::channel(); @@ -232,14 +232,14 @@ where let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - tx.send((sys.system(), local_addr)).unwrap(); - - sys.run(|| { - Server::build() + let system = sys.system(); + sys.run(move || { + crate::server::build() .listen("test", tcp, move |_| factory())? .workers(1) .disable_signals() .run(); + tx.send((system, local_addr)).unwrap(); Ok(()) }) }); diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index ab77abde..e8041d71 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -10,7 +10,7 @@ rust_2018_idioms, unreachable_pub, // missing_debug_implementations, - // missing_docs, + // missing_docs )] #![allow( type_alias_bounds, @@ -21,9 +21,6 @@ clippy::new_without_default )] -#[macro_use] -extern crate log; - #[cfg(not(test))] // Work around for rust-lang/rust#62127 pub use ntex_macros::{rt_main as main, rt_test as test}; diff --git a/ntex/src/server/accept.rs b/ntex/src/server/accept.rs index 48c7103c..0a2ccd74 100644 --- a/ntex/src/server/accept.rs +++ b/ntex/src/server/accept.rs @@ -1,12 +1,12 @@ use std::time::{Duration, Instant}; -use std::{cell::Cell, fmt, io, num::NonZeroUsize, sync::mpsc, sync::Arc, thread}; +use std::{cell::Cell, fmt, io, sync::mpsc, sync::Arc, thread}; +use std::{collections::VecDeque, num::NonZeroUsize}; use polling::{Event, Events, Poller}; use crate::{rt::System, time::sleep, time::Millis, util::Either}; -use super::socket::{Listener, SocketAddr}; -use super::worker::{Connection, WorkerClient}; +use super::socket::{Connection, Listener, SocketAddr}; use super::{Server, ServerStatus, Token}; const EXIT_TIMEOUT: Duration = Duration::from_millis(100); @@ -14,13 +14,12 @@ const ERR_TIMEOUT: Duration = Duration::from_millis(500); const ERR_SLEEP_TIMEOUT: Millis = Millis(525); #[derive(Debug)] -pub(super) enum Command { - Stop(mpsc::Sender<()>), +pub enum AcceptorCommand { + Stop(oneshot::Sender<()>), + Terminate, Pause, Resume, - Worker(WorkerClient), Timer, - WorkerAvailable, } #[derive(Debug)] @@ -33,27 +32,29 @@ struct ServerSocketInfo { } #[derive(Debug, Clone)] -pub(super) struct AcceptNotify(Arc, mpsc::Sender); +pub struct AcceptNotify(Arc, mpsc::Sender); impl AcceptNotify { - pub(super) fn new(waker: Arc, tx: mpsc::Sender) -> Self { + fn new(waker: Arc, tx: mpsc::Sender) -> Self { AcceptNotify(waker, tx) } - pub(super) fn send(&self, cmd: Command) { + pub fn send(&self, cmd: AcceptorCommand) { let _ = self.1.send(cmd); let _ = self.0.notify(); } } -pub(super) struct AcceptLoop { +/// Streamin io accept loop +pub struct AcceptLoop { notify: AcceptNotify, - inner: Option<(mpsc::Receiver, Arc, Server)>, + inner: Option<(mpsc::Receiver, Arc)>, status_handler: Option>, } impl AcceptLoop { - pub(super) fn new(srv: Server) -> AcceptLoop { + /// Create accept loop + pub fn new() -> AcceptLoop { // Create a poller instance let poll = Arc::new( Poller::new() @@ -66,45 +67,37 @@ impl AcceptLoop { AcceptLoop { notify, - inner: Some((rx, poll, srv)), + inner: Some((rx, poll)), status_handler: None, } } - pub(super) fn send(&self, msg: Command) { - self.notify.send(msg) - } - - pub(super) fn notify(&self) -> AcceptNotify { + /// Get notification api for the loop + pub fn notify(&self) -> AcceptNotify { self.notify.clone() } - pub(super) fn set_status_handler(&mut self, f: F) + pub fn set_status_handler(&mut self, f: F) where F: FnMut(ServerStatus) + Send + 'static, { self.status_handler = Some(Box::new(f)); } - pub(super) fn start( - &mut self, - socks: Vec<(Token, Listener)>, - workers: Vec, - ) { - let (rx, poll, srv) = self + /// Start accept loop + pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) { + let (rx, poll) = self .inner .take() .expect("AcceptLoop cannot be used multiple times"); - let status_handler = self.status_handler.take(); Accept::start( rx, poll, socks, srv, - workers, self.notify.clone(), - status_handler, + self.status_handler.take(), ); } } @@ -121,23 +114,21 @@ impl fmt::Debug for AcceptLoop { struct Accept { poller: Arc, - rx: mpsc::Receiver, + rx: mpsc::Receiver, sockets: Vec, - workers: Vec, srv: Server, notify: AcceptNotify, - next: usize, backpressure: bool, + backlog: VecDeque, status_handler: Option>, } impl Accept { fn start( - rx: mpsc::Receiver, + rx: mpsc::Receiver, poller: Arc, socks: Vec<(Token, Listener)>, srv: Server, - workers: Vec, notify: AcceptNotify, status_handler: Option>, ) { @@ -148,15 +139,14 @@ impl Accept { .name("ntex-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - Accept::new(rx, poller, socks, workers, srv, notify, status_handler).poll() + Accept::new(rx, poller, socks, srv, notify, status_handler).poll() }); } fn new( - rx: mpsc::Receiver, + rx: mpsc::Receiver, poller: Arc, socks: Vec<(Token, Listener)>, - workers: Vec, srv: Server, notify: AcceptNotify, status_handler: Option>, @@ -176,12 +166,11 @@ impl Accept { poller, rx, sockets, - workers, notify, srv, status_handler, - next: 0, - backpressure: false, + backpressure: true, + backlog: VecDeque::new(), } } @@ -194,12 +183,6 @@ impl Accept { fn poll(&mut self) { log::trace!("Starting server accept loop"); - // Add all sources - for (idx, info) in self.sockets.iter().enumerate() { - log::info!("Starting socket listener on {}", info.addr); - self.add_source(idx); - } - // Create storage for events let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap()); @@ -261,7 +244,7 @@ impl Accept { let notify = self.notify.clone(); System::current().arbiter().spawn(Box::pin(async move { sleep(ERR_SLEEP_TIMEOUT).await; - notify.send(Command::Timer); + notify.send(AcceptorCommand::Timer); })); } else { info.registered.set(true); @@ -300,57 +283,43 @@ impl Accept { } } - fn process_cmd(&mut self) -> Either<(), Option>> { + fn process_cmd(&mut self) -> Either<(), Option>> { loop { match self.rx.try_recv() { Ok(cmd) => match cmd { - Command::Stop(rx) => { - log::trace!("Stopping accept loop"); - for (key, info) in self.sockets.iter().enumerate() { - log::info!("Stopping socket listener on {}", info.addr); - self.remove_source(key); + AcceptorCommand::Stop(rx) => { + if !self.backpressure { + log::trace!("Stopping accept loop"); + self.backpressure(true); } - self.update_status(ServerStatus::NotReady); break Either::Right(Some(rx)); } - Command::Pause => { - log::trace!("Pausing accept loop"); - for (key, info) in self.sockets.iter().enumerate() { - log::info!("Stopping socket listener on {}", info.addr); - self.remove_source(key); + AcceptorCommand::Terminate => { + log::trace!("Stopping accept loop"); + self.backpressure(true); + break Either::Right(None); + } + AcceptorCommand::Pause => { + if !self.backpressure { + log::trace!("Pausing accept loop"); + self.backpressure(true); } - self.update_status(ServerStatus::NotReady); } - Command::Resume => { - log::trace!("Resuming accept loop"); - for (key, info) in self.sockets.iter().enumerate() { - log::info!("Resuming socket listener on {}", info.addr); - self.add_source(key); + AcceptorCommand::Resume => { + if self.backpressure { + log::trace!("Resuming accept loop"); + self.backpressure(false); } - self.update_status(ServerStatus::Ready); } - Command::Worker(worker) => { - log::trace!("Adding new worker to accept loop"); - self.backpressure(false); - self.workers.push(worker); - } - Command::Timer => { + AcceptorCommand::Timer => { self.process_timer(); } - Command::WorkerAvailable => { - log::trace!("Worker is available"); - self.backpressure(false); - } }, Err(err) => { break match err { mpsc::TryRecvError::Empty => Either::Left(()), mpsc::TryRecvError::Disconnected => { - for (key, info) in self.sockets.iter().enumerate() { - log::info!("Stopping socket listener on {}", info.addr); - self.remove_source(key); - } - + self.backpressure(true); Either::Right(None) } } @@ -366,107 +335,57 @@ impl Accept { ServerStatus::Ready }); - if self.backpressure { - if !on { - self.backpressure = false; - for (key, info) in self.sockets.iter().enumerate() { - if info.timeout.get().is_none() { - // socket with timeout will re-register itself after timeout - log::info!( - "Resuming socket listener on {} after back-pressure", - info.addr - ); - self.add_source(key); - } + if self.backpressure && !on { + // handle backlog + while let Some(msg) = self.backlog.pop_front() { + if let Err(msg) = self.srv.process(msg) { + log::trace!("Server is unavailable"); + self.backlog.push_front(msg); + return; } } - } else if on { + + // re-enable acceptors + self.backpressure = false; + for (key, info) in self.sockets.iter().enumerate() { + if info.timeout.get().is_none() { + // socket with timeout will re-register itself after timeout + log::info!( + "Resuming socket listener on {} after back-pressure", + info.addr + ); + self.add_source(key); + } + } + } else if !self.backpressure && on { self.backpressure = true; for key in 0..self.sockets.len() { // disable err timeout let info = &mut self.sockets[key]; if info.timeout.take().is_none() { - log::trace!("Enabling back-pressure for {}", info.addr); + log::info!("Stopping socket listener on {}", info.addr); self.remove_source(key); } } } } - fn accept_one(&mut self, mut msg: Connection) { - log::trace!( - "Accepting connection: {:?} bp: {}", - msg.io, - self.backpressure - ); - - if self.backpressure { - while !self.workers.is_empty() { - match self.workers[self.next].send(msg) { - Ok(_) => (), - Err(tmp) => { - log::trace!("Worker failed while processing connection"); - self.update_status(ServerStatus::WorkerFailed); - self.srv.worker_faulted(self.workers[self.next].idx); - msg = tmp; - self.workers.swap_remove(self.next); - if self.workers.is_empty() { - log::error!("No workers"); - return; - } else if self.workers.len() <= self.next { - self.next = 0; - } - continue; - } - } - self.next = (self.next + 1) % self.workers.len(); - break; - } - } else { - let mut idx = 0; - while idx < self.workers.len() { - idx += 1; - if self.workers[self.next].available() { - match self.workers[self.next].send(msg) { - Ok(_) => { - log::trace!("Sent to worker {:?}", self.next); - self.next = (self.next + 1) % self.workers.len(); - return; - } - Err(tmp) => { - log::trace!("Worker failed while processing connection"); - self.update_status(ServerStatus::WorkerFailed); - self.srv.worker_faulted(self.workers[self.next].idx); - msg = tmp; - self.workers.swap_remove(self.next); - if self.workers.is_empty() { - log::error!("No workers"); - self.backpressure(true); - return; - } else if self.workers.len() <= self.next { - self.next = 0; - } - continue; - } - } - } - self.next = (self.next + 1) % self.workers.len(); - } - // enable backpressure - log::trace!("No available workers, enable back-pressure"); - self.backpressure(true); - self.accept_one(msg); - } - } - fn accept(&mut self, token: usize) -> bool { loop { - let msg = if let Some(info) = self.sockets.get_mut(token) { + if let Some(info) = self.sockets.get_mut(token) { match info.sock.accept() { - Ok(Some(io)) => Connection { - io, - token: info.token, - }, + Ok(Some(io)) => { + let msg = Connection { + io, + token: info.token, + }; + if let Err(msg) = self.srv.process(msg) { + log::trace!("Server is unavailable"); + self.backlog.push_back(msg); + self.backpressure(true); + return false; + } + } Ok(None) => return true, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true, Err(ref e) if connection_error(e) => continue, @@ -479,16 +398,12 @@ impl Accept { let notify = self.notify.clone(); System::current().arbiter().spawn(Box::pin(async move { sleep(ERR_SLEEP_TIMEOUT).await; - notify.send(Command::Timer); + notify.send(AcceptorCommand::Timer); })); return false; } } - } else { - return false; - }; - - self.accept_one(msg); + } } } } diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index f93e310a..0f501846 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -1,40 +1,25 @@ -use std::{fmt, future::Future, io, marker, mem, net, pin::Pin, task::Context, task::Poll}; +use std::{fmt, future::Future, io, net}; -use async_channel::unbounded; -use log::{error, info}; +use ntex_server::{Server, WorkerPool}; use socket2::{Domain, SockAddr, Socket, Type}; -use crate::rt::{spawn, Signal, System}; -use crate::time::{sleep, Millis}; -use crate::{io::Io, service::ServiceFactory, util::join_all, util::Stream}; +use crate::service::ServiceFactory; +use crate::{io::Io, time::Millis}; -use super::accept::{AcceptLoop, AcceptNotify, Command}; -use super::config::{ - Config, ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime, -}; -use super::service::{Factory, InternalServiceFactory}; -use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; -use super::{socket::Listener, Server, ServerCommand, ServerStatus, Token}; - -const STOP_DELAY: Millis = Millis(300); - -type ServerStream = Pin>>; +use super::accept::AcceptLoop; +use super::config::{Config, ServiceConfig}; +use super::factory::{self, FactoryServiceType, OnWorkerStart, OnWorkerStartWrapper}; +use super::{socket::Listener, Connection, ServerStatus, StreamServer, Token}; /// Server builder pub struct ServerBuilder { - threads: usize, token: Token, backlog: i32, - workers: Vec<(usize, WorkerClient)>, - services: Vec>, + services: Vec, sockets: Vec<(Token, String, Listener)>, + on_worker_start: Vec>, accept: AcceptLoop, - exit: bool, - shutdown_timeout: Millis, - no_signals: bool, - cmd: ServerStream, - server: Server, - notify: Vec>, + pool: WorkerPool, } impl Default for ServerBuilder { @@ -46,24 +31,14 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded(); - let server = Server::new(tx); - ServerBuilder { - threads: std::thread::available_parallelism() - .map_or(2, std::num::NonZeroUsize::get), token: Token(0), - workers: Vec::new(), services: Vec::new(), sockets: Vec::new(), - accept: AcceptLoop::new(server.clone()), + on_worker_start: Vec::new(), + accept: AcceptLoop::new(), backlog: 2048, - exit: false, - shutdown_timeout: Millis::from_secs(30), - no_signals: false, - cmd: Box::pin(rx), - notify: Vec::new(), - server, + pool: WorkerPool::new(), } } @@ -72,7 +47,7 @@ impl ServerBuilder { /// By default server uses number of available logical cpu as workers /// count. pub fn workers(mut self, num: usize) -> Self { - self.threads = num; + self.pool = self.pool.workers(num); self } @@ -98,7 +73,7 @@ impl ServerBuilder { /// /// By default max connections is set to a 25k per worker. pub fn maxconn(self, num: usize) -> Self { - worker::max_concurrent_connections(num); + super::max_concurrent_connections(num); self } @@ -106,7 +81,7 @@ impl ServerBuilder { /// /// By default "stop runtime" is disabled. pub fn stop_runtime(mut self) -> Self { - self.exit = true; + self.pool = self.pool.stop_runtime(); self } @@ -114,7 +89,7 @@ impl ServerBuilder { /// /// By default signal handling is enabled. pub fn disable_signals(mut self) -> Self { - self.no_signals = true; + self.pool = self.pool.disable_signals(); self } @@ -126,7 +101,7 @@ impl ServerBuilder { /// /// By default shutdown timeout sets to 30 seconds. pub fn shutdown_timeout>(mut self, timeout: T) -> Self { - self.shutdown_timeout = timeout.into(); + self.pool = self.pool.shutdown_timeout(timeout); self } @@ -150,19 +125,14 @@ impl ServerBuilder { where F: Fn(&mut ServiceConfig) -> io::Result<()>, { - let mut cfg = ServiceConfig::new(self.threads, self.backlog); + let mut cfg = ServiceConfig::new(self.token, self.backlog); f(&mut cfg)?; - let mut cfg = cfg.0.borrow_mut(); - let mut srv = ConfiguredService::new(cfg.apply.take().unwrap()); - for (name, lst, tag) in mem::take(&mut cfg.services) { - let token = self.token.next(); - srv.stream(token, name.clone(), lst.local_addr()?, tag); - self.sockets.push((token, name, Listener::from_tcp(lst))); - } - self.services.push(Box::new(srv)); - self.threads = cfg.threads; + let (token, sockets, factory) = cfg.into_factory(); + self.token = token; + self.sockets.extend(sockets); + self.services.push(factory); Ok(self) } @@ -177,20 +147,14 @@ impl ServerBuilder { F: Fn(ServiceConfig) -> R, R: Future>, { - let cfg = ServiceConfig::new(self.threads, self.backlog); - let inner = cfg.0.clone(); + let cfg = ServiceConfig::new(self.token, self.backlog); - f(cfg).await?; + f(cfg.clone()).await?; - let mut cfg = inner.borrow_mut(); - let mut srv = ConfiguredService::new(cfg.apply.take().unwrap()); - for (name, lst, tag) in mem::take(&mut cfg.services) { - let token = self.token.next(); - srv.stream(token, name.clone(), lst.local_addr()?, tag); - self.sockets.push((token, name, Listener::from_tcp(lst))); - } - self.services.push(Box::new(srv)); - self.threads = cfg.threads; + let (token, sockets, factory) = cfg.into_factory(); + self.token = token; + self.sockets.extend(sockets); + self.services.push(factory); Ok(self) } @@ -201,44 +165,38 @@ impl ServerBuilder { /// It get executed in the worker thread. pub fn on_worker_start(mut self, f: F) -> Self where - F: Fn(ServiceRuntime) -> R + Send + Clone + 'static, + F: Fn() -> R + Send + Clone + 'static, R: Future> + 'static, E: fmt::Display + 'static, { - self.services - .push(Box::new(ConfiguredService::new(Box::new(ConfigWrapper { - f, - _t: marker::PhantomData, - })))); + self.on_worker_start.push(OnWorkerStartWrapper::create(f)); self } /// Add new service to the server. - pub fn bind, R>( - mut self, - name: N, - addr: U, - factory: F, - ) -> io::Result + pub fn bind(mut self, name: N, addr: U, factory: F) -> io::Result where U: net::ToSocketAddrs, + N: AsRef, F: Fn(Config) -> R + Send + Clone + 'static, - R: ServiceFactory, + R: ServiceFactory + 'static, { let sockets = bind_addr(addr, self.backlog)?; + let mut tokens = Vec::new(); for lst in sockets { let token = self.token.next(); - self.services.push(Factory::create( - name.as_ref().to_string(), - token, - factory.clone(), - lst.local_addr()?, - "", - )); self.sockets .push((token, name.as_ref().to_string(), Listener::from_tcp(lst))); + tokens.push((token, "")); } + + self.services.push(factory::create_factory_service( + name.as_ref().to_string(), + tokens, + factory, + )); + Ok(self) } @@ -249,7 +207,7 @@ impl ServerBuilder { N: AsRef, U: AsRef, F: Fn(Config) -> R + Send + Clone + 'static, - R: ServiceFactory, + R: ServiceFactory + 'static, { use std::os::unix::net::UnixListener; @@ -278,17 +236,13 @@ impl ServerBuilder { ) -> io::Result where F: Fn(Config) -> R + Send + Clone + 'static, - R: ServiceFactory, + R: ServiceFactory + 'static, { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; let token = self.token.next(); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(Factory::create( + self.services.push(factory::create_factory_service( name.as_ref().to_string(), - token, + vec![(token, "")], factory, - addr, - "", )); self.sockets .push((token, name.as_ref().to_string(), Listener::from_uds(lst))); @@ -304,15 +258,13 @@ impl ServerBuilder { ) -> io::Result where F: Fn(Config) -> R + Send + Clone + 'static, - R: ServiceFactory, + R: ServiceFactory + 'static, { let token = self.token.next(); - self.services.push(Factory::create( + self.services.push(factory::create_factory_service( name.as_ref().to_string(), - token, + vec![(token, "")], factory, - lst.local_addr()?, - "", )); self.sockets .push((token, name.as_ref().to_string(), Listener::from_tcp(lst))); @@ -343,208 +295,28 @@ impl ServerBuilder { } /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> Server { + pub fn run(self) -> Server { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { - info!("Starting {} workers", self.threads); - - // start workers - let mut workers = Vec::new(); - for idx in 0..self.threads { - let worker = self.start_worker(idx, self.accept.notify()); - workers.push(worker.clone()); - self.workers.push((idx, worker)); - } - - // start accept thread - for sock in &self.sockets { - info!("Starting \"{}\" service on {}", sock.1, sock.2); - } - self.accept.start( - mem::take(&mut self.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(), - workers, + let srv = StreamServer::new( + self.accept.notify(), + self.services, + self.on_worker_start, ); + let svc = self.pool.run(srv); - // handle signals - if !self.no_signals { - spawn(signals(self.server.clone())); - } + let sockets = self + .sockets + .into_iter() + .map(|sock| { + log::info!("Starting \"{}\" service on {}", sock.1, sock.2); + (sock.0, sock.2) + }) + .collect(); + self.accept.start(sockets, svc.clone()); - // start http server actor - let server = self.server.clone(); - spawn(self); - server - } - } - - fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { - let avail = WorkerAvailability::new(notify); - let services: Vec> = - self.services.iter().map(|v| v.clone_factory()).collect(); - - Worker::start(idx, services, avail, self.shutdown_timeout) - } - - fn handle_cmd(&mut self, item: ServerCommand) { - match item { - ServerCommand::Pause(tx) => { - self.accept.send(Command::Pause); - let _ = tx.send(()); - } - ServerCommand::Resume(tx) => { - self.accept.send(Command::Resume); - let _ = tx.send(()); - } - ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop ntex system - match sig { - Signal::Int => { - info!("SIGINT received, exiting"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - Signal::Term => { - info!("SIGTERM received, stopping"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: true, - completion: None, - }) - } - Signal::Quit => { - info!("SIGQUIT received, exiting"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - _ => (), - } - } - ServerCommand::Notify(tx) => { - self.notify.push(tx); - } - ServerCommand::Stop { - graceful, - completion, - } => { - let exit = self.exit; - - // stop accept thread - let (tx, rx) = std::sync::mpsc::channel(); - self.accept.send(Command::Stop(tx)); - let _ = rx.recv(); - - let notify = std::mem::take(&mut self.notify); - - // stop workers - if !self.workers.is_empty() && graceful { - let futs: Vec<_> = self - .workers - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect(); - spawn(async move { - let _ = join_all(futs).await; - - if let Some(tx) = completion { - let _ = tx.send(()); - } - for tx in notify { - let _ = tx.send(()); - } - if exit { - sleep(STOP_DELAY).await; - System::current().stop(); - } - }); - } else { - self.workers.iter().for_each(move |worker| { - worker.1.stop(false); - }); - - // we need to stop system if server was spawned - if self.exit { - spawn(async { - sleep(STOP_DELAY).await; - System::current().stop(); - }); - } - if let Some(tx) = completion { - let _ = tx.send(()); - } - for tx in notify { - let _ = tx.send(()); - } - } - } - ServerCommand::WorkerFaulted(idx) => { - let mut found = false; - for i in 0..self.workers.len() { - if self.workers[i].0 == idx { - self.workers.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.workers.len(); - 'found: loop { - for i in 0..self.workers.len() { - if self.workers[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let worker = self.start_worker(new_idx, self.accept.notify()); - self.workers.push((new_idx, worker.clone())); - self.accept.send(Command::Worker(worker)); - } - } - } - } -} - -impl Future for ServerBuilder { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.cmd).poll_next(cx) { - Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), - Poll::Ready(None) => return Poll::Pending, - Poll::Pending => return Poll::Pending, - } - } - } -} - -async fn signals(srv: Server) { - loop { - if let Some(rx) = crate::rt::signal() { - if let Ok(sig) = rx.await { - srv.signal(sig); - } else { - return; - } - } else { - log::info!("Signals are not supported by current runtime"); - return; + svc } } } diff --git a/ntex/src/server/config.rs b/ntex/src/server/config.rs index c68f4fb6..0b040d9f 100644 --- a/ntex/src/server/config.rs +++ b/ntex/src/server/config.rs @@ -1,15 +1,13 @@ use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc}; -use log::error; - use crate::io::Io; -use crate::service::{self, boxed, ServiceFactory as NServiceFactory}; +use crate::service::{IntoServiceFactory, ServiceFactory}; use crate::util::{BoxFuture, HashMap, PoolId, Ready}; -use super::service::{ - BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, +use super::factory::{ + self, BoxServerService, FactoryService, FactoryServiceType, NetService, }; -use super::{builder::bind_addr, counter::CounterGuard, Token}; +use super::{builder::bind_addr, socket::Listener, Token}; #[derive(Clone, Debug)] pub struct Config(Rc); @@ -41,31 +39,30 @@ impl Config { } } -#[derive(Debug)] +#[derive(Clone)] pub struct ServiceConfig(pub(super) Rc>); -#[derive(Debug)] +struct Socket { + name: String, + sockets: Vec<(Token, Listener, &'static str)>, +} + pub(super) struct ServiceConfigInner { - pub(super) services: Vec<(String, net::TcpListener, &'static str)>, - pub(super) apply: Option>, - pub(super) threads: usize, - pub(super) backlog: i32, - applied: bool, + token: Token, + apply: Option>, + sockets: Vec, + backlog: i32, } impl ServiceConfig { - pub(super) fn new(threads: usize, backlog: i32) -> Self { + pub(super) fn new(token: Token, backlog: i32) -> Self { ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner { - threads, + token, backlog, - services: Vec::new(), - applied: false, - apply: Some(Box::new(ConfigWrapper { - f: |_| { - not_configured(); - Ready::Ok::<_, &'static str>(()) - }, - _t: marker::PhantomData, + sockets: Vec::new(), + apply: Some(OnWorkerStartWrapper::create(|_| { + not_configured(); + Ready::Ok::<_, &str>(()) })), }))) } @@ -75,39 +72,41 @@ impl ServiceConfig { where U: net::ToSocketAddrs, { - let sockets = bind_addr(addr, self.0.borrow().backlog)?; + let mut inner = self.0.borrow_mut(); - for lst in sockets { - self.listen(name.as_ref(), lst); - } + let sockets = bind_addr(addr, inner.backlog)?; + let socket = Socket { + name: name.as_ref().to_string(), + sockets: sockets + .into_iter() + .map(|lst| (inner.token.next(), Listener::from_tcp(lst), "")) + .collect(), + }; + inner.sockets.push(socket); Ok(self) } /// Add new service to the server. pub fn listen>(&self, name: N, lst: net::TcpListener) -> &Self { - { - let mut inner = self.0.borrow_mut(); - if !inner.applied { - inner.apply = Some(Box::new(ConfigWrapper { - f: |_| { - not_configured(); - Ready::Ok::<_, &'static str>(()) - }, - _t: marker::PhantomData, - })); - } - inner.services.push((name.as_ref().to_string(), lst, "")); - } + let mut inner = self.0.borrow_mut(); + let socket = Socket { + name: name.as_ref().to_string(), + sockets: vec![(inner.token.next(), Listener::from_tcp(lst), "")], + }; + inner.sockets.push(socket); + self } /// Set io tag for configured service. pub fn set_tag>(&self, name: N, tag: &'static str) -> &Self { let mut inner = self.0.borrow_mut(); - for svc in &mut inner.services { - if svc.0 == name.as_ref() { - svc.2 = tag; + for sock in &mut inner.sockets { + if sock.name == name.as_ref() { + for item in &mut sock.sockets { + item.2 = tag; + } } } self @@ -117,179 +116,115 @@ impl ServiceConfig { /// /// This function get called during worker runtime configuration stage. /// It get executed in the worker thread. - pub fn on_worker_start(&self, f: F) -> io::Result<()> + pub fn on_worker_start(&self, f: F) -> &Self where F: Fn(ServiceRuntime) -> R + Send + Clone + 'static, R: Future> + 'static, E: fmt::Display + 'static, { - self.0.borrow_mut().applied = true; - self.0.borrow_mut().apply = Some(Box::new(ConfigWrapper { - f, - _t: marker::PhantomData, - })); - Ok(()) + self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f)); + self } -} -pub(super) struct ConfiguredService { - rt: Box, - names: HashMap, - topics: HashMap, - services: Vec, -} + pub(super) fn into_factory( + self, + ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) { + let mut inner = self.0.borrow_mut(); -impl ConfiguredService { - pub(super) fn new(rt: Box) -> Self { - ConfiguredService { - rt, - names: HashMap::default(), - topics: HashMap::default(), - services: Vec::new(), + let mut sockets = Vec::new(); + let mut names = HashMap::default(); + for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() { + names.insert( + s.name.clone(), + Entry { + idx, + pool: PoolId::DEFAULT, + tokens: s + .sockets + .iter() + .map(|(token, _, tag)| (*token, *tag)) + .collect(), + }, + ); + + sockets.extend( + s.sockets + .into_iter() + .map(|(token, lst, _)| (token, s.name.clone(), lst)), + ); } - } - pub(super) fn stream( - &mut self, - token: Token, - name: String, - addr: net::SocketAddr, - tag: &'static str, - ) { - self.names.insert(token, (name.clone(), addr)); - self.topics.insert(name, (token, tag)); - self.services.push(token); + ( + inner.token, + sockets, + Box::new(ConfiguredService { + rt: inner.apply.take().unwrap(), + names, + }), + ) } } -impl InternalServiceFactory for ConfiguredService { - fn name(&self, token: Token) -> &str { - &self.names[&token].0 - } +struct ConfiguredService { + rt: Box, + names: HashMap, +} - fn set_tag(&mut self, token: Token, tag: &'static str) { - for item in self.topics.values_mut() { - if item.0 == token { - item.1 = tag; - } - } - } - - fn clone_factory(&self) -> Box { +impl FactoryService for ConfiguredService { + fn clone_factory(&self) -> FactoryServiceType { Box::new(Self { rt: self.rt.clone(), names: self.names.clone(), - topics: self.topics.clone(), - services: self.services.clone(), }) } - fn create(&self) -> BoxFuture<'static, Result, ()>> { + fn create(&self) -> BoxFuture<'static, Result, ()>> { // configure services - let rt = ServiceRuntime::new(self.topics.clone()); - let cfg_fut = self.rt.configure(ServiceRuntime(rt.0.clone())); - let mut names = self.names.clone(); - let tokens = self.services.clone(); + let rt = ServiceRuntime::new(self.names.clone()); + let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone())); // construct services Box::pin(async move { cfg_fut.await?; rt.validate(); + let names = mem::take(&mut rt.0.borrow_mut().names); let mut services = mem::take(&mut rt.0.borrow_mut().services); - // TODO: Proper error handling here - let onstart = mem::take(&mut rt.0.borrow_mut().onstart); - for f in onstart.into_iter() { - f.await; - } - let mut res = vec![]; - for token in tokens { - if let Some(srv) = services.remove(&token) { - let newserv = srv.create(()); - match newserv.await { - Ok(serv) => { - res.push((token, serv)); - } - Err(_) => { - error!("Cannot construct service"); - return Err(()); - } + + let mut res = Vec::new(); + while let Some(Some(svc)) = services.pop() { + for entry in names.values() { + if entry.idx == services.len() { + res.push(NetService { + pool: entry.pool, + tokens: entry.tokens.clone(), + factory: svc, + }); + break; } - } else { - let name = names.remove(&token).unwrap().0; - res.push(( - token, - boxed::service(StreamService::new( - service::fn_service(move |_: Io| { - error!("Service {:?} is not configured", name); - Ready::<_, ()>::Ok(()) - }), - "UNKNOWN", - PoolId::P0, - )), - )); - }; + } } Ok(res) }) } } -pub(super) trait ServiceRuntimeConfiguration: fmt::Debug { - fn clone(&self) -> Box; - - fn configure(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>; -} - -pub(super) struct ConfigWrapper { - pub(super) f: F, - pub(super) _t: marker::PhantomData<(R, E)>, -} - -// SAFETY: we dont store R or E in ConfigWrapper -unsafe impl Send for ConfigWrapper {} - -impl fmt::Debug for ConfigWrapper { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ConfigWrapper") - .field("f", &std::any::type_name::()) - .finish() - } -} - -impl ServiceRuntimeConfiguration for ConfigWrapper -where - F: Fn(ServiceRuntime) -> R + Send + Clone + 'static, - R: Future> + 'static, - E: fmt::Display + 'static, -{ - fn clone(&self) -> Box { - Box::new(ConfigWrapper { - f: self.f.clone(), - _t: marker::PhantomData, - }) - } - - fn configure(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> { - let f = self.f.clone(); - Box::pin(async move { - (f)(rt).await.map_err(|e| { - error!("Cannot configure service: {}", e); - }) - }) - } -} - fn not_configured() { - error!("Service is not configured"); + log::error!("Service is not configured"); } pub struct ServiceRuntime(Rc>); +#[derive(Debug, Clone)] +struct Entry { + idx: usize, + pool: PoolId, + tokens: Vec<(Token, &'static str)>, +} + struct ServiceRuntimeInner { - names: HashMap, - services: HashMap, - onstart: Vec>, + names: HashMap, + services: Vec>, } impl fmt::Debug for ServiceRuntime { @@ -298,25 +233,23 @@ impl fmt::Debug for ServiceRuntime { f.debug_struct("ServiceRuntimer") .field("names", &inner.names) .field("services", &inner.services) - .field("onstart", &inner.onstart.len()) .finish() } } impl ServiceRuntime { - fn new(names: HashMap) -> Self { + fn new(names: HashMap) -> Self { ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner { + services: (0..names.len()).map(|_| None).collect(), names, - services: HashMap::default(), - onstart: Vec::new(), }))) } fn validate(&self) { let inner = self.0.as_ref().borrow(); for (name, item) in &inner.names { - if !inner.services.contains_key(&item.0) { - error!("Service {:?} is not configured", name); + if inner.services[item.idx].is_none() { + log::error!("Service {:?} is not configured", name); } } } @@ -327,8 +260,8 @@ impl ServiceRuntime { /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. pub fn service(&self, name: &str, service: F) where - F: service::IntoServiceFactory, - T: service::ServiceFactory + 'static, + F: IntoServiceFactory, + T: ServiceFactory + 'static, T::Service: 'static, T::InitError: fmt::Debug, { @@ -341,73 +274,73 @@ impl ServiceRuntime { /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. pub fn service_in(&self, name: &str, pool: PoolId, service: F) where - F: service::IntoServiceFactory, - T: service::ServiceFactory + 'static, + F: IntoServiceFactory, + T: ServiceFactory + 'static, T::Service: 'static, T::InitError: fmt::Debug, { let mut inner = self.0.borrow_mut(); - if let Some((token, tag)) = inner.names.get(name) { - let token = *token; - let tag = *tag; - inner.services.insert( - token, - boxed::factory(ServiceFactory { - tag, - pool, - inner: service.into_factory(), - }), - ); + if let Some(entry) = inner.names.get_mut(name) { + let idx = entry.idx; + entry.pool = pool; + inner.services[idx] = Some(factory::create_boxed_factory( + name.to_string(), + service.into_factory(), + )); } else { panic!("Unknown service: {:?}", name); } } - - /// Execute future before services initialization. - pub fn on_start(&self, fut: F) - where - F: Future + 'static, - { - self.0.borrow_mut().onstart.push(Box::pin(fut)) - } } -type BoxServiceFactory = service::boxed::BoxServiceFactory< - (), - (Option, ServerMessage), - (), - (), - (), ->; +trait OnWorkerStart: Send { + fn clone(&self) -> Box; -struct ServiceFactory { - inner: T, - tag: &'static str, - pool: PoolId, + fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>; } -impl service::ServiceFactory<(Option, ServerMessage)> for ServiceFactory +struct OnWorkerStartWrapper { + pub(super) f: F, + pub(super) _t: marker::PhantomData<(R, E)>, +} + +impl OnWorkerStartWrapper where - T: service::ServiceFactory, - T::Service: 'static, - T::Error: 'static, - T::InitError: fmt::Debug + 'static, + F: Fn(ServiceRuntime) -> R + Send + Clone + 'static, + R: Future> + 'static, + E: fmt::Display + 'static, { - type Response = (); - type Error = (); - type InitError = (); - type Service = BoxedServerService; - - async fn create(&self, _: ()) -> Result { - let tag = self.tag; - let pool = self.pool; - - match self.inner.create(()).await { - Ok(s) => Ok(boxed::service(StreamService::new(s, tag, pool))), - Err(e) => { - error!("Cannot construct service: {:?}", e); - Err(()) - } - } + pub(super) fn create(f: F) -> Box { + Box::new(Self { + f, + _t: marker::PhantomData, + }) + } +} + +// SAFETY: Send cannot be provided authomatically because of R param +// but R always get executed in one thread and never leave it +unsafe impl Send for OnWorkerStartWrapper where F: Send {} + +impl OnWorkerStart for OnWorkerStartWrapper +where + F: Fn(ServiceRuntime) -> R + Send + Clone + 'static, + R: Future> + 'static, + E: fmt::Display + 'static, +{ + fn clone(&self) -> Box { + Box::new(Self { + f: self.f.clone(), + _t: marker::PhantomData, + }) + } + + fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> { + let f = self.f.clone(); + Box::pin(async move { + (f)(rt).await.map_err(|e| { + log::error!("On worker start callback failed: {}", e); + }) + }) } } diff --git a/ntex/src/server/factory.rs b/ntex/src/server/factory.rs new file mode 100644 index 00000000..9fa9c277 --- /dev/null +++ b/ntex/src/server/factory.rs @@ -0,0 +1,206 @@ +use std::task::{Context, Poll}; +use std::{fmt, future::Future, marker::PhantomData}; + +use crate::io::Io; +use crate::service::{boxed, Service, ServiceCtx, ServiceFactory}; +use crate::util::{BoxFuture, PoolId, Ready}; + +use super::{Config, Token}; + +pub(super) type BoxServerService = boxed::BoxServiceFactory<(), Io, (), (), ()>; +pub(crate) type FactoryServiceType = Box; + +pub(crate) struct NetService { + pub(crate) tokens: Vec<(Token, &'static str)>, + pub(crate) factory: BoxServerService, + pub(crate) pool: PoolId, +} + +pub(crate) trait FactoryService: Send { + fn name(&self, _: Token) -> &str { + "" + } + + fn set_tag(&mut self, _: Token, _: &'static str) {} + + fn clone_factory(&self) -> Box; + + fn create(&self) -> BoxFuture<'static, Result, ()>>; +} + +pub(crate) fn create_boxed_factory(name: String, factory: S) -> BoxServerService +where + S: ServiceFactory + 'static, +{ + boxed::factory(ServerServiceFactory { name, factory }) +} + +pub(crate) fn create_factory_service( + name: String, + tokens: Vec<(Token, &'static str)>, + factory: F, +) -> Box +where + F: Fn(Config) -> R + Send + Clone + 'static, + R: ServiceFactory + 'static, +{ + Box::new(Factory { + tokens, + name: name.clone(), + factory: move |cfg| { + Ready::Ok::<_, &'static str>(create_boxed_factory(name.clone(), (factory)(cfg))) + }, + _t: PhantomData, + }) +} + +struct Factory { + name: String, + tokens: Vec<(Token, &'static str)>, + factory: F, + _t: PhantomData<(R, E)>, +} + +impl FactoryService for Factory +where + F: Fn(Config) -> R + Send + Clone + 'static, + R: Future> + 'static, + E: fmt::Display + 'static, +{ + fn name(&self, _: Token) -> &str { + &self.name + } + + fn clone_factory(&self) -> Box { + Box::new(Self { + name: self.name.clone(), + tokens: self.tokens.clone(), + factory: self.factory.clone(), + _t: PhantomData, + }) + } + + fn set_tag(&mut self, token: Token, tag: &'static str) { + for item in &mut self.tokens { + if item.0 == token { + item.1 = tag; + } + } + } + + fn create(&self) -> BoxFuture<'static, Result, ()>> { + let cfg = Config::default(); + let pool = cfg.get_pool_id(); + let name = self.name.clone(); + let tokens = self.tokens.clone(); + let factory_fut = (self.factory)(cfg); + + Box::pin(async move { + let factory = factory_fut.await.map_err(|_| { + log::error!("Cannot create {:?} service", name); + })?; + + Ok(vec![NetService { + tokens, + factory, + pool, + }]) + }) + } +} + +struct ServerServiceFactory { + name: String, + factory: S, +} + +impl ServiceFactory for ServerServiceFactory +where + S: ServiceFactory, +{ + type Response = (); + type Error = (); + type Service = ServerService; + type InitError = (); + + async fn create(&self, _: ()) -> Result { + self.factory + .create(()) + .await + .map(|inner| ServerService { inner }) + .map_err(|_| log::error!("Cannot construct {:?} service", self.name)) + } +} + +struct ServerService { + inner: S, +} + +impl Service for ServerService +where + S: Service, +{ + type Response = (); + type Error = (); + + crate::forward_poll_shutdown!(inner); + + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(|_| ()) + } + + async fn call(&self, req: Io, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { + ctx.call(&self.inner, req).await.map(|_| ()).map_err(|_| ()) + } +} + +// SAFETY: Send cannot be provided authomatically because of E and R params +// but R always get executed in one thread and never leave it +unsafe impl Send for Factory where F: Send {} + +pub(crate) trait OnWorkerStart { + fn clone_fn(&self) -> Box; + + fn run(&self) -> BoxFuture<'static, Result<(), ()>>; +} + +pub(super) struct OnWorkerStartWrapper { + pub(super) f: F, + pub(super) _t: PhantomData<(R, E)>, +} + +unsafe impl Send for OnWorkerStartWrapper where F: Send {} + +impl OnWorkerStartWrapper +where + F: Fn() -> R + Send + Clone + 'static, + R: Future> + 'static, + E: fmt::Display + 'static, +{ + pub(super) fn create(f: F) -> Box { + Box::new(Self { f, _t: PhantomData }) + } +} + +impl OnWorkerStart for OnWorkerStartWrapper +where + F: Fn() -> R + Send + Clone + 'static, + R: Future> + 'static, + E: fmt::Display + 'static, +{ + fn clone_fn(&self) -> Box { + Box::new(Self { + f: self.f.clone(), + _t: PhantomData, + }) + } + + fn run(&self) -> BoxFuture<'static, Result<(), ()>> { + let f = self.f.clone(); + Box::pin(async move { + (f)().await.map_err(|e| { + log::error!("On worker start callback failed: {}", e); + }) + }) + } +} diff --git a/ntex/src/server/mod.rs b/ntex/src/server/mod.rs index 62fb3016..f24db036 100644 --- a/ntex/src/server/mod.rs +++ b/ntex/src/server/mod.rs @@ -1,16 +1,14 @@ //! General purpose tcp server -use std::{future::Future, io, pin::Pin, task::Context, task::Poll}; - -use async_channel::Sender; +use std::sync::atomic::{AtomicUsize, Ordering}; mod accept; mod builder; mod config; mod counter; +mod factory; mod service; mod socket; mod test; -mod worker; #[cfg(feature = "openssl")] pub use ntex_tls::openssl; @@ -21,10 +19,16 @@ pub use ntex_tls::rustls; pub use ntex_tls::max_concurrent_ssl_accept; pub(crate) use self::builder::create_tcp_listener; + +pub use self::accept::{AcceptLoop, AcceptNotify, AcceptorCommand}; pub use self::builder::ServerBuilder; pub use self::config::{Config, ServiceConfig, ServiceRuntime}; +pub use self::service::{ServerMessage, StreamServer}; +pub use self::socket::{Connection, Stream}; pub use self::test::{build_test_server, test_server, TestServer}; +pub type Server = ntex_server::Server; + #[non_exhaustive] #[derive(Copy, Clone, Debug, PartialEq, Eq)] /// Server readiness status @@ -36,10 +40,11 @@ pub enum ServerStatus { /// Socket id token #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -struct Token(usize); +pub struct Token(usize); impl Token { - pub(self) fn next(&mut self) -> Token { + #[allow(clippy::should_implement_trait)] + pub fn next(&mut self) -> Token { let token = Token(self.0); self.0 += 1; token @@ -58,103 +63,23 @@ pub enum SslError { Service(E), } -#[derive(Debug)] -enum ServerCommand { - WorkerFaulted(usize), - Pause(oneshot::Sender<()>), - Resume(oneshot::Sender<()>), - Signal(crate::rt::Signal), - /// Whether to try and shut down gracefully - Stop { - graceful: bool, - completion: Option>, - }, - /// Notify of server stop - Notify(oneshot::Sender<()>), +static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); + +thread_local! { + static MAX_CONNS_COUNTER: self::counter::Counter = + self::counter::Counter::new(MAX_CONNS.load(Ordering::Relaxed)); } -/// Server controller -#[derive(Debug)] -pub struct Server(Sender, Option>); - -impl Server { - fn new(tx: Sender) -> Self { - Server(tx, None) - } - - /// Start server building process - pub fn build() -> ServerBuilder { - ServerBuilder::default() - } - - fn signal(&self, sig: crate::rt::Signal) { - let _ = self.0.try_send(ServerCommand::Signal(sig)); - } - - fn worker_faulted(&self, idx: usize) { - let _ = self.0.try_send(ServerCommand::WorkerFaulted(idx)); - } - - /// Pause accepting incoming connections - /// - /// If socket contains some pending connection, they might be dropped. - /// All opened connection remains active. - pub fn pause(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.try_send(ServerCommand::Pause(tx)); - async move { - let _ = rx.await; - } - } - - /// Resume accepting incoming connections - pub fn resume(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.try_send(ServerCommand::Resume(tx)); - async move { - let _ = rx.await; - } - } - - /// Stop incoming connection processing, stop all workers and exit. - /// - /// If server starts with `spawn()` method, then spawned thread get terminated. - pub fn stop(&self, graceful: bool) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.try_send(ServerCommand::Stop { - graceful, - completion: Some(tx), - }); - async move { - let _ = rx.await; - } - } +/// Sets the maximum per-worker number of concurrent connections. +/// +/// All socket listeners will stop accepting connections when this limit is +/// reached for each worker. +/// +/// By default max connections is set to a 25k per worker. +pub(super) fn max_concurrent_connections(num: usize) { + MAX_CONNS.store(num, Ordering::Relaxed); } -impl Clone for Server { - fn clone(&self) -> Self { - Self(self.0.clone(), None) - } -} - -impl Future for Server { - type Output = io::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - if this.1.is_none() { - let (tx, rx) = oneshot::channel(); - if this.0.try_send(ServerCommand::Notify(tx)).is_err() { - return Poll::Ready(Ok(())); - } - this.1 = Some(rx); - } - - match Pin::new(this.1.as_mut().unwrap()).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Ready(Err(_)) => Poll::Ready(Ok(())), - } - } +pub(super) fn num_connections() -> usize { + MAX_CONNS_COUNTER.with(|conns| conns.total()) } diff --git a/ntex/src/server/service.rs b/ntex/src/server/service.rs index 882b4993..1fa3575b 100644 --- a/ntex/src/server/service.rs +++ b/ntex/src/server/service.rs @@ -1,74 +1,170 @@ -use std::{net::SocketAddr, rc::Rc, task::Context, task::Poll}; +use std::{task::Context, task::Poll}; -use log::error; +use ntex_server::{ServerConfiguration, WorkerMessage}; +use crate::io::Io; use crate::service::{boxed, Service, ServiceCtx, ServiceFactory}; -use crate::util::{BoxFuture, Pool, PoolId, PoolRef}; -use crate::{io::Io, time::Millis}; +use crate::util::{HashMap, Pool, PoolRef}; -use super::{counter::CounterGuard, socket::Stream, Config, Token}; +use super::accept::{AcceptNotify, AcceptorCommand}; +use super::counter::Counter; +use super::factory::{FactoryServiceType, NetService, OnWorkerStart}; +use super::{socket::Connection, Token, MAX_CONNS_COUNTER}; -/// Server message -pub(super) enum ServerMessage { - /// New stream - Connect(Stream), - /// Gracefull shutdown in millis - Shutdown(Millis), - /// Force shutdown - ForceShutdown, +pub type ServerMessage = WorkerMessage; + +pub(super) type BoxService = boxed::BoxService; + +pub struct StreamServer { + notify: AcceptNotify, + services: Vec, + on_worker_start: Vec>, } -pub(super) trait StreamServiceFactory: Send + Clone + 'static { - type Factory: ServiceFactory; - - fn create(&self, _: Config) -> Self::Factory; -} - -pub(super) trait InternalServiceFactory: Send { - fn name(&self, token: Token) -> &str; - - fn set_tag(&mut self, token: Token, tag: &'static str); - - fn clone_factory(&self) -> Box; - - fn create(&self) -> BoxFuture<'static, Result, ()>>; -} - -pub(super) type BoxedServerService = - boxed::BoxService<(Option, ServerMessage), (), ()>; - -#[derive(Clone)] -pub(super) struct StreamService { - service: Rc, - tag: &'static str, - pool: Pool, - pool_ref: PoolRef, -} - -impl StreamService { - pub(crate) fn new(service: T, tag: &'static str, pid: PoolId) -> Self { - StreamService { - tag, - pool: pid.pool(), - pool_ref: pid.pool_ref(), - service: Rc::new(service), +impl StreamServer { + pub(crate) fn new( + notify: AcceptNotify, + services: Vec, + on_worker_start: Vec>, + ) -> Self { + Self { + notify, + services, + on_worker_start, } } } -impl Service<(Option, ServerMessage)> for StreamService -where - T: Service, -{ +/// Worker service factory. +impl ServerConfiguration for StreamServer { + type Item = Connection; + type Factory = StreamService; + + /// Create service factory for handling `WorkerMessage` messages. + async fn create(&self) -> Result { + // on worker start callbacks + for cb in &self.on_worker_start { + cb.run().await?; + } + + // construct services + let mut services = Vec::new(); + for svc in &self.services { + services.extend(svc.create().await?); + } + + Ok(StreamService { services }) + } + + /// Server is paused + fn paused(&self) { + self.notify.send(AcceptorCommand::Pause); + } + + /// Server is resumed + fn resumed(&self) { + self.notify.send(AcceptorCommand::Resume); + } + + /// Server is stopped + fn terminate(&self) { + self.notify.send(AcceptorCommand::Terminate); + } + + /// Server is stopped + async fn stop(&self) { + let (tx, rx) = oneshot::channel(); + self.notify.send(AcceptorCommand::Stop(tx)); + let _ = rx.await; + } +} + +impl Clone for StreamServer { + fn clone(&self) -> Self { + Self { + notify: self.notify.clone(), + services: self.services.iter().map(|s| s.clone_factory()).collect(), + on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(), + } + } +} + +pub struct StreamService { + services: Vec, +} + +impl ServiceFactory for StreamService { + type Response = (); + type Error = (); + type Service = StreamServiceImpl; + type InitError = (); + + async fn create(&self, _: ()) -> Result { + let mut tokens = HashMap::default(); + let mut services = Vec::new(); + + for info in &self.services { + match info.factory.create(()).await { + Ok(svc) => { + services.push(svc); + let idx = services.len() - 1; + for (token, tag) in &info.tokens { + tokens.insert( + *token, + (idx, *tag, info.pool.pool(), info.pool.pool_ref()), + ); + } + } + Err(_) => { + log::error!("Cannot construct service: {:?}", info.tokens); + return Err(()); + } + } + } + + let conns = MAX_CONNS_COUNTER.with(|conns| conns.priv_clone()); + + Ok(StreamServiceImpl { + tokens, + services, + conns, + }) + } +} + +pub struct StreamServiceImpl { + tokens: HashMap, + services: Vec, + conns: Counter, +} + +impl Service for StreamServiceImpl { type Response = (); type Error = (); - crate::forward_poll_shutdown!(service); - - #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let ready = self.service.poll_ready(cx).map_err(|_| ())?.is_ready(); - let ready = self.pool.poll_ready(cx).is_ready() && ready; + let mut ready = self.conns.available(cx); + for (idx, svc) in self.services.iter().enumerate() { + match svc.poll_ready(cx) { + Poll::Pending => ready = false, + Poll::Ready(Ok(())) => (), + Poll::Ready(Err(_)) => { + for (idx_, tag, _, _) in self.tokens.values() { + if idx == *idx_ { + log::error!("{}: Service readiness has failed", tag); + break; + } + } + return Poll::Ready(Err(())); + } + } + } + + // check memory pools + for (_, _, pool, _) in self.tokens.values() { + ready = pool.poll_ready(cx).is_ready() && ready; + } + if ready { Poll::Ready(Ok(())) } else { @@ -76,25 +172,41 @@ where } } - async fn call( - &self, - (guard, req): (Option, ServerMessage), - ctx: ServiceCtx<'_, Self>, - ) -> Result<(), ()> { - match req { - ServerMessage::Connect(stream) => { - let stream = stream.try_into().map_err(|e| { - error!("Cannot convert to an async io stream: {}", e); - }); + fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { + let mut ready = true; + for svc in &self.services { + match svc.poll_shutdown(cx) { + Poll::Pending => ready = false, + Poll::Ready(_) => (), + } + } + if ready { + log::info!( + "Worker service shutdown, {} connections", + super::num_connections() + ); + Poll::Ready(()) + } else { + Poll::Pending + } + } - if let Ok(stream) = stream { - let stream: Io<_> = stream; - stream.set_tag(self.tag); - stream.set_memory_pool(self.pool_ref); - let _ = ctx.call(self.service.as_ref(), stream).await; + async fn call(&self, req: ServerMessage, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { + match req { + ServerMessage::New(con) => { + if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) { + let stream: Io<_> = con.io.try_into().map_err(|e| { + log::error!("Cannot convert to an async io stream: {}", e); + })?; + + stream.set_tag(tag); + stream.set_memory_pool(*pool); + let guard = self.conns.get(); + let _ = ctx.call(&self.services[*idx], stream).await; drop(guard); Ok(()) } else { + log::error!("Cannot get handler service for connection: {:?}", con); Err(()) } } @@ -102,104 +214,3 @@ where } } } - -pub(super) struct Factory { - name: String, - tag: &'static str, - inner: F, - token: Token, - addr: SocketAddr, -} - -impl Factory -where - F: StreamServiceFactory, -{ - pub(crate) fn create( - name: String, - token: Token, - inner: F, - addr: SocketAddr, - tag: &'static str, - ) -> Box { - Box::new(Self { - name, - token, - inner, - addr, - tag, - }) - } -} - -impl InternalServiceFactory for Factory -where - F: StreamServiceFactory, -{ - fn name(&self, _: Token) -> &str { - &self.name - } - - fn set_tag(&mut self, _: Token, tag: &'static str) { - self.tag = tag; - } - - fn clone_factory(&self) -> Box { - Box::new(Self { - name: self.name.clone(), - inner: self.inner.clone(), - token: self.token, - addr: self.addr, - tag: self.tag, - }) - } - - fn create(&self) -> BoxFuture<'static, Result, ()>> { - let token = self.token; - let tag = self.tag; - let cfg = Config::default(); - let pool = cfg.get_pool_id(); - let factory = self.inner.create(cfg); - - Box::pin(async move { - match factory.create(()).await { - Ok(inner) => { - let service = boxed::service(StreamService::new(inner, tag, pool)); - Ok(vec![(token, service)]) - } - Err(_) => Err(()), - } - }) - } -} - -impl InternalServiceFactory for Box { - fn name(&self, token: Token) -> &str { - self.as_ref().name(token) - } - - fn set_tag(&mut self, token: Token, tag: &'static str) { - self.as_mut().set_tag(token, tag); - } - - fn clone_factory(&self) -> Box { - self.as_ref().clone_factory() - } - - fn create(&self) -> BoxFuture<'static, Result, ()>> { - self.as_ref().create() - } -} - -impl StreamServiceFactory for F -where - F: Fn(Config) -> T + Send + Clone + 'static, - T: ServiceFactory, -{ - type Factory = T; - - #[inline] - fn create(&self, cfg: Config) -> T { - (self)(cfg) - } -} diff --git a/ntex/src/server/socket.rs b/ntex/src/server/socket.rs index 540213c1..112fb530 100644 --- a/ntex/src/server/socket.rs +++ b/ntex/src/server/socket.rs @@ -1,8 +1,14 @@ use std::{fmt, io, net}; -use crate::{io::Io, rt}; +use crate::{io::Io, rt, server::Token}; -pub(crate) enum Listener { +#[derive(Debug)] +pub struct Connection { + pub(crate) io: Stream, + pub(crate) token: Token, +} + +pub enum Listener { Tcp(net::TcpListener), #[cfg(unix)] Uds(std::os::unix::net::UnixListener), diff --git a/ntex/src/server/test.rs b/ntex/src/server/test.rs index 76a8be67..2b4eb36c 100644 --- a/ntex/src/server/test.rs +++ b/ntex/src/server/test.rs @@ -4,7 +4,7 @@ use std::{io, net, sync::mpsc, thread}; use socket2::{Domain, SockAddr, Socket, Type}; use crate::rt::{tcp_connect, System}; -use crate::server::{Server, ServerBuilder}; +use crate::server::ServerBuilder; use crate::{io::Io, service::ServiceFactory}; /// Start test server @@ -41,7 +41,7 @@ use crate::{io::Io, service::ServiceFactory}; pub fn test_server(factory: F) -> TestServer where F: Fn() -> R + Send + Clone + 'static, - R: ServiceFactory, + R: ServiceFactory + 'static, { let (tx, rx) = mpsc::channel(); @@ -53,7 +53,7 @@ where tx.send((sys.system(), local_addr)).unwrap(); sys.run(|| { - Server::build() + ServerBuilder::new() .listen("test", tcp, move |_| factory())? .set_tag("test", "TEST-SERVER") .workers(1) @@ -81,7 +81,7 @@ where tx.send(sys.system()).unwrap(); sys.run(|| { - factory(Server::build()).workers(1).disable_signals().run(); + factory(super::build()).workers(1).disable_signals().run(); Ok(()) }) }); diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs deleted file mode 100644 index ebd25ec8..00000000 --- a/ntex/src/server/worker.rs +++ /dev/null @@ -1,705 +0,0 @@ -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; - -use async_channel::{unbounded, Receiver, Sender}; - -use crate::rt::{spawn, Arbiter}; -use crate::service::Pipeline; -use crate::time::{sleep, Millis, Sleep}; -use crate::util::{ - join_all, ready, select, stream_recv, BoxFuture, Either, Stream as FutStream, -}; - -use super::accept::{AcceptNotify, Command}; -use super::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; -use super::{counter::Counter, socket::Stream, Token}; - -type ServerStopCommand = Pin>>; -type ServerWorkerCommand = Pin>>; - -#[derive(Debug)] -pub(super) struct WorkerCommand(Connection); - -#[derive(Debug)] -/// Stop worker message. Returns `true` on successful shutdown -/// and `false` if some connections are still alive. -pub(super) struct StopCommand { - graceful: bool, - result: oneshot::Sender, -} - -#[derive(Debug)] -pub(super) struct Connection { - pub(super) io: Stream, - pub(super) token: Token, -} - -const STOP_TIMEOUT: Millis = Millis::ONE_SEC; -static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); - -/// Sets the maximum per-worker number of concurrent connections. -/// -/// All socket listeners will stop accepting connections when this limit is -/// reached for each worker. -/// -/// By default max connections is set to a 25k per worker. -pub(super) fn max_concurrent_connections(num: usize) { - MAX_CONNS.store(num, Ordering::Relaxed); -} - -pub(super) fn num_connections() -> usize { - MAX_CONNS_COUNTER.with(|conns| conns.total()) -} - -thread_local! { - static MAX_CONNS_COUNTER: Counter = - Counter::new(MAX_CONNS.load(Ordering::Relaxed)); -} - -#[derive(Clone, Debug)] -pub(super) struct WorkerClient { - pub(super) idx: usize, - tx1: Sender, - tx2: Sender, - avail: WorkerAvailability, -} - -impl WorkerClient { - pub(super) fn new( - idx: usize, - tx1: Sender, - tx2: Sender, - avail: WorkerAvailability, - ) -> Self { - WorkerClient { - idx, - tx1, - tx2, - avail, - } - } - - pub(super) fn send(&self, msg: Connection) -> Result<(), Connection> { - self.tx1 - .try_send(WorkerCommand(msg)) - .map_err(|msg| msg.into_inner().0) - } - - pub(super) fn available(&self) -> bool { - self.avail.available() - } - - pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (result, rx) = oneshot::channel(); - let _ = self.tx2.try_send(StopCommand { graceful, result }); - rx - } -} - -#[derive(Debug, Clone)] -pub(super) struct WorkerAvailability { - notify: AcceptNotify, - available: Arc, -} - -impl WorkerAvailability { - pub(super) fn new(notify: AcceptNotify) -> Self { - WorkerAvailability { - notify, - available: Arc::new(AtomicBool::new(false)), - } - } - - pub(super) fn available(&self) -> bool { - self.available.load(Ordering::Acquire) - } - - pub(super) fn set(&self, val: bool) { - let old = self.available.swap(val, Ordering::Release); - if !old && val { - self.notify.send(Command::WorkerAvailable) - } - } -} - -/// Service worker -/// -/// Worker accepts Socket objects via unbounded channel and starts stream -/// processing. -pub(super) struct Worker { - rx: ServerWorkerCommand, - rx2: ServerStopCommand, - services: Vec, - availability: WorkerAvailability, - conns: Counter, - factories: Vec>, - state: WorkerState, - shutdown_timeout: Millis, -} - -struct WorkerService { - factory: usize, - status: WorkerServiceStatus, - service: Pipeline, -} - -impl WorkerService { - fn created(&mut self, service: BoxedServerService) { - self.service = Pipeline::new(service); - self.status = WorkerServiceStatus::Unavailable; - } -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -enum WorkerServiceStatus { - Available, - Unavailable, - Failed, - Restarting, - Stopping, - Stopped, -} - -impl Worker { - pub(super) fn start( - idx: usize, - factories: Vec>, - availability: WorkerAvailability, - shutdown_timeout: Millis, - ) -> WorkerClient { - let (tx1, rx1) = unbounded(); - let (tx2, rx2) = unbounded(); - let avail = availability.clone(); - - Arbiter::default().exec_fn(move || { - spawn(async move { - match Worker::create(rx1, rx2, factories, availability, shutdown_timeout) - .await - { - Ok(wrk) => { - spawn(wrk); - } - Err(e) => { - error!("Cannot start worker: {:?}", e); - Arbiter::current().stop(); - } - } - }); - }); - - WorkerClient::new(idx, tx1, tx2, avail) - } - - async fn create( - rx: Receiver, - rx2: Receiver, - factories: Vec>, - availability: WorkerAvailability, - shutdown_timeout: Millis, - ) -> Result { - availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { - rx: Box::pin(rx), - rx2: Box::pin(rx2), - availability, - factories, - shutdown_timeout, - services: Vec::new(), - conns: conns.priv_clone(), - state: WorkerState::Unavailable, - }); - - let mut fut: Vec> = Vec::new(); - for (idx, factory) in wrk.factories.iter().enumerate() { - let f = factory.create(); - fut.push(Box::pin(async move { - let r = f.await?; - - Ok::<_, ()>( - r.into_iter() - .map(|(t, s): (Token, _)| (idx, t, s)) - .collect::>(), - ) - })); - } - - let res: Result, _> = - match select(join_all(fut), stream_recv(&mut wrk.rx2)).await { - Either::Left(result) => result.into_iter().collect(), - Either::Right(Some(StopCommand { result, .. })) => { - trace!("Shutdown uninitialized worker"); - wrk.shutdown(true); - let _ = result.send(false); - return Err(()); - } - Either::Right(None) => Err(()), - }; - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service: service.into(), - status: WorkerServiceStatus::Unavailable, - }); - } - } - Ok(wrk) - } - Err(_) => Err(()), - } - } - - fn shutdown(&mut self, force: bool) { - if force { - self.services.iter_mut().for_each(|srv| { - if srv.status == WorkerServiceStatus::Available { - srv.status = WorkerServiceStatus::Stopped; - let fut = srv - .service - .call_static((None, ServerMessage::ForceShutdown)); - spawn(async move { - let _ = fut.await; - }); - } - }); - } else { - let timeout = self.shutdown_timeout; - self.services.iter_mut().for_each(move |srv| { - if srv.status == WorkerServiceStatus::Available { - srv.status = WorkerServiceStatus::Stopping; - - let fut = srv - .service - .call_static((None, ServerMessage::Shutdown(timeout))); - spawn(async move { - let _ = fut.await; - }); - } - }); - } - } - - fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { - let mut ready = self.conns.available(cx); - let mut failed = None; - for (idx, srv) in &mut self.services.iter_mut().enumerate() { - if srv.status == WorkerServiceStatus::Available - || srv.status == WorkerServiceStatus::Unavailable - { - match srv.service.poll_ready(cx) { - Poll::Ready(Ok(_)) => { - if srv.status == WorkerServiceStatus::Unavailable { - trace!( - "Service {:?} is available", - self.factories[srv.factory].name(Token(idx)) - ); - srv.status = WorkerServiceStatus::Available; - } - } - Poll::Pending => { - ready = false; - - if srv.status == WorkerServiceStatus::Available { - trace!( - "Service {:?} is unavailable", - self.factories[srv.factory].name(Token(idx)) - ); - srv.status = WorkerServiceStatus::Unavailable; - } - } - Poll::Ready(Err(_)) => { - error!( - "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(Token(idx)) - ); - failed = Some((Token(idx), srv.factory)); - srv.status = WorkerServiceStatus::Failed; - } - } - } - } - if let Some(idx) = failed { - Err(idx) - } else { - Ok(ready) - } - } -} - -enum WorkerState { - Available, - Unavailable, - Restarting( - usize, - Token, - BoxFuture<'static, Result, ()>>, - ), - Shutdown(Sleep, Sleep, Option>), -} - -impl Future for Worker { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // `StopWorker` message handler - let stop = Pin::new(&mut self.rx2).poll_next(cx); - if let Poll::Ready(Some(StopCommand { graceful, result })) = stop { - self.availability.set(false); - let num = num_connections(); - if num == 0 { - info!("Shutting down worker, 0 connections"); - let _ = result.send(true); - return Poll::Ready(()); - } else if graceful { - self.shutdown(false); - let num = num_connections(); - if num != 0 { - info!("Graceful worker shutdown, {} connections", num); - self.state = WorkerState::Shutdown( - sleep(STOP_TIMEOUT), - sleep(self.shutdown_timeout), - Some(result), - ); - } else { - let _ = result.send(true); - return Poll::Ready(()); - } - } else { - info!("Force shutdown worker, {} connections", num); - self.shutdown(true); - let _ = result.send(false); - return Poll::Ready(()); - } - } - - match self.state { - WorkerState::Unavailable => { - match self.check_readiness(cx) { - Ok(true) => { - // process requests from wait queue - self.state = WorkerState::Available; - self.availability.set(true); - self.poll(cx) - } - Ok(false) => Poll::Pending, - Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = WorkerState::Restarting( - idx, - token, - self.factories[idx].create(), - ); - self.poll(cx) - } - } - } - WorkerState::Restarting(idx, token, ref mut fut) => { - match Pin::new(fut).poll(cx) { - Poll::Ready(Ok(item)) => { - // TODO: deal with multiple services - if let Some((token, service)) = item.into_iter().next() { - trace!( - "Service {:?} has been restarted", - self.factories[idx].name(token) - ); - self.services[token.0].created(service); - // service is restarted, now wait for readiness - self.state = WorkerState::Unavailable; - return self.poll(cx); - } - } - Poll::Ready(Err(_)) => { - panic!( - "Cannot restart {:?} service", - self.factories[idx].name(token) - ); - } - Poll::Pending => return Poll::Pending, - } - self.poll(cx) - } - WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { - let num = num_connections(); - if num == 0 { - let _ = tx.take().unwrap().send(true); - Arbiter::current().stop(); - return Poll::Ready(()); - } - - // check graceful timeout - match t2.poll_elapsed(cx) { - Poll::Pending => (), - Poll::Ready(_) => { - let _ = tx.take().unwrap().send(false); - self.shutdown(true); - Arbiter::current().stop(); - return Poll::Ready(()); - } - } - - // sleep for 1 second and then check again - match t1.poll_elapsed(cx) { - Poll::Pending => (), - Poll::Ready(_) => { - *t1 = sleep(STOP_TIMEOUT); - let _ = t1.poll_elapsed(cx); - } - } - Poll::Pending - } - WorkerState::Available => { - loop { - match self.check_readiness(cx) { - Ok(true) => (), - Ok(false) => { - trace!("Worker is unavailable"); - self.availability.set(false); - self.state = WorkerState::Unavailable; - return self.poll(cx); - } - Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.availability.set(false); - self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = WorkerState::Restarting( - idx, - token, - self.factories[idx].create(), - ); - return self.poll(cx); - } - } - - let next = ready!(Pin::new(&mut self.rx).poll_next(cx)); - if let Some(WorkerCommand(msg)) = next { - // handle incoming io stream - let guard = self.conns.get(); - let srv = &self.services[msg.token.0]; - - if log::log_enabled!(log::Level::Trace) { - trace!( - "Got socket for service: {:?}", - self.factories[srv.factory].name(msg.token) - ); - } - let fut = srv - .service - .call_static((Some(guard), ServerMessage::Connect(msg.io))); - spawn(async move { - let _ = fut.await; - }); - } else { - return Poll::Ready(()); - } - } - } - } - } -} - -#[cfg(test)] -mod tests { - use std::sync::Mutex; - - use super::*; - use crate::io::Io; - use crate::server::service::Factory; - use crate::service::{Service, ServiceCtx, ServiceFactory}; - use crate::util::lazy; - - #[derive(Clone, Copy, Debug)] - enum St { - Fail, - Ready, - Pending, - } - - #[derive(Clone)] - struct SrvFactory { - st: Arc>, - counter: Arc>, - } - - impl ServiceFactory for SrvFactory { - type Response = (); - type Error = (); - type Service = Srv; - type InitError = (); - - async fn create(&self, _: ()) -> Result { - let mut cnt = self.counter.lock().unwrap(); - *cnt += 1; - Ok(Srv { - st: self.st.clone(), - }) - } - } - - struct Srv { - st: Arc>, - } - - impl Service for Srv { - type Response = (); - type Error = (); - - fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { - let st: St = { *self.st.lock().unwrap() }; - match st { - St::Fail => { - *self.st.lock().unwrap() = St::Pending; - Poll::Ready(Err(())) - } - St::Ready => Poll::Ready(Ok(())), - St::Pending => Poll::Pending, - } - } - - fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> { - match *self.st.lock().unwrap() { - St::Ready => Poll::Ready(()), - St::Fail | St::Pending => Poll::Pending, - } - } - - async fn call(&self, _: Io, _: ServiceCtx<'_, Self>) -> Result<(), ()> { - Ok(()) - } - } - - #[crate::rt_test] - #[allow(clippy::mutex_atomic)] - async fn basics() { - let (_tx1, rx1) = unbounded(); - let (tx2, rx2) = unbounded(); - let (sync_tx, _sync_rx) = std::sync::mpsc::channel(); - let poll = Arc::new(polling::Poller::new().unwrap()); - let waker = poll.clone(); - let avail = - WorkerAvailability::new(AcceptNotify::new(waker.clone(), sync_tx.clone())); - - let st = Arc::new(Mutex::new(St::Pending)); - let counter = Arc::new(Mutex::new(0)); - - let f = SrvFactory { - st: st.clone(), - counter: counter.clone(), - }; - - let mut worker = Worker::create( - rx1, - rx2, - vec![Factory::create( - "test".to_string(), - Token(0), - move |_| f.clone(), - "127.0.0.1:8080".parse().unwrap(), - "TEST", - )], - avail.clone(), - Millis(5_000), - ) - .await - .unwrap(); - assert_eq!(*counter.lock().unwrap(), 1); - - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(!avail.available()); - - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(!avail.available()); - - *st.lock().unwrap() = St::Ready; - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(avail.available()); - - *st.lock().unwrap() = St::Pending; - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(!avail.available()); - - *st.lock().unwrap() = St::Ready; - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(avail.available()); - - // restart - *st.lock().unwrap() = St::Fail; - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(!avail.available()); - - *st.lock().unwrap() = St::Fail; - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(!avail.available()); - - *st.lock().unwrap() = St::Ready; - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(avail.available()); - - // shutdown - let g = MAX_CONNS_COUNTER.with(|conns| conns.get()); - - let (tx, rx) = oneshot::channel(); - tx2.try_send(StopCommand { - graceful: true, - result: tx, - }) - .unwrap(); - - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(!avail.available()); - drop(g); - assert!(lazy(|cx| Pin::new(&mut worker).poll(cx)).await.is_ready()); - let _ = rx.await; - - // force shutdown - let (_tx1, rx1) = unbounded(); - let (tx2, rx2) = unbounded(); - let avail = WorkerAvailability::new(AcceptNotify::new(waker, sync_tx.clone())); - let f = SrvFactory { - st: st.clone(), - counter: counter.clone(), - }; - - let mut worker = Worker::create( - rx1, - rx2, - vec![Factory::create( - "test".to_string(), - Token(0), - move |_| f.clone(), - "127.0.0.1:8080".parse().unwrap(), - "TEST", - )], - avail.clone(), - Millis(5_000), - ) - .await - .unwrap(); - - // shutdown - let _g = MAX_CONNS_COUNTER.with(|conns| conns.get()); - - *st.lock().unwrap() = St::Ready; - let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; - assert!(avail.available()); - - let (tx, rx) = oneshot::channel(); - tx2.try_send(StopCommand { - graceful: false, - result: tx, - }) - .unwrap(); - - assert!(lazy(|cx| Pin::new(&mut worker).poll(cx)).await.is_ready()); - let _ = rx.await; - } -} diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 5c7c0d46..22d2dd3a 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -608,7 +608,7 @@ where let local_addr = tcp.local_addr().unwrap(); sys.run(move || { - let builder = Server::build().workers(1).disable_signals(); + let builder = crate::server::build().workers(1).disable_signals(); let srv = match cfg.stream { StreamType::Tcp => match cfg.tp { diff --git a/ntex/src/ws/codec.rs b/ntex/src/ws/codec.rs index 6f051cc8..6dba8961 100644 --- a/ntex/src/ws/codec.rs +++ b/ntex/src/ws/codec.rs @@ -263,7 +263,7 @@ impl Decoder for Codec { } OpCode::Bad => Err(ProtocolError::BadOpCode), _ => { - error!("Unfinished fragment {:?}", opcode); + log::error!("Unfinished fragment {:?}", opcode); Err(ProtocolError::ContinuationFragment(opcode)) } } diff --git a/ntex/src/ws/frame.rs b/ntex/src/ws/frame.rs index 17a80d63..7a13ccbf 100644 --- a/ntex/src/ws/frame.rs +++ b/ntex/src/ws/frame.rs @@ -1,4 +1,3 @@ -use log::debug; use nanorand::{Rng, WyRand}; use super::proto::{CloseCode, CloseReason, OpCode}; @@ -117,7 +116,7 @@ impl Parser { return Err(ProtocolError::InvalidLength(length)); } OpCode::Close if length > 125 => { - debug!("Received close frame with payload length exceeding 125. Morphing to protocol close frame."); + log::debug!("Received close frame with payload length exceeding 125. Morphing to protocol close frame."); return Ok(Some((true, OpCode::Close, None))); } _ => (), diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index 433a968d..17750c2a 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -513,6 +513,9 @@ async fn test_h1_head_binary() { #[ntex::test] async fn test_h1_head_binary2() { + std::env::set_var("RUST_LOG", "trace"); + let _ = env_logger::try_init(); + let srv = test_server(|| { HttpService::build().h1(|_| Ready::Ok::<_, io::Error>(Response::Ok().body(STR))) }); diff --git a/ntex/tests/server.rs b/ntex/tests/server.rs index d19bd1e9..36cf7298 100644 --- a/ntex/tests/server.rs +++ b/ntex/tests/server.rs @@ -4,7 +4,7 @@ use std::{io, io::Read, net, sync::mpsc, sync::Arc, thread, time}; use ntex::codec::BytesCodec; use ntex::io::Io; -use ntex::server::{Server, TestServer}; +use ntex::server::{build, TestServer}; use ntex::service::fn_service; use ntex::util::{Bytes, Ready}; @@ -16,7 +16,7 @@ fn test_bind() { let h = thread::spawn(move || { let sys = ntex::rt::System::new("test"); sys.run(move || { - let srv = Server::build() + let srv = build() .workers(1) .disable_signals() .bind("test", addr, move |_| { @@ -44,8 +44,8 @@ async fn test_listen() { let h = thread::spawn(move || { let sys = ntex::rt::System::new("test"); let lst = net::TcpListener::bind(addr).unwrap(); - sys.run(move || { - let srv = Server::build() + let _ = sys.run(move || { + let srv = build() .disable_signals() .workers(1) .listen("test", lst, move |_| fn_service(|_| Ready::Ok::<_, ()>(()))) @@ -53,7 +53,7 @@ async fn test_listen() { .run(); let _ = tx.send((srv, ntex::rt::System::current())); Ok(()) - }) + }); }); let (srv, sys) = rx.recv().unwrap(); @@ -74,7 +74,7 @@ async fn test_run() { let h = thread::spawn(move || { let sys = ntex::rt::System::new("test"); sys.run(move || { - let srv = Server::build() + let srv = build() .backlog(100) .disable_signals() .bind("test", addr, move |_| { @@ -144,7 +144,7 @@ fn test_on_worker_start() { let num2 = num2.clone(); let sys = ntex::rt::System::new("test"); sys.run(move || { - let srv = Server::build() + let srv = build() .disable_signals() .configure(move |cfg| { let num = num.clone(); @@ -162,12 +162,11 @@ fn test_on_worker_start() { let _ = num.fetch_add(1, Relaxed); Ok::<_, io::Error>(()) } - }) - .unwrap(); + }); Ok::<_, io::Error>(()) }) .unwrap() - .on_worker_start(move |_| { + .on_worker_start(move || { let _ = num2.fetch_add(1, Relaxed); Ready::Ok::<_, io::Error>(()) }) @@ -202,42 +201,53 @@ fn test_configure_async() { let num = num2.clone(); let num2 = num2.clone(); let sys = ntex::rt::System::new("test"); - sys.block_on(async move { - let srv = Server::build() - .disable_signals() - .configure_async(move |cfg| { - let num = num.clone(); - let lst = net::TcpListener::bind(addr3).unwrap(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .listen("addr3", lst) - .on_worker_start(move |rt| { - let num = num.clone(); - async move { - rt.service("addr1", fn_service(|_| Ready::Ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| Ready::Ok::<_, ()>(()))); - let _ = num.fetch_add(1, Relaxed); - Ok::<_, io::Error>(()) - } - }) - .unwrap(); - Ready::Ok::<_, io::Error>(()) - }) - .await - .unwrap() - .on_worker_start(move |_| { - let _ = num2.fetch_add(1, Relaxed); - Ready::Ok::<_, io::Error>(()) - }) - .workers(1) - .run(); - let _ = tx.send((srv, ntex::rt::System::current())); + let _ = sys.run(move || { + ntex_rt::spawn(async move { + let srv = build() + .disable_signals() + .configure_async(move |cfg| { + let num = num.clone(); + let lst = net::TcpListener::bind(addr3).unwrap(); + cfg.bind("addr1", addr1) + .unwrap() + .bind("addr2", addr2) + .unwrap() + .listen("addr3", lst) + .set_tag("addr1", "srv-addr1") + .on_worker_start(move |rt| { + assert!(format!("{:?}", rt).contains("ServiceRuntime")); + let num = num.clone(); + async move { + rt.service( + "addr1", + fn_service(|_| Ready::Ok::<_, ()>(())), + ); + rt.service( + "addr3", + fn_service(|_| Ready::Ok::<_, ()>(())), + ); + let _ = num.fetch_add(1, Relaxed); + Ok::<_, io::Error>(()) + } + }); + Ready::Ok::<_, io::Error>(()) + }) + .await + .unwrap() + .on_worker_start(move || { + let _ = num2.fetch_add(1, Relaxed); + Ready::Ok::<_, io::Error>(()) + }) + .workers(1) + .run(); + let _ = tx.send((srv.clone(), ntex::rt::System::current())); + let _ = srv.await; + }); Ok::<_, io::Error>(()) - }) + }); }); let (_, sys) = rx.recv().unwrap(); + thread::sleep(time::Duration::from_millis(500)); assert!(net::TcpStream::connect(addr1).is_ok()); @@ -263,7 +273,7 @@ fn test_panic_in_worker() { let counter = counter2.clone(); sys.run(move || { let counter = counter.clone(); - let srv = Server::build() + let srv = build() .workers(1) .disable_signals() .bind("test", addr, move |_| {