Notify readiness waiters if ready call get dropped (#431)

This commit is contained in:
Nikolay Kim 2024-09-29 16:39:26 +05:00 committed by GitHub
parent 0d6f348fc2
commit effce6915f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 138 additions and 24 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [3.1.0] - 2024-09-29
* Notify readiness waiters if ready call get dropped
## [3.0.0] - 2024-05-28
* Use "async fn" for Service::ready() and Service::shutdown() methods

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "3.0.0"
version = "3.1.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]

View file

@ -1,4 +1,4 @@
use std::{cell, fmt, future::poll_fn, future::Future, marker, pin, rc::Rc, task};
use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task};
use crate::Service;
@ -146,26 +146,15 @@ impl<'a, S> ServiceCtx<'a, S> {
T: Service<R>,
{
// check readiness and notify waiters
let mut fut = svc.ready(ServiceCtx {
idx: self.idx,
waiters: self.waiters,
_t: marker::PhantomData,
});
poll_fn(|cx| {
if self.waiters.can_check(self.idx, cx) {
// SAFETY: `fut` never moves
let p = unsafe { pin::Pin::new_unchecked(&mut fut) };
match p.poll(cx) {
task::Poll::Pending => self.waiters.register(self.idx, cx),
task::Poll::Ready(res) => {
self.waiters.notify();
return task::Poll::Ready(res);
}
}
}
task::Poll::Pending
})
ReadyCall {
completed: false,
fut: svc.ready(ServiceCtx {
idx: self.idx,
waiters: self.waiters,
_t: marker::PhantomData,
}),
ctx: *self,
}
.await
}
@ -230,11 +219,51 @@ impl<'a, S> fmt::Debug for ServiceCtx<'a, S> {
}
}
struct ReadyCall<'a, S: ?Sized, F: Future> {
completed: bool,
fut: F,
ctx: ServiceCtx<'a, S>,
}
impl<'a, S: ?Sized, F: Future> Drop for ReadyCall<'a, S, F> {
fn drop(&mut self) {
if !self.completed {
self.ctx.waiters.notify();
}
}
}
impl<'a, S: ?Sized, F: Future> Unpin for ReadyCall<'a, S, F> {}
impl<'a, S: ?Sized, F: Future> Future for ReadyCall<'a, S, F> {
type Output = F::Output;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Self::Output> {
if self.ctx.waiters.can_check(self.ctx.idx, cx) {
// SAFETY: `fut` never moves
let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) };
match result {
task::Poll::Pending => self.ctx.waiters.register(self.ctx.idx, cx),
task::Poll::Ready(res) => {
self.completed = true;
self.ctx.waiters.notify();
return task::Poll::Ready(res);
}
}
}
task::Poll::Pending
}
}
#[cfg(test)]
mod tests {
use std::{cell::Cell, cell::RefCell, future::poll_fn, task::Poll};
use ntex_util::{channel::condition, future::lazy, time};
use ntex_util::channel::{condition, oneshot};
use ntex_util::{future::lazy, future::select, spawn, time};
use super::*;
use crate::Pipeline;
@ -299,6 +328,69 @@ mod tests {
assert_eq!(cnt.get(), 3);
}
#[ntex::test]
async fn test_ready_on_drop() {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));
let srv1 = srv.clone();
let srv2 = srv1.clone().bind();
let (tx, rx) = oneshot::channel();
spawn(async move {
select(rx, srv1.ready()).await;
time::sleep(time::Millis(25000)).await;
drop(srv1);
});
time::sleep(time::Millis(250)).await;
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
let _ = tx.send(());
time::sleep(time::Millis(250)).await;
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
con.notify();
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
}
#[ntex::test]
async fn test_ready_after_shutdown() {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));
let srv1 = srv.clone().bind();
let srv2 = srv1.clone();
let (tx, rx) = oneshot::channel();
spawn(async move {
select(rx, poll_fn(|cx| srv1.poll_ready(cx))).await;
poll_fn(|cx| srv1.poll_shutdown(cx)).await;
time::sleep(time::Millis(25000)).await;
drop(srv1);
});
time::sleep(time::Millis(250)).await;
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
let _ = tx.send(());
time::sleep(time::Millis(250)).await;
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);
con.notify();
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
}
#[ntex::test]
async fn test_shared_call() {
let data = Rc::new(RefCell::new(Vec::new()));

View file

@ -245,6 +245,15 @@ where
}
}
impl<S, R> Drop for PipelineBinding<S, R>
where
S: Service<R>,
{
fn drop(&mut self) {
self.st = cell::UnsafeCell::new(State::New);
}
}
impl<S, R> Clone for PipelineBinding<S, R>
where
S: Service<R>,
@ -318,6 +327,15 @@ struct CheckReadiness<S: 'static, F, Fut> {
impl<S, F, Fut> Unpin for CheckReadiness<S, F, Fut> {}
impl<S, F, Fut> Drop for CheckReadiness<S, F, Fut> {
fn drop(&mut self) {
// future fot dropped during polling, we must notify other waiters
if self.fut.is_some() {
self.pl.waiters.notify();
}
}
}
impl<T, S, F, Fut> Future for CheckReadiness<S, F, Fut>
where
F: Fn(&'static Pipeline<S>) -> Fut,