mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 05:17:39 +03:00
Optimize Io layout
This commit is contained in:
parent
d81e089059
commit
be421c0c85
5 changed files with 161 additions and 111 deletions
|
@ -1,5 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## [2.1.0] - 2024-07-30
|
||||
|
||||
* Optimize `Io` layout
|
||||
|
||||
## [2.0.0] - 2024-05-28
|
||||
|
||||
* Use async fn for Service::ready() and Service::shutdown()
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-io"
|
||||
version = "2.0.0"
|
||||
version = "2.1.0"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "Utilities for encoding and decoding frames"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@ -18,7 +18,7 @@ path = "src/lib.rs"
|
|||
[dependencies]
|
||||
ntex-codec = "0.6.2"
|
||||
ntex-bytes = "0.1.24"
|
||||
ntex-util = "2.0"
|
||||
ntex-util = "2.2"
|
||||
ntex-service = "3.0"
|
||||
|
||||
bitflags = "2"
|
||||
|
|
|
@ -37,6 +37,28 @@ impl Stack {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn take(&self) -> Stack {
|
||||
let buffers = match self.buffers {
|
||||
Either::Left(ref array) => Either::Left([
|
||||
Buffer(Cell::new(array[0].0.take()), Cell::new(array[0].1.take())),
|
||||
Buffer(Cell::new(array[1].0.take()), Cell::new(array[1].1.take())),
|
||||
Buffer(Cell::new(array[2].0.take()), Cell::new(array[2].1.take())),
|
||||
]),
|
||||
Either::Right(ref vec) => {
|
||||
let mut bufs = Vec::with_capacity(self.len);
|
||||
for item in vec {
|
||||
bufs.push(Buffer(Cell::new(item.0.take()), Cell::new(item.1.take())));
|
||||
}
|
||||
Either::Right(bufs)
|
||||
}
|
||||
};
|
||||
|
||||
Stack {
|
||||
buffers,
|
||||
len: self.len,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn add_layer(&mut self) {
|
||||
match &mut self.buffers {
|
||||
Either::Left(b) => {
|
||||
|
|
|
@ -50,12 +50,13 @@ bitflags::bitflags! {
|
|||
}
|
||||
|
||||
/// Interface object to underlying io stream
|
||||
pub struct Io<F = Base>(pub(super) IoRef, FilterItem<F>);
|
||||
pub struct Io<F = Base>(pub(super) IoRef, marker::PhantomData<F>);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IoRef(pub(super) Rc<IoState>);
|
||||
|
||||
pub(crate) struct IoState {
|
||||
filter: FilterPtr,
|
||||
pub(super) flags: Cell<Flags>,
|
||||
pub(super) pool: Cell<PoolRef>,
|
||||
pub(super) disconnect_timeout: Cell<Seconds>,
|
||||
|
@ -64,7 +65,6 @@ pub(crate) struct IoState {
|
|||
pub(super) write_task: LocalWaker,
|
||||
pub(super) dispatch_task: LocalWaker,
|
||||
pub(super) buffer: Stack,
|
||||
pub(super) filter: Cell<&'static dyn Filter>,
|
||||
pub(super) handle: Cell<Option<Box<dyn Handle>>>,
|
||||
pub(super) timeout: Cell<TimerHandle>,
|
||||
pub(super) tag: Cell<&'static str>,
|
||||
|
@ -75,6 +75,10 @@ pub(crate) struct IoState {
|
|||
const DEFAULT_TAG: &str = "IO";
|
||||
|
||||
impl IoState {
|
||||
pub(super) fn filter(&self) -> &dyn Filter {
|
||||
self.filter.filter.get()
|
||||
}
|
||||
|
||||
pub(super) fn insert_flags(&self, f: Flags) {
|
||||
let mut flags = self.flags.get();
|
||||
flags.insert(f);
|
||||
|
@ -193,6 +197,7 @@ impl Io {
|
|||
/// Create `Io` instance in specific memory pool.
|
||||
pub fn with_memory_pool<I: IoStream>(io: I, pool: PoolRef) -> Self {
|
||||
let inner = Rc::new(IoState {
|
||||
filter: FilterPtr::null(),
|
||||
pool: Cell::new(pool),
|
||||
flags: Cell::new(Flags::empty()),
|
||||
error: Cell::new(None),
|
||||
|
@ -201,19 +206,12 @@ impl Io {
|
|||
read_task: LocalWaker::new(),
|
||||
write_task: LocalWaker::new(),
|
||||
buffer: Stack::new(),
|
||||
filter: Cell::new(NullFilter::get()),
|
||||
handle: Cell::new(None),
|
||||
timeout: Cell::new(TimerHandle::default()),
|
||||
on_disconnect: Cell::new(None),
|
||||
tag: Cell::new(DEFAULT_TAG),
|
||||
});
|
||||
|
||||
let filter = Box::new(Base::new(IoRef(inner.clone())));
|
||||
let filter_ref: &'static dyn Filter = unsafe {
|
||||
let filter: &dyn Filter = filter.as_ref();
|
||||
std::mem::transmute(filter)
|
||||
};
|
||||
inner.filter.replace(filter_ref);
|
||||
inner.filter.update(Base::new(IoRef(inner.clone())));
|
||||
|
||||
let io_ref = IoRef(inner);
|
||||
|
||||
|
@ -221,7 +219,7 @@ impl Io {
|
|||
let hnd = io.start(ReadContext::new(&io_ref), WriteContext::new(&io_ref));
|
||||
io_ref.0.handle.set(hnd);
|
||||
|
||||
Io(io_ref, FilterItem::with_filter(filter))
|
||||
Io(io_ref, marker::PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,31 +241,34 @@ impl<F> Io<F> {
|
|||
/// Clone current io object.
|
||||
///
|
||||
/// Current io object becomes closed.
|
||||
pub fn take(&mut self) -> Self {
|
||||
let inner = Rc::new(IoState {
|
||||
pool: self.0 .0.pool.clone(),
|
||||
flags: Cell::new(
|
||||
Flags::DSP_STOP
|
||||
| Flags::IO_STOPPED
|
||||
| Flags::IO_STOPPING
|
||||
| Flags::IO_STOPPING_FILTERS,
|
||||
),
|
||||
error: Cell::new(None),
|
||||
disconnect_timeout: Cell::new(Seconds(1)),
|
||||
pub fn take(&self) -> Self {
|
||||
let st = &self.0 .0;
|
||||
let new = Rc::new(IoState {
|
||||
filter: st.filter.take(),
|
||||
pool: st.pool.clone(),
|
||||
flags: Cell::new(st.flags.get()),
|
||||
error: Cell::new(st.error.take()),
|
||||
disconnect_timeout: Cell::new(st.disconnect_timeout.get()),
|
||||
dispatch_task: LocalWaker::new(),
|
||||
read_task: LocalWaker::new(),
|
||||
write_task: LocalWaker::new(),
|
||||
buffer: Stack::new(),
|
||||
filter: Cell::new(NullFilter::get()),
|
||||
handle: Cell::new(None),
|
||||
timeout: Cell::new(TimerHandle::default()),
|
||||
on_disconnect: Cell::new(None),
|
||||
tag: Cell::new(DEFAULT_TAG),
|
||||
buffer: st.buffer.take(),
|
||||
handle: Cell::new(st.handle.take()),
|
||||
timeout: Cell::new(st.timeout.take()),
|
||||
on_disconnect: Cell::new(st.on_disconnect.take()),
|
||||
tag: Cell::new(st.tag.get()),
|
||||
});
|
||||
|
||||
let state = mem::replace(&mut self.0, IoRef(inner));
|
||||
let filter = mem::replace(&mut self.1, FilterItem::null());
|
||||
Self(state, filter)
|
||||
st.flags.set(
|
||||
Flags::DSP_STOP
|
||||
| Flags::IO_STOPPED
|
||||
| Flags::IO_STOPPING
|
||||
| Flags::IO_STOPPING_FILTERS,
|
||||
);
|
||||
st.timeout.set(TimerHandle::default());
|
||||
st.tag.set(DEFAULT_TAG);
|
||||
|
||||
Self(IoRef(new), marker::PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -295,22 +296,22 @@ impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
|
|||
#[inline]
|
||||
/// Get referece to a filter
|
||||
pub fn filter(&self) -> &F {
|
||||
&self.1.filter().0
|
||||
&self.0 .0.filter.filter::<Layer<F, T>>().0
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Filter> Io<F> {
|
||||
#[inline]
|
||||
/// Convert current io stream into sealed version
|
||||
pub fn seal(mut self) -> Io<Sealed> {
|
||||
let (filter, filter_ref) = self.1.seal();
|
||||
self.0 .0.filter.replace(filter_ref);
|
||||
Io(self.0.clone(), filter)
|
||||
pub fn seal(self) -> Io<Sealed> {
|
||||
self.0 .0.filter.seal::<F>();
|
||||
|
||||
Io(self.0.clone(), marker::PhantomData)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Map current filter with new one
|
||||
pub fn add_filter<U>(mut self, nf: U) -> Io<Layer<U, F>>
|
||||
pub fn add_filter<U>(self, nf: U) -> Io<Layer<U, F>>
|
||||
where
|
||||
U: FilterLayer,
|
||||
{
|
||||
|
@ -325,9 +326,9 @@ impl<F: Filter> Io<F> {
|
|||
}
|
||||
|
||||
// replace current filter
|
||||
let (filter, filter_ref) = self.1.add_filter(nf);
|
||||
self.0 .0.filter.replace(filter_ref);
|
||||
Io(self.0.clone(), filter)
|
||||
self.0 .0.filter.add_filter::<F, U>(nf);
|
||||
|
||||
Io(self.0.clone(), marker::PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -704,7 +705,7 @@ impl<F> Drop for Io<F> {
|
|||
|
||||
// filter must be dropped, it is unsafe
|
||||
// and wont be dropped without special attention
|
||||
if self.1.is_set() {
|
||||
if self.0 .0.filter.is_set() || Rc::strong_count(&self.0 .0) == 1 {
|
||||
if !self.0.flags().contains(Flags::IO_STOPPED) {
|
||||
log::trace!(
|
||||
"{}: Io is dropped, force stopping io streams {:?}",
|
||||
|
@ -714,8 +715,7 @@ impl<F> Drop for Io<F> {
|
|||
}
|
||||
|
||||
self.force_close();
|
||||
self.1.drop_filter();
|
||||
self.0 .0.filter.set(NullFilter::get());
|
||||
self.0 .0.filter.drop_filter::<F>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -734,37 +734,57 @@ const KIND_IDX: usize = 0;
|
|||
#[cfg(target_endian = "big")]
|
||||
const KIND_IDX: usize = SEALED_SIZE - 1;
|
||||
|
||||
struct FilterItem<F> {
|
||||
data: [u8; SEALED_SIZE],
|
||||
_t: marker::PhantomData<F>,
|
||||
struct FilterPtr {
|
||||
data: Cell<[u8; SEALED_SIZE]>,
|
||||
filter: Cell<&'static dyn Filter>,
|
||||
}
|
||||
|
||||
impl<F> FilterItem<F> {
|
||||
impl FilterPtr {
|
||||
fn null() -> Self {
|
||||
Self {
|
||||
data: [0; SEALED_SIZE],
|
||||
_t: marker::PhantomData,
|
||||
data: Cell::new([0u8; SEALED_SIZE]),
|
||||
filter: Cell::new(NullFilter::get()),
|
||||
}
|
||||
}
|
||||
|
||||
fn with_filter(f: Box<F>) -> Self {
|
||||
let mut slf = Self {
|
||||
data: [0; SEALED_SIZE],
|
||||
_t: marker::PhantomData,
|
||||
fn take(&self) -> FilterPtr {
|
||||
let new = FilterPtr {
|
||||
data: Cell::new(self.data.get()),
|
||||
filter: Cell::new(self.filter.get()),
|
||||
};
|
||||
|
||||
unsafe {
|
||||
let ptr = &mut slf.data as *mut _ as *mut *mut F;
|
||||
ptr.write(Box::into_raw(f));
|
||||
slf.data[KIND_IDX] |= KIND_PTR;
|
||||
self.data.set([0u8; SEALED_SIZE]);
|
||||
self.filter.set(NullFilter::get());
|
||||
|
||||
new
|
||||
}
|
||||
|
||||
fn update<F: Filter>(&self, filter: F) {
|
||||
if self.is_set() {
|
||||
self.drop_filter::<F>();
|
||||
}
|
||||
slf
|
||||
|
||||
let filter = Box::new(filter);
|
||||
let filter_ref: &'static dyn Filter = unsafe {
|
||||
let filter: &dyn Filter = filter.as_ref();
|
||||
std::mem::transmute(filter)
|
||||
};
|
||||
|
||||
let mut data = [0; SEALED_SIZE];
|
||||
unsafe {
|
||||
let ptr = &mut data as *mut _ as *mut *mut F;
|
||||
ptr.write(Box::into_raw(filter));
|
||||
data[KIND_IDX] |= KIND_PTR;
|
||||
}
|
||||
self.data.set(data);
|
||||
self.filter.set(filter_ref);
|
||||
}
|
||||
|
||||
/// Get filter, panic if it is not filter
|
||||
fn filter(&self) -> &F {
|
||||
if self.data[KIND_IDX] & KIND_PTR != 0 {
|
||||
let ptr = &self.data as *const _ as *const *mut F;
|
||||
fn filter<F: Filter>(&self) -> &F {
|
||||
let data = self.data.get();
|
||||
if data[KIND_IDX] & KIND_PTR != 0 {
|
||||
let ptr = &data as *const _ as *const *mut F;
|
||||
unsafe {
|
||||
let p = (ptr.read() as *const _ as usize) & KIND_UNMASK_USIZE;
|
||||
(p as *const F as *mut F).as_ref().unwrap()
|
||||
|
@ -775,68 +795,82 @@ impl<F> FilterItem<F> {
|
|||
}
|
||||
|
||||
/// Get filter, panic if it is not set
|
||||
fn take_filter(&mut self) -> Box<F> {
|
||||
if self.data[KIND_IDX] & KIND_PTR != 0 {
|
||||
self.data[KIND_IDX] &= KIND_UNMASK;
|
||||
let ptr = &mut self.data as *mut _ as *mut *mut F;
|
||||
fn take_filter<F>(&self) -> Box<F> {
|
||||
let mut data = self.data.get();
|
||||
if data[KIND_IDX] & KIND_PTR != 0 {
|
||||
data[KIND_IDX] &= KIND_UNMASK;
|
||||
let ptr = &mut data as *mut _ as *mut *mut F;
|
||||
unsafe { Box::from_raw(*ptr) }
|
||||
} else {
|
||||
panic!(
|
||||
"Wrong filter item {:?} expected: {:?}",
|
||||
self.data[KIND_IDX], KIND_PTR
|
||||
data[KIND_IDX], KIND_PTR
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Get sealed, panic if it is already sealed
|
||||
fn take_sealed(&mut self) -> Sealed {
|
||||
if self.data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
self.data[KIND_IDX] &= KIND_UNMASK;
|
||||
let ptr = &mut self.data as *mut _ as *mut Sealed;
|
||||
fn take_sealed(&self) -> Sealed {
|
||||
let mut data = self.data.get();
|
||||
|
||||
if data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
data[KIND_IDX] &= KIND_UNMASK;
|
||||
let ptr = &mut data as *mut _ as *mut Sealed;
|
||||
unsafe { ptr.read() }
|
||||
} else {
|
||||
panic!(
|
||||
"Wrong filter item {:?} expected: {:?}",
|
||||
self.data[KIND_IDX], KIND_SEALED
|
||||
data[KIND_IDX], KIND_SEALED
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn is_set(&self) -> bool {
|
||||
self.data[KIND_IDX] & KIND_MASK != 0
|
||||
self.data.get()[KIND_IDX] & KIND_MASK != 0
|
||||
}
|
||||
|
||||
fn drop_filter(&mut self) {
|
||||
if self.data[KIND_IDX] & KIND_PTR != 0 {
|
||||
self.take_filter();
|
||||
} else if self.data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
fn drop_filter<F>(&self) {
|
||||
let data = self.data.get();
|
||||
|
||||
if data[KIND_IDX] & KIND_PTR != 0 {
|
||||
self.take_filter::<F>();
|
||||
} else if data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
self.take_sealed();
|
||||
}
|
||||
self.filter.set(NullFilter::get());
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Filter> FilterItem<F> {
|
||||
fn add_filter<T: FilterLayer>(
|
||||
&mut self,
|
||||
new: T,
|
||||
) -> (FilterItem<Layer<T, F>>, &'static dyn Filter) {
|
||||
impl FilterPtr {
|
||||
fn add_filter<F: Filter, T: FilterLayer>(&self, new: T) {
|
||||
let mut data = [0; SEALED_SIZE];
|
||||
|
||||
let filter = Box::new(Layer::new(new, *self.take_filter()));
|
||||
let filter_ref: &'static dyn Filter = {
|
||||
let filter: &dyn Filter = filter.as_ref();
|
||||
unsafe { std::mem::transmute(filter) }
|
||||
};
|
||||
(FilterItem::with_filter(filter), filter_ref)
|
||||
unsafe {
|
||||
let filter_ref: &'static dyn Filter = {
|
||||
let filter: &dyn Filter = filter.as_ref();
|
||||
std::mem::transmute(filter)
|
||||
};
|
||||
self.filter.set(filter_ref);
|
||||
|
||||
let ptr = &mut data as *mut _ as *mut *mut Layer<T, F>;
|
||||
ptr.write(Box::into_raw(filter));
|
||||
data[KIND_IDX] |= KIND_PTR;
|
||||
}
|
||||
self.data.set(data);
|
||||
}
|
||||
|
||||
fn seal(&mut self) -> (FilterItem<Sealed>, &'static dyn Filter) {
|
||||
let filter = if self.data[KIND_IDX] & KIND_PTR != 0 {
|
||||
Sealed(Box::new(*self.take_filter()))
|
||||
} else if self.data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
fn seal<F: Filter>(&self) {
|
||||
let mut data = self.data.get();
|
||||
|
||||
let filter = if data[KIND_IDX] & KIND_PTR != 0 {
|
||||
Sealed(Box::new(*self.take_filter::<F>()))
|
||||
} else if data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
self.take_sealed()
|
||||
} else {
|
||||
panic!(
|
||||
"Wrong filter item {:?} expected: {:?}",
|
||||
self.data[KIND_IDX], KIND_PTR
|
||||
data[KIND_IDX], KIND_PTR
|
||||
);
|
||||
};
|
||||
|
||||
|
@ -845,17 +879,13 @@ impl<F: Filter> FilterItem<F> {
|
|||
unsafe { std::mem::transmute(filter) }
|
||||
};
|
||||
|
||||
let mut slf = FilterItem {
|
||||
data: [0; SEALED_SIZE],
|
||||
_t: marker::PhantomData,
|
||||
};
|
||||
|
||||
unsafe {
|
||||
let ptr = &mut slf.data as *mut _ as *mut Sealed;
|
||||
let ptr = &mut data as *mut _ as *mut Sealed;
|
||||
ptr.write(filter);
|
||||
slf.data[KIND_IDX] |= KIND_SEALED;
|
||||
data[KIND_IDX] |= KIND_SEALED;
|
||||
}
|
||||
(slf, filter_ref)
|
||||
self.data.set(data);
|
||||
self.filter.set(filter_ref);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,9 +21,9 @@ impl IoRef {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
/// Get memory pool
|
||||
/// Get current filter
|
||||
pub(crate) fn filter(&self) -> &dyn Filter {
|
||||
self.0.filter.get()
|
||||
self.0.filter()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -192,10 +192,7 @@ impl IoRef {
|
|||
F: FnOnce(&WriteBuf<'_>) -> R,
|
||||
{
|
||||
let result = self.0.buffer.write_buf(self, 0, f);
|
||||
self.0
|
||||
.filter
|
||||
.get()
|
||||
.process_write_buf(self, &self.0.buffer, 0)?;
|
||||
self.0.filter().process_write_buf(self, &self.0.buffer, 0)?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
@ -206,10 +203,7 @@ impl IoRef {
|
|||
F: FnOnce(&mut BytesVec) -> R,
|
||||
{
|
||||
let result = self.0.buffer.with_write_source(self, f);
|
||||
self.0
|
||||
.filter
|
||||
.get()
|
||||
.process_write_buf(self, &self.0.buffer, 0)?;
|
||||
self.0.filter().process_write_buf(self, &self.0.buffer, 0)?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue