This commit is contained in:
Nikolay Kim 2025-03-27 20:24:04 +01:00
parent 2c226f4cf4
commit 8f81f129ac
6 changed files with 11 additions and 17 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.5.9] - 2025-03-27
* Handle closed sockets
## [2.5.8] - 2025-03-25 ## [2.5.8] - 2025-03-25
* Update neon runtime * Update neon runtime

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-net" name = "ntex-net"
version = "2.5.8" version = "2.5.9"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntexwork utils for ntex framework" description = "ntexwork utils for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -70,7 +70,6 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> { pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> {
let fd = io.as_raw_fd(); let fd = io.as_raw_fd();
let tag = context.tag();
let stream = self.0.with(move |streams| { let stream = self.0.with(move |streams| {
let item = StreamItem { let item = StreamItem {
fd, fd,

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.7.2] - 2025-03-27
* Handle paused state
## [2.7.1] - 2025-02-28 ## [2.7.1] - 2025-02-28
* Fix set core affinity out of worker start #508 * Fix set core affinity out of worker start #508

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-server" name = "ntex-server"
version = "2.7.1" version = "2.7.2"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Server for ntex framework" description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -24,7 +24,6 @@ ntex-util = "2.8"
async-channel = "2" async-channel = "2"
async-broadcast = "0.7" async-broadcast = "0.7"
atomic-waker = "1.1"
core_affinity = "0.8" core_affinity = "0.8"
polling = "3.3" polling = "3.3"
log = "0.4" log = "0.4"

View file

@ -4,7 +4,6 @@ use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc};
use async_broadcast::{self as bus, broadcast}; use async_broadcast::{self as bus, broadcast};
use async_channel::{unbounded, Receiver, Sender}; use async_channel::{unbounded, Receiver, Sender};
use atomic_waker::AtomicWaker;
use core_affinity::CoreId; use core_affinity::CoreId;
use ntex_rt::{spawn, Arbiter}; use ntex_rt::{spawn, Arbiter};
@ -40,7 +39,6 @@ pub struct Worker<T> {
id: WorkerId, id: WorkerId,
tx1: Sender<T>, tx1: Sender<T>,
tx2: Sender<Shutdown>, tx2: Sender<Shutdown>,
waker: Arc<AtomicWaker>,
avail: WorkerAvailability, avail: WorkerAvailability,
failed: Arc<AtomicBool>, failed: Arc<AtomicBool>,
} }
@ -87,8 +85,6 @@ impl<T> Worker<T> {
{ {
let (tx1, rx1) = unbounded(); let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded(); let (tx2, rx2) = unbounded();
let waker = Arc::new(AtomicWaker::new());
let waker2 = waker.clone();
let (avail, avail_tx) = WorkerAvailability::create(); let (avail, avail_tx) = WorkerAvailability::create();
Arbiter::default().exec_fn(move || { Arbiter::default().exec_fn(move || {
@ -104,7 +100,7 @@ impl<T> Worker<T> {
log::debug!("Creating server instance in {:?}", id); log::debug!("Creating server instance in {:?}", id);
let factory = cfg.create().await; let factory = cfg.create().await;
match create(id, rx1, waker2, rx2, factory, avail_tx).await { match create(id, rx1, rx2, factory, avail_tx).await {
Ok((svc, wrk)) => { Ok((svc, wrk)) => {
log::debug!("Server instance has been created in {:?}", id); log::debug!("Server instance has been created in {:?}", id);
run_worker(svc, wrk).await; run_worker(svc, wrk).await;
@ -121,7 +117,6 @@ impl<T> Worker<T> {
id, id,
tx1, tx1,
tx2, tx2,
waker,
avail, avail,
failed: Arc::new(AtomicBool::new(false)), failed: Arc::new(AtomicBool::new(false)),
} }
@ -137,7 +132,6 @@ impl<T> Worker<T> {
/// Returns `Ok` if message got accepted by the worker. /// Returns `Ok` if message got accepted by the worker.
/// Otherwise return message back as `Err` /// Otherwise return message back as `Err`
pub fn send(&self, msg: T) -> Result<(), T> { pub fn send(&self, msg: T) -> Result<(), T> {
self.waker.wake();
self.tx1.try_send(msg).map_err(|msg| msg.into_inner()) self.tx1.try_send(msg).map_err(|msg| msg.into_inner())
} }
@ -183,7 +177,6 @@ impl<T> Clone for Worker<T> {
id: self.id, id: self.id,
tx1: self.tx1.clone(), tx1: self.tx1.clone(),
tx2: self.tx2.clone(), tx2: self.tx2.clone(),
waker: self.waker.clone(),
avail: self.avail.clone(), avail: self.avail.clone(),
failed: self.failed.clone(), failed: self.failed.clone(),
} }
@ -251,7 +244,6 @@ struct WorkerSt<T, F: ServiceFactory<T>> {
rx: Receiver<T>, rx: Receiver<T>,
stop: Pin<Box<dyn Stream<Item = Shutdown>>>, stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
factory: F, factory: F,
waker: Arc<AtomicWaker>,
availability: WorkerAvailabilityTx, availability: WorkerAvailabilityTx,
} }
@ -263,8 +255,6 @@ where
loop { loop {
let mut recv = std::pin::pin!(wrk.rx.recv()); let mut recv = std::pin::pin!(wrk.rx.recv());
let fut = poll_fn(|cx| { let fut = poll_fn(|cx| {
wrk.waker.register(cx.waker());
match svc.poll_ready(cx) { match svc.poll_ready(cx) {
Poll::Ready(res) => { Poll::Ready(res) => {
res?; res?;
@ -352,7 +342,6 @@ async fn stop_svc<T, F>(
async fn create<T, F>( async fn create<T, F>(
id: WorkerId, id: WorkerId,
rx: Receiver<T>, rx: Receiver<T>,
waker: Arc<AtomicWaker>,
stop: Receiver<Shutdown>, stop: Receiver<Shutdown>,
factory: Result<F, ()>, factory: Result<F, ()>,
availability: WorkerAvailabilityTx, availability: WorkerAvailabilityTx,
@ -382,7 +371,6 @@ where
WorkerSt { WorkerSt {
id, id,
rx, rx,
waker,
factory, factory,
availability, availability,
stop: Box::pin(stop), stop: Box::pin(stop),