mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 13:27:39 +03:00
limit usage of Cell
This commit is contained in:
parent
89cebe5534
commit
c73e64226d
7 changed files with 51 additions and 34 deletions
|
@ -4,8 +4,8 @@ use std::cell::UnsafeCell;
|
|||
use std::fmt;
|
||||
use std::rc::Rc;
|
||||
|
||||
pub(crate) struct Cell<T> {
|
||||
pub(crate) inner: Rc<UnsafeCell<T>>,
|
||||
pub(super) struct Cell<T> {
|
||||
inner: Rc<UnsafeCell<T>>,
|
||||
}
|
||||
|
||||
impl<T> Clone for Cell<T> {
|
||||
|
@ -23,26 +23,26 @@ impl<T: fmt::Debug> fmt::Debug for Cell<T> {
|
|||
}
|
||||
|
||||
impl<T> Cell<T> {
|
||||
pub(crate) fn new(inner: T) -> Self {
|
||||
pub(super) fn new(inner: T) -> Self {
|
||||
Self {
|
||||
inner: Rc::new(UnsafeCell::new(inner)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn strong_count(&self) -> usize {
|
||||
pub(super) fn strong_count(&self) -> usize {
|
||||
Rc::strong_count(&self.inner)
|
||||
}
|
||||
|
||||
pub(crate) fn get_ref(&self) -> &T {
|
||||
pub(super) fn get_ref(&self) -> &T {
|
||||
unsafe { &*self.inner.as_ref().get() }
|
||||
}
|
||||
|
||||
pub(crate) fn get_mut(&mut self) -> &mut T {
|
||||
pub(super) fn get_mut(&mut self) -> &mut T {
|
||||
unsafe { &mut *self.inner.as_ref().get() }
|
||||
}
|
||||
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {
|
||||
pub(super) unsafe fn get_mut_unchecked(&self) -> &mut T {
|
||||
&mut *self.inner.as_ref().get()
|
||||
}
|
||||
}
|
|
@ -4,8 +4,8 @@ use std::task::{Context, Poll};
|
|||
|
||||
use slab::Slab;
|
||||
|
||||
use super::cell::Cell;
|
||||
use crate::task::LocalWaker;
|
||||
use crate::util::Cell;
|
||||
|
||||
/// Condition allows to notify multiple receivers at the same time
|
||||
pub struct Condition(Cell<Inner>);
|
||||
|
@ -61,7 +61,7 @@ impl Waiter {
|
|||
pub fn poll_waiter(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
let inner = unsafe {
|
||||
self.inner
|
||||
.get_mut_unsafe()
|
||||
.get_mut_unchecked()
|
||||
.data
|
||||
.get_unchecked_mut(self.token)
|
||||
};
|
||||
|
@ -80,7 +80,7 @@ impl Waiter {
|
|||
|
||||
impl Clone for Waiter {
|
||||
fn clone(&self) -> Self {
|
||||
let token = unsafe { self.inner.get_mut_unsafe() }.data.insert(None);
|
||||
let token = unsafe { self.inner.get_mut_unchecked() }.data.insert(None);
|
||||
Waiter {
|
||||
token,
|
||||
inner: self.inner.clone(),
|
||||
|
@ -145,4 +145,22 @@ mod tests {
|
|||
assert_eq!(waiter.await, ());
|
||||
assert_eq!(waiter2.await, ());
|
||||
}
|
||||
|
||||
#[ntex_rt::test]
|
||||
async fn test_condition_poll() {
|
||||
let mut cond = Condition::new();
|
||||
let waiter = cond.wait();
|
||||
assert_eq!(lazy(|cx| waiter.poll_waiter(cx)).await, Poll::Pending);
|
||||
cond.notify();
|
||||
assert_eq!(lazy(|cx| waiter.poll_waiter(cx)).await, Poll::Ready(()));
|
||||
|
||||
let waiter = cond.wait();
|
||||
assert_eq!(lazy(|cx| waiter.poll_waiter(cx)).await, Poll::Pending);
|
||||
let waiter2 = waiter.clone();
|
||||
assert_eq!(lazy(|cx| waiter2.poll_waiter(cx)).await, Poll::Pending);
|
||||
|
||||
drop(cond);
|
||||
assert_eq!(lazy(|cx| waiter.poll_waiter(cx)).await, Poll::Ready(()));
|
||||
assert_eq!(lazy(|cx| waiter2.poll_waiter(cx)).await, Poll::Ready(()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
//! Communication primitives
|
||||
|
||||
mod cell;
|
||||
pub mod condition;
|
||||
pub mod mpsc;
|
||||
pub mod oneshot;
|
||||
|
|
|
@ -8,8 +8,8 @@ use std::task::{Context, Poll};
|
|||
|
||||
use futures::{Sink, Stream};
|
||||
|
||||
use super::cell::Cell;
|
||||
use crate::task::LocalWaker;
|
||||
use crate::util::Cell;
|
||||
|
||||
/// Creates a unbounded in-memory channel with buffered storage.
|
||||
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
|
||||
|
@ -45,7 +45,7 @@ impl<T> Unpin for Sender<T> {}
|
|||
impl<T> Sender<T> {
|
||||
/// Sends the provided message along this channel.
|
||||
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
|
||||
let shared = unsafe { self.shared.get_mut_unsafe() };
|
||||
let shared = unsafe { self.shared.get_mut_unchecked() };
|
||||
if !shared.has_receiver {
|
||||
return Err(SendError(item)); // receiver was dropped
|
||||
};
|
||||
|
|
|
@ -6,8 +6,8 @@ use std::task::{Context, Poll};
|
|||
pub use futures::channel::oneshot::Canceled;
|
||||
use slab::Slab;
|
||||
|
||||
use super::cell::Cell;
|
||||
use crate::task::LocalWaker;
|
||||
use crate::util::Cell;
|
||||
|
||||
/// Creates a new futures-aware, one-shot channel.
|
||||
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
|
||||
|
@ -127,7 +127,7 @@ struct PoolInner<T> {
|
|||
|
||||
impl<T> Pool<T> {
|
||||
pub fn channel(&self) -> (PSender<T>, PReceiver<T>) {
|
||||
let token = unsafe { self.0.get_mut_unsafe() }.insert(PoolInner {
|
||||
let token = unsafe { self.0.get_mut_unchecked() }.insert(PoolInner {
|
||||
flags: Flags::all(),
|
||||
value: None,
|
||||
waker: LocalWaker::default(),
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
mod cell;
|
||||
pub mod counter;
|
||||
pub mod either;
|
||||
pub mod framed;
|
||||
|
@ -9,6 +8,4 @@ pub mod stream;
|
|||
pub mod time;
|
||||
pub mod timeout;
|
||||
|
||||
pub(crate) use self::cell::Cell;
|
||||
|
||||
pub use self::either::either;
|
||||
|
|
|
@ -1,15 +1,16 @@
|
|||
use std::cell::RefCell;
|
||||
use std::convert::Infallible;
|
||||
use std::rc::Rc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{self, Duration, Instant};
|
||||
|
||||
use futures::future::{ok, ready, FutureExt, Ready};
|
||||
|
||||
use super::cell::Cell;
|
||||
use crate::rt::time::delay_for;
|
||||
use crate::service::{Service, ServiceFactory};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LowResTime(Cell<Inner>);
|
||||
pub struct LowResTime(Rc<RefCell<Inner>>);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Inner {
|
||||
|
@ -28,7 +29,7 @@ impl Inner {
|
|||
|
||||
impl LowResTime {
|
||||
pub fn with(resolution: Duration) -> LowResTime {
|
||||
LowResTime(Cell::new(Inner::new(resolution)))
|
||||
LowResTime(Rc::new(RefCell::new(Inner::new(resolution))))
|
||||
}
|
||||
|
||||
pub fn timer(&self) -> LowResTimeService {
|
||||
|
@ -38,7 +39,7 @@ impl LowResTime {
|
|||
|
||||
impl Default for LowResTime {
|
||||
fn default() -> Self {
|
||||
LowResTime(Cell::new(Inner::new(Duration::from_secs(1))))
|
||||
LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1)))))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,30 +59,30 @@ impl ServiceFactory for LowResTime {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LowResTimeService(Cell<Inner>);
|
||||
pub struct LowResTimeService(Rc<RefCell<Inner>>);
|
||||
|
||||
impl LowResTimeService {
|
||||
pub fn with(resolution: Duration) -> LowResTimeService {
|
||||
LowResTimeService(Cell::new(Inner::new(resolution)))
|
||||
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution))))
|
||||
}
|
||||
|
||||
/// Get current time. This function has to be called from
|
||||
/// future's poll method, otherwise it panics.
|
||||
pub fn now(&self) -> Instant {
|
||||
let cur = self.0.get_ref().current;
|
||||
let cur = self.0.borrow().current;
|
||||
if let Some(cur) = cur {
|
||||
cur
|
||||
} else {
|
||||
let now = Instant::now();
|
||||
let mut inner = self.0.clone();
|
||||
let inner = self.0.clone();
|
||||
let interval = {
|
||||
let mut b = inner.get_mut();
|
||||
let mut b = inner.borrow_mut();
|
||||
b.current = Some(now);
|
||||
b.resolution
|
||||
};
|
||||
|
||||
crate::rt::spawn(delay_for(interval).then(move |_| {
|
||||
inner.get_mut().current.take();
|
||||
inner.borrow_mut().current.take();
|
||||
ready(())
|
||||
}));
|
||||
now
|
||||
|
@ -107,7 +108,7 @@ impl Service for LowResTimeService {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SystemTime(Cell<SystemTimeInner>);
|
||||
pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SystemTimeInner {
|
||||
|
@ -125,30 +126,30 @@ impl SystemTimeInner {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SystemTimeService(Cell<SystemTimeInner>);
|
||||
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
|
||||
|
||||
impl SystemTimeService {
|
||||
pub fn with(resolution: Duration) -> SystemTimeService {
|
||||
SystemTimeService(Cell::new(SystemTimeInner::new(resolution)))
|
||||
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution))))
|
||||
}
|
||||
|
||||
/// Get current time. This function has to be called from
|
||||
/// future's poll method, otherwise it panics.
|
||||
pub fn now(&self) -> time::SystemTime {
|
||||
let cur = self.0.get_ref().current;
|
||||
let cur = self.0.borrow().current;
|
||||
if let Some(cur) = cur {
|
||||
cur
|
||||
} else {
|
||||
let now = time::SystemTime::now();
|
||||
let mut inner = self.0.clone();
|
||||
let inner = self.0.clone();
|
||||
let interval = {
|
||||
let mut b = inner.get_mut();
|
||||
let mut b = inner.borrow_mut();
|
||||
b.current = Some(now);
|
||||
b.resolution
|
||||
};
|
||||
|
||||
crate::rt::spawn(delay_for(interval).then(move |_| {
|
||||
inner.get_mut().current.take();
|
||||
inner.borrow_mut().current.take();
|
||||
ready(())
|
||||
}));
|
||||
now
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue