diff --git a/ntex-async-std/CHANGES.md b/ntex-async-std/CHANGES.md index f9e11732..7f04447c 100644 --- a/ntex-async-std/CHANGES.md +++ b/ntex-async-std/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.2] - 2023-11-22 + +* Replace async-oneshot with oneshot + ## [0.3.1] - 2023-11-12 * Optimize io read task diff --git a/ntex-async-std/Cargo.toml b/ntex-async-std/Cargo.toml index 0ca99640..39363f31 100644 --- a/ntex-async-std/Cargo.toml +++ b/ntex-async-std/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-async-std" -version = "0.3.1" +version = "0.3.2" authors = ["ntex contributors "] description = "async-std intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -19,7 +19,7 @@ path = "src/lib.rs" ntex-bytes = "0.1.21" ntex-io = "0.3.6" ntex-util = "0.3.4" -async-oneshot = "0.5.0" log = "0.4" pin-project-lite = "0.2" async-std = { version = "1", features = ["unstable"] } +oneshot = { version = "0.1", default-features = false, features = ["async"] } diff --git a/ntex-async-std/src/signals.rs b/ntex-async-std/src/signals.rs index b144f506..47f5613b 100644 --- a/ntex-async-std/src/signals.rs +++ b/ntex-async-std/src/signals.rs @@ -1,7 +1,5 @@ use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; -use async_oneshot as oneshot; - thread_local! { static SRUN: RefCell = RefCell::new(false); static SHANDLERS: Rc>>> = Default::default(); @@ -29,7 +27,7 @@ pub fn signal() -> Option> { async_std::task::spawn_local(Signals::new()); } SHANDLERS.with(|handlers| { - let (tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); handlers.borrow_mut().push(tx); Some(rx) }) diff --git a/ntex-glommio/CHANGES.md b/ntex-glommio/CHANGES.md index 321bf2af..d6e1a01e 100644 --- a/ntex-glommio/CHANGES.md +++ b/ntex-glommio/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.1] - 2023-11-22 + +* Replace async-oneshot with oneshot + ## [0.3.0] - 2023-06-22 * Release v0.3.0 diff --git a/ntex-glommio/Cargo.toml b/ntex-glommio/Cargo.toml index 334b255d..231e41a1 100644 --- a/ntex-glommio/Cargo.toml +++ b/ntex-glommio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-glommio" -version = "0.3.0" +version = "0.3.1" authors = ["ntex contributors "] description = "glommio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,12 +16,12 @@ name = "ntex_glommio" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.19" -ntex-io = "0.3.0" -ntex-util = "0.3.0" -async-oneshot = "0.5.0" +ntex-bytes = "0.1.21" +ntex-io = "0.3.9" +ntex-util = "0.3.4" futures-lite = "1.12" log = "0.4" +oneshot = { version = "0.1", default-features = false, features = ["async"] } [target.'cfg(target_os = "linux")'.dependencies] glommio = "0.8" diff --git a/ntex-glommio/src/signals.rs b/ntex-glommio/src/signals.rs index 305a8b7d..d06077ee 100644 --- a/ntex-glommio/src/signals.rs +++ b/ntex-glommio/src/signals.rs @@ -1,7 +1,5 @@ use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; -use async_oneshot as oneshot; - thread_local! { static SRUN: RefCell = RefCell::new(false); static SHANDLERS: Rc>>> = Default::default(); @@ -29,7 +27,7 @@ pub fn signal() -> Option> { glommio::spawn_local(Signals::new()).detach(); } SHANDLERS.with(|handlers| { - let (tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); handlers.borrow_mut().push(tx); Some(rx) }) diff --git a/ntex-rt/CHANGES.md b/ntex-rt/CHANGES.md index cff095db..75883056 100644 --- a/ntex-rt/CHANGES.md +++ b/ntex-rt/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.11] - 2023-11-22 + +* Replace async-oneshot with oneshot + ## [0.4.10] - 2023-11-02 * Upgrade async-channel to 2.0 diff --git a/ntex-rt/Cargo.toml b/ntex-rt/Cargo.toml index be4150fc..6bec636e 100644 --- a/ntex-rt/Cargo.toml +++ b/ntex-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-rt" -version = "0.4.10" +version = "0.4.11" authors = ["ntex contributors "] description = "ntex runtime" keywords = ["network", "framework", "async", "futures"] @@ -28,10 +28,10 @@ tokio = ["tok-io"] async-std = ["async_std/unstable"] [dependencies] -async-oneshot = "0.5" async-channel = "2.0" futures-core = "0.3" log = "0.4" +oneshot = "0.1" tok-io = { version = "1", package = "tokio", default-features = false, features = [ "rt", diff --git a/ntex-rt/src/arbiter.rs b/ntex-rt/src/arbiter.rs index dc8eb7ae..a7f0249a 100644 --- a/ntex-rt/src/arbiter.rs +++ b/ntex-rt/src/arbiter.rs @@ -4,7 +4,6 @@ use std::task::{Context, Poll}; use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread}; use async_channel::{unbounded, Receiver, Sender}; -use async_oneshot as oneshot; use futures_core::stream::Stream; use crate::system::System; @@ -97,7 +96,7 @@ impl Arbiter { .spawn(move || { let arb = Arbiter::with_sender(arb_tx); - let (stop, stop_rx) = oneshot::oneshot(); + let (stop, stop_rx) = oneshot::channel(); STORAGE.with(|cell| cell.borrow_mut().clear()); System::set_current(sys); @@ -147,18 +146,16 @@ impl Arbiter { /// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// A future is created, and when resolved will contain the result of the function sent /// to the Arbiters thread. - pub fn exec(&self, f: F) -> impl Future> + pub fn exec(&self, f: F) -> impl Future> where F: FnOnce() -> R + Send + 'static, R: Sync + Send + 'static, { - let (mut tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); let _ = self .sender .try_send(ArbiterCommand::ExecuteFn(Box::new(move || { - if !tx.is_closed() { - let _ = tx.send(f()); - } + let _ = tx.send(f()); }))); rx } @@ -265,7 +262,7 @@ impl Future for ArbiterController { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(item)) => match item { ArbiterCommand::Stop => { - if let Some(mut stop) = self.stop.take() { + if let Some(stop) = self.stop.take() { let _ = stop.send(0); }; return Poll::Ready(()); @@ -331,7 +328,7 @@ impl Future for SystemArbiter { arb.stop(); } // stop event loop - if let Some(mut stop) = self.stop.take() { + if let Some(stop) = self.stop.take() { let _ = stop.send(code); } } diff --git a/ntex-rt/src/builder.rs b/ntex-rt/src/builder.rs index 43f752f9..a49a1722 100644 --- a/ntex-rt/src/builder.rs +++ b/ntex-rt/src/builder.rs @@ -1,7 +1,6 @@ use std::{cell::RefCell, future::Future, io, rc::Rc}; use async_channel::unbounded; -use async_oneshot as oneshot; use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter}; use crate::System; @@ -45,7 +44,7 @@ impl Builder { /// /// This method panics if it can not create tokio runtime pub fn finish(self) -> SystemRunner { - let (stop_tx, stop) = oneshot::oneshot(); + let (stop_tx, stop) = oneshot::channel(); let (sys_sender, sys_receiver) = unbounded(); let stop_on_panic = self.stop_on_panic; diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 712fa3cf..58350ac8 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.12] - 2023-11-22 + +* Replace async-oneshot with oneshot + ## [0.7.11] - 2023-11-20 * Refactor http/1 timeouts diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index eef8d1e2..ec5e4579 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.7.11" +version = "0.7.12" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -64,11 +64,11 @@ ntex-tokio = { version = "0.3.1", optional = true } ntex-glommio = { version = "0.3.0", optional = true } ntex-async-std = { version = "0.3.1", optional = true } -async-oneshot = "0.5" -async-channel = "2.0" +async-channel = "2.1" base64 = "0.21" bitflags = "2.4" log = "0.4" +oneshot = { version = "0.1", default-features = false, features = ["async"] } nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand"] } polling = "3.3" pin-project-lite = "0.2" diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index feef5aac..21dbb0d3 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -1,7 +1,6 @@ use std::{fmt, future::Future, io, marker, mem, net, pin::Pin, task::Context, task::Poll}; use async_channel::unbounded; -use async_oneshot as oneshot; use log::{error, info}; use socket2::{Domain, SockAddr, Socket, Type}; @@ -366,11 +365,11 @@ impl ServerBuilder { fn handle_cmd(&mut self, item: ServerCommand) { match item { - ServerCommand::Pause(mut tx) => { + ServerCommand::Pause(tx) => { self.accept.send(Command::Pause); let _ = tx.send(()); } - ServerCommand::Resume(mut tx) => { + ServerCommand::Resume(tx) => { self.accept.send(Command::Resume); let _ = tx.send(()); } @@ -431,10 +430,10 @@ impl ServerBuilder { spawn(async move { let _ = join_all(futs).await; - if let Some(mut tx) = completion { + if let Some(tx) = completion { let _ = tx.send(()); } - for mut tx in notify { + for tx in notify { let _ = tx.send(()); } if exit { @@ -454,10 +453,10 @@ impl ServerBuilder { System::current().stop(); }); } - if let Some(mut tx) = completion { + if let Some(tx) = completion { let _ = tx.send(()); } - for mut tx in notify { + for tx in notify { let _ = tx.send(()); } } diff --git a/ntex/src/server/mod.rs b/ntex/src/server/mod.rs index def80e03..62fb3016 100644 --- a/ntex/src/server/mod.rs +++ b/ntex/src/server/mod.rs @@ -2,7 +2,6 @@ use std::{future::Future, io, pin::Pin, task::Context, task::Poll}; use async_channel::Sender; -use async_oneshot as oneshot; mod accept; mod builder; @@ -101,7 +100,7 @@ impl Server { /// If socket contains some pending connection, they might be dropped. /// All opened connection remains active. pub fn pause(&self) -> impl Future { - let (tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); let _ = self.0.try_send(ServerCommand::Pause(tx)); async move { let _ = rx.await; @@ -110,7 +109,7 @@ impl Server { /// Resume accepting incoming connections pub fn resume(&self) -> impl Future { - let (tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); let _ = self.0.try_send(ServerCommand::Resume(tx)); async move { let _ = rx.await; @@ -121,7 +120,7 @@ impl Server { /// /// If server starts with `spawn()` method, then spawned thread get terminated. pub fn stop(&self, graceful: bool) -> impl Future { - let (tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); let _ = self.0.try_send(ServerCommand::Stop { graceful, completion: Some(tx), @@ -145,7 +144,7 @@ impl Future for Server { let this = self.get_mut(); if this.1.is_none() { - let (tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); if this.0.try_send(ServerCommand::Notify(tx)).is_err() { return Poll::Ready(Ok(())); } diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index c025aca6..b144b39b 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -2,7 +2,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; use async_channel::{unbounded, Receiver, Sender}; -use async_oneshot as oneshot; use crate::rt::{spawn, Arbiter}; use crate::service::Pipeline; @@ -91,7 +90,7 @@ impl WorkerClient { } pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (result, rx) = oneshot::oneshot(); + let (result, rx) = oneshot::channel(); let _ = self.tx2.try_send(StopCommand { graceful, result }); rx } @@ -227,7 +226,7 @@ impl Worker { let res: Result, _> = match select(join_all(fut), stream_recv(&mut wrk.rx2)).await { Either::Left(result) => result.into_iter().collect(), - Either::Right(Some(StopCommand { mut result, .. })) => { + Either::Right(Some(StopCommand { result, .. })) => { trace!("Shutdown uninitialized worker"); wrk.shutdown(true); let _ = result.send(false); @@ -347,11 +346,7 @@ impl Future for Worker { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // `StopWorker` message handler let stop = Pin::new(&mut self.rx2).poll_next(cx); - if let Poll::Ready(Some(StopCommand { - graceful, - mut result, - })) = stop - { + if let Poll::Ready(Some(StopCommand { graceful, result })) = stop { self.availability.set(false); let num = num_connections(); if num == 0 { @@ -653,7 +648,7 @@ mod tests { // shutdown let g = MAX_CONNS_COUNTER.with(|conns| conns.get()); - let (tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); tx2.try_send(StopCommand { graceful: true, result: tx, @@ -697,7 +692,7 @@ mod tests { let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; assert!(avail.available()); - let (tx, rx) = oneshot::oneshot(); + let (tx, rx) = oneshot::channel(); tx2.try_send(StopCommand { graceful: false, result: tx,