diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index f4e3b158..64a64b70 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.0.1] - 2024-01-19 + +* Allow to lock readiness for Condition + ## [1.0.0] - 2024-01-09 * Release diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index cfcb6441..5c6d11ca 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "1.0.0" +version = "1.0.1" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,19 +16,18 @@ name = "ntex_util" path = "src/lib.rs" [dependencies] -ntex-rt = "0.4.11" -ntex-service = "2.0.0" +ntex-service = "2.0" +ntex-rt = "0.4" bitflags = "2.4" fxhash = "0.2.1" log = "0.4" slab = "0.4" -futures-timer = "3.0.2" +futures-timer = "3.0" futures-core = { version = "0.3", default-features = false, features = ["alloc"] } futures-sink = { version = "0.3", default-features = false, features = ["alloc"] } -pin-project-lite = "0.2.9" +pin-project-lite = "0.2" [dev-dependencies] -ntex = { version = "1.0.0", features = ["tokio"] } -ntex-bytes = "0.1.21" +ntex = { version = "1.0", features = ["tokio"] } ntex-macros = "0.1.3" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/ntex-util/src/channel/condition.rs b/ntex-util/src/channel/condition.rs index 8a95e4af..3a22c87d 100644 --- a/ntex-util/src/channel/condition.rs +++ b/ntex-util/src/channel/condition.rs @@ -11,6 +11,7 @@ pub struct Condition(Cell); #[derive(Debug)] struct Inner { data: Slab>, + ready: bool, } impl Default for Condition { @@ -22,7 +23,10 @@ impl Default for Condition { impl Condition { /// Coonstruct new condition instance pub fn new() -> Condition { - Condition(Cell::new(Inner { data: Slab::new() })) + Condition(Cell::new(Inner { + data: Slab::new(), + ready: false, + })) } /// Get condition waiter @@ -43,6 +47,15 @@ impl Condition { } } } + + #[doc(hidden)] + /// Notify all waiters. + /// + /// All subsequent waiter readiness checks always returns `ready` + pub fn notify_and_lock_readiness(&self) { + self.0.get_mut().ready = true; + self.notify(); + } } impl Drop for Condition { @@ -66,7 +79,9 @@ impl Waiter { /// Returns readiness state of the condition. pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { - let inner = unsafe { self.inner.get_mut().data.get_unchecked_mut(self.token) }; + let parent = self.inner.get_mut(); + + let inner = unsafe { parent.data.get_unchecked_mut(self.token) }; if inner.is_none() { let waker = LocalWaker::default(); waker.register(cx.waker()); @@ -74,7 +89,11 @@ impl Waiter { } else if !inner.as_mut().unwrap().register(cx.waker()) { return Poll::Ready(()); } - Poll::Pending + if parent.ready { + Poll::Ready(()) + } else { + Poll::Pending + } } } @@ -157,4 +176,19 @@ mod tests { assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(())); assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending); } + + #[ntex_macros::rt_test2] + async fn test_condition_notify_ready() { + let cond = Condition::default().clone(); + let waiter = cond.wait(); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending); + + cond.notify_and_lock_readiness(); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(())); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(())); + assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(())); + + let waiter2 = cond.wait(); + assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(())); + } }