Abstract server/worker implementation to allow for creating non-TCP-based web servers

This commit is contained in:
Diogo Barbosa 2024-03-06 13:05:08 +01:00
parent 68e158d877
commit 37e56f242b
7 changed files with 178 additions and 141 deletions

View file

@ -5,24 +5,14 @@ use polling::{Event, Events, Poller};
use crate::{rt::System, time::sleep, time::Millis, util::Either}; use crate::{rt::System, time::sleep, time::Millis, util::Either};
use super::socket::{Listener, SocketAddr}; use super::socket::{Listener, SocketAddr, Stream};
use super::worker::{Connection, WorkerClient}; use super::worker::{WorkerMessage, WorkerClient, WorkerManagerCmd, WorkerManagerNotifier};
use super::{Server, ServerStatus, Token}; use super::{Server, ServerStatus, Token};
const EXIT_TIMEOUT: Duration = Duration::from_millis(100); const EXIT_TIMEOUT: Duration = Duration::from_millis(100);
const ERR_TIMEOUT: Duration = Duration::from_millis(500); const ERR_TIMEOUT: Duration = Duration::from_millis(500);
const ERR_SLEEP_TIMEOUT: Millis = Millis(525); const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
#[derive(Debug)]
pub(super) enum Command {
Stop(mpsc::Sender<()>),
Pause,
Resume,
Worker(WorkerClient),
Timer,
WorkerAvailable,
}
#[derive(Debug)] #[derive(Debug)]
struct ServerSocketInfo { struct ServerSocketInfo {
addr: SocketAddr, addr: SocketAddr,
@ -33,22 +23,28 @@ struct ServerSocketInfo {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(super) struct AcceptNotify(Arc<Poller>, mpsc::Sender<Command>); pub(super) struct AcceptNotify(Arc<Poller>, mpsc::Sender<WorkerManagerCmd<Stream>>);
impl AcceptNotify { impl AcceptNotify {
pub(super) fn new(waker: Arc<Poller>, tx: mpsc::Sender<Command>) -> Self { pub(super) fn new(waker: Arc<Poller>, tx: mpsc::Sender<WorkerManagerCmd<Stream>>) -> Self {
AcceptNotify(waker, tx) AcceptNotify(waker, tx)
} }
}
pub(super) fn send(&self, cmd: Command) { impl WorkerManagerNotifier<Stream> for AcceptNotify {
fn send(&self, cmd: WorkerManagerCmd<Stream>) {
let _ = self.1.send(cmd); let _ = self.1.send(cmd);
let _ = self.0.notify(); let _ = self.0.notify();
} }
fn clone_box(&self) -> Box<dyn WorkerManagerNotifier<Stream>> {
Box::new(self.clone())
}
} }
pub(super) struct AcceptLoop { pub(super) struct AcceptLoop {
notify: AcceptNotify, notify: AcceptNotify,
inner: Option<(mpsc::Receiver<Command>, Arc<Poller>, Server)>, inner: Option<(mpsc::Receiver<WorkerManagerCmd<Stream>>, Arc<Poller>, Server)>,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>, status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
} }
@ -57,7 +53,7 @@ impl AcceptLoop {
// Create a poller instance // Create a poller instance
let poll = Arc::new( let poll = Arc::new(
Poller::new() Poller::new()
.map_err(|e| panic!("Cannot create Polller {}", e)) .map_err(|e| panic!("Cannot create Poller {}", e))
.unwrap(), .unwrap(),
); );
@ -71,7 +67,7 @@ impl AcceptLoop {
} }
} }
pub(super) fn send(&self, msg: Command) { pub(super) fn send(&self, msg: WorkerManagerCmd<Stream>) {
self.notify.send(msg) self.notify.send(msg)
} }
@ -89,7 +85,7 @@ impl AcceptLoop {
pub(super) fn start( pub(super) fn start(
&mut self, &mut self,
socks: Vec<(Token, Listener)>, socks: Vec<(Token, Listener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient<Stream>>,
) { ) {
let (rx, poll, srv) = self let (rx, poll, srv) = self
.inner .inner
@ -121,9 +117,9 @@ impl fmt::Debug for AcceptLoop {
struct Accept { struct Accept {
poller: Arc<Poller>, poller: Arc<Poller>,
rx: mpsc::Receiver<Command>, rx: mpsc::Receiver<WorkerManagerCmd<Stream>>,
sockets: Vec<ServerSocketInfo>, sockets: Vec<ServerSocketInfo>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient<Stream>>,
srv: Server, srv: Server,
notify: AcceptNotify, notify: AcceptNotify,
next: usize, next: usize,
@ -133,11 +129,11 @@ struct Accept {
impl Accept { impl Accept {
fn start( fn start(
rx: mpsc::Receiver<Command>, rx: mpsc::Receiver<WorkerManagerCmd<Stream>>,
poller: Arc<Poller>, poller: Arc<Poller>,
socks: Vec<(Token, Listener)>, socks: Vec<(Token, Listener)>,
srv: Server, srv: Server,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient<Stream>>,
notify: AcceptNotify, notify: AcceptNotify,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>, status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
) { ) {
@ -153,10 +149,10 @@ impl Accept {
} }
fn new( fn new(
rx: mpsc::Receiver<Command>, rx: mpsc::Receiver<WorkerManagerCmd<Stream>>,
poller: Arc<Poller>, poller: Arc<Poller>,
socks: Vec<(Token, Listener)>, socks: Vec<(Token, Listener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient<Stream>>,
srv: Server, srv: Server,
notify: AcceptNotify, notify: AcceptNotify,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>, status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
@ -261,7 +257,7 @@ impl Accept {
let notify = self.notify.clone(); let notify = self.notify.clone();
System::current().arbiter().spawn(Box::pin(async move { System::current().arbiter().spawn(Box::pin(async move {
sleep(ERR_SLEEP_TIMEOUT).await; sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(Command::Timer); notify.send(WorkerManagerCmd::Timer);
})); }));
} else { } else {
info.registered.set(true); info.registered.set(true);
@ -304,7 +300,7 @@ impl Accept {
loop { loop {
match self.rx.try_recv() { match self.rx.try_recv() {
Ok(cmd) => match cmd { Ok(cmd) => match cmd {
Command::Stop(rx) => { WorkerManagerCmd::Stop(rx) => {
log::trace!("Stopping accept loop"); log::trace!("Stopping accept loop");
for (key, info) in self.sockets.iter().enumerate() { for (key, info) in self.sockets.iter().enumerate() {
log::info!("Stopping socket listener on {}", info.addr); log::info!("Stopping socket listener on {}", info.addr);
@ -313,7 +309,7 @@ impl Accept {
self.update_status(ServerStatus::NotReady); self.update_status(ServerStatus::NotReady);
break Either::Right(Some(rx)); break Either::Right(Some(rx));
} }
Command::Pause => { WorkerManagerCmd::Pause => {
log::trace!("Pausing accept loop"); log::trace!("Pausing accept loop");
for (key, info) in self.sockets.iter().enumerate() { for (key, info) in self.sockets.iter().enumerate() {
log::info!("Stopping socket listener on {}", info.addr); log::info!("Stopping socket listener on {}", info.addr);
@ -321,7 +317,7 @@ impl Accept {
} }
self.update_status(ServerStatus::NotReady); self.update_status(ServerStatus::NotReady);
} }
Command::Resume => { WorkerManagerCmd::Resume => {
log::trace!("Resuming accept loop"); log::trace!("Resuming accept loop");
for (key, info) in self.sockets.iter().enumerate() { for (key, info) in self.sockets.iter().enumerate() {
log::info!("Resuming socket listener on {}", info.addr); log::info!("Resuming socket listener on {}", info.addr);
@ -329,15 +325,15 @@ impl Accept {
} }
self.update_status(ServerStatus::Ready); self.update_status(ServerStatus::Ready);
} }
Command::Worker(worker) => { WorkerManagerCmd::Worker(worker) => {
log::trace!("Adding new worker to accept loop"); log::trace!("Adding new worker to accept loop");
self.backpressure(false); self.backpressure(false);
self.workers.push(worker); self.workers.push(worker);
} }
Command::Timer => { WorkerManagerCmd::Timer => {
self.process_timer(); self.process_timer();
} }
Command::WorkerAvailable => { WorkerManagerCmd::WorkerAvailable => {
log::trace!("Worker is available"); log::trace!("Worker is available");
self.backpressure(false); self.backpressure(false);
} }
@ -393,10 +389,10 @@ impl Accept {
} }
} }
fn accept_one(&mut self, mut msg: Connection) { fn accept_one(&mut self, mut msg: WorkerMessage<Stream>) {
log::trace!( log::trace!(
"Accepting connection: {:?} bp: {}", "Accepting connection: {:?} bp: {}",
msg.io, msg.content,
self.backpressure self.backpressure
); );
@ -463,8 +459,8 @@ impl Accept {
loop { loop {
let msg = if let Some(info) = self.sockets.get_mut(token) { let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept() { match info.sock.accept() {
Ok(Some(io)) => Connection { Ok(Some(io)) => WorkerMessage {
io, content: io,
token: info.token, token: info.token,
}, },
Ok(None) => return true, Ok(None) => return true,
@ -479,7 +475,7 @@ impl Accept {
let notify = self.notify.clone(); let notify = self.notify.clone();
System::current().arbiter().spawn(Box::pin(async move { System::current().arbiter().spawn(Box::pin(async move {
sleep(ERR_SLEEP_TIMEOUT).await; sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(Command::Timer); notify.send(WorkerManagerCmd::Timer);
})); }));
return false; return false;
} }

View file

@ -8,13 +8,13 @@ use crate::rt::{spawn, Signal, System};
use crate::time::{sleep, Millis}; use crate::time::{sleep, Millis};
use crate::{io::Io, service::ServiceFactory, util::join_all, util::Stream}; use crate::{io::Io, service::ServiceFactory, util::join_all, util::Stream};
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify};
use super::config::{ use super::config::{
Config, ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime, Config, ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime,
}; };
use super::service::{Factory, InternalServiceFactory}; use super::service::{Factory, InternalServiceFactory};
use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; use super::worker::{self, WorkerManagerCmd, Worker, WorkerAvailability, WorkerClient};
use super::{socket::Listener, Server, ServerCommand, ServerStatus, Token}; use super::{socket::Listener, Server, ServerCommand, ServerStatus, Token, socket};
const STOP_DELAY: Millis = Millis(300); const STOP_DELAY: Millis = Millis(300);
@ -25,8 +25,8 @@ pub struct ServerBuilder {
threads: usize, threads: usize,
token: Token, token: Token,
backlog: i32, backlog: i32,
workers: Vec<(usize, WorkerClient)>, workers: Vec<(usize, WorkerClient<socket::Stream>)>,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory<socket::Stream>>>,
sockets: Vec<(Token, String, Listener)>, sockets: Vec<(Token, String, Listener)>,
accept: AcceptLoop, accept: AcceptLoop,
exit: bool, exit: bool,
@ -381,9 +381,9 @@ impl ServerBuilder {
} }
} }
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient<socket::Stream> {
let avail = WorkerAvailability::new(notify); let avail = WorkerAvailability::new(Box::new(notify));
let services: Vec<Box<dyn InternalServiceFactory>> = let services: Vec<Box<dyn InternalServiceFactory<socket::Stream>>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
Worker::start(idx, services, avail, self.shutdown_timeout) Worker::start(idx, services, avail, self.shutdown_timeout)
@ -392,11 +392,11 @@ impl ServerBuilder {
fn handle_cmd(&mut self, item: ServerCommand) { fn handle_cmd(&mut self, item: ServerCommand) {
match item { match item {
ServerCommand::Pause(tx) => { ServerCommand::Pause(tx) => {
self.accept.send(Command::Pause); self.accept.send(WorkerManagerCmd::Pause);
let _ = tx.send(()); let _ = tx.send(());
} }
ServerCommand::Resume(tx) => { ServerCommand::Resume(tx) => {
self.accept.send(Command::Resume); self.accept.send(WorkerManagerCmd::Resume);
let _ = tx.send(()); let _ = tx.send(());
} }
ServerCommand::Signal(sig) => { ServerCommand::Signal(sig) => {
@ -441,7 +441,7 @@ impl ServerBuilder {
// stop accept thread // stop accept thread
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
self.accept.send(Command::Stop(tx)); self.accept.send(WorkerManagerCmd::Stop(tx));
let _ = rx.recv(); let _ = rx.recv();
let notify = std::mem::take(&mut self.notify); let notify = std::mem::take(&mut self.notify);
@ -513,7 +513,7 @@ impl ServerBuilder {
let worker = self.start_worker(new_idx, self.accept.notify()); let worker = self.start_worker(new_idx, self.accept.notify());
self.workers.push((new_idx, worker.clone())); self.workers.push((new_idx, worker.clone()));
self.accept.send(Command::Worker(worker)); self.accept.send(WorkerManagerCmd::Worker(worker));
} }
} }
} }

View file

@ -3,6 +3,7 @@ use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net,
use log::error; use log::error;
use crate::io::Io; use crate::io::Io;
use crate::server::socket::Stream;
use crate::service::{self, boxed, ServiceFactory as NServiceFactory}; use crate::service::{self, boxed, ServiceFactory as NServiceFactory};
use crate::util::{BoxFuture, HashMap, PoolId, Ready}; use crate::util::{BoxFuture, HashMap, PoolId, Ready};
@ -162,7 +163,7 @@ impl ConfiguredService {
} }
} }
impl InternalServiceFactory for ConfiguredService { impl InternalServiceFactory<Stream> for ConfiguredService {
fn name(&self, token: Token) -> &str { fn name(&self, token: Token) -> &str {
&self.names[&token].0 &self.names[&token].0
} }
@ -175,7 +176,7 @@ impl InternalServiceFactory for ConfiguredService {
} }
} }
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory<Stream>> {
Box::new(Self { Box::new(Self {
rt: self.rt.clone(), rt: self.rt.clone(),
names: self.names.clone(), names: self.names.clone(),
@ -184,7 +185,7 @@ impl InternalServiceFactory for ConfiguredService {
}) })
} }
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> { fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService<Stream>)>, ()>> {
// configure services // configure services
let rt = ServiceRuntime::new(self.topics.clone()); let rt = ServiceRuntime::new(self.topics.clone());
let cfg_fut = self.rt.configure(ServiceRuntime(rt.0.clone())); let cfg_fut = self.rt.configure(ServiceRuntime(rt.0.clone()));
@ -374,7 +375,7 @@ impl ServiceRuntime {
type BoxServiceFactory = service::boxed::BoxServiceFactory< type BoxServiceFactory = service::boxed::BoxServiceFactory<
(), (),
(Option<CounterGuard>, ServerMessage), (Option<CounterGuard>, ServerMessage<Stream>),
(), (),
(), (),
(), (),
@ -386,7 +387,7 @@ struct ServiceFactory<T> {
pool: PoolId, pool: PoolId,
} }
impl<T> service::ServiceFactory<(Option<CounterGuard>, ServerMessage)> for ServiceFactory<T> impl<T> service::ServiceFactory<(Option<CounterGuard>, ServerMessage<Stream>)> for ServiceFactory<T>
where where
T: service::ServiceFactory<Io>, T: service::ServiceFactory<Io>,
T::Service: 'static, T::Service: 'static,
@ -396,9 +397,9 @@ where
type Response = (); type Response = ();
type Error = (); type Error = ();
type InitError = (); type InitError = ();
type Service = BoxedServerService; type Service = BoxedServerService<Stream>;
async fn create(&self, _: ()) -> Result<BoxedServerService, ()> { async fn create(&self, _: ()) -> Result<BoxedServerService<Stream>, ()> {
let tag = self.tag; let tag = self.tag;
let pool = self.pool; let pool = self.pool;

View file

@ -44,7 +44,7 @@ impl Counter {
} }
} }
pub(super) struct CounterGuard(Rc<CounterInner>); pub struct CounterGuard(Rc<CounterInner>);
impl CounterGuard { impl CounterGuard {
fn new(inner: Rc<CounterInner>) -> Self { fn new(inner: Rc<CounterInner>) -> Self {

View file

@ -6,11 +6,11 @@ use async_channel::Sender;
mod accept; mod accept;
mod builder; mod builder;
mod config; mod config;
mod counter; pub mod counter;
mod service; pub mod service;
mod socket; mod socket;
mod test; mod test;
mod worker; pub mod worker;
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
pub use ntex_tls::openssl; pub use ntex_tls::openssl;
@ -36,10 +36,10 @@ pub enum ServerStatus {
/// Socket id token /// Socket id token
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Token(usize); pub struct Token(pub usize);
impl Token { impl Token {
pub(self) fn next(&mut self) -> Token { pub fn next(&mut self) -> Token {
let token = Token(self.0); let token = Token(self.0);
self.0 += 1; self.0 += 1;
token token
@ -51,7 +51,7 @@ pub fn build() -> ServerBuilder {
ServerBuilder::default() ServerBuilder::default()
} }
/// Ssl error combinded with service error. /// Ssl error combined with service error.
#[derive(Debug)] #[derive(Debug)]
pub enum SslError<E> { pub enum SslError<E> {
Ssl(Box<dyn std::error::Error>), Ssl(Box<dyn std::error::Error>),
@ -59,7 +59,7 @@ pub enum SslError<E> {
} }
#[derive(Debug)] #[derive(Debug)]
enum ServerCommand { pub enum ServerCommand {
WorkerFaulted(usize), WorkerFaulted(usize),
Pause(oneshot::Sender<()>), Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>), Resume(oneshot::Sender<()>),
@ -78,7 +78,7 @@ enum ServerCommand {
pub struct Server(Sender<ServerCommand>, Option<oneshot::Receiver<()>>); pub struct Server(Sender<ServerCommand>, Option<oneshot::Receiver<()>>);
impl Server { impl Server {
fn new(tx: Sender<ServerCommand>) -> Self { pub fn new(tx: Sender<ServerCommand>) -> Self {
Server(tx, None) Server(tx, None)
} }
@ -87,11 +87,11 @@ impl Server {
ServerBuilder::default() ServerBuilder::default()
} }
fn signal(&self, sig: crate::rt::Signal) { pub fn signal(&self, sig: crate::rt::Signal) {
let _ = self.0.try_send(ServerCommand::Signal(sig)); let _ = self.0.try_send(ServerCommand::Signal(sig));
} }
fn worker_faulted(&self, idx: usize) { pub fn worker_faulted(&self, idx: usize) {
let _ = self.0.try_send(ServerCommand::WorkerFaulted(idx)); let _ = self.0.try_send(ServerCommand::WorkerFaulted(idx));
} }

View file

@ -9,10 +9,10 @@ use crate::{io::Io, time::Millis};
use super::{counter::CounterGuard, socket::Stream, Config, Token}; use super::{counter::CounterGuard, socket::Stream, Config, Token};
/// Server message /// Server message
pub(super) enum ServerMessage { pub enum ServerMessage<T> {
/// New stream /// New content received
Connect(Stream), New(T),
/// Gracefull shutdown in millis /// Graceful shutdown in millis
Shutdown(Millis), Shutdown(Millis),
/// Force shutdown /// Force shutdown
ForceShutdown, ForceShutdown,
@ -24,18 +24,18 @@ pub(super) trait StreamServiceFactory: Send + Clone + 'static {
fn create(&self, _: Config) -> Self::Factory; fn create(&self, _: Config) -> Self::Factory;
} }
pub(super) trait InternalServiceFactory: Send { pub trait InternalServiceFactory<T>: Send {
fn name(&self, token: Token) -> &str; fn name(&self, token: Token) -> &str;
fn set_tag(&mut self, token: Token, tag: &'static str); fn set_tag(&mut self, token: Token, tag: &'static str);
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>; fn clone_factory(&self) -> Box<dyn InternalServiceFactory<T>>;
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>; fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService<T>)>, ()>>;
} }
pub(super) type BoxedServerService = pub type BoxedServerService<T> =
boxed::BoxService<(Option<CounterGuard>, ServerMessage), (), ()>; boxed::BoxService<(Option<CounterGuard>, ServerMessage<T>), (), ()>;
#[derive(Clone)] #[derive(Clone)]
pub(super) struct StreamService<T> { pub(super) struct StreamService<T> {
@ -56,7 +56,7 @@ impl<T> StreamService<T> {
} }
} }
impl<T> Service<(Option<CounterGuard>, ServerMessage)> for StreamService<T> impl<T> Service<(Option<CounterGuard>, ServerMessage<Stream>)> for StreamService<T>
where where
T: Service<Io>, T: Service<Io>,
{ {
@ -78,11 +78,11 @@ where
async fn call( async fn call(
&self, &self,
(guard, req): (Option<CounterGuard>, ServerMessage), (guard, req): (Option<CounterGuard>, ServerMessage<Stream>),
ctx: ServiceCtx<'_, Self>, ctx: ServiceCtx<'_, Self>,
) -> Result<(), ()> { ) -> Result<(), ()> {
match req { match req {
ServerMessage::Connect(stream) => { ServerMessage::New(stream) => {
let stream = stream.try_into().map_err(|e| { let stream = stream.try_into().map_err(|e| {
error!("Cannot convert to an async io stream: {}", e); error!("Cannot convert to an async io stream: {}", e);
}); });
@ -121,7 +121,7 @@ where
inner: F, inner: F,
addr: SocketAddr, addr: SocketAddr,
tag: &'static str, tag: &'static str,
) -> Box<dyn InternalServiceFactory> { ) -> Box<dyn InternalServiceFactory<Stream>> {
Box::new(Self { Box::new(Self {
name, name,
token, token,
@ -132,7 +132,7 @@ where
} }
} }
impl<F> InternalServiceFactory for Factory<F> impl<F> InternalServiceFactory<Stream> for Factory<F>
where where
F: StreamServiceFactory, F: StreamServiceFactory,
{ {
@ -144,7 +144,7 @@ where
self.tag = tag; self.tag = tag;
} }
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory<Stream>> {
Box::new(Self { Box::new(Self {
name: self.name.clone(), name: self.name.clone(),
inner: self.inner.clone(), inner: self.inner.clone(),
@ -154,7 +154,7 @@ where
}) })
} }
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> { fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService<Stream>)>, ()>> {
let token = self.token; let token = self.token;
let tag = self.tag; let tag = self.tag;
let cfg = Config::default(); let cfg = Config::default();
@ -173,7 +173,7 @@ where
} }
} }
impl InternalServiceFactory for Box<dyn InternalServiceFactory> { impl<T> InternalServiceFactory<T> for Box<dyn InternalServiceFactory<T>> {
fn name(&self, token: Token) -> &str { fn name(&self, token: Token) -> &str {
self.as_ref().name(token) self.as_ref().name(token)
} }
@ -182,11 +182,11 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
self.as_mut().set_tag(token, tag); self.as_mut().set_tag(token, tag);
} }
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory<T>> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }
fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> { fn create(&self) -> BoxFuture<'static, Result<Vec<(Token, BoxedServerService<T>)>, ()>> {
self.as_ref().create() self.as_ref().create()
} }
} }

View file

@ -1,24 +1,25 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; use std::{fmt, future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};
use std::sync::mpsc;
use async_channel::{unbounded, Receiver, Sender}; use async_channel::{unbounded, Receiver, Sender};
use crate::rt::{spawn, Arbiter}; use crate::rt::{spawn, Arbiter};
use crate::service::Pipeline; use crate::service::Pipeline;
use crate::time::{sleep, Millis, Sleep}; use ntex_util::time::Millis;
use crate::time::{sleep, Sleep};
use crate::util::{ use crate::util::{
join_all, ready, select, stream_recv, BoxFuture, Either, Stream as FutStream, join_all, ready, select, stream_recv, BoxFuture, Either, Stream as FutStream,
}; };
use super::accept::{AcceptNotify, Command};
use super::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; use super::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
use super::{counter::Counter, socket::Stream, Token}; use super::{counter::Counter, Token};
type ServerStopCommand = Pin<Box<dyn FutStream<Item = StopCommand>>>; type ServerStopCommand = Pin<Box<dyn FutStream<Item = StopCommand>>>;
type ServerWorkerCommand = Pin<Box<dyn FutStream<Item = WorkerCommand>>>; type ServerWorkerCommand<T> = Pin<Box<dyn FutStream<Item = WorkerCommand<T>>>>;
#[derive(Debug)] #[derive(Debug)]
pub(super) struct WorkerCommand(Connection); pub(super) struct WorkerCommand<T>(WorkerMessage<T>);
#[derive(Debug)] #[derive(Debug)]
/// Stop worker message. Returns `true` on successful shutdown /// Stop worker message. Returns `true` on successful shutdown
@ -29,9 +30,9 @@ pub(super) struct StopCommand {
} }
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Connection { pub struct WorkerMessage<T> {
pub(super) io: Stream, pub content: T,
pub(super) token: Token, pub token: Token,
} }
const STOP_TIMEOUT: Millis = Millis::ONE_SEC; const STOP_TIMEOUT: Millis = Millis::ONE_SEC;
@ -56,20 +57,31 @@ thread_local! {
Counter::new(MAX_CONNS.load(Ordering::Relaxed)); Counter::new(MAX_CONNS.load(Ordering::Relaxed));
} }
#[derive(Clone, Debug)] #[derive(Debug)]
pub(super) struct WorkerClient { pub struct WorkerClient<T> {
pub(super) idx: usize, pub idx: usize,
tx1: Sender<WorkerCommand>, tx1: Sender<WorkerCommand<T>>,
tx2: Sender<StopCommand>, tx2: Sender<StopCommand>,
avail: WorkerAvailability, avail: WorkerAvailability<T>,
} }
impl WorkerClient { impl<T> Clone for WorkerClient<T> {
fn clone(&self) -> Self {
WorkerClient {
idx: self.idx,
tx1: self.tx1.clone(),
tx2: self.tx2.clone(),
avail: self.avail.clone(),
}
}
}
impl<T> WorkerClient<T> {
pub(super) fn new( pub(super) fn new(
idx: usize, idx: usize,
tx1: Sender<WorkerCommand>, tx1: Sender<WorkerCommand<T>>,
tx2: Sender<StopCommand>, tx2: Sender<StopCommand>,
avail: WorkerAvailability, avail: WorkerAvailability<T>,
) -> Self { ) -> Self {
WorkerClient { WorkerClient {
idx, idx,
@ -79,45 +91,70 @@ impl WorkerClient {
} }
} }
pub(super) fn send(&self, msg: Connection) -> Result<(), Connection> { pub fn send(&self, msg: WorkerMessage<T>) -> Result<(), WorkerMessage<T>> {
self.tx1 self.tx1
.try_send(WorkerCommand(msg)) .try_send(WorkerCommand(msg))
.map_err(|msg| msg.into_inner().0) .map_err(|msg| msg.into_inner().0)
} }
pub(super) fn available(&self) -> bool { pub fn available(&self) -> bool {
self.avail.available() self.avail.available()
} }
pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> { pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (result, rx) = oneshot::channel(); let (result, rx) = oneshot::channel();
let _ = self.tx2.try_send(StopCommand { graceful, result }); let _ = self.tx2.try_send(StopCommand { graceful, result });
rx rx
} }
} }
#[derive(Debug, Clone)] #[derive(Debug)]
pub(super) struct WorkerAvailability { pub struct WorkerAvailability<T> {
notify: AcceptNotify, notify: Box<dyn WorkerManagerNotifier<T>>,
available: Arc<AtomicBool>, available: Arc<AtomicBool>,
} }
impl WorkerAvailability { impl<T> Clone for WorkerAvailability<T> {
pub(super) fn new(notify: AcceptNotify) -> Self { fn clone(&self) -> Self {
WorkerAvailability {
notify: self.notify.clone_box(),
available: self.available.clone(),
}
}
}
#[derive(Debug)]
pub enum WorkerManagerCmd<T> {
Stop(mpsc::Sender<()>),
Pause,
Resume,
Worker(WorkerClient<T>),
Timer,
WorkerAvailable,
}
pub trait WorkerManagerNotifier<T>: Send + Sync + fmt::Debug {
fn send(&self, cmd: WorkerManagerCmd<T>);
fn clone_box(&self) -> Box<dyn WorkerManagerNotifier<T>>;
}
impl<T> WorkerAvailability<T> {
pub fn new(notify: Box<dyn WorkerManagerNotifier<T>>) -> Self {
WorkerAvailability { WorkerAvailability {
notify, notify,
available: Arc::new(AtomicBool::new(false)), available: Arc::new(AtomicBool::new(false)),
} }
} }
pub(super) fn available(&self) -> bool { pub fn available(&self) -> bool {
self.available.load(Ordering::Acquire) self.available.load(Ordering::Acquire)
} }
pub(super) fn set(&self, val: bool) { pub fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release); let old = self.available.swap(val, Ordering::Release);
if !old && val { if !old && val {
self.notify.send(Command::WorkerAvailable) self.notify.send(WorkerManagerCmd::WorkerAvailable)
} }
} }
} }
@ -126,25 +163,25 @@ impl WorkerAvailability {
/// ///
/// Worker accepts Socket objects via unbounded channel and starts stream /// Worker accepts Socket objects via unbounded channel and starts stream
/// processing. /// processing.
pub(super) struct Worker { pub struct Worker<T> {
rx: ServerWorkerCommand, rx: ServerWorkerCommand<T>,
rx2: ServerStopCommand, rx2: ServerStopCommand,
services: Vec<WorkerService>, services: Vec<WorkerService<T>>,
availability: WorkerAvailability, availability: WorkerAvailability<T>,
conns: Counter, conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory<T>>>,
state: WorkerState, state: WorkerState<T>,
shutdown_timeout: Millis, shutdown_timeout: Millis,
} }
struct WorkerService { struct WorkerService<T> {
factory: usize, factory: usize,
status: WorkerServiceStatus, status: WorkerServiceStatus,
service: Pipeline<BoxedServerService>, service: Pipeline<BoxedServerService<T>>,
} }
impl WorkerService { impl<T> WorkerService<T> {
fn created(&mut self, service: BoxedServerService) { fn created(&mut self, service: BoxedServerService<T>) {
self.service = Pipeline::new(service); self.service = Pipeline::new(service);
self.status = WorkerServiceStatus::Unavailable; self.status = WorkerServiceStatus::Unavailable;
} }
@ -160,13 +197,13 @@ enum WorkerServiceStatus {
Stopped, Stopped,
} }
impl Worker { impl<T: 'static + Send> Worker<T> {
pub(super) fn start( pub fn start(
idx: usize, idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory<T>>>,
availability: WorkerAvailability, availability: WorkerAvailability<T>,
shutdown_timeout: Millis, shutdown_timeout: Millis,
) -> WorkerClient { ) -> WorkerClient<T> {
let (tx1, rx1) = unbounded(); let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded(); let (tx2, rx2) = unbounded();
let avail = availability.clone(); let avail = availability.clone();
@ -191,12 +228,12 @@ impl Worker {
} }
async fn create( async fn create(
rx: Receiver<WorkerCommand>, rx: Receiver<WorkerCommand<T>>,
rx2: Receiver<StopCommand>, rx2: Receiver<StopCommand>,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory<T>>>,
availability: WorkerAvailability, availability: WorkerAvailability<T>,
shutdown_timeout: Millis, shutdown_timeout: Millis,
) -> Result<Worker, ()> { ) -> Result<Worker<T>, ()> {
availability.set(false); availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
rx: Box::pin(rx), rx: Box::pin(rx),
@ -329,18 +366,20 @@ impl Worker {
} }
} }
enum WorkerState { enum WorkerState<T> {
Available, Available,
Unavailable, Unavailable,
Restarting( Restarting(
usize, usize,
Token, Token,
BoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>, BoxFuture<'static, Result<Vec<(Token, BoxedServerService<T>)>, ()>>,
), ),
Shutdown(Sleep, Sleep, Option<oneshot::Sender<bool>>), Shutdown(Sleep, Sleep, Option<oneshot::Sender<bool>>),
} }
impl Future for Worker { impl<T> Future for Worker<T>
where T: 'static + Send
{
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -482,19 +521,19 @@ impl Future for Worker {
let next = ready!(Pin::new(&mut self.rx).poll_next(cx)); let next = ready!(Pin::new(&mut self.rx).poll_next(cx));
if let Some(WorkerCommand(msg)) = next { if let Some(WorkerCommand(msg)) = next {
// handle incoming io stream // handle incoming message
let guard = self.conns.get(); let guard = self.conns.get();
let srv = &self.services[msg.token.0]; let srv = &self.services[msg.token.0];
if log::log_enabled!(log::Level::Trace) { if log::log_enabled!(log::Level::Trace) {
trace!( trace!(
"Got socket for service: {:?}", "Got message for service: {:?}",
self.factories[srv.factory].name(msg.token) self.factories[srv.factory].name(msg.token)
); );
} }
let fut = srv let fut = srv
.service .service
.call_static((Some(guard), ServerMessage::Connect(msg.io))); .call_static((Some(guard), ServerMessage::New(msg.content)));
spawn(async move { spawn(async move {
let _ = fut.await; let _ = fut.await;
}); });
@ -513,6 +552,7 @@ mod tests {
use super::*; use super::*;
use crate::io::Io; use crate::io::Io;
use crate::server::accept::AcceptNotify;
use crate::server::service::Factory; use crate::server::service::Factory;
use crate::service::{Service, ServiceCtx, ServiceFactory}; use crate::service::{Service, ServiceCtx, ServiceFactory};
use crate::util::lazy; use crate::util::lazy;
@ -586,7 +626,7 @@ mod tests {
let poll = Arc::new(polling::Poller::new().unwrap()); let poll = Arc::new(polling::Poller::new().unwrap());
let waker = poll.clone(); let waker = poll.clone();
let avail = let avail =
WorkerAvailability::new(AcceptNotify::new(waker.clone(), sync_tx.clone())); WorkerAvailability::new(Box::new(AcceptNotify::new(waker.clone(), sync_tx.clone())));
let st = Arc::new(Mutex::new(St::Pending)); let st = Arc::new(Mutex::new(St::Pending));
let counter = Arc::new(Mutex::new(0)); let counter = Arc::new(Mutex::new(0));
@ -663,7 +703,7 @@ mod tests {
// force shutdown // force shutdown
let (_tx1, rx1) = unbounded(); let (_tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded(); let (tx2, rx2) = unbounded();
let avail = WorkerAvailability::new(AcceptNotify::new(waker, sync_tx.clone())); let avail = WorkerAvailability::new(Box::new(AcceptNotify::new(waker, sync_tx.clone())));
let f = SrvFactory { let f = SrvFactory {
st: st.clone(), st: st.clone(),
counter: counter.clone(), counter: counter.clone(),