Optimize Millis and Io memory layout (#100)

* Optimize Millis and Io memory layout
This commit is contained in:
Nikolay Kim 2022-01-27 01:25:44 +06:00 committed by GitHub
parent 321218f80b
commit cb7af434ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 221 additions and 117 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.6] - 2022-01-27
* Optimize Io memory layout
## [0.1.5] - 2022-01-23
* Add Eq,PartialEq,Hash,Debug impls to Io asn IoRef

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.1.5"
version = "0.1.6"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -18,7 +18,7 @@ path = "src/lib.rs"
[dependencies]
ntex-codec = "0.6.1"
ntex-bytes = "0.1.9"
ntex-util = "0.1.9"
ntex-util = "0.1.12"
ntex-service = "0.3.1"
bitflags = "1.3"

View file

@ -1,10 +1,14 @@
use std::cell::{Cell, RefCell};
use std::cell::Cell;
use std::task::{Context, Poll};
use std::{fmt, future::Future, hash, io, mem, ops::Deref, pin::Pin, ptr, rc::Rc, time};
use std::{
fmt, future::Future, hash, io, marker, mem, ops::Deref, pin::Pin, ptr, rc::Rc, time,
};
use ntex_bytes::{BytesMut, PoolId, PoolRef};
use ntex_codec::{Decoder, Encoder};
use ntex_util::{future::poll_fn, future::Either, task::LocalWaker, time::Millis};
use ntex_util::{
future::poll_fn, future::Either, task::LocalWaker, time::now, time::Millis,
};
use super::filter::{Base, NullFilter};
use super::seal::Sealed;
@ -38,12 +42,10 @@ bitflags::bitflags! {
const DSP_STOP = 0b0000_0010_0000_0000;
/// keep-alive timeout occured
const DSP_KEEPALIVE = 0b0000_0100_0000_0000;
}
}
enum FilterItem<F> {
Boxed(Sealed),
Ptr(*mut F),
/// keep-alive timeout started
const KEEPALIVE = 0b0001_0000_0000_0000;
}
}
/// Interface object to underlying io stream
@ -64,8 +66,8 @@ pub(crate) struct IoState {
pub(super) write_buf: Cell<Option<BytesMut>>,
pub(super) filter: Cell<&'static dyn Filter>,
pub(super) handle: Cell<Option<Box<dyn Handle>>>,
pub(super) on_disconnect: RefCell<Vec<Option<LocalWaker>>>,
keepalive: Cell<Option<time::Instant>>,
pub(super) on_disconnect: Cell<Option<Box<Vec<LocalWaker>>>>,
keepalive: Cell<time::Instant>,
}
impl IoState {
@ -87,19 +89,19 @@ impl IoState {
pub(super) fn notify_keepalive(&self) {
log::trace!("keep-alive timeout, notify dispatcher");
let mut flags = self.flags.get();
flags.remove(Flags::KEEPALIVE);
if !flags.contains(Flags::DSP_KEEPALIVE) {
flags.insert(Flags::DSP_KEEPALIVE);
self.flags.set(flags);
self.dispatch_task.wake();
}
self.flags.set(flags);
}
#[inline]
pub(super) fn notify_disconnect(&self) {
let mut on_disconnect = self.on_disconnect.borrow_mut();
for item in &mut *on_disconnect {
if let Some(waker) = item.take() {
waker.wake();
if let Some(on_disconnect) = self.on_disconnect.take() {
for item in on_disconnect.into_iter() {
item.wake();
}
}
}
@ -255,8 +257,8 @@ impl Io {
write_buf: Cell::new(None),
filter: Cell::new(NullFilter::get()),
handle: Cell::new(None),
on_disconnect: RefCell::new(Vec::new()),
keepalive: Cell::new(None),
on_disconnect: Cell::new(None),
keepalive: Cell::new(now()),
});
let filter = Box::new(Base::new(IoRef(inner.clone())));
@ -272,7 +274,7 @@ impl Io {
let hnd = io.start(ReadContext::new(&io_ref), WriteContext::new(&io_ref));
io_ref.0.handle.set(hnd);
Io(io_ref, FilterItem::Ptr(Box::into_raw(filter)))
Io(io_ref, FilterItem::with_filter(filter))
}
}
@ -319,12 +321,12 @@ impl<F> Io<F> {
write_buf: Cell::new(None),
filter: Cell::new(NullFilter::get()),
handle: Cell::new(None),
on_disconnect: RefCell::new(Vec::new()),
keepalive: Cell::new(None),
on_disconnect: Cell::new(None),
keepalive: Cell::new(now()),
});
let state = mem::replace(&mut self.0, IoRef(inner));
let filter = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut()));
let filter = mem::replace(&mut self.1, FilterItem::null());
Self(state, filter)
}
}
@ -346,22 +348,20 @@ impl<F> Io<F> {
#[inline]
/// Start keep-alive timer
pub fn start_keepalive_timer(&self, timeout: time::Duration) {
if let Some(expire) = self.0 .0.keepalive.take() {
timer::unregister(expire, &self.0)
if self.flags().contains(Flags::KEEPALIVE) {
timer::unregister(self.0 .0.keepalive.get(), &self.0);
}
if timeout != time::Duration::ZERO {
self.0
.0
.keepalive
.set(Some(timer::register(timeout, &self.0)));
self.0 .0.insert_flags(Flags::KEEPALIVE);
self.0 .0.keepalive.set(timer::register(timeout, &self.0));
}
}
#[inline]
/// Remove keep-alive timer
pub fn remove_keepalive_timer(&self) {
if let Some(expire) = self.0 .0.keepalive.take() {
timer::unregister(expire, &self.0)
if self.flags().contains(Flags::KEEPALIVE) {
timer::unregister(self.0 .0.keepalive.get(), &self.0)
}
}
@ -375,12 +375,7 @@ impl<F: Filter> Io<F> {
#[inline]
/// Get referece to a filter
pub fn filter(&self) -> &F {
if let FilterItem::Ptr(p) = self.1 {
if let Some(r) = unsafe { p.as_ref() } {
return r;
}
}
panic!()
self.1.filter()
}
#[inline]
@ -388,12 +383,7 @@ impl<F: Filter> Io<F> {
pub fn seal(mut self) -> Io<Sealed> {
// get current filter
let filter = unsafe {
let item = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut()));
let filter: Sealed = match item {
FilterItem::Boxed(b) => b,
FilterItem::Ptr(p) => Sealed(Box::new(*Box::from_raw(p))),
};
let filter = self.1.seal();
let filter_ref: &'static dyn Filter = {
let filter: &dyn Filter = filter.0.as_ref();
std::mem::transmute(filter)
@ -402,7 +392,7 @@ impl<F: Filter> Io<F> {
filter
};
Io(self.0.clone(), FilterItem::Boxed(filter))
Io(self.0.clone(), FilterItem::with_sealed(filter))
}
#[inline]
@ -423,14 +413,7 @@ impl<F: Filter> Io<F> {
{
// replace current filter
let filter = unsafe {
let item = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut()));
let filter = match item {
FilterItem::Boxed(_) => panic!(),
FilterItem::Ptr(p) => {
assert!(!p.is_null());
Box::new(map(*Box::from_raw(p))?)
}
};
let filter = Box::new(map(*(self.1.get_filter()))?);
let filter_ref: &'static dyn Filter = {
let filter: &dyn Filter = filter.as_ref();
std::mem::transmute(filter)
@ -439,7 +422,7 @@ impl<F: Filter> Io<F> {
filter
};
Ok(Io(self.0.clone(), FilterItem::Ptr(Box::into_raw(filter))))
Ok(Io(self.0.clone(), FilterItem::with_filter(filter)))
}
}
@ -735,27 +718,139 @@ impl<F> Deref for Io<F> {
impl<F> Drop for Io<F> {
fn drop(&mut self) {
self.remove_keepalive_timer();
if self.1.is_set() {
log::trace!(
"io is dropped, force stopping io streams {:?}",
self.0.flags()
);
if let FilterItem::Ptr(p) = self.1 {
if p.is_null() {
return;
self.force_close();
self.1.drop_filter();
self.0 .0.filter.set(NullFilter::get());
}
log::trace!(
"io is dropped, force stopping io streams {:?}",
self.0.flags()
);
}
}
self.force_close();
self.0 .0.filter.set(NullFilter::get());
let _ = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut()));
unsafe { Box::from_raw(p) };
const KIND_SEALED: u8 = 0b01;
const KIND_PTR: u8 = 0b10;
const KIND_MASK: u8 = 0b11;
const KIND_UNMASK: u8 = !KIND_MASK;
const KIND_MASK_USIZE: usize = 0b11;
const KIND_UNMASK_USIZE: usize = !KIND_MASK_USIZE;
const SEALED_SIZE: usize = mem::size_of::<Sealed>();
#[cfg(target_endian = "little")]
const KIND_IDX: usize = 0;
#[cfg(target_endian = "big")]
const KIND_IDX: usize = SEALED_SIZE - 1;
struct FilterItem<F> {
data: [u8; SEALED_SIZE],
_t: marker::PhantomData<F>,
}
impl<F> FilterItem<F> {
fn null() -> Self {
Self {
data: [0; 16],
_t: marker::PhantomData,
}
}
fn with_filter(f: Box<F>) -> Self {
let mut slf = Self {
data: [0; 16],
_t: marker::PhantomData,
};
unsafe {
let ptr = &mut slf.data as *mut _ as *mut *mut F;
ptr.write(Box::into_raw(f));
slf.data[KIND_IDX] |= KIND_PTR;
}
slf
}
fn with_sealed(f: Sealed) -> Self {
let mut slf = Self {
data: [0; 16],
_t: marker::PhantomData,
};
unsafe {
let ptr = &mut slf.data as *mut _ as *mut Sealed;
ptr.write(f);
slf.data[KIND_IDX] |= KIND_SEALED;
}
slf
}
/// Get filter, panic if it is not filter
fn filter(&self) -> &F {
if self.data[KIND_IDX] & KIND_PTR != 0 {
let ptr = &self.data as *const _ as *const *mut F;
unsafe {
let p = (ptr.read() as *const _ as usize) & KIND_UNMASK_USIZE;
(p as *const F as *mut F).as_ref().unwrap()
}
} else {
log::trace!(
"io is dropped, force stopping io streams {:?}",
self.0.flags()
panic!("Wrong filter item");
}
}
/// Get filter, panic if it is not filter
fn get_filter(&mut self) -> Box<F> {
if self.data[KIND_IDX] & KIND_PTR != 0 {
self.data[KIND_IDX] &= KIND_UNMASK;
let ptr = &mut self.data as *mut _ as *mut *mut F;
unsafe { Box::from_raw(*ptr) }
} else {
panic!(
"Wrong filter item {:?} expected: {:?}",
self.data[KIND_IDX], KIND_PTR
);
}
}
/// Get sealed, panic if it is not sealed
fn get_sealed(&mut self) -> Sealed {
if self.data[KIND_IDX] & KIND_SEALED != 0 {
self.data[KIND_IDX] &= KIND_UNMASK;
let ptr = &mut self.data as *mut _ as *mut Sealed;
unsafe { ptr.read() }
} else {
panic!(
"Wrong filter item {:?} expected: {:?}",
self.data[KIND_IDX], KIND_SEALED
);
}
}
fn is_set(&self) -> bool {
self.data[KIND_IDX] & KIND_MASK != 0
}
fn drop_filter(&mut self) {
if self.data[KIND_IDX] & KIND_PTR != 0 {
self.get_filter();
} else if self.data[KIND_IDX] & KIND_SEALED != 0 {
self.get_sealed();
}
}
}
impl<F: Filter> FilterItem<F> {
fn seal(&mut self) -> Sealed {
if self.data[KIND_IDX] & KIND_PTR != 0 {
Sealed(Box::new(*self.get_filter()))
} else if self.data[KIND_IDX] & KIND_SEALED != 0 {
self.get_sealed()
} else {
panic!(
"Wrong filter item {:?} expected: {:?}",
self.data[KIND_IDX], KIND_PTR
);
self.force_close();
self.0 .0.filter.set(NullFilter::get());
}
}
}
@ -776,10 +871,16 @@ impl OnDisconnect {
let token = if disconnected {
usize::MAX
} else {
let mut on_disconnect = inner.on_disconnect.borrow_mut();
let mut on_disconnect = inner.on_disconnect.take();
let token = if let Some(ref mut on_disconnect) = on_disconnect {
let token = on_disconnect.len();
on_disconnect.push(Some(LocalWaker::default()));
drop(on_disconnect);
on_disconnect.push(LocalWaker::default());
token
} else {
on_disconnect = Some(Box::new(vec![LocalWaker::default()]));
0
};
inner.on_disconnect.set(on_disconnect);
token
};
Self { token, inner }
@ -790,19 +891,15 @@ impl OnDisconnect {
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
if self.token == usize::MAX {
Poll::Ready(())
} else {
let on_disconnect = self.inner.on_disconnect.borrow();
if on_disconnect[self.token].is_some() {
on_disconnect[self.token]
.as_ref()
.unwrap()
.register(cx.waker());
} else if self.inner.flags.get().contains(Flags::IO_STOPPED) {
Poll::Ready(())
} else if let Some(on_disconnect) = self.inner.on_disconnect.take() {
on_disconnect[self.token].register(cx.waker());
Poll::Pending
} else {
Poll::Ready(())
}
}
}
}
impl Clone for OnDisconnect {
@ -823,11 +920,3 @@ impl Future for OnDisconnect {
self.poll_ready(cx)
}
}
impl Drop for OnDisconnect {
fn drop(&mut self) {
if self.token != usize::MAX {
self.inner.on_disconnect.borrow_mut()[self.token].take();
}
}
}

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.12] - 2022-01-27
* Reduce size of Millis
## [0.1.11] - 2022-01-23
* Remove useless stream::Dispatcher and sink::SinkService

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "0.1.11"
version = "0.1.12"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -31,5 +31,4 @@ pin-project-lite = "0.2.6"
ntex = { version = "0.5", features = ["tokio"] }
ntex-bytes = "0.1.9"
ntex-macros = "0.1.3"
derive_more = "0.99"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }

View file

@ -1,5 +1,8 @@
use std::task::{Context, Poll};
use std::{cell::Cell, convert::Infallible, marker, time::Duration, time::Instant};
use std::{
cell::Cell, convert::Infallible, convert::TryInto, marker, time::Duration,
time::Instant,
};
use ntex_service::{Service, ServiceFactory};
@ -103,7 +106,8 @@ where
Poll::Ready(Err((self.f)()))
} else {
let expire = expire - now;
self.sleep.reset(Millis(expire.as_millis() as u64));
self.sleep
.reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX)));
let _ = self.sleep.poll_elapsed(cx);
Poll::Ready(Ok(()))
}

