This commit is contained in:
Nikolay Kim 2024-11-02 12:04:41 +05:00
parent aec2908c96
commit 8502e834c7
2 changed files with 17 additions and 13 deletions

View file

@ -1,5 +1,5 @@
use std::task::{Context, Poll, Waker};
use std::{cell, collections::VecDeque, fmt, future::Future, marker, pin::Pin, rc::Rc};
use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc};
use crate::Service;
@ -12,7 +12,7 @@ pub struct ServiceCtx<'a, S: ?Sized> {
#[derive(Debug)]
pub(crate) struct WaitersRef {
cur: cell::Cell<u32>,
wakers: cell::UnsafeCell<VecDeque<u32>>,
wakers: cell::UnsafeCell<Vec<u32>>,
indexes: cell::UnsafeCell<slab::Slab<Option<Waker>>>,
}
@ -28,7 +28,7 @@ impl WaitersRef {
WaitersRef {
cur: cell::Cell::new(u32::MAX),
indexes: cell::UnsafeCell::new(waiters),
wakers: cell::UnsafeCell::new(VecDeque::default()),
wakers: cell::UnsafeCell::new(Vec::default()),
},
)
}
@ -39,7 +39,7 @@ impl WaitersRef {
}
#[allow(clippy::mut_from_ref)]
pub(crate) fn get_wakers(&self) -> &mut VecDeque<u32> {
pub(crate) fn get_wakers(&self) -> &mut Vec<u32> {
unsafe { &mut *self.wakers.get() }
}
@ -57,12 +57,12 @@ impl WaitersRef {
pub(crate) fn register(&self, idx: u32, cx: &mut Context<'_>) {
self.get()[idx as usize] = Some(cx.waker().clone());
self.get_wakers().push_back(idx);
self.get_wakers().push(idx);
}
pub(crate) fn register_unready(&self, cx: &mut Context<'_>) {
self.get()[0] = Some(cx.waker().clone());
self.get_wakers().push_back(0);
self.get_wakers().push(0);
}
pub(crate) fn notify(&self) {

View file

@ -160,7 +160,7 @@ where
{
pl: Pipeline<S>,
st: cell::UnsafeCell<State<S::Error>>,
not_ready: cell::UnsafeCell<State<S::Error>>,
not_ready: cell::UnsafeCell<StateNotReady<S::Error>>,
}
enum State<E> {
@ -169,6 +169,11 @@ enum State<E> {
Shutdown(Pin<Box<dyn Future<Output = ()> + 'static>>),
}
enum StateNotReady<E> {
New,
Readiness(Pin<Box<dyn Future<Output = Result<(), E>> + 'static>>),
}
impl<S, R> PipelineBinding<S, R>
where
S: Service<R> + 'static,
@ -178,7 +183,7 @@ where
PipelineBinding {
pl,
st: cell::UnsafeCell::new(State::New),
not_ready: cell::UnsafeCell::new(State::New),
not_ready: cell::UnsafeCell::new(StateNotReady::New),
}
}
@ -220,7 +225,7 @@ where
let st = unsafe { &mut *self.not_ready.get() };
match st {
State::New => {
StateNotReady::New => {
// SAFETY: `fut` has same lifetime same as lifetime of `self.pl`.
// Pipeline::svc is heap allocated(Rc<S>), and it is being kept alive until
// `self` is alive
@ -230,11 +235,10 @@ where
f: not_ready,
pl,
});
*st = State::Readiness(fut);
*st = StateNotReady::Readiness(fut);
self.poll_not_ready(cx)
}
State::Readiness(ref mut fut) => Pin::new(fut).poll(cx),
State::Shutdown(_) => panic!("Pipeline is shutding down"),
StateNotReady::Readiness(ref mut fut) => Pin::new(fut).poll(cx),
}
}
@ -312,7 +316,7 @@ where
Self {
pl: self.pl.clone(),
st: cell::UnsafeCell::new(State::New),
not_ready: cell::UnsafeCell::new(State::New),
not_ready: cell::UnsafeCell::new(StateNotReady::New),
}
}
}