Replace async-oneshot crate (#255)

This commit is contained in:
Nikolay Kim 2023-11-22 23:44:28 +06:00 committed by GitHub
parent 3043dbe57c
commit 2713922e03
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 52 additions and 51 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.3.2] - 2023-11-22
* Replace async-oneshot with oneshot
## [0.3.1] - 2023-11-12 ## [0.3.1] - 2023-11-12
* Optimize io read task * Optimize io read task

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-async-std" name = "ntex-async-std"
version = "0.3.1" version = "0.3.2"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "async-std intergration for ntex framework" description = "async-std intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -19,7 +19,7 @@ path = "src/lib.rs"
ntex-bytes = "0.1.21" ntex-bytes = "0.1.21"
ntex-io = "0.3.6" ntex-io = "0.3.6"
ntex-util = "0.3.4" ntex-util = "0.3.4"
async-oneshot = "0.5.0"
log = "0.4" log = "0.4"
pin-project-lite = "0.2" pin-project-lite = "0.2"
async-std = { version = "1", features = ["unstable"] } async-std = { version = "1", features = ["unstable"] }
oneshot = { version = "0.1", default-features = false, features = ["async"] }

View file

@ -1,7 +1,5 @@
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
use async_oneshot as oneshot;
thread_local! { thread_local! {
static SRUN: RefCell<bool> = RefCell::new(false); static SRUN: RefCell<bool> = RefCell::new(false);
static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default(); static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default();
@ -29,7 +27,7 @@ pub fn signal() -> Option<oneshot::Receiver<Signal>> {
async_std::task::spawn_local(Signals::new()); async_std::task::spawn_local(Signals::new());
} }
SHANDLERS.with(|handlers| { SHANDLERS.with(|handlers| {
let (tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
handlers.borrow_mut().push(tx); handlers.borrow_mut().push(tx);
Some(rx) Some(rx)
}) })

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.3.1] - 2023-11-22
* Replace async-oneshot with oneshot
## [0.3.0] - 2023-06-22 ## [0.3.0] - 2023-06-22
* Release v0.3.0 * Release v0.3.0

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-glommio" name = "ntex-glommio"
version = "0.3.0" version = "0.3.1"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "glommio intergration for ntex framework" description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -16,12 +16,12 @@ name = "ntex_glommio"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
ntex-bytes = "0.1.19" ntex-bytes = "0.1.21"
ntex-io = "0.3.0" ntex-io = "0.3.9"
ntex-util = "0.3.0" ntex-util = "0.3.4"
async-oneshot = "0.5.0"
futures-lite = "1.12" futures-lite = "1.12"
log = "0.4" log = "0.4"
oneshot = { version = "0.1", default-features = false, features = ["async"] }
[target.'cfg(target_os = "linux")'.dependencies] [target.'cfg(target_os = "linux")'.dependencies]
glommio = "0.8" glommio = "0.8"

View file

@ -1,7 +1,5 @@
use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; use std::{cell::RefCell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
use async_oneshot as oneshot;
thread_local! { thread_local! {
static SRUN: RefCell<bool> = RefCell::new(false); static SRUN: RefCell<bool> = RefCell::new(false);
static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default(); static SHANDLERS: Rc<RefCell<Vec<oneshot::Sender<Signal>>>> = Default::default();
@ -29,7 +27,7 @@ pub fn signal() -> Option<oneshot::Receiver<Signal>> {
glommio::spawn_local(Signals::new()).detach(); glommio::spawn_local(Signals::new()).detach();
} }
SHANDLERS.with(|handlers| { SHANDLERS.with(|handlers| {
let (tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
handlers.borrow_mut().push(tx); handlers.borrow_mut().push(tx);
Some(rx) Some(rx)
}) })

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.4.11] - 2023-11-22
* Replace async-oneshot with oneshot
## [0.4.10] - 2023-11-02 ## [0.4.10] - 2023-11-02
* Upgrade async-channel to 2.0 * Upgrade async-channel to 2.0

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-rt" name = "ntex-rt"
version = "0.4.10" version = "0.4.11"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime" description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -28,10 +28,10 @@ tokio = ["tok-io"]
async-std = ["async_std/unstable"] async-std = ["async_std/unstable"]
[dependencies] [dependencies]
async-oneshot = "0.5"
async-channel = "2.0" async-channel = "2.0"
futures-core = "0.3" futures-core = "0.3"
log = "0.4" log = "0.4"
oneshot = "0.1"
tok-io = { version = "1", package = "tokio", default-features = false, features = [ tok-io = { version = "1", package = "tokio", default-features = false, features = [
"rt", "rt",

View file

@ -4,7 +4,6 @@ use std::task::{Context, Poll};
use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread}; use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
use async_channel::{unbounded, Receiver, Sender}; use async_channel::{unbounded, Receiver, Sender};
use async_oneshot as oneshot;
use futures_core::stream::Stream; use futures_core::stream::Stream;
use crate::system::System; use crate::system::System;
@ -97,7 +96,7 @@ impl Arbiter {
.spawn(move || { .spawn(move || {
let arb = Arbiter::with_sender(arb_tx); let arb = Arbiter::with_sender(arb_tx);
let (stop, stop_rx) = oneshot::oneshot(); let (stop, stop_rx) = oneshot::channel();
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
System::set_current(sys); System::set_current(sys);
@ -147,18 +146,16 @@ impl Arbiter {
/// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// Send a function to the Arbiter's thread. This function will be executed asynchronously.
/// A future is created, and when resolved will contain the result of the function sent /// A future is created, and when resolved will contain the result of the function sent
/// to the Arbiters thread. /// to the Arbiters thread.
pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, oneshot::Closed>> pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, oneshot::RecvError>>
where where
F: FnOnce() -> R + Send + 'static, F: FnOnce() -> R + Send + 'static,
R: Sync + Send + 'static, R: Sync + Send + 'static,
{ {
let (mut tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
let _ = self let _ = self
.sender .sender
.try_send(ArbiterCommand::ExecuteFn(Box::new(move || { .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_closed() { let _ = tx.send(f());
let _ = tx.send(f());
}
}))); })));
rx rx
} }
@ -265,7 +262,7 @@ impl Future for ArbiterController {
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item { Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => { ArbiterCommand::Stop => {
if let Some(mut stop) = self.stop.take() { if let Some(stop) = self.stop.take() {
let _ = stop.send(0); let _ = stop.send(0);
}; };
return Poll::Ready(()); return Poll::Ready(());
@ -331,7 +328,7 @@ impl Future for SystemArbiter {
arb.stop(); arb.stop();
} }
// stop event loop // stop event loop
if let Some(mut stop) = self.stop.take() { if let Some(stop) = self.stop.take() {
let _ = stop.send(code); let _ = stop.send(code);
} }
} }

View file

@ -1,7 +1,6 @@
use std::{cell::RefCell, future::Future, io, rc::Rc}; use std::{cell::RefCell, future::Future, io, rc::Rc};
use async_channel::unbounded; use async_channel::unbounded;
use async_oneshot as oneshot;
use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter}; use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter};
use crate::System; use crate::System;
@ -45,7 +44,7 @@ impl Builder {
/// ///
/// This method panics if it can not create tokio runtime /// This method panics if it can not create tokio runtime
pub fn finish(self) -> SystemRunner { pub fn finish(self) -> SystemRunner {
let (stop_tx, stop) = oneshot::oneshot(); let (stop_tx, stop) = oneshot::channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded();
let stop_on_panic = self.stop_on_panic; let stop_on_panic = self.stop_on_panic;

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.7.12] - 2023-11-22
* Replace async-oneshot with oneshot
## [0.7.11] - 2023-11-20 ## [0.7.11] - 2023-11-20
* Refactor http/1 timeouts * Refactor http/1 timeouts

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.7.11" version = "0.7.12"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -64,11 +64,11 @@ ntex-tokio = { version = "0.3.1", optional = true }
ntex-glommio = { version = "0.3.0", optional = true } ntex-glommio = { version = "0.3.0", optional = true }
ntex-async-std = { version = "0.3.1", optional = true } ntex-async-std = { version = "0.3.1", optional = true }
async-oneshot = "0.5" async-channel = "2.1"
async-channel = "2.0"
base64 = "0.21" base64 = "0.21"
bitflags = "2.4" bitflags = "2.4"
log = "0.4" log = "0.4"
oneshot = { version = "0.1", default-features = false, features = ["async"] }
nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand"] } nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand"] }
polling = "3.3" polling = "3.3"
pin-project-lite = "0.2" pin-project-lite = "0.2"

View file

@ -1,7 +1,6 @@
use std::{fmt, future::Future, io, marker, mem, net, pin::Pin, task::Context, task::Poll}; use std::{fmt, future::Future, io, marker, mem, net, pin::Pin, task::Context, task::Poll};
use async_channel::unbounded; use async_channel::unbounded;
use async_oneshot as oneshot;
use log::{error, info}; use log::{error, info};
use socket2::{Domain, SockAddr, Socket, Type}; use socket2::{Domain, SockAddr, Socket, Type};
@ -366,11 +365,11 @@ impl ServerBuilder {
fn handle_cmd(&mut self, item: ServerCommand) { fn handle_cmd(&mut self, item: ServerCommand) {
match item { match item {
ServerCommand::Pause(mut tx) => { ServerCommand::Pause(tx) => {
self.accept.send(Command::Pause); self.accept.send(Command::Pause);
let _ = tx.send(()); let _ = tx.send(());
} }
ServerCommand::Resume(mut tx) => { ServerCommand::Resume(tx) => {
self.accept.send(Command::Resume); self.accept.send(Command::Resume);
let _ = tx.send(()); let _ = tx.send(());
} }
@ -431,10 +430,10 @@ impl ServerBuilder {
spawn(async move { spawn(async move {
let _ = join_all(futs).await; let _ = join_all(futs).await;
if let Some(mut tx) = completion { if let Some(tx) = completion {
let _ = tx.send(()); let _ = tx.send(());
} }
for mut tx in notify { for tx in notify {
let _ = tx.send(()); let _ = tx.send(());
} }
if exit { if exit {
@ -454,10 +453,10 @@ impl ServerBuilder {
System::current().stop(); System::current().stop();
}); });
} }
if let Some(mut tx) = completion { if let Some(tx) = completion {
let _ = tx.send(()); let _ = tx.send(());
} }
for mut tx in notify { for tx in notify {
let _ = tx.send(()); let _ = tx.send(());
} }
} }

View file

@ -2,7 +2,6 @@
use std::{future::Future, io, pin::Pin, task::Context, task::Poll}; use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
use async_channel::Sender; use async_channel::Sender;
use async_oneshot as oneshot;
mod accept; mod accept;
mod builder; mod builder;
@ -101,7 +100,7 @@ impl Server {
/// If socket contains some pending connection, they might be dropped. /// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active. /// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = ()> { pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
let _ = self.0.try_send(ServerCommand::Pause(tx)); let _ = self.0.try_send(ServerCommand::Pause(tx));
async move { async move {
let _ = rx.await; let _ = rx.await;
@ -110,7 +109,7 @@ impl Server {
/// Resume accepting incoming connections /// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> { pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
let _ = self.0.try_send(ServerCommand::Resume(tx)); let _ = self.0.try_send(ServerCommand::Resume(tx));
async move { async move {
let _ = rx.await; let _ = rx.await;
@ -121,7 +120,7 @@ impl Server {
/// ///
/// If server starts with `spawn()` method, then spawned thread get terminated. /// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> { pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
let _ = self.0.try_send(ServerCommand::Stop { let _ = self.0.try_send(ServerCommand::Stop {
graceful, graceful,
completion: Some(tx), completion: Some(tx),
@ -145,7 +144,7 @@ impl Future for Server {
let this = self.get_mut(); let this = self.get_mut();
if this.1.is_none() { if this.1.is_none() {
let (tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
if this.0.try_send(ServerCommand::Notify(tx)).is_err() { if this.0.try_send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }

View file

@ -2,7 +2,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};
use async_channel::{unbounded, Receiver, Sender}; use async_channel::{unbounded, Receiver, Sender};
use async_oneshot as oneshot;
use crate::rt::{spawn, Arbiter}; use crate::rt::{spawn, Arbiter};
use crate::service::Pipeline; use crate::service::Pipeline;
@ -91,7 +90,7 @@ impl WorkerClient {
} }
pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> { pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (result, rx) = oneshot::oneshot(); let (result, rx) = oneshot::channel();
let _ = self.tx2.try_send(StopCommand { graceful, result }); let _ = self.tx2.try_send(StopCommand { graceful, result });
rx rx
} }
@ -227,7 +226,7 @@ impl Worker {
let res: Result<Vec<_>, _> = let res: Result<Vec<_>, _> =
match select(join_all(fut), stream_recv(&mut wrk.rx2)).await { match select(join_all(fut), stream_recv(&mut wrk.rx2)).await {
Either::Left(result) => result.into_iter().collect(), Either::Left(result) => result.into_iter().collect(),
Either::Right(Some(StopCommand { mut result, .. })) => { Either::Right(Some(StopCommand { result, .. })) => {
trace!("Shutdown uninitialized worker"); trace!("Shutdown uninitialized worker");
wrk.shutdown(true); wrk.shutdown(true);
let _ = result.send(false); let _ = result.send(false);
@ -347,11 +346,7 @@ impl Future for Worker {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// `StopWorker` message handler // `StopWorker` message handler
let stop = Pin::new(&mut self.rx2).poll_next(cx); let stop = Pin::new(&mut self.rx2).poll_next(cx);
if let Poll::Ready(Some(StopCommand { if let Poll::Ready(Some(StopCommand { graceful, result })) = stop {
graceful,
mut result,
})) = stop
{
self.availability.set(false); self.availability.set(false);
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
@ -653,7 +648,7 @@ mod tests {
// shutdown // shutdown
let g = MAX_CONNS_COUNTER.with(|conns| conns.get()); let g = MAX_CONNS_COUNTER.with(|conns| conns.get());
let (tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
tx2.try_send(StopCommand { tx2.try_send(StopCommand {
graceful: true, graceful: true,
result: tx, result: tx,
@ -697,7 +692,7 @@ mod tests {
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(avail.available()); assert!(avail.available());
let (tx, rx) = oneshot::oneshot(); let (tx, rx) = oneshot::channel();
tx2.try_send(StopCommand { tx2.try_send(StopCommand {
graceful: false, graceful: false,
result: tx, result: tx,