Cleanup io

This commit is contained in:
Nikolay Kim 2023-01-26 16:20:08 +01:00
parent 38614715ca
commit 2853a42d13
6 changed files with 70 additions and 83 deletions

View file

@ -3,12 +3,12 @@ use ntex_util::future::Either;
use crate::IoRef;
type CacheLine = (Option<BytesVec>, Option<BytesVec>);
type BufferLine = (Option<BytesVec>, Option<BytesVec>);
#[derive(Debug)]
pub struct Stack {
len: usize,
buffers: Either<[CacheLine; 3], Vec<CacheLine>>,
buffers: Either<[BufferLine; 3], Vec<BufferLine>>,
}
impl Stack {
@ -50,7 +50,7 @@ impl Stack {
fn get_buffers<F, R>(&mut self, idx: usize, f: F) -> R
where
F: FnOnce(&mut CacheLine, &mut CacheLine) -> R,
F: FnOnce(&mut BufferLine, &mut BufferLine) -> R,
{
let buffers = match self.buffers {
Either::Left(ref mut b) => &mut b[..],
@ -72,14 +72,14 @@ impl Stack {
}
}
fn get_first_level(&mut self) -> &mut CacheLine {
fn get_first_level(&mut self) -> &mut BufferLine {
match &mut self.buffers {
Either::Left(b) => &mut b[0],
Either::Right(b) => &mut b[0],
}
}
fn get_last_level(&mut self) -> &mut CacheLine {
fn get_last_level(&mut self) -> &mut BufferLine {
match &mut self.buffers {
Either::Left(b) => &mut b[self.len - 1],
Either::Right(b) => &mut b[self.len - 1],
@ -186,8 +186,8 @@ impl Stack {
#[derive(Debug)]
pub struct ReadBuf<'a> {
pub(crate) io: &'a IoRef,
pub(crate) curr: &'a mut CacheLine,
pub(crate) next: &'a mut CacheLine,
pub(crate) curr: &'a mut BufferLine,
pub(crate) next: &'a mut BufferLine,
pub(crate) nbytes: usize,
pub(crate) need_write: bool,
}
@ -303,8 +303,8 @@ impl<'a> ReadBuf<'a> {
#[derive(Debug)]
pub struct WriteBuf<'a> {
pub(crate) io: &'a IoRef,
pub(crate) curr: &'a mut CacheLine,
pub(crate) next: &'a mut CacheLine,
pub(crate) curr: &'a mut BufferLine,
pub(crate) next: &'a mut BufferLine,
pub(crate) need_write: bool,
}

View file

@ -79,7 +79,7 @@ impl Filter for Base {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
let flags = self.0.flags();
if flags.intersects(Flags::IO_STOPPING) {
if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
Poll::Ready(ReadStatus::Terminate)
} else if flags.intersects(Flags::RD_PAUSED | Flags::RD_BUF_FULL) {
self.0 .0.read_task.register(cx.waker());

View file

@ -2,7 +2,7 @@ use std::cell::{Cell, RefCell};
use std::task::{Context, Poll};
use std::{fmt, future::Future, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc, time};
use ntex_bytes::{BytesVec, PoolId, PoolRef};
use ntex_bytes::{PoolId, PoolRef};
use ntex_codec::{Decoder, Encoder};
use ntex_util::time::{now, Millis};
use ntex_util::{future::poll_fn, future::Either, task::LocalWaker};
@ -71,21 +71,18 @@ pub(crate) struct IoState {
}
impl IoState {
#[inline]
pub(super) fn insert_flags(&self, f: Flags) {
let mut flags = self.flags.get();
flags.insert(f);
self.flags.set(flags);
}
#[inline]
pub(super) fn remove_flags(&self, f: Flags) {
let mut flags = self.flags.get();
flags.remove(f);
self.flags.set(flags);
}
#[inline]
pub(super) fn notify_keepalive(&self) {
log::trace!("keep-alive timeout, notify dispatcher");
let mut flags = self.flags.get();
@ -97,7 +94,6 @@ impl IoState {
self.flags.set(flags);
}
#[inline]
pub(super) fn notify_disconnect(&self) {
if let Some(on_disconnect) = self.on_disconnect.take() {
for item in on_disconnect.into_iter() {
@ -121,10 +117,8 @@ impl IoState {
}
/// Gracefully shutdown read and write io tasks
pub(super) fn init_shutdown(&self, err: Option<io::Error>) {
if err.is_some() {
self.io_stopped(err);
} else if !self
pub(super) fn init_shutdown(&self) {
if !self
.flags
.get()
.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING | Flags::IO_STOPPING_FILTERS)
@ -134,31 +128,6 @@ impl IoState {
self.read_task.wake();
}
}
pub(super) fn with_read_buf<Fn, Ret>(&self, release: bool, f: Fn) -> Ret
where
Fn: FnOnce(&mut Option<BytesVec>) -> Ret,
{
// use top most buffer
let mut buffer = self.buffer.borrow_mut();
let buf = buffer.first_read_buf();
let result = f(buf);
// release buffer
if release && buf.as_ref().map(|b| b.is_empty()).unwrap_or(false) {
if let Some(b) = buf.take() {
self.pool.get().release_read_buf(b);
}
}
result
}
pub(super) fn with_write_buf<Fn, Ret>(&self, f: Fn) -> Ret
where
Fn: FnOnce(&mut Option<BytesVec>) -> Ret,
{
f(self.buffer.borrow_mut().last_write_buf())
}
}
impl Eq for IoState {}
@ -287,6 +256,7 @@ impl<F> Io<F> {
#[inline]
#[doc(hidden)]
#[deprecated]
pub fn remove_keepalive_timer(&self) {
self.stop_keepalive_timer()
}
@ -297,6 +267,14 @@ impl<F> Io<F> {
}
}
impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
#[inline]
/// Get referece to a filter
pub fn filter(&self) -> &F {
&self.1.filter().0
}
}
impl<F: Filter> Io<F> {
#[inline]
/// Convert current io stream into sealed version
@ -324,14 +302,6 @@ impl<F: Filter> Io<F> {
}
}
impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
#[inline]
/// Get referece to a filter
pub fn filter(&self) -> &F {
&self.1.filter().0
}
}
impl<F> Io<F> {
#[inline]
/// Read incoming io stream and decode codec item.
@ -518,25 +488,27 @@ impl<F> Io<F> {
if flags.contains(Flags::IO_STOPPED) {
Poll::Ready(self.error().map(Err).unwrap_or(Ok(())))
} else {
let len = self
.0
.0
.with_write_buf(|buf| buf.as_ref().map(|b| b.len()).unwrap_or(0));
let inner = &self.0 .0;
let len = inner
.buffer
.borrow_mut()
.last_write_buf()
.as_ref()
.map(|b| b.len())
.unwrap_or(0);
if len > 0 {
if full {
self.0 .0.insert_flags(Flags::WR_WAIT);
self.0 .0.dispatch_task.register(cx.waker());
inner.insert_flags(Flags::WR_WAIT);
inner.dispatch_task.register(cx.waker());
return Poll::Pending;
} else if len >= self.0.memory_pool().write_params_high() << 1 {
self.0 .0.insert_flags(Flags::WR_BACKPRESSURE);
self.0 .0.dispatch_task.register(cx.waker());
} else if len >= inner.pool.get().write_params_high() << 1 {
inner.insert_flags(Flags::WR_BACKPRESSURE);
inner.dispatch_task.register(cx.waker());
return Poll::Pending;
}
}
self.0
.0
.remove_flags(Flags::WR_WAIT | Flags::WR_BACKPRESSURE);
inner.remove_flags(Flags::WR_WAIT | Flags::WR_BACKPRESSURE);
Poll::Ready(Ok(()))
}
}
@ -554,7 +526,7 @@ impl<F> Io<F> {
}
} else {
if !flags.contains(Flags::IO_STOPPING_FILTERS) {
self.0 .0.init_shutdown(None);
self.0 .0.init_shutdown();
}
self.0 .0.read_task.wake();
@ -592,6 +564,7 @@ impl<F> Io<F> {
}
impl<F> AsRef<IoRef> for Io<F> {
#[inline]
fn as_ref(&self) -> &IoRef {
&self.0
}

View file

@ -49,7 +49,7 @@ impl IoRef {
/// Notify dispatcher and initiate io stream shutdown process.
pub fn close(&self) {
self.0.insert_flags(Flags::DSP_STOP);
self.0.init_shutdown(None);
self.0.init_shutdown();
}
#[inline]
@ -132,11 +132,13 @@ impl IoRef {
where
U: Decoder,
{
self.0.with_read_buf(false, |buf| {
buf.as_mut()
.map(|b| codec.decode_vec(b))
.unwrap_or(Ok(None))
})
self.0
.buffer
.borrow_mut()
.first_read_buf()
.as_mut()
.map(|b| codec.decode_vec(b))
.unwrap_or(Ok(None))
}
#[inline]
@ -173,9 +175,12 @@ impl IoRef {
where
F: FnOnce(&mut WriteBuf<'_>) -> R,
{
let mut b = self.0.buffer.borrow_mut();
let result = b.write_buf(self, 0, f);
self.0.filter.get().process_write_buf(self, &mut b, 0)?;
let mut buffer = self.0.buffer.borrow_mut();
let result = buffer.write_buf(self, 0, f);
self.0
.filter
.get()
.process_write_buf(self, &mut buffer, 0)?;
Ok(result)
}
@ -185,13 +190,22 @@ impl IoRef {
where
F: FnOnce(&mut BytesVec) -> R,
{
self.0.with_read_buf(true, |buf| {
// set buf
if buf.is_none() {
*buf = Some(self.memory_pool().get_read_buf());
// use top most buffer
let mut buffer = self.0.buffer.borrow_mut();
let buf = buffer.first_read_buf();
if buf.is_none() {
*buf = Some(self.memory_pool().get_read_buf());
}
let result = f(buf.as_mut().unwrap());
// release buffer
if buf.as_ref().map(|b| b.is_empty()).unwrap_or(false) {
if let Some(b) = buf.take() {
self.0.pool.get().release_read_buf(b);
}
f(buf.as_mut().unwrap())
})
}
result
}
#[inline]

View file

@ -84,7 +84,7 @@ impl ReadContext {
.map_err(|err| {
inner.dispatch_task.wake();
inner.insert_flags(Flags::RD_READY);
inner.init_shutdown(Some(err));
inner.io_stopped(Some(err));
});
}
}

View file

@ -429,7 +429,7 @@ where
fn unregister_keepalive(&mut self) {
if self.flags.contains(Flags::KEEPALIVE_REG) {
self.io.remove_keepalive_timer();
self.io.stop_keepalive_timer();
self.flags.remove(Flags::KEEPALIVE | Flags::KEEPALIVE_REG);
}
}
@ -474,7 +474,7 @@ where
// keep-alive timer
if self.flags.contains(Flags::KEEPALIVE_REG) {
self.flags.remove(Flags::KEEPALIVE_REG);
self.io.remove_keepalive_timer();
self.io.stop_keepalive_timer();
}
// configure request payload