Change io buffers layout (#165)

* Change buffers layout
This commit is contained in:
Nikolay Kim 2023-01-25 22:03:42 +06:00 committed by GitHub
parent 7abd4a61f8
commit 0d5cd049b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 196 additions and 164 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.3] - 2023-01-xx
* Optimize buffers layout
## [0.2.2] - 2023-01-24
* Process write buffer if filter wrote to write buffer during reading

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.2.2"
version = "0.2.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -24,10 +24,12 @@ ntex-service = "1.0.0"
bitflags = "1.3"
log = "0.4"
pin-project-lite = "0.2"
smallvec = "1"
backtrace = "*"
[dev-dependencies]
rand = "0.8"
env_logger = "0.10"
ntex = { version = "0.6.1", features = ["tokio"] }
ntex = { version = "0.6.2", features = ["tokio"] }

View file

@ -1,23 +1,93 @@
use ntex_bytes::{BytesVec, PoolRef};
use smallvec::SmallVec;
use ntex_util::future::Either;
use crate::IoRef;
#[derive(Debug)]
pub struct Stack {
pub(crate) buffers: SmallVec<[(Option<BytesVec>, Option<BytesVec>); 4]>,
len: usize,
buffers: Either<
[(Option<BytesVec>, Option<BytesVec>); 3],
Vec<(Option<BytesVec>, Option<BytesVec>)>,
>,
}
impl Stack {
pub(crate) fn new() -> Self {
let mut buffers = SmallVec::with_capacity(4);
buffers.push((None, None));
Self { buffers }
Self {
len: 1,
buffers: Either::Left(Default::default()),
}
}
pub(crate) fn add_layer(&mut self) {
self.buffers.insert(0, (None, None));
match &mut self.buffers {
Either::Left(b) => {
if self.len == 3 {
// move to vec
let mut vec = vec![(None, None)];
for idx in 0..self.len {
vec.push((b[idx].0.take(), b[idx].1.take()));
}
self.len += 1;
self.buffers = Either::Right(vec);
} else {
let mut idx = self.len;
while idx > 0 {
let item = (b[idx - 1].0.take(), b[idx - 1].1.take());
b[idx] = item;
idx -= 1;
}
b[0] = (None, None);
self.len += 1;
}
}
Either::Right(vec) => {
self.len += 1;
vec.insert(0, (None, None));
}
}
}
fn get_buffers<F, R>(&mut self, idx: usize, f: F) -> R
where
F: FnOnce(
&mut (Option<BytesVec>, Option<BytesVec>),
&mut (Option<BytesVec>, Option<BytesVec>),
) -> R,
{
let buffers = match self.buffers {
Either::Left(ref mut b) => &mut b[..],
Either::Right(ref mut b) => &mut b[..],
};
let pos = idx + 1;
if self.len > pos {
let (curr, next) = buffers.split_at_mut(pos);
f(&mut curr[idx], &mut next[0])
} else {
let mut curr = (buffers[idx].0.take(), None);
let mut next = (None, buffers[idx].1.take());
let result = f(&mut curr, &mut next);
buffers[idx].0 = curr.0;
buffers[idx].1 = next.1;
result
}
}
fn get_first_level(&mut self) -> &mut (Option<BytesVec>, Option<BytesVec>) {
match &mut self.buffers {
Either::Left(b) => &mut b[0],
Either::Right(b) => &mut b[0],
}
}
fn get_last_level(&mut self) -> &mut (Option<BytesVec>, Option<BytesVec>) {
match &mut self.buffers {
Either::Left(b) => &mut b[self.len - 1],
Either::Right(b) => &mut b[self.len - 1],
}
}
pub(crate) fn read_buf<F, R>(
@ -30,116 +100,95 @@ impl Stack {
where
F: FnOnce(&mut ReadBuf<'_>) -> R,
{
let pos = idx + 1;
if self.buffers.len() > pos {
let (curr, next) = self.buffers.split_at_mut(pos);
self.get_buffers(idx, |curr, next| {
let mut buf = ReadBuf {
io,
nbytes,
curr: &mut curr[idx],
next: &mut next[0],
curr,
next,
need_write: false,
};
f(&mut buf)
} else {
let mut val1 = (self.buffers[idx].0.take(), None);
let mut val2 = (None, self.buffers[idx].1.take());
let mut buf = ReadBuf {
io,
nbytes,
curr: &mut val1,
next: &mut val2,
need_write: false,
};
let result = f(&mut buf);
self.buffers[idx].0 = val1.0;
self.buffers[idx].1 = val2.1;
result
}
})
}
pub(crate) fn write_buf<F, R>(&mut self, io: &IoRef, idx: usize, f: F) -> R
where
F: FnOnce(&mut WriteBuf<'_>) -> R,
{
let pos = idx + 1;
if self.buffers.len() > pos {
let (curr, next) = self.buffers.split_at_mut(pos);
self.get_buffers(idx, |curr, next| {
let mut buf = WriteBuf {
io,
curr: &mut curr[idx],
next: &mut next[0],
curr,
next,
need_write: false,
};
f(&mut buf)
} else {
let mut val1 = (self.buffers[idx].0.take(), None);
let mut val2 = (None, self.buffers[idx].1.take());
let mut buf = WriteBuf {
io,
curr: &mut val1,
next: &mut val2,
need_write: false,
};
let result = f(&mut buf);
self.buffers[idx].0 = val1.0;
self.buffers[idx].1 = val2.1;
result
}
})
}
pub(crate) fn first_read_buf_size(&self) -> usize {
self.buffers[0].0.as_ref().map(|b| b.len()).unwrap_or(0)
pub(crate) fn first_read_buf_size(&mut self) -> usize {
self.get_first_level()
.0
.as_ref()
.map(|b| b.len())
.unwrap_or(0)
}
pub(crate) fn first_read_buf(&mut self) -> &mut Option<BytesVec> {
&mut self.buffers[0].0
&mut self.get_first_level().0
}
pub(crate) fn first_write_buf(&mut self, io: &IoRef) -> &mut BytesVec {
if self.buffers[0].1.is_none() {
self.buffers[0].1 = Some(io.memory_pool().get_write_buf());
let item = &mut self.get_first_level().1;
if item.is_none() {
*item = Some(io.memory_pool().get_write_buf());
}
self.buffers[0].1.as_mut().unwrap()
item.as_mut().unwrap()
}
pub(crate) fn last_read_buf(&mut self) -> &mut Option<BytesVec> {
let idx = self.buffers.len() - 1;
&mut self.buffers[idx].0
&mut self.get_last_level().0
}
pub(crate) fn last_write_buf(&mut self) -> &mut Option<BytesVec> {
let idx = self.buffers.len() - 1;
&mut self.buffers[idx].1
&mut self.get_last_level().1
}
pub(crate) fn last_write_buf_size(&self) -> usize {
let idx = self.buffers.len() - 1;
self.buffers[idx].1.as_ref().map(|b| b.len()).unwrap_or(0)
pub(crate) fn last_write_buf_size(&mut self) -> usize {
self.get_last_level()
.1
.as_ref()
.map(|b| b.len())
.unwrap_or(0)
}
pub(crate) fn set_last_write_buf(&mut self, buf: BytesVec) {
let idx = self.buffers.len() - 1;
self.buffers[idx].1 = Some(buf);
*(&mut self.get_last_level().1) = Some(buf);
}
pub(crate) fn release(&mut self, pool: PoolRef) {
for buf in &mut self.buffers {
if let Some(buf) = buf.0.take() {
let items = match &mut self.buffers {
Either::Left(b) => &mut b[..],
Either::Right(b) => &mut b[..],
};
for item in items {
if let Some(buf) = item.0.take() {
pool.release_read_buf(buf);
}
if let Some(buf) = buf.1.take() {
if let Some(buf) = item.1.take() {
pool.release_write_buf(buf);
}
}
}
pub(crate) fn set_memory_pool(&mut self, pool: PoolRef) {
for buf in &mut self.buffers {
let items = match &mut self.buffers {
Either::Left(b) => &mut b[..],
Either::Right(b) => &mut b[..],
};
for buf in items {
if let Some(ref mut b) = buf.0 {
pool.move_vec_in(b);
}

View file

@ -119,7 +119,7 @@ impl IoState {
}
/// Gracefully shutdown read and write io tasks
pub(super) fn init_shutdown(&self, err: Option<io::Error>, io: &IoRef) {
pub(super) fn init_shutdown(&self, err: Option<io::Error>) {
if err.is_some() {
self.io_stopped(err);
} else if !self
@ -129,42 +129,7 @@ impl IoState {
{
log::trace!("initiate io shutdown {:?}", self.flags.get());
self.insert_flags(Flags::IO_STOPPING_FILTERS);
self.shutdown_filters(io);
}
}
pub(super) fn shutdown_filters(&self, io: &IoRef) {
if !self
.flags
.get()
.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING)
{
let mut buffer = self.buffer.borrow_mut();
match self.filter.get().shutdown(io, &mut buffer, 0) {
Ok(Poll::Ready(())) => {
self.read_task.wake();
self.write_task.wake();
self.dispatch_task.wake();
self.insert_flags(Flags::IO_STOPPING);
}
Ok(Poll::Pending) => {
let flags = self.flags.get();
// check read buffer, if buffer is not consumed it is unlikely
// that filter will properly complete shutdown
if flags.contains(Flags::RD_PAUSED)
|| flags.contains(Flags::RD_BUF_FULL | Flags::RD_READY)
{
self.read_task.wake();
self.write_task.wake();
self.dispatch_task.wake();
self.insert_flags(Flags::IO_STOPPING);
}
}
Err(err) => {
self.io_stopped(Some(err));
}
};
self.write_task.wake();
self.read_task.wake();
}
}
@ -587,7 +552,7 @@ impl<F> Io<F> {
}
} else {
if !flags.contains(Flags::IO_STOPPING_FILTERS) {
self.0 .0.init_shutdown(None, &self.0);
self.0 .0.init_shutdown(None);
}
self.0 .0.read_task.wake();

View file

@ -49,7 +49,7 @@ impl IoRef {
/// Notify dispatcher and initiate io stream shutdown process.
pub fn close(&self) {
self.0.insert_flags(Flags::DSP_STOP);
self.0.init_shutdown(None, self);
self.0.init_shutdown(None);
}
#[inline]
@ -81,6 +81,7 @@ impl IoRef {
{
log::trace!("initiate io shutdown {:?}", self.0.flags.get());
self.0.insert_flags(Flags::IO_STOPPING_FILTERS);
self.0.read_task.wake();
}
}

View file

@ -76,11 +76,13 @@ pub trait FilterLayer: 'static {
/// Process write buffer
fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> sio::Result<()>;
#[inline]
/// Query internal filter data
fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
None
}
#[inline]
/// Gracefully shutdown filter
fn shutdown(&self, buf: &mut WriteBuf<'_>) -> sio::Result<Poll<()>> {
Ok(Poll::Ready(()))

View file

@ -23,7 +23,8 @@ impl ReadContext {
where
F: FnOnce(&mut BytesVec, usize, usize) -> Poll<io::Result<()>>,
{
let mut stack = self.0 .0.buffer.borrow_mut();
let inner = &self.0 .0;
let mut stack = inner.buffer.borrow_mut();
let is_write_sleep = stack.last_write_buf_size() == 0;
let mut buf = stack
.last_read_buf()
@ -46,85 +47,70 @@ impl ReadContext {
if nbytes > 0 {
let buf_full = nbytes >= hw;
match self
.0
.filter()
.process_read_buf(&self.0, &mut stack, 0, nbytes)
{
Ok(status) => {
let filter = self.0.filter();
let _ = filter.process_read_buf(&self.0, &mut stack, 0, nbytes)
.and_then(|status| {
if status.nbytes > 0 {
if buf_full || stack.first_read_buf_size() >= hw {
log::trace!(
"io read buffer is too large {}, enable read back-pressure",
total2
);
self.0
.0
.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
} else {
self.0 .0.insert_flags(Flags::RD_READY);
inner.insert_flags(Flags::RD_READY);
}
self.0 .0.dispatch_task.wake();
log::trace!(
"new {} bytes available, wakeup dispatcher",
nbytes,
);
inner.dispatch_task.wake();
} else if buf_full {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
self.0 .0.read_task.wake();
inner.read_task.wake();
}
// while reading, filter wrote some data
// in that case filters need to process write buffers
// and potentialy wake write task
if status.need_write {
if let Err(err) =
self.0.filter().process_write_buf(&self.0, &mut stack, 0)
{
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
self.0 .0.init_shutdown(Some(err), &self.0);
}
let result = filter.process_write_buf(&self.0, &mut stack, 0);
if is_write_sleep && stack.last_write_buf_size() != 0 {
self.0 .0.write_task.wake();
inner.write_task.wake();
}
result
} else {
Ok(())
}
}
Err(err) => {
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
self.0 .0.init_shutdown(Some(err), &self.0);
}
}
})
.map_err(|err| {
inner.dispatch_task.wake();
inner.insert_flags(Flags::RD_READY);
inner.init_shutdown(Some(err));
});
}
}
drop(stack);
let result = match result {
match result {
Poll::Ready(Ok(())) => {
self.0 .0.io_stopped(None);
inner.io_stopped(None);
Poll::Ready(())
}
Poll::Ready(Err(e)) => {
self.0 .0.io_stopped(Some(e));
inner.io_stopped(Some(e));
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
};
drop(stack);
if self.0.flags().contains(Flags::IO_STOPPING_FILTERS) {
self.0 .0.shutdown_filters(&self.0);
Poll::Pending => {
if inner.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
shutdown_filters(&self.0);
}
Poll::Pending
}
}
result
}
#[inline]
/// Indicate that io task is stopped
pub fn close(&self, err: Option<io::Error>) {
self.0 .0.io_stopped(err);
}
}
@ -179,10 +165,6 @@ impl WriteContext {
self.0 .0.buffer.borrow_mut().set_last_write_buf(buf);
}
if self.0.flags().contains(Flags::IO_STOPPING_FILTERS) {
self.0 .0.shutdown_filters(&self.0);
}
Ok(())
}
@ -192,3 +174,32 @@ impl WriteContext {
self.0 .0.io_stopped(err);
}
}
fn shutdown_filters(io: &IoRef) {
let st = &io.0;
let flags = st.flags.get();
if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
let mut buffer = st.buffer.borrow_mut();
match io.filter().shutdown(io, &mut buffer, 0) {
Ok(Poll::Ready(())) => {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
Ok(Poll::Pending) => {
// check read buffer, if buffer is not consumed it is unlikely
// that filter will properly complete shutdown
if flags.contains(Flags::RD_PAUSED)
|| flags.contains(Flags::RD_BUF_FULL | Flags::RD_READY)
{
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
}
Err(err) => {
st.io_stopped(Some(err));
}
}
st.write_task.wake();
}
}

View file

@ -26,7 +26,7 @@ rustls = ["tls_rust"]
[dependencies]
ntex-bytes = "0.1.19"
ntex-io = "0.2.2"
ntex-io = "0.2.3"
ntex-util = "0.2.0"
ntex-service = "1.0.0"
log = "0.4"

View file

@ -69,7 +69,7 @@ impl Future for ReadTask {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("tokio stream is disconnected");
log::trace!("tcp stream is disconnected");
Poll::Ready(Ok(()))
} else if buf.len() < hw {
continue;
@ -244,9 +244,7 @@ impl Future for WriteTask {
Poll::Pending => {
*count += read_buf.filled().len() as u16;
if *count > 4096 {
log::trace!(
"tokio write task is stopped, too much input"
);
log::trace!("tokio write task is stopped, too much input");
this.state.close(None);
return Poll::Ready(());
}

View file

@ -58,7 +58,7 @@ ntex-util = "0.2.0"
ntex-bytes = "0.1.19"
ntex-h2 = "0.2.1"
ntex-rt = "0.4.7"
ntex-io = "0.2.2"
ntex-io = "0.2.3"
ntex-tls = "0.2.2"
ntex-tokio = { version = "0.2.1", optional = true }
ntex-glommio = { version = "0.2.1", optional = true }