diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index 83625407..9ad96eca 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.3.0] - 2024-08-19 + +* Allow to send clonable value via `Condition` + ## [2.2.0] - 2024-07-30 * Add LocalWaker::with() helper diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 01f51471..e0c6236a 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "2.2.0" +version = "2.3.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-util/src/channel/condition.rs b/ntex-util/src/channel/condition.rs index 3a22c87d..d6771caa 100644 --- a/ntex-util/src/channel/condition.rs +++ b/ntex-util/src/channel/condition.rs @@ -1,49 +1,88 @@ +use std::{ + cell, fmt, future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll, +}; + use slab::Slab; -use std::{future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll}; use super::cell::Cell; use crate::task::LocalWaker; /// Condition allows to notify multiple waiters at the same time -#[derive(Clone, Debug)] -pub struct Condition(Cell); +pub struct Condition +where + T: Default, +{ + inner: Cell>, +} -#[derive(Debug)] -struct Inner { - data: Slab>, +struct Inner { + data: Slab>>, ready: bool, + count: usize, } -impl Default for Condition { +struct Item { + val: cell::Cell, + waker: LocalWaker, +} + +impl Default for Condition { fn default() -> Self { - Self::new() + Condition { + inner: Cell::new(Inner { + data: Slab::new(), + ready: false, + count: 1, + }), + } } } -impl Condition { - /// Coonstruct new condition instance - pub fn new() -> Condition { - Condition(Cell::new(Inner { - data: Slab::new(), - ready: false, - })) +impl Clone for Condition { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + inner.get_mut().count += 1; + Self { inner } } +} +impl fmt::Debug for Condition { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Condition") + .field("ready", &self.inner.get_ref().ready) + .finish() + } +} + +impl Condition<()> { + /// Coonstruct new condition instance + pub fn new() -> Condition<()> { + Condition { + inner: Cell::new(Inner { + data: Slab::new(), + ready: false, + count: 1, + }), + } + } +} + +impl Condition { /// Get condition waiter - pub fn wait(&self) -> Waiter { - let token = self.0.get_mut().data.insert(None); + pub fn wait(&self) -> Waiter { + let token = self.inner.get_mut().data.insert(None); Waiter { token, - inner: self.0.clone(), + inner: self.inner.clone(), } } /// Notify all waiters pub fn notify(&self) { - let inner = self.0.get_ref(); - for item in inner.data.iter() { - if let Some(waker) = item.1 { - waker.wake(); + let inner = self.inner.get_ref(); + for (_, item) in inner.data.iter() { + if let Some(item) = item { + item.waker.wake(); } } } @@ -53,51 +92,83 @@ impl Condition { /// /// All subsequent waiter readiness checks always returns `ready` pub fn notify_and_lock_readiness(&self) { - self.0.get_mut().ready = true; + self.inner.get_mut().ready = true; self.notify(); } } -impl Drop for Condition { - fn drop(&mut self) { - self.notify() +impl Condition { + /// Notify all waiters + pub fn notify_with(&self, val: T) { + let inner = self.inner.get_ref(); + for (_, item) in inner.data.iter() { + if let Some(item) = item { + if item.waker.wake_checked() { + item.val.set(val.clone()); + } + } + } + } + + #[doc(hidden)] + /// Notify all waiters. + /// + /// All subsequent waiter readiness checks always returns `ready` + pub fn notify_with_and_lock_readiness(&self, val: T) { + self.inner.get_mut().ready = true; + self.notify_with(val); } } -#[derive(Debug)] -#[must_use = "Waiter do nothing unless polled"] -pub struct Waiter { - token: usize, - inner: Cell, +impl Drop for Condition { + fn drop(&mut self) { + let inner = self.inner.get_mut(); + inner.count -= 1; + if inner.count == 0 { + self.notify_and_lock_readiness() + } + } } -impl Waiter { +#[must_use = "Waiter do nothing unless polled"] +pub struct Waiter { + token: usize, + inner: Cell>, +} + +impl Waiter { /// Returns readiness state of the condition. - pub async fn ready(&self) { + pub async fn ready(&self) -> T { poll_fn(|cx| self.poll_ready(cx)).await } /// Returns readiness state of the condition. - pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll { let parent = self.inner.get_mut(); let inner = unsafe { parent.data.get_unchecked_mut(self.token) }; if inner.is_none() { let waker = LocalWaker::default(); waker.register(cx.waker()); - *inner = Some(waker); - } else if !inner.as_mut().unwrap().register(cx.waker()) { - return Poll::Ready(()); + *inner = Some(Item { + waker, + val: cell::Cell::new(Default::default()), + }); + } else { + let item = inner.as_mut().unwrap(); + if !item.waker.register(cx.waker()) { + return Poll::Ready(item.val.replace(Default::default())); + } } if parent.ready { - Poll::Ready(()) + Poll::Ready(Default::default()) } else { Poll::Pending } } } -impl Clone for Waiter { +impl Clone for Waiter { fn clone(&self) -> Self { let token = self.inner.get_mut().data.insert(None); Waiter { @@ -107,15 +178,21 @@ impl Clone for Waiter { } } -impl Future for Waiter { - type Output = (); +impl Future for Waiter { + type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.get_mut().poll_ready(cx) } } -impl Drop for Waiter { +impl fmt::Debug for Waiter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Waiter").finish() + } +} + +impl Drop for Waiter { fn drop(&mut self) { self.inner.get_mut().data.remove(self.token); } @@ -172,13 +249,46 @@ mod tests { drop(cond); assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(())); - assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(())); + assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(())); assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(())); - assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending); } #[ntex_macros::rt_test2] - async fn test_condition_notify_ready() { + async fn test_condition_with() { + let cond = Condition::::default(); + let waiter = cond.wait(); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending); + cond.notify_with("TEST".into()); + assert_eq!(waiter.ready().await, "TEST".to_string()); + + let waiter2 = waiter.clone(); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending); + assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending); + assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending); + + drop(cond); + assert_eq!( + lazy(|cx| waiter.poll_ready(cx)).await, + Poll::Ready("".into()) + ); + assert_eq!( + lazy(|cx| waiter.poll_ready(cx)).await, + Poll::Ready("".into()) + ); + assert_eq!( + lazy(|cx| waiter2.poll_ready(cx)).await, + Poll::Ready("".into()) + ); + assert_eq!( + lazy(|cx| waiter2.poll_ready(cx)).await, + Poll::Ready("".into()) + ); + } + + #[ntex_macros::rt_test2] + async fn notify_ready() { let cond = Condition::default().clone(); let waiter = cond.wait(); assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending); @@ -191,4 +301,42 @@ mod tests { let waiter2 = cond.wait(); assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(())); } + + #[ntex_macros::rt_test2] + async fn notify_with_and_lock_ready() { + // with + let cond = Condition::::default(); + let waiter = cond.wait(); + let waiter2 = cond.wait(); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending); + assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending); + + cond.notify_with_and_lock_readiness("TEST".into()); + assert_eq!( + lazy(|cx| waiter.poll_ready(cx)).await, + Poll::Ready("TEST".into()) + ); + assert_eq!( + lazy(|cx| waiter.poll_ready(cx)).await, + Poll::Ready("".into()) + ); + assert_eq!( + lazy(|cx| waiter.poll_ready(cx)).await, + Poll::Ready("".into()) + ); + assert_eq!( + lazy(|cx| waiter2.poll_ready(cx)).await, + Poll::Ready("TEST".into()) + ); + assert_eq!( + lazy(|cx| waiter2.poll_ready(cx)).await, + Poll::Ready("".into()) + ); + + let waiter2 = cond.wait(); + assert_eq!( + lazy(|cx| waiter2.poll_ready(cx)).await, + Poll::Ready("".into()) + ); + } } diff --git a/ntex-util/src/task.rs b/ntex-util/src/task.rs index 0dc49640..a2c427ce 100644 --- a/ntex-util/src/task.rs +++ b/ntex-util/src/task.rs @@ -58,6 +58,19 @@ impl LocalWaker { } } + #[inline] + /// Calls `wake` on the last `Waker` passed to `register`. + /// + /// If `register` has not been called yet, then this returns `false`. + pub fn wake_checked(&self) -> bool { + if let Some(waker) = self.take() { + waker.wake(); + true + } else { + false + } + } + /// Returns the last `Waker` passed to `register`, so that the user can wake it. /// /// If a waker has not been registered, this returns `None`.