From ddbc4a722df3226bf246cc76905f338f2ecf463d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 4 Nov 2024 18:18:15 +0500 Subject: [PATCH] Optimize readiness handling (#457) --- ntex-service/CHANGES.md | 6 +++- ntex-service/Cargo.toml | 2 +- ntex-service/src/ctx.rs | 21 +++++++++----- ntex-service/src/pipeline.rs | 54 +++++++++++++++++------------------- 4 files changed, 46 insertions(+), 37 deletions(-) diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index f3e14370..6e17abf4 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,6 +1,10 @@ # Changes -## [3.3.0] - 2024-11-02 +## [3.3.1] - 2024-11-04 + +* Optimize readiness handling + +## [3.3.0] - 2024-11-04 * Added Service::not_ready() method diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 48b568dc..de2c8e0a 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "3.3.0" +version = "3.3.1" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index 36b07a03..c404eee2 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -56,8 +56,14 @@ impl WaitersRef { } pub(crate) fn register(&self, idx: u32, cx: &mut Context<'_>) { + let wakers = self.get_wakers(); + if let Some(last) = wakers.last() { + if idx == *last { + return; + } + } + wakers.push(idx); self.get()[idx as usize] = Some(cx.waker().clone()); - self.get_wakers().push(idx); } pub(crate) fn register_unready(&self, cx: &mut Context<'_>) { @@ -66,13 +72,14 @@ impl WaitersRef { } pub(crate) fn notify(&self) { - let indexes = self.get(); let wakers = self.get_wakers(); - - for idx in wakers.drain(..) { - if let Some(item) = indexes.get_mut(idx as usize) { - if let Some(waker) = item.take() { - waker.wake(); + if !wakers.is_empty() { + let indexes = self.get(); + for idx in wakers.drain(..) { + if let Some(item) = indexes.get_mut(idx as usize) { + if let Some(waker) = item.take() { + waker.wake(); + } } } } diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 5bed092f..6e815128 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -420,21 +420,20 @@ where let mut slf = self.as_mut(); if slf.pl.state.waiters.can_check(slf.pl.index, cx) { - if let Some(ref mut fut) = slf.fut { - match unsafe { Pin::new_unchecked(fut) }.poll(cx) { - Poll::Pending => { - slf.pl.state.waiters.register(slf.pl.index, cx); - Poll::Pending - } - Poll::Ready(res) => { - let _ = slf.fut.take(); - slf.pl.state.waiters.notify(); - Poll::Ready(res) - } - } - } else { + if slf.fut.is_none() { slf.fut = Some((slf.f)(slf.pl)); - self.poll(cx) + } + let fut = slf.fut.as_mut().unwrap(); + match unsafe { Pin::new_unchecked(fut) }.poll(cx) { + Poll::Pending => { + slf.pl.state.waiters.register(slf.pl.index, cx); + Poll::Pending + } + Poll::Ready(res) => { + let _ = slf.fut.take(); + slf.pl.state.waiters.notify(); + Poll::Ready(res) + } } } else { Poll::Pending @@ -460,21 +459,20 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut slf = self.as_mut(); - if let Some(ref mut fut) = slf.fut { - match unsafe { Pin::new_unchecked(fut) }.poll(cx) { - Poll::Pending => { - slf.pl.state.waiters.register_unready(cx); - Poll::Pending - } - Poll::Ready(res) => { - let _ = slf.fut.take(); - slf.pl.state.waiters.notify(); - Poll::Ready(res) - } - } - } else { + if slf.fut.is_none() { slf.fut = Some((slf.f)(slf.pl)); - self.poll(cx) + } + let fut = slf.fut.as_mut().unwrap(); + match unsafe { Pin::new_unchecked(fut) }.poll(cx) { + Poll::Pending => { + slf.pl.state.waiters.register_unready(cx); + Poll::Pending + } + Poll::Ready(res) => { + let _ = slf.fut.take(); + slf.pl.state.waiters.notify(); + Poll::Ready(res) + } } } }