Release empty buffers (#166)

* Release empty buffers
This commit is contained in:
Nikolay Kim 2023-01-25 23:38:00 +06:00 committed by GitHub
parent 0d5cd049b1
commit 63881e40f6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 44 additions and 46 deletions

View file

@ -1,9 +1,11 @@
# Changes # Changes
## [0.2.3] - 2023-01-xx ## [0.2.3] - 2023-01-25
* Optimize buffers layout * Optimize buffers layout
* Release empty buffers
## [0.2.2] - 2023-01-24 ## [0.2.2] - 2023-01-24
* Process write buffer if filter wrote to write buffer during reading * Process write buffer if filter wrote to write buffer during reading

View file

@ -3,13 +3,12 @@ use ntex_util::future::Either;
use crate::IoRef; use crate::IoRef;
type CacheLine = (Option<BytesVec>, Option<BytesVec>);
#[derive(Debug)] #[derive(Debug)]
pub struct Stack { pub struct Stack {
len: usize, len: usize,
buffers: Either< buffers: Either<[CacheLine; 3], Vec<CacheLine>>,
[(Option<BytesVec>, Option<BytesVec>); 3],
Vec<(Option<BytesVec>, Option<BytesVec>)>,
>,
} }
impl Stack { impl Stack {
@ -26,8 +25,8 @@ impl Stack {
if self.len == 3 { if self.len == 3 {
// move to vec // move to vec
let mut vec = vec![(None, None)]; let mut vec = vec![(None, None)];
for idx in 0..self.len { for item in b.iter_mut().take(self.len) {
vec.push((b[idx].0.take(), b[idx].1.take())); vec.push((item.0.take(), item.1.take()));
} }
self.len += 1; self.len += 1;
self.buffers = Either::Right(vec); self.buffers = Either::Right(vec);
@ -51,10 +50,7 @@ impl Stack {
fn get_buffers<F, R>(&mut self, idx: usize, f: F) -> R fn get_buffers<F, R>(&mut self, idx: usize, f: F) -> R
where where
F: FnOnce( F: FnOnce(&mut CacheLine, &mut CacheLine) -> R,
&mut (Option<BytesVec>, Option<BytesVec>),
&mut (Option<BytesVec>, Option<BytesVec>),
) -> R,
{ {
let buffers = match self.buffers { let buffers = match self.buffers {
Either::Left(ref mut b) => &mut b[..], Either::Left(ref mut b) => &mut b[..],
@ -76,14 +72,14 @@ impl Stack {
} }
} }
fn get_first_level(&mut self) -> &mut (Option<BytesVec>, Option<BytesVec>) { fn get_first_level(&mut self) -> &mut CacheLine {
match &mut self.buffers { match &mut self.buffers {
Either::Left(b) => &mut b[0], Either::Left(b) => &mut b[0],
Either::Right(b) => &mut b[0], Either::Right(b) => &mut b[0],
} }
} }
fn get_last_level(&mut self) -> &mut (Option<BytesVec>, Option<BytesVec>) { fn get_last_level(&mut self) -> &mut CacheLine {
match &mut self.buffers { match &mut self.buffers {
Either::Left(b) => &mut b[self.len - 1], Either::Left(b) => &mut b[self.len - 1],
Either::Right(b) => &mut b[self.len - 1], Either::Right(b) => &mut b[self.len - 1],
@ -164,7 +160,7 @@ impl Stack {
} }
pub(crate) fn set_last_write_buf(&mut self, buf: BytesVec) { pub(crate) fn set_last_write_buf(&mut self, buf: BytesVec) {
*(&mut self.get_last_level().1) = Some(buf); self.get_last_level().1 = Some(buf);
} }
pub(crate) fn release(&mut self, pool: PoolRef) { pub(crate) fn release(&mut self, pool: PoolRef) {
@ -202,8 +198,8 @@ impl Stack {
#[derive(Debug)] #[derive(Debug)]
pub struct ReadBuf<'a> { pub struct ReadBuf<'a> {
pub(crate) io: &'a IoRef, pub(crate) io: &'a IoRef,
pub(crate) curr: &'a mut (Option<BytesVec>, Option<BytesVec>), pub(crate) curr: &'a mut CacheLine,
pub(crate) next: &'a mut (Option<BytesVec>, Option<BytesVec>), pub(crate) next: &'a mut CacheLine,
pub(crate) nbytes: usize, pub(crate) nbytes: usize,
pub(crate) need_write: bool, pub(crate) need_write: bool,
} }
@ -242,13 +238,13 @@ impl<'a> ReadBuf<'a> {
#[inline] #[inline]
/// Set source read buffer /// Set source read buffer
pub fn set_src(&mut self, src: Option<BytesVec>) { pub fn set_src(&mut self, src: Option<BytesVec>) {
if let Some(buf) = self.next.0.take() {
self.io.memory_pool().release_read_buf(buf);
}
if let Some(src) = src { if let Some(src) = src {
if src.is_empty() { if src.is_empty() {
self.io.memory_pool().release_read_buf(src); self.io.memory_pool().release_read_buf(src);
} else { } else {
if let Some(b) = self.next.0.take() {
self.io.memory_pool().release_read_buf(b);
}
self.next.0 = Some(src); self.next.0 = Some(src);
} }
} }
@ -275,13 +271,13 @@ impl<'a> ReadBuf<'a> {
#[inline] #[inline]
/// Set destination read buffer /// Set destination read buffer
pub fn set_dst(&mut self, dst: Option<BytesVec>) { pub fn set_dst(&mut self, dst: Option<BytesVec>) {
if let Some(buf) = self.curr.0.take() {
self.io.memory_pool().release_read_buf(buf);
}
if let Some(dst) = dst { if let Some(dst) = dst {
if dst.is_empty() { if dst.is_empty() {
self.io.memory_pool().release_read_buf(dst); self.io.memory_pool().release_read_buf(dst);
} else { } else {
if let Some(b) = self.curr.0.take() {
self.io.memory_pool().release_read_buf(b);
}
self.curr.0 = Some(dst); self.curr.0 = Some(dst);
} }
} }
@ -319,8 +315,8 @@ impl<'a> ReadBuf<'a> {
#[derive(Debug)] #[derive(Debug)]
pub struct WriteBuf<'a> { pub struct WriteBuf<'a> {
pub(crate) io: &'a IoRef, pub(crate) io: &'a IoRef,
pub(crate) curr: &'a mut (Option<BytesVec>, Option<BytesVec>), pub(crate) curr: &'a mut CacheLine,
pub(crate) next: &'a mut (Option<BytesVec>, Option<BytesVec>), pub(crate) next: &'a mut CacheLine,
pub(crate) need_write: bool, pub(crate) need_write: bool,
} }
@ -352,10 +348,16 @@ impl<'a> WriteBuf<'a> {
#[inline] #[inline]
/// Set source write buffer /// Set source write buffer
pub fn set_src(&mut self, src: Option<BytesVec>) { pub fn set_src(&mut self, src: Option<BytesVec>) {
if let Some(b) = self.curr.1.take() { if let Some(buf) = self.curr.1.take() {
self.io.memory_pool().release_read_buf(b); self.io.memory_pool().release_read_buf(buf);
}
if let Some(buf) = src {
if buf.is_empty() {
self.io.memory_pool().release_read_buf(buf);
} else {
self.curr.1 = Some(buf);
}
} }
self.curr.1 = src;
} }
#[inline] #[inline]
@ -385,13 +387,13 @@ impl<'a> WriteBuf<'a> {
#[inline] #[inline]
/// Set destination write buffer /// Set destination write buffer
pub fn set_dst(&mut self, dst: Option<BytesVec>) { pub fn set_dst(&mut self, dst: Option<BytesVec>) {
if let Some(buf) = self.next.1.take() {
self.io.memory_pool().release_write_buf(buf);
}
if let Some(dst) = dst { if let Some(dst) = dst {
if dst.is_empty() { if dst.is_empty() {
self.io.memory_pool().release_write_buf(dst); self.io.memory_pool().release_write_buf(dst);
} else { } else {
if let Some(b) = self.next.1.take() {
self.io.memory_pool().release_write_buf(b);
}
self.need_write |= !dst.is_empty(); self.need_write |= !dst.is_empty();
self.next.1 = Some(dst); self.next.1 = Some(dst);
} }

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.2.3] - 2023-01-25
* Fix double buf cleanup
## [0.2.2] - 2023-01-24 ## [0.2.2] - 2023-01-24
* Update ntex-io to 0.2.2 * Update ntex-io to 0.2.2

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-tls" name = "ntex-tls"
version = "0.2.2" version = "0.2.3"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "An implementation of SSL streams for ntex backed by OpenSSL" description = "An implementation of SSL streams for ntex backed by OpenSSL"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -53,8 +53,7 @@ impl io::Read for IoInner {
impl io::Write for IoInner { impl io::Write for IoInner {
fn write(&mut self, src: &[u8]) -> io::Result<usize> { fn write(&mut self, src: &[u8]) -> io::Result<usize> {
let mut buf = if let Some(mut buf) = self.destination.take() { let mut buf = if let Some(buf) = self.destination.take() {
buf.reserve(src.len());
buf buf
} else { } else {
BytesVec::with_capacity_in(src.len(), self.pool) BytesVec::with_capacity_in(src.len(), self.pool)
@ -216,15 +215,10 @@ impl FilterLayer for SslFilter {
continue; continue;
} }
Err(e) => { Err(e) => {
if !src.is_empty() { buf.set_src(Some(src));
buf.set_src(Some(src));
}
self.unset_buffers(buf); self.unset_buffers(buf);
return match e.code() { return match e.code() {
ssl::ErrorCode::WANT_READ | ssl::ErrorCode::WANT_WRITE => { ssl::ErrorCode::WANT_READ | ssl::ErrorCode::WANT_WRITE => {
buf.set_dst(
self.inner.borrow_mut().get_mut().destination.take(),
);
Ok(()) Ok(())
} }
_ => Err(map_to_ioerr(e)), _ => Err(map_to_ioerr(e)),

View file

@ -107,9 +107,7 @@ impl FilterLayer for TlsClientFilter {
} }
} }
if !src.is_empty() { buf.set_src(Some(src));
buf.set_src(Some(src));
}
Ok(()) Ok(())
} else { } else {
Ok(()) Ok(())

View file

@ -114,9 +114,7 @@ impl FilterLayer for TlsServerFilter {
} }
} }
if !src.is_empty() { buf.set_src(Some(src));
buf.set_src(Some(src));
}
} }
Ok(()) Ok(())
} }

View file

@ -59,7 +59,7 @@ ntex-bytes = "0.1.19"
ntex-h2 = "0.2.1" ntex-h2 = "0.2.1"
ntex-rt = "0.4.7" ntex-rt = "0.4.7"
ntex-io = "0.2.3" ntex-io = "0.2.3"
ntex-tls = "0.2.2" ntex-tls = "0.2.3"
ntex-tokio = { version = "0.2.1", optional = true } ntex-tokio = { version = "0.2.1", optional = true }
ntex-glommio = { version = "0.2.1", optional = true } ntex-glommio = { version = "0.2.1", optional = true }
ntex-async-std = { version = "0.2.1", optional = true } ntex-async-std = { version = "0.2.1", optional = true }