From 8f81f129acbca10917c282d1eeca7a0688775284 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 27 Mar 2025 20:24:04 +0100 Subject: [PATCH] wip --- ntex-net/CHANGES.md | 4 ++++ ntex-net/Cargo.toml | 2 +- ntex-net/src/rt_polling/driver.rs | 1 - ntex-server/CHANGES.md | 4 ++++ ntex-server/Cargo.toml | 3 +-- ntex-server/src/wrk.rs | 14 +------------- 6 files changed, 11 insertions(+), 17 deletions(-) diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index 800ff027..a16145fc 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.9] - 2025-03-27 + +* Handle closed sockets + ## [2.5.8] - 2025-03-25 * Update neon runtime diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 71791e06..8e7335c6 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-net" -version = "2.5.8" +version = "2.5.9" authors = ["ntex contributors "] description = "ntexwork utils for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index ae3ffc47..c5937b5a 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -70,7 +70,6 @@ impl StreamOps { pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { let fd = io.as_raw_fd(); - let tag = context.tag(); let stream = self.0.with(move |streams| { let item = StreamItem { fd, diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index 7f1d8302..0d8dabc5 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.7.2] - 2025-03-27 + +* Handle paused state + ## [2.7.1] - 2025-02-28 * Fix set core affinity out of worker start #508 diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index 040e7381..bca5f8b2 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.7.1" +version = "2.7.2" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -24,7 +24,6 @@ ntex-util = "2.8" async-channel = "2" async-broadcast = "0.7" -atomic-waker = "1.1" core_affinity = "0.8" polling = "3.3" log = "0.4" diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index 4ede204f..b9092d0f 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -4,7 +4,6 @@ use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc}; use async_broadcast::{self as bus, broadcast}; use async_channel::{unbounded, Receiver, Sender}; -use atomic_waker::AtomicWaker; use core_affinity::CoreId; use ntex_rt::{spawn, Arbiter}; @@ -40,7 +39,6 @@ pub struct Worker { id: WorkerId, tx1: Sender, tx2: Sender, - waker: Arc, avail: WorkerAvailability, failed: Arc, } @@ -87,8 +85,6 @@ impl Worker { { let (tx1, rx1) = unbounded(); let (tx2, rx2) = unbounded(); - let waker = Arc::new(AtomicWaker::new()); - let waker2 = waker.clone(); let (avail, avail_tx) = WorkerAvailability::create(); Arbiter::default().exec_fn(move || { @@ -104,7 +100,7 @@ impl Worker { log::debug!("Creating server instance in {:?}", id); let factory = cfg.create().await; - match create(id, rx1, waker2, rx2, factory, avail_tx).await { + match create(id, rx1, rx2, factory, avail_tx).await { Ok((svc, wrk)) => { log::debug!("Server instance has been created in {:?}", id); run_worker(svc, wrk).await; @@ -121,7 +117,6 @@ impl Worker { id, tx1, tx2, - waker, avail, failed: Arc::new(AtomicBool::new(false)), } @@ -137,7 +132,6 @@ impl Worker { /// Returns `Ok` if message got accepted by the worker. /// Otherwise return message back as `Err` pub fn send(&self, msg: T) -> Result<(), T> { - self.waker.wake(); self.tx1.try_send(msg).map_err(|msg| msg.into_inner()) } @@ -183,7 +177,6 @@ impl Clone for Worker { id: self.id, tx1: self.tx1.clone(), tx2: self.tx2.clone(), - waker: self.waker.clone(), avail: self.avail.clone(), failed: self.failed.clone(), } @@ -251,7 +244,6 @@ struct WorkerSt> { rx: Receiver, stop: Pin>>, factory: F, - waker: Arc, availability: WorkerAvailabilityTx, } @@ -263,8 +255,6 @@ where loop { let mut recv = std::pin::pin!(wrk.rx.recv()); let fut = poll_fn(|cx| { - wrk.waker.register(cx.waker()); - match svc.poll_ready(cx) { Poll::Ready(res) => { res?; @@ -352,7 +342,6 @@ async fn stop_svc( async fn create( id: WorkerId, rx: Receiver, - waker: Arc, stop: Receiver, factory: Result, availability: WorkerAvailabilityTx, @@ -382,7 +371,6 @@ where WorkerSt { id, rx, - waker, factory, availability, stop: Box::pin(stop),