Upgrade async-channel (#236)

This commit is contained in:
Nikolay Kim 2023-11-02 20:15:18 +06:00 committed by GitHub
parent ea26d9ef53
commit d460d9c259
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 40 additions and 16 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.10] - 2023-11-02
* Upgrade async-channel to 2.0
## [0.4.9] - 2023-04-11
* Chore upgrade glommio to 0.8

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-rt"
version = "0.4.9"
version = "0.4.10"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex runtime"
keywords = ["network", "framework", "async", "futures"]
@ -28,8 +28,8 @@ tokio = ["tok-io"]
async-std = ["async_std/unstable"]
[dependencies]
async-oneshot = "0.5.0"
async-channel = "1.8.0"
async-oneshot = "0.5"
async-channel = "2.0"
futures-core = "0.3"
log = "0.4"

View file

@ -14,6 +14,9 @@ thread_local!(
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
type ServerCommandRx = Pin<Box<dyn Stream<Item = SystemCommand>>>;
type ArbiterCommandRx = Pin<Box<dyn Stream<Item = ArbiterCommand>>>;
pub(super) static COUNT: AtomicUsize = AtomicUsize::new(0);
pub(super) enum ArbiterCommand {
@ -57,7 +60,13 @@ impl Arbiter {
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());
(arb, ArbiterController { stop: None, rx })
(
arb,
ArbiterController {
stop: None,
rx: Box::pin(rx),
},
)
}
/// Returns the current thread's arbiter's address. If no Arbiter is present, then this
@ -97,7 +106,7 @@ impl Arbiter {
// start arbiter controller
crate::spawn(ArbiterController {
stop: Some(stop),
rx: arb_rx,
rx: Box::pin(arb_rx),
});
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
@ -231,7 +240,7 @@ impl Arbiter {
pub(crate) struct ArbiterController {
stop: Option<oneshot::Sender<i32>>,
rx: Receiver<ArbiterCommand>,
rx: ArbiterCommandRx,
}
impl Drop for ArbiterController {
@ -281,10 +290,9 @@ pub(super) enum SystemCommand {
UnregisterArbiter(usize),
}
#[derive(Debug)]
pub(super) struct SystemArbiter {
stop: Option<oneshot::Sender<i32>>,
commands: Receiver<SystemCommand>,
commands: ServerCommandRx,
arbiters: HashMap<usize, Arbiter>,
}
@ -294,13 +302,21 @@ impl SystemArbiter {
commands: Receiver<SystemCommand>,
) -> Self {
SystemArbiter {
commands,
commands: Box::pin(commands),
stop: Some(stop),
arbiters: HashMap::new(),
}
}
}
impl fmt::Debug for SystemArbiter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SystemArbiter")
.field("arbiters", &self.arbiters)
.finish()
}
}
impl Future for SystemArbiter {
type Output = ();