Refactor io buffers api (#169)

* Refactor io buffers api
This commit is contained in:
Nikolay Kim 2023-01-29 23:02:12 +06:00 committed by GitHub
parent 3b6cf6a3ef
commit 0f8387c3ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 573 additions and 553 deletions

View file

@ -31,7 +31,7 @@ impl fmt::Debug for BsDebug<'_> {
} else if (0x20..0x7f).contains(&c) {
write!(fmt, "{}", c as char)?;
} else {
write!(fmt, "\\x{:02x}", c)?;
write!(fmt, "\\x{c:02x}")?;
}
}
write!(fmt, "\"")?;

View file

@ -6,7 +6,7 @@ struct BytesRef<'a>(&'a [u8]);
impl<'a> LowerHex for BytesRef<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
for b in self.0 {
write!(f, "{:02x}", b)?;
write!(f, "{b:02x}")?;
}
Ok(())
}
@ -15,7 +15,7 @@ impl<'a> LowerHex for BytesRef<'a> {
impl<'a> UpperHex for BytesRef<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
for b in self.0 {
write!(f, "{:02X}", b)?;
write!(f, "{b:02X}")?;
}
Ok(())
}

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.7] - 2023-01-29
* Refactor buffer api
## [0.2.6] - 2023-01-27
* Add IoRef::with_rw_buf() helper

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.2.6"
version = "0.2.7"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -29,4 +29,4 @@ pin-project-lite = "0.2"
rand = "0.8"
env_logger = "0.10"
ntex = { version = "0.6.2", features = ["tokio"] }
ntex = { version = "0.6.3", features = ["tokio"] }

View file

