diff --git a/ntex-async-std/CHANGES.md b/ntex-async-std/CHANGES.md index 38a43584..7fc05c74 100644 --- a/ntex-async-std/CHANGES.md +++ b/ntex-async-std/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.1] - 2022-01-30 + +* Update to ntex-io 0.1.7 + ## [0.1.0] - 2022-01-03 * Initial release diff --git a/ntex-async-std/Cargo.toml b/ntex-async-std/Cargo.toml index 970623d8..1e389818 100644 --- a/ntex-async-std/Cargo.toml +++ b/ntex-async-std/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-async-std" -version = "0.1.0" +version = "0.1.1" authors = ["ntex contributors "] description = "async-std intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,9 +16,9 @@ name = "ntex_async_std" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.8" -ntex-io = "0.1.0" -ntex-util = "0.1.6" +ntex-bytes = "0.1.11" +ntex-io = "0.1.7" +ntex-util = "0.1.13" async-oneshot = "0.5.0" log = "0.4" pin-project-lite = "0.2" diff --git a/ntex-async-std/src/io.rs b/ntex-async-std/src/io.rs index 35d401e7..cfe702a6 100644 --- a/ntex-async-std/src/io.rs +++ b/ntex-async-std/src/io.rs @@ -1,7 +1,7 @@ use std::{any, future::Future, io, pin::Pin, task::Context, task::Poll}; use async_std::io::{Read, Write}; -use ntex_bytes::{Buf, BufMut, BytesMut}; +use ntex_bytes::{Buf, BufMut, BytesVec}; use ntex_io::{ types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, }; @@ -356,7 +356,7 @@ pub(super) fn flush_io( pub fn poll_read_buf( io: Pin<&mut T>, cx: &mut Context<'_>, - buf: &mut BytesMut, + buf: &mut BytesVec, ) -> Poll> { if !buf.has_remaining_mut() { return Poll::Ready(Ok(0)); diff --git a/ntex-bytes/src/bytes.rs b/ntex-bytes/src/bytes.rs index 26957e83..12a7bc0a 100644 --- a/ntex-bytes/src/bytes.rs +++ b/ntex-bytes/src/bytes.rs @@ -508,15 +508,7 @@ impl Bytes { /// Creates `Bytes` instance from slice, by copying it. pub fn copy_from_slice(data: &[u8]) -> Self { - if data.len() <= INLINE_CAP { - Bytes { - inner: Inner::from_slice_inline(data), - } - } else { - Bytes { - inner: BytesMut::copy_from_slice_in(data, PoolId::DEFAULT.pool_ref()).inner, - } - } + Self::copy_from_slice_in(data, PoolId::DEFAULT) } /// Creates `Bytes` instance from slice, by copying it. @@ -530,7 +522,7 @@ impl Bytes { } } else { Bytes { - inner: BytesMut::copy_from_slice_in(data, pool).inner, + inner: Inner::from_slice(data.len(), data, pool.into()), } } } @@ -578,19 +570,18 @@ impl Bytes { assert!(end <= len); if end - begin <= INLINE_CAP { - return Bytes { + Bytes { inner: Inner::from_slice_inline(&self[begin..end]), - }; + } + } else { + let mut ret = self.clone(); + + unsafe { + ret.inner.set_end(end); + ret.inner.set_start(begin); + } + ret } - - let mut ret = self.clone(); - - unsafe { - ret.inner.set_end(end); - ret.inner.set_start(begin); - } - - ret } /// Returns a slice of self that is equivalent to the given `subset`. @@ -664,11 +655,11 @@ impl Bytes { } if at == 0 { - return mem::replace(self, Bytes::new()); - } - - Bytes { - inner: self.inner.split_off(at, true), + mem::replace(self, Bytes::new()) + } else { + Bytes { + inner: self.inner.split_off(at, true), + } } } @@ -703,11 +694,11 @@ impl Bytes { } if at == 0 { - return Bytes::new(); - } - - Bytes { - inner: self.inner.split_to(at, true), + Bytes::new() + } else { + Bytes { + inner: self.inner.split_to(at, true), + } } } @@ -764,7 +755,7 @@ impl Bytes { } } else { Bytes { - inner: BytesMut::copy_from_slice_in(self, self.inner.pool()).inner, + inner: Inner::from_slice(self.len(), self, self.inner.pool()), } }; } @@ -1151,13 +1142,6 @@ impl BytesMut { } } - #[inline] - pub(crate) fn with_capacity_in_priv(capacity: usize, pool: PoolRef) -> BytesMut { - BytesMut { - inner: Inner::with_capacity(capacity, pool), - } - } - /// Creates a new `BytesMut` from slice, by copying it. pub fn copy_from_slice_in(src: &[u8], pool: T) -> Self where @@ -1949,13 +1933,6 @@ impl BytesVec { } } - #[inline] - pub(crate) fn with_capacity_in_priv(capacity: usize, pool: PoolRef) -> BytesVec { - BytesVec { - inner: InnerVec::with_capacity(capacity, pool), - } - } - /// Creates a new `BytesVec` from slice, by copying it. pub fn copy_from_slice(src: &[u8]) -> Self { Self::copy_from_slice_in(src, PoolId::DEFAULT) @@ -2096,8 +2073,7 @@ impl BytesVec { /// /// assert_eq!(other, b"hello world"[..]); /// ``` - pub fn split(&mut self) -> Bytes { - // let len = self.len(); + pub fn split(&mut self) -> BytesMut { self.split_to(self.len()) } @@ -2126,10 +2102,10 @@ impl BytesVec { /// # Panics /// /// Panics if `at > len`. - pub fn split_to(&mut self, at: usize) -> Bytes { + pub fn split_to(&mut self, at: usize) -> BytesMut { assert!(at <= self.len()); - Bytes { + BytesMut { inner: self.inner.split_to(at, false), } } @@ -2779,10 +2755,10 @@ impl InnerVec { } fn split_to(&mut self, at: usize, create_inline: bool) -> Inner { - let other = unsafe { + unsafe { let ptr = self.as_ptr(); - if create_inline && at <= INLINE_CAP { + let other = if create_inline && at <= INLINE_CAP { Inner::from_ptr_inline(ptr, at) } else { let inner = self.as_inner(); @@ -2799,13 +2775,11 @@ impl InnerVec { (self.0.as_ptr() as usize ^ KIND_VEC) as *mut Shared, ), } - } - }; - unsafe { + }; self.set_start(at as u32); - } - other + other + } } fn truncate(&mut self, len: usize) { @@ -2862,11 +2836,13 @@ impl InnerVec { // try to reclaim the buffer. This is possible if the current // handle is the only outstanding handle pointing to the buffer. if inner.is_unique() && vec_cap >= new_cap { + let offset = inner.offset; inner.offset = SHARED_VEC_SIZE as u32; // The capacity is sufficient, reclaim the buffer - let ptr = (self.0.as_ptr() as *mut u8).add(SHARED_VEC_SIZE); - ptr::copy(self.as_ptr(), ptr, len); + let src = (self.0.as_ptr() as *mut u8).add(offset as usize); + let dst = (self.0.as_ptr() as *mut u8).add(SHARED_VEC_SIZE); + ptr::copy(src, dst, len); } else { // Create a new vector storage let pool = inner.pool; diff --git a/ntex-bytes/src/pool.rs b/ntex-bytes/src/pool.rs index fc8118cb..e53034da 100644 --- a/ntex-bytes/src/pool.rs +++ b/ntex-bytes/src/pool.rs @@ -50,9 +50,9 @@ struct MemoryPool { // io read/write cache and params read_wm: Cell, - read_cache: RefCell>, + read_cache: RefCell>, write_wm: Cell, - write_cache: RefCell>, + write_cache: RefCell>, spawn: RefCell>>)>>>, } @@ -198,13 +198,13 @@ impl PoolRef { #[inline] /// Creates a new `BytesMut` with the specified capacity. pub fn buf_with_capacity(self, cap: usize) -> BytesMut { - BytesMut::with_capacity_in_priv(cap, self) + BytesMut::with_capacity_in(cap, self) } #[inline] /// Creates a new `BytesVec` with the specified capacity. pub fn vec_with_capacity(self, cap: usize) -> BytesVec { - BytesVec::with_capacity_in_priv(cap, self) + BytesVec::with_capacity_in(cap, self) } #[doc(hidden)] @@ -285,18 +285,18 @@ impl PoolRef { #[doc(hidden)] #[inline] - pub fn get_read_buf(self) -> BytesMut { + pub fn get_read_buf(self) -> BytesVec { if let Some(buf) = self.0.read_cache.borrow_mut().pop() { buf } else { - BytesMut::with_capacity_in_priv(self.0.read_wm.get().high as usize, self) + BytesVec::with_capacity_in(self.0.read_wm.get().high as usize, self) } } #[doc(hidden)] #[inline] /// Release read buffer, buf must be allocated from this pool - pub fn release_read_buf(self, mut buf: BytesMut) { + pub fn release_read_buf(self, mut buf: BytesVec) { let cap = buf.capacity(); let (hw, lw) = self.0.read_wm.get().unpack(); if cap > lw && cap <= hw { @@ -310,18 +310,18 @@ impl PoolRef { #[doc(hidden)] #[inline] - pub fn get_write_buf(self) -> BytesMut { + pub fn get_write_buf(self) -> BytesVec { if let Some(buf) = self.0.write_cache.borrow_mut().pop() { buf } else { - BytesMut::with_capacity_in_priv(self.0.write_wm.get().high as usize, self) + BytesVec::with_capacity_in(self.0.write_wm.get().high as usize, self) } } #[doc(hidden)] #[inline] /// Release write buffer, buf must be allocated from this pool - pub fn release_write_buf(self, mut buf: BytesMut) { + pub fn release_write_buf(self, mut buf: BytesVec) { let cap = buf.capacity(); let (hw, lw) = self.0.write_wm.get().unpack(); if cap > lw && cap <= hw { diff --git a/ntex-codec/Cargo.toml b/ntex-codec/Cargo.toml index b9844f53..0e18157c 100644 --- a/ntex-codec/Cargo.toml +++ b/ntex-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-codec" -version = "0.6.1" +version = "0.6.2" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -16,4 +16,4 @@ name = "ntex_codec" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.9" +ntex-bytes = "0.1.11" diff --git a/ntex-codec/src/lib.rs b/ntex-codec/src/lib.rs index 58e00153..f17cb992 100644 --- a/ntex-codec/src/lib.rs +++ b/ntex-codec/src/lib.rs @@ -3,7 +3,7 @@ use std::{io, rc::Rc}; -use ntex_bytes::{Bytes, BytesMut}; +use ntex_bytes::{Bytes, BytesMut, BytesVec}; /// Trait of helper objects to write out messages as bytes. pub trait Encoder { @@ -15,6 +15,11 @@ pub trait Encoder { /// Encodes a frame into the buffer provided. fn encode(&self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>; + + /// Encodes a frame into the buffer provided. + fn encode_vec(&self, item: Self::Item, dst: &mut BytesVec) -> Result<(), Self::Error> { + dst.with_bytes_mut(|dst| self.encode(item, dst)) + } } /// Decoding of frames via buffers. @@ -31,6 +36,11 @@ pub trait Decoder { /// Attempts to decode a frame from the provided buffer of bytes. fn decode(&self, src: &mut BytesMut) -> Result, Self::Error>; + + /// Attempts to decode a frame from the provided buffer of bytes. + fn decode_vec(&self, src: &mut BytesVec) -> Result, Self::Error> { + src.with_bytes_mut(|src| self.decode(src)) + } } impl Encoder for Rc diff --git a/ntex-glommio/CHANGES.md b/ntex-glommio/CHANGES.md index b1cc83e6..3124d5db 100644 --- a/ntex-glommio/CHANGES.md +++ b/ntex-glommio/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.1] - 2022-01-30 + +* Update to ntex-io 0.1.7 + ## [0.1.0] - 2022-01-17 * Initial release diff --git a/ntex-glommio/Cargo.toml b/ntex-glommio/Cargo.toml index 4506bfa6..f772e724 100644 --- a/ntex-glommio/Cargo.toml +++ b/ntex-glommio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-glommio" -version = "0.1.0" +version = "0.1.1" authors = ["ntex contributors "] description = "glommio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,9 +16,9 @@ name = "ntex_glommio" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.9" -ntex-io = "0.1.4" -ntex-util = "0.1.9" +ntex-bytes = "0.1.11" +ntex-io = "0.1.7" +ntex-util = "0.1.13" async-oneshot = "0.5.0" futures-lite = "1.12" futures-channel = "0.3" diff --git a/ntex-glommio/src/io.rs b/ntex-glommio/src/io.rs index 7855d0b1..62ce7a2f 100644 --- a/ntex-glommio/src/io.rs +++ b/ntex-glommio/src/io.rs @@ -4,7 +4,7 @@ use std::{any, future::Future, io, pin::Pin}; use futures_lite::future::FutureExt; use futures_lite::io::{AsyncRead, AsyncWrite}; use glommio::Task; -use ntex_bytes::{Buf, BufMut, BytesMut}; +use ntex_bytes::{Buf, BufMut, BytesVec}; use ntex_io::{ types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, }; @@ -371,7 +371,7 @@ pub(super) fn flush_io( pub fn poll_read_buf( io: Pin<&mut T>, cx: &mut Context<'_>, - buf: &mut BytesMut, + buf: &mut BytesVec, ) -> Poll> { if !buf.has_remaining_mut() { return Poll::Ready(Ok(0)); diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index a61b6fc4..e601c013 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.7] - 2022-01-xx + +* Add BytesVec type + ## [0.1.6] - 2022-01-27 * Optimize Io memory layout diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index c897b0e7..eca05cdf 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.1.6" +version = "0.1.7" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -16,9 +16,9 @@ name = "ntex_io" path = "src/lib.rs" [dependencies] -ntex-codec = "0.6.1" -ntex-bytes = "0.1.9" -ntex-util = "0.1.12" +ntex-codec = "0.6.2" +ntex-bytes = "0.1.11" +ntex-util = "0.1.13" ntex-service = "0.3.1" bitflags = "1.3" diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index f2084a0f..46ee97fc 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -1,6 +1,6 @@ use std::{any, io, task::Context, task::Poll}; -use ntex_bytes::BytesMut; +use ntex_bytes::BytesVec; use super::io::Flags; use super::{Filter, IoRef, ReadStatus, WriteStatus}; @@ -68,17 +68,17 @@ impl Filter for Base { } #[inline] - fn get_read_buf(&self) -> Option { + fn get_read_buf(&self) -> Option { self.0 .0.read_buf.take() } #[inline] - fn get_write_buf(&self) -> Option { + fn get_write_buf(&self) -> Option { self.0 .0.write_buf.take() } #[inline] - fn release_read_buf(&self, buf: BytesMut) { + fn release_read_buf(&self, buf: BytesVec) { self.0 .0.read_buf.set(Some(buf)); } @@ -91,7 +91,7 @@ impl Filter for Base { } #[inline] - fn release_write_buf(&self, buf: BytesMut) -> Result<(), io::Error> { + fn release_write_buf(&self, buf: BytesVec) -> Result<(), io::Error> { let pool = self.0.memory_pool(); if buf.is_empty() { pool.release_write_buf(buf); @@ -133,21 +133,21 @@ impl Filter for NullFilter { Poll::Ready(WriteStatus::Terminate) } - fn get_read_buf(&self) -> Option { + fn get_read_buf(&self) -> Option { None } - fn get_write_buf(&self) -> Option { + fn get_write_buf(&self) -> Option { None } - fn release_read_buf(&self, _: BytesMut) {} + fn release_read_buf(&self, _: BytesVec) {} fn process_read_buf(&self, _: &IoRef, _: usize) -> io::Result<(usize, usize)> { Ok((0, 0)) } - fn release_write_buf(&self, _: BytesMut) -> Result<(), io::Error> { + fn release_write_buf(&self, _: BytesVec) -> Result<(), io::Error> { Ok(()) } } diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 6084b234..ca64e7f4 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -4,7 +4,7 @@ use std::{ fmt, future::Future, hash, io, marker, mem, ops::Deref, pin::Pin, ptr, rc::Rc, time, }; -use ntex_bytes::{BytesMut, PoolId, PoolRef}; +use ntex_bytes::{BytesVec, PoolId, PoolRef}; use ntex_codec::{Decoder, Encoder}; use ntex_util::{ future::poll_fn, future::Either, task::LocalWaker, time::now, time::Millis, @@ -62,8 +62,8 @@ pub(crate) struct IoState { pub(super) read_task: LocalWaker, pub(super) write_task: LocalWaker, pub(super) dispatch_task: LocalWaker, - pub(super) read_buf: Cell>, - pub(super) write_buf: Cell>, + pub(super) read_buf: Cell>, + pub(super) write_buf: Cell>, pub(super) filter: Cell<&'static dyn Filter>, pub(super) handle: Cell>>, #[allow(clippy::box_collection)] @@ -134,9 +134,6 @@ impl IoState { { log::trace!("initiate io shutdown {:?}", self.flags.get()); self.insert_flags(Flags::IO_STOPPING_FILTERS); - self.read_task.wake(); - self.write_task.wake(); - self.dispatch_task.wake(); self.shutdown_filters(); } } @@ -178,7 +175,7 @@ impl IoState { #[inline] pub(super) fn with_read_buf(&self, release: bool, f: Fn) -> Ret where - Fn: FnOnce(&mut Option) -> Ret, + Fn: FnOnce(&mut Option) -> Ret, { let filter = self.filter.get(); let mut buf = filter.get_read_buf(); @@ -200,7 +197,7 @@ impl IoState { #[inline] pub(super) fn with_write_buf(&self, f: Fn) -> Ret where - Fn: FnOnce(&mut Option) -> Ret, + Fn: FnOnce(&mut Option) -> Ret, { let buf = self.write_buf.as_ptr(); let ref_buf = unsafe { buf.as_mut().unwrap() }; @@ -284,11 +281,11 @@ impl Io { /// Set memory pool pub fn set_memory_pool(&self, pool: PoolRef) { if let Some(mut buf) = self.0 .0.read_buf.take() { - pool.move_in(&mut buf); + pool.move_vec_in(&mut buf); self.0 .0.read_buf.set(Some(buf)); } if let Some(mut buf) = self.0 .0.write_buf.take() { - pool.move_in(&mut buf); + pool.move_vec_in(&mut buf); self.0 .0.write_buf.set(Some(buf)); } self.0 .0.pool.set(pool); diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 0238aa6c..82d518f9 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -1,6 +1,6 @@ use std::{any, fmt, hash, io}; -use ntex_bytes::{BufMut, BytesMut, PoolRef}; +use ntex_bytes::{BufMut, BytesVec, PoolRef}; use ntex_codec::{Decoder, Encoder}; use super::io::{Flags, IoRef, OnDisconnect}; @@ -106,7 +106,7 @@ impl IoRef { } // encode item and wake write task - codec.encode(item, buf) + codec.encode_vec(item, buf) }) .map_or_else( |err| { @@ -130,7 +130,9 @@ impl IoRef { U: Decoder, { self.0.with_read_buf(false, |buf| { - buf.as_mut().map(|b| codec.decode(b)).unwrap_or(Ok(None)) + buf.as_mut() + .map(|b| codec.decode_vec(b)) + .unwrap_or(Ok(None)) }) } @@ -152,7 +154,7 @@ impl IoRef { /// Get mut access to write buffer pub fn with_write_buf(&self, f: F) -> Result where - F: FnOnce(&mut BytesMut) -> R, + F: FnOnce(&mut BytesVec) -> R, { let filter = self.0.filter.get(); let mut buf = filter @@ -172,7 +174,7 @@ impl IoRef { /// Get mut access to read buffer pub fn with_read_buf(&self, f: F) -> R where - F: FnOnce(&mut BytesMut) -> R, + F: FnOnce(&mut BytesVec) -> R, { self.0.with_read_buf(true, |buf| { // set buf @@ -302,7 +304,7 @@ mod tests { assert_eq!(io.read_ready().await.unwrap(), Some(())); assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending()); - let item = io.with_read_buf(|buffer| buffer.clone()); + let item = io.with_read_buf(|buffer| buffer.split()); assert_eq!(item, Bytes::from_static(BIN)); client.write(TEXT); @@ -367,11 +369,11 @@ mod tests { self.inner.poll_read_ready(cx) } - fn get_read_buf(&self) -> Option { + fn get_read_buf(&self) -> Option { self.inner.get_read_buf() } - fn release_read_buf(&self, buf: BytesMut) { + fn release_read_buf(&self, buf: BytesVec) { self.inner.release_read_buf(buf) } @@ -386,7 +388,7 @@ mod tests { self.inner.poll_write_ready(cx) } - fn get_write_buf(&self) -> Option { + fn get_write_buf(&self) -> Option { if let Some(buf) = self.inner.get_write_buf() { self.out_bytes.set(self.out_bytes.get() - buf.len()); Some(buf) @@ -395,7 +397,7 @@ mod tests { } } - fn release_write_buf(&self, buf: BytesMut) -> Result<(), io::Error> { + fn release_write_buf(&self, buf: BytesVec) -> Result<(), io::Error> { self.write_order.borrow_mut().push(self.idx); self.out_bytes.set(self.out_bytes.get() + buf.len()); self.inner.release_write_buf(buf) diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index 81b30802..e40ee52d 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -17,7 +17,7 @@ mod tasks; mod timer; mod utils; -use ntex_bytes::BytesMut; +use ntex_bytes::BytesVec; use ntex_codec::{Decoder, Encoder}; use ntex_util::time::Millis; @@ -52,18 +52,18 @@ pub enum WriteStatus { pub trait Filter: 'static { fn query(&self, id: TypeId) -> Option>; - fn get_read_buf(&self) -> Option; + fn get_read_buf(&self) -> Option; - fn release_read_buf(&self, buf: BytesMut); + fn release_read_buf(&self, buf: BytesVec); /// Process read buffer /// /// Returns tuple (total bytes, new bytes) fn process_read_buf(&self, io: &IoRef, n: usize) -> sio::Result<(usize, usize)>; - fn get_write_buf(&self) -> Option; + fn get_write_buf(&self) -> Option; - fn release_write_buf(&self, buf: BytesMut) -> sio::Result<()>; + fn release_write_buf(&self, buf: BytesVec) -> sio::Result<()>; /// Check readiness for read operations fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll; diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 47569421..e04c3a2c 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -1,6 +1,6 @@ use std::{io, task::Context, task::Poll}; -use ntex_bytes::{BytesMut, PoolRef}; +use ntex_bytes::{BytesVec, PoolRef}; use super::{io::Flags, IoRef, ReadStatus, WriteStatus}; @@ -26,7 +26,7 @@ impl ReadContext { #[inline] /// Get read buffer - pub fn get_read_buf(&self) -> BytesMut { + pub fn get_read_buf(&self) -> BytesVec { self.0 .0 .read_buf @@ -36,7 +36,7 @@ impl ReadContext { #[inline] /// Release read buffer after io read operations - pub fn release_read_buf(&self, buf: BytesMut, nbytes: usize) { + pub fn release_read_buf(&self, buf: BytesVec, nbytes: usize) { if buf.is_empty() { self.0.memory_pool().release_read_buf(buf); } else { @@ -99,13 +99,13 @@ impl WriteContext { #[inline] /// Get write buffer - pub fn get_write_buf(&self) -> Option { + pub fn get_write_buf(&self) -> Option { self.0 .0.write_buf.take() } #[inline] /// Release write buffer after io write operations - pub fn release_write_buf(&self, buf: BytesMut) -> Result<(), io::Error> { + pub fn release_write_buf(&self, buf: BytesVec) -> Result<(), io::Error> { let pool = self.0.memory_pool(); let mut flags = self.0.flags(); diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index 700bb4f9..222613bd 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::{any, cmp, fmt, future::Future, io, mem, net, pin::Pin, rc::Rc}; -use ntex_bytes::{Buf, BufMut, BytesMut}; +use ntex_bytes::{Buf, BufMut, Bytes, BytesVec}; use ntex_util::future::poll_fn; use ntex_util::time::{sleep, Millis, Sleep}; @@ -60,7 +60,7 @@ struct State { #[derive(Default, Debug)] struct Channel { - buf: BytesMut, + buf: BytesVec, buf_cap: usize, flags: IoTestFlags, waker: AtomicWaker, @@ -159,7 +159,7 @@ impl IoTest { /// Access read buffer. pub fn local_buffer(&self, f: F) -> R where - F: FnOnce(&mut BytesMut) -> R, + F: FnOnce(&mut BytesVec) -> R, { let guard = self.local.lock().unwrap(); let mut ch = guard.borrow_mut(); @@ -169,7 +169,7 @@ impl IoTest { /// Access remote buffer. pub fn remote_buffer(&self, f: F) -> R where - F: FnOnce(&mut BytesMut) -> R, + F: FnOnce(&mut BytesVec) -> R, { let guard = self.remote.lock().unwrap(); let mut ch = guard.borrow_mut(); @@ -205,12 +205,12 @@ impl IoTest { } /// Read any available data - pub fn read_any(&self) -> BytesMut { - self.local.lock().unwrap().borrow_mut().buf.split() + pub fn read_any(&self) -> Bytes { + self.local.lock().unwrap().borrow_mut().buf.split().freeze() } /// Read data, if data is not available wait for it - pub async fn read(&self) -> Result { + pub async fn read(&self) -> Result { if self.local.lock().unwrap().borrow().buf.is_empty() { poll_fn(|cx| { let guard = self.local.lock().unwrap(); @@ -237,13 +237,13 @@ impl IoTest { }) .await; } - Ok(self.local.lock().unwrap().borrow_mut().buf.split()) + Ok(self.local.lock().unwrap().borrow_mut().buf.split().freeze()) } pub fn poll_read_buf( &self, cx: &mut Context<'_>, - buf: &mut BytesMut, + buf: &mut BytesVec, ) -> Poll> { let guard = self.local.lock().unwrap(); let mut ch = guard.borrow_mut(); @@ -551,7 +551,7 @@ impl Future for WriteTask { // read until 0 or err let io = &this.io; loop { - let mut buf = BytesMut::new(); + let mut buf = BytesVec::new(); match io.poll_read_buf(cx, &mut buf) { Poll::Ready(Err(e)) => { this.state.close(Some(e)); diff --git a/ntex-tls/examples/client.rs b/ntex-tls/examples/simple-client.rs similarity index 100% rename from ntex-tls/examples/client.rs rename to ntex-tls/examples/simple-client.rs diff --git a/ntex-tls/src/openssl/mod.rs b/ntex-tls/src/openssl/mod.rs index ec93ab92..77da6f74 100644 --- a/ntex-tls/src/openssl/mod.rs +++ b/ntex-tls/src/openssl/mod.rs @@ -5,7 +5,7 @@ use std::{ any, cmp, error::Error, future::Future, io, pin::Pin, task::Context, task::Poll, }; -use ntex_bytes::{BufMut, BytesMut, PoolRef}; +use ntex_bytes::{BufMut, BytesVec, PoolRef}; use ntex_io::{Base, Filter, FilterFactory, Io, IoRef, ReadStatus, WriteStatus}; use ntex_util::{future::poll_fn, ready, time, time::Millis}; use tls_openssl::ssl::{self, SslStream}; @@ -29,13 +29,13 @@ pub struct SslFilter { inner: RefCell>>, pool: PoolRef, handshake: Cell, - read_buf: Cell>, + read_buf: Cell>, } struct IoInner { inner: F, pool: PoolRef, - write_buf: Option, + write_buf: Option, } impl io::Read for IoInner { @@ -61,7 +61,7 @@ impl io::Write for IoInner { buf.reserve(src.len()); buf } else { - BytesMut::with_capacity_in(src.len(), self.pool) + BytesVec::with_capacity_in(src.len(), self.pool) }; buf.extend_from_slice(src); self.inner.release_write_buf(buf)?; @@ -143,17 +143,17 @@ impl Filter for SslFilter { } #[inline] - fn get_read_buf(&self) -> Option { + fn get_read_buf(&self) -> Option { self.read_buf.take() } #[inline] - fn get_write_buf(&self) -> Option { + fn get_write_buf(&self) -> Option { self.inner.borrow_mut().get_mut().write_buf.take() } #[inline] - fn release_read_buf(&self, buf: BytesMut) { + fn release_read_buf(&self, buf: BytesVec) { self.read_buf.set(Some(buf)); } @@ -218,7 +218,7 @@ impl Filter for SslFilter { } } - fn release_write_buf(&self, mut buf: BytesMut) -> Result<(), io::Error> { + fn release_write_buf(&self, mut buf: BytesVec) -> Result<(), io::Error> { loop { if buf.is_empty() { return Ok(()); diff --git a/ntex-tls/src/rustls/client.rs b/ntex-tls/src/rustls/client.rs index 7dd6ab91..b2666fe3 100644 --- a/ntex-tls/src/rustls/client.rs +++ b/ntex-tls/src/rustls/client.rs @@ -1,8 +1,8 @@ //! An implementation of SSL streams for ntex backed by OpenSSL use std::io::{self, Read as IoRead, Write as IoWrite}; -use std::{any, cell::RefCell, sync::Arc, task::Context, task::Poll}; +use std::{any, cell::Cell, cell::RefCell, sync::Arc, task::Context, task::Poll}; -use ntex_bytes::{BufMut, BytesMut}; +use ntex_bytes::{BufMut, BytesVec}; use ntex_io::{Filter, Io, IoRef, ReadStatus, WriteStatus}; use ntex_util::{future::poll_fn, ready}; use tls_rust::{ClientConfig, ClientConnection, ServerName}; @@ -14,7 +14,7 @@ use super::{PeerCert, PeerCertChain}; /// An implementation of SSL streams pub struct TlsClientFilter { - inner: RefCell>, + inner: IoInner, session: RefCell, } @@ -53,107 +53,101 @@ impl Filter for TlsClientFilter { None } } else { - self.inner.borrow().filter.query(id) + self.inner.filter.query(id) } } #[inline] fn poll_shutdown(&self) -> Poll> { - self.inner.borrow().filter.poll_shutdown() + self.inner.filter.poll_shutdown() } #[inline] fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll { - self.inner.borrow().filter.poll_read_ready(cx) + self.inner.filter.poll_read_ready(cx) } #[inline] fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll { - self.inner.borrow().filter.poll_write_ready(cx) + self.inner.filter.poll_write_ready(cx) } #[inline] - fn get_read_buf(&self) -> Option { - self.inner.borrow_mut().read_buf.take() + fn get_read_buf(&self) -> Option { + self.inner.read_buf.take() } #[inline] - fn get_write_buf(&self) -> Option { - self.inner.borrow_mut().write_buf.take() + fn get_write_buf(&self) -> Option { + self.inner.write_buf.take() } #[inline] - fn release_read_buf(&self, buf: BytesMut) { - self.inner.borrow_mut().read_buf = Some(buf); + fn release_read_buf(&self, buf: BytesVec) { + self.inner.read_buf.set(Some(buf)); } fn process_read_buf(&self, io: &IoRef, nbytes: usize) -> io::Result<(usize, usize)> { - let mut inner = self.inner.borrow_mut(); let mut session = self.session.borrow_mut(); // ask inner filter to process read buf - match inner.filter.process_read_buf(io, nbytes) { + match self.inner.filter.process_read_buf(io, nbytes) { Err(err) => io.want_shutdown(Some(err)), Ok((_, 0)) => return Ok((0, 0)), Ok(_) => (), } - if session.is_handshaking() { - Ok((0, 1)) + // get processed buffer + let mut dst = if let Some(dst) = self.inner.read_buf.take() { + dst } else { - // get processed buffer - let mut dst = if let Some(dst) = inner.read_buf.take() { - dst - } else { - inner.pool.get_read_buf() - }; - let (hw, lw) = inner.pool.read_params().unpack(); + self.inner.pool.get_read_buf() + }; + let (hw, lw) = self.inner.pool.read_params().unpack(); - let mut src = if let Some(src) = inner.filter.get_read_buf() { - src - } else { - return Ok((0, 0)); - }; + let mut src = if let Some(src) = self.inner.filter.get_read_buf() { + src + } else { + return Ok((0, 0)); + }; - let mut new_bytes = 0; - loop { - // make sure we've got room - let remaining = dst.remaining_mut(); - if remaining < lw { - dst.reserve(hw - remaining); - } - - let mut cursor = io::Cursor::new(&src); - let n = session.read_tls(&mut cursor)?; - src.split_to(n); - let state = session - .process_new_packets() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - let new_b = state.plaintext_bytes_to_read(); - if new_b > 0 { - dst.reserve(new_b); - let chunk: &mut [u8] = - unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; - let v = session.reader().read(chunk)?; - unsafe { dst.advance_mut(v) }; - new_bytes += v; - } else { - break; - } + let mut new_bytes = if self.inner.handshake.get() { 1 } else { 0 }; + loop { + // make sure we've got room + let remaining = dst.remaining_mut(); + if remaining < lw { + dst.reserve(hw - remaining); } - let dst_len = dst.len(); - inner.read_buf = Some(dst); - inner.filter.release_read_buf(src); - Ok((dst_len, new_bytes)) + let mut cursor = io::Cursor::new(&src); + let n = session.read_tls(&mut cursor)?; + src.split_to(n); + let state = session + .process_new_packets() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + let new_b = state.plaintext_bytes_to_read(); + if new_b > 0 { + dst.reserve(new_b); + let chunk: &mut [u8] = + unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; + let v = session.reader().read(chunk)?; + unsafe { dst.advance_mut(v) }; + new_bytes += v; + } else { + break; + } } + + let dst_len = dst.len(); + self.inner.read_buf.set(Some(dst)); + self.inner.filter.release_read_buf(src); + Ok((dst_len, new_bytes)) } - fn release_write_buf(&self, mut src: BytesMut) -> Result<(), io::Error> { + fn release_write_buf(&self, mut src: BytesVec) -> Result<(), io::Error> { let mut session = self.session.borrow_mut(); - let mut inner = self.inner.borrow_mut(); - let mut io = Wrapper(&mut *inner); + let mut io = Wrapper(&self.inner); loop { if !src.is_empty() { @@ -168,7 +162,7 @@ impl Filter for TlsClientFilter { } if !src.is_empty() { - self.inner.borrow_mut().write_buf = Some(src); + self.inner.write_buf.set(Some(src)); } Ok(()) @@ -190,12 +184,13 @@ impl TlsClientFilter { let inner = IoInner { pool, filter, - read_buf: None, - write_buf: None, + read_buf: Cell::new(None), + write_buf: Cell::new(None), + handshake: Cell::new(true), }; Ok::<_, io::Error>(TlsFilter::new_client(TlsClientFilter { - inner: RefCell::new(inner), + inner, session: RefCell::new(session), })) })?; @@ -204,34 +199,14 @@ impl TlsClientFilter { loop { let (result, wants_read) = { let mut session = filter.client().session.borrow_mut(); - let mut inner = filter.client().inner.borrow_mut(); - let mut wrp = Wrapper(&mut *inner); - let result = session.complete_io(&mut wrp); - let wants_read = session.wants_read(); - - if session.wants_write() { - loop { - let n = session.write_tls(&mut wrp)?; - if n == 0 { - break; - } - } - } - (result, wants_read) + let mut wrp = Wrapper(&filter.client().inner); + (session.complete_io(&mut wrp), session.wants_read()) }; - if result.is_ok() && wants_read { - poll_fn(|cx| match ready!(io.poll_read_ready(cx)) { - Ok(None) => Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "disconnected", - ))), - Err(e) => Poll::Ready(Err(e)), - _ => Poll::Ready(Ok(())), - }) - .await?; - } match result { - Ok(_) => return Ok(io), + Ok(_) => { + filter.client().inner.handshake.set(false); + return Ok(io); + } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { poll_fn(|cx| { let read_ready = if wants_read { diff --git a/ntex-tls/src/rustls/mod.rs b/ntex-tls/src/rustls/mod.rs index 0210971e..11060f5c 100644 --- a/ntex-tls/src/rustls/mod.rs +++ b/ntex-tls/src/rustls/mod.rs @@ -1,9 +1,9 @@ #![allow(clippy::type_complexity)] //! An implementation of SSL streams for ntex backed by OpenSSL -use std::sync::Arc; use std::{any, cmp, future::Future, io, pin::Pin, task::Context, task::Poll}; +use std::{cell::Cell, sync::Arc}; -use ntex_bytes::{BytesMut, PoolRef}; +use ntex_bytes::{BytesVec, PoolRef}; use ntex_io::{Base, Filter, FilterFactory, Io, IoRef, ReadStatus, WriteStatus}; use ntex_util::time::Millis; use tls_rust::{Certificate, ClientConfig, ServerConfig, ServerName}; @@ -93,7 +93,7 @@ impl Filter for TlsFilter { } #[inline] - fn get_read_buf(&self) -> Option { + fn get_read_buf(&self) -> Option { match self.inner { InnerTlsFilter::Server(ref f) => f.get_read_buf(), InnerTlsFilter::Client(ref f) => f.get_read_buf(), @@ -101,7 +101,7 @@ impl Filter for TlsFilter { } #[inline] - fn get_write_buf(&self) -> Option { + fn get_write_buf(&self) -> Option { match self.inner { InnerTlsFilter::Server(ref f) => f.get_write_buf(), InnerTlsFilter::Client(ref f) => f.get_write_buf(), @@ -109,7 +109,7 @@ impl Filter for TlsFilter { } #[inline] - fn release_read_buf(&self, buf: BytesMut) { + fn release_read_buf(&self, buf: BytesVec) { match self.inner { InnerTlsFilter::Server(ref f) => f.release_read_buf(buf), InnerTlsFilter::Client(ref f) => f.release_read_buf(buf), @@ -125,7 +125,7 @@ impl Filter for TlsFilter { } #[inline] - fn release_write_buf(&self, src: BytesMut) -> Result<(), io::Error> { + fn release_write_buf(&self, src: BytesVec) -> Result<(), io::Error> { match self.inner { InnerTlsFilter::Server(ref f) => f.release_write_buf(src), InnerTlsFilter::Client(ref f) => f.release_write_buf(src), @@ -243,11 +243,12 @@ impl FilterFactory for TlsConnectorConfigured { pub(crate) struct IoInner { filter: F, pool: PoolRef, - read_buf: Option, - write_buf: Option, + read_buf: Cell>, + write_buf: Cell>, + handshake: Cell, } -pub(crate) struct Wrapper<'a, F>(&'a mut IoInner); +pub(crate) struct Wrapper<'a, F>(&'a IoInner); impl<'a, F: Filter> io::Read for Wrapper<'a, F> { fn read(&mut self, dst: &mut [u8]) -> io::Result { @@ -273,7 +274,7 @@ impl<'a, F: Filter> io::Write for Wrapper<'a, F> { buf.reserve(src.len()); buf } else { - BytesMut::with_capacity_in(src.len(), self.0.pool) + BytesVec::with_capacity_in(src.len(), self.0.pool) }; buf.extend_from_slice(src); self.0.filter.release_write_buf(buf)?; diff --git a/ntex-tls/src/rustls/server.rs b/ntex-tls/src/rustls/server.rs index a6de2082..2ca8aa72 100644 --- a/ntex-tls/src/rustls/server.rs +++ b/ntex-tls/src/rustls/server.rs @@ -1,8 +1,8 @@ //! An implementation of SSL streams for ntex backed by OpenSSL use std::io::{self, Read as IoRead, Write as IoWrite}; -use std::{any, cell::RefCell, sync::Arc, task::Context, task::Poll}; +use std::{any, cell::Cell, cell::RefCell, sync::Arc, task::Context, task::Poll}; -use ntex_bytes::{BufMut, BytesMut}; +use ntex_bytes::{BufMut, BytesVec}; use ntex_io::{Filter, Io, IoRef, ReadStatus, WriteStatus}; use ntex_util::{future::poll_fn, ready, time, time::Millis}; use tls_rust::{ServerConfig, ServerConnection}; @@ -14,7 +14,7 @@ use super::{PeerCert, PeerCertChain}; /// An implementation of SSL streams pub struct TlsServerFilter { - inner: RefCell>, + inner: IoInner, session: RefCell, } @@ -53,107 +53,101 @@ impl Filter for TlsServerFilter { None } } else { - self.inner.borrow().filter.query(id) + self.inner.filter.query(id) } } #[inline] fn poll_shutdown(&self) -> Poll> { - self.inner.borrow().filter.poll_shutdown() + self.inner.filter.poll_shutdown() } #[inline] fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll { - self.inner.borrow().filter.poll_read_ready(cx) + self.inner.filter.poll_read_ready(cx) } #[inline] fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll { - self.inner.borrow().filter.poll_write_ready(cx) + self.inner.filter.poll_write_ready(cx) } #[inline] - fn get_read_buf(&self) -> Option { - self.inner.borrow_mut().read_buf.take() + fn get_read_buf(&self) -> Option { + self.inner.read_buf.take() } #[inline] - fn get_write_buf(&self) -> Option { - self.inner.borrow_mut().write_buf.take() + fn get_write_buf(&self) -> Option { + self.inner.write_buf.take() } #[inline] - fn release_read_buf(&self, buf: BytesMut) { - self.inner.borrow_mut().read_buf = Some(buf); + fn release_read_buf(&self, buf: BytesVec) { + self.inner.read_buf.set(Some(buf)); } fn process_read_buf(&self, io: &IoRef, nbytes: usize) -> io::Result<(usize, usize)> { - let mut inner = self.inner.borrow_mut(); let mut session = self.session.borrow_mut(); // ask inner filter to process read buf - match inner.filter.process_read_buf(io, nbytes) { + match self.inner.filter.process_read_buf(io, nbytes) { Err(err) => io.want_shutdown(Some(err)), Ok((_, 0)) => return Ok((0, 0)), Ok(_) => (), } - if session.is_handshaking() { - Ok((0, 1)) + // get processed buffer + let mut dst = if let Some(dst) = self.inner.read_buf.take() { + dst } else { - // get processed buffer - let mut dst = if let Some(dst) = inner.read_buf.take() { - dst - } else { - inner.pool.get_read_buf() - }; - let (hw, lw) = inner.pool.read_params().unpack(); + self.inner.pool.get_read_buf() + }; + let (hw, lw) = self.inner.pool.read_params().unpack(); - let mut src = if let Some(src) = inner.filter.get_read_buf() { - src - } else { - return Ok((0, 0)); - }; + let mut src = if let Some(src) = self.inner.filter.get_read_buf() { + src + } else { + return Ok((0, 0)); + }; - let mut new_bytes = 0; - loop { - // make sure we've got room - let remaining = dst.remaining_mut(); - if remaining < lw { - dst.reserve(hw - remaining); - } - - let mut cursor = io::Cursor::new(&src); - let n = session.read_tls(&mut cursor)?; - src.split_to(n); - let state = session - .process_new_packets() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - let new_b = state.plaintext_bytes_to_read(); - if new_b > 0 { - dst.reserve(new_b); - let chunk: &mut [u8] = - unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; - let v = session.reader().read(chunk)?; - unsafe { dst.advance_mut(v) }; - new_bytes += v; - } else { - break; - } + let mut new_bytes = if self.inner.handshake.get() { 1 } else { 0 }; + loop { + // make sure we've got room + let remaining = dst.remaining_mut(); + if remaining < lw { + dst.reserve(hw - remaining); } - let dst_len = dst.len(); - inner.read_buf = Some(dst); - inner.filter.release_read_buf(src); - Ok((dst_len, new_bytes)) + let mut cursor = io::Cursor::new(&src); + let n = session.read_tls(&mut cursor)?; + src.split_to(n); + let state = session + .process_new_packets() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + let new_b = state.plaintext_bytes_to_read(); + if new_b > 0 { + dst.reserve(new_b); + let chunk: &mut [u8] = + unsafe { std::mem::transmute(&mut *dst.chunk_mut()) }; + let v = session.reader().read(chunk)?; + unsafe { dst.advance_mut(v) }; + new_bytes += v; + } else { + break; + } } + + let dst_len = dst.len(); + self.inner.read_buf.set(Some(dst)); + self.inner.filter.release_read_buf(src); + Ok((dst_len, new_bytes)) } - fn release_write_buf(&self, mut src: BytesMut) -> Result<(), io::Error> { + fn release_write_buf(&self, mut src: BytesVec) -> Result<(), io::Error> { let mut session = self.session.borrow_mut(); - let mut inner = self.inner.borrow_mut(); - let mut io = Wrapper(&mut *inner); + let mut io = Wrapper(&self.inner); loop { if !src.is_empty() { @@ -168,7 +162,7 @@ impl Filter for TlsServerFilter { } if !src.is_empty() { - inner.write_buf = Some(src); + self.inner.write_buf.set(Some(src)); } Ok(()) } @@ -190,12 +184,13 @@ impl TlsServerFilter { let inner = IoInner { pool, filter, - read_buf: None, - write_buf: None, + read_buf: Cell::new(None), + write_buf: Cell::new(None), + handshake: Cell::new(true), }; Ok::<_, io::Error>(TlsFilter::new_server(TlsServerFilter { - inner: RefCell::new(inner), + inner, session: RefCell::new(session), })) })?; @@ -204,35 +199,14 @@ impl TlsServerFilter { loop { let (result, wants_read) = { let mut session = filter.server().session.borrow_mut(); - let mut inner = filter.server().inner.borrow_mut(); - let mut wrp = Wrapper(&mut *inner); - let result = session.complete_io(&mut wrp); - let wants_read = session.wants_read(); - - if session.wants_write() { - loop { - let n = session.write_tls(&mut wrp)?; - if n == 0 { - break; - } - } - } - (result, wants_read) + let mut wrp = Wrapper(&filter.server().inner); + (session.complete_io(&mut wrp), session.wants_read()) }; - if result.is_ok() && wants_read { - poll_fn(|cx| { - match ready!(io.poll_read_ready(cx))? { - Some(_) => Ok(()), - None => { - Err(io::Error::new(io::ErrorKind::Other, "disconnected")) - } - }?; - Poll::Ready(Ok::<_, io::Error>(())) - }) - .await?; - } match result { - Ok(_) => return Ok(io), + Ok(_) => { + filter.server().inner.handshake.set(false); + return Ok(io); + } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { poll_fn(|cx| { let read_ready = if wants_read { diff --git a/ntex-tokio/CHANGES.md b/ntex-tokio/CHANGES.md index c00d391e..3ba200eb 100644 --- a/ntex-tokio/CHANGES.md +++ b/ntex-tokio/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.3] - 2022-01-30 + +* Update to ntex-io 0.1.7 + ## [0.1.2] - 2022-01-12 * Fix potential BorrowMutError diff --git a/ntex-tokio/Cargo.toml b/ntex-tokio/Cargo.toml index 440080aa..680407f9 100644 --- a/ntex-tokio/Cargo.toml +++ b/ntex-tokio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tokio" -version = "0.1.2" +version = "0.1.3" authors = ["ntex contributors "] description = "tokio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -16,9 +16,9 @@ name = "ntex_tokio" path = "src/lib.rs" [dependencies] -ntex-bytes = "0.1.9" -ntex-io = "0.1.3" -ntex-util = "0.1.9" +ntex-bytes = "0.1.11" +ntex-io = "0.1.7" +ntex-util = "0.1.13" log = "0.4" pin-project-lite = "0.2" tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] } diff --git a/ntex-tokio/src/io.rs b/ntex-tokio/src/io.rs index bf068bdf..0ff3806a 100644 --- a/ntex-tokio/src/io.rs +++ b/ntex-tokio/src/io.rs @@ -1,7 +1,7 @@ use std::task::{Context, Poll}; use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc, rc::Weak}; -use ntex_bytes::{Buf, BufMut, BytesMut}; +use ntex_bytes::{Buf, BufMut, BytesVec}; use ntex_io::{ types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, @@ -733,7 +733,7 @@ mod unixstream { pub fn poll_read_buf( io: Pin<&mut T>, cx: &mut Context<'_>, - buf: &mut BytesMut, + buf: &mut BytesVec, ) -> Poll> { if !buf.has_remaining_mut() { return Poll::Ready(Ok(0)); diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index cff876e7..255df824 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.5.14] - 2022-01-30 + +* Update ntex-io to 0.1.7 + ## [0.5.13] - 2022-01-28 * http: Refactor client pool support for http/2 connections diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 6d0e51e8..102956ed 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.5.13" +version = "0.5.14" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -48,18 +48,18 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"] async-std = ["ntex-rt/async-std", "ntex-async-std"] [dependencies] -ntex-codec = "0.6.1" +ntex-codec = "0.6.2" ntex-router = "0.5.1" ntex-service = "0.3.1" ntex-macros = "0.1.3" ntex-util = "0.1.13" -ntex-bytes = "0.1.10" -ntex-tls = "0.1.2" +ntex-bytes = "0.1.11" +ntex-tls = "0.1.3" ntex-rt = "0.4.3" -ntex-io = "0.1.6" -ntex-tokio = "0.1.2" -ntex-glommio = { version = "0.1.0", optional = true } -ntex-async-std = { version = "0.1.0", optional = true } +ntex-io = "0.1.7" +ntex-tokio = "0.1.3" +ntex-glommio = { version = "0.1.1", optional = true } +ntex-async-std = { version = "0.1.1", optional = true } async-oneshot = "0.5.0" async-channel = "1.6.1" diff --git a/ntex/src/http/body.rs b/ntex/src/http/body.rs index b72d1894..6d2e5ec2 100644 --- a/ntex/src/http/body.rs +++ b/ntex/src/http/body.rs @@ -610,11 +610,9 @@ mod tests { #[crate::rt_test] async fn test_bytes_mut() { - let mut b = BytesMut::from("test"); - assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); - assert_eq!(Body::from(b.clone()).get_ref(), b"test"); - + let mut b = Body::from(BytesMut::from("test")); assert_eq!(b.size(), BodySize::Sized(4)); + assert_eq!(b.get_ref(), b"test"); assert_eq!( poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(), Some(Bytes::from("test")) diff --git a/ntex/src/http/client/h1proto.rs b/ntex/src/http/client/h1proto.rs index f5c5ab57..a6c4e4e9 100644 --- a/ntex/src/http/client/h1proto.rs +++ b/ntex/src/http/client/h1proto.rs @@ -35,7 +35,7 @@ where Some(port) => write!(wrt, "{}:{}", host, port), }; - match HeaderValue::from_maybe_shared(wrt.get_mut().split().freeze()) { + match HeaderValue::from_maybe_shared(wrt.get_mut().split()) { Ok(value) => match head { RequestHeadType::Owned(ref mut head) => { head.headers.insert(HOST, value) diff --git a/ntex/src/http/h1/decoder.rs b/ntex/src/http/h1/decoder.rs index a2f99320..16c9ff1c 100644 --- a/ntex/src/http/h1/decoder.rs +++ b/ntex/src/http/h1/decoder.rs @@ -469,15 +469,15 @@ impl Decoder for PayloadDecoder { let len = src.len() as u64; let buf; if *remaining > len { - buf = src.split().freeze(); + buf = src.split(); *remaining -= len; } else { - buf = src.split_to(*remaining as usize).freeze(); + buf = src.split_to(*remaining as usize); *remaining = 0; }; self.kind.set(kind); log::trace!("Length read: {}", buf.len()); - Ok(Some(PayloadItem::Chunk(buf))) + Ok(Some(PayloadItem::Chunk(buf.freeze()))) } } Kind::Chunked(ref mut state, ref mut size) => { @@ -630,13 +630,13 @@ impl ChunkedState { } else { let slice; if *rem > len { - slice = rdr.split().freeze(); + slice = rdr.split(); *rem -= len; } else { - slice = rdr.split_to(*rem as usize).freeze(); + slice = rdr.split_to(*rem as usize); *rem = 0; } - *buf = Some(slice); + *buf = Some(slice.freeze()); if *rem > 0 { Poll::Ready(Ok(ChunkedState::Body)) } else { diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index b19479dc..87ab5bb6 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -844,14 +844,14 @@ mod tests { client.write("GET /test1 HTTP/1.1\r\n\r\n"); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert!(load(&mut decoder, &mut buf).status.is_success()); assert!(!client.is_server_dropped()); client.write("GET /test2 HTTP/1.1\r\n\r\n"); client.write("GET /test3 HTTP/1.1\r\n\r\n"); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert!(load(&mut decoder, &mut buf).status.is_success()); assert!(load(&mut decoder, &mut buf).status.is_success()); assert!(decoder.decode(&mut buf).unwrap().is_none()); @@ -877,13 +877,13 @@ mod tests { sleep(Millis(50)).await; client.write("xxxxx"); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert!(load(&mut decoder, &mut buf).status.is_success()); assert!(!client.is_server_dropped()); client.write("GET /test2 HTTP/1.1\r\n\r\n"); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert!(load(&mut decoder, &mut buf).status.is_success()); assert!(decoder.decode(&mut buf).unwrap().is_none()); assert!(!client.is_server_dropped()); @@ -904,7 +904,7 @@ mod tests { client.write("GET /test HTTP/1.1\r\n\r\n"); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert!(load(&mut decoder, &mut buf).status.is_success()); assert!(!client.is_server_dropped()); @@ -913,10 +913,10 @@ mod tests { sleep(Millis(50)).await; client.write("GET /test HTTP/1.1\r\n\r\n"); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert!(load(&mut decoder, &mut buf).status.is_success()); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert!(load(&mut decoder, &mut buf).status.is_success()); assert!(decoder.decode(&mut buf).unwrap().is_none()); assert!(!client.is_server_dropped()); @@ -984,13 +984,12 @@ mod tests { assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending()); sleep(Millis(50)).await; - // required because io shutdown is async oper - let _ = lazy(|cx| Pin::new(&mut h1).poll(cx)).await; - sleep(Millis(550)).await; - assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready()); + crate::util::poll_fn(|cx| Pin::new(&mut h1).poll(cx)) + .await + .unwrap(); assert!(h1.inner.io.is_closed()); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert_eq!(load(&mut decoder, &mut buf).status, StatusCode::BAD_REQUEST); } @@ -1129,7 +1128,7 @@ mod tests { assert_eq!(client.remote_buffer(|buf| buf.len()), 0); let mut decoder = ClientCodec::default(); - let mut buf = client.read().await.unwrap(); + let mut buf = BytesMut::from(&client.read().await.unwrap()[..]); assert!(load(&mut decoder, &mut buf).status.is_success()); assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending()); @@ -1157,7 +1156,7 @@ mod tests { assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready()); sleep(Millis(50)).await; assert!(h1.inner.io.is_closed()); - let buf = client.local_buffer(|buf| buf.split().freeze()); + let buf = client.local_buffer(|buf| buf.split()); assert_eq!(&buf[..28], b"HTTP/1.1 500 Internal Server"); assert_eq!(&buf[buf.len() - 5..], b"error"); } diff --git a/ntex/src/http/h1/encoder.rs b/ntex/src/http/h1/encoder.rs index 33c78bf1..a0f9bac4 100644 --- a/ntex/src/http/h1/encoder.rs +++ b/ntex/src/http/h1/encoder.rs @@ -597,10 +597,7 @@ mod tests { assert!(!enc.encode(b"test", &mut bytes).ok().unwrap()); assert!(enc.encode(b"", &mut bytes).ok().unwrap()); } - assert_eq!( - bytes.split().freeze(), - Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n") - ); + assert_eq!(bytes.split(), Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n")); } #[test] @@ -629,7 +626,7 @@ mod tests { ConnectionType::Close, &DateService::default(), ); - let data = String::from_utf8(Vec::from(bytes.split().freeze().as_ref())).unwrap(); + let data = String::from_utf8(Vec::from(bytes.split().as_ref())).unwrap(); assert!(data.contains("content-length: 0\r\n")); assert!(data.contains("connection: close\r\n")); assert!(data.contains("authorization: another authorization\r\n")); @@ -641,35 +638,35 @@ mod tests { let mut bytes = BytesMut::new(); bytes.reserve(50); write_content_length(0, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 0\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 0\r\n"[..]); bytes.reserve(50); write_content_length(9, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 9\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 9\r\n"[..]); bytes.reserve(50); write_content_length(10, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 10\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 10\r\n"[..]); bytes.reserve(50); write_content_length(99, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 99\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 99\r\n"[..]); bytes.reserve(50); write_content_length(100, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 100\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 100\r\n"[..]); bytes.reserve(50); write_content_length(101, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 101\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 101\r\n"[..]); bytes.reserve(50); write_content_length(998, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 998\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 998\r\n"[..]); bytes.reserve(50); write_content_length(1000, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 1000\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 1000\r\n"[..]); bytes.reserve(50); write_content_length(1001, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 1001\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 1001\r\n"[..]); bytes.reserve(50); write_content_length(5909, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 5909\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 5909\r\n"[..]); write_content_length(25999, &mut bytes); - assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 25999\r\n"[..]); + assert_eq!(bytes.split(), b"\r\ncontent-length: 25999\r\n"[..]); } } diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index 5879f665..3bd52f25 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -105,6 +105,8 @@ pub mod tls { } pub mod util { - pub use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut, Pool, PoolId, PoolRef}; + pub use ntex_bytes::{ + Buf, BufMut, ByteString, Bytes, BytesMut, BytesVec, Pool, PoolId, PoolRef, + }; pub use ntex_util::{future::*, ready, services::*, HashMap, HashSet}; } diff --git a/ntex/src/ws/codec.rs b/ntex/src/ws/codec.rs index 6677886a..36557f5d 100644 --- a/ntex/src/ws/codec.rs +++ b/ntex/src/ws/codec.rs @@ -228,9 +228,7 @@ impl Decoder for Codec { OpCode::Continue => { if self.flags.get().contains(Flags::R_CONTINUATION) { Ok(Some(Frame::Continuation(Item::Continue( - payload - .map(|pl| pl.freeze()) - .unwrap_or_else(Bytes::new), + payload.unwrap_or_else(Bytes::new), )))) } else { Err(ProtocolError::ContinuationNotStarted) @@ -240,9 +238,7 @@ impl Decoder for Codec { if !self.flags.get().contains(Flags::R_CONTINUATION) { self.insert_flags(Flags::R_CONTINUATION); Ok(Some(Frame::Continuation(Item::FirstBinary( - payload - .map(|pl| pl.freeze()) - .unwrap_or_else(Bytes::new), + payload.unwrap_or_else(Bytes::new), )))) } else { Err(ProtocolError::ContinuationStarted) @@ -252,20 +248,18 @@ impl Decoder for Codec { if !self.flags.get().contains(Flags::R_CONTINUATION) { self.insert_flags(Flags::R_CONTINUATION); Ok(Some(Frame::Continuation(Item::FirstText( - payload - .map(|pl| pl.freeze()) - .unwrap_or_else(Bytes::new), + payload.unwrap_or_else(Bytes::new), )))) } else { Err(ProtocolError::ContinuationStarted) } } - OpCode::Ping => Ok(Some(Frame::Ping( - payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new), - ))), - OpCode::Pong => Ok(Some(Frame::Pong( - payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new), - ))), + OpCode::Ping => { + Ok(Some(Frame::Ping(payload.unwrap_or_else(Bytes::new)))) + } + OpCode::Pong => { + Ok(Some(Frame::Pong(payload.unwrap_or_else(Bytes::new)))) + } OpCode::Bad => Err(ProtocolError::BadOpCode), _ => { error!("Unfinished fragment {:?}", opcode); @@ -278,9 +272,7 @@ impl Decoder for Codec { if self.flags.get().contains(Flags::R_CONTINUATION) { self.remove_flags(Flags::R_CONTINUATION); Ok(Some(Frame::Continuation(Item::Last( - payload - .map(|pl| pl.freeze()) - .unwrap_or_else(Bytes::new), + payload.unwrap_or_else(Bytes::new), )))) } else { Err(ProtocolError::ContinuationNotStarted) @@ -295,18 +287,18 @@ impl Decoder for Codec { Ok(Some(Frame::Close(None))) } } - OpCode::Ping => Ok(Some(Frame::Ping( - payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new), - ))), - OpCode::Pong => Ok(Some(Frame::Pong( - payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new), - ))), - OpCode::Binary => Ok(Some(Frame::Binary( - payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new), - ))), - OpCode::Text => Ok(Some(Frame::Text( - payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new), - ))), + OpCode::Ping => { + Ok(Some(Frame::Ping(payload.unwrap_or_else(Bytes::new)))) + } + OpCode::Pong => { + Ok(Some(Frame::Pong(payload.unwrap_or_else(Bytes::new)))) + } + OpCode::Binary => { + Ok(Some(Frame::Binary(payload.unwrap_or_else(Bytes::new)))) + } + OpCode::Text => { + Ok(Some(Frame::Text(payload.unwrap_or_else(Bytes::new)))) + } } } } diff --git a/ntex/src/ws/frame.rs b/ntex/src/ws/frame.rs index 1e4735dc..40b89e3c 100644 --- a/ntex/src/ws/frame.rs +++ b/ntex/src/ws/frame.rs @@ -5,7 +5,7 @@ use nanorand::{Rng, WyRand}; use super::proto::{CloseCode, CloseReason, OpCode}; use super::{error::ProtocolError, mask::apply_mask}; -use crate::util::{Buf, BufMut, BytesMut}; +use crate::util::{Buf, BufMut, Bytes, BytesMut}; /// WebSocket frame parser. #[derive(Debug)] @@ -92,7 +92,7 @@ impl Parser { src: &mut BytesMut, server: bool, max_size: usize, - ) -> Result)>, ProtocolError> { + ) -> Result)>, ProtocolError> { // try to parse ws frame metadata let (idx, finished, opcode, length, mask) = match Parser::parse_metadata(src, server, max_size)? { @@ -113,8 +113,6 @@ impl Parser { return Ok(Some((finished, opcode, None))); } - let mut data = src.split_to(length); - // control frames must have length <= 125 match opcode { OpCode::Ping | OpCode::Pong if length > 125 => { @@ -129,10 +127,14 @@ impl Parser { // unmask if let Some(mask) = mask { - apply_mask(&mut data, mask); + apply_mask(&mut src[..length], mask); } - Ok(Some((finished, opcode, Some(data)))) + Ok(Some(( + finished, + opcode, + Some(src.split_to(length).freeze()), + ))) } /// Parse the payload of a close frame. @@ -225,21 +227,19 @@ mod tests { payload: Bytes, } - fn is_none( - frm: &Result)>, ProtocolError>, - ) -> bool { + fn is_none(frm: &Result)>, ProtocolError>) -> bool { match *frm { Ok(None) => true, _ => false, } } - fn extract(frm: Result)>, ProtocolError>) -> F { + fn extract(frm: Result)>, ProtocolError>) -> F { match frm { Ok(Some((finished, opcode, payload))) => F { finished, opcode, - payload: payload.map(|b| b.freeze()).unwrap_or_else(Bytes::new), + payload: payload.unwrap_or_else(Bytes::new), }, _ => unreachable!("error"), } diff --git a/ntex/src/ws/transport.rs b/ntex/src/ws/transport.rs index 0fb67671..53402949 100644 --- a/ntex/src/ws/transport.rs +++ b/ntex/src/ws/transport.rs @@ -3,7 +3,7 @@ use std::{any, cell::Cell, cmp, io, task::Context, task::Poll}; use crate::codec::{Decoder, Encoder}; use crate::io::{Base, Filter, FilterFactory, Io, IoRef, ReadStatus, WriteStatus}; -use crate::util::{BufMut, BytesMut, PoolRef, Ready}; +use crate::util::{BufMut, BytesVec, PoolRef, Ready}; use super::{CloseCode, CloseReason, Codec, Frame, Item, Message}; @@ -21,7 +21,7 @@ pub struct WsTransport { pool: PoolRef, codec: Codec, flags: Cell, - read_buf: Cell>, + read_buf: Cell>, } impl WsTransport { @@ -67,7 +67,7 @@ impl Filter for WsTransport { } else { CloseCode::Normal }; - let _ = self.codec.encode( + let _ = self.codec.encode_vec( Message::Close(Some(CloseReason { code, description: None, @@ -91,17 +91,17 @@ impl Filter for WsTransport { } #[inline] - fn get_read_buf(&self) -> Option { + fn get_read_buf(&self) -> Option { self.read_buf.take() } #[inline] - fn get_write_buf(&self) -> Option { + fn get_write_buf(&self) -> Option { None } #[inline] - fn release_read_buf(&self, buf: BytesMut) { + fn release_read_buf(&self, buf: BytesVec) { self.read_buf.set(Some(buf)); } @@ -136,11 +136,12 @@ impl Filter for WsTransport { dst.reserve(hw - remaining); } - let frame = if let Some(frame) = self.codec.decode(&mut src).map_err(|e| { - log::trace!("Failed to decode ws codec frames: {:?}", e); - self.insert_flags(Flags::PROTO_ERR); - io::Error::new(io::ErrorKind::Other, e) - })? { + let frame = if let Some(frame) = + self.codec.decode_vec(&mut src).map_err(|e| { + log::trace!("Failed to decode ws codec frames: {:?}", e); + self.insert_flags(Flags::PROTO_ERR); + io::Error::new(io::ErrorKind::Other, e) + })? { frame } else { break; @@ -182,7 +183,7 @@ impl Filter for WsTransport { .inner .get_write_buf() .unwrap_or_else(|| self.pool.get_write_buf()); - let _ = self.codec.encode(Message::Pong(msg), &mut b); + let _ = self.codec.encode_vec(Message::Pong(msg), &mut b); self.release_write_buf(b)?; } Frame::Pong(_) => (), @@ -205,7 +206,7 @@ impl Filter for WsTransport { Ok((dlen, nbytes)) } - fn release_write_buf(&self, src: BytesMut) -> Result<(), io::Error> { + fn release_write_buf(&self, src: BytesVec) -> Result<(), io::Error> { let mut buf = if let Some(buf) = self.inner.get_write_buf() { buf } else { @@ -220,7 +221,9 @@ impl Filter for WsTransport { } // Encoder ws::Codec do not fail - let _ = self.codec.encode(Message::Binary(src.freeze()), &mut buf); + let _ = self + .codec + .encode_vec(Message::Binary(src.freeze()), &mut buf); self.inner.release_write_buf(buf) } } diff --git a/ntex/tests/connect.rs b/ntex/tests/connect.rs index bf45533f..c4c3ef13 100644 --- a/ntex/tests/connect.rs +++ b/ntex/tests/connect.rs @@ -1,12 +1,11 @@ -use std::io; -use std::sync::Arc; +use std::{io, sync::Arc}; use ntex::codec::BytesCodec; use ntex::connect::Connect; use ntex::io::{types::PeerAddr, Io}; use ntex::server::test_server; use ntex::service::{fn_service, pipeline_factory, Service, ServiceFactory}; -use ntex::util::Bytes; +use ntex::{time, util::Bytes}; #[cfg(feature = "openssl")] fn ssl_acceptor() -> tls_openssl::ssl::SslAcceptor { @@ -90,6 +89,7 @@ async fn test_openssl_string() { io.send(Bytes::from_static(b"test"), &BytesCodec) .await .unwrap(); + time::sleep(time::Millis(100)).await; Ok::<_, Box>(()) })) }); @@ -144,7 +144,6 @@ async fn test_openssl_read_before_error() { #[cfg(feature = "rustls")] #[ntex::test] -#[ignore] async fn test_rustls_string() { use ntex::server::rustls; use ntex_tls::rustls::PeerCert;