diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index 55030acd..6c5802d3 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0] - 2024-01-09 + +* Release + ## [2.0.0-b.0] - 2024-01-07 * Use "async fn" in trait for Service definition diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 3594920c..c14eb27c 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "2.0.0-b.0" +version = "2.0.0" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] @@ -16,9 +16,8 @@ name = "ntex_service" path = "src/lib.rs" [dependencies] -pin-project-lite = "0.2.6" slab = "0.4" [dev-dependencies] ntex = { version = "1.0.0-b.0", features = ["tokio"] } -ntex-util = "1.0.0-b.0" +ntex-util = "1.0.0-b.1" diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 61a77679..8a7ba56f 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -217,7 +217,7 @@ mod tests { } } - #[derive(Copy, Clone, Debug, PartialEq, Eq)] + #[derive(Debug, PartialEq, Eq)] struct Err; impl From<()> for Err { @@ -296,6 +296,8 @@ mod tests { assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv", ())); format!("{:?}", new_srv); + + assert!(Err == Err::from(())); } #[ntex::test] diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index fd16b9c2..9baef377 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -217,6 +217,7 @@ mod tests { req: &'static str, ctx: ServiceCtx<'_, Self>, ) -> Result { + format!("{:?}", ctx); let _ = ctx.clone(); Ok(req) } @@ -285,33 +286,4 @@ mod tests { assert_eq!(cnt.get(), 5); assert_eq!(&*data.borrow(), &["srv2", "srv1"]); } - - // #[ntex::test] - // async fn test_advance_to_call() { - // let cnt = Rc::new(Cell::new(0)); - // let con = condition::Condition::new(); - // let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); - - // let mut fut = srv.call("test").advance_to_call(); - // let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; - // con.notify(); - - // let res = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; - // assert!(res.is_ready()); - // } - - // #[ntex::test] - // #[should_panic] - // async fn test_advance_to_call_panic() { - // let cnt = Rc::new(Cell::new(0)); - // let con = condition::Condition::new(); - // let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); - - // let mut fut = srv.call("test"); - // let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; - // con.notify(); - - // let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; - // let _f = fut.advance_to_call(); - // } } diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index edf5245f..c4d3c356 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -69,7 +69,7 @@ pub use self::pipeline::{Pipeline, PipelineCall}; /// type Response = u64; /// type Error = Infallible; /// -/// async fn call(&self, req: u8, _: ServiceCtx<'_, Self>) -> Result { +/// async fn call(&self, req: u8, ctx: ServiceCtx<'_, Self>) -> Result { /// Ok(req as u64) /// } /// } @@ -82,7 +82,7 @@ pub use self::pipeline::{Pipeline, PipelineCall}; /// async fn my_service(req: u8) -> Result; /// ``` /// -/// Service cannot be called directly, it must be wrapped to an instance of [`Container`] or +/// Service cannot be called directly, it must be wrapped to an instance of [`Pipeline``] or /// by using `ctx` argument of the call method in case of chanined services. /// pub trait Service { diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 87210855..6d394017 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -137,37 +137,35 @@ impl Clone for Pipeline { type BoxFuture<'a, T> = Pin + 'a>>; -pin_project_lite::pin_project! { - #[must_use = "futures do nothing unless polled"] - pub struct PipelineCall - where - S: Service, - S: 'static, - R: 'static, - { - #[pin] - state: PipelineCallState, - pipeline: Pipeline, - } +#[allow(missing_debug_implementations)] +#[must_use = "futures do nothing unless polled"] +pub struct PipelineCall +where + S: Service, + R: 'static, +{ + state: PipelineCallState, + pipeline: Pipeline, } -pin_project_lite::pin_project! { - #[project = PipelineCallStateProject] - enum PipelineCallState - where - S: Service, - S: 'static, - Req: 'static, - { - Ready { req: Option }, - Call { #[pin] fut: BoxFuture<'static, Result> }, - Empty, - } +impl, R> Unpin for PipelineCall {} + +enum PipelineCallState +where + S: Service, + Req: 'static, +{ + Ready { + req: Option, + }, + Call { + fut: BoxFuture<'static, Result>, + }, } impl PipelineCallState where - S: Service + 'static, + S: Service, R: 'static, { fn new_call<'a>(pl: &'a Pipeline, req: R) -> Self { @@ -176,7 +174,7 @@ where Box::pin(pl.get_ref().call(req, ctx)); // SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc` - // Pipeline::svc is heap allocated(Rc), we keep it alive until + // Pipeline::svc is heap allocated(Rc), and it is being kept alive until // `svc_call` get resolved to result let fut = unsafe { std::mem::transmute(svc_call) }; @@ -192,23 +190,20 @@ where #[inline] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); + let mut slf = self.as_mut(); - match this.state.as_mut().project() { - PipelineCallStateProject::Ready { req } => { - task::ready!(this.pipeline.poll_ready(cx))?; - - let st = PipelineCallState::new_call(this.pipeline, req.take().unwrap()); - this.state.set(st); - self.poll(cx) - } - PipelineCallStateProject::Call { fut, .. } => fut.poll(cx).map(|r| { - this.state.set(PipelineCallState::Empty); - r - }), - PipelineCallStateProject::Empty => { - panic!("future must not be polled after it returned `Poll::Ready`") - } + if let PipelineCallState::Call { ref mut fut, .. } = slf.state { + return Pin::new(fut).poll(cx); } + + task::ready!(slf.pipeline.poll_ready(cx))?; + + let req = if let PipelineCallState::Ready { ref mut req } = slf.state { + req.take().unwrap() + } else { + panic!("future must not be polled after it returned `Poll::Ready`") + }; + slf.state = PipelineCallState::new_call(&slf.pipeline, req); + slf.poll(cx) } } diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index 037d254c..2f42ed73 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -252,10 +252,8 @@ fn connector( ) -> impl Service + fmt::Debug { TimeoutService::new( timeout, - apply_fn(connector, |msg: Connect, srv| { - Box::pin( - async move { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)).await }, - ) + apply_fn(connector, |msg: Connect, svc| async move { + svc.call(TcpConnect::new(msg.uri).set_addr(msg.addr)).await }) .map(move |io: IoBoxed| { io.set_disconnect_timeout(disconnect_timeout);