Refactor service

This commit is contained in:
Nikolay Kim 2024-11-02 10:26:45 +05:00
parent d8f2c87781
commit 9ecf7c7e6c
5 changed files with 107 additions and 213 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "3.2.1"
version = "3.3.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]

View file

@ -1,4 +1,4 @@
use std::{fmt, future::Future, pin::Pin, rc::Rc};
use std::{fmt, future::Future, pin::Pin};
use crate::ctx::{ServiceCtx, WaitersRef};
@ -50,15 +50,15 @@ trait ServiceObj<Req> {
fn ready<'a>(
&'a self,
idx: usize,
waiters: &'a Rc<WaitersRef>,
idx: u32,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, (), Self::Error>;
fn call<'a>(
&'a self,
req: Req,
idx: usize,
waiters: &'a Rc<WaitersRef>,
idx: u32,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, Self::Response, Self::Error>;
fn shutdown<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
@ -75,14 +75,10 @@ where
#[inline]
fn ready<'a>(
&'a self,
idx: usize,
waiters: &'a Rc<WaitersRef>,
idx: u32,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, (), Self::Error> {
Box::pin(async move {
ServiceCtx::<'a, S>::from_ref(idx, waiters)
.ready(self)
.await
})
Box::pin(async move { ServiceCtx::<'a, S>::new(idx, waiters).ready(self).await })
}
#[inline]
@ -94,11 +90,11 @@ where
fn call<'a>(
&'a self,
req: Req,
idx: usize,
waiters: &'a Rc<WaitersRef>,
idx: u32,
waiters: &'a WaitersRef,
) -> BoxFuture<'a, Self::Response, Self::Error> {
Box::pin(async move {
ServiceCtx::<'a, S>::from_ref(idx, waiters)
ServiceCtx::<'a, S>::new(idx, waiters)
.call_nowait(self, req)
.await
})

View file

@ -3,69 +3,67 @@ use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task, task::Conte
use crate::Service;
pub struct ServiceCtx<'a, S: ?Sized> {
idx: usize,
waiters: &'a Rc<WaitersRef>,
idx: u32,
waiters: &'a WaitersRef,
_t: marker::PhantomData<Rc<S>>,
}
#[derive(Clone, Debug)]
/// Pipeline tag allows to notify pipeline binding
pub struct PipelineTag(Rc<WaitersRef>);
pub(crate) struct Waiters {
index: usize,
waiters: Rc<WaitersRef>,
}
#[derive(Debug)]
pub(crate) struct WaitersRef {
cur: cell::Cell<usize>,
cur: cell::Cell<u32>,
indexes: cell::UnsafeCell<slab::Slab<Option<task::Waker>>>,
}
impl PipelineTag {
/// Notify pipeline dispatcher
pub fn notify(&self) {
if let Some(waker) = self.0.get()[0].take() {
waker.wake();
}
}
}
impl WaitersRef {
pub(crate) fn new() -> (u32, Self) {
let mut waiters = slab::Slab::new();
let index = waiters.insert(Default::default()) as u32;
(
index,
WaitersRef {
cur: cell::Cell::new(u32::MAX),
indexes: cell::UnsafeCell::new(waiters),
},
)
}
#[allow(clippy::mut_from_ref)]
fn get(&self) -> &mut slab::Slab<Option<task::Waker>> {
pub(crate) fn get(&self) -> &mut slab::Slab<Option<task::Waker>> {
unsafe { &mut *self.indexes.get() }
}
fn insert(&self) -> usize {
self.get().insert(None)
pub(crate) fn insert(&self) -> u32 {
self.get().insert(None) as u32
}
fn remove(&self, idx: usize) {
self.notify();
self.get().remove(idx);
pub(crate) fn remove(&self, idx: u32) {
self.get().remove(idx as usize);
if self.cur.get() == idx {
self.notify();
}
}
fn register(&self, idx: usize, cx: &mut Context<'_>) {
self.get()[idx] = Some(cx.waker().clone());
pub(crate) fn register(&self, idx: u32, cx: &mut Context<'_>) {
self.get()[idx as usize] = Some(cx.waker().clone());
}
fn notify(&self) {
pub(crate) fn notify(&self) {
for (_, waker) in self.get().iter_mut().skip(1) {
if let Some(waker) = waker.take() {
waker.wake();
}
}
self.cur.set(usize::MAX);
self.cur.set(u32::MAX);
}
pub(crate) fn can_check(&self, idx: usize, cx: &mut Context<'_>) -> bool {
pub(crate) fn can_check(&self, idx: u32, cx: &mut Context<'_>) -> bool {
let cur = self.cur.get();
if cur == idx {
true
} else if cur == usize::MAX {
} else if cur == u32::MAX {
self.cur.set(idx);
true
} else {
@ -75,81 +73,8 @@ impl WaitersRef {
}
}
impl Waiters {
pub(crate) fn new() -> Self {
let mut waiters = slab::Slab::new();
// first insert for wake ups from services
let _ = waiters.insert(None);
Waiters {
index: waiters.insert(None),
waiters: Rc::new(WaitersRef {
cur: cell::Cell::new(usize::MAX),
indexes: cell::UnsafeCell::new(waiters),
}),
}
}
pub(crate) fn get_ref(&self) -> &Rc<WaitersRef> {
&self.waiters
}
pub(crate) fn can_check(&self, cx: &mut Context<'_>) -> bool {
self.waiters.can_check(self.index, cx)
}
pub(crate) fn register(&self, cx: &mut Context<'_>) {
self.waiters.register(self.index, cx);
}
pub(crate) fn register_pipeline(&self, cx: &mut Context<'_>) {
self.waiters.register(0, cx);
}
pub(crate) fn notify(&self) {
if self.waiters.cur.get() == self.index {
self.waiters.notify();
}
}
}
impl Drop for Waiters {
#[inline]
fn drop(&mut self) {
self.waiters.remove(self.index);
self.notify();
}
}
impl Clone for Waiters {
fn clone(&self) -> Self {
Waiters {
index: self.waiters.insert(),
waiters: self.waiters.clone(),
}
}
}
impl fmt::Debug for Waiters {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Waiters")
.field("index", &self.index)
.field("waiters", &self.waiters.get().len())
.finish()
}
}
impl<'a, S> ServiceCtx<'a, S> {
pub(crate) fn new(waiters: &'a Waiters) -> Self {
Self {
idx: waiters.index,
waiters: waiters.get_ref(),
_t: marker::PhantomData,
}
}
pub(crate) fn from_ref(idx: usize, waiters: &'a Rc<WaitersRef>) -> Self {
pub(crate) fn new(idx: u32, waiters: &'a WaitersRef) -> Self {
Self {
idx,
waiters,
@ -157,7 +82,7 @@ impl<'a, S> ServiceCtx<'a, S> {
}
}
pub(crate) fn inner(self) -> (usize, &'a Rc<WaitersRef>) {
pub(crate) fn inner(self) -> (u32, &'a WaitersRef) {
(self.idx, self.waiters)
}
@ -220,11 +145,6 @@ impl<'a, S> ServiceCtx<'a, S> {
)
.await
}
/// Get pipeline tag for current pipeline
pub fn tag(&self) -> PipelineTag {
PipelineTag(self.waiters.clone())
}
}
impl<'a, S> Copy for ServiceCtx<'a, S> {}
@ -453,50 +373,4 @@ mod tests {
assert_eq!(cnt.get(), 2);
assert_eq!(&*data.borrow(), &["srv1", "srv2"]);
}
#[ntex::test]
async fn test_pipeline_tag() {
struct Srv(Rc<Cell<usize>>, Cell<Option<PipelineTag>>);
impl Service<&'static str> for Srv {
type Response = &'static str;
type Error = ();
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.1.set(Some(ctx.tag()));
self.0.set(self.0.get() + 1);
Ok(())
}
async fn call(
&self,
req: &'static str,
_: ServiceCtx<'_, Self>,
) -> Result<&'static str, ()> {
Ok(req)
}
}
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), Cell::new(None))).bind();
let srv1 = srv.clone();
let waiter = con.wait();
ntex::rt::spawn(async move {
let _ = poll_fn(|cx| {
let _ = srv1.poll_ready(cx);
waiter.poll_ready(cx)
})
.await;
});
time::sleep(time::Millis(50)).await;
assert_eq!(cnt.get(), 1);
let tag = srv.get_ref().1.take().unwrap();
tag.notify();
time::sleep(time::Millis(50)).await;
assert_eq!(cnt.get(), 2);
}
}

View file

@ -27,7 +27,7 @@ mod util;
pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::chain::{chain, chain_factory};
pub use self::ctx::{PipelineTag, ServiceCtx};
pub use self::ctx::ServiceCtx;
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::fn_shutdown::fn_shutdown;
pub use self::map_config::{map_config, unit_config};

View file

@ -1,30 +1,42 @@
use std::{cell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
use crate::{ctx::Waiters, Service, ServiceCtx};
use crate::{ctx::WaitersRef, Service, ServiceCtx};
#[derive(Debug)]
/// Container for a service.
///
/// Container allows to call enclosed service and adds support of shared readiness.
pub struct Pipeline<S> {
svc: Rc<S>,
pub(crate) waiters: Waiters,
index: u32,
state: Rc<PipelineState<S>>,
}
struct PipelineState<S> {
svc: S,
waiters: WaitersRef,
}
impl<S> PipelineState<S> {
pub(crate) fn waiters_ref(&self) -> &WaitersRef {
&self.waiters
}
}
impl<S> Pipeline<S> {
#[inline]
/// Construct new container instance.
pub fn new(svc: S) -> Self {
let (index, waiters) = WaitersRef::new();
Pipeline {
svc: Rc::new(svc),
waiters: Waiters::new(),
index,
state: Rc::new(PipelineState { svc, waiters }),
}
}
#[inline]
/// Return reference to enclosed service
pub fn get_ref(&self) -> &S {
self.svc.as_ref()
&self.state.svc
}
#[inline]
@ -33,8 +45,8 @@ impl<S> Pipeline<S> {
where
S: Service<R>,
{
ServiceCtx::<'_, S>::new(&self.waiters)
.ready(self.svc.as_ref())
ServiceCtx::<'_, S>::new(self.index, self.state.waiters_ref())
.ready(&self.state.svc)
.await
}
@ -45,13 +57,9 @@ impl<S> Pipeline<S> {
where
S: Service<R>,
{
let ctx = ServiceCtx::<'_, S>::new(&self.waiters);
// check service readiness
self.svc.as_ref().ready(ctx).await?;
// call service
self.svc.as_ref().call(req, ctx).await
ServiceCtx::<'_, S>::new(self.index, self.state.waiters_ref())
.call(&self.state.svc, req)
.await
}
#[inline]
@ -66,8 +74,8 @@ impl<S> Pipeline<S> {
PipelineCall {
fut: Box::pin(async move {
ServiceCtx::<S>::new(&pl.waiters)
.call(pl.svc.as_ref(), req)
ServiceCtx::<S>::new(pl.index, pl.state.waiters_ref())
.call(&pl.state.svc, req)
.await
}),
}
@ -86,8 +94,8 @@ impl<S> Pipeline<S> {
PipelineCall {
fut: Box::pin(async move {
ServiceCtx::<S>::new(&pl.waiters)
.call_nowait(pl.svc.as_ref(), req)
ServiceCtx::<S>::new(pl.index, pl.state.waiters_ref())
.call_nowait(&pl.state.svc, req)
.await
}),
}
@ -99,11 +107,11 @@ impl<S> Pipeline<S> {
where
S: Service<R>,
{
self.svc.as_ref().shutdown().await
self.state.svc.shutdown().await
}
#[inline]
/// Convert to lifetime object.
/// Get current pipeline.
pub fn bind<R>(self) -> PipelineBinding<S, R>
where
S: Service<R> + 'static,
@ -121,15 +129,29 @@ impl<S> From<S> for Pipeline<S> {
}
impl<S> Clone for Pipeline<S> {
#[inline]
fn clone(&self) -> Self {
Self {
svc: self.svc.clone(),
waiters: self.waiters.clone(),
Pipeline {
index: self.state.waiters.insert(),
state: self.state.clone(),
}
}
}
impl<S> Drop for Pipeline<S> {
#[inline]
fn drop(&mut self) {
self.state.waiters.remove(self.index);
}
}
impl<S> fmt::Debug for PipelineState<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PipelineState")
.field("waiters", &self.waiters.get().len())
.finish()
}
}
/// Bound container for a service.
pub struct PipelineBinding<S, R>
where
@ -160,7 +182,7 @@ where
#[inline]
/// Return reference to enclosed service
pub fn get_ref(&self) -> &S {
self.pl.svc.as_ref()
&self.pl.state.svc
}
#[inline]
@ -215,8 +237,8 @@ where
PipelineCall {
fut: Box::pin(async move {
ServiceCtx::<S>::new(&pl.waiters)
.call(pl.svc.as_ref(), req)
ServiceCtx::<S>::new(pl.index, pl.state.waiters_ref())
.call(&pl.state.svc, req)
.await
}),
}
@ -231,8 +253,8 @@ where
PipelineCall {
fut: Box::pin(async move {
ServiceCtx::<S>::new(&pl.waiters)
.call_nowait(pl.svc.as_ref(), req)
ServiceCtx::<S>::new(pl.index, pl.state.waiters_ref())
.call_nowait(&pl.state.svc, req)
.await
}),
}
@ -241,7 +263,7 @@ where
#[inline]
/// Shutdown enclosed service.
pub async fn shutdown(&self) {
self.pl.svc.as_ref().shutdown().await
self.pl.state.svc.shutdown().await
}
}
@ -316,7 +338,9 @@ where
S: Service<R>,
R: 'static,
{
pl.svc.ready(ServiceCtx::<'_, S>::new(&pl.waiters))
pl.state
.svc
.ready(ServiceCtx::<'_, S>::new(pl.index, pl.state.waiters_ref()))
}
struct CheckReadiness<S: 'static, F, Fut> {
@ -331,7 +355,7 @@ impl<S, F, Fut> Drop for CheckReadiness<S, F, Fut> {
fn drop(&mut self) {
// future fot dropped during polling, we must notify other waiters
if self.fut.is_some() {
self.pl.waiters.notify();
self.pl.state.waiters.notify();
}
}
}
@ -347,18 +371,18 @@ where
let mut slf = self.as_mut();
// register pipeline tag
slf.pl.waiters.register_pipeline(cx);
// slf.pl.state.waiters.register_pipeline(cx);
if slf.pl.waiters.can_check(cx) {
if slf.pl.state.waiters.can_check(slf.pl.index, cx) {
if let Some(ref mut fut) = slf.fut {
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Pending => {
slf.pl.waiters.register(cx);
slf.pl.state.waiters.register(slf.pl.index, cx);
Poll::Pending
}
Poll::Ready(res) => {
let _ = slf.fut.take();
slf.pl.waiters.notify();
slf.pl.state.waiters.notify();
Poll::Ready(res)
}
}