diff --git a/ntex/src/server/service.rs b/ntex/src/server/service.rs index 53b783e5..c87e1c0b 100644 --- a/ntex/src/server/service.rs +++ b/ntex/src/server/service.rs @@ -7,8 +7,9 @@ use crate::service::{boxed, Service, ServiceCtx, ServiceFactory}; use crate::util::{HashMap, Pool, PoolRef}; use super::accept::{AcceptNotify, AcceptorCommand}; +use super::counter::Counter; use super::factory::{FactoryServiceType, NetService, OnWorkerStart}; -use super::{socket::Connection, Token}; +use super::{socket::Connection, Token, MAX_CONNS_COUNTER}; pub type ServerMessage = WorkerMessage; @@ -121,13 +122,20 @@ impl ServiceFactory for StreamService { } } - Ok(StreamServiceImpl { tokens, services }) + let conns = MAX_CONNS_COUNTER.with(|conns| conns.priv_clone()); + + Ok(StreamServiceImpl { + tokens, + services, + conns, + }) } } pub struct StreamServiceImpl { tokens: HashMap, services: Vec, + conns: Counter, } impl Service for StreamServiceImpl { @@ -135,7 +143,7 @@ impl Service for StreamServiceImpl { type Error = (); fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let mut ready = true; + let mut ready = self.conns.available(cx); for (idx, tag, pool, _) in self.tokens.values() { match self.services[*idx].poll_ready(cx) { Poll::Pending => ready = false, @@ -163,6 +171,10 @@ impl Service for StreamServiceImpl { } } if ready { + log::info!( + "Worker service shutdown, {} connections", + super::num_connections() + ); Poll::Ready(()) } else { Poll::Pending @@ -179,7 +191,9 @@ impl Service for StreamServiceImpl { stream.set_tag(tag); stream.set_memory_pool(*pool); + let guard = self.conns.get(); let _ = ctx.call(&self.services[*idx], stream).await; + drop(guard); Ok(()) } else { log::error!("Cannot get handler service for connection: {:?}", con);