This commit is contained in:
Nikolay Kim 2024-03-22 16:11:13 +01:00
parent 9d2c3bf3c4
commit 8cc9a362d7

View file

@ -7,8 +7,9 @@ use crate::service::{boxed, Service, ServiceCtx, ServiceFactory};
use crate::util::{HashMap, Pool, PoolRef};
use super::accept::{AcceptNotify, AcceptorCommand};
use super::counter::Counter;
use super::factory::{FactoryServiceType, NetService, OnWorkerStart};
use super::{socket::Connection, Token};
use super::{socket::Connection, Token, MAX_CONNS_COUNTER};
pub type ServerMessage = WorkerMessage<Connection>;
@ -121,13 +122,20 @@ impl ServiceFactory<ServerMessage> for StreamService {
}
}
Ok(StreamServiceImpl { tokens, services })
let conns = MAX_CONNS_COUNTER.with(|conns| conns.priv_clone());
Ok(StreamServiceImpl {
tokens,
services,
conns,
})
}
}
pub struct StreamServiceImpl {
tokens: HashMap<Token, (usize, &'static str, Pool, PoolRef)>,
services: Vec<BoxService>,
conns: Counter,
}
impl Service<ServerMessage> for StreamServiceImpl {
@ -135,7 +143,7 @@ impl Service<ServerMessage> for StreamServiceImpl {
type Error = ();
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut ready = true;
let mut ready = self.conns.available(cx);
for (idx, tag, pool, _) in self.tokens.values() {
match self.services[*idx].poll_ready(cx) {
Poll::Pending => ready = false,
@ -163,6 +171,10 @@ impl Service<ServerMessage> for StreamServiceImpl {
}
}
if ready {
log::info!(
"Worker service shutdown, {} connections",
super::num_connections()
);
Poll::Ready(())
} else {
Poll::Pending
@ -179,7 +191,9 @@ impl Service<ServerMessage> for StreamServiceImpl {
stream.set_tag(tag);
stream.set_memory_pool(*pool);
let guard = self.conns.get();
let _ = ctx.call(&self.services[*idx], stream).await;
drop(guard);
Ok(())
} else {
log::error!("Cannot get handler service for connection: {:?}", con);