From effce6915f4c72e001b7c60e87a7b3b1b0a69356 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 29 Sep 2024 16:39:26 +0500 Subject: [PATCH] Notify readiness waiters if ready call get dropped (#431) --- ntex-service/CHANGES.md | 4 ++ ntex-service/Cargo.toml | 2 +- ntex-service/src/ctx.rs | 136 +++++++++++++++++++++++++++++------ ntex-service/src/pipeline.rs | 18 +++++ ntex/Cargo.toml | 2 +- 5 files changed, 138 insertions(+), 24 deletions(-) diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index 1cd222a6..b399c3f8 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [3.1.0] - 2024-09-29 + +* Notify readiness waiters if ready call get dropped + ## [3.0.0] - 2024-05-28 * Use "async fn" for Service::ready() and Service::shutdown() methods diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index d30a8496..519ef949 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "3.0.0" +version = "3.1.0" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index 049efad8..db0da28f 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -1,4 +1,4 @@ -use std::{cell, fmt, future::poll_fn, future::Future, marker, pin, rc::Rc, task}; +use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task}; use crate::Service; @@ -146,26 +146,15 @@ impl<'a, S> ServiceCtx<'a, S> { T: Service, { // check readiness and notify waiters - let mut fut = svc.ready(ServiceCtx { - idx: self.idx, - waiters: self.waiters, - _t: marker::PhantomData, - }); - - poll_fn(|cx| { - if self.waiters.can_check(self.idx, cx) { - // SAFETY: `fut` never moves - let p = unsafe { pin::Pin::new_unchecked(&mut fut) }; - match p.poll(cx) { - task::Poll::Pending => self.waiters.register(self.idx, cx), - task::Poll::Ready(res) => { - self.waiters.notify(); - return task::Poll::Ready(res); - } - } - } - task::Poll::Pending - }) + ReadyCall { + completed: false, + fut: svc.ready(ServiceCtx { + idx: self.idx, + waiters: self.waiters, + _t: marker::PhantomData, + }), + ctx: *self, + } .await } @@ -230,11 +219,51 @@ impl<'a, S> fmt::Debug for ServiceCtx<'a, S> { } } +struct ReadyCall<'a, S: ?Sized, F: Future> { + completed: bool, + fut: F, + ctx: ServiceCtx<'a, S>, +} + +impl<'a, S: ?Sized, F: Future> Drop for ReadyCall<'a, S, F> { + fn drop(&mut self) { + if !self.completed { + self.ctx.waiters.notify(); + } + } +} + +impl<'a, S: ?Sized, F: Future> Unpin for ReadyCall<'a, S, F> {} + +impl<'a, S: ?Sized, F: Future> Future for ReadyCall<'a, S, F> { + type Output = F::Output; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll { + if self.ctx.waiters.can_check(self.ctx.idx, cx) { + // SAFETY: `fut` never moves + let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) }; + match result { + task::Poll::Pending => self.ctx.waiters.register(self.ctx.idx, cx), + task::Poll::Ready(res) => { + self.completed = true; + self.ctx.waiters.notify(); + return task::Poll::Ready(res); + } + } + } + task::Poll::Pending + } +} + #[cfg(test)] mod tests { use std::{cell::Cell, cell::RefCell, future::poll_fn, task::Poll}; - use ntex_util::{channel::condition, future::lazy, time}; + use ntex_util::channel::{condition, oneshot}; + use ntex_util::{future::lazy, future::select, spawn, time}; use super::*; use crate::Pipeline; @@ -299,6 +328,69 @@ mod tests { assert_eq!(cnt.get(), 3); } + #[ntex::test] + async fn test_ready_on_drop() { + let cnt = Rc::new(Cell::new(0)); + let con = condition::Condition::new(); + let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); + + let srv1 = srv.clone(); + let srv2 = srv1.clone().bind(); + + let (tx, rx) = oneshot::channel(); + spawn(async move { + select(rx, srv1.ready()).await; + time::sleep(time::Millis(25000)).await; + drop(srv1); + }); + time::sleep(time::Millis(250)).await; + + let res = lazy(|cx| srv2.poll_ready(cx)).await; + assert_eq!(res, Poll::Pending); + + let _ = tx.send(()); + time::sleep(time::Millis(250)).await; + + let res = lazy(|cx| srv2.poll_ready(cx)).await; + assert_eq!(res, Poll::Pending); + + con.notify(); + let res = lazy(|cx| srv2.poll_ready(cx)).await; + assert_eq!(res, Poll::Ready(Ok(()))); + } + + #[ntex::test] + async fn test_ready_after_shutdown() { + let cnt = Rc::new(Cell::new(0)); + let con = condition::Condition::new(); + let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); + + let srv1 = srv.clone().bind(); + let srv2 = srv1.clone(); + + let (tx, rx) = oneshot::channel(); + spawn(async move { + select(rx, poll_fn(|cx| srv1.poll_ready(cx))).await; + poll_fn(|cx| srv1.poll_shutdown(cx)).await; + time::sleep(time::Millis(25000)).await; + drop(srv1); + }); + time::sleep(time::Millis(250)).await; + + let res = lazy(|cx| srv2.poll_ready(cx)).await; + assert_eq!(res, Poll::Pending); + + let _ = tx.send(()); + time::sleep(time::Millis(250)).await; + + let res = lazy(|cx| srv2.poll_ready(cx)).await; + assert_eq!(res, Poll::Pending); + + con.notify(); + let res = lazy(|cx| srv2.poll_ready(cx)).await; + assert_eq!(res, Poll::Ready(Ok(()))); + } + #[ntex::test] async fn test_shared_call() { let data = Rc::new(RefCell::new(Vec::new())); diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index bfd748f5..ea5b1883 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -245,6 +245,15 @@ where } } +impl Drop for PipelineBinding +where + S: Service, +{ + fn drop(&mut self) { + self.st = cell::UnsafeCell::new(State::New); + } +} + impl Clone for PipelineBinding where S: Service, @@ -318,6 +327,15 @@ struct CheckReadiness { impl Unpin for CheckReadiness {} +impl Drop for CheckReadiness { + fn drop(&mut self) { + // future fot dropped during polling, we must notify other waiters + if self.fut.is_some() { + self.pl.waiters.notify(); + } + } +} + impl Future for CheckReadiness where F: Fn(&'static Pipeline) -> Fut, diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index ca9c65f5..e4093759 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -64,7 +64,7 @@ brotli = ["dep:brotli2"] ntex-codec = "0.6.2" ntex-http = "0.1.12" ntex-router = "0.5.3" -ntex-service = "3.0" +ntex-service = "3.1" ntex-macros = "0.1.3" ntex-util = "2" ntex-bytes = "0.1.27"