@ -1,14 +1,15 @@
use std::cell::Cell;
use ntex_bytes::{BytesVec, PoolRef};
use ntex_util::future::Either;
use crate::IoRef;
type BufferLine = (Option<BytesVec>, Option<BytesVec>);
type Buffer = (Cell<Option<BytesVec>>, Cell<Option<BytesVec>>);
#[derive(Debug)]
pub struct Stack {
len: usize,
buffers: Either<[BufferLine; 3], Vec<BufferLine>>,
buffers: Either<[Buffer; 3], Vec<Buffer>>,
}
impl Stack {
@ -22,153 +23,204 @@ impl Stack {
pub(crate) fn add_layer(&mut self) {
match &mut self.buffers {
Either::Left(b) => {
// move to vec
if self.len == 3 {
// move to vec
let mut vec = vec![(None, None)];
let mut vec = vec![(Cell::new(None), Cell::new(None))];
for item in b.iter_mut().take(self.len) {
vec.push((item.0.take(), item.1.take()));
vec.push((Cell::new(item.0.take()), Cell::new(item.1.take())));
}
self.len += 1;
self.buffers = Either::Right(vec);
} else {
let mut idx = self.len;
while idx > 0 {
let item = (b[idx - 1].0.take(), b[idx - 1].1.take());
let item = (
Cell::new(b[idx - 1].0.take()),
Cell::new(b[idx - 1].1.take()),
);
b[idx] = item;
idx -= 1;
}
b[0] = (None, None);
b[0] = (Cell::new(None), Cell::new(None));
self.len += 1;
}
}
Either::Right(vec) => {
self.len += 1;
vec.insert(0, (None, None));
vec.insert(0, (Cell::new(None), Cell::new(None)));
}
}
}
fn get_buffers<F, R>(&mut self, idx: usize, f: F) -> R
fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
where
F: FnOnce(&mut BufferLine, &mut BufferLine) -> R,
F: FnOnce(&Buffer, &Buffer) -> R,
{
let buffers = match self.buffers {
Either::Left(ref mut b) => &mut b[..],
Either::Right(ref mut b) => &mut b[..],
Either::Left(ref b) => &b[..],
Either::Right(ref b) => &b[..],
};
let pos = idx + 1;
if self.len > pos {
let (curr, next) = buffers.split_at_mut(pos);
f(&mut curr[idx], &mut next[0])
let next = idx + 1;
if self.len > next {
f(&buffers[idx], &buffers[next])
} else {
let mut curr = (buffers[idx].0.take(), None);
let mut next = (None, buffers[idx].1.take());
let curr = (Cell::new(buffers[idx].0.take()), Cell::new(None));
let next = (Cell::new(None), Cell::new(buffers[idx].1.take()));
let result = f(&mut curr, &mut next);
buffers[idx].0 = curr.0;
buffers[idx].1 = next.1;
let result = f(&curr, &next);
buffers[idx].0.set(curr.0.take());
buffers[idx].1.set(next.1.take());
result
}
}
fn get_first_level(&mut self) -> &mut BufferLine {
match &mut self.buffers {
Either::Left(b) => &mut b[0],
Either::Right(b) => &mut b[0],
fn get_first_level(&self) -> &Buffer {
match &self.buffers {
Either::Left(b) => &b[0],
Either::Right(b) => &b[0],
}
}
fn get_last_level(&mut self) -> &mut BufferLine {
match &mut self.buffers {
Either::Left(b) => &mut b[self.len - 1],
Either::Right(b) => &mut b[self.len - 1],
fn get_last_level(&self) -> &Buffer {
match &self.buffers {
Either::Left(b) => &b[self.len - 1],
Either::Right(b) => &b[self.len - 1],
}
}
pub(crate) fn read_buf<F, R>(
&mut self,
io: &IoRef,
idx: usize,
nbytes: usize,
f: F,
) -> R
pub(crate) fn read_buf<F, R>(&self, io: &IoRef, idx: usize, nbytes: usize, f: F) -> R
where
F: FnOnce(&mut ReadBuf<'_>) -> R,
F: FnOnce(&ReadBuf<'_>) -> R,
{
self.get_buffers(idx, |curr, next| {
let mut buf = ReadBuf {
let buf = ReadBuf {
io,
nbytes,
curr,
next,
need_write: false,
need_write: Cell::new(false),
};
f(&mut buf)
f(&buf)
})
}
pub(crate) fn write_buf<F, R>(&mut self, io: &IoRef, idx: usize, f: F) -> R
pub(crate) fn write_buf<F, R>(&self, io: &IoRef, idx: usize, f: F) -> R
where
F: FnOnce(&mut WriteBuf<'_>) -> R,
F: FnOnce(&WriteBuf<'_>) -> R,
{
self.get_buffers(idx, |curr, next| {
let mut buf = WriteBuf {
let buf = WriteBuf {
io,
curr,
next,
need_write: false,
need_write: Cell::new(false),
};
f(&mut buf)
f(&buf)
})
}
pub(crate) fn first_read_buf_size(&mut self) -> usize {
self.get_first_level()
.0
.as_ref()
.map(|b| b.len())
.unwrap_or(0)
}
pub(crate) fn with_rw_buf<F, R>(&mut self, io: &IoRef, f: F) -> R
pub(crate) fn with_read_source<F, R>(&self, io: &IoRef, f: F) -> R
where
F: FnOnce(&mut BytesVec, &mut BytesVec) -> R,
F: FnOnce(&mut BytesVec) -> R,
{
let lvl = self.get_first_level();
if lvl.0.is_none() {
lvl.0 = Some(io.memory_pool().get_read_buf());
let item = self.get_last_level();
let mut rb = item.0.take();
if rb.is_none() {
rb = Some(io.memory_pool().get_read_buf());
}
if lvl.1.is_none() {
lvl.1 = Some(io.memory_pool().get_write_buf());
let result = f(rb.as_mut().unwrap());
if let Some(b) = rb {
if b.is_empty() {
io.memory_pool().release_read_buf(b);
} else {
item.0.set(Some(b));
}
}
f(lvl.0.as_mut().unwrap(), lvl.1.as_mut().unwrap())
result
}
pub(crate) fn first_read_buf(&mut self) -> &mut Option<BytesVec> {
&mut self.get_first_level().0
}
pub(crate) fn first_write_buf(&mut self, io: &IoRef) -> &mut BytesVec {
let item = &mut self.get_first_level().1;
if item.is_none() {
*item = Some(io.memory_pool().get_write_buf());
pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
where
F: FnOnce(&mut BytesVec) -> R,
{
let item = self.get_first_level();
let mut rb = item.0.take();
if rb.is_none() {
rb = Some(io.memory_pool().get_read_buf());
}
item.as_mut().unwrap()
let result = f(rb.as_mut().unwrap());
if let Some(b) = rb {
if b.is_empty() {
io.memory_pool().release_read_buf(b);
} else {
item.0.set(Some(b));
}
}
result
}
pub(crate) fn last_read_buf(&mut self) -> &mut Option<BytesVec> {
&mut self.get_last_level().0
pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
where
F: FnOnce(&mut BytesVec) -> R,
{
let item = self.get_first_level();
let mut wb = item.1.take();
if wb.is_none() {
wb = Some(io.memory_pool().get_write_buf());
}
let result = f(wb.as_mut().unwrap());
if let Some(b) = wb {
if b.is_empty() {
io.memory_pool().release_write_buf(b);
} else {
item.1.set(Some(b));
}
}
result
}
pub(crate) fn last_write_buf(&mut self) -> &mut Option<BytesVec> {
&mut self.get_last_level().1
pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
where
F: FnOnce(&mut Option<BytesVec>) -> R,
{
let item = self.get_last_level();
let mut wb = item.1.take();
let result = f(&mut wb);
if let Some(b) = wb {
if b.is_empty() {
io.memory_pool().release_write_buf(b);
} else {
item.1.set(Some(b));
}
}
result
}
pub(crate) fn release(&mut self, pool: PoolRef) {
let items = match &mut self.buffers {
Either::Left(b) => &mut b[..],
Either::Right(b) => &mut b[..],
pub(crate) fn read_destination_size(&self) -> usize {
let item = self.get_first_level();
let rb = item.0.take();
let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
item.0.set(rb);
size
}
pub(crate) fn write_destination_size(&self) -> usize {
let item = self.get_last_level();
let wb = item.1.take();
let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
item.1.set(wb);
size
}
pub(crate) fn release(&self, pool: PoolRef) {
let items = match &self.buffers {
Either::Left(b) => &b[..],
Either::Right(b) => &b[..],
};
for item in items {
@ -181,29 +233,31 @@ impl Stack {
}
}
pub(crate) fn set_memory_pool(&mut self, pool: PoolRef) {
let items = match &mut self.buffers {
Either::Left(b) => &mut b[..],
Either::Right(b) => &mut b[..],
pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
let items = match &self.buffers {
Either::Left(b) => &b[..],
Either::Right(b) => &b[..],
};
for buf in items {
if let Some(ref mut b) = buf.0 {
pool.move_vec_in(b);
for item in items {
if let Some(mut b) = item.0.take() {
pool.move_vec_in(&mut b);
item.0.set(Some(b));
}
if let Some(ref mut b) = buf.1 {
pool.move_vec_in(b);
if let Some(mut b) = item.1.take() {
pool.move_vec_in(&mut b);
item.1.set(Some(b));
}
}
}
}
#[derive(Debug)]
// #[derive(Debug)]
pub struct ReadBuf<'a> {
pub(crate) io: &'a IoRef,
pub(crate) curr: &'a mut BufferLine,
pub(crate) next: &'a mut BufferLine,
pub(crate) curr: &'a Buffer,
pub(crate) next: &'a Buffer,
pub(crate) nbytes: usize,
pub(crate) need_write: bool,
pub(crate) need_write: Cell<bool>,
}
impl<'a> ReadBuf<'a> {
@ -219,27 +273,68 @@ impl<'a> ReadBuf<'a> {
self.io.want_shutdown()
}
#[inline]
/// Make sure buffer has enough free space
pub fn resize_buf(&self, buf: &mut BytesVec) {
self.io.memory_pool().resize_read_buf(buf);
}
#[inline]
/// Get reference to source read buffer
pub fn get_src(&mut self) -> &mut BytesVec {
if self.next.0.is_none() {
self.next.0 = Some(self.io.memory_pool().get_read_buf());
pub fn with_src<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut Option<BytesVec>) -> R,
{
let mut item = self.next.0.take();
let result = f(&mut item);
if let Some(b) = item {
if b.is_empty() {
self.io.memory_pool().release_read_buf(b);
} else {
self.next.0.set(Some(b));
}
}
self.next.0.as_mut().unwrap()
result
}
#[inline]
/// Get reference to destination read buffer
pub fn with_dst<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesVec) -> R,
{
let mut item = self.curr.0.take();
if item.is_none() {
item = Some(self.io.memory_pool().get_read_buf());
}
let result = f(item.as_mut().unwrap());
if let Some(b) = item {
if b.is_empty() {
self.io.memory_pool().release_read_buf(b);
} else {
self.curr.0.set(Some(b));
}
}
result
}
#[inline]
/// Take source read buffer
pub fn take_src(&mut self) -> Option<BytesVec> {
self.next
.0
.take()
.and_then(|b| if b.is_empty() { None } else { Some(b) })
pub fn take_src(&self) -> Option<BytesVec> {
self.next.0.take().and_then(|b| {
if b.is_empty() {
self.io.memory_pool().release_read_buf(b);
None
} else {
Some(b)
}
})
}
#[inline]
/// Set source read buffer
pub fn set_src(&mut self, src: Option<BytesVec>) {
pub fn set_src(&self, src: Option<BytesVec>) {
if let Some(buf) = self.next.0.take() {
self.io.memory_pool().release_read_buf(buf);
}
@ -247,23 +342,14 @@ impl<'a> ReadBuf<'a> {
if src.is_empty() {
self.io.memory_pool().release_read_buf(src);
} else {
self.next.0 = Some(src);
self.next.0.set(Some(src));
}
}
}
#[inline]
/// Get reference to destination read buffer
pub fn get_dst(&mut self) -> &mut BytesVec {
if self.curr.0.is_none() {
self.curr.0 = Some(self.io.memory_pool().get_read_buf());
}
self.curr.0.as_mut().unwrap()
}
#[inline]
/// Take destination read buffer
pub fn take_dst(&mut self) -> BytesVec {
pub fn take_dst(&self) -> BytesVec {
self.curr
.0
.take()
@ -272,7 +358,7 @@ impl<'a> ReadBuf<'a> {
#[inline]
/// Set destination read buffer
pub fn set_dst(&mut self, dst: Option<BytesVec>) {
pub fn set_dst(&self, dst: Option<BytesVec>) {
if let Some(buf) = self.curr.0.take() {
self.io.memory_pool().release_read_buf(buf);
}
@ -280,46 +366,34 @@ impl<'a> ReadBuf<'a> {
if dst.is_empty() {
self.io.memory_pool().release_read_buf(dst);
} else {
self.curr.0 = Some(dst);
self.curr.0.set(Some(dst));
}
}
}
#[inline]
/// Get reference to source and destination read buffers (src, dst)
pub fn get_pair(&mut self) -> (&mut BytesVec, &mut BytesVec) {
if self.next.0.is_none() {
self.next.0 = Some(self.io.memory_pool().get_read_buf());
}
if self.curr.0.is_none() {
self.curr.0 = Some(self.io.memory_pool().get_read_buf());
}
(self.next.0.as_mut().unwrap(), self.curr.0.as_mut().unwrap())
}
#[inline]
pub fn with_write_buf<'b, F, R>(&'b mut self, f: F) -> R
pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
where
F: FnOnce(&mut WriteBuf<'b>) -> R,
F: FnOnce(&WriteBuf<'b>) -> R,
{
let mut buf = WriteBuf {
io: self.io,
curr: self.curr,
next: self.next,
need_write: self.need_write,
need_write: Cell::new(self.need_write.get()),
};
let result = f(&mut buf);
self.need_write = buf.need_write;
self.need_write.set(buf.need_write.get());
result
}
}
#[derive(Debug)]
// #[derive(Debug)]
pub struct WriteBuf<'a> {
pub(crate) io: &'a IoRef,
pub(crate) curr: &'a mut BufferLine,
pub(crate) next: &'a mut BufferLine,
pub(crate) need_write: bool,
pub(crate) curr: &'a Buffer,
pub(crate) next: &'a Buffer,
pub(crate) need_write: Cell<bool>,
}
impl<'a> WriteBuf<'a> {
@ -329,57 +403,85 @@ impl<'a> WriteBuf<'a> {
self.io.want_shutdown()
}
#[inline]
/// Make sure buffer has enough free space
pub fn resize_buf(&self, buf: &mut BytesVec) {
self.io.memory_pool().resize_write_buf(buf);
}
#[inline]
/// Get reference to source write buffer
pub fn get_src(&mut self) -> &mut BytesVec {
if self.curr.1.is_none() {
self.curr.1 = Some(self.io.memory_pool().get_write_buf());
pub fn with_src<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut Option<BytesVec>) -> R,
{
let mut item = self.curr.1.take();
let result = f(&mut item);
if let Some(b) = item {
if b.is_empty() {
self.io.memory_pool().release_write_buf(b);
} else {
self.curr.1.set(Some(b));
}
}
self.curr.1.as_mut().unwrap()
result
}
#[inline]
/// Get reference to destination write buffer
pub fn with_dst<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesVec) -> R,
{
let mut item = self.next.1.take();
if item.is_none() {
item = Some(self.io.memory_pool().get_write_buf());
}
let buf = item.as_mut().unwrap();
let total = buf.len();
let result = f(buf);
if buf.is_empty() {
self.io.memory_pool().release_write_buf(item.unwrap());
} else {
self.need_write
.set(self.need_write.get() | (total != buf.len()));
self.next.1.set(item);
}
result
}
#[inline]
/// Take source write buffer
pub fn take_src(&mut self) -> Option<BytesVec> {
self.curr
.1
.take()
.and_then(|b| if b.is_empty() { None } else { Some(b) })
pub fn take_src(&self) -> Option<BytesVec> {
self.curr.1.take().and_then(|b| {
if b.is_empty() {
self.io.memory_pool().release_write_buf(b);
None
} else {
Some(b)
}
})
}
#[inline]
/// Set source write buffer
pub fn set_src(&mut self, src: Option<BytesVec>) {
pub fn set_src(&self, src: Option<BytesVec>) {
if let Some(buf) = self.curr.1.take() {
self.io.memory_pool().release_read_buf(buf);
self.io.memory_pool().release_write_buf(buf);
}
if let Some(buf) = src {
if buf.is_empty() {
self.io.memory_pool().release_read_buf(buf);
self.io.memory_pool().release_write_buf(buf);
} else {
self.curr.1 = Some(buf);
self.curr.1.set(Some(buf));
}
}
}
#[inline]
/// Get reference to destination write buffer
pub fn with_dst_buf<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut BytesVec) -> R,
{
if self.next.1.is_none() {
self.next.1 = Some(self.io.memory_pool().get_write_buf());
}
let buf = self.next.1.as_mut().unwrap();
let r = f(buf);
self.need_write |= !buf.is_empty();
r
}
#[inline]
/// Take destination write buffer
pub fn take_dst(&mut self) -> BytesVec {
pub fn take_dst(&self) -> BytesVec {
self.next
.1
.take()
@ -388,7 +490,7 @@ impl<'a> WriteBuf<'a> {
#[inline]
/// Set destination write buffer
pub fn set_dst(&mut self, dst: Option<BytesVec>) {
pub fn set_dst(&self, dst: Option<BytesVec>) {
if let Some(buf) = self.next.1.take() {
self.io.memory_pool().release_write_buf(buf);
}
@ -396,26 +498,26 @@ impl<'a> WriteBuf<'a> {
if dst.is_empty() {
self.io.memory_pool().release_write_buf(dst);
} else {
self.need_write |= !dst.is_empty();
self.next.1 = Some(dst);
self.need_write.set(self.need_write.get() | !dst.is_empty());
self.next.1.set(Some(dst));
}
}
}
#[inline]
pub fn with_read_buf<'b, F, R>(&'b mut self, f: F) -> R
pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
where
F: FnOnce(&mut ReadBuf<'b>) -> R,
F: FnOnce(&ReadBuf<'b>) -> R,
{
let mut buf = ReadBuf {
io: self.io,
curr: self.curr,
next: self.next,
nbytes: 0,
need_write: self.need_write,
need_write: Cell::new(self.need_write.get()),
};
let result = f(&mut buf);
self.need_write = buf.need_write;
self.need_write.set(buf.need_write.get());
result
}
}

View file

@ -41,21 +41,16 @@ pub trait Filter: 'static {
fn process_read_buf(
&self,
io: &IoRef,
stack: &mut Stack,
stack: &Stack,
idx: usize,
nbytes: usize,
) -> io::Result<FilterReadStatus>;
/// Process write buffer
fn process_write_buf(
&self,
io: &IoRef,
stack: &mut Stack,
idx: usize,
) -> io::Result<()>;
fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()>;
/// Gracefully shutdown filter
fn shutdown(&self, io: &IoRef, stack: &mut Stack, idx: usize) -> io::Result<Poll<()>>;
fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>>;
/// Check readiness for read operations
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus>;
@ -115,7 +110,7 @@ impl Filter for Base {
fn process_read_buf(
&self,
_: &IoRef,
_: &mut Stack,
_: &Stack,
_: usize,
nbytes: usize,
) -> io::Result<FilterReadStatus> {
@ -126,22 +121,24 @@ impl Filter for Base {
}
#[inline]
fn process_write_buf(&self, _: &IoRef, s: &mut Stack, _: usize) -> io::Result<()> {
if let Some(buf) = s.last_write_buf() {
let len = buf.len();
if len > 0 && self.0.flags().contains(Flags::WR_PAUSED) {
self.0 .0.remove_flags(Flags::WR_PAUSED);
self.0 .0.write_task.wake();
fn process_write_buf(&self, io: &IoRef, s: &Stack, _: usize) -> io::Result<()> {
s.with_write_destination(io, |buf| {
if let Some(buf) = buf {
let len = buf.len();
if len > 0 && self.0.flags().contains(Flags::WR_PAUSED) {
self.0 .0.remove_flags(Flags::WR_PAUSED);
self.0 .0.write_task.wake();
}
if len >= self.0.memory_pool().write_params_high() {
self.0 .0.insert_flags(Flags::WR_BACKPRESSURE);
}
}
if len >= self.0.memory_pool().write_params_high() {
self.0 .0.insert_flags(Flags::WR_BACKPRESSURE);
}
}
});
Ok(())
}
#[inline]
fn shutdown(&self, _: &IoRef, _: &mut Stack, _: usize) -> io::Result<Poll<()>> {
fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
Ok(Poll::Ready(()))
}
}
@ -157,7 +154,7 @@ where
}
#[inline]
fn shutdown(&self, io: &IoRef, stack: &mut Stack, idx: usize) -> io::Result<Poll<()>> {
fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>> {
let result1 = stack.write_buf(io, idx, |buf| self.0.shutdown(buf))?;
self.process_write_buf(io, stack, idx)?;
@ -178,7 +175,7 @@ where
fn process_read_buf(
&self,
io: &IoRef,
stack: &mut Stack,
stack: &Stack,
idx: usize,
nbytes: usize,
) -> io::Result<FilterReadStatus> {
@ -190,18 +187,13 @@ where
stack.read_buf(io, idx, status.nbytes, |buf| {
self.0.process_read_buf(buf).map(|nbytes| FilterReadStatus {
nbytes,
need_write: status.need_write || buf.need_write,
need_write: status.need_write || buf.need_write.get(),
})
})
}
#[inline]
fn process_write_buf(
&self,
io: &IoRef,
stack: &mut Stack,
idx: usize,
) -> io::Result<()> {
fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()> {
stack.write_buf(io, idx, |buf| self.0.process_write_buf(buf))?;
if F::BUFFERS {
@ -270,7 +262,7 @@ impl Filter for NullFilter {
fn process_read_buf(
&self,
_: &IoRef,
_: &mut Stack,
_: &Stack,
_: usize,
_: usize,
) -> io::Result<FilterReadStatus> {
@ -278,12 +270,12 @@ impl Filter for NullFilter {
}
#[inline]
fn process_write_buf(&self, _: &IoRef, _: &mut Stack, _: usize) -> io::Result<()> {
fn process_write_buf(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<()> {
Ok(())
}
#[inline]
fn shutdown(&self, _: &IoRef, _: &mut Stack, _: usize) -> io::Result<Poll<()>> {
fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
Ok(Poll::Ready(()))
}
}

View file

@ -1,4 +1,4 @@
use std::cell::{Cell, RefCell};
use std::cell::Cell;
use std::task::{Context, Poll};
use std::{fmt, future::Future, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc, time};
@ -62,7 +62,7 @@ pub(crate) struct IoState {
pub(super) read_task: LocalWaker,
pub(super) write_task: LocalWaker,
pub(super) dispatch_task: LocalWaker,
pub(super) buffer: RefCell<Stack>,
pub(super) buffer: Stack,
pub(super) filter: Cell<&'static dyn Filter>,
pub(super) handle: Cell<Option<Box<dyn Handle>>>,
#[allow(clippy::box_collection)]
@ -149,7 +149,7 @@ impl hash::Hash for IoState {
impl Drop for IoState {
#[inline]
fn drop(&mut self) {
self.buffer.borrow_mut().release(self.pool.get());
self.buffer.release(self.pool.get());
}
}
@ -171,7 +171,7 @@ impl Io {
dispatch_task: LocalWaker::new(),
read_task: LocalWaker::new(),
write_task: LocalWaker::new(),
buffer: RefCell::new(Stack::new()),
buffer: Stack::new(),
filter: Cell::new(NullFilter::get()),
handle: Cell::new(None),
on_disconnect: Cell::new(None),
@ -199,7 +199,7 @@ impl<F> Io<F> {
#[inline]
/// Set memory pool
pub fn set_memory_pool(&self, pool: PoolRef) {
self.0 .0.buffer.borrow_mut().set_memory_pool(pool);
self.0 .0.buffer.set_memory_pool(pool);
self.0 .0.pool.set(pool);
}
@ -227,7 +227,7 @@ impl<F> Io<F> {
dispatch_task: LocalWaker::new(),
read_task: LocalWaker::new(),
write_task: LocalWaker::new(),
buffer: RefCell::new(Stack::new()),
buffer: Stack::new(),
filter: Cell::new(NullFilter::get()),
handle: Cell::new(None),
on_disconnect: Cell::new(None),
@ -292,7 +292,12 @@ impl<F: Filter> Io<F> {
{
// add layer to buffers
if U::BUFFERS {
self.0 .0.buffer.borrow_mut().add_layer();
// Safety: .add_layer() modifies internal buffers
// there is no api that holds references into buffers storage
// all apis first removes buffer from storage and then work with it
unsafe { &mut *(Rc::as_ptr(&self.0 .0) as *mut IoState) }
.buffer
.add_layer();
}
// replace current filter
@ -489,14 +494,7 @@ impl<F> Io<F> {
Poll::Ready(self.error().map(Err).unwrap_or(Ok(())))
} else {
let inner = &self.0 .0;
let len = inner
.buffer
.borrow_mut()
.last_write_buf()
.as_ref()
.map(|b| b.len())
.unwrap_or(0);
let len = inner.buffer.write_destination_size();
if len > 0 {
if full {
inner.insert_flags(Flags::WR_WAIT);

View file

@ -1,9 +1,9 @@
use std::{any, cell, fmt, hash, io, time};
use std::{any, fmt, hash, io, time};
use ntex_bytes::{BytesVec, PoolRef};
use ntex_codec::{Decoder, Encoder};
use super::{buf::Stack, io::Flags, timer, types, Filter, IoRef, OnDisconnect, WriteBuf};
use super::{io::Flags, timer, types, Filter, IoRef, OnDisconnect, WriteBuf};
impl IoRef {
#[inline]
@ -132,11 +132,9 @@ impl IoRef {
where
U: Decoder,
{
borrow_buffer(&self.0.buffer)
.first_read_buf()
.as_mut()
.map(|b| codec.decode_vec(b))
.unwrap_or(Ok(None))
self.0
.buffer
.with_read_destination(self, |buf| codec.decode_vec(buf))
}
#[inline]
@ -152,59 +150,40 @@ impl IoRef {
}
#[inline]
/// Get mut access to write buffer
/// Get access to write buffer
pub fn with_buf<F, R>(&self, f: F) -> io::Result<R>
where
F: FnOnce(&mut WriteBuf<'_>) -> R,
F: FnOnce(&WriteBuf<'_>) -> R,
{
let mut buffer = borrow_buffer(&self.0.buffer);
let result = buffer.write_buf(self, 0, f);
let result = self.0.buffer.write_buf(self, 0, f);
self.0
.filter
.get()
.process_write_buf(self, &mut buffer, 0)?;
.process_write_buf(self, &self.0.buffer, 0)?;
Ok(result)
}
#[inline]
/// Get mut access to write buffer
/// Get mut access to source write buffer
pub fn with_write_buf<F, R>(&self, f: F) -> io::Result<R>
where
F: FnOnce(&mut BytesVec) -> R,
{
let mut buffer = borrow_buffer(&self.0.buffer);
let result = f(buffer.first_write_buf(self));
let result = self.0.buffer.with_write_source(self, f);
self.0
.filter
.get()
.process_write_buf(self, &mut buffer, 0)?;
.process_write_buf(self, &self.0.buffer, 0)?;
Ok(result)
}
#[inline]
/// Get mut access to read buffer
/// Get mut access to source read buffer
pub fn with_read_buf<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesVec) -> R,
{
// use top most buffer
let mut buffer = borrow_buffer(&self.0.buffer);
let buf = buffer.first_read_buf();
if buf.is_none() {
*buf = Some(self.memory_pool().get_read_buf());
}
f(buf.as_mut().unwrap())
}
#[inline]
/// Get mut access to read and write buffer
pub fn with_rw_buf<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesVec, &mut BytesVec) -> R,
{
borrow_buffer(&self.0.buffer).with_rw_buf(self, f)
self.0.buffer.with_read_destination(self, f)
}
#[inline]
@ -260,14 +239,6 @@ impl fmt::Debug for IoRef {
}
}
fn borrow_buffer(buf: &cell::RefCell<Stack>) -> cell::RefMut<'_, Stack> {
if let Ok(r) = buf.try_borrow_mut() {
r
} else {
panic!("Nested access to read/write buffers are not allowed");
}
}
#[cfg(test)]
mod tests {
use std::cell::{Cell, RefCell};
@ -411,16 +382,16 @@ mod tests {
impl FilterLayer for Counter {
const BUFFERS: bool = false;
fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result<usize> {
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result<usize> {
self.read_order.borrow_mut().push(self.idx);
self.in_bytes.set(self.in_bytes.get() + buf.nbytes());
Ok(buf.nbytes())
}
fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> {
fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> {
self.write_order.borrow_mut().push(self.idx);
self.out_bytes
.set(self.out_bytes.get() + buf.with_dst_buf(|b| b.len()));
.set(self.out_bytes.get() + buf.with_dst(|b| b.len()));
Ok(())
}
}

View file

@ -71,10 +71,10 @@ pub trait FilterLayer: 'static {
///
/// Inner filter must process buffer before current.
/// Returns number of new bytes.
fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> sio::Result<usize>;
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> sio::Result<usize>;
/// Process write buffer
fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> sio::Result<()>;
fn process_write_buf(&self, buf: &WriteBuf<'_>) -> sio::Result<()>;
#[inline]
/// Query internal filter data
@ -84,7 +84,7 @@ pub trait FilterLayer: 'static {
#[inline]
/// Gracefully shutdown filter
fn shutdown(&self, buf: &mut WriteBuf<'_>) -> sio::Result<Poll<()>> {
fn shutdown(&self, buf: &WriteBuf<'_>) -> sio::Result<Poll<()>> {
Ok(Poll::Ready(()))
}
}

View file

@ -24,71 +24,59 @@ impl ReadContext {
F: FnOnce(&mut BytesVec, usize, usize) -> Poll<io::Result<()>>,
{
let inner = &self.0 .0;
let mut stack = inner.buffer.borrow_mut();
let mut buf = stack
.last_read_buf()
.take()
.unwrap_or_else(|| self.0.memory_pool().get_read_buf());
let total = buf.len();
let (hw, lw) = self.0.memory_pool().read_params().unpack();
let (result, nbytes, total) = inner.buffer.with_read_source(&self.0, |buf| {
let total = buf.len();
// call provided callback
let result = f(&mut buf, hw, lw);
// handle buffer changes
if buf.is_empty() {
self.0.memory_pool().release_read_buf(buf);
} else {
// call provided callback
let result = f(buf, hw, lw);
let total2 = buf.len();
let nbytes = if total2 > total { total2 - total } else { 0 };
*stack.last_read_buf() = Some(buf);
(result, nbytes, total2)
});
if nbytes > 0 {
let buf_full = nbytes >= hw;
let filter = self.0.filter();
let _ = filter.process_read_buf(&self.0, &mut stack, 0, nbytes)
.and_then(|status| {
if status.nbytes > 0 {
if buf_full || stack.first_read_buf_size() >= hw {
log::trace!(
"io read buffer is too large {}, enable read back-pressure",
total2
);
inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
} else {
inner.insert_flags(Flags::RD_READY);
}
// handle buffer changes
if nbytes > 0 {
let buf_full = nbytes >= hw;
let filter = self.0.filter();
let _ = filter
.process_read_buf(&self.0, &inner.buffer, 0, nbytes)
.and_then(|status| {
if status.nbytes > 0 {
if buf_full || inner.buffer.read_destination_size() >= hw {
log::trace!(
"new {} bytes available, wakeup dispatcher",
nbytes,
"io read buffer is too large {}, enable read back-pressure",
total
);
inner.dispatch_task.wake();
} else if buf_full {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
// while reading, filter wrote some data
// in that case filters need to process write buffers
// and potentialy wake write task
if status.need_write {
filter.process_write_buf(&self.0, &mut stack, 0)
inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
} else {
Ok(())
inner.insert_flags(Flags::RD_READY);
}
})
.map_err(|err| {
log::trace!("new {} bytes available, wakeup dispatcher", nbytes,);
inner.dispatch_task.wake();
inner.insert_flags(Flags::RD_READY);
inner.io_stopped(Some(err));
});
}
} else if buf_full {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
// while reading, filter wrote some data
// in that case filters need to process write buffers
// and potentialy wake write task
if status.need_write {
filter.process_write_buf(&self.0, &inner.buffer, 0)
} else {
Ok(())
}
})
.map_err(|err| {
inner.dispatch_task.wake();
inner.insert_flags(Flags::RD_READY);
inner.io_stopped(Some(err));
});
}
drop(stack);
match result {
Poll::Ready(Ok(())) => {
@ -135,20 +123,12 @@ impl WriteContext {
F: FnOnce(&mut Option<BytesVec>) -> Poll<io::Result<()>>,
{
let inner = &self.0 .0;
let mut stack = inner.buffer.borrow_mut();
let buf = stack.last_write_buf();
// call provided callback
let result = f(buf);
let mut len = 0;
if let Some(b) = buf {
if b.is_empty() {
inner.pool.get().release_write_buf(buf.take().unwrap());
} else {
len = b.len();
}
}
let (result, len) = inner.buffer.with_write_destination(&self.0, |buf| {
let result = f(buf);
(result, buf.as_ref().map(|b| b.len()).unwrap_or(0))
});
// if write buffer is smaller than high watermark value, turn off back-pressure
let mut flags = inner.flags.get();
@ -191,8 +171,7 @@ fn shutdown_filters(io: &IoRef) {
if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
let filter = io.filter();
let mut buffer = st.buffer.borrow_mut();
match filter.shutdown(io, &mut buffer, 0) {
match filter.shutdown(io, &st.buffer, 0) {
Ok(Poll::Ready(())) => {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
@ -211,7 +190,7 @@ fn shutdown_filters(io: &IoRef) {
st.io_stopped(Some(err));
}
}
if let Err(err) = filter.process_write_buf(io, &mut buffer, 0) {
if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) {
st.io_stopped(Some(err));
}
}

View file

@ -118,11 +118,11 @@ mod tests {
pub(crate) struct TestFilter;
impl FilterLayer for TestFilter {
fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result<usize> {
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result<usize> {
Ok(buf.nbytes())
}
fn process_write_buf(&self, _: &mut WriteBuf<'_>) -> io::Result<()> {
fn process_write_buf(&self, _: &WriteBuf<'_>) -> io::Result<()> {
Ok(())
}
}

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.4] - 2023-01-29
* Update buffer api
## [0.2.3] - 2023-01-25
* Fix double buf cleanup

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-tls"
version = "0.2.3"
version = "0.2.4"
authors = ["ntex contributors <team@ntex.rs>"]
description = "An implementation of SSL streams for ntex backed by OpenSSL"
keywords = ["network", "framework", "async", "futures"]
@ -26,7 +26,7 @@ rustls = ["tls_rust"]
[dependencies]
ntex-bytes = "0.1.19"
ntex-io = "0.2.3"
ntex-io = "0.2.7"
ntex-util = "0.2.0"
ntex-service = "1.0.0"
log = "0.4"
@ -39,7 +39,7 @@ tls_openssl = { version="0.10", package = "openssl", optional = true }
tls_rust = { version = "0.20", package = "rustls", optional = true }
[dev-dependencies]
ntex = { version = "0.6.1", features = ["openssl", "rustls", "tokio"] }
ntex = { version = "0.6.3", features = ["openssl", "rustls", "tokio"] }
env_logger = "0.10"
rustls-pemfile = { version = "1.0" }
webpki-roots = { version = "0.22" }

View file

@ -2,7 +2,7 @@
use std::cell::{Cell, RefCell};
use std::{any, cmp, error::Error, io, task::Context, task::Poll};
use ntex_bytes::{BufMut, BytesVec, PoolRef};
use ntex_bytes::{BufMut, BytesVec};
use ntex_io::{types, Filter, FilterFactory, FilterLayer, Io, Layer, ReadBuf, WriteBuf};
use ntex_util::{future::poll_fn, future::BoxFuture, ready, time, time::Millis};
use tls_openssl::ssl::{self, NameType, SslStream};
@ -24,25 +24,22 @@ pub struct PeerCertChain(pub Vec<X509>);
/// An implementation of SSL streams
pub struct SslFilter {
inner: RefCell<SslStream<IoInner>>,
pool: PoolRef,
handshake: Cell<bool>,
}
struct IoInner {
source: Option<BytesVec>,
destination: Option<BytesVec>,
pool: PoolRef,
}
impl io::Read for IoInner {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
if let Some(mut buf) = self.source.take() {
if let Some(ref mut buf) = self.source {
if buf.is_empty() {
Err(io::Error::from(io::ErrorKind::WouldBlock))
} else {
let len = cmp::min(buf.len(), dst.len());
dst[..len].copy_from_slice(&buf.split_to(len));
self.source = Some(buf);
Ok(len)
}
} else {
@ -53,13 +50,7 @@ impl io::Read for IoInner {
impl io::Write for IoInner {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
let mut buf = if let Some(buf) = self.destination.take() {
buf
} else {
BytesVec::with_capacity_in(src.len(), self.pool)
};
buf.extend_from_slice(src);
self.destination = Some(buf);
self.destination.as_mut().unwrap().extend_from_slice(src);
Ok(src.len())
}
@ -69,7 +60,7 @@ impl io::Write for IoInner {
}
impl SslFilter {
fn with_buffers<F, R>(&self, buf: &mut WriteBuf<'_>, f: F) -> R
fn with_buffers<F, R>(&self, buf: &WriteBuf<'_>, f: F) -> R
where
F: FnOnce() -> R,
{
@ -80,16 +71,6 @@ impl SslFilter {
buf.with_read_buf(|b| b.set_src(self.inner.borrow_mut().get_mut().source.take()));
result
}
fn set_buffers(&self, buf: &mut WriteBuf<'_>) {
self.inner.borrow_mut().get_mut().destination = Some(buf.take_dst());
self.inner.borrow_mut().get_mut().source = buf.with_read_buf(|b| b.take_src());
}
fn unset_buffers(&self, buf: &mut WriteBuf<'_>) {
buf.set_dst(self.inner.borrow_mut().get_mut().destination.take());
buf.with_read_buf(|b| b.set_src(self.inner.borrow_mut().get_mut().source.take()));
}
}
impl FilterLayer for SslFilter {
@ -141,7 +122,7 @@ impl FilterLayer for SslFilter {
}
}
fn shutdown(&self, buf: &mut WriteBuf<'_>) -> io::Result<Poll<()>> {
fn shutdown(&self, buf: &WriteBuf<'_>) -> io::Result<Poll<()>> {
let ssl_result = self.with_buffers(buf, || self.inner.borrow_mut().shutdown());
match ssl_result {
@ -160,75 +141,72 @@ impl FilterLayer for SslFilter {
}
}
fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result<usize> {
buf.with_write_buf(|b| self.set_buffers(b));
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result<usize> {
buf.with_write_buf(|b| {
self.with_buffers(b, || {
buf.with_dst(|dst| {
let mut new_bytes = usize::from(self.handshake.get());
loop {
buf.resize_buf(dst);
let dst = buf.get_dst();
//let mut new_bytes = usize::from(self.handshake.get());
let mut new_bytes = 1;
loop {
// make sure we've got room
self.pool.resize_read_buf(dst);
let chunk: &mut [u8] = unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let ssl_result = self.inner.borrow_mut().ssl_read(chunk);
let result = match ssl_result {
Ok(v) => {
unsafe { dst.advance_mut(v) };
new_bytes += v;
continue;
}
Err(ref e)
if e.code() == ssl::ErrorCode::WANT_READ
|| e.code() == ssl::ErrorCode::WANT_WRITE =>
{
Ok(new_bytes)
}
Err(ref e) if e.code() == ssl::ErrorCode::ZERO_RETURN => {
buf.want_shutdown();
Ok(new_bytes)
}
Err(e) => {
log::trace!("SSL Error: {:?}", e);
Err(map_to_ioerr(e))
}
};
buf.with_write_buf(|b| self.unset_buffers(b));
return result;
}
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let ssl_result = self.inner.borrow_mut().ssl_read(chunk);
let result = match ssl_result {
Ok(v) => {
unsafe { dst.advance_mut(v) };
new_bytes += v;
continue;
}
Err(ref e)
if e.code() == ssl::ErrorCode::WANT_READ
|| e.code() == ssl::ErrorCode::WANT_WRITE =>
{
Ok(new_bytes)
}
Err(ref e) if e.code() == ssl::ErrorCode::ZERO_RETURN => {
buf.want_shutdown();
Ok(new_bytes)
}
Err(e) => {
log::trace!("SSL Error: {:?}", e);
Err(map_to_ioerr(e))
}
};
return result;
}
})
})
})
}
fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> {
if let Some(mut src) = buf.take_src() {
self.set_buffers(buf);
loop {
if src.is_empty() {
self.unset_buffers(buf);
return Ok(());
}
let ssl_result = self.inner.borrow_mut().ssl_write(&src);
match ssl_result {
Ok(v) => {
src.split_to(v);
continue;
fn process_write_buf(&self, wb: &WriteBuf<'_>) -> io::Result<()> {
wb.with_src(|b| {
if let Some(src) = b {
self.with_buffers(wb, || loop {
if src.is_empty() {
return Ok(());
}
Err(e) => {
buf.set_src(Some(src));
self.unset_buffers(buf);
return match e.code() {
ssl::ErrorCode::WANT_READ | ssl::ErrorCode::WANT_WRITE => {
Ok(())
}
_ => Err(map_to_ioerr(e)),
};
let ssl_result = self.inner.borrow_mut().ssl_write(src);
match ssl_result {
Ok(v) => {
src.split_to(v);
continue;
}
Err(e) => {
return match e.code() {
ssl::ErrorCode::WANT_READ | ssl::ErrorCode::WANT_WRITE => {
Ok(())
}
_ => Err(map_to_ioerr(e)),
};
}
}
}
})
} else {
Ok(())
}
} else {
Ok(())
}
})
}
}
@ -278,12 +256,10 @@ impl<F: Filter> FilterFactory<F> for SslAcceptor {
time::timeout(timeout, async {
let ssl = ctx_result.map_err(map_to_ioerr)?;
let inner = IoInner {
pool: io.memory_pool(),
source: None,
destination: None,
};
let filter = SslFilter {
pool: io.memory_pool(),
handshake: Cell::new(true),
inner: RefCell::new(ssl::SslStream::new(ssl, inner)?),
};
@ -336,12 +312,10 @@ impl<F: Filter> FilterFactory<F> for SslConnector {
fn create(self, io: Io<F>) -> Self::Future {
Box::pin(async move {
let inner = IoInner {
pool: io.memory_pool(),
source: None,
destination: None,
};
let filter = SslFilter {
pool: io.memory_pool(),
handshake: Cell::new(true),
inner: RefCell::new(ssl::SslStream::new(self.ssl, inner)?),
};

View file

@ -56,62 +56,60 @@ impl FilterLayer for TlsClientFilter {
}
}
fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result<usize> {
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result<usize> {
let mut session = self.session.borrow_mut();
let mut new_bytes = usize::from(self.inner.handshake.get());
// get processed buffer
let (src, dst) = buf.get_pair();
let mut new_bytes = usize::from(self.inner.handshake.get());
loop {
// make sure we've got room
self.inner.pool.resize_read_buf(dst);
buf.with_src(|src| {
if let Some(src) = src {
buf.with_dst(|dst| {
loop {
let mut cursor = io::Cursor::new(&src);
let n = session.read_tls(&mut cursor)?;
src.split_to(n);
let state = session
.process_new_packets()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let mut cursor = io::Cursor::new(&src);
let n = session.read_tls(&mut cursor)?;
src.split_to(n);
let state = session
.process_new_packets()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let new_b = state.plaintext_bytes_to_read();
if new_b > 0 {
dst.reserve(new_b);
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let v = session.reader().read(chunk)?;
unsafe { dst.advance_mut(v) };
new_bytes += v;
} else {
break;
let new_b = state.plaintext_bytes_to_read();
if new_b > 0 {
dst.reserve(new_b);
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let v = session.reader().read(chunk)?;
unsafe { dst.advance_mut(v) };
new_bytes += v;
} else {
break;
}
}
Ok::<_, io::Error>(())
})?;
}
}
Ok(new_bytes)
Ok(new_bytes)
})
}
fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> {
if let Some(mut src) = buf.take_src() {
let mut session = self.session.borrow_mut();
let mut io = Wrapper(&self.inner, buf);
fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> {
buf.with_src(|src| {
if let Some(src) = src {
let mut session = self.session.borrow_mut();
let mut io = Wrapper(&self.inner, buf);
loop {
if !src.is_empty() {
let n = session.writer().write(&src)?;
src.split_to(n);
}
if session.wants_write() {
session.complete_io(&mut io)?;
} else {
break;
loop {
if !src.is_empty() {
src.split_to(session.writer().write(src)?);
}
if session.wants_write() {
session.complete_io(&mut io)?;
} else {
break;
}
}
}
buf.set_src(Some(src));
Ok(())
} else {
Ok(())
}
})
}
}
@ -125,7 +123,6 @@ impl TlsClientFilter {
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
let filter = TlsFilter::new_client(TlsClientFilter {
inner: IoInner {
pool: io.memory_pool(),
handshake: Cell::new(true),
},
session: RefCell::new(session),

View file

@ -2,7 +2,6 @@
//! An implementation of SSL streams for ntex backed by OpenSSL
use std::{any, cell::Cell, cmp, io, sync::Arc, task::Context, task::Poll};
use ntex_bytes::PoolRef;
use ntex_io::{
Filter, FilterFactory, FilterLayer, Io, Layer, ReadBuf, ReadStatus, WriteBuf,
WriteStatus,
@ -71,7 +70,7 @@ impl FilterLayer for TlsFilter {
}
#[inline]
fn shutdown(&self, buf: &mut WriteBuf<'_>) -> io::Result<Poll<()>> {
fn shutdown(&self, buf: &WriteBuf<'_>) -> io::Result<Poll<()>> {
match self.inner {
InnerTlsFilter::Server(ref f) => f.shutdown(buf),
InnerTlsFilter::Client(ref f) => f.shutdown(buf),
@ -95,7 +94,7 @@ impl FilterLayer for TlsFilter {
}
#[inline]
fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result<usize> {
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result<usize> {
match self.inner {
InnerTlsFilter::Server(ref f) => f.process_read_buf(buf),
InnerTlsFilter::Client(ref f) => f.process_read_buf(buf),
@ -103,7 +102,7 @@ impl FilterLayer for TlsFilter {
}
#[inline]
fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> {
fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> {
match self.inner {
InnerTlsFilter::Server(ref f) => f.process_write_buf(buf),
InnerTlsFilter::Client(ref f) => f.process_write_buf(buf),
@ -219,30 +218,31 @@ impl<F: Filter> FilterFactory<F> for TlsConnectorConfigured {
}
pub(crate) struct IoInner {
pool: PoolRef,
handshake: Cell<bool>,
}
pub(crate) struct Wrapper<'a, 'b>(&'a IoInner, &'a mut WriteBuf<'b>);
pub(crate) struct Wrapper<'a, 'b>(&'a IoInner, &'a WriteBuf<'b>);
impl<'a, 'b> io::Read for Wrapper<'a, 'b> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.1.with_read_buf(|buf| {
let read_buf = buf.get_src();
let len = cmp::min(read_buf.len(), dst.len());
if len > 0 {
dst[..len].copy_from_slice(&read_buf.split_to(len));
Ok(len)
} else {
buf.with_src(|buf| {
if let Some(buf) = buf {
let len = cmp::min(buf.len(), dst.len());
if len > 0 {
dst[..len].copy_from_slice(&buf.split_to(len));
return Ok(len);
}
}
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
})
})
}
}
impl<'a, 'b> io::Write for Wrapper<'a, 'b> {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
self.1.with_dst_buf(|buf| buf.extend_from_slice(src));
self.1.with_dst(|buf| buf.extend_from_slice(src));
Ok(src.len())
}

View file

@ -63,60 +63,60 @@ impl FilterLayer for TlsServerFilter {
}
}
fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result<usize> {
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result<usize> {
let mut session = self.session.borrow_mut();
let mut new_bytes = usize::from(self.inner.handshake.get());
// get processed buffer
let (src, dst) = buf.get_pair();
let mut new_bytes = usize::from(self.inner.handshake.get());
loop {
// make sure we've got room
self.inner.pool.resize_read_buf(dst);
buf.with_src(|src| {
if let Some(src) = src {
buf.with_dst(|dst| {
loop {
let mut cursor = io::Cursor::new(&src);
let n = session.read_tls(&mut cursor)?;
src.split_to(n);
let state = session
.process_new_packets()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let mut cursor = io::Cursor::new(&src);
let n = session.read_tls(&mut cursor)?;
src.split_to(n);
let state = session
.process_new_packets()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let new_b = state.plaintext_bytes_to_read();
if new_b > 0 {
dst.reserve(new_b);
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let v = session.reader().read(chunk)?;
unsafe { dst.advance_mut(v) };
new_bytes += v;
} else {
break;
let new_b = state.plaintext_bytes_to_read();
if new_b > 0 {
dst.reserve(new_b);
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let v = session.reader().read(chunk)?;
unsafe { dst.advance_mut(v) };
new_bytes += v;
} else {
break;
}
}
Ok::<_, io::Error>(())
})?;
}
}
Ok(new_bytes)
Ok(new_bytes)
})
}
fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> {
if let Some(mut src) = buf.take_src() {
let mut session = self.session.borrow_mut();
let mut io = Wrapper(&self.inner, buf);
fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> {
buf.with_src(|src| {
if let Some(src) = src {
let mut session = self.session.borrow_mut();
let mut io = Wrapper(&self.inner, buf);
loop {
if !src.is_empty() {
let n = session.writer().write(&src)?;
src.split_to(n);
}
if session.wants_write() {
session.complete_io(&mut io)?;
} else {
break;
loop {
if !src.is_empty() {
src.split_to(session.writer().write(src)?);
}
if session.wants_write() {
session.complete_io(&mut io)?;
} else {
break;
}
}
}
buf.set_src(Some(src));
}
Ok(())
Ok(())
})
}
}
@ -132,7 +132,6 @@ impl TlsServerFilter {
let filter = TlsFilter::new_server(TlsServerFilter {
session: RefCell::new(session),
inner: IoInner {
pool: io.memory_pool(),
handshake: Cell::new(true),
},
});

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.6.2"
version = "0.6.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -58,8 +58,8 @@ ntex-util = "0.2.0"
ntex-bytes = "0.1.19"
ntex-h2 = "0.2.1"
ntex-rt = "0.4.7"
ntex-io = "0.2.3"
ntex-tls = "0.2.3"
ntex-io = "0.2.7"
ntex-tls = "0.2.4"
ntex-tokio = { version = "0.2.1", optional = true }
ntex-glommio = { version = "0.2.1", optional = true }
ntex-async-std = { version = "0.2.1", optional = true }

View file

@ -684,8 +684,7 @@ where
// read request payload
let mut updated = false;
loop {
let res = io.poll_recv(&payload.0, cx);
match res {
match io.poll_recv(&payload.0, cx) {
Poll::Ready(Ok(PayloadItem::Chunk(chunk))) => {
updated = true;
payload.1.feed_data(chunk);
@ -945,6 +944,7 @@ mod tests {
#[crate::rt_test]
async fn test_pipeline_with_payload() {
env_logger::init();
let (client, server) = Io::create();
client.remote_buffer_cap(4096);
let mut decoder = ClientCodec::default();

View file

@ -58,7 +58,7 @@ impl WsTransport {
impl FilterLayer for WsTransport {
#[inline]
fn shutdown(&self, buf: &mut WriteBuf<'_>) -> io::Result<Poll<()>> {
fn shutdown(&self, buf: &WriteBuf<'_>) -> io::Result<Poll<()>> {
let flags = self.flags.get();
if !flags.contains(Flags::CLOSED) {
self.insert_flags(Flags::CLOSED);
@ -67,7 +67,7 @@ impl FilterLayer for WsTransport {
} else {
CloseCode::Normal
};
let _ = buf.with_dst_buf(|buf| {
let _ = buf.with_dst(|buf| {
self.codec.encode_vec(
Message::Close(Some(CloseReason {
code,
@ -80,7 +80,7 @@ impl FilterLayer for WsTransport {
Ok(Poll::Ready(()))
}
fn process_read_buf(&self, buf: &mut ReadBuf<'_>) -> io::Result<usize> {
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result<usize> {
if let Some(mut src) = buf.take_src() {
let mut dst = buf.take_dst();
let dst_len = dst.len();
@ -133,7 +133,7 @@ impl FilterLayer for WsTransport {
}
Frame::Ping(msg) => {
let _ = buf.with_write_buf(|b| {
b.with_dst_buf(|b| self.codec.encode_vec(Message::Pong(msg), b))
b.with_dst(|b| self.codec.encode_vec(Message::Pong(msg), b))
});
}
Frame::Pong(_) => (),
@ -153,9 +153,9 @@ impl FilterLayer for WsTransport {
}
}
fn process_write_buf(&self, buf: &mut WriteBuf<'_>) -> io::Result<()> {
fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> {
if let Some(src) = buf.take_src() {
buf.with_dst_buf(|dst| {
buf.with_dst(|dst| {
// make sure we've got room
let (hw, lw) = self.pool.write_params().unpack();
let remaining = dst.remaining_mut();