Use BytesVec for io

This commit is contained in:
Nikolay Kim 2022-01-30 16:17:34 +06:00
parent a2a5899bbe
commit 868c0324f7
38 changed files with 400 additions and 455 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.1] - 2022-01-30
* Update to ntex-io 0.1.7
## [0.1.0] - 2022-01-03
* Initial release

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-async-std"
version = "0.1.0"
version = "0.1.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "async-std intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,9 +16,9 @@ name = "ntex_async_std"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.8"
ntex-io = "0.1.0"
ntex-util = "0.1.6"
ntex-bytes = "0.1.11"
ntex-io = "0.1.7"
ntex-util = "0.1.13"
async-oneshot = "0.5.0"
log = "0.4"
pin-project-lite = "0.2"

View file

@ -1,7 +1,7 @@
use std::{any, future::Future, io, pin::Pin, task::Context, task::Poll};
use async_std::io::{Read, Write};
use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
@ -356,7 +356,7 @@ pub(super) fn flush_io<T: Read + Write + Unpin>(
pub fn poll_read_buf<T: Read>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesMut,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));

View file

@ -508,15 +508,7 @@ impl Bytes {
/// Creates `Bytes` instance from slice, by copying it.
pub fn copy_from_slice(data: &[u8]) -> Self {
if data.len() <= INLINE_CAP {
Bytes {
inner: Inner::from_slice_inline(data),
}
} else {
Bytes {
inner: BytesMut::copy_from_slice_in(data, PoolId::DEFAULT.pool_ref()).inner,
}
}
Self::copy_from_slice_in(data, PoolId::DEFAULT)
}
/// Creates `Bytes` instance from slice, by copying it.
@ -530,7 +522,7 @@ impl Bytes {
}
} else {
Bytes {
inner: BytesMut::copy_from_slice_in(data, pool).inner,
inner: Inner::from_slice(data.len(), data, pool.into()),
}
}
}
@ -578,19 +570,18 @@ impl Bytes {
assert!(end <= len);
if end - begin <= INLINE_CAP {
return Bytes {
Bytes {
inner: Inner::from_slice_inline(&self[begin..end]),
};
}
} else {
let mut ret = self.clone();
unsafe {
ret.inner.set_end(end);
ret.inner.set_start(begin);
}
ret
}
let mut ret = self.clone();
unsafe {
ret.inner.set_end(end);
ret.inner.set_start(begin);
}
ret
}
/// Returns a slice of self that is equivalent to the given `subset`.
@ -664,11 +655,11 @@ impl Bytes {
}
if at == 0 {
return mem::replace(self, Bytes::new());
}
Bytes {
inner: self.inner.split_off(at, true),
mem::replace(self, Bytes::new())
} else {
Bytes {
inner: self.inner.split_off(at, true),
}
}
}
@ -703,11 +694,11 @@ impl Bytes {
}
if at == 0 {
return Bytes::new();
}
Bytes {
inner: self.inner.split_to(at, true),
Bytes::new()
} else {
Bytes {
inner: self.inner.split_to(at, true),
}
}
}
@ -764,7 +755,7 @@ impl Bytes {
}
} else {
Bytes {
inner: BytesMut::copy_from_slice_in(self, self.inner.pool()).inner,
inner: Inner::from_slice(self.len(), self, self.inner.pool()),
}
};
}
@ -1151,13 +1142,6 @@ impl BytesMut {
}
}
#[inline]
pub(crate) fn with_capacity_in_priv(capacity: usize, pool: PoolRef) -> BytesMut {
BytesMut {
inner: Inner::with_capacity(capacity, pool),
}
}
/// Creates a new `BytesMut` from slice, by copying it.
pub fn copy_from_slice_in<T>(src: &[u8], pool: T) -> Self
where
@ -1949,13 +1933,6 @@ impl BytesVec {
}
}
#[inline]
pub(crate) fn with_capacity_in_priv(capacity: usize, pool: PoolRef) -> BytesVec {
BytesVec {
inner: InnerVec::with_capacity(capacity, pool),
}
}
/// Creates a new `BytesVec` from slice, by copying it.
pub fn copy_from_slice(src: &[u8]) -> Self {
Self::copy_from_slice_in(src, PoolId::DEFAULT)
@ -2096,8 +2073,7 @@ impl BytesVec {
///
/// assert_eq!(other, b"hello world"[..]);
/// ```
pub fn split(&mut self) -> Bytes {
// let len = self.len();
pub fn split(&mut self) -> BytesMut {
self.split_to(self.len())
}
@ -2126,10 +2102,10 @@ impl BytesVec {
/// # Panics
///
/// Panics if `at > len`.
pub fn split_to(&mut self, at: usize) -> Bytes {
pub fn split_to(&mut self, at: usize) -> BytesMut {
assert!(at <= self.len());
Bytes {
BytesMut {
inner: self.inner.split_to(at, false),
}
}
@ -2779,10 +2755,10 @@ impl InnerVec {
}
fn split_to(&mut self, at: usize, create_inline: bool) -> Inner {
let other = unsafe {
unsafe {
let ptr = self.as_ptr();
if create_inline && at <= INLINE_CAP {
let other = if create_inline && at <= INLINE_CAP {
Inner::from_ptr_inline(ptr, at)
} else {
let inner = self.as_inner();
@ -2799,13 +2775,11 @@ impl InnerVec {
(self.0.as_ptr() as usize ^ KIND_VEC) as *mut Shared,
),
}
}
};
unsafe {
};
self.set_start(at as u32);
}
other
other
}
}
fn truncate(&mut self, len: usize) {
@ -2862,11 +2836,13 @@ impl InnerVec {
// try to reclaim the buffer. This is possible if the current
// handle is the only outstanding handle pointing to the buffer.
if inner.is_unique() && vec_cap >= new_cap {
let offset = inner.offset;
inner.offset = SHARED_VEC_SIZE as u32;
// The capacity is sufficient, reclaim the buffer
let ptr = (self.0.as_ptr() as *mut u8).add(SHARED_VEC_SIZE);
ptr::copy(self.as_ptr(), ptr, len);
let src = (self.0.as_ptr() as *mut u8).add(offset as usize);
let dst = (self.0.as_ptr() as *mut u8).add(SHARED_VEC_SIZE);
ptr::copy(src, dst, len);
} else {
// Create a new vector storage
let pool = inner.pool;

View file

@ -50,9 +50,9 @@ struct MemoryPool {
// io read/write cache and params
read_wm: Cell<BufParams>,
read_cache: RefCell<Vec<BytesMut>>,
read_cache: RefCell<Vec<BytesVec>>,
write_wm: Cell<BufParams>,
write_cache: RefCell<Vec<BytesMut>>,
write_cache: RefCell<Vec<BytesVec>>,
spawn: RefCell<Option<Rc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>)>>>,
}
@ -198,13 +198,13 @@ impl PoolRef {
#[inline]
/// Creates a new `BytesMut` with the specified capacity.
pub fn buf_with_capacity(self, cap: usize) -> BytesMut {
BytesMut::with_capacity_in_priv(cap, self)
BytesMut::with_capacity_in(cap, self)
}
#[inline]
/// Creates a new `BytesVec` with the specified capacity.
pub fn vec_with_capacity(self, cap: usize) -> BytesVec {
BytesVec::with_capacity_in_priv(cap, self)
BytesVec::with_capacity_in(cap, self)
}
#[doc(hidden)]
@ -285,18 +285,18 @@ impl PoolRef {
#[doc(hidden)]
#[inline]
pub fn get_read_buf(self) -> BytesMut {
pub fn get_read_buf(self) -> BytesVec {
if let Some(buf) = self.0.read_cache.borrow_mut().pop() {
buf
} else {
BytesMut::with_capacity_in_priv(self.0.read_wm.get().high as usize, self)
BytesVec::with_capacity_in(self.0.read_wm.get().high as usize, self)
}
}
#[doc(hidden)]
#[inline]
/// Release read buffer, buf must be allocated from this pool
pub fn release_read_buf(self, mut buf: BytesMut) {
pub fn release_read_buf(self, mut buf: BytesVec) {
let cap = buf.capacity();
let (hw, lw) = self.0.read_wm.get().unpack();
if cap > lw && cap <= hw {
@ -310,18 +310,18 @@ impl PoolRef {
#[doc(hidden)]
#[inline]
pub fn get_write_buf(self) -> BytesMut {
pub fn get_write_buf(self) -> BytesVec {
if let Some(buf) = self.0.write_cache.borrow_mut().pop() {
buf
} else {
BytesMut::with_capacity_in_priv(self.0.write_wm.get().high as usize, self)
BytesVec::with_capacity_in(self.0.write_wm.get().high as usize, self)
}
}
#[doc(hidden)]
#[inline]
/// Release write buffer, buf must be allocated from this pool
pub fn release_write_buf(self, mut buf: BytesMut) {
pub fn release_write_buf(self, mut buf: BytesVec) {
let cap = buf.capacity();
let (hw, lw) = self.0.write_wm.get().unpack();
if cap > lw && cap <= hw {

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-codec"
version = "0.6.1"
version = "0.6.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -16,4 +16,4 @@ name = "ntex_codec"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.9"
ntex-bytes = "0.1.11"

View file

@ -3,7 +3,7 @@
use std::{io, rc::Rc};
use ntex_bytes::{Bytes, BytesMut};
use ntex_bytes::{Bytes, BytesMut, BytesVec};
/// Trait of helper objects to write out messages as bytes.
pub trait Encoder {
@ -15,6 +15,11 @@ pub trait Encoder {
/// Encodes a frame into the buffer provided.
fn encode(&self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
/// Encodes a frame into the buffer provided.
fn encode_vec(&self, item: Self::Item, dst: &mut BytesVec) -> Result<(), Self::Error> {
dst.with_bytes_mut(|dst| self.encode(item, dst))
}
}
/// Decoding of frames via buffers.
@ -31,6 +36,11 @@ pub trait Decoder {
/// Attempts to decode a frame from the provided buffer of bytes.
fn decode(&self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
/// Attempts to decode a frame from the provided buffer of bytes.
fn decode_vec(&self, src: &mut BytesVec) -> Result<Option<Self::Item>, Self::Error> {
src.with_bytes_mut(|src| self.decode(src))
}
}
impl<T> Encoder for Rc<T>

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.1] - 2022-01-30
* Update to ntex-io 0.1.7
## [0.1.0] - 2022-01-17
* Initial release

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-glommio"
version = "0.1.0"
version = "0.1.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,9 +16,9 @@ name = "ntex_glommio"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.9"
ntex-io = "0.1.4"
ntex-util = "0.1.9"
ntex-bytes = "0.1.11"
ntex-io = "0.1.7"
ntex-util = "0.1.13"
async-oneshot = "0.5.0"
futures-lite = "1.12"
futures-channel = "0.3"

View file

@ -4,7 +4,7 @@ use std::{any, future::Future, io, pin::Pin};
use futures_lite::future::FutureExt;
use futures_lite::io::{AsyncRead, AsyncWrite};
use glommio::Task;
use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
@ -371,7 +371,7 @@ pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
pub fn poll_read_buf<T: AsyncRead>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesMut,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.7] - 2022-01-xx
* Add BytesVec type
## [0.1.6] - 2022-01-27
* Optimize Io memory layout

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.1.6"
version = "0.1.7"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@ -16,9 +16,9 @@ name = "ntex_io"
path = "src/lib.rs"
[dependencies]
ntex-codec = "0.6.1"
ntex-bytes = "0.1.9"
ntex-util = "0.1.12"
ntex-codec = "0.6.2"
ntex-bytes = "0.1.11"
ntex-util = "0.1.13"
ntex-service = "0.3.1"
bitflags = "1.3"

View file

@ -1,6 +1,6 @@
use std::{any, io, task::Context, task::Poll};
use ntex_bytes::BytesMut;
use ntex_bytes::BytesVec;
use super::io::Flags;
use super::{Filter, IoRef, ReadStatus, WriteStatus};
@ -68,17 +68,17 @@ impl Filter for Base {
}
#[inline]
fn get_read_buf(&self) -> Option<BytesMut> {
fn get_read_buf(&self) -> Option<BytesVec> {
self.0 .0.read_buf.take()
}
#[inline]
fn get_write_buf(&self) -> Option<BytesMut> {
fn get_write_buf(&self) -> Option<BytesVec> {
self.0 .0.write_buf.take()
}
#[inline]
fn release_read_buf(&self, buf: BytesMut) {
fn release_read_buf(&self, buf: BytesVec) {
self.0 .0.read_buf.set(Some(buf));
}
@ -91,7 +91,7 @@ impl Filter for Base {
}
#[inline]
fn release_write_buf(&self, buf: BytesMut) -> Result<(), io::Error> {
fn release_write_buf(&self, buf: BytesVec) -> Result<(), io::Error> {
let pool = self.0.memory_pool();
if buf.is_empty() {
pool.release_write_buf(buf);
@ -133,21 +133,21 @@ impl Filter for NullFilter {
Poll::Ready(WriteStatus::Terminate)
}
fn get_read_buf(&self) -> Option<BytesMut> {
fn get_read_buf(&self) -> Option<BytesVec> {
None
}
fn get_write_buf(&self) -> Option<BytesMut> {
fn get_write_buf(&self) -> Option<BytesVec> {
None
}
fn release_read_buf(&self, _: BytesMut) {}
fn release_read_buf(&self, _: BytesVec) {}
fn process_read_buf(&self, _: &IoRef, _: usize) -> io::Result<(usize, usize)> {
Ok((0, 0))
}
fn release_write_buf(&self, _: BytesMut) -> Result<(), io::Error> {
fn release_write_buf(&self, _: BytesVec) -> Result<(), io::Error> {
Ok(())
}
}

View file

@ -4,7 +4,7 @@ use std::{
fmt, future::Future, hash, io, marker, mem, ops::Deref, pin::Pin, ptr, rc::Rc, time,
};
use ntex_bytes::{BytesMut, PoolId, PoolRef};
use ntex_bytes::{BytesVec, PoolId, PoolRef};
use ntex_codec::{Decoder, Encoder};
use ntex_util::{
future::poll_fn, future::Either, task::LocalWaker, time::now, time::Millis,
@ -62,8 +62,8 @@ pub(crate) struct IoState {
pub(super) read_task: LocalWaker,
pub(super) write_task: LocalWaker,
pub(super) dispatch_task: LocalWaker,
pub(super) read_buf: Cell<Option<BytesMut>>,
pub(super) write_buf: Cell<Option<BytesMut>>,
pub(super) read_buf: Cell<Option<BytesVec>>,
pub(super) write_buf: Cell<Option<BytesVec>>,
pub(super) filter: Cell<&'static dyn Filter>,
pub(super) handle: Cell<Option<Box<dyn Handle>>>,
#[allow(clippy::box_collection)]
@ -134,9 +134,6 @@ impl IoState {
{
log::trace!("initiate io shutdown {:?}", self.flags.get());
self.insert_flags(Flags::IO_STOPPING_FILTERS);
self.read_task.wake();
self.write_task.wake();
self.dispatch_task.wake();
self.shutdown_filters();
}
}
@ -178,7 +175,7 @@ impl IoState {
#[inline]
pub(super) fn with_read_buf<Fn, Ret>(&self, release: bool, f: Fn) -> Ret
where
Fn: FnOnce(&mut Option<BytesMut>) -> Ret,
Fn: FnOnce(&mut Option<BytesVec>) -> Ret,
{
let filter = self.filter.get();
let mut buf = filter.get_read_buf();
@ -200,7 +197,7 @@ impl IoState {
#[inline]
pub(super) fn with_write_buf<Fn, Ret>(&self, f: Fn) -> Ret
where
Fn: FnOnce(&mut Option<BytesMut>) -> Ret,
Fn: FnOnce(&mut Option<BytesVec>) -> Ret,
{
let buf = self.write_buf.as_ptr();
let ref_buf = unsafe { buf.as_mut().unwrap() };
@ -284,11 +281,11 @@ impl<F> Io<F> {
/// Set memory pool
pub fn set_memory_pool(&self, pool: PoolRef) {
if let Some(mut buf) = self.0 .0.read_buf.take() {
pool.move_in(&mut buf);
pool.move_vec_in(&mut buf);
self.0 .0.read_buf.set(Some(buf));
}
if let Some(mut buf) = self.0 .0.write_buf.take() {
pool.move_in(&mut buf);
pool.move_vec_in(&mut buf);
self.0 .0.write_buf.set(Some(buf));
}
self.0 .0.pool.set(pool);

View file

@ -1,6 +1,6 @@
use std::{any, fmt, hash, io};
use ntex_bytes::{BufMut, BytesMut, PoolRef};
use ntex_bytes::{BufMut, BytesVec, PoolRef};
use ntex_codec::{Decoder, Encoder};
use super::io::{Flags, IoRef, OnDisconnect};
@ -106,7 +106,7 @@ impl IoRef {
}
// encode item and wake write task
codec.encode(item, buf)
codec.encode_vec(item, buf)
})
.map_or_else(
|err| {
@ -130,7 +130,9 @@ impl IoRef {
U: Decoder,
{
self.0.with_read_buf(false, |buf| {
buf.as_mut().map(|b| codec.decode(b)).unwrap_or(Ok(None))
buf.as_mut()
.map(|b| codec.decode_vec(b))
.unwrap_or(Ok(None))
})
}
@ -152,7 +154,7 @@ impl IoRef {
/// Get mut access to write buffer
pub fn with_write_buf<F, R>(&self, f: F) -> Result<R, io::Error>
where
F: FnOnce(&mut BytesMut) -> R,
F: FnOnce(&mut BytesVec) -> R,
{
let filter = self.0.filter.get();
let mut buf = filter
@ -172,7 +174,7 @@ impl IoRef {
/// Get mut access to read buffer
pub fn with_read_buf<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
F: FnOnce(&mut BytesVec) -> R,
{
self.0.with_read_buf(true, |buf| {
// set buf
@ -302,7 +304,7 @@ mod tests {
assert_eq!(io.read_ready().await.unwrap(), Some(()));
assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
let item = io.with_read_buf(|buffer| buffer.clone());
let item = io.with_read_buf(|buffer| buffer.split());
assert_eq!(item, Bytes::from_static(BIN));
client.write(TEXT);
@ -367,11 +369,11 @@ mod tests {
self.inner.poll_read_ready(cx)
}
fn get_read_buf(&self) -> Option<BytesMut> {
fn get_read_buf(&self) -> Option<BytesVec> {
self.inner.get_read_buf()
}
fn release_read_buf(&self, buf: BytesMut) {
fn release_read_buf(&self, buf: BytesVec) {
self.inner.release_read_buf(buf)
}
@ -386,7 +388,7 @@ mod tests {
self.inner.poll_write_ready(cx)
}
fn get_write_buf(&self) -> Option<BytesMut> {
fn get_write_buf(&self) -> Option<BytesVec> {
if let Some(buf) = self.inner.get_write_buf() {
self.out_bytes.set(self.out_bytes.get() - buf.len());
Some(buf)
@ -395,7 +397,7 @@ mod tests {
}
}
fn release_write_buf(&self, buf: BytesMut) -> Result<(), io::Error> {
fn release_write_buf(&self, buf: BytesVec) -> Result<(), io::Error> {
self.write_order.borrow_mut().push(self.idx);
self.out_bytes.set(self.out_bytes.get() + buf.len());
self.inner.release_write_buf(buf)

View file

@ -17,7 +17,7 @@ mod tasks;
mod timer;
mod utils;
use ntex_bytes::BytesMut;
use ntex_bytes::BytesVec;
use ntex_codec::{Decoder, Encoder};
use ntex_util::time::Millis;
@ -52,18 +52,18 @@ pub enum WriteStatus {
pub trait Filter: 'static {
fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
fn get_read_buf(&self) -> Option<BytesMut>;
fn get_read_buf(&self) -> Option<BytesVec>;
fn release_read_buf(&self, buf: BytesMut);
fn release_read_buf(&self, buf: BytesVec);
/// Process read buffer
///
/// Returns tuple (total bytes, new bytes)
fn process_read_buf(&self, io: &IoRef, n: usize) -> sio::Result<(usize, usize)>;
fn get_write_buf(&self) -> Option<BytesMut>;
fn get_write_buf(&self) -> Option<BytesVec>;
fn release_write_buf(&self, buf: BytesMut) -> sio::Result<()>;
fn release_write_buf(&self, buf: BytesVec) -> sio::Result<()>;
/// Check readiness for read operations
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus>;

View file

@ -1,6 +1,6 @@
use std::{io, task::Context, task::Poll};
use ntex_bytes::{BytesMut, PoolRef};
use ntex_bytes::{BytesVec, PoolRef};
use super::{io::Flags, IoRef, ReadStatus, WriteStatus};
@ -26,7 +26,7 @@ impl ReadContext {
#[inline]
/// Get read buffer
pub fn get_read_buf(&self) -> BytesMut {
pub fn get_read_buf(&self) -> BytesVec {
self.0
.0
.read_buf
@ -36,7 +36,7 @@ impl ReadContext {
#[inline]
/// Release read buffer after io read operations
pub fn release_read_buf(&self, buf: BytesMut, nbytes: usize) {
pub fn release_read_buf(&self, buf: BytesVec, nbytes: usize) {
if buf.is_empty() {
self.0.memory_pool().release_read_buf(buf);
} else {
@ -99,13 +99,13 @@ impl WriteContext {
#[inline]
/// Get write buffer
pub fn get_write_buf(&self) -> Option<BytesMut> {
pub fn get_write_buf(&self) -> Option<BytesVec> {
self.0 .0.write_buf.take()
}
#[inline]
/// Release write buffer after io write operations
pub fn release_write_buf(&self, buf: BytesMut) -> Result<(), io::Error> {
pub fn release_write_buf(&self, buf: BytesVec) -> Result<(), io::Error> {
let pool = self.0.memory_pool();
let mut flags = self.0.flags();

View file

@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::{any, cmp, fmt, future::Future, io, mem, net, pin::Pin, rc::Rc};
use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_bytes::{Buf, BufMut, Bytes, BytesVec};
use ntex_util::future::poll_fn;
use ntex_util::time::{sleep, Millis, Sleep};
@ -60,7 +60,7 @@ struct State {
#[derive(Default, Debug)]
struct Channel {
buf: BytesMut,
buf: BytesVec,
buf_cap: usize,
flags: IoTestFlags,
waker: AtomicWaker,
@ -159,7 +159,7 @@ impl IoTest {
/// Access read buffer.
pub fn local_buffer<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
F: FnOnce(&mut BytesVec) -> R,
{
let guard = self.local.lock().unwrap();
let mut ch = guard.borrow_mut();
@ -169,7 +169,7 @@ impl IoTest {
/// Access remote buffer.
pub fn remote_buffer<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
F: FnOnce(&mut BytesVec) -> R,
{
let guard = self.remote.lock().unwrap();
let mut ch = guard.borrow_mut();
@ -205,12 +205,12 @@ impl IoTest {
}
/// Read any available data
pub fn read_any(&self) -> BytesMut {
self.local.lock().unwrap().borrow_mut().buf.split()
pub fn read_any(&self) -> Bytes {
self.local.lock().unwrap().borrow_mut().buf.split().freeze()
}
/// Read data, if data is not available wait for it
pub async fn read(&self) -> Result<BytesMut, io::Error> {
pub async fn read(&self) -> Result<Bytes, io::Error> {
if self.local.lock().unwrap().borrow().buf.is_empty() {
poll_fn(|cx| {
let guard = self.local.lock().unwrap();
@ -237,13 +237,13 @@ impl IoTest {
})
.await;
}
Ok(self.local.lock().unwrap().borrow_mut().buf.split())
Ok(self.local.lock().unwrap().borrow_mut().buf.split().freeze())
}
pub fn poll_read_buf(
&self,
cx: &mut Context<'_>,
buf: &mut BytesMut,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
let guard = self.local.lock().unwrap();
let mut ch = guard.borrow_mut();
@ -551,7 +551,7 @@ impl Future for WriteTask {
// read until 0 or err
let io = &this.io;
loop {
let mut buf = BytesMut::new();
let mut buf = BytesVec::new();
match io.poll_read_buf(cx, &mut buf) {
Poll::Ready(Err(e)) => {
this.state.close(Some(e));

View file

@ -5,7 +5,7 @@ use std::{
any, cmp, error::Error, future::Future, io, pin::Pin, task::Context, task::Poll,
};
use ntex_bytes::{BufMut, BytesMut, PoolRef};
use ntex_bytes::{BufMut, BytesVec, PoolRef};
use ntex_io::{Base, Filter, FilterFactory, Io, IoRef, ReadStatus, WriteStatus};
use ntex_util::{future::poll_fn, ready, time, time::Millis};
use tls_openssl::ssl::{self, SslStream};
@ -29,13 +29,13 @@ pub struct SslFilter<F = Base> {
inner: RefCell<SslStream<IoInner<F>>>,
pool: PoolRef,
handshake: Cell<bool>,
read_buf: Cell<Option<BytesMut>>,
read_buf: Cell<Option<BytesVec>>,
}
struct IoInner<F> {
inner: F,
pool: PoolRef,
write_buf: Option<BytesMut>,
write_buf: Option<BytesVec>,
}
impl<F: Filter> io::Read for IoInner<F> {
@ -61,7 +61,7 @@ impl<F: Filter> io::Write for IoInner<F> {
buf.reserve(src.len());
buf
} else {
BytesMut::with_capacity_in(src.len(), self.pool)
BytesVec::with_capacity_in(src.len(), self.pool)
};
buf.extend_from_slice(src);
self.inner.release_write_buf(buf)?;
@ -143,17 +143,17 @@ impl<F: Filter> Filter for SslFilter<F> {
}
#[inline]
fn get_read_buf(&self) -> Option<BytesMut> {
fn get_read_buf(&self) -> Option<BytesVec> {
self.read_buf.take()
}
#[inline]
fn get_write_buf(&self) -> Option<BytesMut> {
fn get_write_buf(&self) -> Option<BytesVec> {
self.inner.borrow_mut().get_mut().write_buf.take()
}
#[inline]
fn release_read_buf(&self, buf: BytesMut) {
fn release_read_buf(&self, buf: BytesVec) {
self.read_buf.set(Some(buf));
}
@ -218,7 +218,7 @@ impl<F: Filter> Filter for SslFilter<F> {
}
}
fn release_write_buf(&self, mut buf: BytesMut) -> Result<(), io::Error> {
fn release_write_buf(&self, mut buf: BytesVec) -> Result<(), io::Error> {
loop {
if buf.is_empty() {
return Ok(());

View file

@ -1,8 +1,8 @@
//! An implementation of SSL streams for ntex backed by OpenSSL
use std::io::{self, Read as IoRead, Write as IoWrite};
use std::{any, cell::RefCell, sync::Arc, task::Context, task::Poll};
use std::{any, cell::Cell, cell::RefCell, sync::Arc, task::Context, task::Poll};
use ntex_bytes::{BufMut, BytesMut};
use ntex_bytes::{BufMut, BytesVec};
use ntex_io::{Filter, Io, IoRef, ReadStatus, WriteStatus};
use ntex_util::{future::poll_fn, ready};
use tls_rust::{ClientConfig, ClientConnection, ServerName};
@ -14,7 +14,7 @@ use super::{PeerCert, PeerCertChain};
/// An implementation of SSL streams
pub struct TlsClientFilter<F> {
inner: RefCell<IoInner<F>>,
inner: IoInner<F>,
session: RefCell<ClientConnection>,
}
@ -53,107 +53,101 @@ impl<F: Filter> Filter for TlsClientFilter<F> {
None
}
} else {
self.inner.borrow().filter.query(id)
self.inner.filter.query(id)
}
}
#[inline]
fn poll_shutdown(&self) -> Poll<io::Result<()>> {
self.inner.borrow().filter.poll_shutdown()
self.inner.filter.poll_shutdown()
}
#[inline]
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
self.inner.borrow().filter.poll_read_ready(cx)
self.inner.filter.poll_read_ready(cx)
}
#[inline]
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
self.inner.borrow().filter.poll_write_ready(cx)
self.inner.filter.poll_write_ready(cx)
}
#[inline]
fn get_read_buf(&self) -> Option<BytesMut> {
self.inner.borrow_mut().read_buf.take()
fn get_read_buf(&self) -> Option<BytesVec> {
self.inner.read_buf.take()
}
#[inline]
fn get_write_buf(&self) -> Option<BytesMut> {
self.inner.borrow_mut().write_buf.take()
fn get_write_buf(&self) -> Option<BytesVec> {
self.inner.write_buf.take()
}
#[inline]
fn release_read_buf(&self, buf: BytesMut) {
self.inner.borrow_mut().read_buf = Some(buf);
fn release_read_buf(&self, buf: BytesVec) {
self.inner.read_buf.set(Some(buf));
}
fn process_read_buf(&self, io: &IoRef, nbytes: usize) -> io::Result<(usize, usize)> {
let mut inner = self.inner.borrow_mut();
let mut session = self.session.borrow_mut();
// ask inner filter to process read buf
match inner.filter.process_read_buf(io, nbytes) {
match self.inner.filter.process_read_buf(io, nbytes) {
Err(err) => io.want_shutdown(Some(err)),
Ok((_, 0)) => return Ok((0, 0)),
Ok(_) => (),
}
if session.is_handshaking() {
Ok((0, 1))
// get processed buffer
let mut dst = if let Some(dst) = self.inner.read_buf.take() {
dst
} else {
// get processed buffer
let mut dst = if let Some(dst) = inner.read_buf.take() {
dst
} else {
inner.pool.get_read_buf()
};
let (hw, lw) = inner.pool.read_params().unpack();
self.inner.pool.get_read_buf()
};
let (hw, lw) = self.inner.pool.read_params().unpack();
let mut src = if let Some(src) = inner.filter.get_read_buf() {
src
} else {
return Ok((0, 0));
};
let mut src = if let Some(src) = self.inner.filter.get_read_buf() {
src
} else {
return Ok((0, 0));
};
let mut new_bytes = 0;
loop {
// make sure we've got room
let remaining = dst.remaining_mut();
if remaining < lw {
dst.reserve(hw - remaining);
}
let mut cursor = io::Cursor::new(&src);
let n = session.read_tls(&mut cursor)?;
src.split_to(n);
let state = session
.process_new_packets()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let new_b = state.plaintext_bytes_to_read();
if new_b > 0 {
dst.reserve(new_b);
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let v = session.reader().read(chunk)?;
unsafe { dst.advance_mut(v) };
new_bytes += v;
} else {
break;
}
let mut new_bytes = if self.inner.handshake.get() { 1 } else { 0 };
loop {
// make sure we've got room
let remaining = dst.remaining_mut();
if remaining < lw {
dst.reserve(hw - remaining);
}
let dst_len = dst.len();
inner.read_buf = Some(dst);
inner.filter.release_read_buf(src);
Ok((dst_len, new_bytes))
let mut cursor = io::Cursor::new(&src);
let n = session.read_tls(&mut cursor)?;
src.split_to(n);
let state = session
.process_new_packets()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let new_b = state.plaintext_bytes_to_read();
if new_b > 0 {
dst.reserve(new_b);
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let v = session.reader().read(chunk)?;
unsafe { dst.advance_mut(v) };
new_bytes += v;
} else {
break;
}
}
let dst_len = dst.len();
self.inner.read_buf.set(Some(dst));
self.inner.filter.release_read_buf(src);
Ok((dst_len, new_bytes))
}
fn release_write_buf(&self, mut src: BytesMut) -> Result<(), io::Error> {
fn release_write_buf(&self, mut src: BytesVec) -> Result<(), io::Error> {
let mut session = self.session.borrow_mut();
let mut inner = self.inner.borrow_mut();
let mut io = Wrapper(&mut *inner);
let mut io = Wrapper(&self.inner);
loop {
if !src.is_empty() {
@ -168,7 +162,7 @@ impl<F: Filter> Filter for TlsClientFilter<F> {
}
if !src.is_empty() {
self.inner.borrow_mut().write_buf = Some(src);
self.inner.write_buf.set(Some(src));
}
Ok(())
@ -190,12 +184,13 @@ impl<F: Filter> TlsClientFilter<F> {
let inner = IoInner {
pool,
filter,
read_buf: None,
write_buf: None,
read_buf: Cell::new(None),
write_buf: Cell::new(None),
handshake: Cell::new(true),
};
Ok::<_, io::Error>(TlsFilter::new_client(TlsClientFilter {
inner: RefCell::new(inner),
inner,
session: RefCell::new(session),
}))
})?;
@ -204,34 +199,14 @@ impl<F: Filter> TlsClientFilter<F> {
loop {
let (result, wants_read) = {
let mut session = filter.client().session.borrow_mut();
let mut inner = filter.client().inner.borrow_mut();
let mut wrp = Wrapper(&mut *inner);
let result = session.complete_io(&mut wrp);
let wants_read = session.wants_read();
if session.wants_write() {
loop {
let n = session.write_tls(&mut wrp)?;
if n == 0 {
break;
}
}
}
(result, wants_read)
let mut wrp = Wrapper(&filter.client().inner);
(session.complete_io(&mut wrp), session.wants_read())
};
if result.is_ok() && wants_read {
poll_fn(|cx| match ready!(io.poll_read_ready(cx)) {
Ok(None) => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"disconnected",
))),
Err(e) => Poll::Ready(Err(e)),
_ => Poll::Ready(Ok(())),
})
.await?;
}
match result {
Ok(_) => return Ok(io),
Ok(_) => {
filter.client().inner.handshake.set(false);
return Ok(io);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
poll_fn(|cx| {
let read_ready = if wants_read {

View file

@ -1,9 +1,9 @@
#![allow(clippy::type_complexity)]
//! An implementation of SSL streams for ntex backed by OpenSSL
use std::sync::Arc;
use std::{any, cmp, future::Future, io, pin::Pin, task::Context, task::Poll};
use std::{cell::Cell, sync::Arc};
use ntex_bytes::{BytesMut, PoolRef};
use ntex_bytes::{BytesVec, PoolRef};
use ntex_io::{Base, Filter, FilterFactory, Io, IoRef, ReadStatus, WriteStatus};
use ntex_util::time::Millis;
use tls_rust::{Certificate, ClientConfig, ServerConfig, ServerName};
@ -93,7 +93,7 @@ impl<F: Filter> Filter for TlsFilter<F> {
}
#[inline]
fn get_read_buf(&self) -> Option<BytesMut> {
fn get_read_buf(&self) -> Option<BytesVec> {
match self.inner {
InnerTlsFilter::Server(ref f) => f.get_read_buf(),
InnerTlsFilter::Client(ref f) => f.get_read_buf(),
@ -101,7 +101,7 @@ impl<F: Filter> Filter for TlsFilter<F> {
}
#[inline]
fn get_write_buf(&self) -> Option<BytesMut> {
fn get_write_buf(&self) -> Option<BytesVec> {
match self.inner {
InnerTlsFilter::Server(ref f) => f.get_write_buf(),
InnerTlsFilter::Client(ref f) => f.get_write_buf(),
@ -109,7 +109,7 @@ impl<F: Filter> Filter for TlsFilter<F> {
}
#[inline]
fn release_read_buf(&self, buf: BytesMut) {
fn release_read_buf(&self, buf: BytesVec) {
match self.inner {
InnerTlsFilter::Server(ref f) => f.release_read_buf(buf),
InnerTlsFilter::Client(ref f) => f.release_read_buf(buf),
@ -125,7 +125,7 @@ impl<F: Filter> Filter for TlsFilter<F> {
}
#[inline]
fn release_write_buf(&self, src: BytesMut) -> Result<(), io::Error> {
fn release_write_buf(&self, src: BytesVec) -> Result<(), io::Error> {
match self.inner {
InnerTlsFilter::Server(ref f) => f.release_write_buf(src),
InnerTlsFilter::Client(ref f) => f.release_write_buf(src),
@ -243,11 +243,12 @@ impl<F: Filter> FilterFactory<F> for TlsConnectorConfigured {
pub(crate) struct IoInner<F> {
filter: F,
pool: PoolRef,
read_buf: Option<BytesMut>,
write_buf: Option<BytesMut>,
read_buf: Cell<Option<BytesVec>>,
write_buf: Cell<Option<BytesVec>>,
handshake: Cell<bool>,
}
pub(crate) struct Wrapper<'a, F>(&'a mut IoInner<F>);
pub(crate) struct Wrapper<'a, F>(&'a IoInner<F>);
impl<'a, F: Filter> io::Read for Wrapper<'a, F> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
@ -273,7 +274,7 @@ impl<'a, F: Filter> io::Write for Wrapper<'a, F> {
buf.reserve(src.len());
buf
} else {
BytesMut::with_capacity_in(src.len(), self.0.pool)
BytesVec::with_capacity_in(src.len(), self.0.pool)
};
buf.extend_from_slice(src);
self.0.filter.release_write_buf(buf)?;

View file

@ -1,8 +1,8 @@
//! An implementation of SSL streams for ntex backed by OpenSSL
use std::io::{self, Read as IoRead, Write as IoWrite};
use std::{any, cell::RefCell, sync::Arc, task::Context, task::Poll};
use std::{any, cell::Cell, cell::RefCell, sync::Arc, task::Context, task::Poll};
use ntex_bytes::{BufMut, BytesMut};
use ntex_bytes::{BufMut, BytesVec};
use ntex_io::{Filter, Io, IoRef, ReadStatus, WriteStatus};
use ntex_util::{future::poll_fn, ready, time, time::Millis};
use tls_rust::{ServerConfig, ServerConnection};
@ -14,7 +14,7 @@ use super::{PeerCert, PeerCertChain};
/// An implementation of SSL streams
pub struct TlsServerFilter<F> {
inner: RefCell<IoInner<F>>,
inner: IoInner<F>,
session: RefCell<ServerConnection>,
}
@ -53,107 +53,101 @@ impl<F: Filter> Filter for TlsServerFilter<F> {
None
}
} else {
self.inner.borrow().filter.query(id)
self.inner.filter.query(id)
}
}
#[inline]
fn poll_shutdown(&self) -> Poll<io::Result<()>> {
self.inner.borrow().filter.poll_shutdown()
self.inner.filter.poll_shutdown()
}
#[inline]
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
self.inner.borrow().filter.poll_read_ready(cx)
self.inner.filter.poll_read_ready(cx)
}
#[inline]
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
self.inner.borrow().filter.poll_write_ready(cx)
self.inner.filter.poll_write_ready(cx)
}
#[inline]
fn get_read_buf(&self) -> Option<BytesMut> {
self.inner.borrow_mut().read_buf.take()
fn get_read_buf(&self) -> Option<BytesVec> {
self.inner.read_buf.take()
}
#[inline]
fn get_write_buf(&self) -> Option<BytesMut> {
self.inner.borrow_mut().write_buf.take()
fn get_write_buf(&self) -> Option<BytesVec> {
self.inner.write_buf.take()
}
#[inline]
fn release_read_buf(&self, buf: BytesMut) {
self.inner.borrow_mut().read_buf = Some(buf);
fn release_read_buf(&self, buf: BytesVec) {
self.inner.read_buf.set(Some(buf));
}
fn process_read_buf(&self, io: &IoRef, nbytes: usize) -> io::Result<(usize, usize)> {
let mut inner = self.inner.borrow_mut();
let mut session = self.session.borrow_mut();
// ask inner filter to process read buf
match inner.filter.process_read_buf(io, nbytes) {
match self.inner.filter.process_read_buf(io, nbytes) {
Err(err) => io.want_shutdown(Some(err)),
Ok((_, 0)) => return Ok((0, 0)),
Ok(_) => (),
}
if session.is_handshaking() {
Ok((0, 1))
// get processed buffer
let mut dst = if let Some(dst) = self.inner.read_buf.take() {
dst
} else {
// get processed buffer
let mut dst = if let Some(dst) = inner.read_buf.take() {
dst
} else {
inner.pool.get_read_buf()
};
let (hw, lw) = inner.pool.read_params().unpack();
self.inner.pool.get_read_buf()
};
let (hw, lw) = self.inner.pool.read_params().unpack();
let mut src = if let Some(src) = inner.filter.get_read_buf() {
src
} else {
return Ok((0, 0));
};
let mut src = if let Some(src) = self.inner.filter.get_read_buf() {
src
} else {
return Ok((0, 0));
};
let mut new_bytes = 0;
loop {
// make sure we've got room
let remaining = dst.remaining_mut();
if remaining < lw {
dst.reserve(hw - remaining);
}
let mut cursor = io::Cursor::new(&src);
let n = session.read_tls(&mut cursor)?;
src.split_to(n);
let state = session
.process_new_packets()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let new_b = state.plaintext_bytes_to_read();
if new_b > 0 {
dst.reserve(new_b);
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let v = session.reader().read(chunk)?;
unsafe { dst.advance_mut(v) };
new_bytes += v;
} else {
break;
}
let mut new_bytes = if self.inner.handshake.get() { 1 } else { 0 };
loop {
// make sure we've got room
let remaining = dst.remaining_mut();
if remaining < lw {
dst.reserve(hw - remaining);
}
let dst_len = dst.len();
inner.read_buf = Some(dst);
inner.filter.release_read_buf(src);
Ok((dst_len, new_bytes))
let mut cursor = io::Cursor::new(&src);
let n = session.read_tls(&mut cursor)?;
src.split_to(n);
let state = session
.process_new_packets()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let new_b = state.plaintext_bytes_to_read();
if new_b > 0 {
dst.reserve(new_b);
let chunk: &mut [u8] =
unsafe { std::mem::transmute(&mut *dst.chunk_mut()) };
let v = session.reader().read(chunk)?;
unsafe { dst.advance_mut(v) };
new_bytes += v;
} else {
break;
}
}
let dst_len = dst.len();
self.inner.read_buf.set(Some(dst));
self.inner.filter.release_read_buf(src);
Ok((dst_len, new_bytes))
}
fn release_write_buf(&self, mut src: BytesMut) -> Result<(), io::Error> {
fn release_write_buf(&self, mut src: BytesVec) -> Result<(), io::Error> {
let mut session = self.session.borrow_mut();
let mut inner = self.inner.borrow_mut();
let mut io = Wrapper(&mut *inner);
let mut io = Wrapper(&self.inner);
loop {
if !src.is_empty() {
@ -168,7 +162,7 @@ impl<F: Filter> Filter for TlsServerFilter<F> {
}
if !src.is_empty() {
inner.write_buf = Some(src);
self.inner.write_buf.set(Some(src));
}
Ok(())
}
@ -190,12 +184,13 @@ impl<F: Filter> TlsServerFilter<F> {
let inner = IoInner {
pool,
filter,
read_buf: None,
write_buf: None,
read_buf: Cell::new(None),
write_buf: Cell::new(None),
handshake: Cell::new(true),
};
Ok::<_, io::Error>(TlsFilter::new_server(TlsServerFilter {
inner: RefCell::new(inner),
inner,
session: RefCell::new(session),
}))
})?;
@ -204,35 +199,14 @@ impl<F: Filter> TlsServerFilter<F> {
loop {
let (result, wants_read) = {
let mut session = filter.server().session.borrow_mut();
let mut inner = filter.server().inner.borrow_mut();
let mut wrp = Wrapper(&mut *inner);
let result = session.complete_io(&mut wrp);
let wants_read = session.wants_read();
if session.wants_write() {
loop {
let n = session.write_tls(&mut wrp)?;
if n == 0 {
break;
}
}
}
(result, wants_read)
let mut wrp = Wrapper(&filter.server().inner);
(session.complete_io(&mut wrp), session.wants_read())
};
if result.is_ok() && wants_read {
poll_fn(|cx| {
match ready!(io.poll_read_ready(cx))? {
Some(_) => Ok(()),
None => {
Err(io::Error::new(io::ErrorKind::Other, "disconnected"))
}
}?;
Poll::Ready(Ok::<_, io::Error>(()))
})
.await?;
}
match result {
Ok(_) => return Ok(io),
Ok(_) => {
filter.server().inner.handshake.set(false);
return Ok(io);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
poll_fn(|cx| {
let read_ready = if wants_read {

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.3] - 2022-01-30
* Update to ntex-io 0.1.7
## [0.1.2] - 2022-01-12
* Fix potential BorrowMutError

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-tokio"
version = "0.1.2"
version = "0.1.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "tokio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -16,9 +16,9 @@ name = "ntex_tokio"
path = "src/lib.rs"
[dependencies]
ntex-bytes = "0.1.9"
ntex-io = "0.1.3"
ntex-util = "0.1.9"
ntex-bytes = "0.1.11"
ntex-io = "0.1.7"
ntex-util = "0.1.13"
log = "0.4"
pin-project-lite = "0.2"
tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] }

View file

@ -1,7 +1,7 @@
use std::task::{Context, Poll};
use std::{any, cell::RefCell, cmp, future::Future, io, mem, pin::Pin, rc::Rc, rc::Weak};
use ntex_bytes::{Buf, BufMut, BytesMut};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{
types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, ReadStatus, WriteContext,
WriteStatus,
@ -733,7 +733,7 @@ mod unixstream {
pub fn poll_read_buf<T: AsyncRead>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesMut,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));

View file

@ -1,5 +1,9 @@
# Changes
## [0.5.14] - 2022-01-30
* Update ntex-io to 0.1.7
## [0.5.13] - 2022-01-28
* http: Refactor client pool support for http/2 connections

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.5.13"
version = "0.5.14"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -48,18 +48,18 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"]
async-std = ["ntex-rt/async-std", "ntex-async-std"]
[dependencies]
ntex-codec = "0.6.1"
ntex-codec = "0.6.2"
ntex-router = "0.5.1"
ntex-service = "0.3.1"
ntex-macros = "0.1.3"
ntex-util = "0.1.13"
ntex-bytes = "0.1.10"
ntex-tls = "0.1.2"
ntex-bytes = "0.1.11"
ntex-tls = "0.1.3"
ntex-rt = "0.4.3"
ntex-io = "0.1.6"
ntex-tokio = "0.1.2"
ntex-glommio = { version = "0.1.0", optional = true }
ntex-async-std = { version = "0.1.0", optional = true }
ntex-io = "0.1.7"
ntex-tokio = "0.1.3"
ntex-glommio = { version = "0.1.1", optional = true }
ntex-async-std = { version = "0.1.1", optional = true }
async-oneshot = "0.5.0"
async-channel = "1.6.1"

View file

@ -610,11 +610,9 @@ mod tests {
#[crate::rt_test]
async fn test_bytes_mut() {
let mut b = BytesMut::from("test");
assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
assert_eq!(Body::from(b.clone()).get_ref(), b"test");
let mut b = Body::from(BytesMut::from("test"));
assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!(b.get_ref(), b"test");
assert_eq!(
poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
Some(Bytes::from("test"))

View file

@ -35,7 +35,7 @@ where
Some(port) => write!(wrt, "{}:{}", host, port),
};
match HeaderValue::from_maybe_shared(wrt.get_mut().split().freeze()) {
match HeaderValue::from_maybe_shared(wrt.get_mut().split()) {
Ok(value) => match head {
RequestHeadType::Owned(ref mut head) => {
head.headers.insert(HOST, value)

View file

@ -469,15 +469,15 @@ impl Decoder for PayloadDecoder {
let len = src.len() as u64;
let buf;
if *remaining > len {
buf = src.split().freeze();
buf = src.split();
*remaining -= len;
} else {
buf = src.split_to(*remaining as usize).freeze();
buf = src.split_to(*remaining as usize);
*remaining = 0;
};
self.kind.set(kind);
log::trace!("Length read: {}", buf.len());
Ok(Some(PayloadItem::Chunk(buf)))
Ok(Some(PayloadItem::Chunk(buf.freeze())))
}
}
Kind::Chunked(ref mut state, ref mut size) => {
@ -630,13 +630,13 @@ impl ChunkedState {
} else {
let slice;
if *rem > len {
slice = rdr.split().freeze();
slice = rdr.split();
*rem -= len;
} else {
slice = rdr.split_to(*rem as usize).freeze();
slice = rdr.split_to(*rem as usize);
*rem = 0;
}
*buf = Some(slice);
*buf = Some(slice.freeze());
if *rem > 0 {
Poll::Ready(Ok(ChunkedState::Body))
} else {

View file

@ -844,14 +844,14 @@ mod tests {
client.write("GET /test1 HTTP/1.1\r\n\r\n");
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(!client.is_server_dropped());
client.write("GET /test2 HTTP/1.1\r\n\r\n");
client.write("GET /test3 HTTP/1.1\r\n\r\n");
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(decoder.decode(&mut buf).unwrap().is_none());
@ -877,13 +877,13 @@ mod tests {
sleep(Millis(50)).await;
client.write("xxxxx");
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(!client.is_server_dropped());
client.write("GET /test2 HTTP/1.1\r\n\r\n");
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(decoder.decode(&mut buf).unwrap().is_none());
assert!(!client.is_server_dropped());
@ -904,7 +904,7 @@ mod tests {
client.write("GET /test HTTP/1.1\r\n\r\n");
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(!client.is_server_dropped());
@ -913,10 +913,10 @@ mod tests {
sleep(Millis(50)).await;
client.write("GET /test HTTP/1.1\r\n\r\n");
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert!(load(&mut decoder, &mut buf).status.is_success());
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(decoder.decode(&mut buf).unwrap().is_none());
assert!(!client.is_server_dropped());
@ -984,13 +984,12 @@ mod tests {
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
sleep(Millis(50)).await;
// required because io shutdown is async oper
let _ = lazy(|cx| Pin::new(&mut h1).poll(cx)).await;
sleep(Millis(550)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
crate::util::poll_fn(|cx| Pin::new(&mut h1).poll(cx))
.await
.unwrap();
assert!(h1.inner.io.is_closed());
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert_eq!(load(&mut decoder, &mut buf).status, StatusCode::BAD_REQUEST);
}
@ -1129,7 +1128,7 @@ mod tests {
assert_eq!(client.remote_buffer(|buf| buf.len()), 0);
let mut decoder = ClientCodec::default();
let mut buf = client.read().await.unwrap();
let mut buf = BytesMut::from(&client.read().await.unwrap()[..]);
assert!(load(&mut decoder, &mut buf).status.is_success());
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_pending());
@ -1157,7 +1156,7 @@ mod tests {
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
sleep(Millis(50)).await;
assert!(h1.inner.io.is_closed());
let buf = client.local_buffer(|buf| buf.split().freeze());
let buf = client.local_buffer(|buf| buf.split());
assert_eq!(&buf[..28], b"HTTP/1.1 500 Internal Server");
assert_eq!(&buf[buf.len() - 5..], b"error");
}

View file

@ -597,10 +597,7 @@ mod tests {
assert!(!enc.encode(b"test", &mut bytes).ok().unwrap());
assert!(enc.encode(b"", &mut bytes).ok().unwrap());
}
assert_eq!(
bytes.split().freeze(),
Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n")
);
assert_eq!(bytes.split(), Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n"));
}
#[test]
@ -629,7 +626,7 @@ mod tests {
ConnectionType::Close,
&DateService::default(),
);
let data = String::from_utf8(Vec::from(bytes.split().freeze().as_ref())).unwrap();
let data = String::from_utf8(Vec::from(bytes.split().as_ref())).unwrap();
assert!(data.contains("content-length: 0\r\n"));
assert!(data.contains("connection: close\r\n"));
assert!(data.contains("authorization: another authorization\r\n"));
@ -641,35 +638,35 @@ mod tests {
let mut bytes = BytesMut::new();
bytes.reserve(50);
write_content_length(0, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 0\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 0\r\n"[..]);
bytes.reserve(50);
write_content_length(9, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 9\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 9\r\n"[..]);
bytes.reserve(50);
write_content_length(10, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 10\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 10\r\n"[..]);
bytes.reserve(50);
write_content_length(99, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 99\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 99\r\n"[..]);
bytes.reserve(50);
write_content_length(100, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 100\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 100\r\n"[..]);
bytes.reserve(50);
write_content_length(101, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 101\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 101\r\n"[..]);
bytes.reserve(50);
write_content_length(998, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 998\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 998\r\n"[..]);
bytes.reserve(50);
write_content_length(1000, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 1000\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 1000\r\n"[..]);
bytes.reserve(50);
write_content_length(1001, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 1001\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 1001\r\n"[..]);
bytes.reserve(50);
write_content_length(5909, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 5909\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 5909\r\n"[..]);
write_content_length(25999, &mut bytes);
assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 25999\r\n"[..]);
assert_eq!(bytes.split(), b"\r\ncontent-length: 25999\r\n"[..]);
}
}

View file

@ -105,6 +105,8 @@ pub mod tls {
}
pub mod util {
pub use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut, Pool, PoolId, PoolRef};
pub use ntex_bytes::{
Buf, BufMut, ByteString, Bytes, BytesMut, BytesVec, Pool, PoolId, PoolRef,
};
pub use ntex_util::{future::*, ready, services::*, HashMap, HashSet};
}

View file

@ -228,9 +228,7 @@ impl Decoder for Codec {
OpCode::Continue => {
if self.flags.get().contains(Flags::R_CONTINUATION) {
Ok(Some(Frame::Continuation(Item::Continue(
payload
.map(|pl| pl.freeze())
.unwrap_or_else(Bytes::new),
payload.unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationNotStarted)
@ -240,9 +238,7 @@ impl Decoder for Codec {
if !self.flags.get().contains(Flags::R_CONTINUATION) {
self.insert_flags(Flags::R_CONTINUATION);
Ok(Some(Frame::Continuation(Item::FirstBinary(
payload
.map(|pl| pl.freeze())
.unwrap_or_else(Bytes::new),
payload.unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationStarted)
@ -252,20 +248,18 @@ impl Decoder for Codec {
if !self.flags.get().contains(Flags::R_CONTINUATION) {
self.insert_flags(Flags::R_CONTINUATION);
Ok(Some(Frame::Continuation(Item::FirstText(
payload
.map(|pl| pl.freeze())
.unwrap_or_else(Bytes::new),
payload.unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationStarted)
}
}
OpCode::Ping => Ok(Some(Frame::Ping(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Pong => Ok(Some(Frame::Pong(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Ping => {
Ok(Some(Frame::Ping(payload.unwrap_or_else(Bytes::new))))
}
OpCode::Pong => {
Ok(Some(Frame::Pong(payload.unwrap_or_else(Bytes::new))))
}
OpCode::Bad => Err(ProtocolError::BadOpCode),
_ => {
error!("Unfinished fragment {:?}", opcode);
@ -278,9 +272,7 @@ impl Decoder for Codec {
if self.flags.get().contains(Flags::R_CONTINUATION) {
self.remove_flags(Flags::R_CONTINUATION);
Ok(Some(Frame::Continuation(Item::Last(
payload
.map(|pl| pl.freeze())
.unwrap_or_else(Bytes::new),
payload.unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationNotStarted)
@ -295,18 +287,18 @@ impl Decoder for Codec {
Ok(Some(Frame::Close(None)))
}
}
OpCode::Ping => Ok(Some(Frame::Ping(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Pong => Ok(Some(Frame::Pong(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Binary => Ok(Some(Frame::Binary(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Text => Ok(Some(Frame::Text(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Ping => {
Ok(Some(Frame::Ping(payload.unwrap_or_else(Bytes::new))))
}
OpCode::Pong => {
Ok(Some(Frame::Pong(payload.unwrap_or_else(Bytes::new))))
}
OpCode::Binary => {
Ok(Some(Frame::Binary(payload.unwrap_or_else(Bytes::new))))
}
OpCode::Text => {
Ok(Some(Frame::Text(payload.unwrap_or_else(Bytes::new))))
}
}
}
}

View file

@ -5,7 +5,7 @@ use nanorand::{Rng, WyRand};
use super::proto::{CloseCode, CloseReason, OpCode};
use super::{error::ProtocolError, mask::apply_mask};
use crate::util::{Buf, BufMut, BytesMut};
use crate::util::{Buf, BufMut, Bytes, BytesMut};
/// WebSocket frame parser.
#[derive(Debug)]
@ -92,7 +92,7 @@ impl Parser {
src: &mut BytesMut,
server: bool,
max_size: usize,
) -> Result<Option<(bool, OpCode, Option<BytesMut>)>, ProtocolError> {
) -> Result<Option<(bool, OpCode, Option<Bytes>)>, ProtocolError> {
// try to parse ws frame metadata
let (idx, finished, opcode, length, mask) =
match Parser::parse_metadata(src, server, max_size)? {
@ -113,8 +113,6 @@ impl Parser {
return Ok(Some((finished, opcode, None)));
}
let mut data = src.split_to(length);
// control frames must have length <= 125
match opcode {
OpCode::Ping | OpCode::Pong if length > 125 => {
@ -129,10 +127,14 @@ impl Parser {
// unmask
if let Some(mask) = mask {
apply_mask(&mut data, mask);
apply_mask(&mut src[..length], mask);
}
Ok(Some((finished, opcode, Some(data))))
Ok(Some((
finished,
opcode,
Some(src.split_to(length).freeze()),
)))
}
/// Parse the payload of a close frame.
@ -225,21 +227,19 @@ mod tests {
payload: Bytes,
}
fn is_none(
frm: &Result<Option<(bool, OpCode, Option<BytesMut>)>, ProtocolError>,
) -> bool {
fn is_none(frm: &Result<Option<(bool, OpCode, Option<Bytes>)>, ProtocolError>) -> bool {
match *frm {
Ok(None) => true,
_ => false,
}
}
fn extract(frm: Result<Option<(bool, OpCode, Option<BytesMut>)>, ProtocolError>) -> F {
fn extract(frm: Result<Option<(bool, OpCode, Option<Bytes>)>, ProtocolError>) -> F {
match frm {
Ok(Some((finished, opcode, payload))) => F {
finished,
opcode,
payload: payload.map(|b| b.freeze()).unwrap_or_else(Bytes::new),
payload: payload.unwrap_or_else(Bytes::new),
},
_ => unreachable!("error"),
}

View file

@ -3,7 +3,7 @@ use std::{any, cell::Cell, cmp, io, task::Context, task::Poll};
use crate::codec::{Decoder, Encoder};
use crate::io::{Base, Filter, FilterFactory, Io, IoRef, ReadStatus, WriteStatus};
use crate::util::{BufMut, BytesMut, PoolRef, Ready};
use crate::util::{BufMut, BytesVec, PoolRef, Ready};
use super::{CloseCode, CloseReason, Codec, Frame, Item, Message};
@ -21,7 +21,7 @@ pub struct WsTransport<F = Base> {
pool: PoolRef,
codec: Codec,
flags: Cell<Flags>,
read_buf: Cell<Option<BytesMut>>,
read_buf: Cell<Option<BytesVec>>,
}
impl<F: Filter> WsTransport<F> {
@ -67,7 +67,7 @@ impl<F: Filter> Filter for WsTransport<F> {
} else {
CloseCode::Normal
};
let _ = self.codec.encode(
let _ = self.codec.encode_vec(
Message::Close(Some(CloseReason {
code,
description: None,
@ -91,17 +91,17 @@ impl<F: Filter> Filter for WsTransport<F> {
}
#[inline]
fn get_read_buf(&self) -> Option<BytesMut> {
fn get_read_buf(&self) -> Option<BytesVec> {
self.read_buf.take()
}
#[inline]
fn get_write_buf(&self) -> Option<BytesMut> {
fn get_write_buf(&self) -> Option<BytesVec> {
None
}
#[inline]
fn release_read_buf(&self, buf: BytesMut) {
fn release_read_buf(&self, buf: BytesVec) {
self.read_buf.set(Some(buf));
}
@ -136,11 +136,12 @@ impl<F: Filter> Filter for WsTransport<F> {
dst.reserve(hw - remaining);
}
let frame = if let Some(frame) = self.codec.decode(&mut src).map_err(|e| {
log::trace!("Failed to decode ws codec frames: {:?}", e);
self.insert_flags(Flags::PROTO_ERR);
io::Error::new(io::ErrorKind::Other, e)
})? {
let frame = if let Some(frame) =
self.codec.decode_vec(&mut src).map_err(|e| {
log::trace!("Failed to decode ws codec frames: {:?}", e);
self.insert_flags(Flags::PROTO_ERR);
io::Error::new(io::ErrorKind::Other, e)
})? {
frame
} else {
break;
@ -182,7 +183,7 @@ impl<F: Filter> Filter for WsTransport<F> {
.inner
.get_write_buf()
.unwrap_or_else(|| self.pool.get_write_buf());
let _ = self.codec.encode(Message::Pong(msg), &mut b);
let _ = self.codec.encode_vec(Message::Pong(msg), &mut b);
self.release_write_buf(b)?;
}
Frame::Pong(_) => (),
@ -205,7 +206,7 @@ impl<F: Filter> Filter for WsTransport<F> {
Ok((dlen, nbytes))
}
fn release_write_buf(&self, src: BytesMut) -> Result<(), io::Error> {
fn release_write_buf(&self, src: BytesVec) -> Result<(), io::Error> {
let mut buf = if let Some(buf) = self.inner.get_write_buf() {
buf
} else {
@ -220,7 +221,9 @@ impl<F: Filter> Filter for WsTransport<F> {
}
// Encoder ws::Codec do not fail
let _ = self.codec.encode(Message::Binary(src.freeze()), &mut buf);
let _ = self
.codec
.encode_vec(Message::Binary(src.freeze()), &mut buf);
self.inner.release_write_buf(buf)
}
}

View file

@ -1,12 +1,11 @@
use std::io;
use std::sync::Arc;
use std::{io, sync::Arc};
use ntex::codec::BytesCodec;
use ntex::connect::Connect;
use ntex::io::{types::PeerAddr, Io};
use ntex::server::test_server;
use ntex::service::{fn_service, pipeline_factory, Service, ServiceFactory};
use ntex::util::Bytes;
use ntex::{time, util::Bytes};
#[cfg(feature = "openssl")]
fn ssl_acceptor() -> tls_openssl::ssl::SslAcceptor {
@ -90,6 +89,7 @@ async fn test_openssl_string() {
io.send(Bytes::from_static(b"test"), &BytesCodec)
.await
.unwrap();
time::sleep(time::Millis(100)).await;
Ok::<_, Box<dyn std::error::Error>>(())
}))
});
@ -144,7 +144,6 @@ async fn test_openssl_read_before_error() {
#[cfg(feature = "rustls")]
#[ntex::test]
#[ignore]
async fn test_rustls_string() {
use ntex::server::rustls;
use ntex_tls::rustls::PeerCert;