View file

@ -217,10 +217,8 @@ where
#[cfg(test)]
mod tests {
use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, task::Context, task::Poll, time::Duration};
use derive_more::Display;
use ntex_service::{apply, fn_factory, Service, ServiceFactory};
use super::*;
@ -229,9 +227,15 @@ mod tests {
#[derive(Clone, Debug, PartialEq)]
struct SleepService(Duration);
#[derive(Clone, Debug, Display, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
struct SrvError;
impl fmt::Display for SrvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SrvError")
}
}
impl Service<()> for SleepService {
type Response = ();
type Error = SrvError;

View file

@ -81,7 +81,7 @@ impl Sleep {
#[inline]
pub fn new(duration: Millis) -> Sleep {
Sleep {
hnd: TimerHandle::new(duration.0),
hnd: TimerHandle::new(duration.0 as u64),
}
}
@ -99,7 +99,7 @@ impl Sleep {
/// This function can be called both before and after the future has
/// completed.
pub fn reset<T: Into<Millis>>(&self, millis: T) {
self.hnd.reset(millis.into().0);
self.hnd.reset(millis.into().0 as u64);
}
#[inline]
@ -209,7 +209,7 @@ where
#[derive(Debug)]
pub struct Interval {
hnd: TimerHandle,
period: u64,
period: u32,
}
impl Interval {
@ -217,7 +217,7 @@ impl Interval {
#[inline]
pub fn new(period: Millis) -> Interval {
Interval {
hnd: TimerHandle::new(period.0),
hnd: TimerHandle::new(period.0 as u64),
period: period.0,
}
}
@ -230,7 +230,7 @@ impl Interval {
#[inline]
pub fn poll_tick(&self, cx: &mut task::Context<'_>) -> Poll<()> {
if self.hnd.poll_elapsed(cx).is_ready() {
self.hnd.reset(self.period);
self.hnd.reset(self.period as u64);
Poll::Ready(())
} else {
Poll::Pending
@ -305,7 +305,7 @@ mod tests {
.duration_since(time::SystemTime::UNIX_EPOCH)
.unwrap();
assert!(second_time - first_time >= time::Duration::from_millis(wait_time));
assert!(second_time - first_time >= time::Duration::from_millis(wait_time as u64));
}
#[ntex_macros::rt_test2]

View file

@ -2,7 +2,7 @@ use std::{convert::TryInto, ops};
/// A Duration type to represent a span of time.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Millis(pub u64);
pub struct Millis(pub u32);
impl Millis {
/// Zero milliseconds value
@ -12,8 +12,8 @@ impl Millis {
pub const ONE_SEC: Millis = Millis(1_000);
#[inline]
pub const fn from_secs(secs: u32) -> Millis {
Millis((secs as u64) * 1000)
pub const fn from_secs(secs: u16) -> Millis {
Millis((secs as u32) * 1000)
}
#[inline]
@ -52,7 +52,7 @@ impl ops::Add<Millis> for Millis {
#[inline]
fn add(self, other: Millis) -> Millis {
Millis(self.0.checked_add(other.0).unwrap_or(u64::MAX))
Millis(self.0.checked_add(other.0).unwrap_or(u32::MAX))
}
}
@ -64,8 +64,8 @@ impl ops::Add<Seconds> for Millis {
fn add(self, other: Seconds) -> Millis {
Millis(
self.0
.checked_add((other.0 as u64) * 1000)
.unwrap_or(u64::MAX),
.checked_add((other.0 as u32).checked_mul(1000).unwrap_or(u32::MAX))
.unwrap_or(u32::MAX),
)
}
}
@ -91,7 +91,7 @@ impl ops::Add<Millis> for std::time::Duration {
impl From<Seconds> for Millis {
#[inline]
fn from(s: Seconds) -> Millis {
Millis((s.0 as u64) * 1000)
Millis((s.0 as u32).checked_mul(1000).unwrap_or(u32::MAX))
}
}
@ -108,7 +108,7 @@ impl From<std::time::Duration> for Millis {
impl From<Millis> for std::time::Duration {
#[inline]
fn from(d: Millis) -> std::time::Duration {
std::time::Duration::from_millis(d.0)
std::time::Duration::from_millis(d.0 as u64)
}
}

View file

@ -1,6 +1,6 @@
# Changes
## [0.5.12] - 2022-01-xx
## [0.5.12] - 2022-01-27
* Replace derive_more with thiserror

View file

@ -52,11 +52,11 @@ ntex-codec = "0.6.1"
ntex-router = "0.5.1"
ntex-service = "0.3.1"
ntex-macros = "0.1.3"
ntex-util = "0.1.10"
ntex-util = "0.1.12"
ntex-bytes = "0.1.10"
ntex-tls = "0.1.2"
ntex-rt = "0.4.3"
ntex-io = "0.1.5"
ntex-io = "0.1.6"
ntex-tokio = "0.1.2"
ntex-glommio = { version = "0.1.0", optional = true }
ntex-async-std = { version = "0.1.0", optional = true }