diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 9677d725..a85fd21c 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.3] - 2022-01-12 + +* Refactor Filter trait, fix read buffer processing + ## [0.1.2] - 2022-01-10 * Remove unneeded boxed types diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 896bd276..6e801971 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.1.2" +version = "0.1.3" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6.0" ntex-bytes = "0.1.9" -ntex-util = "0.1.8" +ntex-util = "0.1.9" ntex-service = "0.3.1" bitflags = "1.3" diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index 3b729193..f2084a0f 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -78,19 +78,16 @@ impl Filter for Base { } #[inline] - fn release_read_buf( - &self, - _: &IoRef, - buf: BytesMut, - dst: &mut Option, - nbytes: usize, - ) -> io::Result { - if let Some(ref mut dst) = dst { - dst.extend_from_slice(&buf) - } else { - *dst = Some(buf) - } - Ok(nbytes) + fn release_read_buf(&self, buf: BytesMut) { + self.0 .0.read_buf.set(Some(buf)); + } + + #[inline] + fn process_read_buf(&self, _: &IoRef, n: usize) -> io::Result<(usize, usize)> { + let buf = self.0 .0.read_buf.as_ptr(); + let ref_buf = unsafe { buf.as_ref().unwrap() }; + let total = ref_buf.as_ref().map(|v| v.len()).unwrap_or(0); + Ok((total, n)) } #[inline] @@ -144,14 +141,10 @@ impl Filter for NullFilter { None } - fn release_read_buf( - &self, - _: &IoRef, - _: BytesMut, - _: &mut Option, - _: usize, - ) -> io::Result { - Ok(0) + fn release_read_buf(&self, _: BytesMut) {} + + fn process_read_buf(&self, _: &IoRef, _: usize) -> io::Result<(usize, usize)> { + Ok((0, 0)) } fn release_write_buf(&self, _: BytesMut) -> Result<(), io::Error> { diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 6a65f42c..f63dac1a 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -177,18 +177,19 @@ impl IoState { where Fn: FnOnce(&mut Option) -> Ret, { - let buf = self.read_buf.as_ptr(); - let ref_buf = unsafe { buf.as_mut().unwrap() }; - let result = f(ref_buf); + let filter = self.filter.get(); + let mut buf = filter.get_read_buf(); + let result = f(&mut buf); - // release buffer - if release { - if let Some(ref buf) = ref_buf { + if let Some(buf) = buf { + if release { + // release buffer if buf.is_empty() { - let buf = mem::take(ref_buf).unwrap(); self.pool.get().release_read_buf(buf); + return result; } } + filter.release_read_buf(buf); } result } diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index 556d50bb..62c9ba75 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -55,15 +55,14 @@ pub trait Filter: 'static { fn get_read_buf(&self) -> Option; - fn get_write_buf(&self) -> Option; + fn release_read_buf(&self, buf: BytesMut); - fn release_read_buf( - &self, - io: &IoRef, - src: BytesMut, - dst: &mut Option, - nbytes: usize, - ) -> sio::Result; + /// Process read buffer + /// + /// Returns tuple (total bytes, new bytes) + fn process_read_buf(&self, io: &IoRef, n: usize) -> sio::Result<(usize, usize)>; + + fn get_write_buf(&self) -> Option; fn release_write_buf(&self, buf: BytesMut) -> sio::Result<()>; diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 3855acaf..47569421 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -28,8 +28,9 @@ impl ReadContext { /// Get read buffer pub fn get_read_buf(&self) -> BytesMut { self.0 - .filter() - .get_read_buf() + .0 + .read_buf + .take() .unwrap_or_else(|| self.0.memory_pool().get_read_buf()) } @@ -39,35 +40,28 @@ impl ReadContext { if buf.is_empty() { self.0.memory_pool().release_read_buf(buf); } else { + self.0 .0.read_buf.set(Some(buf)); let filter = self.0.filter(); - let mut dst = self.0 .0.read_buf.take(); - let result = filter.release_read_buf(&self.0, buf, &mut dst, nbytes); - let nbytes = result.as_ref().map(|i| *i).unwrap_or(0); - - if let Some(dst) = dst { - if nbytes > 0 { - if dst.len() > self.0.memory_pool().read_params().high as usize { - log::trace!( - "buffer is too large {}, enable read back-pressure", - dst.len() - ); - self.0 .0.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); - } else { + match filter.process_read_buf(&self.0, nbytes) { + Ok((total, nbytes)) => { + if nbytes > 0 { + if total > self.0.memory_pool().read_params().high as usize { + log::trace!( + "buffer is too large {}, enable read back-pressure", + total + ); + self.0 .0.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL); + } + self.0 .0.dispatch_task.wake(); self.0 .0.insert_flags(Flags::RD_READY); log::trace!("new {} bytes available, wakeup dispatcher", nbytes); } - self.0 .0.dispatch_task.wake(); } - self.0 .0.read_buf.set(Some(dst)); - } else if nbytes > 0 { - self.0 .0.dispatch_task.wake(); - self.0 .0.insert_flags(Flags::RD_READY); - } - - if let Err(err) = result { - self.0 .0.dispatch_task.wake(); - self.0 .0.insert_flags(Flags::RD_READY); - self.0.want_shutdown(Some(err)); + Err(err) => { + self.0 .0.dispatch_task.wake(); + self.0 .0.insert_flags(Flags::RD_READY); + self.0.want_shutdown(Some(err)); + } } }