Optimize readiness handling (#457)

This commit is contained in:
Nikolay Kim 2024-11-04 18:18:15 +05:00 committed by GitHub
parent c26b336fe5
commit ddbc4a722d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 46 additions and 37 deletions

View file

@ -1,6 +1,10 @@
# Changes # Changes
## [3.3.0] - 2024-11-02 ## [3.3.1] - 2024-11-04
* Optimize readiness handling
## [3.3.0] - 2024-11-04
* Added Service::not_ready() method * Added Service::not_ready() method

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-service" name = "ntex-service"
version = "3.3.0" version = "3.3.1"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service" description = "ntex service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -56,8 +56,14 @@ impl WaitersRef {
} }
pub(crate) fn register(&self, idx: u32, cx: &mut Context<'_>) { pub(crate) fn register(&self, idx: u32, cx: &mut Context<'_>) {
let wakers = self.get_wakers();
if let Some(last) = wakers.last() {
if idx == *last {
return;
}
}
wakers.push(idx);
self.get()[idx as usize] = Some(cx.waker().clone()); self.get()[idx as usize] = Some(cx.waker().clone());
self.get_wakers().push(idx);
} }
pub(crate) fn register_unready(&self, cx: &mut Context<'_>) { pub(crate) fn register_unready(&self, cx: &mut Context<'_>) {
@ -66,13 +72,14 @@ impl WaitersRef {
} }
pub(crate) fn notify(&self) { pub(crate) fn notify(&self) {
let indexes = self.get();
let wakers = self.get_wakers(); let wakers = self.get_wakers();
if !wakers.is_empty() {
for idx in wakers.drain(..) { let indexes = self.get();
if let Some(item) = indexes.get_mut(idx as usize) { for idx in wakers.drain(..) {
if let Some(waker) = item.take() { if let Some(item) = indexes.get_mut(idx as usize) {
waker.wake(); if let Some(waker) = item.take() {
waker.wake();
}
} }
} }
} }

View file

@ -420,21 +420,20 @@ where
let mut slf = self.as_mut(); let mut slf = self.as_mut();
if slf.pl.state.waiters.can_check(slf.pl.index, cx) { if slf.pl.state.waiters.can_check(slf.pl.index, cx) {
if let Some(ref mut fut) = slf.fut { if slf.fut.is_none() {
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Pending => {
slf.pl.state.waiters.register(slf.pl.index, cx);
Poll::Pending
}
Poll::Ready(res) => {
let _ = slf.fut.take();
slf.pl.state.waiters.notify();
Poll::Ready(res)
}
}
} else {
slf.fut = Some((slf.f)(slf.pl)); slf.fut = Some((slf.f)(slf.pl));
self.poll(cx) }
let fut = slf.fut.as_mut().unwrap();
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Pending => {
slf.pl.state.waiters.register(slf.pl.index, cx);
Poll::Pending
}
Poll::Ready(res) => {
let _ = slf.fut.take();
slf.pl.state.waiters.notify();
Poll::Ready(res)
}
} }
} else { } else {
Poll::Pending Poll::Pending
@ -460,21 +459,20 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut slf = self.as_mut(); let mut slf = self.as_mut();
if let Some(ref mut fut) = slf.fut { if slf.fut.is_none() {
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Pending => {
slf.pl.state.waiters.register_unready(cx);
Poll::Pending
}
Poll::Ready(res) => {
let _ = slf.fut.take();
slf.pl.state.waiters.notify();
Poll::Ready(res)
}
}
} else {
slf.fut = Some((slf.f)(slf.pl)); slf.fut = Some((slf.f)(slf.pl));
self.poll(cx) }
let fut = slf.fut.as_mut().unwrap();
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Pending => {
slf.pl.state.waiters.register_unready(cx);
Poll::Pending
}
Poll::Ready(res) => {
let _ = slf.fut.take();
slf.pl.state.waiters.notify();
Poll::Ready(res)
}
} }
} }
} }