From 37e56f242b8ffca7e3449d27e981c1d41d38faef Mon Sep 17 00:00:00 2001 From: Diogo Barbosa Date: Wed, 6 Mar 2024 13:05:08 +0100 Subject: [PATCH] Abstract server/worker implementation to allow for creating non-TCP-based web servers --- ntex/src/server/accept.rs | 70 ++++++++--------- ntex/src/server/builder.rs | 24 +++--- ntex/src/server/config.rs | 15 ++-- ntex/src/server/counter.rs | 2 +- ntex/src/server/mod.rs | 20 ++--- ntex/src/server/service.rs | 38 +++++----- ntex/src/server/worker.rs | 150 +++++++++++++++++++++++-------------- 7 files changed, 178 insertions(+), 141 deletions(-) diff --git a/ntex/src/server/accept.rs b/ntex/src/server/accept.rs index 48c7103c..d5e7e3cb 100644 --- a/ntex/src/server/accept.rs +++ b/ntex/src/server/accept.rs @@ -5,24 +5,14 @@ use polling::{Event, Events, Poller}; use crate::{rt::System, time::sleep, time::Millis, util::Either}; -use super::socket::{Listener, SocketAddr}; -use super::worker::{Connection, WorkerClient}; +use super::socket::{Listener, SocketAddr, Stream}; +use super::worker::{WorkerMessage, WorkerClient, WorkerManagerCmd, WorkerManagerNotifier}; use super::{Server, ServerStatus, Token}; const EXIT_TIMEOUT: Duration = Duration::from_millis(100); const ERR_TIMEOUT: Duration = Duration::from_millis(500); const ERR_SLEEP_TIMEOUT: Millis = Millis(525); -#[derive(Debug)] -pub(super) enum Command { - Stop(mpsc::Sender<()>), - Pause, - Resume, - Worker(WorkerClient), - Timer, - WorkerAvailable, -} - #[derive(Debug)] struct ServerSocketInfo { addr: SocketAddr, @@ -33,22 +23,28 @@ struct ServerSocketInfo { } #[derive(Debug, Clone)] -pub(super) struct AcceptNotify(Arc, mpsc::Sender); +pub(super) struct AcceptNotify(Arc, mpsc::Sender>); impl AcceptNotify { - pub(super) fn new(waker: Arc, tx: mpsc::Sender) -> Self { + pub(super) fn new(waker: Arc, tx: mpsc::Sender>) -> Self { AcceptNotify(waker, tx) } +} - pub(super) fn send(&self, cmd: Command) { +impl WorkerManagerNotifier for AcceptNotify { + fn send(&self, cmd: WorkerManagerCmd) { let _ = self.1.send(cmd); let _ = self.0.notify(); } + + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } } pub(super) struct AcceptLoop { notify: AcceptNotify, - inner: Option<(mpsc::Receiver, Arc, Server)>, + inner: Option<(mpsc::Receiver>, Arc, Server)>, status_handler: Option>, } @@ -57,7 +53,7 @@ impl AcceptLoop { // Create a poller instance let poll = Arc::new( Poller::new() - .map_err(|e| panic!("Cannot create Polller {}", e)) + .map_err(|e| panic!("Cannot create Poller {}", e)) .unwrap(), ); @@ -71,7 +67,7 @@ impl AcceptLoop { } } - pub(super) fn send(&self, msg: Command) { + pub(super) fn send(&self, msg: WorkerManagerCmd) { self.notify.send(msg) } @@ -89,7 +85,7 @@ impl AcceptLoop { pub(super) fn start( &mut self, socks: Vec<(Token, Listener)>, - workers: Vec, + workers: Vec>, ) { let (rx, poll, srv) = self .inner @@ -121,9 +117,9 @@ impl fmt::Debug for AcceptLoop { struct Accept { poller: Arc, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, sockets: Vec, - workers: Vec, + workers: Vec>, srv: Server, notify: AcceptNotify, next: usize, @@ -133,11 +129,11 @@ struct Accept { impl Accept { fn start( - rx: mpsc::Receiver, + rx: mpsc::Receiver>, poller: Arc, socks: Vec<(Token, Listener)>, srv: Server, - workers: Vec, + workers: Vec>, notify: AcceptNotify, status_handler: Option>, ) { @@ -153,10 +149,10 @@ impl Accept { } fn new( - rx: mpsc::Receiver, + rx: mpsc::Receiver>, poller: Arc, socks: Vec<(Token, Listener)>, - workers: Vec, + workers: Vec>, srv: Server, notify: AcceptNotify, status_handler: Option>, @@ -261,7 +257,7 @@ impl Accept { let notify = self.notify.clone(); System::current().arbiter().spawn(Box::pin(async move { sleep(ERR_SLEEP_TIMEOUT).await; - notify.send(Command::Timer); + notify.send(WorkerManagerCmd::Timer); })); } else { info.registered.set(true); @@ -304,7 +300,7 @@ impl Accept { loop { match self.rx.try_recv() { Ok(cmd) => match cmd { - Command::Stop(rx) => { + WorkerManagerCmd::Stop(rx) => { log::trace!("Stopping accept loop"); for (key, info) in self.sockets.iter().enumerate() { log::info!("Stopping socket listener on {}", info.addr); @@ -313,7 +309,7 @@ impl Accept { self.update_status(ServerStatus::NotReady); break Either::Right(Some(rx)); } - Command::Pause => { + WorkerManagerCmd::Pause => { log::trace!("Pausing accept loop"); for (key, info) in self.sockets.iter().enumerate() { log::info!("Stopping socket listener on {}", info.addr); @@ -321,7 +317,7 @@ impl Accept { } self.update_status(ServerStatus::NotReady); } - Command::Resume => { + WorkerManagerCmd::Resume => { log::trace!("Resuming accept loop"); for (key, info) in self.sockets.iter().enumerate() { log::info!("Resuming socket listener on {}", info.addr); @@ -329,15 +325,15 @@ impl Accept { } self.update_status(ServerStatus::Ready); } - Command::Worker(worker) => { + WorkerManagerCmd::Worker(worker) => { log::trace!("Adding new worker to accept loop"); self.backpressure(false); self.workers.push(worker); } - Command::Timer => { + WorkerManagerCmd::Timer => { self.process_timer(); } - Command::WorkerAvailable => { + WorkerManagerCmd::WorkerAvailable => { log::trace!("Worker is available"); self.backpressure(false); } @@ -393,10 +389,10 @@ impl Accept { } } - fn accept_one(&mut self, mut msg: Connection) { + fn accept_one(&mut self, mut msg: WorkerMessage) { log::trace!( "Accepting connection: {:?} bp: {}", - msg.io, + msg.content, self.backpressure ); @@ -463,8 +459,8 @@ impl Accept { loop { let msg = if let Some(info) = self.sockets.get_mut(token) { match info.sock.accept() { - Ok(Some(io)) => Connection { - io, + Ok(Some(io)) => WorkerMessage { + content: io, token: info.token, }, Ok(None) => return true, @@ -479,7 +475,7 @@ impl Accept { let notify = self.notify.clone(); System::current().arbiter().spawn(Box::pin(async move { sleep(ERR_SLEEP_TIMEOUT).await; - notify.send(Command::Timer); + notify.send(WorkerManagerCmd::Timer); })); return false; } diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index f93e310a..d1d51c37 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -8,13 +8,13 @@ use crate::rt::{spawn, Signal, System}; use crate::time::{sleep, Millis}; use crate::{io::Io, service::ServiceFactory, util::join_all, util::Stream}; -use super::accept::{AcceptLoop, AcceptNotify, Command}; +use super::accept::{AcceptLoop, AcceptNotify}; use super::config::{ Config, ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime, }; use super::service::{Factory, InternalServiceFactory}; -use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; -use super::{socket::Listener, Server, ServerCommand, ServerStatus, Token}; +use super::worker::{self, WorkerManagerCmd, Worker, WorkerAvailability, WorkerClient}; +use super::{socket::Listener, Server, ServerCommand, ServerStatus, Token, socket}; const STOP_DELAY: Millis = Millis(300); @@ -25,8 +25,8 @@ pub struct ServerBuilder { threads: usize, token: Token, backlog: i32, - workers: Vec<(usize, WorkerClient)>, - services: Vec>, + workers: Vec<(usize, WorkerClient)>, + services: Vec>>, sockets: Vec<(Token, String, Listener)>, accept: AcceptLoop, exit: bool, @@ -381,9 +381,9 @@ impl ServerBuilder { } } - fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { - let avail = WorkerAvailability::new(notify); - let services: Vec> = + fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { + let avail = WorkerAvailability::new(Box::new(notify)); + let services: Vec>> = self.services.iter().map(|v| v.clone_factory()).collect(); Worker::start(idx, services, avail, self.shutdown_timeout) @@ -392,11 +392,11 @@ impl ServerBuilder { fn handle_cmd(&mut self, item: ServerCommand) { match item { ServerCommand::Pause(tx) => { - self.accept.send(Command::Pause); + self.accept.send(WorkerManagerCmd::Pause); let _ = tx.send(()); } ServerCommand::Resume(tx) => { - self.accept.send(Command::Resume); + self.accept.send(WorkerManagerCmd::Resume); let _ = tx.send(()); } ServerCommand::Signal(sig) => { @@ -441,7 +441,7 @@ impl ServerBuilder { // stop accept thread let (tx, rx) = std::sync::mpsc::channel(); - self.accept.send(Command::Stop(tx)); + self.accept.send(WorkerManagerCmd::Stop(tx)); let _ = rx.recv(); let notify = std::mem::take(&mut self.notify); @@ -513,7 +513,7 @@ impl ServerBuilder { let worker = self.start_worker(new_idx, self.accept.notify()); self.workers.push((new_idx, worker.clone())); - self.accept.send(Command::Worker(worker)); + self.accept.send(WorkerManagerCmd::Worker(worker)); } } } diff --git a/ntex/src/server/config.rs b/ntex/src/server/config.rs index c68f4fb6..f6b22f41 100644 --- a/ntex/src/server/config.rs +++ b/ntex/src/server/config.rs @@ -3,6 +3,7 @@ use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, use log::error; use crate::io::Io; +use crate::server::socket::Stream; use crate::service::{self, boxed, ServiceFactory as NServiceFactory}; use crate::util::{BoxFuture, HashMap, PoolId, Ready}; @@ -162,7 +163,7 @@ impl ConfiguredService { } } -impl InternalServiceFactory for ConfiguredService { +impl InternalServiceFactory for ConfiguredService { fn name(&self, token: Token) -> &str { &self.names[&token].0 } @@ -175,7 +176,7 @@ impl InternalServiceFactory for ConfiguredService { } } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box> { Box::new(Self { rt: self.rt.clone(), names: self.names.clone(), @@ -184,7 +185,7 @@ impl InternalServiceFactory for ConfiguredService { }) } - fn create(&self) -> BoxFuture<'static, Result, ()>> { + fn create(&self) -> BoxFuture<'static, Result)>, ()>> { // configure services let rt = ServiceRuntime::new(self.topics.clone()); let cfg_fut = self.rt.configure(ServiceRuntime(rt.0.clone())); @@ -374,7 +375,7 @@ impl ServiceRuntime { type BoxServiceFactory = service::boxed::BoxServiceFactory< (), - (Option, ServerMessage), + (Option, ServerMessage), (), (), (), @@ -386,7 +387,7 @@ struct ServiceFactory { pool: PoolId, } -impl service::ServiceFactory<(Option, ServerMessage)> for ServiceFactory +impl service::ServiceFactory<(Option, ServerMessage)> for ServiceFactory where T: service::ServiceFactory, T::Service: 'static, @@ -396,9 +397,9 @@ where type Response = (); type Error = (); type InitError = (); - type Service = BoxedServerService; + type Service = BoxedServerService; - async fn create(&self, _: ()) -> Result { + async fn create(&self, _: ()) -> Result, ()> { let tag = self.tag; let pool = self.pool; diff --git a/ntex/src/server/counter.rs b/ntex/src/server/counter.rs index 3b3e606c..372f740e 100644 --- a/ntex/src/server/counter.rs +++ b/ntex/src/server/counter.rs @@ -44,7 +44,7 @@ impl Counter { } } -pub(super) struct CounterGuard(Rc); +pub struct CounterGuard(Rc); impl CounterGuard { fn new(inner: Rc) -> Self { diff --git a/ntex/src/server/mod.rs b/ntex/src/server/mod.rs index 62fb3016..39ea2ac3 100644 --- a/ntex/src/server/mod.rs +++ b/ntex/src/server/mod.rs @@ -6,11 +6,11 @@ use async_channel::Sender; mod accept; mod builder; mod config; -mod counter; -mod service; +pub mod counter; +pub mod service; mod socket; mod test; -mod worker; +pub mod worker; #[cfg(feature = "openssl")] pub use ntex_tls::openssl; @@ -36,10 +36,10 @@ pub enum ServerStatus { /// Socket id token #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -struct Token(usize); +pub struct Token(pub usize); impl Token { - pub(self) fn next(&mut self) -> Token { + pub fn next(&mut self) -> Token { let token = Token(self.0); self.0 += 1; token @@ -51,7 +51,7 @@ pub fn build() -> ServerBuilder { ServerBuilder::default() } -/// Ssl error combinded with service error. +/// Ssl error combined with service error. #[derive(Debug)] pub enum SslError { Ssl(Box), @@ -59,7 +59,7 @@ pub enum SslError { } #[derive(Debug)] -enum ServerCommand { +pub enum ServerCommand { WorkerFaulted(usize), Pause(oneshot::Sender<()>), Resume(oneshot::Sender<()>), @@ -78,7 +78,7 @@ enum ServerCommand { pub struct Server(Sender, Option>); impl Server { - fn new(tx: Sender) -> Self { + pub fn new(tx: Sender) -> Self { Server(tx, None) } @@ -87,11 +87,11 @@ impl Server { ServerBuilder::default() } - fn signal(&self, sig: crate::rt::Signal) { + pub fn signal(&self, sig: crate::rt::Signal) { let _ = self.0.try_send(ServerCommand::Signal(sig)); } - fn worker_faulted(&self, idx: usize) { + pub fn worker_faulted(&self, idx: usize) { let _ = self.0.try_send(ServerCommand::WorkerFaulted(idx)); } diff --git a/ntex/src/server/service.rs b/ntex/src/server/service.rs index 882b4993..1645294a 100644 --- a/ntex/src/server/service.rs +++ b/ntex/src/server/service.rs @@ -9,10 +9,10 @@ use crate::{io::Io, time::Millis}; use super::{counter::CounterGuard, socket::Stream, Config, Token}; /// Server message -pub(super) enum ServerMessage { - /// New stream - Connect(Stream), - /// Gracefull shutdown in millis +pub enum ServerMessage { + /// New content received + New(T), + /// Graceful shutdown in millis Shutdown(Millis), /// Force shutdown ForceShutdown, @@ -24,18 +24,18 @@ pub(super) trait StreamServiceFactory: Send + Clone + 'static { fn create(&self, _: Config) -> Self::Factory; } -pub(super) trait InternalServiceFactory: Send { +pub trait InternalServiceFactory: Send { fn name(&self, token: Token) -> &str; fn set_tag(&mut self, token: Token, tag: &'static str); - fn clone_factory(&self) -> Box; + fn clone_factory(&self) -> Box>; - fn create(&self) -> BoxFuture<'static, Result, ()>>; + fn create(&self) -> BoxFuture<'static, Result)>, ()>>; } -pub(super) type BoxedServerService = - boxed::BoxService<(Option, ServerMessage), (), ()>; +pub type BoxedServerService = + boxed::BoxService<(Option, ServerMessage), (), ()>; #[derive(Clone)] pub(super) struct StreamService { @@ -56,7 +56,7 @@ impl StreamService { } } -impl Service<(Option, ServerMessage)> for StreamService +impl Service<(Option, ServerMessage)> for StreamService where T: Service, { @@ -78,11 +78,11 @@ where async fn call( &self, - (guard, req): (Option, ServerMessage), + (guard, req): (Option, ServerMessage), ctx: ServiceCtx<'_, Self>, ) -> Result<(), ()> { match req { - ServerMessage::Connect(stream) => { + ServerMessage::New(stream) => { let stream = stream.try_into().map_err(|e| { error!("Cannot convert to an async io stream: {}", e); }); @@ -121,7 +121,7 @@ where inner: F, addr: SocketAddr, tag: &'static str, - ) -> Box { + ) -> Box> { Box::new(Self { name, token, @@ -132,7 +132,7 @@ where } } -impl InternalServiceFactory for Factory +impl InternalServiceFactory for Factory where F: StreamServiceFactory, { @@ -144,7 +144,7 @@ where self.tag = tag; } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box> { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), @@ -154,7 +154,7 @@ where }) } - fn create(&self) -> BoxFuture<'static, Result, ()>> { + fn create(&self) -> BoxFuture<'static, Result)>, ()>> { let token = self.token; let tag = self.tag; let cfg = Config::default(); @@ -173,7 +173,7 @@ where } } -impl InternalServiceFactory for Box { +impl InternalServiceFactory for Box> { fn name(&self, token: Token) -> &str { self.as_ref().name(token) } @@ -182,11 +182,11 @@ impl InternalServiceFactory for Box { self.as_mut().set_tag(token, tag); } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box> { self.as_ref().clone_factory() } - fn create(&self) -> BoxFuture<'static, Result, ()>> { + fn create(&self) -> BoxFuture<'static, Result)>, ()>> { self.as_ref().create() } } diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index 64476f08..588d7172 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -1,24 +1,25 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; +use std::{fmt, future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; +use std::sync::mpsc; use async_channel::{unbounded, Receiver, Sender}; use crate::rt::{spawn, Arbiter}; use crate::service::Pipeline; -use crate::time::{sleep, Millis, Sleep}; +use ntex_util::time::Millis; +use crate::time::{sleep, Sleep}; use crate::util::{ join_all, ready, select, stream_recv, BoxFuture, Either, Stream as FutStream, }; -use super::accept::{AcceptNotify, Command}; use super::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; -use super::{counter::Counter, socket::Stream, Token}; +use super::{counter::Counter, Token}; type ServerStopCommand = Pin>>; -type ServerWorkerCommand = Pin>>; +type ServerWorkerCommand = Pin>>>; #[derive(Debug)] -pub(super) struct WorkerCommand(Connection); +pub(super) struct WorkerCommand(WorkerMessage); #[derive(Debug)] /// Stop worker message. Returns `true` on successful shutdown @@ -29,9 +30,9 @@ pub(super) struct StopCommand { } #[derive(Debug)] -pub(super) struct Connection { - pub(super) io: Stream, - pub(super) token: Token, +pub struct WorkerMessage { + pub content: T, + pub token: Token, } const STOP_TIMEOUT: Millis = Millis::ONE_SEC; @@ -56,20 +57,31 @@ thread_local! { Counter::new(MAX_CONNS.load(Ordering::Relaxed)); } -#[derive(Clone, Debug)] -pub(super) struct WorkerClient { - pub(super) idx: usize, - tx1: Sender, +#[derive(Debug)] +pub struct WorkerClient { + pub idx: usize, + tx1: Sender>, tx2: Sender, - avail: WorkerAvailability, + avail: WorkerAvailability, } -impl WorkerClient { +impl Clone for WorkerClient { + fn clone(&self) -> Self { + WorkerClient { + idx: self.idx, + tx1: self.tx1.clone(), + tx2: self.tx2.clone(), + avail: self.avail.clone(), + } + } +} + +impl WorkerClient { pub(super) fn new( idx: usize, - tx1: Sender, + tx1: Sender>, tx2: Sender, - avail: WorkerAvailability, + avail: WorkerAvailability, ) -> Self { WorkerClient { idx, @@ -79,45 +91,70 @@ impl WorkerClient { } } - pub(super) fn send(&self, msg: Connection) -> Result<(), Connection> { + pub fn send(&self, msg: WorkerMessage) -> Result<(), WorkerMessage> { self.tx1 .try_send(WorkerCommand(msg)) .map_err(|msg| msg.into_inner().0) } - pub(super) fn available(&self) -> bool { + pub fn available(&self) -> bool { self.avail.available() } - pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver { + pub fn stop(&self, graceful: bool) -> oneshot::Receiver { let (result, rx) = oneshot::channel(); let _ = self.tx2.try_send(StopCommand { graceful, result }); rx } } -#[derive(Debug, Clone)] -pub(super) struct WorkerAvailability { - notify: AcceptNotify, +#[derive(Debug)] +pub struct WorkerAvailability { + notify: Box>, available: Arc, } -impl WorkerAvailability { - pub(super) fn new(notify: AcceptNotify) -> Self { +impl Clone for WorkerAvailability { + fn clone(&self) -> Self { + WorkerAvailability { + notify: self.notify.clone_box(), + available: self.available.clone(), + } + } +} + +#[derive(Debug)] +pub enum WorkerManagerCmd { + Stop(mpsc::Sender<()>), + Pause, + Resume, + Worker(WorkerClient), + Timer, + WorkerAvailable, +} + +pub trait WorkerManagerNotifier: Send + Sync + fmt::Debug { + fn send(&self, cmd: WorkerManagerCmd); + + fn clone_box(&self) -> Box>; +} + +impl WorkerAvailability { + pub fn new(notify: Box>) -> Self { WorkerAvailability { notify, available: Arc::new(AtomicBool::new(false)), } } - pub(super) fn available(&self) -> bool { + pub fn available(&self) -> bool { self.available.load(Ordering::Acquire) } - pub(super) fn set(&self, val: bool) { + pub fn set(&self, val: bool) { let old = self.available.swap(val, Ordering::Release); if !old && val { - self.notify.send(Command::WorkerAvailable) + self.notify.send(WorkerManagerCmd::WorkerAvailable) } } } @@ -126,25 +163,25 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream /// processing. -pub(super) struct Worker { - rx: ServerWorkerCommand, +pub struct Worker { + rx: ServerWorkerCommand, rx2: ServerStopCommand, - services: Vec, - availability: WorkerAvailability, + services: Vec>, + availability: WorkerAvailability, conns: Counter, - factories: Vec>, - state: WorkerState, + factories: Vec>>, + state: WorkerState, shutdown_timeout: Millis, } -struct WorkerService { +struct WorkerService { factory: usize, status: WorkerServiceStatus, - service: Pipeline, + service: Pipeline>, } -impl WorkerService { - fn created(&mut self, service: BoxedServerService) { +impl WorkerService { + fn created(&mut self, service: BoxedServerService) { self.service = Pipeline::new(service); self.status = WorkerServiceStatus::Unavailable; } @@ -160,13 +197,13 @@ enum WorkerServiceStatus { Stopped, } -impl Worker { - pub(super) fn start( +impl Worker { + pub fn start( idx: usize, - factories: Vec>, - availability: WorkerAvailability, + factories: Vec>>, + availability: WorkerAvailability, shutdown_timeout: Millis, - ) -> WorkerClient { + ) -> WorkerClient { let (tx1, rx1) = unbounded(); let (tx2, rx2) = unbounded(); let avail = availability.clone(); @@ -191,12 +228,12 @@ impl Worker { } async fn create( - rx: Receiver, + rx: Receiver>, rx2: Receiver, - factories: Vec>, - availability: WorkerAvailability, + factories: Vec>>, + availability: WorkerAvailability, shutdown_timeout: Millis, - ) -> Result { + ) -> Result, ()> { availability.set(false); let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { rx: Box::pin(rx), @@ -329,18 +366,20 @@ impl Worker { } } -enum WorkerState { +enum WorkerState { Available, Unavailable, Restarting( usize, Token, - BoxFuture<'static, Result, ()>>, + BoxFuture<'static, Result)>, ()>>, ), Shutdown(Sleep, Sleep, Option>), } -impl Future for Worker { +impl Future for Worker +where T: 'static + Send +{ type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -482,19 +521,19 @@ impl Future for Worker { let next = ready!(Pin::new(&mut self.rx).poll_next(cx)); if let Some(WorkerCommand(msg)) = next { - // handle incoming io stream + // handle incoming message let guard = self.conns.get(); let srv = &self.services[msg.token.0]; if log::log_enabled!(log::Level::Trace) { trace!( - "Got socket for service: {:?}", + "Got message for service: {:?}", self.factories[srv.factory].name(msg.token) ); } let fut = srv .service - .call_static((Some(guard), ServerMessage::Connect(msg.io))); + .call_static((Some(guard), ServerMessage::New(msg.content))); spawn(async move { let _ = fut.await; }); @@ -513,6 +552,7 @@ mod tests { use super::*; use crate::io::Io; + use crate::server::accept::AcceptNotify; use crate::server::service::Factory; use crate::service::{Service, ServiceCtx, ServiceFactory}; use crate::util::lazy; @@ -586,7 +626,7 @@ mod tests { let poll = Arc::new(polling::Poller::new().unwrap()); let waker = poll.clone(); let avail = - WorkerAvailability::new(AcceptNotify::new(waker.clone(), sync_tx.clone())); + WorkerAvailability::new(Box::new(AcceptNotify::new(waker.clone(), sync_tx.clone()))); let st = Arc::new(Mutex::new(St::Pending)); let counter = Arc::new(Mutex::new(0)); @@ -663,7 +703,7 @@ mod tests { // force shutdown let (_tx1, rx1) = unbounded(); let (tx2, rx2) = unbounded(); - let avail = WorkerAvailability::new(AcceptNotify::new(waker, sync_tx.clone())); + let avail = WorkerAvailability::new(Box::new(AcceptNotify::new(waker, sync_tx.clone()))); let f = SrvFactory { st: st.clone(), counter: counter.clone(),