diff --git a/ntex-bytes/src/debug.rs b/ntex-bytes/src/debug.rs index cd9df0d9..185f2504 100644 --- a/ntex-bytes/src/debug.rs +++ b/ntex-bytes/src/debug.rs @@ -31,7 +31,7 @@ impl fmt::Debug for BsDebug<'_> { } else if (0x20..0x7f).contains(&c) { write!(fmt, "{}", c as char)?; } else { - write!(fmt, "\\x{:02x}", c)?; + write!(fmt, "\\x{c:02x}")?; } } write!(fmt, "\"")?; diff --git a/ntex-bytes/src/hex.rs b/ntex-bytes/src/hex.rs index 3a6fa5c4..46ad1e1f 100644 --- a/ntex-bytes/src/hex.rs +++ b/ntex-bytes/src/hex.rs @@ -6,7 +6,7 @@ struct BytesRef<'a>(&'a [u8]); impl<'a> LowerHex for BytesRef<'a> { fn fmt(&self, f: &mut Formatter<'_>) -> Result { for b in self.0 { - write!(f, "{:02x}", b)?; + write!(f, "{b:02x}")?; } Ok(()) } @@ -15,7 +15,7 @@ impl<'a> LowerHex for BytesRef<'a> { impl<'a> UpperHex for BytesRef<'a> { fn fmt(&self, f: &mut Formatter<'_>) -> Result { for b in self.0 { - write!(f, "{:02X}", b)?; + write!(f, "{b:02X}")?; } Ok(()) } diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index e4f7968e..50a5671a 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.7] - 2023-01-29 + +* Refactor buffer api + ## [0.2.6] - 2023-01-27 * Add IoRef::with_rw_buf() helper diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index fc53c03e..b0a0e80c 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.2.6" +version = "0.2.7" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -29,4 +29,4 @@ pin-project-lite = "0.2" rand = "0.8" env_logger = "0.10" -ntex = { version = "0.6.2", features = ["tokio"] } +ntex = { version = "0.6.3", features = ["tokio"] } diff --git a/ntex-io/src/buf.rs b/ntex-io/src/buf.rs index 3c530da6..2723d310 100644 --- a/ntex-io/src/buf.rs +++ b/ntex-io/src/buf.rs @@ -1,14 +1,15 @@ +use std::cell::Cell; + use ntex_bytes::{BytesVec, PoolRef}; use ntex_util::future::Either; use crate::IoRef; -type BufferLine = (Option, Option); +type Buffer = (Cell>, Cell>); -#[derive(Debug)] pub struct Stack { len: usize, - buffers: Either<[BufferLine; 3], Vec>, + buffers: Either<[Buffer; 3], Vec>, } impl Stack { @@ -22,153 +23,204 @@ impl Stack { pub(crate) fn add_layer(&mut self) { match &mut self.buffers { Either::Left(b) => { + // move to vec if self.len == 3 { - // move to vec - let mut vec = vec![(None, None)]; + let mut vec = vec![(Cell::new(None), Cell::new(None))]; for item in b.iter_mut().take(self.len) { - vec.push((item.0.take(), item.1.take())); + vec.push((Cell::new(item.0.take()), Cell::new(item.1.take()))); } self.len += 1; self.buffers = Either::Right(vec); } else { let mut idx = self.len; while idx > 0 { - let item = (b[idx - 1].0.take(), b[idx - 1].1.take()); + let item = ( + Cell::new(b[idx - 1].0.take()), + Cell::new(b[idx - 1].1.take()), + ); b[idx] = item; idx -= 1; } - b[0] = (None, None); + b[0] = (Cell::new(None), Cell::new(None)); self.len += 1; } } Either::Right(vec) => { self.len += 1; - vec.insert(0, (None, None)); + vec.insert(0, (Cell::new(None), Cell::new(None))); } } } - fn get_buffers(&mut self, idx: usize, f: F) -> R + fn get_buffers(&self, idx: usize, f: F) -> R where - F: FnOnce(&mut BufferLine, &mut BufferLine) -> R, + F: FnOnce(&Buffer, &Buffer) -> R, { let buffers = match self.buffers { - Either::Left(ref mut b) => &mut b[..], - Either::Right(ref mut b) => &mut b[..], + Either::Left(ref b) => &b[..], + Either::Right(ref b) => &b[..], }; - let pos = idx + 1; - if self.len > pos { - let (curr, next) = buffers.split_at_mut(pos); - f(&mut curr[idx], &mut next[0]) + let next = idx + 1; + if self.len > next { + f(&buffers[idx], &buffers[next]) } else { - let mut curr = (buffers[idx].0.take(), None); - let mut next = (None, buffers[idx].1.take()); + let curr = (Cell::new(buffers[idx].0.take()), Cell::new(None)); + let next = (Cell::new(None), Cell::new(buffers[idx].1.take())); - let result = f(&mut curr, &mut next); - buffers[idx].0 = curr.0; - buffers[idx].1 = next.1; + let result = f(&curr, &next); + buffers[idx].0.set(curr.0.take()); + buffers[idx].1.set(next.1.take()); result } } - fn get_first_level(&mut self) -> &mut BufferLine { - match &mut self.buffers { - Either::Left(b) => &mut b[0], - Either::Right(b) => &mut b[0], + fn get_first_level(&self) -> &Buffer { + match &self.buffers { + Either::Left(b) => &b[0], + Either::Right(b) => &b[0], } } - fn get_last_level(&mut self) -> &mut BufferLine { - match &mut self.buffers { - Either::Left(b) => &mut b[self.len - 1], - Either::Right(b) => &mut b[self.len - 1], + fn get_last_level(&self) -> &Buffer { + match &self.buffers { + Either::Left(b) => &b[self.len - 1], + Either::Right(b) => &b[self.len - 1], } } - pub(crate) fn read_buf( - &mut self, - io: &IoRef, - idx: usize, - nbytes: usize, - f: F, - ) -> R + pub(crate) fn read_buf(&self, io: &IoRef, idx: usize, nbytes: usize, f: F) -> R where - F: FnOnce(&mut ReadBuf<'_>) -> R, + F: FnOnce(&ReadBuf<'_>) -> R, { self.get_buffers(idx, |curr, next| { - let mut buf = ReadBuf { + let buf = ReadBuf { io, nbytes, curr, next, - need_write: false, + need_write: Cell::new(false), }; - f(&mut buf) + f(&buf) }) } - pub(crate) fn write_buf(&mut self, io: &IoRef, idx: usize, f: F) -> R + pub(crate) fn write_buf(&self, io: &IoRef, idx: usize, f: F) -> R where - F: FnOnce(&mut WriteBuf<'_>) -> R, + F: FnOnce(&WriteBuf<'_>) -> R, { self.get_buffers(idx, |curr, next| { - let mut buf = WriteBuf { + let buf = WriteBuf { io, curr, next, - need_write: false, + need_write: Cell::new(false), }; - f(&mut buf) + f(&buf) }) } - pub(crate) fn first_read_buf_size(&mut self) -> usize { - self.get_first_level() - .0 - .as_ref() - .map(|b| b.len()) - .unwrap_or(0) - } - - pub(crate) fn with_rw_buf(&mut self, io: &IoRef, f: F) -> R + pub(crate) fn with_read_source(&self, io: &IoRef, f: F) -> R where - F: FnOnce(&mut BytesVec, &mut BytesVec) -> R, + F: FnOnce(&mut BytesVec) -> R, { - let lvl = self.get_first_level(); - if lvl.0.is_none() { - lvl.0 = Some(io.memory_pool().get_read_buf()); + let item = self.get_last_level(); + let mut rb = item.0.take(); + if rb.is_none() { + rb = Some(io.memory_pool().get_read_buf()); } - if lvl.1.is_none() { - lvl.1 = Some(io.memory_pool().get_write_buf()); + + let result = f(rb.as_mut().unwrap()); + if let Some(b) = rb { + if b.is_empty() { + io.memory_pool().release_read_buf(b); + } else { + item.0.set(Some(b)); + } } - f(lvl.0.as_mut().unwrap(), lvl.1.as_mut().unwrap()) + result } - pub(crate) fn first_read_buf(&mut self) -> &mut Option { - &mut self.get_first_level().0 - } - - pub(crate) fn first_write_buf(&mut self, io: &IoRef) -> &mut BytesVec { - let item = &mut self.get_first_level().1; - if item.is_none() { - *item = Some(io.memory_pool().get_write_buf()); + pub(crate) fn with_read_destination(&self, io: &IoRef, f: F) -> R + where + F: FnOnce(&mut BytesVec) -> R, + { + let item = self.get_first_level(); + let mut rb = item.0.take(); + if rb.is_none() { + rb = Some(io.memory_pool().get_read_buf()); } - item.as_mut().unwrap() + + let result = f(rb.as_mut().unwrap()); + if let Some(b) = rb { + if b.is_empty() { + io.memory_pool().release_read_buf(b); + } else { + item.0.set(Some(b)); + } + } + result } - pub(crate) fn last_read_buf(&mut self) -> &mut Option { - &mut self.get_last_level().0 + pub(crate) fn with_write_source(&self, io: &IoRef, f: F) -> R + where + F: FnOnce(&mut BytesVec) -> R, + { + let item = self.get_first_level(); + let mut wb = item.1.take(); + if wb.is_none() { + wb = Some(io.memory_pool().get_write_buf()); + } + + let result = f(wb.as_mut().unwrap()); + if let Some(b) = wb { + if b.is_empty() { + io.memory_pool().release_write_buf(b); + } else { + item.1.set(Some(b)); + } + } + result } - pub(crate) fn last_write_buf(&mut self) -> &mut Option { - &mut self.get_last_level().1 + pub(crate) fn with_write_destination(&self, io: &IoRef, f: F) -> R + where + F: FnOnce(&mut Option) -> R, + { + let item = self.get_last_level(); + let mut wb = item.1.take(); + + let result = f(&mut wb); + if let Some(b) = wb { + if b.is_empty() { + io.memory_pool().release_write_buf(b); + } else { + item.1.set(Some(b)); + } + } + result } - pub(crate) fn release(&mut self, pool: PoolRef) { - let items = match &mut self.buffers { - Either::Left(b) => &mut b[..], - Either::Right(b) => &mut b[..], + pub(crate) fn read_destination_size(&self) -> usize { + let item = self.get_first_level(); + let rb = item.0.take(); + let size = rb.as_ref().map(|b| b.len()).unwrap_or(0); + item.0.set(rb); + size + } + + pub(crate) fn write_destination_size(&self) -> usize { + let item = self.get_last_level(); + let wb = item.1.take(); + let size = wb.as_ref().map(|b| b.len()).unwrap_or(0); + item.1.set(wb); + size + } + + pub(crate) fn release(&self, pool: PoolRef) { + let items = match &self.buffers { + Either::Left(b) => &b[..], + Either::Right(b) => &b[..], }; for item in items { @@ -181,29 +233,31 @@ impl Stack { } } - pub(crate) fn set_memory_pool(&mut self, pool: PoolRef) { - let items = match &mut self.buffers { - Either::Left(b) => &mut b[..], - Either::Right(b) => &mut b[..], + pub(crate) fn set_memory_pool(&self, pool: PoolRef) { + let items = match &self.buffers { + Either::Left(b) => &b[..], + Either::Right(b) => &b[..], }; - for buf in items { - if let Some(ref mut b) = buf.0 { - pool.move_vec_in(b); + for item in items { + if let Some(mut b) = item.0.take() { + pool.move_vec_in(&mut b); + item.0.set(Some(b)); } - if let Some(ref mut b) = buf.1 { - pool.move_vec_in(b); + if let Some(mut b) = item.1.take() { + pool.move_vec_in(&mut b); + item.1.set(Some(b)); } } } } -#[derive(Debug)] +// #[derive(Debug)] pub struct ReadBuf<'a> { pub(crate) io: &'a IoRef, - pub(crate) curr: &'a mut BufferLine, - pub(crate) next: &'a mut BufferLine, + pub(crate) curr: &'a Buffer, + pub(crate) next: &'a Buffer, pub(crate) nbytes: usize, - pub(crate) need_write: bool, + pub(crate) need_write: Cell, } impl<'a> ReadBuf<'a> { @@ -219,27 +273,68 @@ impl<'a> ReadBuf<'a> { self.io.want_shutdown() } + #[inline] + /// Make sure buffer has enough free space + pub fn resize_buf(&self, buf: &mut BytesVec) { + self.io.memory_pool().resize_read_buf(buf); + } + #[inline] /// Get reference to source read buffer - pub fn get_src(&mut self) -> &mut BytesVec { - if self.next.0.is_none() { - self.next.0 = Some(self.io.memory_pool().get_read_buf()); + pub fn with_src(&self, f: F) -> R + where + F: FnOnce(&mut Option) -> R, + { + let mut item = self.next.0.take(); + let result = f(&mut item); + + if let Some(b) = item { + if b.is_empty() { + self.io.memory_pool().release_read_buf(b); + } else { + self.next.0.set(Some(b)); + } } - self.next.0.as_mut().unwrap() + result + } + + #[inline] + /// Get reference to destination read buffer + pub fn with_dst(&self, f: F) -> R + where + F: FnOnce(&mut BytesVec) -> R, + { + let mut item = self.curr.0.take(); + if item.is_none() { + item = Some(self.io.memory_pool().get_read_buf()); + } + let result = f(item.as_mut().unwrap()); + if let Some(b) = item { + if b.is_empty() { + self.io.memory_pool().release_read_buf(b); + } else { + self.curr.0.set(Some(b)); + } + } + result } #[inline] /// Take source read buffer - pub fn take_src(&mut self) -> Option { - self.next - .0 - .take() - .and_then(|b| if b.is_empty() { None } else { Some(b) }) + pub fn take_src(&self) -> Option { + self.next.0.take().and_then(|b| { + if b.is_empty() { + self.io.memory_pool().release_read_buf(b); + None + } else { + Some(b) + } + }) } #[inline] /// Set source read buffer - pub fn set_src(&mut self, src: Option) { + pub fn set_src(&self, src: Option) { if let Some(buf) = self.next.0.take() { self.io.memory_pool().release_read_buf(buf); } @@ -247,23 +342,14 @@ impl<'a> ReadBuf<'a> { if src.is_empty() { self.io.memory_pool().release_read_buf(src); } else { - self.next.0 = Some(src); + self.next.0.set(Some(src)); } } } - #[inline] - /// Get reference to destination read buffer - pub fn get_dst(&mut self) -> &mut BytesVec { - if self.curr.0.is_none() { - self.curr.0 = Some(self.io.memory_pool().get_read_buf()); - } - self.curr.0.as_mut().unwrap() - } - #[inline] /// Take destination read buffer - pub fn take_dst(&mut self) -> BytesVec { + pub fn take_dst(&self) -> BytesVec { self.curr .0 .take() @@ -272,7 +358,7 @@ impl<'a> ReadBuf<'a> { #[inline] /// Set destination read buffer - pub fn set_dst(&mut self, dst: Option) { + pub fn set_dst(&self, dst: Option) { if let Some(buf) = self.curr.0.take() { self.io.memory_pool().release_read_buf(buf); } @@ -280,46 +366,34 @@ impl<'a> ReadBuf<'a> { if dst.is_empty() { self.io.memory_pool().release_read_buf(dst); } else { - self.curr.0 = Some(dst); + self.curr.0.set(Some(dst)); } } } #[inline] - /// Get reference to source and destination read buffers (src, dst) - pub fn get_pair(&mut self) -> (&mut BytesVec, &mut BytesVec) { - if self.next.0.is_none() { - self.next.0 = Some(self.io.memory_pool().get_read_buf()); - } - if self.curr.0.is_none() { - self.curr.0 = Some(self.io.memory_pool().get_read_buf()); - } - (self.next.0.as_mut().unwrap(), self.curr.0.as_mut().unwrap()) - } - - #[inline] - pub fn with_write_buf<'b, F, R>(&'b mut self, f: F) -> R + pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R where - F: FnOnce(&mut WriteBuf<'b>) -> R, + F: FnOnce(&WriteBuf<'b>) -> R, { let mut buf = WriteBuf { io: self.io, curr: self.curr, next: self.next, - need_write: self.need_write, + need_write: Cell::new(self.need_write.get()), }; let result = f(&mut buf); - self.need_write = buf.need_write; + self.need_write.set(buf.need_write.get()); result } } -#[derive(Debug)] +// #[derive(Debug)] pub struct WriteBuf<'a> { pub(crate) io: &'a IoRef, - pub(crate) curr: &'a mut BufferLine, - pub(crate) next: &'a mut BufferLine, - pub(crate) need_write: bool, + pub(crate) curr: &'a Buffer, + pub(crate) next: &'a Buffer, + pub(crate) need_write: Cell, } impl<'a> WriteBuf<'a> { @@ -329,57 +403,85 @@ impl<'a> WriteBuf<'a> { self.io.want_shutdown() } + #[inline] + /// Make sure buffer has enough free space + pub fn resize_buf(&self, buf: &mut BytesVec) { + self.io.memory_pool().resize_write_buf(buf); + } + #[inline] /// Get reference to source write buffer - pub fn get_src(&mut self) -> &mut BytesVec { - if self.curr.1.is_none() { - self.curr.1 = Some(self.io.memory_pool().get_write_buf()); + pub fn with_src(&self, f: F) -> R + where + F: FnOnce(&mut Option) -> R, + { + let mut item = self.curr.1.take(); + let result = f(&mut item); + if let Some(b) = item { + if b.is_empty() { + self.io.memory_pool().release_write_buf(b); + } else { + self.curr.1.set(Some(b)); + } } - self.curr.1.as_mut().unwrap() + result + } + + #[inline] + /// Get reference to destination write buffer + pub fn with_dst(&self, f: F) -> R + where + F: FnOnce(&mut BytesVec) -> R, + { + let mut item = self.next.1.take(); + if item.is_none() { + item = Some(self.io.memory_pool().get_write_buf()); + } + let buf = item.as_mut().unwrap(); + let total = buf.len(); + let result = f(buf); + + if buf.is_empty() { + self.io.memory_pool().release_write_buf(item.unwrap()); + } else { + self.need_write + .set(self.need_write.get() | (total != buf.len())); + self.next.1.set(item); + } + result } #[inline] /// Take source write buffer - pub fn take_src(&mut self) -> Option { - self.curr - .1 - .take() - .and_then(|b| if b.is_empty() { None } else { Some(b) }) + pub fn take_src(&self) -> Option { + self.curr.1.take().and_then(|b| { + if b.is_empty() { + self.io.memory_pool().release_write_buf(b); + None + } else { + Some(b) + } + }) } #[inline] /// Set source write buffer - pub fn set_src(&mut self, src: Option) { + pub fn set_src(&self, src: Option) { if let Some(buf) = self.curr.1.take() { - self.io.memory_pool().release_read_buf(buf); + self.io.memory_pool().release_write_buf(buf); } if let Some(buf) = src { if buf.is_empty() { - self.io.memory_pool().release_read_buf(buf); + self.io.memory_pool().release_write_buf(buf); } else { - self.curr.1 = Some(buf); + self.curr.1.set(Some(buf)); } } } - #[inline] - /// Get reference to destination write buffer - pub fn with_dst_buf(&mut self, f: F) -> R - where - F: FnOnce(&mut BytesVec) -> R, - { - if self.next.1.is_none() { - self.next.1 = Some(self.io.memory_pool().get_write_buf()); - } - let buf = self.next.1.as_mut().unwrap(); - let r = f(buf); - self.need_write |= !buf.is_empty(); - r - } - #[inline] /// Take destination write buffer - pub fn take_dst(&mut self) -> BytesVec { + pub fn take_dst(&self) -> BytesVec { self.next .1 .take() @@ -388,7 +490,7 @@ impl<'a> WriteBuf<'a> { #[inline] /// Set destination write buffer - pub fn set_dst(&mut self, dst: Option) { + pub fn set_dst(&self, dst: Option) { if let Some(buf) = self.next.1.take() { self.io.memory_pool().release_write_buf(buf); } @@ -396,26 +498,26 @@ impl<'a> WriteBuf<'a> { if dst.is_empty() { self.io.memory_pool().release_write_buf(dst); } else { - self.need_write |= !dst.is_empty(); - self.next.1 = Some(dst); + self.need_write.set(self.need_write.get() | !dst.is_empty()); + self.next.1.set(Some(dst)); } } } #[inline] - pub fn with_read_buf<'b, F, R>(&'b mut self, f: F) -> R + pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R where - F: FnOnce(&mut ReadBuf<'b>) -> R, + F: FnOnce(&ReadBuf<'b>) -> R, { let mut buf = ReadBuf { io: self.io, curr: self.curr, next: self.next, nbytes: 0, - need_write: self.need_write, + need_write: Cell::new(self.need_write.get()), }; let result = f(&mut buf); - self.need_write = buf.need_write; + self.need_write.set(buf.need_write.get()); result } } diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index a69b2cba..7ebe2729 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -41,21 +41,16 @@ pub trait Filter: 'static { fn process_read_buf( &self, io: &IoRef, - stack: &mut Stack, + stack: &Stack, idx: usize, nbytes: usize, ) -> io::Result; /// Process write buffer - fn process_write_buf( - &self, - io: &IoRef, - stack: &mut Stack, - idx: usize, - ) -> io::Result<()>; + fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()>; /// Gracefully shutdown filter - fn shutdown(&self, io: &IoRef, stack: &mut Stack, idx: usize) -> io::Result>; + fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result>; /// Check readiness for read operations fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll; @@ -115,7 +110,7 @@ impl Filter for Base { fn process_read_buf( &self, _: &IoRef, - _: &mut Stack, + _: &Stack, _: usize, nbytes: usize, ) -> io::Result { @@ -126,22 +121,24 @@ impl Filter for Base { } #[inline] - fn process_write_buf(&self, _: &IoRef, s: &mut Stack, _: usize) -> io::Result<()> { - if let Some(buf) = s.last_write_buf() { - let len = buf.len(); - if len > 0 && self.0.flags().contains(Flags::WR_PAUSED) { - self.0 .0.remove_flags(Flags::WR_PAUSED); - self.0 .0.write_task.wake(); + fn process_write_buf(&self, io: &IoRef, s: &Stack, _: usize) -> io::Result<()> { + s.with_write_destination(io, |buf| { + if let Some(buf) = buf { + let len = buf.len(); + if len > 0 && self.0.flags().contains(Flags::WR_PAUSED) { + self.0 .0.remove_flags(Flags::WR_PAUSED); + self.0 .0.write_task.wake(); + } + if len >= self.0.memory_pool().write_params_high() { + self.0 .0.insert_flags(Flags::WR_BACKPRESSURE); + } } - if len >= self.0.memory_pool().write_params_high() { - self.0 .0.insert_flags(Flags::WR_BACKPRESSURE); - } - } + }); Ok(()) } #[inline] - fn shutdown(&self, _: &IoRef, _: &mut Stack, _: usize) -> io::Result> { + fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result> { Ok(Poll::Ready(())) } } @@ -157,7 +154,7 @@ where } #[inline] - fn shutdown(&self, io: &IoRef, stack: &mut Stack, idx: usize) -> io::Result> { + fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result> { let result1 = stack.write_buf(io, idx, |buf| self.0.shutdown(buf))?; self.process_write_buf(io, stack, idx)?; @@ -178,7 +175,7 @@ where fn process_read_buf( &self, io: &IoRef, - stack: &mut Stack, + stack: &Stack, idx: usize, nbytes: usize, ) -> io::Result { @@ -190,18 +187,13 @@ where stack.read_buf(io, idx, status.nbytes, |buf| { self.0.process_read_buf(buf).map(|nbytes| FilterReadStatus { nbytes, - need_write: status.need_write || buf.need_write, + need_write: status.need_write || buf.need_write.get(), }) }) } #[inline] - fn process_write_buf( - &self, - io: &IoRef, - stack: &mut Stack, - idx: usize, - ) -> io::Result<()> { + fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()> { stack.write_buf(io, idx, |buf| self.0.process_write_buf(buf))?; if F::BUFFERS { @@ -270,7 +262,7 @@ impl Filter for NullFilter { fn process_read_buf( &self, _: &IoRef, - _: &mut Stack, + _: &Stack, _: usize, _: usize, ) -> io::Result { @@ -278,12 +270,12 @@ impl Filter for NullFilter { } #[inline] - fn process_write_buf(&self, _: &IoRef, _: &mut Stack, _: usize) -> io::Result<()> { + fn process_write_buf(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<()> { Ok(()) } #[inline] - fn shutdown(&self, _: &IoRef, _: &mut Stack, _: usize) -> io::Result> { + fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result> { Ok(Poll::Ready(())) } } diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 41c08361..5be0d46e 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -1,4 +1,4 @@ -use std::cell::{Cell, RefCell}; +use std::cell::Cell; use std::task::{Context, Poll}; use std::{fmt, future::Future, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc, time}; @@ -62,7 +62,7 @@ pub(crate) struct IoState { pub(super) read_task: LocalWaker, pub(super) write_task: LocalWaker, pub(super) dispatch_task: LocalWaker, - pub(super) buffer: RefCell, + pub(super) buffer: Stack, pub(super) filter: Cell<&'static dyn Filter>, pub(super) handle: Cell>>, #[allow(clippy::box_collection)] @@ -149,7 +149,7 @@ impl hash::Hash for IoState { impl Drop for IoState { #[inline] fn drop(&mut self) { - self.buffer.borrow_mut().release(self.pool.get()); + self.buffer.release(self.pool.get()); } } @@ -171,7 +171,7 @@ impl Io { dispatch_task: LocalWaker::new(), read_task: LocalWaker::new(), write_task: LocalWaker::new(), - buffer: RefCell::new(Stack::new()), + buffer: Stack::new(), filter: Cell::new(NullFilter::get()), handle: Cell::new(None), on_disconnect: Cell::new(None), @@ -199,7 +199,7 @@ impl Io { #[inline] /// Set memory pool pub fn set_memory_pool(&self, pool: PoolRef) { - self.0 .0.buffer.borrow_mut().set_memory_pool(pool); + self.0 .0.buffer.set_memory_pool(pool); self.0 .0.pool.set(pool); } @@ -227,7 +227,7 @@ impl Io { dispatch_task: LocalWaker::new(), read_task: LocalWaker::new(), write_task: LocalWaker::new(), - buffer: RefCell::new(Stack::new()), + buffer: Stack::new(), filter: Cell::new(NullFilter::get()), handle: Cell::new(None), on_disconnect: Cell::new(None), @@ -292,7 +292,12 @@ impl Io { { // add layer to buffers if U::BUFFERS { - self.0 .0.buffer.borrow_mut().add_layer(); + // Safety: .add_layer() modifies internal buffers + // there is no api that holds references into buffers storage + // all apis first removes buffer from storage and then work with it + unsafe { &mut *(Rc::as_ptr(&self.0 .0) as *mut IoState) } + .buffer + .add_layer(); } // replace current filter @@ -489,14 +494,7 @@ impl Io { Poll::Ready(self.error().map(Err).unwrap_or(Ok(()))) } else { let inner = &self.0 .0; - let len = inner - .buffer - .borrow_mut() - .last_write_buf() - .as_ref() - .map(|b| b.len()) - .unwrap_or(0); - + let len = inner.buffer.write_destination_size(); if len > 0 { if full { inner.insert_flags(Flags::WR_WAIT); diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 0f271ea6..66205de0 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -1,9 +1,9 @@ -use std::{any, cell, fmt, hash, io, time}; +use std::{any, fmt, hash, io, time}; use ntex_bytes::{BytesVec, PoolRef}; use ntex_codec::{Decoder, Encoder}; -use super::{buf::Stack, io::Flags, timer, types, Filter, IoRef, OnDisconnect, WriteBuf}; +use super::{io::Flags, timer, types, Filter, IoRef, OnDisconnect, WriteBuf}; impl IoRef { #[inline] @@ -132,11 +132,9 @@ impl IoRef { where U: Decoder, { - borrow_buffer(&self.0.buffer) - .first_read_buf() - .as_mut() - .map(|b| codec.decode_vec(b)) - .unwrap_or(Ok(None)) + self.0 + .buffer + .with_read_destination(self, |buf| codec.decode_vec(buf)) } #[inline] @@ -152,59 +150,40 @@ impl IoRef { } #[inline] - /// Get mut access to write buffer + /// Get access to write buffer pub fn with_buf(&self, f: F) -> io::Result where - F: FnOnce(&mut WriteBuf<'_>) -> R, + F: FnOnce(&WriteBuf<'_>) -> R, { - let mut buffer = borrow_buffer(&self.0.buffer); - let result = buffer.write_buf(self, 0, f); + let result = self.0.buffer.write_buf(self, 0, f); self.0 .filter .get() - .process_write_buf(self, &mut buffer, 0)?; + .process_write_buf(self, &self.0.buffer, 0)?; Ok(result) } #[inline] - /// Get mut access to write buffer + /// Get mut access to source write buffer pub fn with_write_buf(&self, f: F) -> io::Result where F: FnOnce(&mut BytesVec) -> R, { - let mut buffer = borrow_buffer(&self.0.buffer); - let result = f(buffer.first_write_buf(self)); + let result = self.0.buffer.with_write_source(self, f); self.0 .filter .get() - .process_write_buf(self, &mut buffer, 0)?; - + .process_write_buf(self, &self.0.buffer, 0)?; Ok(result) } #[inline] - /// Get mut access to read buffer + /// Get mut access to source read buffer pub fn with_read_buf(&self, f: F) -> R where F: FnOnce(&mut BytesVec) -> R, { - // use top most buffer - let mut buffer = borrow_buffer(&self.0.buffer); - let buf = buffer.first_read_buf(); - if buf.is_none() { - *buf = Some(self.memory_pool().get_read_buf()); - } - - f(buf.as_mut().unwrap()) - } - - #[inline] - /// Get mut access to read and write buffer - pub fn with_rw_buf(&self, f: F) -> R - where - F: FnOnce(&mut BytesVec, &mut BytesVec) -> R, - { - borrow_buffer(&self.0.buffer).with_rw_buf(self, f) + self.0.buffer.with_read_destination(self, f) } #[inline] @@ -260,14 +239,6 @@ impl fmt::Debug for IoRef { } } -fn borrow_buffer(buf: &cell::RefCell) -> cell::RefMut<'_, Stack> { - if let Ok(r) = buf.try_borrow_mut() { - r - } else { - panic!("Nested access to read/write buffers are not allowed"); - } -} - #[cfg(test)] mod tests { use std::cell::{Cell, RefCell}; @@ -411,16 +382,16 @@ mod tests { impl FilterLayer for Counter { const BUFFERS: bool = false; - fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result { + fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result { self.read_order.borrow_mut().push(self.idx); self.in_bytes.set(self.in_bytes.get() + buf.nbytes()); Ok(buf.nbytes()) } - fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> { + fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> { self.write_order.borrow_mut().push(self.idx); self.out_bytes - .set(self.out_bytes.get() + buf.with_dst_buf(|b| b.len())); + .set(self.out_bytes.get() + buf.with_dst(|b| b.len())); Ok(()) } } diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index 533354ff..1bba316b 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -71,10 +71,10 @@ pub trait FilterLayer: 'static { /// /// Inner filter must process buffer before current. /// Returns number of new bytes. - fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> sio::Result; + fn process_read_buf(&self, buf: &ReadBuf<'_>) -> sio::Result; /// Process write buffer - fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> sio::Result<()>; + fn process_write_buf(&self, buf: &WriteBuf<'_>) -> sio::Result<()>; #[inline] /// Query internal filter data @@ -84,7 +84,7 @@ pub trait FilterLayer: 'static { #[inline] /// Gracefully shutdown filter - fn shutdown(&self, buf: &mut WriteBuf<'_>) -> sio::Result> { + fn shutdown(&self, buf: &WriteBuf<'_>) -> sio::Result> { Ok(Poll::Ready(())) } } diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index e3b493c9..4b80bdbc 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -24,71 +24,59 @@ impl ReadContext { F: FnOnce(&mut BytesVec, usize, usize) -> Poll>, { let inner = &self.0 .0; - let mut stack = inner.buffer.borrow_mut(); - let mut buf = stack - .last_read_buf() - .take() - .unwrap_or_else(|| self.0.memory_pool().get_read_buf()); - - let total = buf.len(); let (hw, lw) = self.0.memory_pool().read_params().unpack(); + let (result, nbytes, total) = inner.buffer.with_read_source(&self.0, |buf| { + let total = buf.len(); - // call provided callback - let result = f(&mut buf, hw, lw); - - // handle buffer changes - if buf.is_empty() { - self.0.memory_pool().release_read_buf(buf); - } else { + // call provided callback + let result = f(buf, hw, lw); let total2 = buf.len(); let nbytes = if total2 > total { total2 - total } else { 0 }; - *stack.last_read_buf() = Some(buf); + (result, nbytes, total2) + }); - if nbytes > 0 { - let buf_full = nbytes >= hw; - let filter = self.0.filter(); - let _ = filter.process_read_buf(&self.0, &mut stack, 0, nbytes) - .and_then(|status| { - if status.nbytes > 0 { - if buf_full || stack.first_read_buf_size() >= hw { - log::trace!( - "io read buffer is too large {}, enable read back-pressure", - total2 - ); - inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); - } else { - inner.insert_flags(Flags::RD_READY); - } + // handle buffer changes + if nbytes > 0 { + let buf_full = nbytes >= hw; + let filter = self.0.filter(); + let _ = filter + .process_read_buf(&self.0, &inner.buffer, 0, nbytes) + .and_then(|status| { + if status.nbytes > 0 { + if buf_full || inner.buffer.read_destination_size() >= hw { log::trace!( - "new {} bytes available, wakeup dispatcher", - nbytes, + "io read buffer is too large {}, enable read back-pressure", + total ); - inner.dispatch_task.wake(); - } else if buf_full { - // read task is paused because of read back-pressure - // but there is no new data in top most read buffer - // so we need to wake up read task to read more data - // otherwise read task would sleep forever - inner.read_task.wake(); - } - - // while reading, filter wrote some data - // in that case filters need to process write buffers - // and potentialy wake write task - if status.need_write { - filter.process_write_buf(&self.0, &mut stack, 0) + inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); } else { - Ok(()) + inner.insert_flags(Flags::RD_READY); } - }) - .map_err(|err| { + log::trace!("new {} bytes available, wakeup dispatcher", nbytes,); inner.dispatch_task.wake(); - inner.insert_flags(Flags::RD_READY); - inner.io_stopped(Some(err)); - }); - } + } else if buf_full { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } + + // while reading, filter wrote some data + // in that case filters need to process write buffers + // and potentialy wake write task + if status.need_write { + filter.process_write_buf(&self.0, &inner.buffer, 0) + } else { + Ok(()) + } + }) + .map_err(|err| { + inner.dispatch_task.wake(); + inner.insert_flags(Flags::RD_READY); + inner.io_stopped(Some(err)); + }); } - drop(stack); match result { Poll::Ready(Ok(())) => { @@ -135,20 +123,12 @@ impl WriteContext { F: FnOnce(&mut Option) -> Poll>, { let inner = &self.0 .0; - let mut stack = inner.buffer.borrow_mut(); - let buf = stack.last_write_buf(); // call provided callback - let result = f(buf); - - let mut len = 0; - if let Some(b) = buf { - if b.is_empty() { - inner.pool.get().release_write_buf(buf.take().unwrap()); - } else { - len = b.len(); - } - } + let (result, len) = inner.buffer.with_write_destination(&self.0, |buf| { + let result = f(buf); + (result, buf.as_ref().map(|b| b.len()).unwrap_or(0)) + }); // if write buffer is smaller than high watermark value, turn off back-pressure let mut flags = inner.flags.get(); @@ -191,8 +171,7 @@ fn shutdown_filters(io: &IoRef) { if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) { let filter = io.filter(); - let mut buffer = st.buffer.borrow_mut(); - match filter.shutdown(io, &mut buffer, 0) { + match filter.shutdown(io, &st.buffer, 0) { Ok(Poll::Ready(())) => { st.dispatch_task.wake(); st.insert_flags(Flags::IO_STOPPING); @@ -211,7 +190,7 @@ fn shutdown_filters(io: &IoRef) { st.io_stopped(Some(err)); } } - if let Err(err) = filter.process_write_buf(io, &mut buffer, 0) { + if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) { st.io_stopped(Some(err)); } } diff --git a/ntex-io/src/utils.rs b/ntex-io/src/utils.rs index c196ee45..bdaa3da4 100644 --- a/ntex-io/src/utils.rs +++ b/ntex-io/src/utils.rs @@ -118,11 +118,11 @@ mod tests { pub(crate) struct TestFilter; impl FilterLayer for TestFilter { - fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result { + fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result { Ok(buf.nbytes()) } - fn process_write_buf(&self, _: &mut WriteBuf<'_>) -> io::Result<()> { + fn process_write_buf(&self, _: &WriteBuf<'_>) -> io::Result<()> { Ok(()) } } diff --git a/ntex-tls/CHANGES.md b/ntex-tls/CHANGES.md index 162503b3..0aa08998 100644 --- a/ntex-tls/CHANGES.md +++ b/ntex-tls/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.4] - 2023-01-29 + +* Update buffer api + ## [0.2.3] - 2023-01-25 * Fix double buf cleanup diff --git a/ntex-tls/Cargo.toml b/ntex-tls/Cargo.toml index d59f2375..38f8b5ba 100644 --- a/ntex-tls/Cargo.toml +++ b/ntex-tls/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tls" -version = "0.2.3" +version = "0.2.4" authors = ["ntex contributors "] description = "An implementation of SSL streams for ntex backed by OpenSSL" keywords = ["network", "framework", "async", "futures"] @@ -26,7 +26,7 @@ rustls = ["tls_rust"] [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.2.3" +ntex-io = "0.2.7" ntex-util = "0.2.0" ntex-service = "1.0.0" log = "0.4" @@ -39,7 +39,7 @@ tls_openssl = { version="0.10", package = "openssl", optional = true } tls_rust = { version = "0.20", package = "rustls", optional = true } [dev-dependencies] -ntex = { version = "0.6.1", features = ["openssl", "rustls", "tokio"] } +ntex = { version = "0.6.3", features = ["openssl", "rustls", "tokio"] } env_logger = "0.10" rustls-pemfile = { version = "1.0" } webpki-roots = { version = "0.22" } diff --git a/ntex-tls/src/openssl/mod.rs b/ntex-tls/src/openssl/mod.rs index b018ff86..93ab73d3 100644 --- a/ntex-tls/src/openssl/mod.rs +++ b/ntex-tls/src/openssl/mod.rs @@ -2,7 +2,7 @@ use std::cell::{Cell, RefCell}; use std::{any, cmp, error::Error, io, task::Context, task::Poll}; -use ntex_bytes::{BufMut, BytesVec, PoolRef}; +use ntex_bytes::{BufMut, BytesVec}; use ntex_io::{types, Filter, FilterFactory, FilterLayer, Io, Layer, ReadBuf, WriteBuf}; use ntex_util::{future::poll_fn, future::BoxFuture, ready, time, time::Millis}; use tls_openssl::ssl::{self, NameType, SslStream}; @@ -24,25 +24,22 @@ pub struct PeerCertChain(pub Vec); /// An implementation of SSL streams pub struct SslFilter { inner: RefCell>, - pool: PoolRef, handshake: Cell, } struct IoInner { source: Option, destination: Option, - pool: PoolRef, } impl io::Read for IoInner { fn read(&mut self, dst: &mut [u8]) -> io::Result { - if let Some(mut buf) = self.source.take() { + if let Some(ref mut buf) = self.source { if buf.is_empty() { Err(io::Error::from(io::ErrorKind::WouldBlock)) } else { let len = cmp::min(buf.len(), dst.len()); dst[..len].copy_from_slice(&buf.split_to(len)); - self.source = Some(buf); Ok(len) } } else { @@ -53,13 +50,7 @@ impl io::Read for IoInner { impl io::Write for IoInner { fn write(&mut self, src: &[u8]) -> io::Result { - let mut buf = if let Some(buf) = self.destination.take() { - buf - } else { - BytesVec::with_capacity_in(src.len(), self.pool) - }; - buf.extend_from_slice(src); - self.destination = Some(buf); + self.destination.as_mut().unwrap().extend_from_slice(src); Ok(src.len()) } @@ -69,7 +60,7 @@ impl io::Write for IoInner { } impl SslFilter { - fn with_buffers(&self, buf: &mut WriteBuf<'_>, f: F) -> R + fn with_buffers(&self, buf: &WriteBuf<'_>, f: F) -> R where F: FnOnce() -> R, { @@ -80,16 +71,6 @@ impl SslFilter { buf.with_read_buf(|b| b.set_src(self.inner.borrow_mut().get_mut().source.take())); result } - - fn set_buffers(&self, buf: &mut WriteBuf<'_>) { - self.inner.borrow_mut().get_mut().destination = Some(buf.take_dst()); - self.inner.borrow_mut().get_mut().source = buf.with_read_buf(|b| b.take_src()); - } - - fn unset_buffers(&self, buf: &mut WriteBuf<'_>) { - buf.set_dst(self.inner.borrow_mut().get_mut().destination.take()); - buf.with_read_buf(|b| b.set_src(self.inner.borrow_mut().get_mut().source.take())); - } } impl FilterLayer for SslFilter { @@ -141,7 +122,7 @@ impl FilterLayer for SslFilter { } } - fn shutdown(&self, buf: &mut WriteBuf<'_>) -> io::Result> { + fn shutdown(&self, buf: &WriteBuf<'_>) -> io::Result> { let ssl_result = self.with_buffers(buf, || self.inner.borrow_mut().shutdown()); match ssl_result { @@ -160,75 +141,72 @@ impl FilterLayer for SslFilter { } } - fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result { - buf.with_write_buf(|b| self.set_buffers(b)); + fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result { + buf.with_write_buf(|b| { + self.with_buffers(b, || { + buf.with_dst(|dst| { + let mut new_bytes = usize::from(self.handshake.get()); + loop { + buf.resize_buf(dst); - let dst = buf.get_dst(); - //let mut new_bytes = usize::from(self.handshake.get()); - let mut new_bytes = 1; - loop { - // make sure we've got room - self.pool.resize_read_buf(dst); - - let chunk: &mut [u8] = unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; - let ssl_result = self.inner.borrow_mut().ssl_read(chunk); - let result = match ssl_result { - Ok(v) => { - unsafe { dst.advance_mut(v) }; - new_bytes += v; - continue; - } - Err(ref e) - if e.code() == ssl::ErrorCode::WANT_READ - || e.code() == ssl::ErrorCode::WANT_WRITE => - { - Ok(new_bytes) - } - Err(ref e) if e.code() == ssl::ErrorCode::ZERO_RETURN => { - buf.want_shutdown(); - Ok(new_bytes) - } - Err(e) => { - log::trace!("SSL Error: {:?}", e); - Err(map_to_ioerr(e)) - } - }; - - buf.with_write_buf(|b| self.unset_buffers(b)); - return result; - } + let chunk: &mut [u8] = + unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; + let ssl_result = self.inner.borrow_mut().ssl_read(chunk); + let result = match ssl_result { + Ok(v) => { + unsafe { dst.advance_mut(v) }; + new_bytes += v; + continue; + } + Err(ref e) + if e.code() == ssl::ErrorCode::WANT_READ + || e.code() == ssl::ErrorCode::WANT_WRITE => + { + Ok(new_bytes) + } + Err(ref e) if e.code() == ssl::ErrorCode::ZERO_RETURN => { + buf.want_shutdown(); + Ok(new_bytes) + } + Err(e) => { + log::trace!("SSL Error: {:?}", e); + Err(map_to_ioerr(e)) + } + }; + return result; + } + }) + }) + }) } - fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> { - if let Some(mut src) = buf.take_src() { - self.set_buffers(buf); - - loop { - if src.is_empty() { - self.unset_buffers(buf); - return Ok(()); - } - let ssl_result = self.inner.borrow_mut().ssl_write(&src); - match ssl_result { - Ok(v) => { - src.split_to(v); - continue; + fn process_write_buf(&self, wb: &WriteBuf<'_>) -> io::Result<()> { + wb.with_src(|b| { + if let Some(src) = b { + self.with_buffers(wb, || loop { + if src.is_empty() { + return Ok(()); } - Err(e) => { - buf.set_src(Some(src)); - self.unset_buffers(buf); - return match e.code() { - ssl::ErrorCode::WANT_READ | ssl::ErrorCode::WANT_WRITE => { - Ok(()) - } - _ => Err(map_to_ioerr(e)), - }; + let ssl_result = self.inner.borrow_mut().ssl_write(src); + match ssl_result { + Ok(v) => { + src.split_to(v); + continue; + } + Err(e) => { + return match e.code() { + ssl::ErrorCode::WANT_READ | ssl::ErrorCode::WANT_WRITE => { + Ok(()) + } + _ => Err(map_to_ioerr(e)), + }; + } } - } + }) + } else { + Ok(()) } - } else { - Ok(()) - } + }) } } @@ -278,12 +256,10 @@ impl FilterFactory for SslAcceptor { time::timeout(timeout, async { let ssl = ctx_result.map_err(map_to_ioerr)?; let inner = IoInner { - pool: io.memory_pool(), source: None, destination: None, }; let filter = SslFilter { - pool: io.memory_pool(), handshake: Cell::new(true), inner: RefCell::new(ssl::SslStream::new(ssl, inner)?), }; @@ -336,12 +312,10 @@ impl FilterFactory for SslConnector { fn create(self, io: Io) -> Self::Future { Box::pin(async move { let inner = IoInner { - pool: io.memory_pool(), source: None, destination: None, }; let filter = SslFilter { - pool: io.memory_pool(), handshake: Cell::new(true), inner: RefCell::new(ssl::SslStream::new(self.ssl, inner)?), }; diff --git a/ntex-tls/src/rustls/client.rs b/ntex-tls/src/rustls/client.rs index 557f4c23..9ec38e8a 100644 --- a/ntex-tls/src/rustls/client.rs +++ b/ntex-tls/src/rustls/client.rs @@ -56,62 +56,60 @@ impl FilterLayer for TlsClientFilter { } } - fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result { + fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result { let mut session = self.session.borrow_mut(); + let mut new_bytes = usize::from(self.inner.handshake.get()); // get processed buffer - let (src, dst) = buf.get_pair(); - let mut new_bytes = usize::from(self.inner.handshake.get()); - loop { - // make sure we've got room - self.inner.pool.resize_read_buf(dst); + buf.with_src(|src| { + if let Some(src) = src { + buf.with_dst(|dst| { + loop { + let mut cursor = io::Cursor::new(&src); + let n = session.read_tls(&mut cursor)?; + src.split_to(n); + let state = session + .process_new_packets() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let mut cursor = io::Cursor::new(&src); - let n = session.read_tls(&mut cursor)?; - src.split_to(n); - let state = session - .process_new_packets() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - let new_b = state.plaintext_bytes_to_read(); - if new_b > 0 { - dst.reserve(new_b); - let chunk: &mut [u8] = - unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; - let v = session.reader().read(chunk)?; - unsafe { dst.advance_mut(v) }; - new_bytes += v; - } else { - break; + let new_b = state.plaintext_bytes_to_read(); + if new_b > 0 { + dst.reserve(new_b); + let chunk: &mut [u8] = + unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; + let v = session.reader().read(chunk)?; + unsafe { dst.advance_mut(v) }; + new_bytes += v; + } else { + break; + } + } + Ok::<_, io::Error>(()) + })?; } - } - - Ok(new_bytes) + Ok(new_bytes) + }) } - fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> { - if let Some(mut src) = buf.take_src() { - let mut session = self.session.borrow_mut(); - let mut io = Wrapper(&self.inner, buf); + fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> { + buf.with_src(|src| { + if let Some(src) = src { + let mut session = self.session.borrow_mut(); + let mut io = Wrapper(&self.inner, buf); - loop { - if !src.is_empty() { - let n = session.writer().write(&src)?; - src.split_to(n); - } - - if session.wants_write() { - session.complete_io(&mut io)?; - } else { - break; + loop { + if !src.is_empty() { + src.split_to(session.writer().write(src)?); + } + if session.wants_write() { + session.complete_io(&mut io)?; + } else { + break; + } } } - - buf.set_src(Some(src)); Ok(()) - } else { - Ok(()) - } + }) } } @@ -125,7 +123,6 @@ impl TlsClientFilter { .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; let filter = TlsFilter::new_client(TlsClientFilter { inner: IoInner { - pool: io.memory_pool(), handshake: Cell::new(true), }, session: RefCell::new(session), diff --git a/ntex-tls/src/rustls/mod.rs b/ntex-tls/src/rustls/mod.rs index 814c26c2..52bc7529 100644 --- a/ntex-tls/src/rustls/mod.rs +++ b/ntex-tls/src/rustls/mod.rs @@ -2,7 +2,6 @@ //! An implementation of SSL streams for ntex backed by OpenSSL use std::{any, cell::Cell, cmp, io, sync::Arc, task::Context, task::Poll}; -use ntex_bytes::PoolRef; use ntex_io::{ Filter, FilterFactory, FilterLayer, Io, Layer, ReadBuf, ReadStatus, WriteBuf, WriteStatus, @@ -71,7 +70,7 @@ impl FilterLayer for TlsFilter { } #[inline] - fn shutdown(&self, buf: &mut WriteBuf<'_>) -> io::Result> { + fn shutdown(&self, buf: &WriteBuf<'_>) -> io::Result> { match self.inner { InnerTlsFilter::Server(ref f) => f.shutdown(buf), InnerTlsFilter::Client(ref f) => f.shutdown(buf), @@ -95,7 +94,7 @@ impl FilterLayer for TlsFilter { } #[inline] - fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result { + fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result { match self.inner { InnerTlsFilter::Server(ref f) => f.process_read_buf(buf), InnerTlsFilter::Client(ref f) => f.process_read_buf(buf), @@ -103,7 +102,7 @@ impl FilterLayer for TlsFilter { } #[inline] - fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> { + fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> { match self.inner { InnerTlsFilter::Server(ref f) => f.process_write_buf(buf), InnerTlsFilter::Client(ref f) => f.process_write_buf(buf), @@ -219,30 +218,31 @@ impl FilterFactory for TlsConnectorConfigured { } pub(crate) struct IoInner { - pool: PoolRef, handshake: Cell, } -pub(crate) struct Wrapper<'a, 'b>(&'a IoInner, &'a mut WriteBuf<'b>); +pub(crate) struct Wrapper<'a, 'b>(&'a IoInner, &'a WriteBuf<'b>); impl<'a, 'b> io::Read for Wrapper<'a, 'b> { fn read(&mut self, dst: &mut [u8]) -> io::Result { self.1.with_read_buf(|buf| { - let read_buf = buf.get_src(); - let len = cmp::min(read_buf.len(), dst.len()); - if len > 0 { - dst[..len].copy_from_slice(&read_buf.split_to(len)); - Ok(len) - } else { + buf.with_src(|buf| { + if let Some(buf) = buf { + let len = cmp::min(buf.len(), dst.len()); + if len > 0 { + dst[..len].copy_from_slice(&buf.split_to(len)); + return Ok(len); + } + } Err(io::Error::new(io::ErrorKind::WouldBlock, "")) - } + }) }) } } impl<'a, 'b> io::Write for Wrapper<'a, 'b> { fn write(&mut self, src: &[u8]) -> io::Result { - self.1.with_dst_buf(|buf| buf.extend_from_slice(src)); + self.1.with_dst(|buf| buf.extend_from_slice(src)); Ok(src.len()) } diff --git a/ntex-tls/src/rustls/server.rs b/ntex-tls/src/rustls/server.rs index 6b22bb1e..8cc965a7 100644 --- a/ntex-tls/src/rustls/server.rs +++ b/ntex-tls/src/rustls/server.rs @@ -63,60 +63,60 @@ impl FilterLayer for TlsServerFilter { } } - fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result { + fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result { let mut session = self.session.borrow_mut(); + let mut new_bytes = usize::from(self.inner.handshake.get()); // get processed buffer - let (src, dst) = buf.get_pair(); - let mut new_bytes = usize::from(self.inner.handshake.get()); - loop { - // make sure we've got room - self.inner.pool.resize_read_buf(dst); + buf.with_src(|src| { + if let Some(src) = src { + buf.with_dst(|dst| { + loop { + let mut cursor = io::Cursor::new(&src); + let n = session.read_tls(&mut cursor)?; + src.split_to(n); + let state = session + .process_new_packets() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let mut cursor = io::Cursor::new(&src); - let n = session.read_tls(&mut cursor)?; - src.split_to(n); - let state = session - .process_new_packets() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - let new_b = state.plaintext_bytes_to_read(); - if new_b > 0 { - dst.reserve(new_b); - let chunk: &mut [u8] = - unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; - let v = session.reader().read(chunk)?; - unsafe { dst.advance_mut(v) }; - new_bytes += v; - } else { - break; + let new_b = state.plaintext_bytes_to_read(); + if new_b > 0 { + dst.reserve(new_b); + let chunk: &mut [u8] = + unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; + let v = session.reader().read(chunk)?; + unsafe { dst.advance_mut(v) }; + new_bytes += v; + } else { + break; + } + } + Ok::<_, io::Error>(()) + })?; } - } - - Ok(new_bytes) + Ok(new_bytes) + }) } - fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> { - if let Some(mut src) = buf.take_src() { - let mut session = self.session.borrow_mut(); - let mut io = Wrapper(&self.inner, buf); + fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> { + buf.with_src(|src| { + if let Some(src) = src { + let mut session = self.session.borrow_mut(); + let mut io = Wrapper(&self.inner, buf); - loop { - if !src.is_empty() { - let n = session.writer().write(&src)?; - src.split_to(n); - } - - if session.wants_write() { - session.complete_io(&mut io)?; - } else { - break; + loop { + if !src.is_empty() { + src.split_to(session.writer().write(src)?); + } + if session.wants_write() { + session.complete_io(&mut io)?; + } else { + break; + } } } - - buf.set_src(Some(src)); - } - Ok(()) + Ok(()) + }) } } @@ -132,7 +132,6 @@ impl TlsServerFilter { let filter = TlsFilter::new_server(TlsServerFilter { session: RefCell::new(session), inner: IoInner { - pool: io.memory_pool(), handshake: Cell::new(true), }, }); diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 5e29ac69..ef4c3c46 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.6.2" +version = "0.6.3" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -58,8 +58,8 @@ ntex-util = "0.2.0" ntex-bytes = "0.1.19" ntex-h2 = "0.2.1" ntex-rt = "0.4.7" -ntex-io = "0.2.3" -ntex-tls = "0.2.3" +ntex-io = "0.2.7" +ntex-tls = "0.2.4" ntex-tokio = { version = "0.2.1", optional = true } ntex-glommio = { version = "0.2.1", optional = true } ntex-async-std = { version = "0.2.1", optional = true } diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 4edc70c2..e2244632 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -684,8 +684,7 @@ where // read request payload let mut updated = false; loop { - let res = io.poll_recv(&payload.0, cx); - match res { + match io.poll_recv(&payload.0, cx) { Poll::Ready(Ok(PayloadItem::Chunk(chunk))) => { updated = true; payload.1.feed_data(chunk); @@ -945,6 +944,7 @@ mod tests { #[crate::rt_test] async fn test_pipeline_with_payload() { + env_logger::init(); let (client, server) = Io::create(); client.remote_buffer_cap(4096); let mut decoder = ClientCodec::default(); diff --git a/ntex/src/ws/transport.rs b/ntex/src/ws/transport.rs index 7938be7a..e379ee22 100644 --- a/ntex/src/ws/transport.rs +++ b/ntex/src/ws/transport.rs @@ -58,7 +58,7 @@ impl WsTransport { impl FilterLayer for WsTransport { #[inline] - fn shutdown(&self, buf: &mut WriteBuf<'_>) -> io::Result> { + fn shutdown(&self, buf: &WriteBuf<'_>) -> io::Result> { let flags = self.flags.get(); if !flags.contains(Flags::CLOSED) { self.insert_flags(Flags::CLOSED); @@ -67,7 +67,7 @@ impl FilterLayer for WsTransport { } else { CloseCode::Normal }; - let _ = buf.with_dst_buf(|buf| { + let _ = buf.with_dst(|buf| { self.codec.encode_vec( Message::Close(Some(CloseReason { code, @@ -80,7 +80,7 @@ impl FilterLayer for WsTransport { Ok(Poll::Ready(())) } - fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result { + fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result { if let Some(mut src) = buf.take_src() { let mut dst = buf.take_dst(); let dst_len = dst.len(); @@ -133,7 +133,7 @@ impl FilterLayer for WsTransport { } Frame::Ping(msg) => { let _ = buf.with_write_buf(|b| { - b.with_dst_buf(|b| self.codec.encode_vec(Message::Pong(msg), b)) + b.with_dst(|b| self.codec.encode_vec(Message::Pong(msg), b)) }); } Frame::Pong(_) => (), @@ -153,9 +153,9 @@ impl FilterLayer for WsTransport { } } - fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> { + fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> { if let Some(src) = buf.take_src() { - buf.with_dst_buf(|dst| { + buf.with_dst(|dst| { // make sure we've got room let (hw, lw) = self.pool.write_params().unpack(); let remaining = dst.remaining_mut();