Refactor Filter trait, fix read buffer processing

This commit is contained in:
Nikolay Kim 2022-01-12 22:07:10 +06:00
parent fcf7022059
commit 1df005f53f
6 changed files with 55 additions and 64 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.3] - 2022-01-12
* Refactor Filter trait, fix read buffer processing
## [0.1.2] - 2022-01-10
* Remove unneeded boxed types

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.1.2"
version = "0.1.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -18,7 +18,7 @@ path = "src/lib.rs"
[dependencies]
ntex-codec = "0.6.0"
ntex-bytes = "0.1.9"
ntex-util = "0.1.8"
ntex-util = "0.1.9"
ntex-service = "0.3.1"
bitflags = "1.3"

View file

@ -78,19 +78,16 @@ impl Filter for Base {
}
#[inline]
fn release_read_buf(
&self,
_: &IoRef,
buf: BytesMut,
dst: &mut Option<BytesMut>,
nbytes: usize,
) -> io::Result<usize> {
if let Some(ref mut dst) = dst {
dst.extend_from_slice(&buf)
} else {
*dst = Some(buf)
}
Ok(nbytes)
fn release_read_buf(&self, buf: BytesMut) {
self.0 .0.read_buf.set(Some(buf));
}
#[inline]
fn process_read_buf(&self, _: &IoRef, n: usize) -> io::Result<(usize, usize)> {
let buf = self.0 .0.read_buf.as_ptr();
let ref_buf = unsafe { buf.as_ref().unwrap() };
let total = ref_buf.as_ref().map(|v| v.len()).unwrap_or(0);
Ok((total, n))
}
#[inline]
@ -144,14 +141,10 @@ impl Filter for NullFilter {
None
}
fn release_read_buf(
&self,
_: &IoRef,
_: BytesMut,
_: &mut Option<BytesMut>,
_: usize,
) -> io::Result<usize> {
Ok(0)
fn release_read_buf(&self, _: BytesMut) {}
fn process_read_buf(&self, _: &IoRef, _: usize) -> io::Result<(usize, usize)> {
Ok((0, 0))
}
fn release_write_buf(&self, _: BytesMut) -> Result<(), io::Error> {

View file

@ -177,18 +177,19 @@ impl IoState {
where
Fn: FnOnce(&mut Option<BytesMut>) -> Ret,
{
let buf = self.read_buf.as_ptr();
let ref_buf = unsafe { buf.as_mut().unwrap() };
let result = f(ref_buf);
let filter = self.filter.get();
let mut buf = filter.get_read_buf();
let result = f(&mut buf);
// release buffer
if release {
if let Some(ref buf) = ref_buf {
if let Some(buf) = buf {
if release {
// release buffer
if buf.is_empty() {
let buf = mem::take(ref_buf).unwrap();
self.pool.get().release_read_buf(buf);
return result;
}
}
filter.release_read_buf(buf);
}
result
}

View file

@ -55,15 +55,14 @@ pub trait Filter: 'static {
fn get_read_buf(&self) -> Option<BytesMut>;
fn get_write_buf(&self) -> Option<BytesMut>;
fn release_read_buf(&self, buf: BytesMut);
fn release_read_buf(
&self,
io: &IoRef,
src: BytesMut,
dst: &mut Option<BytesMut>,
nbytes: usize,
) -> sio::Result<usize>;
/// Process read buffer
///
/// Returns tuple (total bytes, new bytes)
fn process_read_buf(&self, io: &IoRef, n: usize) -> sio::Result<(usize, usize)>;
fn get_write_buf(&self) -> Option<BytesMut>;
fn release_write_buf(&self, buf: BytesMut) -> sio::Result<()>;

View file

@ -28,8 +28,9 @@ impl ReadContext {
/// Get read buffer
pub fn get_read_buf(&self) -> BytesMut {
self.0
.filter()
.get_read_buf()
.0
.read_buf
.take()
.unwrap_or_else(|| self.0.memory_pool().get_read_buf())
}
@ -39,35 +40,28 @@ impl ReadContext {
if buf.is_empty() {
self.0.memory_pool().release_read_buf(buf);
} else {
self.0 .0.read_buf.set(Some(buf));
let filter = self.0.filter();
let mut dst = self.0 .0.read_buf.take();
let result = filter.release_read_buf(&self.0, buf, &mut dst, nbytes);
let nbytes = result.as_ref().map(|i| *i).unwrap_or(0);
if let Some(dst) = dst {
if nbytes > 0 {
if dst.len() > self.0.memory_pool().read_params().high as usize {
log::trace!(
"buffer is too large {}, enable read back-pressure",
dst.len()
);
self.0 .0.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
} else {
match filter.process_read_buf(&self.0, nbytes) {
Ok((total, nbytes)) => {
if nbytes > 0 {
if total > self.0.memory_pool().read_params().high as usize {
log::trace!(
"buffer is too large {}, enable read back-pressure",
total
);
self.0 .0.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
}
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
log::trace!("new {} bytes available, wakeup dispatcher", nbytes);
}
self.0 .0.dispatch_task.wake();
}
self.0 .0.read_buf.set(Some(dst));
} else if nbytes > 0 {
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
}
if let Err(err) = result {
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
self.0.want_shutdown(Some(err));
Err(err) => {
self.0 .0.dispatch_task.wake();
self.0 .0.insert_flags(Flags::RD_READY);
self.0.want_shutdown(Some(err));
}
}
}