mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 21:07:39 +03:00
Optimize io layout (#388)
This commit is contained in:
parent
d81e089059
commit
f5bad7bfa0
8 changed files with 215 additions and 168 deletions
|
@ -1,5 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## [2.1.0] - 2024-07-30
|
||||
|
||||
* Optimize `Io` layout
|
||||
|
||||
## [2.0.0] - 2024-05-28
|
||||
|
||||
* Use async fn for Service::ready() and Service::shutdown()
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "ntex-io"
|
||||
version = "2.0.0"
|
||||
version = "2.1.0"
|
||||
authors = ["ntex contributors <team@ntex.rs>"]
|
||||
description = "Utilities for encoding and decoding frames"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@ -18,7 +18,7 @@ path = "src/lib.rs"
|
|||
[dependencies]
|
||||
ntex-codec = "0.6.2"
|
||||
ntex-bytes = "0.1.24"
|
||||
ntex-util = "2.0"
|
||||
ntex-util = "2.2"
|
||||
ntex-service = "3.0"
|
||||
|
||||
bitflags = "2"
|
||||
|
|
|
@ -295,6 +295,12 @@ pub struct ReadBuf<'a> {
|
|||
}
|
||||
|
||||
impl<'a> ReadBuf<'a> {
|
||||
#[inline]
|
||||
/// Get io tag
|
||||
pub fn tag(&self) -> &'static str {
|
||||
self.io.tag()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Get number of newly added bytes
|
||||
pub fn nbytes(&self) -> usize {
|
||||
|
@ -433,6 +439,12 @@ pub struct WriteBuf<'a> {
|
|||
}
|
||||
|
||||
impl<'a> WriteBuf<'a> {
|
||||
#[inline]
|
||||
/// Get io tag
|
||||
pub fn tag(&self) -> &'static str {
|
||||
self.io.tag()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Initiate graceful io stream shutdown
|
||||
pub fn want_shutdown(&self) {
|
||||
|
|
|
@ -726,7 +726,6 @@ mod tests {
|
|||
|
||||
#[ntex::test]
|
||||
async fn test_basic() {
|
||||
let _ = env_logger::try_init();
|
||||
let (client, server) = IoTest::create();
|
||||
client.remote_buffer_cap(1024);
|
||||
client.write("GET /test HTTP/1\r\n\r\n");
|
||||
|
|
|
@ -26,7 +26,7 @@ pub(crate) struct NullFilter;
|
|||
const NULL: NullFilter = NullFilter;
|
||||
|
||||
impl NullFilter {
|
||||
pub(super) fn get() -> &'static dyn Filter {
|
||||
pub(super) const fn get() -> &'static dyn Filter {
|
||||
&NULL
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::cell::Cell;
|
||||
use std::cell::{Cell, UnsafeCell};
|
||||
use std::future::{poll_fn, Future};
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc};
|
||||
|
@ -50,12 +50,13 @@ bitflags::bitflags! {
|
|||
}
|
||||
|
||||
/// Interface object to underlying io stream
|
||||
pub struct Io<F = Base>(pub(super) IoRef, FilterItem<F>);
|
||||
pub struct Io<F = Base>(UnsafeCell<IoRef>, marker::PhantomData<F>);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IoRef(pub(super) Rc<IoState>);
|
||||
|
||||
pub(crate) struct IoState {
|
||||
filter: FilterPtr,
|
||||
pub(super) flags: Cell<Flags>,
|
||||
pub(super) pool: Cell<PoolRef>,
|
||||
pub(super) disconnect_timeout: Cell<Seconds>,
|
||||
|
@ -64,7 +65,6 @@ pub(crate) struct IoState {
|
|||
pub(super) write_task: LocalWaker,
|
||||
pub(super) dispatch_task: LocalWaker,
|
||||
pub(super) buffer: Stack,
|
||||
pub(super) filter: Cell<&'static dyn Filter>,
|
||||
pub(super) handle: Cell<Option<Box<dyn Handle>>>,
|
||||
pub(super) timeout: Cell<TimerHandle>,
|
||||
pub(super) tag: Cell<&'static str>,
|
||||
|
@ -75,6 +75,10 @@ pub(crate) struct IoState {
|
|||
const DEFAULT_TAG: &str = "IO";
|
||||
|
||||
impl IoState {
|
||||
pub(super) fn filter(&self) -> &dyn Filter {
|
||||
self.filter.filter.get()
|
||||
}
|
||||
|
||||
pub(super) fn insert_flags(&self, f: Flags) {
|
||||
let mut flags = self.flags.get();
|
||||
flags.insert(f);
|
||||
|
@ -171,7 +175,7 @@ impl fmt::Debug for IoState {
|
|||
let res = f
|
||||
.debug_struct("IoState")
|
||||
.field("flags", &self.flags)
|
||||
.field("pool", &self.pool)
|
||||
.field("filter", &self.filter.is_set())
|
||||
.field("disconnect_timeout", &self.disconnect_timeout)
|
||||
.field("timeout", &self.timeout)
|
||||
.field("error", &err)
|
||||
|
@ -193,27 +197,21 @@ impl Io {
|
|||
/// Create `Io` instance in specific memory pool.
|
||||
pub fn with_memory_pool<I: IoStream>(io: I, pool: PoolRef) -> Self {
|
||||
let inner = Rc::new(IoState {
|
||||
filter: FilterPtr::null(),
|
||||
pool: Cell::new(pool),
|
||||
flags: Cell::new(Flags::empty()),
|
||||
error: Cell::new(None),
|
||||
disconnect_timeout: Cell::new(Seconds(1)),
|
||||
dispatch_task: LocalWaker::new(),
|
||||
read_task: LocalWaker::new(),
|
||||
write_task: LocalWaker::new(),
|
||||
buffer: Stack::new(),
|
||||
filter: Cell::new(NullFilter::get()),
|
||||
handle: Cell::new(None),
|
||||
timeout: Cell::new(TimerHandle::default()),
|
||||
disconnect_timeout: Cell::new(Seconds(1)),
|
||||
on_disconnect: Cell::new(None),
|
||||
tag: Cell::new(DEFAULT_TAG),
|
||||
});
|
||||
|
||||
let filter = Box::new(Base::new(IoRef(inner.clone())));
|
||||
let filter_ref: &'static dyn Filter = unsafe {
|
||||
let filter: &dyn Filter = filter.as_ref();
|
||||
std::mem::transmute(filter)
|
||||
};
|
||||
inner.filter.replace(filter_ref);
|
||||
inner.filter.update(Base::new(IoRef(inner.clone())));
|
||||
|
||||
let io_ref = IoRef(inner);
|
||||
|
||||
|
@ -221,7 +219,7 @@ impl Io {
|
|||
let hnd = io.start(ReadContext::new(&io_ref), WriteContext::new(&io_ref));
|
||||
io_ref.0.handle.set(hnd);
|
||||
|
||||
Io(io_ref, FilterItem::with_filter(filter))
|
||||
Io(UnsafeCell::new(io_ref), marker::PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -229,23 +227,28 @@ impl<F> Io<F> {
|
|||
#[inline]
|
||||
/// Set memory pool
|
||||
pub fn set_memory_pool(&self, pool: PoolRef) {
|
||||
self.0 .0.buffer.set_memory_pool(pool);
|
||||
self.0 .0.pool.set(pool);
|
||||
self.st().buffer.set_memory_pool(pool);
|
||||
self.st().pool.set(pool);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Set io disconnect timeout in millis
|
||||
pub fn set_disconnect_timeout(&self, timeout: Seconds) {
|
||||
self.0 .0.disconnect_timeout.set(timeout);
|
||||
self.st().disconnect_timeout.set(timeout);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Clone current io object.
|
||||
///
|
||||
/// Current io object becomes closed.
|
||||
pub fn take(&mut self) -> Self {
|
||||
pub fn take(&self) -> Self {
|
||||
Self(UnsafeCell::new(self.take_io_ref()), marker::PhantomData)
|
||||
}
|
||||
|
||||
fn take_io_ref(&self) -> IoRef {
|
||||
let inner = Rc::new(IoState {
|
||||
pool: self.0 .0.pool.clone(),
|
||||
filter: FilterPtr::null(),
|
||||
pool: self.st().pool.clone(),
|
||||
flags: Cell::new(
|
||||
Flags::DSP_STOP
|
||||
| Flags::IO_STOPPED
|
||||
|
@ -258,16 +261,12 @@ impl<F> Io<F> {
|
|||
read_task: LocalWaker::new(),
|
||||
write_task: LocalWaker::new(),
|
||||
buffer: Stack::new(),
|
||||
filter: Cell::new(NullFilter::get()),
|
||||
handle: Cell::new(None),
|
||||
timeout: Cell::new(TimerHandle::default()),
|
||||
on_disconnect: Cell::new(None),
|
||||
tag: Cell::new(DEFAULT_TAG),
|
||||
});
|
||||
|
||||
let state = mem::replace(&mut self.0, IoRef(inner));
|
||||
let filter = mem::replace(&mut self.1, FilterItem::null());
|
||||
Self(state, filter)
|
||||
unsafe { mem::replace(&mut *self.0.get(), IoRef(inner)) }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -276,18 +275,26 @@ impl<F> Io<F> {
|
|||
#[doc(hidden)]
|
||||
/// Get current state flags
|
||||
pub fn flags(&self) -> Flags {
|
||||
self.0 .0.flags.get()
|
||||
self.st().flags.get()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Get instance of `IoRef`
|
||||
pub fn get_ref(&self) -> IoRef {
|
||||
self.0.clone()
|
||||
self.io_ref().clone()
|
||||
}
|
||||
|
||||
fn st(&self) -> &IoState {
|
||||
unsafe { &(*self.0.get()).0 }
|
||||
}
|
||||
|
||||
fn io_ref(&self) -> &IoRef {
|
||||
unsafe { &*self.0.get() }
|
||||
}
|
||||
|
||||
/// Get current io error
|
||||
fn error(&self) -> Option<io::Error> {
|
||||
self.0 .0.error.take()
|
||||
self.st().error.take()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -295,39 +302,42 @@ impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
|
|||
#[inline]
|
||||
/// Get referece to a filter
|
||||
pub fn filter(&self) -> &F {
|
||||
&self.1.filter().0
|
||||
&self.st().filter.filter::<Layer<F, T>>().0
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Filter> Io<F> {
|
||||
#[inline]
|
||||
/// Convert current io stream into sealed version
|
||||
pub fn seal(mut self) -> Io<Sealed> {
|
||||
let (filter, filter_ref) = self.1.seal();
|
||||
self.0 .0.filter.replace(filter_ref);
|
||||
Io(self.0.clone(), filter)
|
||||
pub fn seal(self) -> Io<Sealed> {
|
||||
let state = self.take_io_ref();
|
||||
state.0.filter.seal::<F>();
|
||||
|
||||
Io(UnsafeCell::new(state), marker::PhantomData)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Map current filter with new one
|
||||
pub fn add_filter<U>(mut self, nf: U) -> Io<Layer<U, F>>
|
||||
pub fn add_filter<U>(self, nf: U) -> Io<Layer<U, F>>
|
||||
where
|
||||
U: FilterLayer,
|
||||
{
|
||||
let state = self.take_io_ref();
|
||||
|
||||
// add layer to buffers
|
||||
if U::BUFFERS {
|
||||
// Safety: .add_layer() only increases internal buffers
|
||||
// there is no api that holds references into buffers storage
|
||||
// all apis first removes buffer from storage and then work with it
|
||||
unsafe { &mut *(Rc::as_ptr(&self.0 .0) as *mut IoState) }
|
||||
unsafe { &mut *(Rc::as_ptr(&state.0) as *mut IoState) }
|
||||
.buffer
|
||||
.add_layer();
|
||||
}
|
||||
|
||||
// replace current filter
|
||||
let (filter, filter_ref) = self.1.add_filter(nf);
|
||||
self.0 .0.filter.replace(filter_ref);
|
||||
Io(self.0.clone(), filter)
|
||||
state.0.filter.add_filter::<F, U>(nf);
|
||||
|
||||
Io(UnsafeCell::new(state), marker::PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -381,9 +391,10 @@ impl<F> Io<F> {
|
|||
#[inline]
|
||||
/// Pause read task
|
||||
pub fn pause(&self) {
|
||||
if !self.0.flags().contains(Flags::RD_PAUSED) {
|
||||
self.0 .0.read_task.wake();
|
||||
self.0 .0.insert_flags(Flags::RD_PAUSED);
|
||||
let st = self.st();
|
||||
if !st.flags.get().contains(Flags::RD_PAUSED) {
|
||||
st.read_task.wake();
|
||||
st.insert_flags(Flags::RD_PAUSED);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -435,18 +446,19 @@ impl<F> Io<F> {
|
|||
/// `Poll::Ready(Ok(None))` if io stream is disconnected
|
||||
/// `Some(Poll::Ready(Err(e)))` if an error is encountered.
|
||||
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
|
||||
let mut flags = self.0 .0.flags.get();
|
||||
let st = self.st();
|
||||
let mut flags = st.flags.get();
|
||||
|
||||
if flags.contains(Flags::IO_STOPPED) {
|
||||
Poll::Ready(self.error().map(Err).unwrap_or(Ok(None)))
|
||||
} else {
|
||||
self.0 .0.dispatch_task.register(cx.waker());
|
||||
st.dispatch_task.register(cx.waker());
|
||||
|
||||
let ready = flags.contains(Flags::RD_READY);
|
||||
if flags.intersects(Flags::RD_BUF_FULL | Flags::RD_PAUSED) {
|
||||
flags.remove(Flags::RD_READY | Flags::RD_BUF_FULL | Flags::RD_PAUSED);
|
||||
self.0 .0.read_task.wake();
|
||||
self.0 .0.flags.set(flags);
|
||||
st.read_task.wake();
|
||||
st.flags.set(flags);
|
||||
if ready {
|
||||
Poll::Ready(Ok(Some(())))
|
||||
} else {
|
||||
|
@ -454,7 +466,7 @@ impl<F> Io<F> {
|
|||
}
|
||||
} else if ready {
|
||||
flags.remove(Flags::RD_READY);
|
||||
self.0 .0.flags.set(flags);
|
||||
st.flags.set(flags);
|
||||
Poll::Ready(Ok(Some(())))
|
||||
} else {
|
||||
Poll::Pending
|
||||
|
@ -484,10 +496,11 @@ impl<F> Io<F> {
|
|||
let ready = self.poll_read_ready(cx);
|
||||
|
||||
if ready.is_pending() {
|
||||
if self.0 .0.remove_flags(Flags::RD_FORCE_READY) {
|
||||
let st = self.st();
|
||||
if st.remove_flags(Flags::RD_FORCE_READY) {
|
||||
Poll::Ready(Ok(Some(())))
|
||||
} else {
|
||||
self.0 .0.insert_flags(Flags::RD_FORCE_READY);
|
||||
st.insert_flags(Flags::RD_FORCE_READY);
|
||||
Poll::Pending
|
||||
}
|
||||
} else {
|
||||
|
@ -538,14 +551,15 @@ impl<F> Io<F> {
|
|||
if decoded.item.is_some() {
|
||||
Ok(decoded)
|
||||
} else {
|
||||
let flags = self.flags();
|
||||
let st = self.st();
|
||||
let flags = st.flags.get();
|
||||
if flags.contains(Flags::IO_STOPPED) {
|
||||
Err(RecvError::PeerGone(self.error()))
|
||||
} else if flags.contains(Flags::DSP_STOP) {
|
||||
self.0 .0.remove_flags(Flags::DSP_STOP);
|
||||
st.remove_flags(Flags::DSP_STOP);
|
||||
Err(RecvError::Stop)
|
||||
} else if flags.contains(Flags::DSP_TIMEOUT) {
|
||||
self.0 .0.remove_flags(Flags::DSP_TIMEOUT);
|
||||
st.remove_flags(Flags::DSP_TIMEOUT);
|
||||
Err(RecvError::KeepAlive)
|
||||
} else if flags.contains(Flags::WR_BACKPRESSURE) {
|
||||
Err(RecvError::WriteBackpressure)
|
||||
|
@ -579,20 +593,20 @@ impl<F> Io<F> {
|
|||
if flags.contains(Flags::IO_STOPPED) {
|
||||
Poll::Ready(self.error().map(Err).unwrap_or(Ok(())))
|
||||
} else {
|
||||
let inner = &self.0 .0;
|
||||
let len = inner.buffer.write_destination_size();
|
||||
let st = self.st();
|
||||
let len = st.buffer.write_destination_size();
|
||||
if len > 0 {
|
||||
if full {
|
||||
inner.insert_flags(Flags::WR_WAIT);
|
||||
inner.dispatch_task.register(cx.waker());
|
||||
st.insert_flags(Flags::WR_WAIT);
|
||||
st.dispatch_task.register(cx.waker());
|
||||
return Poll::Pending;
|
||||
} else if len >= inner.pool.get().write_params_high() << 1 {
|
||||
inner.insert_flags(Flags::WR_BACKPRESSURE);
|
||||
inner.dispatch_task.register(cx.waker());
|
||||
} else if len >= st.pool.get().write_params_high() << 1 {
|
||||
st.insert_flags(Flags::WR_BACKPRESSURE);
|
||||
st.dispatch_task.register(cx.waker());
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
inner.remove_flags(Flags::WR_WAIT | Flags::WR_BACKPRESSURE);
|
||||
st.remove_flags(Flags::WR_WAIT | Flags::WR_BACKPRESSURE);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
@ -600,7 +614,8 @@ impl<F> Io<F> {
|
|||
#[inline]
|
||||
/// Gracefully shutdown io stream
|
||||
pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
let flags = self.flags();
|
||||
let st = self.st();
|
||||
let flags = st.flags.get();
|
||||
|
||||
if flags.intersects(Flags::IO_STOPPED) {
|
||||
if let Some(err) = self.error() {
|
||||
|
@ -610,12 +625,12 @@ impl<F> Io<F> {
|
|||
}
|
||||
} else {
|
||||
if !flags.contains(Flags::IO_STOPPING_FILTERS) {
|
||||
self.0 .0.init_shutdown();
|
||||
st.init_shutdown();
|
||||
}
|
||||
|
||||
self.0 .0.read_task.wake();
|
||||
self.0 .0.write_task.wake();
|
||||
self.0 .0.dispatch_task.register(cx.waker());
|
||||
st.read_task.wake();
|
||||
st.write_task.wake();
|
||||
st.dispatch_task.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
@ -628,7 +643,7 @@ impl<F> Io<F> {
|
|||
self.pause();
|
||||
let result = self.poll_status_update(cx);
|
||||
if !result.is_pending() {
|
||||
self.0 .0.dispatch_task.register(cx.waker());
|
||||
self.st().dispatch_task.register(cx.waker());
|
||||
}
|
||||
result
|
||||
}
|
||||
|
@ -636,19 +651,20 @@ impl<F> Io<F> {
|
|||
#[inline]
|
||||
/// Wait for status updates
|
||||
pub fn poll_status_update(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
|
||||
let flags = self.flags();
|
||||
let st = self.st();
|
||||
let flags = st.flags.get();
|
||||
if flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
|
||||
Poll::Ready(IoStatusUpdate::PeerGone(self.error()))
|
||||
} else if flags.contains(Flags::DSP_STOP) {
|
||||
self.0 .0.remove_flags(Flags::DSP_STOP);
|
||||
st.remove_flags(Flags::DSP_STOP);
|
||||
Poll::Ready(IoStatusUpdate::Stop)
|
||||
} else if flags.contains(Flags::DSP_TIMEOUT) {
|
||||
self.0 .0.remove_flags(Flags::DSP_TIMEOUT);
|
||||
st.remove_flags(Flags::DSP_TIMEOUT);
|
||||
Poll::Ready(IoStatusUpdate::KeepAlive)
|
||||
} else if flags.contains(Flags::WR_BACKPRESSURE) {
|
||||
Poll::Ready(IoStatusUpdate::WriteBackpressure)
|
||||
} else {
|
||||
self.0 .0.dispatch_task.register(cx.waker());
|
||||
st.dispatch_task.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
@ -656,14 +672,14 @@ impl<F> Io<F> {
|
|||
#[inline]
|
||||
/// Register dispatch task
|
||||
pub fn poll_dispatch(&self, cx: &mut Context<'_>) {
|
||||
self.0 .0.dispatch_task.register(cx.waker());
|
||||
self.st().dispatch_task.register(cx.waker());
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> AsRef<IoRef> for Io<F> {
|
||||
#[inline]
|
||||
fn as_ref(&self) -> &IoRef {
|
||||
&self.0
|
||||
self.io_ref()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -672,20 +688,20 @@ impl<F> Eq for Io<F> {}
|
|||
impl<F> PartialEq for Io<F> {
|
||||
#[inline]
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0.eq(&other.0)
|
||||
self.io_ref().eq(other.io_ref())
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> hash::Hash for Io<F> {
|
||||
#[inline]
|
||||
fn hash<H: hash::Hasher>(&self, state: &mut H) {
|
||||
self.0.hash(state);
|
||||
self.io_ref().hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> fmt::Debug for Io<F> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Io").field("state", &self.0).finish()
|
||||
f.debug_struct("Io").field("state", self.st()).finish()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -694,28 +710,28 @@ impl<F> ops::Deref for Io<F> {
|
|||
|
||||
#[inline]
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
self.io_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Drop for Io<F> {
|
||||
fn drop(&mut self) {
|
||||
let st = self.st();
|
||||
self.stop_timer();
|
||||
|
||||
// filter must be dropped, it is unsafe
|
||||
// and wont be dropped without special attention
|
||||
if self.1.is_set() {
|
||||
if !self.0.flags().contains(Flags::IO_STOPPED) {
|
||||
if st.filter.is_set() {
|
||||
// filter is unsafe and must be dropped explicitly,
|
||||
// and wont be dropped without special attention
|
||||
if !st.flags.get().contains(Flags::IO_STOPPED) {
|
||||
log::trace!(
|
||||
"{}: Io is dropped, force stopping io streams {:?}",
|
||||
self.tag(),
|
||||
self.0.flags()
|
||||
st.tag.get(),
|
||||
st.flags.get()
|
||||
);
|
||||
}
|
||||
|
||||
self.force_close();
|
||||
self.1.drop_filter();
|
||||
self.0 .0.filter.set(NullFilter::get());
|
||||
st.filter.drop_filter::<F>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -727,6 +743,7 @@ const KIND_UNMASK: u8 = !KIND_MASK;
|
|||
const KIND_MASK_USIZE: usize = 0b11;
|
||||
const KIND_UNMASK_USIZE: usize = !KIND_MASK_USIZE;
|
||||
const SEALED_SIZE: usize = mem::size_of::<Sealed>();
|
||||
const NULL: [u8; SEALED_SIZE] = [0u8; SEALED_SIZE];
|
||||
|
||||
#[cfg(target_endian = "little")]
|
||||
const KIND_IDX: usize = 0;
|
||||
|
@ -734,37 +751,45 @@ const KIND_IDX: usize = 0;
|
|||
#[cfg(target_endian = "big")]
|
||||
const KIND_IDX: usize = SEALED_SIZE - 1;
|
||||
|
||||
struct FilterItem<F> {
|
||||
data: [u8; SEALED_SIZE],
|
||||
_t: marker::PhantomData<F>,
|
||||
struct FilterPtr {
|
||||
data: Cell<[u8; SEALED_SIZE]>,
|
||||
filter: Cell<&'static dyn Filter>,
|
||||
}
|
||||
|
||||
impl<F> FilterItem<F> {
|
||||
fn null() -> Self {
|
||||
impl FilterPtr {
|
||||
const fn null() -> Self {
|
||||
Self {
|
||||
data: [0; SEALED_SIZE],
|
||||
_t: marker::PhantomData,
|
||||
data: Cell::new(NULL),
|
||||
filter: Cell::new(NullFilter::get()),
|
||||
}
|
||||
}
|
||||
|
||||
fn with_filter(f: Box<F>) -> Self {
|
||||
let mut slf = Self {
|
||||
data: [0; SEALED_SIZE],
|
||||
_t: marker::PhantomData,
|
||||
};
|
||||
|
||||
unsafe {
|
||||
let ptr = &mut slf.data as *mut _ as *mut *mut F;
|
||||
ptr.write(Box::into_raw(f));
|
||||
slf.data[KIND_IDX] |= KIND_PTR;
|
||||
fn update<F: Filter>(&self, filter: F) {
|
||||
if self.is_set() {
|
||||
panic!("Filter is set, must be dropped first");
|
||||
}
|
||||
|
||||
let filter = Box::new(filter);
|
||||
let mut data = NULL;
|
||||
unsafe {
|
||||
let filter_ref: &'static dyn Filter = {
|
||||
let f: &dyn Filter = filter.as_ref();
|
||||
std::mem::transmute(f)
|
||||
};
|
||||
self.filter.set(filter_ref);
|
||||
|
||||
let ptr = &mut data as *mut _ as *mut *mut F;
|
||||
ptr.write(Box::into_raw(filter));
|
||||
data[KIND_IDX] |= KIND_PTR;
|
||||
self.data.set(data);
|
||||
}
|
||||
slf
|
||||
}
|
||||
|
||||
/// Get filter, panic if it is not filter
|
||||
fn filter(&self) -> &F {
|
||||
if self.data[KIND_IDX] & KIND_PTR != 0 {
|
||||
let ptr = &self.data as *const _ as *const *mut F;
|
||||
fn filter<F: Filter>(&self) -> &F {
|
||||
let data = self.data.get();
|
||||
if data[KIND_IDX] & KIND_PTR != 0 {
|
||||
let ptr = &data as *const _ as *const *mut F;
|
||||
unsafe {
|
||||
let p = (ptr.read() as *const _ as usize) & KIND_UNMASK_USIZE;
|
||||
(p as *const F as *mut F).as_ref().unwrap()
|
||||
|
@ -775,87 +800,99 @@ impl<F> FilterItem<F> {
|
|||
}
|
||||
|
||||
/// Get filter, panic if it is not set
|
||||
fn take_filter(&mut self) -> Box<F> {
|
||||
if self.data[KIND_IDX] & KIND_PTR != 0 {
|
||||
self.data[KIND_IDX] &= KIND_UNMASK;
|
||||
let ptr = &mut self.data as *mut _ as *mut *mut F;
|
||||
fn take_filter<F>(&self) -> Box<F> {
|
||||
let mut data = self.data.get();
|
||||
if data[KIND_IDX] & KIND_PTR != 0 {
|
||||
data[KIND_IDX] &= KIND_UNMASK;
|
||||
let ptr = &mut data as *mut _ as *mut *mut F;
|
||||
unsafe { Box::from_raw(*ptr) }
|
||||
} else {
|
||||
panic!(
|
||||
"Wrong filter item {:?} expected: {:?}",
|
||||
self.data[KIND_IDX], KIND_PTR
|
||||
data[KIND_IDX], KIND_PTR
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Get sealed, panic if it is already sealed
|
||||
fn take_sealed(&mut self) -> Sealed {
|
||||
if self.data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
self.data[KIND_IDX] &= KIND_UNMASK;
|
||||
let ptr = &mut self.data as *mut _ as *mut Sealed;
|
||||
fn take_sealed(&self) -> Sealed {
|
||||
let mut data = self.data.get();
|
||||
|
||||
if data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
data[KIND_IDX] &= KIND_UNMASK;
|
||||
let ptr = &mut data as *mut _ as *mut Sealed;
|
||||
unsafe { ptr.read() }
|
||||
} else {
|
||||
panic!(
|
||||
"Wrong filter item {:?} expected: {:?}",
|
||||
self.data[KIND_IDX], KIND_SEALED
|
||||
data[KIND_IDX], KIND_SEALED
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn is_set(&self) -> bool {
|
||||
self.data[KIND_IDX] & KIND_MASK != 0
|
||||
self.data.get()[KIND_IDX] & KIND_MASK != 0
|
||||
}
|
||||
|
||||
fn drop_filter(&mut self) {
|
||||
if self.data[KIND_IDX] & KIND_PTR != 0 {
|
||||
self.take_filter();
|
||||
} else if self.data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
self.take_sealed();
|
||||
fn drop_filter<F>(&self) {
|
||||
let data = self.data.get();
|
||||
|
||||
if data[KIND_IDX] & KIND_MASK != 0 {
|
||||
if data[KIND_IDX] & KIND_PTR != 0 {
|
||||
self.take_filter::<F>();
|
||||
} else if data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
self.take_sealed();
|
||||
}
|
||||
self.data.set(NULL);
|
||||
self.filter.set(NullFilter::get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Filter> FilterItem<F> {
|
||||
fn add_filter<T: FilterLayer>(
|
||||
&mut self,
|
||||
new: T,
|
||||
) -> (FilterItem<Layer<T, F>>, &'static dyn Filter) {
|
||||
let filter = Box::new(Layer::new(new, *self.take_filter()));
|
||||
let filter_ref: &'static dyn Filter = {
|
||||
let filter: &dyn Filter = filter.as_ref();
|
||||
unsafe { std::mem::transmute(filter) }
|
||||
};
|
||||
(FilterItem::with_filter(filter), filter_ref)
|
||||
impl FilterPtr {
|
||||
fn add_filter<F: Filter, T: FilterLayer>(&self, new: T) {
|
||||
let mut data = NULL;
|
||||
let filter = Box::new(Layer::new(new, *self.take_filter::<F>()));
|
||||
unsafe {
|
||||
let filter_ref: &'static dyn Filter = {
|
||||
let f: &dyn Filter = filter.as_ref();
|
||||
std::mem::transmute(f)
|
||||
};
|
||||
self.filter.set(filter_ref);
|
||||
|
||||
let ptr = &mut data as *mut _ as *mut *mut Layer<T, F>;
|
||||
ptr.write(Box::into_raw(filter));
|
||||
data[KIND_IDX] |= KIND_PTR;
|
||||
self.data.set(data);
|
||||
}
|
||||
}
|
||||
|
||||
fn seal(&mut self) -> (FilterItem<Sealed>, &'static dyn Filter) {
|
||||
let filter = if self.data[KIND_IDX] & KIND_PTR != 0 {
|
||||
Sealed(Box::new(*self.take_filter()))
|
||||
} else if self.data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
fn seal<F: Filter>(&self) {
|
||||
let mut data = self.data.get();
|
||||
|
||||
let filter = if data[KIND_IDX] & KIND_PTR != 0 {
|
||||
Sealed(Box::new(*self.take_filter::<F>()))
|
||||
} else if data[KIND_IDX] & KIND_SEALED != 0 {
|
||||
self.take_sealed()
|
||||
} else {
|
||||
panic!(
|
||||
"Wrong filter item {:?} expected: {:?}",
|
||||
self.data[KIND_IDX], KIND_PTR
|
||||
data[KIND_IDX], KIND_PTR
|
||||
);
|
||||
};
|
||||
|
||||
let filter_ref: &'static dyn Filter = {
|
||||
let filter: &dyn Filter = filter.0.as_ref();
|
||||
unsafe { std::mem::transmute(filter) }
|
||||
};
|
||||
|
||||
let mut slf = FilterItem {
|
||||
data: [0; SEALED_SIZE],
|
||||
_t: marker::PhantomData,
|
||||
};
|
||||
|
||||
unsafe {
|
||||
let ptr = &mut slf.data as *mut _ as *mut Sealed;
|
||||
let filter_ref: &'static dyn Filter = {
|
||||
let f: &dyn Filter = filter.0.as_ref();
|
||||
std::mem::transmute(f)
|
||||
};
|
||||
self.filter.set(filter_ref);
|
||||
|
||||
let ptr = &mut data as *mut _ as *mut Sealed;
|
||||
ptr.write(filter);
|
||||
slf.data[KIND_IDX] |= KIND_SEALED;
|
||||
data[KIND_IDX] |= KIND_SEALED;
|
||||
self.data.set(data);
|
||||
}
|
||||
(slf, filter_ref)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -943,7 +980,7 @@ mod tests {
|
|||
|
||||
let server = Io::new(server);
|
||||
assert!(server.eq(&server));
|
||||
assert!(server.0.eq(&server.0));
|
||||
assert!(server.io_ref().eq(server.io_ref()));
|
||||
|
||||
assert!(format!("{:?}", Flags::IO_STOPPED).contains("IO_STOPPED"));
|
||||
assert!(Flags::IO_STOPPED == Flags::IO_STOPPED);
|
||||
|
@ -957,16 +994,16 @@ mod tests {
|
|||
|
||||
let server = Io::new(server);
|
||||
|
||||
server.0 .0.notify_timeout();
|
||||
server.st().notify_timeout();
|
||||
let err = server.recv(&BytesCodec).await.err().unwrap();
|
||||
assert!(format!("{:?}", err).contains("Timeout"));
|
||||
|
||||
server.0 .0.insert_flags(Flags::DSP_STOP);
|
||||
server.st().insert_flags(Flags::DSP_STOP);
|
||||
let err = server.recv(&BytesCodec).await.err().unwrap();
|
||||
assert!(format!("{:?}", err).contains("Dispatcher stopped"));
|
||||
|
||||
client.write(TEXT);
|
||||
server.0 .0.insert_flags(Flags::WR_BACKPRESSURE);
|
||||
server.st().insert_flags(Flags::WR_BACKPRESSURE);
|
||||
let item = server.recv(&BytesCodec).await.ok().unwrap().unwrap();
|
||||
assert_eq!(item, TEXT);
|
||||
}
|
||||
|
@ -1016,7 +1053,7 @@ mod tests {
|
|||
let (client, server) = IoTest::create();
|
||||
let f = DropFilter { p: p.clone() };
|
||||
let _ = format!("{:?}", f);
|
||||
let mut io = Io::new(server).add_filter(f);
|
||||
let io = Io::new(server).add_filter(f);
|
||||
|
||||
client.remote_buffer_cap(1024);
|
||||
client.write(TEXT);
|
||||
|
|
|
@ -21,9 +21,9 @@ impl IoRef {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
/// Get memory pool
|
||||
/// Get current filter
|
||||
pub(crate) fn filter(&self) -> &dyn Filter {
|
||||
self.0.filter.get()
|
||||
self.0.filter()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -192,10 +192,7 @@ impl IoRef {
|
|||
F: FnOnce(&WriteBuf<'_>) -> R,
|
||||
{
|
||||
let result = self.0.buffer.write_buf(self, 0, f);
|
||||
self.0
|
||||
.filter
|
||||
.get()
|
||||
.process_write_buf(self, &self.0.buffer, 0)?;
|
||||
self.0.filter().process_write_buf(self, &self.0.buffer, 0)?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
@ -206,10 +203,7 @@ impl IoRef {
|
|||
F: FnOnce(&mut BytesVec) -> R,
|
||||
{
|
||||
let result = self.0.buffer.with_write_source(self, f);
|
||||
self.0
|
||||
.filter
|
||||
.get()
|
||||
.process_write_buf(self, &self.0.buffer, 0)?;
|
||||
self.0.filter().process_write_buf(self, &self.0.buffer, 0)?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue