Refactor worker management (#312)

Nikolay Kim 2024-03-23 07:17:04 +01:00, committed by GitHub
parent ca16e4a5e4
commit d393d87164
46 changed files with 1925 additions and 1782 deletions


@@ -9,6 +9,7 @@ members = [
"ntex-http",
"ntex-router",
"ntex-rt",
"ntex-server",
"ntex-service",
"ntex-tls",
"ntex-macros",
@@ -27,6 +28,7 @@ ntex-io = { path = "ntex-io" }
ntex-http = { path = "ntex-http" }
ntex-router = { path = "ntex-router" }
ntex-rt = { path = "ntex-rt" }
ntex-server = { path = "ntex-server" }
ntex-service = { path = "ntex-service" }
ntex-tls = { path = "ntex-tls" }
ntex-macros = { path = "ntex-macros" }


@@ -1,7 +1,7 @@
#![allow(clippy::let_underscore_future)]
use std::any::{Any, TypeId};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
use async_channel::{unbounded, Receiver, Sender};
@@ -320,10 +320,17 @@ impl Future for SystemArbiter {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.commands).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(cmd)) => match cmd {
let cmd = ready!(Pin::new(&mut self.commands).poll_next(cx));
log::debug!("Received system command: {:?}", cmd);
match cmd {
None => {
log::debug!("System stopped");
return Poll::Ready(());
}
Some(cmd) => match cmd {
SystemCommand::Exit(code) => {
log::debug!("Stopping system with {} code", code);
// stop arbiters
for arb in self.arbiters.values() {
arb.stop();
@@ -340,7 +347,6 @@ impl Future for SystemArbiter {
self.arbiters.remove(&name);
}
},
Poll::Pending => return Poll::Pending,
}
}
}
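
Note on the change above: `std::task::ready!` returns early with `Poll::Pending`, so the new `let cmd = ready!(...)` line replaces the removed `Poll::Pending => return Poll::Pending` arm. A minimal illustrative sketch of the two equivalent styles (hypothetical helper functions, not part of this commit):

use std::task::{ready, Context, Poll};
use std::{future::Future, pin::Pin};

// Polling an inner future by hand: the Pending arm must be written out.
fn poll_manual<F: Future + Unpin>(f: &mut F, cx: &mut Context<'_>) -> Poll<F::Output> {
    match Pin::new(f).poll(cx) {
        Poll::Ready(out) => Poll::Ready(out),
        Poll::Pending => Poll::Pending,
    }
}

// Same behavior with `ready!`: the macro expands to `return Poll::Pending`
// when the inner poll is pending.
fn poll_with_ready<F: Future + Unpin>(f: &mut F, cx: &mut Context<'_>) -> Poll<F::Output> {
    let out = ready!(Pin::new(f).poll(cx));
    Poll::Ready(out)
}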

ntex-server/CHANGES.md (new file, 5 lines)

@@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2024-03-xx
* Release

ntex-server/Cargo.toml (new file, 37 lines)

@@ -0,0 +1,37 @@
[package]
name = "ntex-server"
version = "0.1.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://ntex.rs"
repository = "https://github.com/ntex-rs/ntex.git"
documentation = "https://docs.rs/ntex-server/"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2021"
[lib]
name = "ntex_server"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1"
ntex-service = "2.0"
ntex-rt = "0.4"
ntex-util = "1.0"
async-channel = "2.2"
async-broadcast = "0.7"
log = "0.4"
oneshot = { version = "0.1", default-features = false, features = ["async"] }
[dev-dependencies]
ntex = { version = "1", features = ["tokio"] }
ntex-macros = "0.1.3"
[target.'cfg(target_family = "unix")'.dependencies]
signal-hook = { version = "0.3", features=["iterator"] }
[target.'cfg(target_family = "windows")'.dependencies]
ctrlc = "3.4"

ntex-server/LICENSE-APACHE (symbolic link)

@@ -0,0 +1 @@
../LICENSE-APACHE

ntex-server/LICENSE-MIT (symbolic link)

@@ -0,0 +1 @@
../LICENSE-MIT

ntex-server/src/lib.rs (new file, 61 lines)

@@ -0,0 +1,61 @@
#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
#![allow(clippy::let_underscore_future)]
use ntex_service::ServiceFactory;
use ntex_util::time::Millis;
mod manager;
mod pool;
mod server;
mod signals;
mod wrk;
pub use self::pool::WorkerPool;
pub use self::server::Server;
pub use self::wrk::{Worker, WorkerStatus, WorkerStop};
/// Worker id
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct WorkerId(usize);
impl WorkerId {
pub(self) fn next(&mut self) -> WorkerId {
let id = WorkerId(self.0);
self.0 += 1;
id
}
}
#[non_exhaustive]
#[derive(Debug)]
/// Worker message
pub enum WorkerMessage<T> {
/// New item received
New(T),
/// Graceful shutdown in millis
Shutdown(Millis),
/// Force shutdown
ForceShutdown,
}
#[allow(async_fn_in_trait)]
/// Worker service factory.
pub trait ServerConfiguration: Send + Clone + 'static {
type Item: Send + 'static;
type Factory: ServiceFactory<WorkerMessage<Self::Item>> + 'static;
/// Create service factory for handling `WorkerMessage<T>` messages.
async fn create(&self) -> Result<Self::Factory, ()>;
/// Server is paused.
fn paused(&self) {}
/// Server is resumed.
fn resumed(&self) {}
/// Server is terminated.
fn terminate(&self) {}
/// Server is stopped
async fn stop(&self) {}
}
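
For orientation, a hedged sketch of implementing the trait above, assuming the async-fn-in-trait `Service`/`ServiceFactory` shapes of ntex-service 2.0; `EchoConfig`, `EchoFactory`, and `EchoService` are hypothetical names, not part of this commit:

use ntex_server::{ServerConfiguration, WorkerMessage};
use ntex_service::{Service, ServiceCtx, ServiceFactory};

#[derive(Clone)]
struct EchoConfig;
struct EchoFactory;
struct EchoService;

impl Service<WorkerMessage<String>> for EchoService {
    type Response = ();
    type Error = ();

    async fn call(
        &self,
        msg: WorkerMessage<String>,
        _: ServiceCtx<'_, Self>,
    ) -> Result<(), ()> {
        match msg {
            WorkerMessage::New(item) => println!("processing {}", item),
            WorkerMessage::Shutdown(t) => println!("graceful shutdown within {:?}", t),
            WorkerMessage::ForceShutdown => println!("forced shutdown"),
        }
        Ok(())
    }
}

impl ServiceFactory<WorkerMessage<String>> for EchoFactory {
    type Response = ();
    type Error = ();
    type Service = EchoService;
    type InitError = ();

    async fn create(&self, _: ()) -> Result<EchoService, ()> {
        Ok(EchoService)
    }
}

impl ServerConfiguration for EchoConfig {
    type Item = String;
    type Factory = EchoFactory;

    // Called once per worker, inside that worker's arbiter.
    async fn create(&self) -> Result<EchoFactory, ()> {
        Ok(EchoFactory)
    }
}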

ntex-server/src/manager.rs (new file, 341 lines)

@@ -0,0 +1,341 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::{cell::Cell, cell::RefCell, collections::VecDeque, rc::Rc, sync::Arc};
use async_channel::{unbounded, Receiver, Sender};
use ntex_rt::System;
use ntex_util::{future::join_all, time::sleep, time::Millis};
use crate::server::ServerShared;
use crate::signals::Signal;
use crate::{Server, ServerConfiguration, Worker, WorkerId, WorkerPool, WorkerStatus};
const STOP_DELAY: Millis = Millis(500);
const RESTART_DELAY: Millis = Millis(250);
#[derive(Clone)]
pub(crate) struct ServerManager<F: ServerConfiguration>(Rc<Inner<F>>);
#[derive(Debug)]
pub(crate) enum ServerCommand<T> {
Item(T),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(Signal),
Stop {
graceful: bool,
completion: Option<oneshot::Sender<()>>,
},
NotifyStopped(oneshot::Sender<()>),
Worker(Update<T>),
}
#[derive(Debug)]
pub(crate) enum Update<T> {
Available(Worker<T>),
Unavailable(Worker<T>),
}
struct Inner<F: ServerConfiguration> {
id: Cell<WorkerId>,
factory: F,
cfg: WorkerPool,
shared: Arc<ServerShared>,
stopping: Cell<bool>,
stop_notify: RefCell<Vec<oneshot::Sender<()>>>,
cmd: Sender<ServerCommand<F::Item>>,
}
impl<F: ServerConfiguration> ServerManager<F> {
pub(crate) fn start(cfg: WorkerPool, factory: F) -> Server<F::Item> {
log::info!("Starting {} workers", cfg.num);
let (tx, rx) = unbounded();
let no_signals = cfg.no_signals;
let shared = Arc::new(ServerShared {
paused: AtomicBool::new(true),
});
let mgr = ServerManager(Rc::new(Inner {
cfg,
factory,
id: Cell::new(WorkerId::default()),
shared: shared.clone(),
stopping: Cell::new(false),
stop_notify: RefCell::new(Vec::new()),
cmd: tx.clone(),
}));
// handle cmd
let _ = ntex_rt::spawn(handle_cmd(mgr.clone(), rx));
// start workers
for _ in 0..mgr.0.cfg.num {
start_worker(mgr.clone());
}
let srv = Server::new(tx, shared);
// handle signals
if !no_signals {
crate::signals::start(srv.clone());
}
srv
}
pub(crate) fn factory(&self) -> F {
self.0.factory.clone()
}
pub(crate) fn next_id(&self) -> WorkerId {
let mut id = self.0.id.get();
let next_id = id.next();
self.0.id.set(id);
next_id
}
pub(crate) fn pause(&self) {
self.0.shared.paused.store(true, Ordering::Release);
self.0.factory.paused();
}
pub(crate) fn resume(&self) {
self.0.shared.paused.store(false, Ordering::Release);
self.0.factory.resumed();
}
fn available(&self, wrk: Worker<F::Item>) {
let _ = self
.0
.cmd
.try_send(ServerCommand::Worker(Update::Available(wrk)));
}
fn unavailable(&self, wrk: Worker<F::Item>) {
let _ = self
.0
.cmd
.try_send(ServerCommand::Worker(Update::Unavailable(wrk)));
}
fn add_stop_notify(&self, tx: oneshot::Sender<()>) {
self.0.stop_notify.borrow_mut().push(tx);
}
fn stopping(&self) -> bool {
self.0.stopping.get()
}
}
fn start_worker<F: ServerConfiguration>(mgr: ServerManager<F>) {
let _ = ntex_rt::spawn(async move {
let id = mgr.next_id();
let mut wrk = Worker::start(id, mgr.factory());
loop {
match wrk.status() {
WorkerStatus::Available => mgr.available(wrk.clone()),
WorkerStatus::Unavailable => mgr.unavailable(wrk.clone()),
WorkerStatus::Failed => {
mgr.unavailable(wrk);
sleep(RESTART_DELAY).await;
if !mgr.stopping() {
wrk = Worker::start(id, mgr.factory());
} else {
return;
}
}
}
wrk.wait_for_status().await;
}
});
}
struct HandleCmdState<F: ServerConfiguration> {
next: usize,
backlog: VecDeque<F::Item>,
workers: Vec<Worker<F::Item>>,
mgr: ServerManager<F>,
}
impl<F: ServerConfiguration> HandleCmdState<F> {
fn new(mgr: ServerManager<F>) -> Self {
Self {
next: 0,
backlog: VecDeque::new(),
workers: Vec::with_capacity(mgr.0.cfg.num),
mgr,
}
}
fn process(&mut self, mut item: F::Item) {
loop {
if !self.workers.is_empty() {
if self.next >= self.workers.len() {
self.next = self.workers.len() - 1;
}
match self.workers[self.next].send(item) {
Ok(()) => {
self.next = (self.next + 1) % self.workers.len();
break;
}
Err(i) => {
if !self.mgr.0.stopping.get() {
log::trace!("Worker failed while processing item");
}
item = i;
self.workers.remove(self.next);
}
}
} else {
if !self.mgr.0.stopping.get() {
log::error!("No workers");
self.backlog.push_back(item);
self.mgr.pause();
}
break;
}
}
}
fn update_workers(&mut self, upd: Update<F::Item>) {
match upd {
Update::Available(worker) => {
self.workers.push(worker);
if self.workers.len() == 1 {
self.mgr.resume();
} else {
self.workers.sort();
}
}
Update::Unavailable(worker) => {
if let Ok(idx) = self.workers.binary_search(&worker) {
self.workers.remove(idx);
}
if self.workers.is_empty() {
self.mgr.pause();
}
}
}
// handle backlog
if !self.backlog.is_empty() && !self.workers.is_empty() {
while let Some(item) = self.backlog.pop_front() {
// handle worker failure
if let Err(item) = self.workers[0].send(item) {
self.backlog.push_back(item);
self.workers.remove(0);
break;
}
}
}
}
async fn stop(&mut self, graceful: bool, completion: Option<oneshot::Sender<()>>) {
self.mgr.0.stopping.set(true);
// stop server
self.mgr.0.factory.stop().await;
// stop workers
if !self.workers.is_empty() {
let timeout = self.mgr.0.cfg.shutdown_timeout;
if graceful && !timeout.is_zero() {
let futs: Vec<_> = self
.workers
.iter()
.map(|worker| worker.stop(timeout))
.collect();
let _ = join_all(futs).await;
} else {
self.workers.iter().for_each(|worker| {
let _ = worker.stop(Millis::ZERO);
});
}
}
// notify sender
if let Some(tx) = completion {
let _ = tx.send(());
}
let notify = std::mem::take(&mut *self.mgr.0.stop_notify.borrow_mut());
for tx in notify {
let _ = tx.send(());
}
// stop system if server was spawned
if self.mgr.0.cfg.stop_runtime {
sleep(STOP_DELAY).await;
System::current().stop();
}
}
}
struct DropHandle<F: ServerConfiguration>(ServerManager<F>);
impl<T: ServerConfiguration> Drop for DropHandle<T> {
fn drop(&mut self) {
self.0 .0.stopping.set(true);
self.0 .0.factory.terminate();
}
}
async fn handle_cmd<F: ServerConfiguration>(
mgr: ServerManager<F>,
rx: Receiver<ServerCommand<F::Item>>,
) {
let _drop_hnd = DropHandle(mgr.clone());
let mut state = HandleCmdState::new(mgr);
loop {
let item = if let Ok(item) = rx.recv().await {
item
} else {
return;
};
match item {
ServerCommand::Item(item) => state.process(item),
ServerCommand::Worker(upd) => state.update_workers(upd),
ServerCommand::Pause(tx) => {
state.mgr.pause();
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
state.mgr.resume();
let _ = tx.send(());
}
ServerCommand::NotifyStopped(tx) => state.mgr.add_stop_notify(tx),
ServerCommand::Stop {
graceful,
completion,
} => {
state.stop(graceful, completion).await;
return;
}
ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop ntex system
match sig {
Signal::Int => {
log::info!("SIGINT received, exiting");
state.stop(false, None).await;
return;
}
Signal::Term => {
log::info!("SIGTERM received, stopping");
state.stop(true, None).await;
return;
}
Signal::Quit => {
log::info!("SIGQUIT received, exiting");
state.stop(false, None).await;
return;
}
_ => (),
}
}
}
}
}

ntex-server/src/pool.rs (new file, 75 lines)

@@ -0,0 +1,75 @@
use ntex_util::time::Millis;
use crate::{Server, ServerConfiguration};
const DEFAULT_SHUTDOWN_TIMEOUT: Millis = Millis::from_secs(30);
#[derive(Debug, Copy, Clone)]
/// Server builder
pub struct WorkerPool {
pub(crate) num: usize,
pub(crate) no_signals: bool,
pub(crate) stop_runtime: bool,
pub(crate) shutdown_timeout: Millis,
}
impl Default for WorkerPool {
fn default() -> Self {
Self::new()
}
}
impl WorkerPool {
/// Create new Server builder instance
pub fn new() -> Self {
WorkerPool {
num: std::thread::available_parallelism()
.map_or(2, std::num::NonZeroUsize::get),
no_signals: false,
stop_runtime: false,
shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
}
}
/// Set number of workers to start.
///
/// By default the server uses the number of available logical CPUs as the
/// worker count.
pub fn workers(mut self, num: usize) -> Self {
self.num = num;
self
}
/// Stop the current ntex runtime when the manager gets dropped.
///
/// By default "stop runtime" is disabled.
pub fn stop_runtime(mut self) -> Self {
self.stop_runtime = true;
self
}
/// Disable signal handling.
///
/// By default signal handling is enabled.
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self
}
/// Timeout for graceful workers shutdown.
///
/// After receiving a stop signal, workers have this much time to finish
/// serving requests. Workers still alive after the timeout are forcibly
/// dropped.
///
/// By default the shutdown timeout is set to 30 seconds.
pub fn shutdown_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.shutdown_timeout = timeout.into();
self
}
/// Starts processing incoming items and returns the server controller.
pub fn run<F: ServerConfiguration>(self, factory: F) -> Server<F::Item> {
crate::manager::ServerManager::start(self, factory)
}
}
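
A hedged usage sketch for the builder above, reusing the hypothetical `EchoConfig` from the lib.rs sketch:

use ntex_server::WorkerPool;
use ntex_util::time::Millis;

fn start() {
    // Must be called inside a running ntex system; `EchoConfig` is the
    // hypothetical `ServerConfiguration` implementation sketched earlier.
    let server = WorkerPool::new()
        .workers(4)                              // four worker arbiters
        .disable_signals()                       // no SIGINT/SIGTERM thread
        .shutdown_timeout(Millis::from_secs(10)) // graceful-stop budget
        .run(EchoConfig);
    // `server` is a `Server<String>` controller; see server.rs below.
    let _ = server;
}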

ntex-server/src/server.rs (new file, 113 lines)

@@ -0,0 +1,113 @@
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
use std::task::{ready, Context, Poll};
use std::{future::Future, io, pin::Pin};
use async_channel::Sender;
use crate::{manager::ServerCommand, signals::Signal};
#[derive(Debug)]
pub(crate) struct ServerShared {
pub(crate) paused: AtomicBool,
}
/// Server controller
#[derive(Debug)]
pub struct Server<T> {
shared: Arc<ServerShared>,
cmd: Sender<ServerCommand<T>>,
stop: Option<oneshot::Receiver<()>>,
}
impl<T> Server<T> {
pub(crate) fn new(cmd: Sender<ServerCommand<T>>, shared: Arc<ServerShared>) -> Self {
Server {
cmd,
shared,
stop: None,
}
}
pub(crate) fn signal(&self, sig: Signal) {
let _ = self.cmd.try_send(ServerCommand::Signal(sig));
}
/// Send item to worker pool
pub fn process(&mut self, item: T) -> Result<(), T> {
if self.shared.paused.load(Ordering::Acquire) {
Err(item)
} else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) {
match e.into_inner() {
ServerCommand::Item(item) => Err(item),
_ => panic!(),
}
} else {
Ok(())
}
}
/// Pause accepting incoming connections
///
/// If the socket contains pending connections, they might be dropped.
/// All open connections remain active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.cmd.try_send(ServerCommand::Pause(tx));
async move {
let _ = rx.await;
}
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.cmd.try_send(ServerCommand::Resume(tx));
async move {
let _ = rx.await;
}
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If the server was started with the `spawn()` method, the spawned thread gets terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.cmd.try_send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
async move {
let _ = rx.await;
}
}
}
impl<T> Clone for Server<T> {
fn clone(&self) -> Self {
Self {
cmd: self.cmd.clone(),
shared: self.shared.clone(),
stop: None,
}
}
}
impl<T> Future for Server<T> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.stop.is_none() {
let (tx, rx) = oneshot::channel();
if this.cmd.try_send(ServerCommand::NotifyStopped(tx)).is_err() {
return Poll::Ready(Ok(()));
}
this.stop = Some(rx);
}
let _ = ready!(Pin::new(this.stop.as_mut().unwrap()).poll(cx));
Poll::Ready(Ok(()))
}
}
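
A hedged sketch of driving the controller above; `server` would be the value returned by `WorkerPool::run` in the previous sketch:

async fn control(mut server: ntex_server::Server<String>) {
    // Items are rejected with `Err` while the pool is paused or has no workers.
    if let Err(item) = server.process("job".to_string()) {
        println!("rejected: {}", item);
    }

    server.pause().await;    // workers stop receiving new items
    server.resume().await;   // accepting again
    server.stop(true).await; // graceful stop; resolves on completion

    // Awaiting the `Server` value itself (its `Future` impl above)
    // resolves once the server has fully stopped.
}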


@@ -0,0 +1,70 @@
use std::thread;
use crate::server::Server;
/// Different types of process signals
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub(crate) enum Signal {
/// SIGHUP
Hup,
/// SIGINT
Int,
/// SIGTERM
Term,
/// SIGQUIT
Quit,
}
#[cfg(target_family = "unix")]
/// Register signal handler.
///
/// Signals are handled by oneshots; you have to re-register
/// after each signal.
pub(crate) fn start<T: Send + 'static>(srv: Server<T>) {
let _ = thread::Builder::new()
.name("ntex-server signals".to_string())
.spawn(move || {
use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
let sigs = vec![SIGHUP, SIGINT, SIGTERM, SIGQUIT];
let mut signals = match Signals::new(sigs) {
Ok(signals) => signals,
Err(e) => {
log::error!("Cannot initialize signals handler: {}", e);
return;
}
};
for info in &mut signals {
match info {
SIGHUP => srv.signal(Signal::Hup),
SIGTERM => srv.signal(Signal::Term),
SIGINT => {
srv.signal(Signal::Int);
return;
}
SIGQUIT => {
srv.signal(Signal::Quit);
return;
}
_ => {}
}
}
});
}
#[cfg(target_family = "windows")]
/// Register signal handler.
///
/// Signals are handled by oneshots; you have to re-register
/// after each signal.
pub(crate) fn start<T: Send + 'static>(srv: Server<T>) {
let _ = thread::Builder::new()
.name("ntex-server signals".to_string())
.spawn(move || {
ctrlc::set_handler(move || {
srv.signal(Signal::Int);
})
.expect("Error setting Ctrl-C handler");
});
}

ntex-server/src/wrk.rs (new file, 341 lines)

@@ -0,0 +1,341 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{ready, Context, Poll};
use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc};
use async_broadcast::{self as bus, broadcast};
use async_channel::{unbounded, Receiver, Sender};
use ntex_rt::{spawn, Arbiter};
use ntex_service::{Pipeline, ServiceFactory};
use ntex_util::future::{select, stream_recv, Either, Stream};
use ntex_util::time::{sleep, timeout_checked, Millis};
use crate::{ServerConfiguration, WorkerId, WorkerMessage};
const STOP_TIMEOUT: Millis = Millis::ONE_SEC;
#[derive(Debug)]
/// Shutdown worker
struct Shutdown {
timeout: Millis,
result: oneshot::Sender<bool>,
}
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
/// Worker status
pub enum WorkerStatus {
Available,
#[default]
Unavailable,
Failed,
}
#[derive(Debug)]
/// Server worker
///
/// The worker accepts messages via an unbounded channel and starts processing them.
pub struct Worker<T> {
id: WorkerId,
tx1: Sender<T>,
tx2: Sender<Shutdown>,
avail: WorkerAvailability,
failed: Arc<AtomicBool>,
}
impl<T> cmp::Ord for Worker<T> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.id.cmp(&other.id)
}
}
impl<T> cmp::PartialOrd for Worker<T> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.id.cmp(&other.id))
}
}
impl<T> hash::Hash for Worker<T> {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
impl<T> Eq for Worker<T> {}
impl<T> PartialEq for Worker<T> {
fn eq(&self, other: &Worker<T>) -> bool {
self.id == other.id
}
}
#[derive(Debug)]
/// Stop worker process
///
/// The stop future resolves when the worker completes processing
/// incoming items and stops the arbiter
pub struct WorkerStop(oneshot::Receiver<bool>);
impl<T> Worker<T> {
/// Start worker.
pub fn start<F>(id: WorkerId, cfg: F) -> Worker<T>
where
T: Send + 'static,
F: ServerConfiguration<Item = T>,
{
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let (avail, avail_tx) = WorkerAvailability::create();
Arbiter::default().exec_fn(move || {
let _ = spawn(async move {
log::info!("Starting worker {:?}", id);
log::debug!("Creating server instance in {:?} worker", id);
let factory = cfg.create().await;
log::debug!("Server instance has been created in {:?} worker", id);
match create(id, rx1, rx2, factory, avail_tx).await {
Ok((svc, wrk)) => {
run_worker(svc, wrk).await;
}
Err(e) => {
log::error!("Cannot start worker: {:?}", e);
}
}
Arbiter::current().stop();
});
});
Worker {
id,
tx1,
tx2,
avail,
failed: Arc::new(AtomicBool::new(false)),
}
}
/// Worker id.
pub fn id(&self) -> WorkerId {
self.id
}
/// Send message to the worker.
///
/// Returns `Ok` if the message is accepted by the worker;
/// otherwise the message is returned back as `Err`
pub fn send(&self, msg: T) -> Result<(), T> {
self.tx1.try_send(msg).map_err(|msg| msg.into_inner())
}
/// Check worker status.
pub fn status(&self) -> WorkerStatus {
if self.failed.load(Ordering::Acquire) {
WorkerStatus::Failed
} else if self.avail.available() {
WorkerStatus::Available
} else {
WorkerStatus::Unavailable
}
}
/// Wait for worker status updates
pub async fn wait_for_status(&mut self) -> WorkerStatus {
if self.failed.load(Ordering::Acquire) {
WorkerStatus::Failed
} else {
// cleanup updates
while self.avail.notify.try_recv().is_ok() {}
if self.avail.notify.recv_direct().await.is_err() {
self.failed.store(true, Ordering::Release);
}
self.status()
}
}
/// Stop worker.
///
/// If the timeout value is zero, the worker is shut down forcefully
pub fn stop(&self, timeout: Millis) -> WorkerStop {
let (result, rx) = oneshot::channel();
let _ = self.tx2.try_send(Shutdown { timeout, result });
WorkerStop(rx)
}
}
impl<T> Clone for Worker<T> {
fn clone(&self) -> Self {
Worker {
id: self.id,
tx1: self.tx1.clone(),
tx2: self.tx2.clone(),
avail: self.avail.clone(),
failed: self.failed.clone(),
}
}
}
impl Future for WorkerStop {
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.0).poll(cx)) {
Ok(res) => Poll::Ready(res),
Err(_) => Poll::Ready(true),
}
}
}
#[derive(Debug, Clone)]
struct WorkerAvailability {
notify: bus::Receiver<()>,
available: Arc<AtomicBool>,
}
#[derive(Debug, Clone)]
struct WorkerAvailabilityTx {
notify: bus::Sender<()>,
available: Arc<AtomicBool>,
}
impl WorkerAvailability {
fn create() -> (Self, WorkerAvailabilityTx) {
let (mut tx, rx) = broadcast(16);
tx.set_overflow(true);
let avail = WorkerAvailability {
notify: rx,
available: Arc::new(AtomicBool::new(false)),
};
let avail_tx = WorkerAvailabilityTx {
notify: tx,
available: avail.available.clone(),
};
(avail, avail_tx)
}
fn available(&self) -> bool {
self.available.load(Ordering::Acquire)
}
}
impl WorkerAvailabilityTx {
fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release);
if !old && val {
let _ = self.notify.try_broadcast(());
}
}
}
/// Service worker
///
/// The worker accepts messages via an unbounded channel and starts processing them.
struct WorkerSt<T, F: ServiceFactory<WorkerMessage<T>>> {
id: WorkerId,
rx: Pin<Box<dyn Stream<Item = T>>>,
stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
factory: F,
availability: WorkerAvailabilityTx,
}
async fn run_worker<T, F>(mut svc: Pipeline<F::Service>, mut wrk: WorkerSt<T, F>)
where
T: Send + 'static,
F: ServiceFactory<WorkerMessage<T>> + 'static,
{
loop {
let fut = poll_fn(|cx| {
ready!(svc.poll_ready(cx)?);
if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) {
let fut = svc.call_static(WorkerMessage::New(item));
let _ = spawn(async move {
let _ = fut.await;
});
}
Poll::Ready(Ok::<(), F::Error>(()))
});
match select(fut, stream_recv(&mut wrk.stop)).await {
Either::Left(Ok(())) => continue,
Either::Left(Err(_)) => {
wrk.availability.set(false);
}
Either::Right(Some(Shutdown { timeout, result })) => {
wrk.availability.set(false);
if timeout.is_zero() {
let fut = svc.call_static(WorkerMessage::ForceShutdown);
let _ = spawn(async move {
let _ = fut.await;
});
sleep(STOP_TIMEOUT).await;
} else {
let fut = svc.call_static(WorkerMessage::Shutdown(timeout));
let res = timeout_checked(timeout, fut).await;
let _ = result.send(res.is_ok());
};
poll_fn(|cx| svc.poll_shutdown(cx)).await;
log::info!("Stopping worker {:?}", wrk.id);
return;
}
Either::Right(None) => return,
}
loop {
match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
Either::Left(Ok(service)) => {
wrk.availability.set(true);
svc = Pipeline::new(service);
break;
}
Either::Left(Err(_)) => sleep(STOP_TIMEOUT).await,
Either::Right(_) => return,
}
}
}
}
async fn create<T, F>(
id: WorkerId,
rx: Receiver<T>,
stop: Receiver<Shutdown>,
factory: Result<F, ()>,
availability: WorkerAvailabilityTx,
) -> Result<(Pipeline<F::Service>, WorkerSt<T, F>), ()>
where
T: Send + 'static,
F: ServiceFactory<WorkerMessage<T>> + 'static,
{
availability.set(false);
let factory = factory?;
let rx = Box::pin(rx);
let mut stop = Box::pin(stop);
let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
Either::Left(Ok(svc)) => Pipeline::new(svc),
Either::Left(Err(_)) => return Err(()),
Either::Right(Some(Shutdown { result, .. })) => {
log::trace!("Shutdown uninitialized worker");
let _ = result.send(false);
return Err(());
}
Either::Right(None) => return Err(()),
};
availability.set(true);
Ok((
svc,
WorkerSt {
id,
factory,
availability,
rx: Box::pin(rx),
stop: Box::pin(stop),
},
))
}
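
A hedged sketch of supervising a single worker directly, mirroring what `start_worker` in manager.rs does internally; `EchoConfig` is again the hypothetical configuration from the lib.rs sketch:

use ntex_server::{Worker, WorkerId, WorkerStatus};
use ntex_util::time::Millis;

async fn supervise(cfg: EchoConfig) {
    // Spawns a new arbiter (thread) and builds the service inside it.
    let mut wrk = Worker::start(WorkerId::default(), cfg);
    loop {
        match wrk.status() {
            WorkerStatus::Available => {
                let _ = wrk.send("job".to_string());
            }
            WorkerStatus::Unavailable => {}
            WorkerStatus::Failed => break,
        }
        wrk.wait_for_status().await;
    }
    // Allow ten seconds to drain; `WorkerStop` resolves to `true`
    // if shutdown completed within the timeout.
    let clean = wrk.stop(Millis::from_secs(10)).await;
    println!("worker stopped cleanly: {}", clean);
}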


@@ -1,5 +1,9 @@
# Changes
## [2.0.2] - 2024-03-20
* Add boxed rc service factory
## [2.0.1] - 2024-02-07
* Add fmt::Debug impl for PipelineCall


@@ -1,5 +1,9 @@
# Changes
## [1.1.0] - 2024-03-xx
* Added server worker management utils
## [1.0.1] - 2024-01-19
* Allow to lock readiness for Condition


@@ -133,7 +133,7 @@ pub struct WeakSender<T> {
}
impl<T> WeakSender<T> {
/// Upgrade to Sender<T>
/// Upgrade to `Sender<T>`
pub fn upgrade(&self) -> Option<Sender<T>> {
self.shared.upgrade().map(|shared| Sender { shared })
}


@@ -1,13 +1,16 @@
//! Utilities for ntex framework
#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
#[doc(hidden)]
pub use std::task::ready;
pub mod channel;
pub mod future;
pub mod services;
pub mod task;
pub mod time;
pub use futures_core::{ready, Stream};
pub use futures_core::Stream;
pub use futures_sink::Sink;
pub use ntex_rt::spawn;


@@ -64,7 +64,7 @@ where
TimeoutChecked::new_with_delay(future, dur.into())
}
/// Future returned by [`sleep`](sleep).
/// Future returned by [`sleep`].
///
/// # Examples
///
@@ -126,7 +126,7 @@ impl Future for Sleep {
}
}
/// Future returned by [`deadline`](deadline).
/// Future returned by [`deadline`].
///
/// # Examples
///


@@ -1,5 +1,11 @@
# Changes
## [1.2.0] - 2024-03-xx
* Refactor server workers management
* Move ntex::server to separate crate
## [1.1.2] - 2024-03-12
* Update ntex-h2


@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "1.1.2"
version = "1.2.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@@ -56,6 +56,7 @@ ntex-service = "2.0.1"
ntex-macros = "0.1.3"
ntex-util = "1.0.1"
ntex-bytes = "0.1.24"
ntex-server = "0.1.0"
ntex-h2 = "0.5.1"
ntex-rt = "0.4.11"
ntex-io = "1.0.1"
@@ -64,7 +65,7 @@ ntex-tokio = { version = "0.4.0", optional = true }
ntex-glommio = { version = "0.4.0", optional = true }
ntex-async-std = { version = "0.4.0", optional = true }
async-channel = "2.1"
async-channel = "2.2"
base64 = "0.22"
bitflags = "2.4"
log = "0.4"


@@ -4,14 +4,14 @@ use futures_util::StreamExt;
use log::info;
use ntex::http::header::HeaderValue;
use ntex::http::{HttpService, Request, Response};
use ntex::{server::Server, time::Seconds, util::BytesMut};
use ntex::{time::Seconds, util::BytesMut};
#[ntex::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "echo=info");
env_logger::init();
Server::build()
ntex::server::build()
.bind("echo", "127.0.0.1:8080", |_| {
HttpService::build()
.headers_read_rate(Seconds(1), Seconds(5), 128)


@@ -3,7 +3,7 @@ use std::{env, io};
use futures_util::StreamExt;
use log::info;
use ntex::http::{header::HeaderValue, HttpService, Request, Response};
use ntex::{server::Server, util::BytesMut};
use ntex::util::BytesMut;
async fn handle_request(mut req: Request) -> Result<Response, io::Error> {
let mut body = BytesMut::new();
@@ -22,7 +22,7 @@ async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "trace");
env_logger::init();
Server::build()
ntex::server::build()
.bind("echo", "127.0.0.1:8080", |_| {
HttpService::build().h2(handle_request)
})?


@@ -3,14 +3,14 @@ use std::{env, io};
use log::info;
use ntex::http::header::HeaderValue;
use ntex::http::{HttpService, Response};
use ntex::{server::Server, time::Seconds, util::Ready};
use ntex::{time::Seconds, util::Ready};
#[ntex::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "ntex=trace,hello_world=info");
env_logger::init();
Server::build()
ntex::server::build()
.bind("hello-world", "127.0.0.1:8080", |_| {
HttpService::build()
.headers_read_rate(Seconds(1), Seconds(3), 128)


@@ -73,7 +73,7 @@ impl Connector {
let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
let _ = ssl
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| error!("Cannot set ALPN protocol: {:?}", e));
.map_err(|e| log::error!("Cannot set ALPN protocol: {:?}", e));
ssl.set_verify(tls_openssl::ssl::SslVerifyMode::NONE);


@@ -21,7 +21,7 @@ pub(super) async fn send_request<B>(
where
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.size());
log::trace!("Sending client request: {:?} {:?}", head, body.size());
let length = body.size();
let eof = if head.as_ref().method == Method::HEAD {
true


@@ -125,7 +125,7 @@ where
req: Connect,
_: ServiceCtx<'_, Self>,
) -> Result<Connection, ConnectError> {
trace!("Get connection for {:?}", req.uri);
log::trace!("Get connection for {:?}", req.uri);
let inner = self.inner.clone();
let waiters = self.waiters.clone();
@@ -140,7 +140,7 @@ where
match result {
// use existing connection
Acquire::Acquired(io, created) => {
trace!("Use existing {:?} connection for {:?}", io, req.uri);
log::trace!("Use existing {:?} connection for {:?}", io, req.uri);
Ok(Connection::new(
io,
created,
@@ -149,7 +149,7 @@
}
// open new tcp connection
Acquire::Available => {
trace!("Connecting to {:?}", req.uri);
log::trace!("Connecting to {:?}", req.uri);
let uri = req.uri.clone();
let (tx, rx) = waiters.borrow_mut().pool.channel();
OpenConnection::spawn(key, tx, uri, inner, &self.connector, req);
@@ -161,7 +161,7 @@
}
// pool is full, wait
Acquire::NotAvailable => {
trace!(
log::trace!(
"Pool is full, waiting for available connections for {:?}",
req.uri
);
@@ -217,7 +217,7 @@ impl Waiters {
let (req, tx) = waiters.front().unwrap();
// check if waiter is still alive
if tx.is_canceled() {
trace!("Waiter for {:?} is gone, remove waiter", req.uri);
log::trace!("Waiter for {:?} is gone, remove waiter", req.uri);
waiters.pop_front();
continue;
};
@@ -338,7 +338,7 @@ where
while let Some((req, tx)) = waiters.front() {
// is waiter still alive
if tx.is_canceled() {
trace!("Waiter for {:?} is gone, cleanup", req.uri);
log::trace!("Waiter for {:?} is gone, cleanup", req.uri);
cleanup = true;
waiters.pop_front();
continue;
@@ -348,7 +348,7 @@ where
match result {
Acquire::NotAvailable => break,
Acquire::Acquired(io, created) => {
trace!(
log::trace!(
"Use existing {:?} connection for {:?}, wake up waiter",
io,
req.uri
@ -362,7 +362,7 @@ where
)));
}
Acquire::Available => {
trace!("Connecting to {:?} and wake up waiter", req.uri);
log::trace!("Connecting to {:?} and wake up waiter", req.uri);
cleanup = true;
let (connect, tx) = waiters.pop_front().unwrap();
let uri = connect.uri.clone();
@@ -446,7 +446,7 @@ where
// open tcp connection
match ready!(this.fut.poll(cx)) {
Err(err) => {
trace!(
log::trace!(
"Failed to open client connection for {:?} with error {:?}",
&this.key.authority,
err


@@ -248,21 +248,21 @@ impl ContentEncoder {
ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(()),
Err(err) => {
trace!("Error decoding br encoding: {}", err);
log::trace!("Error decoding br encoding: {}", err);
Err(err)
}
},
ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(()),
Err(err) => {
trace!("Error decoding gzip encoding: {}", err);
log::trace!("Error decoding gzip encoding: {}", err);
Err(err)
}
},
ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(()),
Err(err) => {
trace!("Error decoding deflate encoding: {}", err);
log::trace!("Error decoding deflate encoding: {}", err);
Err(err)
}
},


@@ -39,9 +39,9 @@ impl<T: ResponseError> From<T> for Response {
fn from(err: T) -> Response {
let resp = err.error_response();
if resp.head().status == StatusCode::INTERNAL_SERVER_ERROR {
error!("Internal Server Error: {:?}", err);
log::error!("Internal Server Error: {:?}", err);
} else {
debug!("Error in response: {:?}", err);
log::debug!("Error in response: {:?}", err);
}
resp
}


@@ -258,7 +258,7 @@ impl MessageType for Request {
}
httparse::Status::Partial => {
if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
log::trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(DecodeError::TooLarge(src.len()));
}
return Ok(None);
@@ -274,7 +274,7 @@ impl MessageType for Request {
// disallow HTTP/1.0 POST requests that do not contain a Content-Length headers
// see https://datatracker.ietf.org/doc/html/rfc1945#section-7.2.2
if ver == Version::HTTP_10 && method == Method::POST && length.is_none() {
debug!("no Content-Length specified for HTTP/1.0 POST request");
log::debug!("no Content-Length specified for HTTP/1.0 POST request");
return Err(DecodeError::Header);
}


@@ -425,7 +425,7 @@ where
}
}
Some(Err(e)) => {
error!("Response payload stream error: {:?}", e);
log::error!("Response payload stream error: {:?}", e);
return Err(e.into());
}
}


@@ -734,7 +734,7 @@ impl fmt::Debug for ResponseBuilder {
fn log_error<T: Into<HttpError>>(err: T) -> HttpError {
let e = err.into();
error!("Error in ResponseBuilder {}", e);
log::error!("Error in ResponseBuilder {}", e);
e
}


@@ -6,7 +6,7 @@ use coo_kie::{Cookie, CookieJar};
use crate::io::{Filter, Io};
use crate::ws::{error::WsClientError, WsClient, WsConnection};
use crate::{rt::System, server::Server, service::ServiceFactory};
use crate::{rt::System, service::ServiceFactory};
use crate::{time::Millis, time::Seconds, util::Bytes};
use super::client::{Client, ClientRequest, ClientResponse, Connector};
@@ -222,7 +222,7 @@ fn parts(parts: &mut Option<Inner>) -> &mut Inner {
pub fn server<F, R>(factory: F) -> TestServer
where
F: Fn() -> R + Send + Clone + 'static,
R: ServiceFactory<Io>,
R: ServiceFactory<Io> + 'static,
{
let (tx, rx) = mpsc::channel();
@@ -232,14 +232,14 @@ where
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
tx.send((sys.system(), local_addr)).unwrap();
sys.run(|| {
Server::build()
let system = sys.system();
sys.run(move || {
crate::server::build()
.listen("test", tcp, move |_| factory())?
.workers(1)
.disable_signals()
.run();
tx.send((system, local_addr)).unwrap();
Ok(())
})
});


@@ -10,7 +10,7 @@
rust_2018_idioms,
unreachable_pub,
// missing_debug_implementations,
// missing_docs,
// missing_docs
)]
#![allow(
type_alias_bounds,
@@ -21,9 +21,6 @@
clippy::new_without_default
)]
#[macro_use]
extern crate log;
#[cfg(not(test))] // Work around for rust-lang/rust#62127
pub use ntex_macros::{rt_main as main, rt_test as test};


@@ -1,12 +1,12 @@
use std::time::{Duration, Instant};
use std::{cell::Cell, fmt, io, num::NonZeroUsize, sync::mpsc, sync::Arc, thread};
use std::{cell::Cell, fmt, io, sync::mpsc, sync::Arc, thread};
use std::{collections::VecDeque, num::NonZeroUsize};
use polling::{Event, Events, Poller};
use crate::{rt::System, time::sleep, time::Millis, util::Either};
use super::socket::{Listener, SocketAddr};
use super::worker::{Connection, WorkerClient};
use super::socket::{Connection, Listener, SocketAddr};
use super::{Server, ServerStatus, Token};
const EXIT_TIMEOUT: Duration = Duration::from_millis(100);
@@ -14,13 +14,12 @@ const ERR_TIMEOUT: Duration = Duration::from_millis(500);
const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
#[derive(Debug)]
pub(super) enum Command {
Stop(mpsc::Sender<()>),
pub enum AcceptorCommand {
Stop(oneshot::Sender<()>),
Terminate,
Pause,
Resume,
Worker(WorkerClient),
Timer,
WorkerAvailable,
}
#[derive(Debug)]
@@ -33,27 +32,29 @@
}
#[derive(Debug, Clone)]
pub(super) struct AcceptNotify(Arc<Poller>, mpsc::Sender<Command>);
pub struct AcceptNotify(Arc<Poller>, mpsc::Sender<AcceptorCommand>);
impl AcceptNotify {
pub(super) fn new(waker: Arc<Poller>, tx: mpsc::Sender<Command>) -> Self {
fn new(waker: Arc<Poller>, tx: mpsc::Sender<AcceptorCommand>) -> Self {
AcceptNotify(waker, tx)
}
pub(super) fn send(&self, cmd: Command) {
pub fn send(&self, cmd: AcceptorCommand) {
let _ = self.1.send(cmd);
let _ = self.0.notify();
}
}
pub(super) struct AcceptLoop {
/// Streaming io accept loop
pub struct AcceptLoop {
notify: AcceptNotify,
inner: Option<(mpsc::Receiver<Command>, Arc<Poller>, Server)>,
inner: Option<(mpsc::Receiver<AcceptorCommand>, Arc<Poller>)>,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}
impl AcceptLoop {
pub(super) fn new(srv: Server) -> AcceptLoop {
/// Create accept loop
pub fn new() -> AcceptLoop {
// Create a poller instance
let poll = Arc::new(
Poller::new()
@@ -66,45 +67,37 @@ impl AcceptLoop {
AcceptLoop {
notify,
inner: Some((rx, poll, srv)),
inner: Some((rx, poll)),
status_handler: None,
}
}
pub(super) fn send(&self, msg: Command) {
self.notify.send(msg)
}
pub(super) fn notify(&self) -> AcceptNotify {
/// Get the notification API for the loop
pub fn notify(&self) -> AcceptNotify {
self.notify.clone()
}
pub(super) fn set_status_handler<F>(&mut self, f: F)
pub fn set_status_handler<F>(&mut self, f: F)
where
F: FnMut(ServerStatus) + Send + 'static,
{
self.status_handler = Some(Box::new(f));
}
pub(super) fn start(
&mut self,
socks: Vec<(Token, Listener)>,
workers: Vec<WorkerClient>,
) {
let (rx, poll, srv) = self
/// Start accept loop
pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) {
let (rx, poll) = self
.inner
.take()
.expect("AcceptLoop cannot be used multiple times");
let status_handler = self.status_handler.take();
Accept::start(
rx,
poll,
socks,
srv,
workers,
self.notify.clone(),
status_handler,
self.status_handler.take(),
);
}
}
@@ -121,23 +114,21 @@ impl fmt::Debug for AcceptLoop {
struct Accept {
poller: Arc<Poller>,
rx: mpsc::Receiver<Command>,
rx: mpsc::Receiver<AcceptorCommand>,
sockets: Vec<ServerSocketInfo>,
workers: Vec<WorkerClient>,
srv: Server,
notify: AcceptNotify,
next: usize,
backpressure: bool,
backlog: VecDeque<Connection>,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}
impl Accept {
fn start(
rx: mpsc::Receiver<Command>,
rx: mpsc::Receiver<AcceptorCommand>,
poller: Arc<Poller>,
socks: Vec<(Token, Listener)>,
srv: Server,
workers: Vec<WorkerClient>,
notify: AcceptNotify,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
) {
@@ -148,15 +139,14 @@ impl Accept {
.name("ntex-server accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
Accept::new(rx, poller, socks, workers, srv, notify, status_handler).poll()
Accept::new(rx, poller, socks, srv, notify, status_handler).poll()
});
}
fn new(
rx: mpsc::Receiver<Command>,
rx: mpsc::Receiver<AcceptorCommand>,
poller: Arc<Poller>,
socks: Vec<(Token, Listener)>,
workers: Vec<WorkerClient>,
srv: Server,
notify: AcceptNotify,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
@@ -176,12 +166,11 @@ impl Accept {
poller,
rx,
sockets,
workers,
notify,
srv,
status_handler,
next: 0,
backpressure: false,
backpressure: true,
backlog: VecDeque::new(),
}
}
@@ -194,12 +183,6 @@ impl Accept {
fn poll(&mut self) {
log::trace!("Starting server accept loop");
// Add all sources
for (idx, info) in self.sockets.iter().enumerate() {
log::info!("Starting socket listener on {}", info.addr);
self.add_source(idx);
}
// Create storage for events
let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap());
@@ -261,7 +244,7 @@
let notify = self.notify.clone();
System::current().arbiter().spawn(Box::pin(async move {
sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(Command::Timer);
notify.send(AcceptorCommand::Timer);
}));
} else {
info.registered.set(true);
@@ -300,57 +283,43 @@
}
}
fn process_cmd(&mut self) -> Either<(), Option<mpsc::Sender<()>>> {
fn process_cmd(&mut self) -> Either<(), Option<oneshot::Sender<()>>> {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
Command::Stop(rx) => {
log::trace!("Stopping accept loop");
for (key, info) in self.sockets.iter().enumerate() {
log::info!("Stopping socket listener on {}", info.addr);
self.remove_source(key);
AcceptorCommand::Stop(rx) => {
if !self.backpressure {
log::trace!("Stopping accept loop");
self.backpressure(true);
}
self.update_status(ServerStatus::NotReady);
break Either::Right(Some(rx));
}
Command::Pause => {
log::trace!("Pausing accept loop");
for (key, info) in self.sockets.iter().enumerate() {
log::info!("Stopping socket listener on {}", info.addr);
self.remove_source(key);
AcceptorCommand::Terminate => {
log::trace!("Stopping accept loop");
self.backpressure(true);
break Either::Right(None);
}
AcceptorCommand::Pause => {
if !self.backpressure {
log::trace!("Pausing accept loop");
self.backpressure(true);
}
self.update_status(ServerStatus::NotReady);
}
Command::Resume => {
log::trace!("Resuming accept loop");
for (key, info) in self.sockets.iter().enumerate() {
log::info!("Resuming socket listener on {}", info.addr);
self.add_source(key);
AcceptorCommand::Resume => {
if self.backpressure {
log::trace!("Resuming accept loop");
self.backpressure(false);
}
self.update_status(ServerStatus::Ready);
}
Command::Worker(worker) => {
log::trace!("Adding new worker to accept loop");
self.backpressure(false);
self.workers.push(worker);
}
Command::Timer => {
AcceptorCommand::Timer => {
self.process_timer();
}
Command::WorkerAvailable => {
log::trace!("Worker is available");
self.backpressure(false);
}
},
Err(err) => {
break match err {
mpsc::TryRecvError::Empty => Either::Left(()),
mpsc::TryRecvError::Disconnected => {
for (key, info) in self.sockets.iter().enumerate() {
log::info!("Stopping socket listener on {}", info.addr);
self.remove_source(key);
}
self.backpressure(true);
Either::Right(None)
}
}
@@ -366,107 +335,57 @@
ServerStatus::Ready
});
if self.backpressure {
if !on {
self.backpressure = false;
for (key, info) in self.sockets.iter().enumerate() {
if info.timeout.get().is_none() {
// socket with timeout will re-register itself after timeout
log::info!(
"Resuming socket listener on {} after back-pressure",
info.addr
);
self.add_source(key);
}
if self.backpressure && !on {
// handle backlog
while let Some(msg) = self.backlog.pop_front() {
if let Err(msg) = self.srv.process(msg) {
log::trace!("Server is unavailable");
self.backlog.push_front(msg);
return;
}
}
} else if on {
// re-enable acceptors
self.backpressure = false;
for (key, info) in self.sockets.iter().enumerate() {
if info.timeout.get().is_none() {
// socket with timeout will re-register itself after timeout
log::info!(
"Resuming socket listener on {} after back-pressure",
info.addr
);
self.add_source(key);
}
}
} else if !self.backpressure && on {
self.backpressure = true;
for key in 0..self.sockets.len() {
// disable err timeout
let info = &mut self.sockets[key];
if info.timeout.take().is_none() {
log::trace!("Enabling back-pressure for {}", info.addr);
log::info!("Stopping socket listener on {}", info.addr);
self.remove_source(key);
}
}
}
}
fn accept_one(&mut self, mut msg: Connection) {
log::trace!(
"Accepting connection: {:?} bp: {}",
msg.io,
self.backpressure
);
if self.backpressure {
while !self.workers.is_empty() {
match self.workers[self.next].send(msg) {
Ok(_) => (),
Err(tmp) => {
log::trace!("Worker failed while processing connection");
self.update_status(ServerStatus::WorkerFailed);
self.srv.worker_faulted(self.workers[self.next].idx);
msg = tmp;
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
log::error!("No workers");
return;
} else if self.workers.len() <= self.next {
self.next = 0;
}
continue;
}
}
self.next = (self.next + 1) % self.workers.len();
break;
}
} else {
let mut idx = 0;
while idx < self.workers.len() {
idx += 1;
if self.workers[self.next].available() {
match self.workers[self.next].send(msg) {
Ok(_) => {
log::trace!("Sent to worker {:?}", self.next);
self.next = (self.next + 1) % self.workers.len();
return;
}
Err(tmp) => {
log::trace!("Worker failed while processing connection");
self.update_status(ServerStatus::WorkerFailed);
self.srv.worker_faulted(self.workers[self.next].idx);
msg = tmp;
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
log::error!("No workers");
self.backpressure(true);
return;
} else if self.workers.len() <= self.next {
self.next = 0;
}
continue;
}
}
}
self.next = (self.next + 1) % self.workers.len();
}
// enable backpressure
log::trace!("No available workers, enable back-pressure");
self.backpressure(true);
self.accept_one(msg);
}
}
fn accept(&mut self, token: usize) -> bool {
loop {
let msg = if let Some(info) = self.sockets.get_mut(token) {
if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept() {
Ok(Some(io)) => Connection {
io,
token: info.token,
},
Ok(Some(io)) => {
let msg = Connection {
io,
token: info.token,
};
if let Err(msg) = self.srv.process(msg) {
log::trace!("Server is unavailable");
self.backlog.push_back(msg);
self.backpressure(true);
return false;
}
}
Ok(None) => return true,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
Err(ref e) if connection_error(e) => continue,
@@ -479,16 +398,12 @@
let notify = self.notify.clone();
System::current().arbiter().spawn(Box::pin(async move {
sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(Command::Timer);
notify.send(AcceptorCommand::Timer);
}));
return false;
}
}
} else {
return false;
};
self.accept_one(msg);
}
}
}
}


@@ -1,40 +1,25 @@
use std::{fmt, future::Future, io, marker, mem, net, pin::Pin, task::Context, task::Poll};
use std::{fmt, future::Future, io, net};
use async_channel::unbounded;
use log::{error, info};
use ntex_server::{Server, WorkerPool};
use socket2::{Domain, SockAddr, Socket, Type};
use crate::rt::{spawn, Signal, System};
use crate::time::{sleep, Millis};
use crate::{io::Io, service::ServiceFactory, util::join_all, util::Stream};
use crate::service::ServiceFactory;
use crate::{io::Io, time::Millis};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{
Config, ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime,
};
use super::service::{Factory, InternalServiceFactory};
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::{socket::Listener, Server, ServerCommand, ServerStatus, Token};
const STOP_DELAY: Millis = Millis(300);
type ServerStream = Pin<Box<dyn Stream<Item = ServerCommand>>>;
use super::accept::AcceptLoop;
use super::config::{Config, ServiceConfig};
use super::factory::{self, FactoryServiceType, OnWorkerStart, OnWorkerStartWrapper};
use super::{socket::Listener, Connection, ServerStatus, StreamServer, Token};
/// Server builder
pub struct ServerBuilder {
threads: usize,
token: Token,
backlog: i32,
workers: Vec<(usize, WorkerClient)>,
services: Vec<Box<dyn InternalServiceFactory>>,
services: Vec<FactoryServiceType>,
sockets: Vec<(Token, String, Listener)>,
on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: Millis,
no_signals: bool,
cmd: ServerStream,
server: Server,
notify: Vec<oneshot::Sender<()>>,
pool: WorkerPool,
}
impl Default for ServerBuilder {
@@ -46,24 +31,14 @@ impl Default for ServerBuilder {
impl ServerBuilder {
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded();
let server = Server::new(tx);
ServerBuilder {
threads: std::thread::available_parallelism()
.map_or(2, std::num::NonZeroUsize::get),
token: Token(0),
workers: Vec::new(),
services: Vec::new(),
sockets: Vec::new(),
accept: AcceptLoop::new(server.clone()),
on_worker_start: Vec::new(),
accept: AcceptLoop::new(),
backlog: 2048,
exit: false,
shutdown_timeout: Millis::from_secs(30),
no_signals: false,
cmd: Box::pin(rx),
notify: Vec::new(),
server,
pool: WorkerPool::new(),
}
}
@@ -72,7 +47,7 @@ impl ServerBuilder {
/// By default the server uses the number of available logical CPUs as the
/// worker count.
pub fn workers(mut self, num: usize) -> Self {
self.threads = num;
self.pool = self.pool.workers(num);
self
}
@@ -98,7 +73,7 @@
///
/// By default max connections is set to 25k per worker.
pub fn maxconn(self, num: usize) -> Self {
worker::max_concurrent_connections(num);
super::max_concurrent_connections(num);
self
}
@@ -106,7 +81,7 @@
///
/// By default "stop runtime" is disabled.
pub fn stop_runtime(mut self) -> Self {
self.exit = true;
self.pool = self.pool.stop_runtime();
self
}
@@ -114,7 +89,7 @@
///
/// By default signal handling is enabled.
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self.pool = self.pool.disable_signals();
self
}
@@ -126,7 +101,7 @@
///
/// By default the shutdown timeout is set to 30 seconds.
pub fn shutdown_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.shutdown_timeout = timeout.into();
self.pool = self.pool.shutdown_timeout(timeout);
self
}
@@ -150,19 +125,14 @@
where
F: Fn(&mut ServiceConfig) -> io::Result<()>,
{
let mut cfg = ServiceConfig::new(self.threads, self.backlog);
let mut cfg = ServiceConfig::new(self.token, self.backlog);
f(&mut cfg)?;
let mut cfg = cfg.0.borrow_mut();
let mut srv = ConfiguredService::new(cfg.apply.take().unwrap());
for (name, lst, tag) in mem::take(&mut cfg.services) {
let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?, tag);
self.sockets.push((token, name, Listener::from_tcp(lst)));
}
self.services.push(Box::new(srv));
self.threads = cfg.threads;
let (token, sockets, factory) = cfg.into_factory();
self.token = token;
self.sockets.extend(sockets);
self.services.push(factory);
Ok(self)
}
@@ -177,20 +147,14 @@
F: Fn(ServiceConfig) -> R,
R: Future<Output = io::Result<()>>,
{
let cfg = ServiceConfig::new(self.threads, self.backlog);
let inner = cfg.0.clone();
let cfg = ServiceConfig::new(self.token, self.backlog);
f(cfg).await?;
f(cfg.clone()).await?;
let mut cfg = inner.borrow_mut();
let mut srv = ConfiguredService::new(cfg.apply.take().unwrap());
for (name, lst, tag) in mem::take(&mut cfg.services) {
let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?, tag);
self.sockets.push((token, name, Listener::from_tcp(lst)));
}
self.services.push(Box::new(srv));
self.threads = cfg.threads;
let (token, sockets, factory) = cfg.into_factory();
self.token = token;
self.sockets.extend(sockets);
self.services.push(factory);
Ok(self)
}
@@ -201,44 +165,38 @@
/// It gets executed in the worker thread.
pub fn on_worker_start<F, R, E>(mut self, f: F) -> Self
where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
F: Fn() -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
self.services
.push(Box::new(ConfiguredService::new(Box::new(ConfigWrapper {
f,
_t: marker::PhantomData,
}))));
self.on_worker_start.push(OnWorkerStartWrapper::create(f));
self
}
/// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>, R>(
mut self,
name: N,
addr: U,
factory: F,
) -> io::Result<Self>
pub fn bind<F, U, N, R>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where
U: net::ToSocketAddrs,
N: AsRef<str>,
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Io>,
R: ServiceFactory<Io> + 'static,
{
let sockets = bind_addr(addr, self.backlog)?;
let mut tokens = Vec::new();
for lst in sockets {
let token = self.token.next();
self.services.push(Factory::create(
name.as_ref().to_string(),
token,
factory.clone(),
lst.local_addr()?,
"",
));
self.sockets
.push((token, name.as_ref().to_string(), Listener::from_tcp(lst)));
tokens.push((token, ""));
}
self.services.push(factory::create_factory_service(
name.as_ref().to_string(),
tokens,
factory,
));
Ok(self)
}
@@ -249,7 +207,7 @@
N: AsRef<str>,
U: AsRef<std::path::Path>,
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Io>,
R: ServiceFactory<Io> + 'static,
{
use std::os::unix::net::UnixListener;
@@ -278,17 +236,13 @@
) -> io::Result<Self>
where
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Io>,
R: ServiceFactory<Io> + 'static,
{
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let token = self.token.next();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(Factory::create(
self.services.push(factory::create_factory_service(
name.as_ref().to_string(),
token,
vec![(token, "")],
factory,
addr,
"",
));
self.sockets
.push((token, name.as_ref().to_string(), Listener::from_uds(lst)));
@@ -304,15 +258,13 @@
) -> io::Result<Self>
where
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Io>,
R: ServiceFactory<Io> + 'static,
{
let token = self.token.next();
self.services.push(Factory::create(
self.services.push(factory::create_factory_service(
name.as_ref().to_string(),
token,
vec![(token, "")],
factory,
lst.local_addr()?,
"",
));
self.sockets
.push((token, name.as_ref().to_string(), Listener::from_tcp(lst)));
@@ -343,208 +295,28 @@
}
/// Starts processing incoming connections and returns the server controller.
pub fn run(mut self) -> Server {
pub fn run(self) -> Server<Connection> {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("Starting {} workers", self.threads);
// start workers
let mut workers = Vec::new();
for idx in 0..self.threads {
let worker = self.start_worker(idx, self.accept.notify());
workers.push(worker.clone());
self.workers.push((idx, worker));
}
// start accept thread
for sock in &self.sockets {
info!("Starting \"{}\" service on {}", sock.1, sock.2);
}
self.accept.start(
mem::take(&mut self.sockets)
.into_iter()
.map(|t| (t.0, t.2))
.collect(),
workers,
let srv = StreamServer::new(
self.accept.notify(),
self.services,
self.on_worker_start,
);
let svc = self.pool.run(srv);
// handle signals
if !self.no_signals {
spawn(signals(self.server.clone()));
}
let sockets = self
.sockets
.into_iter()
.map(|sock| {
log::info!("Starting \"{}\" service on {}", sock.1, sock.2);
(sock.0, sock.2)
})
.collect();
self.accept.start(sockets, svc.clone());
// start http server actor
let server = self.server.clone();
spawn(self);
server
}
}
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient {
let avail = WorkerAvailability::new(notify);
let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect();
Worker::start(idx, services, avail, self.shutdown_timeout)
}
fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.accept.send(Command::Pause);
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
self.accept.send(Command::Resume);
let _ = tx.send(());
}
ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop ntex system
match sig {
Signal::Int => {
info!("SIGINT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
_ => (),
}
}
ServerCommand::Notify(tx) => {
self.notify.push(tx);
}
ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
// stop accept thread
let (tx, rx) = std::sync::mpsc::channel();
self.accept.send(Command::Stop(tx));
let _ = rx.recv();
let notify = std::mem::take(&mut self.notify);
// stop workers
if !self.workers.is_empty() && graceful {
let futs: Vec<_> = self
.workers
.iter()
.map(move |worker| worker.1.stop(graceful))
.collect();
spawn(async move {
let _ = join_all(futs).await;
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
if exit {
sleep(STOP_DELAY).await;
System::current().stop();
}
});
} else {
self.workers.iter().for_each(move |worker| {
worker.1.stop(false);
});
// we need to stop system if server was spawned
if self.exit {
spawn(async {
sleep(STOP_DELAY).await;
System::current().stop();
});
}
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
}
}
ServerCommand::WorkerFaulted(idx) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let worker = self.start_worker(new_idx, self.accept.notify());
self.workers.push((new_idx, worker.clone()));
self.accept.send(Command::Worker(worker));
}
}
}
}
}
impl Future for ServerBuilder {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.cmd).poll_next(cx) {
Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it),
Poll::Ready(None) => return Poll::Pending,
Poll::Pending => return Poll::Pending,
}
}
}
}
async fn signals(srv: Server) {
loop {
if let Some(rx) = crate::rt::signal() {
if let Ok(sig) = rx.await {
srv.signal(sig);
} else {
return;
}
} else {
log::info!("Signals are not supported by current runtime");
return;
svc
}
}
}


@ -1,15 +1,13 @@
use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc};
use log::error;
use crate::io::Io;
use crate::service::{self, boxed, ServiceFactory as NServiceFactory};
use crate::service::{IntoServiceFactory, ServiceFactory};
use crate::util::{BoxFuture, HashMap, PoolId, Ready};
use super::service::{
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
use super::factory::{
self, BoxServerService, FactoryService, FactoryServiceType, NetService,
};
use super::{builder::bind_addr, counter::CounterGuard, Token};
use super::{builder::bind_addr, socket::Listener, Token};
#[derive(Clone, Debug)]
pub struct Config(Rc<InnerServiceConfig>);
@ -41,31 +39,30 @@ impl Config {
}
}
#[derive(Debug)]
#[derive(Clone)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
#[derive(Debug)]
struct Socket {
name: String,
sockets: Vec<(Token, Listener, &'static str)>,
}
pub(super) struct ServiceConfigInner {
pub(super) services: Vec<(String, net::TcpListener, &'static str)>,
pub(super) apply: Option<Box<dyn ServiceRuntimeConfiguration + Send>>,
pub(super) threads: usize,
pub(super) backlog: i32,
applied: bool,
token: Token,
apply: Option<Box<dyn OnWorkerStart>>,
sockets: Vec<Socket>,
backlog: i32,
}
impl ServiceConfig {
pub(super) fn new(threads: usize, backlog: i32) -> Self {
pub(super) fn new(token: Token, backlog: i32) -> Self {
ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
threads,
token,
backlog,
services: Vec::new(),
applied: false,
apply: Some(Box::new(ConfigWrapper {
f: |_| {
not_configured();
Ready::Ok::<_, &'static str>(())
},
_t: marker::PhantomData,
sockets: Vec::new(),
apply: Some(OnWorkerStartWrapper::create(|_| {
not_configured();
Ready::Ok::<_, &str>(())
})),
})))
}
@ -75,39 +72,41 @@ impl ServiceConfig {
where
U: net::ToSocketAddrs,
{
let sockets = bind_addr(addr, self.0.borrow().backlog)?;
let mut inner = self.0.borrow_mut();
for lst in sockets {
self.listen(name.as_ref(), lst);
}
let sockets = bind_addr(addr, inner.backlog)?;
let socket = Socket {
name: name.as_ref().to_string(),
sockets: sockets
.into_iter()
.map(|lst| (inner.token.next(), Listener::from_tcp(lst), ""))
.collect(),
};
inner.sockets.push(socket);
Ok(self)
}
/// Add new service to the server.
pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
{
let mut inner = self.0.borrow_mut();
if !inner.applied {
inner.apply = Some(Box::new(ConfigWrapper {
f: |_| {
not_configured();
Ready::Ok::<_, &'static str>(())
},
_t: marker::PhantomData,
}));
}
inner.services.push((name.as_ref().to_string(), lst, ""));
}
let mut inner = self.0.borrow_mut();
let socket = Socket {
name: name.as_ref().to_string(),
sockets: vec![(inner.token.next(), Listener::from_tcp(lst), "")],
};
inner.sockets.push(socket);
self
}
/// Set io tag for configured service.
pub fn set_tag<N: AsRef<str>>(&self, name: N, tag: &'static str) -> &Self {
let mut inner = self.0.borrow_mut();
for svc in &mut inner.services {
if svc.0 == name.as_ref() {
svc.2 = tag;
for sock in &mut inner.sockets {
if sock.name == name.as_ref() {
for item in &mut sock.sockets {
item.2 = tag;
}
}
}
self
@ -117,179 +116,115 @@ impl ServiceConfig {
///
/// This function gets called during the worker runtime configuration stage.
/// It is executed in the worker thread.
pub fn on_worker_start<F, R, E>(&self, f: F) -> io::Result<()>
pub fn on_worker_start<F, R, E>(&self, f: F) -> &Self
where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
self.0.borrow_mut().applied = true;
self.0.borrow_mut().apply = Some(Box::new(ConfigWrapper {
f,
_t: marker::PhantomData,
}));
Ok(())
self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
self
}
}
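Because `on_worker_start` now returns `&Self`, the whole configuration chains fluently. A sketch modeled on the updated tests at the end of this commit (the address, names and handler are placeholders):

```rust
use ntex::{io::Io, service::fn_service, util::Ready};

fn start() -> std::io::Result<ntex::server::Server> {
    Ok(ntex::server::build()
        .configure(|cfg| {
            cfg.bind("web", "127.0.0.1:8080")?
                .set_tag("web", "WEB-SRV")
                .on_worker_start(|rt| async move {
                    // register the real handler inside the worker thread
                    rt.service("web", fn_service(|_io: Io| Ready::Ok::<_, ()>(())));
                    Ok::<_, std::io::Error>(())
                });
            Ok(())
        })?
        .run())
}
```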
pub(super) struct ConfiguredService {
rt: Box<dyn ServiceRuntimeConfiguration + Send>,
names: HashMap<Token, (String, net::SocketAddr)>,
topics: HashMap<String, (Token, &'static str)>,
services: Vec<Token>,
}
pub(super) fn into_factory(
self,
) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
let mut inner = self.0.borrow_mut();
impl ConfiguredService {
pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration + Send>) -> Self {
ConfiguredService {
rt,
names: HashMap::default(),
topics: HashMap::default(),
services: Vec::new(),
let mut sockets = Vec::new();
let mut names = HashMap::default();
for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
names.insert(
s.name.clone(),
Entry {
idx,
pool: PoolId::DEFAULT,
tokens: s
.sockets
.iter()
.map(|(token, _, tag)| (*token, *tag))
.collect(),
},
);
sockets.extend(
s.sockets
.into_iter()
.map(|(token, lst, _)| (token, s.name.clone(), lst)),
);
}
}
pub(super) fn stream(
&mut self,
token: Token,
name: String,
addr: net::SocketAddr,
tag: &'static str,
) {
self.names.insert(token, (name.clone(), addr));
self.topics.insert(name, (token, tag));
self.services.push(token);
(
inner.token,
sockets,
Box::new(ConfiguredService {
rt: inner.apply.take().unwrap(),
names,
}),
)
}
}
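Presumably the builder calls this when a `ServiceConfig` is applied, folding the token counter, the flattened listener list and the boxed `ConfiguredService` into its own state; the call site is outside the hunks shown here:

```rust
// Assumed call site in the builder (sketch):
let (token, sockets, factory) = service_cfg.into_factory();
```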
impl InternalServiceFactory for ConfiguredService {
fn name(&self, token: Token) -> &str {
&self.names[&token].0
}
struct ConfiguredService {
rt: Box<dyn OnWorkerStart>,
names: HashMap<String, Entry>,
}
fn set_tag(&mut self, token: Token, tag: &'static str) {
for item in self.topics.values_mut() {
if item.0 == token {
item.1 = tag;
}
}
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
impl FactoryService for ConfiguredService {
fn clone_factory(&self) -> FactoryServiceType {
Box::new(Self {
rt: self.rt.clone(),
names: self.names.clone(),
topics: self.topics.clone(),
services: self.services.clone(),
})
}
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
// configure services
let rt = ServiceRuntime::new(self.topics.clone());
let cfg_fut = self.rt.configure(ServiceRuntime(rt.0.clone()));
let mut names = self.names.clone();
let tokens = self.services.clone();
let rt = ServiceRuntime::new(self.names.clone());
let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
// construct services
Box::pin(async move {
cfg_fut.await?;
rt.validate();
let names = mem::take(&mut rt.0.borrow_mut().names);
let mut services = mem::take(&mut rt.0.borrow_mut().services);
// TODO: Proper error handling here
let onstart = mem::take(&mut rt.0.borrow_mut().onstart);
for f in onstart.into_iter() {
f.await;
}
let mut res = vec![];
for token in tokens {
if let Some(srv) = services.remove(&token) {
let newserv = srv.create(());
match newserv.await {
Ok(serv) => {
res.push((token, serv));
}
Err(_) => {
error!("Cannot construct service");
return Err(());
}
let mut res = Vec::new();
while let Some(Some(svc)) = services.pop() {
for entry in names.values() {
if entry.idx == services.len() {
res.push(NetService {
pool: entry.pool,
tokens: entry.tokens.clone(),
factory: svc,
});
break;
}
} else {
let name = names.remove(&token).unwrap().0;
res.push((
token,
boxed::service(StreamService::new(
service::fn_service(move |_: Io| {
error!("Service {:?} is not configured", name);
Ready::<_, ()>::Ok(())
}),
"UNKNOWN",
PoolId::P0,
)),
));
};
}
}
Ok(res)
})
}
}
pub(super) trait ServiceRuntimeConfiguration: fmt::Debug {
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration + Send>;
fn configure(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
}
pub(super) struct ConfigWrapper<F, R, E> {
pub(super) f: F,
pub(super) _t: marker::PhantomData<(R, E)>,
}
// SAFETY: we don't store R or E in ConfigWrapper
unsafe impl<F: Send, R, E> Send for ConfigWrapper<F, R, E> {}
impl<F, R, E> fmt::Debug for ConfigWrapper<F, R, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConfigWrapper")
.field("f", &std::any::type_name::<F>())
.finish()
}
}
impl<F, R, E> ServiceRuntimeConfiguration for ConfigWrapper<F, R, E>
where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration + Send> {
Box::new(ConfigWrapper {
f: self.f.clone(),
_t: marker::PhantomData,
})
}
fn configure(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
let f = self.f.clone();
Box::pin(async move {
(f)(rt).await.map_err(|e| {
error!("Cannot configure service: {}", e);
})
})
}
}
fn not_configured() {
error!("Service is not configured");
log::error!("Service is not configured");
}
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
#[derive(Debug, Clone)]
struct Entry {
idx: usize,
pool: PoolId,
tokens: Vec<(Token, &'static str)>,
}
struct ServiceRuntimeInner {
names: HashMap<String, (Token, &'static str)>,
services: HashMap<Token, BoxServiceFactory>,
onstart: Vec<BoxFuture<'static, ()>>,
names: HashMap<String, Entry>,
services: Vec<Option<BoxServerService>>,
}
impl fmt::Debug for ServiceRuntime {
@ -298,25 +233,23 @@ impl fmt::Debug for ServiceRuntime {
f.debug_struct("ServiceRuntimer")
.field("names", &inner.names)
.field("services", &inner.services)
.field("onstart", &inner.onstart.len())
.finish()
}
}
impl ServiceRuntime {
fn new(names: HashMap<String, (Token, &'static str)>) -> Self {
fn new(names: HashMap<String, Entry>) -> Self {
ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
services: (0..names.len()).map(|_| None).collect(),
names,
services: HashMap::default(),
onstart: Vec::new(),
})))
}
fn validate(&self) {
let inner = self.0.as_ref().borrow();
for (name, item) in &inner.names {
if !inner.services.contains_key(&item.0) {
error!("Service {:?} is not configured", name);
if inner.services[item.idx].is_none() {
log::error!("Service {:?} is not configured", name);
}
}
}
@ -327,8 +260,8 @@ impl ServiceRuntime {
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
pub fn service<T, F>(&self, name: &str, service: F)
where
F: service::IntoServiceFactory<T, Io>,
T: service::ServiceFactory<Io> + 'static,
F: IntoServiceFactory<T, Io>,
T: ServiceFactory<Io> + 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
{
@ -341,73 +274,73 @@ impl ServiceRuntime {
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
pub fn service_in<T, F>(&self, name: &str, pool: PoolId, service: F)
where
F: service::IntoServiceFactory<T, Io>,
T: service::ServiceFactory<Io> + 'static,
F: IntoServiceFactory<T, Io>,
T: ServiceFactory<Io> + 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
{
let mut inner = self.0.borrow_mut();
if let Some((token, tag)) = inner.names.get(name) {
let token = *token;
let tag = *tag;
inner.services.insert(
token,
boxed::factory(ServiceFactory {
tag,
pool,
inner: service.into_factory(),
}),
);
if let Some(entry) = inner.names.get_mut(name) {
let idx = entry.idx;
entry.pool = pool;
inner.services[idx] = Some(factory::create_boxed_factory(
name.to_string(),
service.into_factory(),
));
} else {
panic!("Unknown service: {:?}", name);
}
}
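In contrast to `service`, `service_in` also pins the listener's connections to a specific memory pool. A one-line sketch with hypothetical names, used inside an `on_worker_start` callback:

```rust
// Bind the "db" listener's handler to a dedicated memory pool (illustrative).
rt.service_in("db", PoolId::P1, fn_service(|_io: Io| Ready::Ok::<_, ()>(())));
```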
/// Execute future before services initialization.
pub fn on_start<F>(&self, fut: F)
where
F: Future<Output = ()> + 'static,
{
self.0.borrow_mut().onstart.push(Box::pin(fut))
}
}
type BoxServiceFactory = service::boxed::BoxServiceFactory<
(),
(Option<CounterGuard>, ServerMessage),
(),
(),
(),
>;
trait OnWorkerStart: Send {
fn clone(&self) -> Box<dyn OnWorkerStart>;
struct ServiceFactory<T> {
inner: T,
tag: &'static str,
pool: PoolId,
fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
}
impl<T> service::ServiceFactory<(Option<CounterGuard>, ServerMessage)> for ServiceFactory<T>
struct OnWorkerStartWrapper<F, R, E> {
pub(super) f: F,
pub(super) _t: marker::PhantomData<(R, E)>,
}
impl<F, R, E> OnWorkerStartWrapper<F, R, E>
where
T: service::ServiceFactory<Io>,
T::Service: 'static,
T::Error: 'static,
T::InitError: fmt::Debug + 'static,
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
type Response = ();
type Error = ();
type InitError = ();
type Service = BoxedServerService;
async fn create(&self, _: ()) -> Result<BoxedServerService, ()> {
let tag = self.tag;
let pool = self.pool;
match self.inner.create(()).await {
Ok(s) => Ok(boxed::service(StreamService::new(s, tag, pool))),
Err(e) => {
error!("Cannot construct service: {:?}", e);
Err(())
}
}
pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
Box::new(Self {
f,
_t: marker::PhantomData,
})
}
}
// SAFETY: Send cannot be provided automatically because of the R param,
// but R is always executed in one thread and never leaves it
unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
fn clone(&self) -> Box<dyn OnWorkerStart> {
Box::new(Self {
f: self.f.clone(),
_t: marker::PhantomData,
})
}
fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
let f = self.f.clone();
Box::pin(async move {
(f)(rt).await.map_err(|e| {
log::error!("On worker start callback failed: {}", e);
})
})
}
}

ntex/src/server/factory.rs (new file, 206 lines)

@ -0,0 +1,206 @@
use std::task::{Context, Poll};
use std::{fmt, future::Future, marker::PhantomData};
use crate::io::Io;
use crate::service::{boxed, Service, ServiceCtx, ServiceFactory};
use crate::util::{BoxFuture, PoolId, Ready};
use super::{Config, Token};
pub(super) type BoxServerService = boxed::BoxServiceFactory<(), Io, (), (), ()>;
pub(crate) type FactoryServiceType = Box<dyn FactoryService>;
pub(crate) struct NetService {
pub(crate) tokens: Vec<(Token, &'static str)>,
pub(crate) factory: BoxServerService,
pub(crate) pool: PoolId,
}
pub(crate) trait FactoryService: Send {
fn name(&self, _: Token) -> &str {
""
}
fn set_tag(&mut self, _: Token, _: &'static str) {}
fn clone_factory(&self) -> Box<dyn FactoryService>;
fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>>;
}
pub(crate) fn create_boxed_factory<S>(name: String, factory: S) -> BoxServerService
where
S: ServiceFactory<Io> + 'static,
{
boxed::factory(ServerServiceFactory { name, factory })
}
pub(crate) fn create_factory_service<F, R>(
name: String,
tokens: Vec<(Token, &'static str)>,
factory: F,
) -> Box<dyn FactoryService>
where
F: Fn(Config) -> R + Send + Clone + 'static,
R: ServiceFactory<Io> + 'static,
{
Box::new(Factory {
tokens,
name: name.clone(),
factory: move |cfg| {
Ready::Ok::<_, &'static str>(create_boxed_factory(name.clone(), (factory)(cfg)))
},
_t: PhantomData,
})
}
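This is the internal glue behind `bind()`/`listen()`: a user closure `Fn(Config) -> impl ServiceFactory<Io>` is wrapped so it can be cloned into every worker. A sketch of its use, with an illustrative token and tag:

```rust
// Inside the builder (sketch): wrap a user closure into a FactoryService.
let factory: FactoryServiceType = create_factory_service(
    "example".to_string(),
    vec![(token, "EXAMPLE-TAG")],
    move |_cfg: Config| fn_service(|_io: Io| Ready::Ok::<_, ()>(())),
);
```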
struct Factory<F, R, E> {
name: String,
tokens: Vec<(Token, &'static str)>,
factory: F,
_t: PhantomData<(R, E)>,
}
impl<F, R, E> FactoryService for Factory<F, R, E>
where
F: Fn(Config) -> R + Send + Clone + 'static,
R: Future<Output = Result<BoxServerService, E>> + 'static,
E: fmt::Display + 'static,
{
fn name(&self, _: Token) -> &str {
&self.name
}
fn clone_factory(&self) -> Box<dyn FactoryService> {
Box::new(Self {
name: self.name.clone(),
tokens: self.tokens.clone(),
factory: self.factory.clone(),
_t: PhantomData,
})
}
fn set_tag(&mut self, token: Token, tag: &'static str) {
for item in &mut self.tokens {
if item.0 == token {
item.1 = tag;
}
}
}
fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
let cfg = Config::default();
let pool = cfg.get_pool_id();
let name = self.name.clone();
let tokens = self.tokens.clone();
let factory_fut = (self.factory)(cfg);
Box::pin(async move {
let factory = factory_fut.await.map_err(|_| {
log::error!("Cannot create {:?} service", name);
})?;
Ok(vec![NetService {
tokens,
factory,
pool,
}])
})
}
}
struct ServerServiceFactory<S> {
name: String,
factory: S,
}
impl<S> ServiceFactory<Io> for ServerServiceFactory<S>
where
S: ServiceFactory<Io>,
{
type Response = ();
type Error = ();
type Service = ServerService<S::Service>;
type InitError = ();
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
self.factory
.create(())
.await
.map(|inner| ServerService { inner })
.map_err(|_| log::error!("Cannot construct {:?} service", self.name))
}
}
struct ServerService<S> {
inner: S,
}
impl<S> Service<Io> for ServerService<S>
where
S: Service<Io>,
{
type Response = ();
type Error = ();
crate::forward_poll_shutdown!(inner);
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.inner.poll_ready(cx).map_err(|_| ())
}
async fn call(&self, req: Io, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
ctx.call(&self.inner, req).await.map(|_| ()).map_err(|_| ())
}
}
// SAFETY: Send cannot be provided automatically because of the E and R params,
// but R is always executed in one thread and never leaves it
unsafe impl<F, R, E> Send for Factory<F, R, E> where F: Send {}
pub(crate) trait OnWorkerStart {
fn clone_fn(&self) -> Box<dyn OnWorkerStart + Send>;
fn run(&self) -> BoxFuture<'static, Result<(), ()>>;
}
pub(super) struct OnWorkerStartWrapper<F, R, E> {
pub(super) f: F,
pub(super) _t: PhantomData<(R, E)>,
}
unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
impl<F, R, E> OnWorkerStartWrapper<F, R, E>
where
F: Fn() -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
Box::new(Self { f, _t: PhantomData })
}
}
impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
where
F: Fn() -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
fn clone_fn(&self) -> Box<dyn OnWorkerStart + Send> {
Box::new(Self {
f: self.f.clone(),
_t: PhantomData,
})
}
fn run(&self) -> BoxFuture<'static, Result<(), ()>> {
let f = self.f.clone();
Box::pin(async move {
(f)().await.map_err(|e| {
log::error!("On worker start callback failed: {}", e);
})
})
}
}
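Unlike `ServiceConfig::on_worker_start(|rt| ...)`, this builder-level hook takes no arguments; the reworked tests below switch to exactly this shape. A minimal sketch:

```rust
// Builder-level worker-start hook: a zero-argument closure returning a future.
let srv = ntex::server::build()
    .on_worker_start(|| async {
        log::info!("worker thread initialized");
        Ok::<_, std::io::Error>(())
    })
    .bind("example", "127.0.0.1:8080", |_| {
        fn_service(|_io: Io| Ready::Ok::<_, ()>(()))
    })?
    .run();
```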


@ -1,16 +1,14 @@
//! General purpose tcp server
use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
use async_channel::Sender;
use std::sync::atomic::{AtomicUsize, Ordering};
mod accept;
mod builder;
mod config;
mod counter;
mod factory;
mod service;
mod socket;
mod test;
mod worker;
#[cfg(feature = "openssl")]
pub use ntex_tls::openssl;
@ -21,10 +19,16 @@ pub use ntex_tls::rustls;
pub use ntex_tls::max_concurrent_ssl_accept;
pub(crate) use self::builder::create_tcp_listener;
pub use self::accept::{AcceptLoop, AcceptNotify, AcceptorCommand};
pub use self::builder::ServerBuilder;
pub use self::config::{Config, ServiceConfig, ServiceRuntime};
pub use self::service::{ServerMessage, StreamServer};
pub use self::socket::{Connection, Stream};
pub use self::test::{build_test_server, test_server, TestServer};
pub type Server = ntex_server::Server<Connection>;
#[non_exhaustive]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Server readiness status
@ -36,10 +40,11 @@ pub enum ServerStatus {
/// Socket id token
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Token(usize);
pub struct Token(usize);
impl Token {
pub(self) fn next(&mut self) -> Token {
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Token {
let token = Token(self.0);
self.0 += 1;
token
@ -58,103 +63,23 @@ pub enum SslError<E> {
Service(E),
}
#[derive(Debug)]
enum ServerCommand {
WorkerFaulted(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(crate::rt::Signal),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
completion: Option<oneshot::Sender<()>>,
},
/// Notify of server stop
Notify(oneshot::Sender<()>),
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
thread_local! {
static MAX_CONNS_COUNTER: self::counter::Counter =
self::counter::Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
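The `counter::Counter` accounting API, as used in this file (a condensed sketch; signatures inferred from the call sites below):

```rust
// Per-worker handle, cloned out of the thread-local counter.
let conns = MAX_CONNS_COUNTER.with(|c| c.priv_clone());
// conns.available(cx): false (and waker registered) once the limit is hit.
// conns.get(): returns a guard; dropping it releases the connection slot.
// num_connections(): MAX_CONNS_COUNTER.with(|c| c.total())
```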
/// Server controller
#[derive(Debug)]
pub struct Server(Sender<ServerCommand>, Option<oneshot::Receiver<()>>);
impl Server {
fn new(tx: Sender<ServerCommand>) -> Self {
Server(tx, None)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
fn signal(&self, sig: crate::rt::Signal) {
let _ = self.0.try_send(ServerCommand::Signal(sig));
}
fn worker_faulted(&self, idx: usize) {
let _ = self.0.try_send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
///
/// If the socket contains pending connections, they might be dropped.
/// All open connections remain active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.try_send(ServerCommand::Pause(tx));
async move {
let _ = rx.await;
}
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.try_send(ServerCommand::Resume(tx));
async move {
let _ = rx.await;
}
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If the server was started with the `spawn()` method, the spawned thread gets terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.try_send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
async move {
let _ = rx.await;
}
}
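Taken together, the controller is used like this (sketch inside an async context; each call resolves once the command has been acknowledged):

```rust
srv.pause().await;    // stop accepting; pending connections may be dropped
srv.resume().await;   // start accepting again
srv.stop(true).await; // graceful worker shutdown, then exit
```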
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default, the limit is set to 25k connections per worker.
pub(super) fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed);
}
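The limit is normally set through the builder; assuming ntex's existing `maxconn()` knob (not part of this hunk), it ends up in this static and is snapshotted into each worker's thread-local counter:

```rust
// Assumed public entry point for the per-worker connection cap (sketch).
let srv = ntex::server::build()
    .maxconn(10_240)
    .bind("example", "127.0.0.1:8080", |_| {
        fn_service(|_io: Io| Ready::Ok::<_, ()>(()))
    })?
    .run();
```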
impl Clone for Server {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for Server {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.try_send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(()));
}
this.1 = Some(rx);
}
match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
}
}
pub(super) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}


@ -1,74 +1,170 @@
use std::{net::SocketAddr, rc::Rc, task::Context, task::Poll};
use std::{task::Context, task::Poll};
use log::error;
use ntex_server::{ServerConfiguration, WorkerMessage};
use crate::io::Io;
use crate::service::{boxed, Service, ServiceCtx, ServiceFactory};
use crate::util::{BoxFuture, Pool, PoolId, PoolRef};
use crate::{io::Io, time::Millis};
use crate::util::{HashMap, Pool, PoolRef};
use super::{counter::CounterGuard, socket::Stream, Config, Token};
use super::accept::{AcceptNotify, AcceptorCommand};
use super::counter::Counter;
use super::factory::{FactoryServiceType, NetService, OnWorkerStart};
use super::{socket::Connection, Token, MAX_CONNS_COUNTER};
/// Server message
pub(super) enum ServerMessage {
/// New stream
Connect(Stream),
/// Graceful shutdown with timeout in millis
Shutdown(Millis),
/// Force shutdown
ForceShutdown,
pub type ServerMessage = WorkerMessage<Connection>;
pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
pub struct StreamServer {
notify: AcceptNotify,
services: Vec<FactoryServiceType>,
on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
}
pub(super) trait StreamServiceFactory: Send + Clone + 'static {
type Factory: ServiceFactory<Io>;
fn create(&self, _: Config) -> Self::Factory;
}
pub(super) trait InternalServiceFactory: Send {
fn name(&self, token: Token) -> &str;
fn set_tag(&mut self, token: Token, tag: &'static str);
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
}
pub(super) type BoxedServerService =
boxed::BoxService<(Option<CounterGuard>, ServerMessage), (), ()>;
#[derive(Clone)]
pub(super) struct StreamService<T> {
service: Rc<T>,
tag: &'static str,
pool: Pool,
pool_ref: PoolRef,
}
impl<T> StreamService<T> {
pub(crate) fn new(service: T, tag: &'static str, pid: PoolId) -> Self {
StreamService {
tag,
pool: pid.pool(),
pool_ref: pid.pool_ref(),
service: Rc::new(service),
impl StreamServer {
pub(crate) fn new(
notify: AcceptNotify,
services: Vec<FactoryServiceType>,
on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
) -> Self {
Self {
notify,
services,
on_worker_start,
}
}
}
impl<T> Service<(Option<CounterGuard>, ServerMessage)> for StreamService<T>
where
T: Service<Io>,
{
/// Worker service factory.
impl ServerConfiguration for StreamServer {
type Item = Connection;
type Factory = StreamService;
/// Create service factory for handling `WorkerMessage<T>` messages.
async fn create(&self) -> Result<Self::Factory, ()> {
// on worker start callbacks
for cb in &self.on_worker_start {
cb.run().await?;
}
// construct services
let mut services = Vec::new();
for svc in &self.services {
services.extend(svc.create().await?);
}
Ok(StreamService { services })
}
/// Server is paused
fn paused(&self) {
self.notify.send(AcceptorCommand::Pause);
}
/// Server is resumed
fn resumed(&self) {
self.notify.send(AcceptorCommand::Resume);
}
/// Server is terminated
fn terminate(&self) {
self.notify.send(AcceptorCommand::Terminate);
}
/// Server is stopped
async fn stop(&self) {
let (tx, rx) = oneshot::channel();
self.notify.send(AcceptorCommand::Stop(tx));
let _ = rx.await;
}
}
impl Clone for StreamServer {
fn clone(&self) -> Self {
Self {
notify: self.notify.clone(),
services: self.services.iter().map(|s| s.clone_factory()).collect(),
on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
}
}
}
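This mirrors `ServerBuilder::run()` earlier in the diff: the `StreamServer` configuration goes to the worker pool, and the resulting service handle is shared with the accept loop. Condensed recap of that wiring (not standalone code):

```rust
// From ServerBuilder::run() above:
let srv = StreamServer::new(self.accept.notify(), self.services, self.on_worker_start);
let svc = self.pool.run(srv);            // ntex-server spawns the workers
self.accept.start(sockets, svc.clone()); // accept loop feeds Connections to workers
```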
pub struct StreamService {
services: Vec<NetService>,
}
impl ServiceFactory<ServerMessage> for StreamService {
type Response = ();
type Error = ();
type Service = StreamServiceImpl;
type InitError = ();
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
let mut tokens = HashMap::default();
let mut services = Vec::new();
for info in &self.services {
match info.factory.create(()).await {
Ok(svc) => {
services.push(svc);
let idx = services.len() - 1;
for (token, tag) in &info.tokens {
tokens.insert(
*token,
(idx, *tag, info.pool.pool(), info.pool.pool_ref()),
);
}
}
Err(_) => {
log::error!("Cannot construct service: {:?}", info.tokens);
return Err(());
}
}
}
let conns = MAX_CONNS_COUNTER.with(|conns| conns.priv_clone());
Ok(StreamServiceImpl {
tokens,
services,
conns,
})
}
}
pub struct StreamServiceImpl {
tokens: HashMap<Token, (usize, &'static str, Pool, PoolRef)>,
services: Vec<BoxService>,
conns: Counter,
}
impl Service<ServerMessage> for StreamServiceImpl {
type Response = ();
type Error = ();
crate::forward_poll_shutdown!(service);
#[inline]
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self.service.poll_ready(cx).map_err(|_| ())?.is_ready();
let ready = self.pool.poll_ready(cx).is_ready() && ready;
let mut ready = self.conns.available(cx);
for (idx, svc) in self.services.iter().enumerate() {
match svc.poll_ready(cx) {
Poll::Pending => ready = false,
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(_)) => {
for (idx_, tag, _, _) in self.tokens.values() {
if idx == *idx_ {
log::error!("{}: Service readiness has failed", tag);
break;
}
}
return Poll::Ready(Err(()));
}
}
}
// check memory pools
for (_, _, pool, _) in self.tokens.values() {
ready = pool.poll_ready(cx).is_ready() && ready;
}
if ready {
Poll::Ready(Ok(()))
} else {
@ -76,25 +172,41 @@ where
}
}
async fn call(
&self,
(guard, req): (Option<CounterGuard>, ServerMessage),
ctx: ServiceCtx<'_, Self>,
) -> Result<(), ()> {
match req {
ServerMessage::Connect(stream) => {
let stream = stream.try_into().map_err(|e| {
error!("Cannot convert to an async io stream: {}", e);
});
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let mut ready = true;
for svc in &self.services {
match svc.poll_shutdown(cx) {
Poll::Pending => ready = false,
Poll::Ready(_) => (),
}
}
if ready {
log::info!(
"Worker service shutdown, {} connections",
super::num_connections()
);
Poll::Ready(())
} else {
Poll::Pending
}
}
if let Ok(stream) = stream {
let stream: Io<_> = stream;
stream.set_tag(self.tag);
stream.set_memory_pool(self.pool_ref);
let _ = ctx.call(self.service.as_ref(), stream).await;
async fn call(&self, req: ServerMessage, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
match req {
ServerMessage::New(con) => {
if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) {
let stream: Io<_> = con.io.try_into().map_err(|e| {
log::error!("Cannot convert to an async io stream: {}", e);
})?;
stream.set_tag(tag);
stream.set_memory_pool(*pool);
let guard = self.conns.get();
let _ = ctx.call(&self.services[*idx], stream).await;
drop(guard);
Ok(())
} else {
log::error!("Cannot get handler service for connection: {:?}", con);
Err(())
}
}
@ -102,104 +214,3 @@ where
}
}
}
pub(super) struct Factory<F: StreamServiceFactory> {
name: String,
tag: &'static str,
inner: F,
token: Token,
addr: SocketAddr,
}
impl<F> Factory<F>
where
F: StreamServiceFactory,
{
pub(crate) fn create(
name: String,
token: Token,
inner: F,
addr: SocketAddr,
tag: &'static str,
) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
name,
token,
inner,
addr,
tag,
})
}
}
impl<F> InternalServiceFactory for Factory<F>
where
F: StreamServiceFactory,
{
fn name(&self, _: Token) -> &str {
&self.name
}
fn set_tag(&mut self, _: Token, tag: &'static str) {
self.tag = tag;
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
token: self.token,
addr: self.addr,
tag: self.tag,
})
}
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
let token = self.token;
let tag = self.tag;
let cfg = Config::default();
let pool = cfg.get_pool_id();
let factory = self.inner.create(cfg);
Box::pin(async move {
match factory.create(()).await {
Ok(inner) => {
let service = boxed::service(StreamService::new(inner, tag, pool));
Ok(vec![(token, service)])
}
Err(_) => Err(()),
}
})
}
}
impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
fn name(&self, token: Token) -> &str {
self.as_ref().name(token)
}
fn set_tag(&mut self, token: Token, tag: &'static str) {
self.as_mut().set_tag(token, tag);
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
self.as_ref().clone_factory()
}
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
self.as_ref().create()
}
}
impl<F, T> StreamServiceFactory for F
where
F: Fn(Config) -> T + Send + Clone + 'static,
T: ServiceFactory<Io>,
{
type Factory = T;
#[inline]
fn create(&self, cfg: Config) -> T {
(self)(cfg)
}
}


@ -1,8 +1,14 @@
use std::{fmt, io, net};
use crate::{io::Io, rt};
use crate::{io::Io, rt, server::Token};
pub(crate) enum Listener {
#[derive(Debug)]
pub struct Connection {
pub(crate) io: Stream,
pub(crate) token: Token,
}
pub enum Listener {
Tcp(net::TcpListener),
#[cfg(unix)]
Uds(std::os::unix::net::UnixListener),


@ -4,7 +4,7 @@ use std::{io, net, sync::mpsc, thread};
use socket2::{Domain, SockAddr, Socket, Type};
use crate::rt::{tcp_connect, System};
use crate::server::{Server, ServerBuilder};
use crate::server::ServerBuilder;
use crate::{io::Io, service::ServiceFactory};
/// Start test server
@ -41,7 +41,7 @@ use crate::{io::Io, service::ServiceFactory};
pub fn test_server<F, R>(factory: F) -> TestServer
where
F: Fn() -> R + Send + Clone + 'static,
R: ServiceFactory<Io>,
R: ServiceFactory<Io> + 'static,
{
let (tx, rx) = mpsc::channel();
@ -53,7 +53,7 @@ where
tx.send((sys.system(), local_addr)).unwrap();
sys.run(|| {
Server::build()
ServerBuilder::new()
.listen("test", tcp, move |_| factory())?
.set_tag("test", "TEST-SERVER")
.workers(1)
@ -81,7 +81,7 @@ where
tx.send(sys.system()).unwrap();
sys.run(|| {
factory(Server::build()).workers(1).disable_signals().run();
factory(super::build()).workers(1).disable_signals().run();
Ok(())
})
});


@ -1,705 +0,0 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};
use async_channel::{unbounded, Receiver, Sender};
use crate::rt::{spawn, Arbiter};
use crate::service::Pipeline;
use crate::time::{sleep, Millis, Sleep};
use crate::util::{
join_all, ready, select, stream_recv, BoxFuture, Either, Stream as FutStream,
};
use super::accept::{AcceptNotify, Command};
use super::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
use super::{counter::Counter, socket::Stream, Token};
type ServerStopCommand = Pin<Box<dyn FutStream<Item = StopCommand>>>;
type ServerWorkerCommand = Pin<Box<dyn FutStream<Item = WorkerCommand>>>;
#[derive(Debug)]
pub(super) struct WorkerCommand(Connection);
#[derive(Debug)]
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections are still alive.
pub(super) struct StopCommand {
graceful: bool,
result: oneshot::Sender<bool>,
}
#[derive(Debug)]
pub(super) struct Connection {
pub(super) io: Stream,
pub(super) token: Token,
}
const STOP_TIMEOUT: Millis = Millis::ONE_SEC;
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default, the limit is set to 25k connections per worker.
pub(super) fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed);
}
pub(super) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}
thread_local! {
static MAX_CONNS_COUNTER: Counter =
Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
#[derive(Clone, Debug)]
pub(super) struct WorkerClient {
pub(super) idx: usize,
tx1: Sender<WorkerCommand>,
tx2: Sender<StopCommand>,
avail: WorkerAvailability,
}
impl WorkerClient {
pub(super) fn new(
idx: usize,
tx1: Sender<WorkerCommand>,
tx2: Sender<StopCommand>,
avail: WorkerAvailability,
) -> Self {
WorkerClient {
idx,
tx1,
tx2,
avail,
}
}
pub(super) fn send(&self, msg: Connection) -> Result<(), Connection> {
self.tx1
.try_send(WorkerCommand(msg))
.map_err(|msg| msg.into_inner().0)
}
pub(super) fn available(&self) -> bool {
self.avail.available()
}
pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (result, rx) = oneshot::channel();
let _ = self.tx2.try_send(StopCommand { graceful, result });
rx
}
}
#[derive(Debug, Clone)]
pub(super) struct WorkerAvailability {
notify: AcceptNotify,
available: Arc<AtomicBool>,
}
impl WorkerAvailability {
pub(super) fn new(notify: AcceptNotify) -> Self {
WorkerAvailability {
notify,
available: Arc::new(AtomicBool::new(false)),
}
}
pub(super) fn available(&self) -> bool {
self.available.load(Ordering::Acquire)
}
pub(super) fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release);
if !old && val {
self.notify.send(Command::WorkerAvailable)
}
}
}
/// Service worker
///
/// A worker accepts socket objects via an unbounded channel and starts stream
/// processing.
pub(super) struct Worker {
rx: ServerWorkerCommand,
rx2: ServerStopCommand,
services: Vec<WorkerService>,
availability: WorkerAvailability,
conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState,
shutdown_timeout: Millis,
}
struct WorkerService {
factory: usize,
status: WorkerServiceStatus,
service: Pipeline<BoxedServerService>,
}
impl WorkerService {
fn created(&mut self, service: BoxedServerService) {
self.service = Pipeline::new(service);
self.status = WorkerServiceStatus::Unavailable;
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum WorkerServiceStatus {
Available,
Unavailable,
Failed,
Restarting,
Stopping,
Stopped,
}
impl Worker {
pub(super) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: Millis,
) -> WorkerClient {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let avail = availability.clone();
Arbiter::default().exec_fn(move || {
spawn(async move {
match Worker::create(rx1, rx2, factories, availability, shutdown_timeout)
.await
{
Ok(wrk) => {
spawn(wrk);
}
Err(e) => {
error!("Cannot start worker: {:?}", e);
Arbiter::current().stop();
}
}
});
});
WorkerClient::new(idx, tx1, tx2, avail)
}
async fn create(
rx: Receiver<WorkerCommand>,
rx2: Receiver<StopCommand>,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: Millis,
) -> Result<Worker, ()> {
availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
rx: Box::pin(rx),
rx2: Box::pin(rx2),
availability,
factories,
shutdown_timeout,
services: Vec::new(),
conns: conns.priv_clone(),
state: WorkerState::Unavailable,
});
let mut fut: Vec<BoxFuture<'static, _>> = Vec::new();
for (idx, factory) in wrk.factories.iter().enumerate() {
let f = factory.create();
fut.push(Box::pin(async move {
let r = f.await?;
Ok::<_, ()>(
r.into_iter()
.map(|(t, s): (Token, _)| (idx, t, s))
.collect::<Vec<_>>(),
)
}));
}
let res: Result<Vec<_>, _> =
match select(join_all(fut), stream_recv(&mut wrk.rx2)).await {
Either::Left(result) => result.into_iter().collect(),
Either::Right(Some(StopCommand { result, .. })) => {
trace!("Shutdown uninitialized worker");
wrk.shutdown(true);
let _ = result.send(false);
return Err(());
}
Either::Right(None) => Err(()),
};
match res {
Ok(services) => {
for item in services {
for (factory, token, service) in item {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory,
service: service.into(),
status: WorkerServiceStatus::Unavailable,
});
}
}
Ok(wrk)
}
Err(_) => Err(()),
}
}
fn shutdown(&mut self, force: bool) {
if force {
self.services.iter_mut().for_each(|srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopped;
let fut = srv
.service
.call_static((None, ServerMessage::ForceShutdown));
spawn(async move {
let _ = fut.await;
});
}
});
} else {
let timeout = self.shutdown_timeout;
self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping;
let fut = srv
.service
.call_static((None, ServerMessage::Shutdown(timeout)));
spawn(async move {
let _ = fut.await;
});
}
});
}
}
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
let mut ready = self.conns.available(cx);
let mut failed = None;
for (idx, srv) in &mut self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable
{
match srv.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[srv.factory].name(Token(idx))
);
srv.status = WorkerServiceStatus::Available;
}
}
Poll::Pending => {
ready = false;
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(Token(idx))
);
srv.status = WorkerServiceStatus::Unavailable;
}
}
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(Token(idx))
);
failed = Some((Token(idx), srv.factory));
srv.status = WorkerServiceStatus::Failed;
}
}
}
}
if let Some(idx) = failed {
Err(idx)
} else {
Ok(ready)
}
}
}
enum WorkerState {
Available,
Unavailable,
Restarting(
usize,
Token,
BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
),
Shutdown(Sleep, Sleep, Option<oneshot::Sender<bool>>),
}
impl Future for Worker {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// `StopWorker` message handler
let stop = Pin::new(&mut self.rx2).poll_next(cx);
if let Poll::Ready(Some(StopCommand { graceful, result })) = stop {
self.availability.set(false);
let num = num_connections();
if num == 0 {
info!("Shutting down worker, 0 connections");
let _ = result.send(true);
return Poll::Ready(());
} else if graceful {
self.shutdown(false);
let num = num_connections();
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
sleep(STOP_TIMEOUT),
sleep(self.shutdown_timeout),
Some(result),
);
} else {
let _ = result.send(true);
return Poll::Ready(());
}
} else {
info!("Force shutdown worker, {} connections", num);
self.shutdown(true);
let _ = result.send(false);
return Poll::Ready(());
}
}
match self.state {
WorkerState::Unavailable => {
match self.check_readiness(cx) {
Ok(true) => {
// process requests from wait queue
self.state = WorkerState::Available;
self.availability.set(true);
self.poll(cx)
}
Ok(false) => Poll::Pending,
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
self.factories[idx].name(token)
);
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(
idx,
token,
self.factories[idx].create(),
);
self.poll(cx)
}
}
}
WorkerState::Restarting(idx, token, ref mut fut) => {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(item)) => {
// TODO: deal with multiple services
if let Some((token, service)) = item.into_iter().next() {
trace!(
"Service {:?} has been restarted",
self.factories[idx].name(token)
);
self.services[token.0].created(service);
// service is restarted, now wait for readiness
self.state = WorkerState::Unavailable;
return self.poll(cx);
}
}
Poll::Ready(Err(_)) => {
panic!(
"Cannot restart {:?} service",
self.factories[idx].name(token)
);
}
Poll::Pending => return Poll::Pending,
}
self.poll(cx)
}
WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {
let num = num_connections();
if num == 0 {
let _ = tx.take().unwrap().send(true);
Arbiter::current().stop();
return Poll::Ready(());
}
// check graceful timeout
match t2.poll_elapsed(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
let _ = tx.take().unwrap().send(false);
self.shutdown(true);
Arbiter::current().stop();
return Poll::Ready(());
}
}
// sleep for 1 second and then check again
match t1.poll_elapsed(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
*t1 = sleep(STOP_TIMEOUT);
let _ = t1.poll_elapsed(cx);
}
}
Poll::Pending
}
WorkerState::Available => {
loop {
match self.check_readiness(cx) {
Ok(true) => (),
Ok(false) => {
trace!("Worker is unavailable");
self.availability.set(false);
self.state = WorkerState::Unavailable;
return self.poll(cx);
}
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
self.factories[idx].name(token)
);
self.availability.set(false);
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(
idx,
token,
self.factories[idx].create(),
);
return self.poll(cx);
}
}
let next = ready!(Pin::new(&mut self.rx).poll_next(cx));
if let Some(WorkerCommand(msg)) = next {
// handle incoming io stream
let guard = self.conns.get();
let srv = &self.services[msg.token.0];
if log::log_enabled!(log::Level::Trace) {
trace!(
"Got socket for service: {:?}",
self.factories[srv.factory].name(msg.token)
);
}
let fut = srv
.service
.call_static((Some(guard), ServerMessage::Connect(msg.io)));
spawn(async move {
let _ = fut.await;
});
} else {
return Poll::Ready(());
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use super::*;
use crate::io::Io;
use crate::server::service::Factory;
use crate::service::{Service, ServiceCtx, ServiceFactory};
use crate::util::lazy;
#[derive(Clone, Copy, Debug)]
enum St {
Fail,
Ready,
Pending,
}
#[derive(Clone)]
struct SrvFactory {
st: Arc<Mutex<St>>,
counter: Arc<Mutex<usize>>,
}
impl ServiceFactory<Io> for SrvFactory {
type Response = ();
type Error = ();
type Service = Srv;
type InitError = ();
async fn create(&self, _: ()) -> Result<Srv, ()> {
let mut cnt = self.counter.lock().unwrap();
*cnt += 1;
Ok(Srv {
st: self.st.clone(),
})
}
}
struct Srv {
st: Arc<Mutex<St>>,
}
impl Service<Io> for Srv {
type Response = ();
type Error = ();
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let st: St = { *self.st.lock().unwrap() };
match st {
St::Fail => {
*self.st.lock().unwrap() = St::Pending;
Poll::Ready(Err(()))
}
St::Ready => Poll::Ready(Ok(())),
St::Pending => Poll::Pending,
}
}
fn poll_shutdown(&self, _: &mut Context<'_>) -> Poll<()> {
match *self.st.lock().unwrap() {
St::Ready => Poll::Ready(()),
St::Fail | St::Pending => Poll::Pending,
}
}
async fn call(&self, _: Io, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Ok(())
}
}
#[crate::rt_test]
#[allow(clippy::mutex_atomic)]
async fn basics() {
let (_tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let (sync_tx, _sync_rx) = std::sync::mpsc::channel();
let poll = Arc::new(polling::Poller::new().unwrap());
let waker = poll.clone();
let avail =
WorkerAvailability::new(AcceptNotify::new(waker.clone(), sync_tx.clone()));
let st = Arc::new(Mutex::new(St::Pending));
let counter = Arc::new(Mutex::new(0));
let f = SrvFactory {
st: st.clone(),
counter: counter.clone(),
};
let mut worker = Worker::create(
rx1,
rx2,
vec![Factory::create(
"test".to_string(),
Token(0),
move |_| f.clone(),
"127.0.0.1:8080".parse().unwrap(),
"TEST",
)],
avail.clone(),
Millis(5_000),
)
.await
.unwrap();
assert_eq!(*counter.lock().unwrap(), 1);
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(!avail.available());
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(!avail.available());
*st.lock().unwrap() = St::Ready;
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(avail.available());
*st.lock().unwrap() = St::Pending;
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(!avail.available());
*st.lock().unwrap() = St::Ready;
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(avail.available());
// restart
*st.lock().unwrap() = St::Fail;
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(!avail.available());
*st.lock().unwrap() = St::Fail;
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(!avail.available());
*st.lock().unwrap() = St::Ready;
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(avail.available());
// shutdown
let g = MAX_CONNS_COUNTER.with(|conns| conns.get());
let (tx, rx) = oneshot::channel();
tx2.try_send(StopCommand {
graceful: true,
result: tx,
})
.unwrap();
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(!avail.available());
drop(g);
assert!(lazy(|cx| Pin::new(&mut worker).poll(cx)).await.is_ready());
let _ = rx.await;
// force shutdown
let (_tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let avail = WorkerAvailability::new(AcceptNotify::new(waker, sync_tx.clone()));
let f = SrvFactory {
st: st.clone(),
counter: counter.clone(),
};
let mut worker = Worker::create(
rx1,
rx2,
vec![Factory::create(
"test".to_string(),
Token(0),
move |_| f.clone(),
"127.0.0.1:8080".parse().unwrap(),
"TEST",
)],
avail.clone(),
Millis(5_000),
)
.await
.unwrap();
// shutdown
let _g = MAX_CONNS_COUNTER.with(|conns| conns.get());
*st.lock().unwrap() = St::Ready;
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(avail.available());
let (tx, rx) = oneshot::channel();
tx2.try_send(StopCommand {
graceful: false,
result: tx,
})
.unwrap();
assert!(lazy(|cx| Pin::new(&mut worker).poll(cx)).await.is_ready());
let _ = rx.await;
}
}


@ -608,7 +608,7 @@ where
let local_addr = tcp.local_addr().unwrap();
sys.run(move || {
let builder = Server::build().workers(1).disable_signals();
let builder = crate::server::build().workers(1).disable_signals();
let srv = match cfg.stream {
StreamType::Tcp => match cfg.tp {


@ -263,7 +263,7 @@ impl Decoder for Codec {
}
OpCode::Bad => Err(ProtocolError::BadOpCode),
_ => {
error!("Unfinished fragment {:?}", opcode);
log::error!("Unfinished fragment {:?}", opcode);
Err(ProtocolError::ContinuationFragment(opcode))
}
}


@ -1,4 +1,3 @@
use log::debug;
use nanorand::{Rng, WyRand};
use super::proto::{CloseCode, CloseReason, OpCode};
@ -117,7 +116,7 @@ impl Parser {
return Err(ProtocolError::InvalidLength(length));
}
OpCode::Close if length > 125 => {
debug!("Received close frame with payload length exceeding 125. Morphing to protocol close frame.");
log::debug!("Received close frame with payload length exceeding 125. Morphing to protocol close frame.");
return Ok(Some((true, OpCode::Close, None)));
}
_ => (),


@ -513,6 +513,9 @@ async fn test_h1_head_binary() {
#[ntex::test]
async fn test_h1_head_binary2() {
std::env::set_var("RUST_LOG", "trace");
let _ = env_logger::try_init();
let srv = test_server(|| {
HttpService::build().h1(|_| Ready::Ok::<_, io::Error>(Response::Ok().body(STR)))
});


@ -4,7 +4,7 @@ use std::{io, io::Read, net, sync::mpsc, sync::Arc, thread, time};
use ntex::codec::BytesCodec;
use ntex::io::Io;
use ntex::server::{Server, TestServer};
use ntex::server::{build, TestServer};
use ntex::service::fn_service;
use ntex::util::{Bytes, Ready};
@ -16,7 +16,7 @@ fn test_bind() {
let h = thread::spawn(move || {
let sys = ntex::rt::System::new("test");
sys.run(move || {
let srv = Server::build()
let srv = build()
.workers(1)
.disable_signals()
.bind("test", addr, move |_| {
@ -44,8 +44,8 @@ async fn test_listen() {
let h = thread::spawn(move || {
let sys = ntex::rt::System::new("test");
let lst = net::TcpListener::bind(addr).unwrap();
sys.run(move || {
let srv = Server::build()
let _ = sys.run(move || {
let srv = build()
.disable_signals()
.workers(1)
.listen("test", lst, move |_| fn_service(|_| Ready::Ok::<_, ()>(())))
@ -53,7 +53,7 @@ async fn test_listen() {
.run();
let _ = tx.send((srv, ntex::rt::System::current()));
Ok(())
})
});
});
let (srv, sys) = rx.recv().unwrap();
@ -74,7 +74,7 @@ async fn test_run() {
let h = thread::spawn(move || {
let sys = ntex::rt::System::new("test");
sys.run(move || {
let srv = Server::build()
let srv = build()
.backlog(100)
.disable_signals()
.bind("test", addr, move |_| {
@ -144,7 +144,7 @@ fn test_on_worker_start() {
let num2 = num2.clone();
let sys = ntex::rt::System::new("test");
sys.run(move || {
let srv = Server::build()
let srv = build()
.disable_signals()
.configure(move |cfg| {
let num = num.clone();
@ -162,12 +162,11 @@ fn test_on_worker_start() {
let _ = num.fetch_add(1, Relaxed);
Ok::<_, io::Error>(())
}
})
.unwrap();
});
Ok::<_, io::Error>(())
})
.unwrap()
.on_worker_start(move |_| {
.on_worker_start(move || {
let _ = num2.fetch_add(1, Relaxed);
Ready::Ok::<_, io::Error>(())
})
@ -202,42 +201,53 @@ fn test_configure_async() {
let num = num2.clone();
let num2 = num2.clone();
let sys = ntex::rt::System::new("test");
sys.block_on(async move {
let srv = Server::build()
.disable_signals()
.configure_async(move |cfg| {
let num = num.clone();
let lst = net::TcpListener::bind(addr3).unwrap();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.listen("addr3", lst)
.on_worker_start(move |rt| {
let num = num.clone();
async move {
rt.service("addr1", fn_service(|_| Ready::Ok::<_, ()>(())));
rt.service("addr3", fn_service(|_| Ready::Ok::<_, ()>(())));
let _ = num.fetch_add(1, Relaxed);
Ok::<_, io::Error>(())
}
})
.unwrap();
Ready::Ok::<_, io::Error>(())
})
.await
.unwrap()
.on_worker_start(move |_| {
let _ = num2.fetch_add(1, Relaxed);
Ready::Ok::<_, io::Error>(())
})
.workers(1)
.run();
let _ = tx.send((srv, ntex::rt::System::current()));
let _ = sys.run(move || {
ntex_rt::spawn(async move {
let srv = build()
.disable_signals()
.configure_async(move |cfg| {
let num = num.clone();
let lst = net::TcpListener::bind(addr3).unwrap();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.listen("addr3", lst)
.set_tag("addr1", "srv-addr1")
.on_worker_start(move |rt| {
assert!(format!("{:?}", rt).contains("ServiceRuntime"));
let num = num.clone();
async move {
rt.service(
"addr1",
fn_service(|_| Ready::Ok::<_, ()>(())),
);
rt.service(
"addr3",
fn_service(|_| Ready::Ok::<_, ()>(())),
);
let _ = num.fetch_add(1, Relaxed);
Ok::<_, io::Error>(())
}
});
Ready::Ok::<_, io::Error>(())
})
.await
.unwrap()
.on_worker_start(move || {
let _ = num2.fetch_add(1, Relaxed);
Ready::Ok::<_, io::Error>(())
})
.workers(1)
.run();
let _ = tx.send((srv.clone(), ntex::rt::System::current()));
let _ = srv.await;
});
Ok::<_, io::Error>(())
})
});
});
let (_, sys) = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500));
assert!(net::TcpStream::connect(addr1).is_ok());
@ -263,7 +273,7 @@ fn test_panic_in_worker() {
let counter = counter2.clone();
sys.run(move || {
let counter = counter.clone();
let srv = Server::build()
let srv = build()
.workers(1)
.disable_signals()
.bind("test", addr, move |_| {