From cb7af434ea43b1f01c9e9ec9e7a6c6fb592c5766 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 27 Jan 2022 01:25:44 +0600 Subject: [PATCH] Optimize Millis and Io memory layout (#100) * Optimize Millis and Io memory layout --- ntex-io/CHANGES.md | 4 + ntex-io/Cargo.toml | 4 +- ntex-io/src/io.rs | 269 ++++++++++++++++++---------- ntex-util/CHANGES.md | 4 + ntex-util/Cargo.toml | 3 +- ntex-util/src/services/keepalive.rs | 8 +- ntex-util/src/services/timeout.rs | 12 +- ntex-util/src/time/mod.rs | 12 +- ntex-util/src/time/types.rs | 16 +- ntex/CHANGES.md | 2 +- ntex/Cargo.toml | 4 +- 11 files changed, 221 insertions(+), 117 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 9e5b2863..a61b6fc4 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.6] - 2022-01-27 + +* Optimize Io memory layout + ## [0.1.5] - 2022-01-23 * Add Eq,PartialEq,Hash,Debug impls to Io asn IoRef diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 97b34826..c897b0e7 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.1.5" +version = "0.1.6" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6.1" ntex-bytes = "0.1.9" -ntex-util = "0.1.9" +ntex-util = "0.1.12" ntex-service = "0.3.1" bitflags = "1.3" diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 86446ed5..5a0e916f 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -1,10 +1,14 @@ -use std::cell::{Cell, RefCell}; +use std::cell::Cell; use std::task::{Context, Poll}; -use std::{fmt, future::Future, hash, io, mem, ops::Deref, pin::Pin, ptr, rc::Rc, time}; +use std::{ + fmt, future::Future, hash, io, marker, mem, ops::Deref, pin::Pin, ptr, rc::Rc, time, +}; use ntex_bytes::{BytesMut, PoolId, PoolRef}; use ntex_codec::{Decoder, Encoder}; -use ntex_util::{future::poll_fn, future::Either, task::LocalWaker, time::Millis}; +use ntex_util::{ + future::poll_fn, future::Either, task::LocalWaker, time::now, time::Millis, +}; use super::filter::{Base, NullFilter}; use super::seal::Sealed; @@ -38,12 +42,10 @@ bitflags::bitflags! { const DSP_STOP = 0b0000_0010_0000_0000; /// keep-alive timeout occured const DSP_KEEPALIVE = 0b0000_0100_0000_0000; - } -} -enum FilterItem { - Boxed(Sealed), - Ptr(*mut F), + /// keep-alive timeout started + const KEEPALIVE = 0b0001_0000_0000_0000; + } } /// Interface object to underlying io stream @@ -64,8 +66,8 @@ pub(crate) struct IoState { pub(super) write_buf: Cell>, pub(super) filter: Cell<&'static dyn Filter>, pub(super) handle: Cell>>, - pub(super) on_disconnect: RefCell>>, - keepalive: Cell>, + pub(super) on_disconnect: Cell>>>, + keepalive: Cell, } impl IoState { @@ -87,19 +89,19 @@ impl IoState { pub(super) fn notify_keepalive(&self) { log::trace!("keep-alive timeout, notify dispatcher"); let mut flags = self.flags.get(); + flags.remove(Flags::KEEPALIVE); if !flags.contains(Flags::DSP_KEEPALIVE) { flags.insert(Flags::DSP_KEEPALIVE); - self.flags.set(flags); self.dispatch_task.wake(); } + self.flags.set(flags); } #[inline] pub(super) fn notify_disconnect(&self) { - let mut on_disconnect = self.on_disconnect.borrow_mut(); - for item in &mut *on_disconnect { - if let Some(waker) = item.take() { - waker.wake(); + if let Some(on_disconnect) = self.on_disconnect.take() { + for item in on_disconnect.into_iter() { + item.wake(); } } } @@ -255,8 +257,8 @@ impl Io { write_buf: Cell::new(None), filter: Cell::new(NullFilter::get()), handle: Cell::new(None), - on_disconnect: RefCell::new(Vec::new()), - keepalive: Cell::new(None), + on_disconnect: Cell::new(None), + keepalive: Cell::new(now()), }); let filter = Box::new(Base::new(IoRef(inner.clone()))); @@ -272,7 +274,7 @@ impl Io { let hnd = io.start(ReadContext::new(&io_ref), WriteContext::new(&io_ref)); io_ref.0.handle.set(hnd); - Io(io_ref, FilterItem::Ptr(Box::into_raw(filter))) + Io(io_ref, FilterItem::with_filter(filter)) } } @@ -319,12 +321,12 @@ impl Io { write_buf: Cell::new(None), filter: Cell::new(NullFilter::get()), handle: Cell::new(None), - on_disconnect: RefCell::new(Vec::new()), - keepalive: Cell::new(None), + on_disconnect: Cell::new(None), + keepalive: Cell::new(now()), }); let state = mem::replace(&mut self.0, IoRef(inner)); - let filter = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut())); + let filter = mem::replace(&mut self.1, FilterItem::null()); Self(state, filter) } } @@ -346,22 +348,20 @@ impl Io { #[inline] /// Start keep-alive timer pub fn start_keepalive_timer(&self, timeout: time::Duration) { - if let Some(expire) = self.0 .0.keepalive.take() { - timer::unregister(expire, &self.0) + if self.flags().contains(Flags::KEEPALIVE) { + timer::unregister(self.0 .0.keepalive.get(), &self.0); } if timeout != time::Duration::ZERO { - self.0 - .0 - .keepalive - .set(Some(timer::register(timeout, &self.0))); + self.0 .0.insert_flags(Flags::KEEPALIVE); + self.0 .0.keepalive.set(timer::register(timeout, &self.0)); } } #[inline] /// Remove keep-alive timer pub fn remove_keepalive_timer(&self) { - if let Some(expire) = self.0 .0.keepalive.take() { - timer::unregister(expire, &self.0) + if self.flags().contains(Flags::KEEPALIVE) { + timer::unregister(self.0 .0.keepalive.get(), &self.0) } } @@ -375,12 +375,7 @@ impl Io { #[inline] /// Get referece to a filter pub fn filter(&self) -> &F { - if let FilterItem::Ptr(p) = self.1 { - if let Some(r) = unsafe { p.as_ref() } { - return r; - } - } - panic!() + self.1.filter() } #[inline] @@ -388,12 +383,7 @@ impl Io { pub fn seal(mut self) -> Io { // get current filter let filter = unsafe { - let item = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut())); - let filter: Sealed = match item { - FilterItem::Boxed(b) => b, - FilterItem::Ptr(p) => Sealed(Box::new(*Box::from_raw(p))), - }; - + let filter = self.1.seal(); let filter_ref: &'static dyn Filter = { let filter: &dyn Filter = filter.0.as_ref(); std::mem::transmute(filter) @@ -402,7 +392,7 @@ impl Io { filter }; - Io(self.0.clone(), FilterItem::Boxed(filter)) + Io(self.0.clone(), FilterItem::with_sealed(filter)) } #[inline] @@ -423,14 +413,7 @@ impl Io { { // replace current filter let filter = unsafe { - let item = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut())); - let filter = match item { - FilterItem::Boxed(_) => panic!(), - FilterItem::Ptr(p) => { - assert!(!p.is_null()); - Box::new(map(*Box::from_raw(p))?) - } - }; + let filter = Box::new(map(*(self.1.get_filter()))?); let filter_ref: &'static dyn Filter = { let filter: &dyn Filter = filter.as_ref(); std::mem::transmute(filter) @@ -439,7 +422,7 @@ impl Io { filter }; - Ok(Io(self.0.clone(), FilterItem::Ptr(Box::into_raw(filter)))) + Ok(Io(self.0.clone(), FilterItem::with_filter(filter))) } } @@ -735,27 +718,139 @@ impl Deref for Io { impl Drop for Io { fn drop(&mut self) { self.remove_keepalive_timer(); + if self.1.is_set() { + log::trace!( + "io is dropped, force stopping io streams {:?}", + self.0.flags() + ); - if let FilterItem::Ptr(p) = self.1 { - if p.is_null() { - return; + self.force_close(); + self.1.drop_filter(); + self.0 .0.filter.set(NullFilter::get()); + } + } +} + +const KIND_SEALED: u8 = 0b01; +const KIND_PTR: u8 = 0b10; +const KIND_MASK: u8 = 0b11; +const KIND_UNMASK: u8 = !KIND_MASK; +const KIND_MASK_USIZE: usize = 0b11; +const KIND_UNMASK_USIZE: usize = !KIND_MASK_USIZE; +const SEALED_SIZE: usize = mem::size_of::(); + +#[cfg(target_endian = "little")] +const KIND_IDX: usize = 0; + +#[cfg(target_endian = "big")] +const KIND_IDX: usize = SEALED_SIZE - 1; + +struct FilterItem { + data: [u8; SEALED_SIZE], + _t: marker::PhantomData, +} + +impl FilterItem { + fn null() -> Self { + Self { + data: [0; 16], + _t: marker::PhantomData, + } + } + + fn with_filter(f: Box) -> Self { + let mut slf = Self { + data: [0; 16], + _t: marker::PhantomData, + }; + + unsafe { + let ptr = &mut slf.data as *mut _ as *mut *mut F; + ptr.write(Box::into_raw(f)); + slf.data[KIND_IDX] |= KIND_PTR; + } + slf + } + + fn with_sealed(f: Sealed) -> Self { + let mut slf = Self { + data: [0; 16], + _t: marker::PhantomData, + }; + + unsafe { + let ptr = &mut slf.data as *mut _ as *mut Sealed; + ptr.write(f); + slf.data[KIND_IDX] |= KIND_SEALED; + } + slf + } + + /// Get filter, panic if it is not filter + fn filter(&self) -> &F { + if self.data[KIND_IDX] & KIND_PTR != 0 { + let ptr = &self.data as *const _ as *const *mut F; + unsafe { + let p = (ptr.read() as *const _ as usize) & KIND_UNMASK_USIZE; + (p as *const F as *mut F).as_ref().unwrap() } - log::trace!( - "io is dropped, force stopping io streams {:?}", - self.0.flags() - ); - - self.force_close(); - self.0 .0.filter.set(NullFilter::get()); - let _ = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut())); - unsafe { Box::from_raw(p) }; } else { - log::trace!( - "io is dropped, force stopping io streams {:?}", - self.0.flags() + panic!("Wrong filter item"); + } + } + + /// Get filter, panic if it is not filter + fn get_filter(&mut self) -> Box { + if self.data[KIND_IDX] & KIND_PTR != 0 { + self.data[KIND_IDX] &= KIND_UNMASK; + let ptr = &mut self.data as *mut _ as *mut *mut F; + unsafe { Box::from_raw(*ptr) } + } else { + panic!( + "Wrong filter item {:?} expected: {:?}", + self.data[KIND_IDX], KIND_PTR + ); + } + } + + /// Get sealed, panic if it is not sealed + fn get_sealed(&mut self) -> Sealed { + if self.data[KIND_IDX] & KIND_SEALED != 0 { + self.data[KIND_IDX] &= KIND_UNMASK; + let ptr = &mut self.data as *mut _ as *mut Sealed; + unsafe { ptr.read() } + } else { + panic!( + "Wrong filter item {:?} expected: {:?}", + self.data[KIND_IDX], KIND_SEALED + ); + } + } + + fn is_set(&self) -> bool { + self.data[KIND_IDX] & KIND_MASK != 0 + } + + fn drop_filter(&mut self) { + if self.data[KIND_IDX] & KIND_PTR != 0 { + self.get_filter(); + } else if self.data[KIND_IDX] & KIND_SEALED != 0 { + self.get_sealed(); + } + } +} + +impl FilterItem { + fn seal(&mut self) -> Sealed { + if self.data[KIND_IDX] & KIND_PTR != 0 { + Sealed(Box::new(*self.get_filter())) + } else if self.data[KIND_IDX] & KIND_SEALED != 0 { + self.get_sealed() + } else { + panic!( + "Wrong filter item {:?} expected: {:?}", + self.data[KIND_IDX], KIND_PTR ); - self.force_close(); - self.0 .0.filter.set(NullFilter::get()); } } } @@ -776,10 +871,16 @@ impl OnDisconnect { let token = if disconnected { usize::MAX } else { - let mut on_disconnect = inner.on_disconnect.borrow_mut(); - let token = on_disconnect.len(); - on_disconnect.push(Some(LocalWaker::default())); - drop(on_disconnect); + let mut on_disconnect = inner.on_disconnect.take(); + let token = if let Some(ref mut on_disconnect) = on_disconnect { + let token = on_disconnect.len(); + on_disconnect.push(LocalWaker::default()); + token + } else { + on_disconnect = Some(Box::new(vec![LocalWaker::default()])); + 0 + }; + inner.on_disconnect.set(on_disconnect); token }; Self { token, inner } @@ -790,17 +891,13 @@ impl OnDisconnect { pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { if self.token == usize::MAX { Poll::Ready(()) + } else if self.inner.flags.get().contains(Flags::IO_STOPPED) { + Poll::Ready(()) + } else if let Some(on_disconnect) = self.inner.on_disconnect.take() { + on_disconnect[self.token].register(cx.waker()); + Poll::Pending } else { - let on_disconnect = self.inner.on_disconnect.borrow(); - if on_disconnect[self.token].is_some() { - on_disconnect[self.token] - .as_ref() - .unwrap() - .register(cx.waker()); - Poll::Pending - } else { - Poll::Ready(()) - } + Poll::Ready(()) } } } @@ -823,11 +920,3 @@ impl Future for OnDisconnect { self.poll_ready(cx) } } - -impl Drop for OnDisconnect { - fn drop(&mut self) { - if self.token != usize::MAX { - self.inner.on_disconnect.borrow_mut()[self.token].take(); - } - } -} diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index dbe9ca11..f2fc4cf8 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.12] - 2022-01-27 + +* Reduce size of Millis + ## [0.1.11] - 2022-01-23 * Remove useless stream::Dispatcher and sink::SinkService diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 11ca939e..62ac2d09 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "0.1.11" +version = "0.1.12" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -31,5 +31,4 @@ pin-project-lite = "0.2.6" ntex = { version = "0.5", features = ["tokio"] } ntex-bytes = "0.1.9" ntex-macros = "0.1.3" -derive_more = "0.99" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/ntex-util/src/services/keepalive.rs b/ntex-util/src/services/keepalive.rs index c04fdadb..cc6169ff 100644 --- a/ntex-util/src/services/keepalive.rs +++ b/ntex-util/src/services/keepalive.rs @@ -1,5 +1,8 @@ use std::task::{Context, Poll}; -use std::{cell::Cell, convert::Infallible, marker, time::Duration, time::Instant}; +use std::{ + cell::Cell, convert::Infallible, convert::TryInto, marker, time::Duration, + time::Instant, +}; use ntex_service::{Service, ServiceFactory}; @@ -103,7 +106,8 @@ where Poll::Ready(Err((self.f)())) } else { let expire = expire - now; - self.sleep.reset(Millis(expire.as_millis() as u64)); + self.sleep + .reset(Millis(expire.as_millis().try_into().unwrap_or(u32::MAX))); let _ = self.sleep.poll_elapsed(cx); Poll::Ready(Ok(())) } diff --git a/ntex-util/src/services/timeout.rs b/ntex-util/src/services/timeout.rs index 98c30f36..7821b5ed 100644 --- a/ntex-util/src/services/timeout.rs +++ b/ntex-util/src/services/timeout.rs @@ -217,10 +217,8 @@ where #[cfg(test)] mod tests { - use std::task::{Context, Poll}; - use std::time::Duration; + use std::{fmt, task::Context, task::Poll, time::Duration}; - use derive_more::Display; use ntex_service::{apply, fn_factory, Service, ServiceFactory}; use super::*; @@ -229,9 +227,15 @@ mod tests { #[derive(Clone, Debug, PartialEq)] struct SleepService(Duration); - #[derive(Clone, Debug, Display, PartialEq)] + #[derive(Clone, Debug, PartialEq)] struct SrvError; + impl fmt::Display for SrvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SrvError") + } + } + impl Service<()> for SleepService { type Response = (); type Error = SrvError; diff --git a/ntex-util/src/time/mod.rs b/ntex-util/src/time/mod.rs index 20a588fc..1efc5198 100644 --- a/ntex-util/src/time/mod.rs +++ b/ntex-util/src/time/mod.rs @@ -81,7 +81,7 @@ impl Sleep { #[inline] pub fn new(duration: Millis) -> Sleep { Sleep { - hnd: TimerHandle::new(duration.0), + hnd: TimerHandle::new(duration.0 as u64), } } @@ -99,7 +99,7 @@ impl Sleep { /// This function can be called both before and after the future has /// completed. pub fn reset>(&self, millis: T) { - self.hnd.reset(millis.into().0); + self.hnd.reset(millis.into().0 as u64); } #[inline] @@ -209,7 +209,7 @@ where #[derive(Debug)] pub struct Interval { hnd: TimerHandle, - period: u64, + period: u32, } impl Interval { @@ -217,7 +217,7 @@ impl Interval { #[inline] pub fn new(period: Millis) -> Interval { Interval { - hnd: TimerHandle::new(period.0), + hnd: TimerHandle::new(period.0 as u64), period: period.0, } } @@ -230,7 +230,7 @@ impl Interval { #[inline] pub fn poll_tick(&self, cx: &mut task::Context<'_>) -> Poll<()> { if self.hnd.poll_elapsed(cx).is_ready() { - self.hnd.reset(self.period); + self.hnd.reset(self.period as u64); Poll::Ready(()) } else { Poll::Pending @@ -305,7 +305,7 @@ mod tests { .duration_since(time::SystemTime::UNIX_EPOCH) .unwrap(); - assert!(second_time - first_time >= time::Duration::from_millis(wait_time)); + assert!(second_time - first_time >= time::Duration::from_millis(wait_time as u64)); } #[ntex_macros::rt_test2] diff --git a/ntex-util/src/time/types.rs b/ntex-util/src/time/types.rs index 2a88fe6a..8bb19241 100644 --- a/ntex-util/src/time/types.rs +++ b/ntex-util/src/time/types.rs @@ -2,7 +2,7 @@ use std::{convert::TryInto, ops}; /// A Duration type to represent a span of time. #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Millis(pub u64); +pub struct Millis(pub u32); impl Millis { /// Zero milliseconds value @@ -12,8 +12,8 @@ impl Millis { pub const ONE_SEC: Millis = Millis(1_000); #[inline] - pub const fn from_secs(secs: u32) -> Millis { - Millis((secs as u64) * 1000) + pub const fn from_secs(secs: u16) -> Millis { + Millis((secs as u32) * 1000) } #[inline] @@ -52,7 +52,7 @@ impl ops::Add for Millis { #[inline] fn add(self, other: Millis) -> Millis { - Millis(self.0.checked_add(other.0).unwrap_or(u64::MAX)) + Millis(self.0.checked_add(other.0).unwrap_or(u32::MAX)) } } @@ -64,8 +64,8 @@ impl ops::Add for Millis { fn add(self, other: Seconds) -> Millis { Millis( self.0 - .checked_add((other.0 as u64) * 1000) - .unwrap_or(u64::MAX), + .checked_add((other.0 as u32).checked_mul(1000).unwrap_or(u32::MAX)) + .unwrap_or(u32::MAX), ) } } @@ -91,7 +91,7 @@ impl ops::Add for std::time::Duration { impl From for Millis { #[inline] fn from(s: Seconds) -> Millis { - Millis((s.0 as u64) * 1000) + Millis((s.0 as u32).checked_mul(1000).unwrap_or(u32::MAX)) } } @@ -108,7 +108,7 @@ impl From for Millis { impl From for std::time::Duration { #[inline] fn from(d: Millis) -> std::time::Duration { - std::time::Duration::from_millis(d.0) + std::time::Duration::from_millis(d.0 as u64) } } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index d97cd644..73a0aee3 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,6 +1,6 @@ # Changes -## [0.5.12] - 2022-01-xx +## [0.5.12] - 2022-01-27 * Replace derive_more with thiserror diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index d18388b6..3e615e2a 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -52,11 +52,11 @@ ntex-codec = "0.6.1" ntex-router = "0.5.1" ntex-service = "0.3.1" ntex-macros = "0.1.3" -ntex-util = "0.1.10" +ntex-util = "0.1.12" ntex-bytes = "0.1.10" ntex-tls = "0.1.2" ntex-rt = "0.4.3" -ntex-io = "0.1.5" +ntex-io = "0.1.6" ntex-tokio = "0.1.2" ntex-glommio = { version = "0.1.0", optional = true } ntex-async-std = { version = "0.1.0", optional = true }