diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 7275df30..d60dc30f 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.3] - 2023-01-xx + +* Optimize buffers layout + ## [0.2.2] - 2023-01-24 * Process write buffer if filter wrote to write buffer during reading diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 73c14813..d115b745 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.2.2" +version = "0.2.3" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -24,10 +24,12 @@ ntex-service = "1.0.0" bitflags = "1.3" log = "0.4" pin-project-lite = "0.2" + smallvec = "1" +backtrace = "*" [dev-dependencies] rand = "0.8" env_logger = "0.10" -ntex = { version = "0.6.1", features = ["tokio"] } +ntex = { version = "0.6.2", features = ["tokio"] } diff --git a/ntex-io/src/buf.rs b/ntex-io/src/buf.rs index be1bb30e..28445fbb 100644 --- a/ntex-io/src/buf.rs +++ b/ntex-io/src/buf.rs @@ -1,23 +1,93 @@ use ntex_bytes::{BytesVec, PoolRef}; -use smallvec::SmallVec; +use ntex_util::future::Either; use crate::IoRef; #[derive(Debug)] pub struct Stack { - pub(crate) buffers: SmallVec<[(Option, Option); 4]>, + len: usize, + buffers: Either< + [(Option, Option); 3], + Vec<(Option, Option)>, + >, } impl Stack { pub(crate) fn new() -> Self { - let mut buffers = SmallVec::with_capacity(4); - buffers.push((None, None)); - - Self { buffers } + Self { + len: 1, + buffers: Either::Left(Default::default()), + } } pub(crate) fn add_layer(&mut self) { - self.buffers.insert(0, (None, None)); + match &mut self.buffers { + Either::Left(b) => { + if self.len == 3 { + // move to vec + let mut vec = vec![(None, None)]; + for idx in 0..self.len { + vec.push((b[idx].0.take(), b[idx].1.take())); + } + self.len += 1; + self.buffers = Either::Right(vec); + } else { + let mut idx = self.len; + while idx > 0 { + let item = (b[idx - 1].0.take(), b[idx - 1].1.take()); + b[idx] = item; + idx -= 1; + } + b[0] = (None, None); + self.len += 1; + } + } + Either::Right(vec) => { + self.len += 1; + vec.insert(0, (None, None)); + } + } + } + + fn get_buffers(&mut self, idx: usize, f: F) -> R + where + F: FnOnce( + &mut (Option, Option), + &mut (Option, Option), + ) -> R, + { + let buffers = match self.buffers { + Either::Left(ref mut b) => &mut b[..], + Either::Right(ref mut b) => &mut b[..], + }; + + let pos = idx + 1; + if self.len > pos { + let (curr, next) = buffers.split_at_mut(pos); + f(&mut curr[idx], &mut next[0]) + } else { + let mut curr = (buffers[idx].0.take(), None); + let mut next = (None, buffers[idx].1.take()); + + let result = f(&mut curr, &mut next); + buffers[idx].0 = curr.0; + buffers[idx].1 = next.1; + result + } + } + + fn get_first_level(&mut self) -> &mut (Option, Option) { + match &mut self.buffers { + Either::Left(b) => &mut b[0], + Either::Right(b) => &mut b[0], + } + } + + fn get_last_level(&mut self) -> &mut (Option, Option) { + match &mut self.buffers { + Either::Left(b) => &mut b[self.len - 1], + Either::Right(b) => &mut b[self.len - 1], + } } pub(crate) fn read_buf( @@ -30,116 +100,95 @@ impl Stack { where F: FnOnce(&mut ReadBuf<'_>) -> R, { - let pos = idx + 1; - if self.buffers.len() > pos { - let (curr, next) = self.buffers.split_at_mut(pos); + self.get_buffers(idx, |curr, next| { let mut buf = ReadBuf { io, nbytes, - curr: &mut curr[idx], - next: &mut next[0], + curr, + next, need_write: false, }; f(&mut buf) - } else { - let mut val1 = (self.buffers[idx].0.take(), None); - let mut val2 = (None, self.buffers[idx].1.take()); - - let mut buf = ReadBuf { - io, - nbytes, - curr: &mut val1, - next: &mut val2, - need_write: false, - }; - let result = f(&mut buf); - - self.buffers[idx].0 = val1.0; - self.buffers[idx].1 = val2.1; - result - } + }) } pub(crate) fn write_buf(&mut self, io: &IoRef, idx: usize, f: F) -> R where F: FnOnce(&mut WriteBuf<'_>) -> R, { - let pos = idx + 1; - if self.buffers.len() > pos { - let (curr, next) = self.buffers.split_at_mut(pos); + self.get_buffers(idx, |curr, next| { let mut buf = WriteBuf { io, - curr: &mut curr[idx], - next: &mut next[0], + curr, + next, need_write: false, }; f(&mut buf) - } else { - let mut val1 = (self.buffers[idx].0.take(), None); - let mut val2 = (None, self.buffers[idx].1.take()); - - let mut buf = WriteBuf { - io, - curr: &mut val1, - next: &mut val2, - need_write: false, - }; - let result = f(&mut buf); - - self.buffers[idx].0 = val1.0; - self.buffers[idx].1 = val2.1; - result - } + }) } - pub(crate) fn first_read_buf_size(&self) -> usize { - self.buffers[0].0.as_ref().map(|b| b.len()).unwrap_or(0) + pub(crate) fn first_read_buf_size(&mut self) -> usize { + self.get_first_level() + .0 + .as_ref() + .map(|b| b.len()) + .unwrap_or(0) } pub(crate) fn first_read_buf(&mut self) -> &mut Option { - &mut self.buffers[0].0 + &mut self.get_first_level().0 } pub(crate) fn first_write_buf(&mut self, io: &IoRef) -> &mut BytesVec { - if self.buffers[0].1.is_none() { - self.buffers[0].1 = Some(io.memory_pool().get_write_buf()); + let item = &mut self.get_first_level().1; + if item.is_none() { + *item = Some(io.memory_pool().get_write_buf()); } - self.buffers[0].1.as_mut().unwrap() + item.as_mut().unwrap() } pub(crate) fn last_read_buf(&mut self) -> &mut Option { - let idx = self.buffers.len() - 1; - &mut self.buffers[idx].0 + &mut self.get_last_level().0 } pub(crate) fn last_write_buf(&mut self) -> &mut Option { - let idx = self.buffers.len() - 1; - &mut self.buffers[idx].1 + &mut self.get_last_level().1 } - pub(crate) fn last_write_buf_size(&self) -> usize { - let idx = self.buffers.len() - 1; - self.buffers[idx].1.as_ref().map(|b| b.len()).unwrap_or(0) + pub(crate) fn last_write_buf_size(&mut self) -> usize { + self.get_last_level() + .1 + .as_ref() + .map(|b| b.len()) + .unwrap_or(0) } pub(crate) fn set_last_write_buf(&mut self, buf: BytesVec) { - let idx = self.buffers.len() - 1; - self.buffers[idx].1 = Some(buf); + *(&mut self.get_last_level().1) = Some(buf); } pub(crate) fn release(&mut self, pool: PoolRef) { - for buf in &mut self.buffers { - if let Some(buf) = buf.0.take() { + let items = match &mut self.buffers { + Either::Left(b) => &mut b[..], + Either::Right(b) => &mut b[..], + }; + + for item in items { + if let Some(buf) = item.0.take() { pool.release_read_buf(buf); } - if let Some(buf) = buf.1.take() { + if let Some(buf) = item.1.take() { pool.release_write_buf(buf); } } } pub(crate) fn set_memory_pool(&mut self, pool: PoolRef) { - for buf in &mut self.buffers { + let items = match &mut self.buffers { + Either::Left(b) => &mut b[..], + Either::Right(b) => &mut b[..], + }; + for buf in items { if let Some(ref mut b) = buf.0 { pool.move_vec_in(b); } diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 362134f6..bba0a34a 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -119,7 +119,7 @@ impl IoState { } /// Gracefully shutdown read and write io tasks - pub(super) fn init_shutdown(&self, err: Option, io: &IoRef) { + pub(super) fn init_shutdown(&self, err: Option) { if err.is_some() { self.io_stopped(err); } else if !self @@ -129,42 +129,7 @@ impl IoState { { log::trace!("initiate io shutdown {:?}", self.flags.get()); self.insert_flags(Flags::IO_STOPPING_FILTERS); - self.shutdown_filters(io); - } - } - - pub(super) fn shutdown_filters(&self, io: &IoRef) { - if !self - .flags - .get() - .intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) - { - let mut buffer = self.buffer.borrow_mut(); - match self.filter.get().shutdown(io, &mut buffer, 0) { - Ok(Poll::Ready(())) => { - self.read_task.wake(); - self.write_task.wake(); - self.dispatch_task.wake(); - self.insert_flags(Flags::IO_STOPPING); - } - Ok(Poll::Pending) => { - let flags = self.flags.get(); - // check read buffer, if buffer is not consumed it is unlikely - // that filter will properly complete shutdown - if flags.contains(Flags::RD_PAUSED) - || flags.contains(Flags::RD_BUF_FULL | Flags::RD_READY) - { - self.read_task.wake(); - self.write_task.wake(); - self.dispatch_task.wake(); - self.insert_flags(Flags::IO_STOPPING); - } - } - Err(err) => { - self.io_stopped(Some(err)); - } - }; - self.write_task.wake(); + self.read_task.wake(); } } @@ -587,7 +552,7 @@ impl Io { } } else { if !flags.contains(Flags::IO_STOPPING_FILTERS) { - self.0 .0.init_shutdown(None, &self.0); + self.0 .0.init_shutdown(None); } self.0 .0.read_task.wake(); diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 2fc994b9..2ad56cba 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -49,7 +49,7 @@ impl IoRef { /// Notify dispatcher and initiate io stream shutdown process. pub fn close(&self) { self.0.insert_flags(Flags::DSP_STOP); - self.0.init_shutdown(None, self); + self.0.init_shutdown(None); } #[inline] @@ -81,6 +81,7 @@ impl IoRef { { log::trace!("initiate io shutdown {:?}", self.0.flags.get()); self.0.insert_flags(Flags::IO_STOPPING_FILTERS); + self.0.read_task.wake(); } } diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index 87a307b1..533354ff 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -76,11 +76,13 @@ pub trait FilterLayer: 'static { /// Process write buffer fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> sio::Result<()>; + #[inline] /// Query internal filter data fn query(&self, id: TypeId) -> Option> { None } + #[inline] /// Gracefully shutdown filter fn shutdown(&self, buf: &mut WriteBuf<'_>) -> sio::Result> { Ok(Poll::Ready(())) diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 28d8503b..3f114fa0 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -23,7 +23,8 @@ impl ReadContext { where F: FnOnce(&mut BytesVec, usize, usize) -> Poll>, { - let mut stack = self.0 .0.buffer.borrow_mut(); + let inner = &self.0 .0; + let mut stack = inner.buffer.borrow_mut(); let is_write_sleep = stack.last_write_buf_size() == 0; let mut buf = stack .last_read_buf() @@ -46,85 +47,70 @@ impl ReadContext { if nbytes > 0 { let buf_full = nbytes >= hw; - match self - .0 - .filter() - .process_read_buf(&self.0, &mut stack, 0, nbytes) - { - Ok(status) => { + let filter = self.0.filter(); + let _ = filter.process_read_buf(&self.0, &mut stack, 0, nbytes) + .and_then(|status| { if status.nbytes > 0 { if buf_full || stack.first_read_buf_size() >= hw { log::trace!( "io read buffer is too large {}, enable read back-pressure", total2 ); - self.0 - .0 - .insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); + inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); } else { - self.0 .0.insert_flags(Flags::RD_READY); + inner.insert_flags(Flags::RD_READY); } - self.0 .0.dispatch_task.wake(); log::trace!( "new {} bytes available, wakeup dispatcher", nbytes, ); + inner.dispatch_task.wake(); } else if buf_full { // read task is paused because of read back-pressure // but there is no new data in top most read buffer // so we need to wake up read task to read more data // otherwise read task would sleep forever - self.0 .0.read_task.wake(); + inner.read_task.wake(); } // while reading, filter wrote some data // in that case filters need to process write buffers // and potentialy wake write task if status.need_write { - if let Err(err) = - self.0.filter().process_write_buf(&self.0, &mut stack, 0) - { - self.0 .0.dispatch_task.wake(); - self.0 .0.insert_flags(Flags::RD_READY); - self.0 .0.init_shutdown(Some(err), &self.0); - } + let result = filter.process_write_buf(&self.0, &mut stack, 0); if is_write_sleep && stack.last_write_buf_size() != 0 { - self.0 .0.write_task.wake(); + inner.write_task.wake(); } + result + } else { + Ok(()) } - } - Err(err) => { - self.0 .0.dispatch_task.wake(); - self.0 .0.insert_flags(Flags::RD_READY); - self.0 .0.init_shutdown(Some(err), &self.0); - } - } + }) + .map_err(|err| { + inner.dispatch_task.wake(); + inner.insert_flags(Flags::RD_READY); + inner.init_shutdown(Some(err)); + }); } } + drop(stack); - let result = match result { + match result { Poll::Ready(Ok(())) => { - self.0 .0.io_stopped(None); + inner.io_stopped(None); Poll::Ready(()) } Poll::Ready(Err(e)) => { - self.0 .0.io_stopped(Some(e)); + inner.io_stopped(Some(e)); Poll::Ready(()) } - Poll::Pending => Poll::Pending, - }; - - drop(stack); - if self.0.flags().contains(Flags::IO_STOPPING_FILTERS) { - self.0 .0.shutdown_filters(&self.0); + Poll::Pending => { + if inner.flags.get().contains(Flags::IO_STOPPING_FILTERS) { + shutdown_filters(&self.0); + } + Poll::Pending + } } - result - } - - #[inline] - /// Indicate that io task is stopped - pub fn close(&self, err: Option) { - self.0 .0.io_stopped(err); } } @@ -179,10 +165,6 @@ impl WriteContext { self.0 .0.buffer.borrow_mut().set_last_write_buf(buf); } - if self.0.flags().contains(Flags::IO_STOPPING_FILTERS) { - self.0 .0.shutdown_filters(&self.0); - } - Ok(()) } @@ -192,3 +174,32 @@ impl WriteContext { self.0 .0.io_stopped(err); } } + +fn shutdown_filters(io: &IoRef) { + let st = &io.0; + let flags = st.flags.get(); + + if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) { + let mut buffer = st.buffer.borrow_mut(); + match io.filter().shutdown(io, &mut buffer, 0) { + Ok(Poll::Ready(())) => { + st.dispatch_task.wake(); + st.insert_flags(Flags::IO_STOPPING); + } + Ok(Poll::Pending) => { + // check read buffer, if buffer is not consumed it is unlikely + // that filter will properly complete shutdown + if flags.contains(Flags::RD_PAUSED) + || flags.contains(Flags::RD_BUF_FULL | Flags::RD_READY) + { + st.dispatch_task.wake(); + st.insert_flags(Flags::IO_STOPPING); + } + } + Err(err) => { + st.io_stopped(Some(err)); + } + } + st.write_task.wake(); + } +} diff --git a/ntex-tls/Cargo.toml b/ntex-tls/Cargo.toml index 10cffd29..8b8da6d2 100644 --- a/ntex-tls/Cargo.toml +++ b/ntex-tls/Cargo.toml @@ -26,7 +26,7 @@ rustls = ["tls_rust"] [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.2.2" +ntex-io = "0.2.3" ntex-util = "0.2.0" ntex-service = "1.0.0" log = "0.4" diff --git a/ntex-tokio/src/io.rs b/ntex-tokio/src/io.rs index fcc02848..e135a150 100644 --- a/ntex-tokio/src/io.rs +++ b/ntex-tokio/src/io.rs @@ -69,7 +69,7 @@ impl Future for ReadTask { Poll::Pending => Poll::Pending, Poll::Ready(Ok(n)) => { if n == 0 { - log::trace!("tokio stream is disconnected"); + log::trace!("tcp stream is disconnected"); Poll::Ready(Ok(())) } else if buf.len() < hw { continue; @@ -244,9 +244,7 @@ impl Future for WriteTask { Poll::Pending => { *count += read_buf.filled().len() as u16; if *count > 4096 { - log::trace!( - "tokio write task is stopped, too much input" - ); + log::trace!("tokio write task is stopped, too much input"); this.state.close(None); return Poll::Ready(()); } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 917c9617..b44ced70 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -58,7 +58,7 @@ ntex-util = "0.2.0" ntex-bytes = "0.1.19" ntex-h2 = "0.2.1" ntex-rt = "0.4.7" -ntex-io = "0.2.2" +ntex-io = "0.2.3" ntex-tls = "0.2.2" ntex-tokio = { version = "0.2.1", optional = true } ntex-glommio = { version = "0.2.1", optional = true }