Various cleanups (#282)

This commit is contained in:
Nikolay Kim 2024-01-09 19:35:38 +06:00 committed by GitHub
parent 48eb1c5e54
commit 5869141954
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 51 additions and 81 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.0.0] - 2024-01-09
* Release
## [2.0.0-b.0] - 2024-01-07 ## [2.0.0-b.0] - 2024-01-07
* Use "async fn" in trait for Service definition * Use "async fn" in trait for Service definition

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-service" name = "ntex-service"
version = "2.0.0-b.0" version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service" description = "ntex service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -16,9 +16,8 @@ name = "ntex_service"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
pin-project-lite = "0.2.6"
slab = "0.4" slab = "0.4"
[dev-dependencies] [dev-dependencies]
ntex = { version = "1.0.0-b.0", features = ["tokio"] } ntex = { version = "1.0.0-b.0", features = ["tokio"] }
ntex-util = "1.0.0-b.0" ntex-util = "1.0.0-b.1"

View file

@ -217,7 +217,7 @@ mod tests {
} }
} }
#[derive(Copy, Clone, Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
struct Err; struct Err;
impl From<()> for Err { impl From<()> for Err {
@ -296,6 +296,8 @@ mod tests {
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv", ())); assert_eq!(res.unwrap(), ("srv", ()));
format!("{:?}", new_srv); format!("{:?}", new_srv);
assert!(Err == Err::from(()));
} }
#[ntex::test] #[ntex::test]

View file

@ -217,6 +217,7 @@ mod tests {
req: &'static str, req: &'static str,
ctx: ServiceCtx<'_, Self>, ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> { ) -> Result<Self::Response, Self::Error> {
format!("{:?}", ctx);
let _ = ctx.clone(); let _ = ctx.clone();
Ok(req) Ok(req)
} }
@ -285,33 +286,4 @@ mod tests {
assert_eq!(cnt.get(), 5); assert_eq!(cnt.get(), 5);
assert_eq!(&*data.borrow(), &["srv2", "srv1"]); assert_eq!(&*data.borrow(), &["srv2", "srv1"]);
} }
// #[ntex::test]
// async fn test_advance_to_call() {
// let cnt = Rc::new(Cell::new(0));
// let con = condition::Condition::new();
// let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));
// let mut fut = srv.call("test").advance_to_call();
// let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
// con.notify();
// let res = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
// assert!(res.is_ready());
// }
// #[ntex::test]
// #[should_panic]
// async fn test_advance_to_call_panic() {
// let cnt = Rc::new(Cell::new(0));
// let con = condition::Condition::new();
// let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));
// let mut fut = srv.call("test");
// let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
// con.notify();
// let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
// let _f = fut.advance_to_call();
// }
} }

View file

@ -69,7 +69,7 @@ pub use self::pipeline::{Pipeline, PipelineCall};
/// type Response = u64; /// type Response = u64;
/// type Error = Infallible; /// type Error = Infallible;
/// ///
/// async fn call(&self, req: u8, _: ServiceCtx<'_, Self>) -> Result<Self::Response, Self::Error> { /// async fn call(&self, req: u8, ctx: ServiceCtx<'_, Self>) -> Result<Self::Response, Self::Error> {
/// Ok(req as u64) /// Ok(req as u64)
/// } /// }
/// } /// }
@ -82,7 +82,7 @@ pub use self::pipeline::{Pipeline, PipelineCall};
/// async fn my_service(req: u8) -> Result<u64, Infallible>; /// async fn my_service(req: u8) -> Result<u64, Infallible>;
/// ``` /// ```
/// ///
/// Service cannot be called directly, it must be wrapped to an instance of [`Container`] or /// Service cannot be called directly, it must be wrapped to an instance of [`Pipeline``] or
/// by using `ctx` argument of the call method in case of chanined services. /// by using `ctx` argument of the call method in case of chanined services.
/// ///
pub trait Service<Req> { pub trait Service<Req> {

View file

@ -137,37 +137,35 @@ impl<S> Clone for Pipeline<S> {
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>; type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
pin_project_lite::pin_project! { #[allow(missing_debug_implementations)]
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub struct PipelineCall<S, R> pub struct PipelineCall<S, R>
where where
S: Service<R>, S: Service<R>,
S: 'static, R: 'static,
R: 'static, {
{ state: PipelineCallState<S, R>,
#[pin] pipeline: Pipeline<S>,
state: PipelineCallState<S, R>,
pipeline: Pipeline<S>,
}
} }
pin_project_lite::pin_project! { impl<S: Service<R>, R> Unpin for PipelineCall<S, R> {}
#[project = PipelineCallStateProject]
enum PipelineCallState<S, Req> enum PipelineCallState<S, Req>
where where
S: Service<Req>, S: Service<Req>,
S: 'static, Req: 'static,
Req: 'static, {
{ Ready {
Ready { req: Option<Req> }, req: Option<Req>,
Call { #[pin] fut: BoxFuture<'static, Result<S::Response, S::Error>> }, },
Empty, Call {
} fut: BoxFuture<'static, Result<S::Response, S::Error>>,
},
} }
impl<S, R> PipelineCallState<S, R> impl<S, R> PipelineCallState<S, R>
where where
S: Service<R> + 'static, S: Service<R>,
R: 'static, R: 'static,
{ {
fn new_call<'a>(pl: &'a Pipeline<S>, req: R) -> Self { fn new_call<'a>(pl: &'a Pipeline<S>, req: R) -> Self {
@ -176,7 +174,7 @@ where
Box::pin(pl.get_ref().call(req, ctx)); Box::pin(pl.get_ref().call(req, ctx));
// SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc` // SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until // Pipeline::svc is heap allocated(Rc<S>), and it is being kept alive until
// `svc_call` get resolved to result // `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) }; let fut = unsafe { std::mem::transmute(svc_call) };
@ -192,23 +190,20 @@ where
#[inline] #[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project(); let mut slf = self.as_mut();
match this.state.as_mut().project() { if let PipelineCallState::Call { ref mut fut, .. } = slf.state {
PipelineCallStateProject::Ready { req } => { return Pin::new(fut).poll(cx);
task::ready!(this.pipeline.poll_ready(cx))?;
let st = PipelineCallState::new_call(this.pipeline, req.take().unwrap());
this.state.set(st);
self.poll(cx)
}
PipelineCallStateProject::Call { fut, .. } => fut.poll(cx).map(|r| {
this.state.set(PipelineCallState::Empty);
r
}),
PipelineCallStateProject::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
} }
task::ready!(slf.pipeline.poll_ready(cx))?;
let req = if let PipelineCallState::Ready { ref mut req } = slf.state {
req.take().unwrap()
} else {
panic!("future must not be polled after it returned `Poll::Ready`")
};
slf.state = PipelineCallState::new_call(&slf.pipeline, req);
slf.poll(cx)
} }
} }

View file

@ -252,10 +252,8 @@ fn connector(
) -> impl Service<Connect, Response = IoBoxed, Error = ConnectError> + fmt::Debug { ) -> impl Service<Connect, Response = IoBoxed, Error = ConnectError> + fmt::Debug {
TimeoutService::new( TimeoutService::new(
timeout, timeout,
apply_fn(connector, |msg: Connect, srv| { apply_fn(connector, |msg: Connect, svc| async move {
Box::pin( svc.call(TcpConnect::new(msg.uri).set_addr(msg.addr)).await
async move { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)).await },
)
}) })
.map(move |io: IoBoxed| { .map(move |io: IoBoxed| {
io.set_disconnect_timeout(disconnect_timeout); io.set_disconnect_timeout(disconnect_timeout);