Cpu affinity support for workers

This commit is contained in:
Nikolay Kim 2025-01-31 19:39:09 +05:00
parent 282e3224cd
commit fc43822da2
5 changed files with 41 additions and 3 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [2.7.0] - 2025-01-31
* Cpu affinity support for workers
## [2.6.2] - 2024-12-30
* Fix error log

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-server"
version = "2.6.2"
version = "2.7.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -24,6 +24,7 @@ ntex-util = "2.8"
async-channel = "2"
async-broadcast = "0.7"
core_affinity = "0.8"
polling = "3.3"
log = "0.4"
socket2 = "0.5"

View file

@ -2,6 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::{cell::Cell, cell::RefCell, collections::VecDeque, rc::Rc, sync::Arc};
use async_channel::{unbounded, Receiver, Sender};
use core_affinity::CoreId;
use ntex_rt::System;
use ntex_util::future::join_all;
use ntex_util::time::{sleep, timeout, Millis};
@ -69,9 +70,16 @@ impl<F: ServerConfiguration> ServerManager<F> {
// handle cmd
let _ = ntex_rt::spawn(handle_cmd(mgr.clone(), rx));
// Retrieve the IDs of all active CPU cores.
let mut cores = if cfg.affinity {
core_affinity::get_core_ids().unwrap_or_default()
} else {
Vec::new()
};
// start workers
for _ in 0..mgr.0.cfg.num {
start_worker(mgr.clone());
start_worker(mgr.clone(), cores.pop());
}
let srv = Server::new(tx, shared);
@ -128,9 +136,16 @@ impl<F: ServerConfiguration> ServerManager<F> {
}
}
fn start_worker<F: ServerConfiguration>(mgr: ServerManager<F>) {
fn start_worker<F: ServerConfiguration>(mgr: ServerManager<F>, cid: Option<CoreId>) {
let _ = ntex_rt::spawn(async move {
let id = mgr.next_id();
if let Some(cid) = cid {
if core_affinity::set_for_current(cid) {
log::info!("Set affinity to {:?} for worker {:?}", cid, id);
}
}
let mut wrk = Worker::start(id, mgr.factory());
loop {

View file

@ -110,6 +110,14 @@ impl ServerBuilder {
self
}
/// Enable core affinity
///
/// By default affinity is disabled.
pub fn enable_affinity(mut self) -> Self {
self.pool = self.pool.enable_affinity();
self
}
/// Timeout for graceful workers shutdown.
///
/// After receiving a stop signal, workers have this much time to finish

View file

@ -11,6 +11,7 @@ pub struct WorkerPool {
pub(crate) no_signals: bool,
pub(crate) stop_runtime: bool,
pub(crate) shutdown_timeout: Millis,
pub(crate) affinity: bool,
}
impl Default for WorkerPool {
@ -28,6 +29,7 @@ impl WorkerPool {
no_signals: false,
stop_runtime: false,
shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
affinity: false,
}
}
@ -68,6 +70,14 @@ impl WorkerPool {
self
}
/// Enable core affinity
///
/// By default affinity is disabled.
pub fn enable_affinity(mut self) -> Self {
self.affinity = true;
self
}
/// Starts processing incoming items and return server controller.
pub fn run<F: ServerConfiguration>(self, factory: F) -> Server<F::Item> {
crate::manager::ServerManager::start(self, factory)