Refactor bytes impl (#64)

* remove mut api for Bytes

* refactor Vec layout

* track allocations in memory pools

* integrate memory pools to framed

* add pool async readiness polling

* optimize readiness check

* use memory pool in framed dispatcher
This commit is contained in:
Nikolay Kim 2021-12-02 21:09:10 +06:00 committed by GitHub
parent 6eea3dd2ad
commit 42b8292ecd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 1524 additions and 1165 deletions

View file

@ -1,5 +1,13 @@
# Changes
## 0.1.5 (2021-12-02)
* Split, freeze and truncate operations produce an inline Bytes object if possible
* Refactor Vec representation
* Introduce memory pools
## 0.1.4 (2021-06-27)
* Reduce size of Option<Bytes> by using NonNull

View file

@ -1,8 +1,8 @@
[package]
name = "ntex-bytes"
version = "0.1.4"
version = "0.1.5"
license = "MIT"
authors = ["Carl Lerche <me@carllerche.com>"]
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Carl Lerche <me@carllerche.com>"]
description = "Types and traits for working with bytes (bytes crate fork)"
documentation = "https://docs.rs/ntex-bytes"
repository = "https://github.com/ntex-rs/ntex-bytes"
@ -12,9 +12,12 @@ categories = ["network-programming", "data-structures"]
edition = "2018"
[dependencies]
serde = "1.0"
bytes = "1.0.1"
bitflags = "1.3"
bytes = "1.0.0"
serde = "1.0.0"
futures-core = { version = "0.3.18", default-features = false, features = ["alloc"] }
[dev-dependencies]
serde_test = "1.0"
serde_json = "1.0"
ntex = "0.4.10"

File diff suppressed because it is too large Load diff

View file

@ -52,8 +52,8 @@
#![deny(
warnings,
missing_docs,
missing_debug_implementations,
// missing_docs,
// missing_debug_implementations,
rust_2018_idioms
)]
#![doc(html_root_url = "https://docs.rs/ntex-bytes/")]
@ -64,9 +64,12 @@ pub use crate::buf::{Buf, BufMut};
mod bytes;
mod debug;
mod hex;
mod pool;
mod serde;
mod string;
pub use crate::bytes::{Bytes, BytesMut};
pub use crate::string::ByteString;
mod serde;
#[doc(hidden)]
pub use crate::pool::{Pool, PoolId, PoolRef};

777
ntex-bytes/src/pool.rs Normal file
View file

@ -0,0 +1,777 @@
#![allow(clippy::type_complexity)]
use std::sync::atomic::Ordering::{Relaxed, Release};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::task::{Context, Poll, Waker};
use std::{cell::Cell, cell::RefCell, fmt, future::Future, mem, pin::Pin, rc::Rc};
use futures_core::task::__internal::AtomicWaker;
use crate::BytesMut;
/// Async handle to a memory pool, used for readiness polling.
/// `idx` is the waiter slot index + 1 in the pool's waiters slab;
/// 0 means this handle has no registered waiter.
pub struct Pool {
    idx: Cell<usize>,
    inner: &'static MemoryPool,
}
/// Copyable, lightweight reference to a memory pool.
#[derive(Copy, Clone)]
pub struct PoolRef(&'static MemoryPool);
/// Memory pool identifier (0..=15; index 15 is the default pool).
#[derive(Copy, Clone, Debug)]
pub struct PoolId(u8);
/// Types that can resolve themselves to a `PoolRef`.
pub trait AsPoolRef {
    fn pool_ref(&self) -> PoolRef;
}
/// High/low watermarks (in bytes) for io read/write buffers.
#[derive(Copy, Clone)]
pub struct BufParams {
    pub high: u16,
    pub low: u16,
}
bitflags::bitflags! {
    struct Flags: u8 {
        // the pool driver future has been spawned
        const SPAWNED = 0b0000_0001;
        // pool size was just (re)set or the window moved upward
        const INCREASED = 0b0000_0010;
    }
}
/// Per-thread pool state. Instances are leaked (see `MemoryPool::create`)
/// so handles can hold `&'static` references.
struct MemoryPool {
    id: PoolId,
    // waker of the driver future, plus a flag telling allocators
    // whether the driver is parked and may need waking
    waker: AtomicWaker,
    waker_alive: AtomicBool,
    // parked `Pool::poll_ready` wakers
    waiters: RefCell<Waiters>,
    flags: Cell<Flags>,
    // currently allocated bytes
    size: AtomicUsize,
    // configured pool limit; 0 means unlimited
    max_size: Cell<usize>,
    // current readiness window [window_l, window_h] plus bookkeeping
    window_h: Cell<usize>,
    window_l: Cell<usize>,
    window_idx: Cell<usize>,
    window_waiters: Cell<usize>,
    windows: Cell<[(usize, usize); 10]>,
    // io read/write bytesmut cache and params
    read_wm: Cell<BufParams>,
    read_cache: RefCell<Vec<BytesMut>>,
    write_wm: Cell<BufParams>,
    write_cache: RefCell<Vec<BytesMut>>,
    // user-provided executor used to run the driver future
    spawn: RefCell<Option<Rc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>)>>>,
}
// maximum number of cached read/write buffers kept per pool
const CACHE_SIZE: usize = 16;
impl PoolId {
    pub const P0: PoolId = PoolId(0);
    pub const P1: PoolId = PoolId(1);
    pub const P2: PoolId = PoolId(2);
    pub const P3: PoolId = PoolId(3);
    pub const P4: PoolId = PoolId(4);
    pub const P5: PoolId = PoolId(5);
    pub const P6: PoolId = PoolId(6);
    pub const P7: PoolId = PoolId(7);
    pub const P8: PoolId = PoolId(8);
    pub const P9: PoolId = PoolId(9);
    pub const P10: PoolId = PoolId(10);
    pub const P11: PoolId = PoolId(11);
    pub const P12: PoolId = PoolId(12);
    pub const P13: PoolId = PoolId(13);
    pub const P14: PoolId = PoolId(14);
    pub const DEFAULT: PoolId = PoolId(15);

    #[inline]
    /// Get a `Pool` handle for this id.
    pub fn pool(self) -> Pool {
        POOLS.with(|pools| Pool {
            idx: Cell::new(0),
            inner: pools[self.0 as usize],
        })
    }

    #[inline]
    /// Get a lightweight `PoolRef` for this id.
    pub fn pool_ref(self) -> PoolRef {
        POOLS.with(|pools| PoolRef(pools[self.0 as usize]))
    }

    /// Set future spawn fn
    ///
    /// The spawn fn is used to run the driver future that tracks the
    /// allocation windows and wakes `Pool::poll_ready` waiters.
    pub fn set_spawn_fn<T>(f: T)
    where
        T: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static,
    {
        let spawn: Rc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>)> =
            Rc::new(move |fut| f(fut));
        POOLS.with(move |pools| {
            // NOTE(review): `take(15)` skips the last entry, so
            // `PoolId::DEFAULT` (index 15) never receives a spawn fn and
            // its `poll_ready` is always Ready — confirm this is intended.
            for pool in pools.iter().take(15) {
                *pool.spawn.borrow_mut() = Some(spawn.clone());
            }
        });
    }
}
impl AsPoolRef for PoolId {
    #[inline]
    /// Resolve this id against the thread-local pool table.
    fn pool_ref(&self) -> PoolRef {
        let slot = usize::from(self.0);
        POOLS.with(|pools| PoolRef(pools[slot]))
    }
}
thread_local! {
    // one leaked pool per `PoolId`, created lazily for each thread
    static POOLS: [&'static MemoryPool; 16] = [
        MemoryPool::create(PoolId::P0),
        MemoryPool::create(PoolId::P1),
        MemoryPool::create(PoolId::P2),
        MemoryPool::create(PoolId::P3),
        MemoryPool::create(PoolId::P4),
        MemoryPool::create(PoolId::P5),
        MemoryPool::create(PoolId::P6),
        MemoryPool::create(PoolId::P7),
        MemoryPool::create(PoolId::P8),
        MemoryPool::create(PoolId::P9),
        MemoryPool::create(PoolId::P10),
        MemoryPool::create(PoolId::P11),
        MemoryPool::create(PoolId::P12),
        MemoryPool::create(PoolId::P13),
        MemoryPool::create(PoolId::P14),
        MemoryPool::create(PoolId::DEFAULT),
    ];
}
impl PoolRef {
    #[inline]
    /// Get pool id.
    pub fn id(self) -> PoolId {
        self.0.id
    }

    #[inline]
    /// Get `Pool` instance for this pool ref.
    pub fn pool(self) -> Pool {
        Pool {
            idx: Cell::new(0),
            inner: self.0,
        }
    }

    #[inline]
    /// Get total number of allocated bytes.
    pub fn allocated(self) -> usize {
        self.0.size.load(Relaxed)
    }

    #[inline]
    /// Move `buf` accounting into this pool; it is released from its
    /// current pool and charged to this one.
    pub fn move_in(self, buf: &mut BytesMut) {
        buf.move_to_pool(self);
    }

    #[inline]
    /// Creates a new `BytesMut` with the specified capacity.
    pub fn buf_with_capacity(self, cap: usize) -> BytesMut {
        BytesMut::with_capacity_in_priv(cap, self)
    }

    #[inline]
    /// Set max pool size
    ///
    /// Rebuilds the readiness windows (ten bands, each 1% of `size`
    /// below the previous one) and wakes all parked waiters so they
    /// re-check readiness against the new limit.
    pub fn set_pool_size(self, size: usize) -> Self {
        self.0.max_size.set(size);
        self.0.window_waiters.set(0);
        self.0.window_l.set(size);
        self.0.window_h.set(usize::MAX);
        self.0.window_idx.set(0);
        let mut flags = self.0.flags.get();
        flags.insert(Flags::INCREASED);
        self.0.flags.set(flags);

        // calc windows: band 0 is [size, usize::MAX]; each following
        // band is one percent of `size` lower than the previous one
        let mut l = size;
        let mut h = usize::MAX;
        let mut windows: [(usize, usize); 10] = Default::default();
        windows[0] = (l, h);
        for (idx, item) in windows.iter_mut().enumerate().skip(1) {
            h = l;
            l = size - (size / 100) * idx;
            *item = (l, h);
        }
        self.0.windows.set(windows);

        // release old waiters
        let mut waiters = self.0.waiters.borrow_mut();
        while let Some(waker) = waiters.consume() {
            waker.wake();
        }
        self
    }

    #[doc(hidden)]
    #[inline]
    pub fn read_params(self) -> BufParams {
        self.0.read_wm.get()
    }

    #[doc(hidden)]
    #[inline]
    pub fn read_params_high(self) -> usize {
        self.0.read_wm.get().high as usize
    }

    #[doc(hidden)]
    #[inline]
    /// Set read buffer watermarks; panics unless `l < h`.
    pub fn set_read_params(self, h: u16, l: u16) -> Self {
        assert!(l < h);
        self.0.read_wm.set(BufParams { high: h, low: l });
        self
    }

    #[doc(hidden)]
    #[inline]
    pub fn write_params(self) -> BufParams {
        self.0.write_wm.get()
    }

    #[doc(hidden)]
    #[inline]
    pub fn write_params_high(self) -> usize {
        self.0.write_wm.get().high as usize
    }

    #[doc(hidden)]
    #[inline]
    /// Set write buffer watermarks; panics unless `l < h`.
    pub fn set_write_params(self, h: u16, l: u16) -> Self {
        assert!(l < h);
        self.0.write_wm.set(BufParams { high: h, low: l });
        self
    }

    #[doc(hidden)]
    #[inline]
    /// Get a cached read buffer, or allocate one sized to the read
    /// high watermark.
    pub fn get_read_buf(self) -> BytesMut {
        if let Some(buf) = self.0.read_cache.borrow_mut().pop() {
            buf
        } else {
            BytesMut::with_capacity_in_priv(self.0.read_wm.get().high as usize, self)
        }
    }

    #[doc(hidden)]
    #[inline]
    /// Release read buffer, buf must be allocated from this pool
    pub fn release_read_buf(self, mut buf: BytesMut) {
        let cap = buf.capacity();
        let (hw, lw) = self.0.read_wm.get().unpack();
        // only cache buffers whose capacity is within the watermarks
        if cap > lw && cap <= hw {
            let mut v = self.0.read_cache.borrow_mut();
            if v.len() < CACHE_SIZE {
                buf.clear();
                v.push(buf);
            }
        }
    }

    #[doc(hidden)]
    #[inline]
    /// Get a cached write buffer, or allocate one sized to the write
    /// high watermark.
    pub fn get_write_buf(self) -> BytesMut {
        if let Some(buf) = self.0.write_cache.borrow_mut().pop() {
            buf
        } else {
            BytesMut::with_capacity_in_priv(self.0.write_wm.get().high as usize, self)
        }
    }

    #[doc(hidden)]
    #[inline]
    /// Release write buffer, buf must be allocated from this pool
    pub fn release_write_buf(self, mut buf: BytesMut) {
        let cap = buf.capacity();
        let (hw, lw) = self.0.write_wm.get().unpack();
        // only cache buffers whose capacity is within the watermarks
        if cap > lw && cap <= hw {
            let mut v = self.0.write_cache.borrow_mut();
            if v.len() < CACHE_SIZE {
                buf.clear();
                v.push(buf);
            }
        }
    }

    #[inline]
    /// Charge `size` bytes to this pool; wake the driver future when
    /// it is parked and the new total may have left the current window.
    pub(crate) fn acquire(self, size: usize) {
        let prev = self.0.size.fetch_add(size, Relaxed);
        if self.0.waker_alive.load(Relaxed) {
            self.wake_driver(prev + size)
        }
    }

    #[inline]
    /// Release `size` bytes from this pool; see `acquire`.
    pub(crate) fn release(self, size: usize) {
        let prev = self.0.size.fetch_sub(size, Relaxed);
        if self.0.waker_alive.load(Relaxed) {
            self.wake_driver(prev - size)
        }
    }

    /// Wake the driver future if the allocated size left the current
    /// readiness window.
    fn wake_driver(self, allocated: usize) {
        let l = self.0.window_l.get();
        let h = self.0.window_h.get();
        if allocated < l || allocated > h {
            self.0.waker_alive.store(false, Relaxed);
            self.0.waker.wake();
        }
    }
}
impl Default for PoolRef {
#[inline]
fn default() -> PoolRef {
PoolId::DEFAULT.pool_ref()
}
}
impl AsPoolRef for PoolRef {
#[inline]
fn pool_ref(&self) -> PoolRef {
*self
}
}
impl fmt::Debug for PoolRef {
    /// Renders as `PoolRef { id, allocated }`.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = fmt.debug_struct("PoolRef");
        builder.field("id", &self.id().0);
        builder.field("allocated", &self.allocated());
        builder.finish()
    }
}
impl MemoryPool {
    /// Allocate a pool with default watermarks (4k high / 1k low) and
    /// leak it: pool handles are `&'static` and never deallocated.
    fn create(id: PoolId) -> &'static MemoryPool {
        Box::leak(Box::new(MemoryPool {
            id,
            waker: AtomicWaker::new(),
            waker_alive: AtomicBool::new(false),
            waiters: RefCell::new(Waiters::new()),
            flags: Cell::new(Flags::empty()),
            size: AtomicUsize::new(0),
            // 0 == unlimited until `set_pool_size` is called
            max_size: Cell::new(0),
            window_h: Cell::new(0),
            window_l: Cell::new(0),
            window_waiters: Cell::new(0),
            window_idx: Cell::new(0),
            windows: Default::default(),
            read_wm: Cell::new(BufParams {
                high: 4 * 1024,
                low: 1024,
            }),
            read_cache: RefCell::new(Vec::with_capacity(CACHE_SIZE)),
            write_wm: Cell::new(BufParams {
                high: 4 * 1024,
                low: 1024,
            }),
            write_cache: RefCell::new(Vec::with_capacity(CACHE_SIZE)),
            spawn: RefCell::new(None),
        }))
    }
}
impl BufParams {
    #[inline]
    /// Expand the packed watermarks into a `(high, low)` pair of `usize`.
    pub fn unpack(self) -> (usize, usize) {
        (usize::from(self.high), usize::from(self.low))
    }
}
impl Clone for Pool {
fn clone(&self) -> Pool {
Pool {
idx: Cell::new(0),
inner: self.inner,
}
}
}
impl Drop for Pool {
    fn drop(&mut self) {
        let idx = self.idx.get();
        if idx > 0 {
            // cleanup waiter (idx stores slot + 1); `truncate` frees the
            // slab storage once the last slot is released
            let mut waiters = self.inner.waiters.borrow_mut();
            waiters.remove(idx - 1);
            waiters.truncate();
        }
    }
}
impl Pool {
    #[inline]
    /// Get pool id.
    pub fn id(&self) -> PoolId {
        self.inner.id
    }

    #[inline]
    /// Check if pool is pending (this handle has a parked waiter)
    pub fn is_pending(&self) -> bool {
        let idx = self.idx.get();
        if idx > 0 {
            // idx stores slot + 1; 0 means "no waiter registered"
            if let Some(Entry::Occupied(_)) =
                self.inner.waiters.borrow().entries.get(idx - 1)
            {
                return true;
            }
        }
        false
    }

    #[doc(hidden)]
    #[inline]
    /// Get the readiness windows table (diagnostics helper)
    pub fn windows(&self) -> [(usize, usize); 10] {
        self.inner.windows.get()
    }

    #[inline]
    /// Get `PoolRef` instance for this pool.
    pub fn pool_ref(&self) -> PoolRef {
        PoolRef(self.inner)
    }

    #[inline]
    /// Poll pool readiness. Returns `Pending` (and parks a waiter that
    /// the driver future wakes as memory is released) while the pool is
    /// over its current window; always `Ready` when no max size or no
    /// spawn fn is configured.
    pub fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
        if self.inner.max_size.get() > 0 {
            let allocated = self.inner.size.load(Relaxed);
            // lower than low
            let window_l = self.inner.window_l.get();
            if window_l == 0 || allocated < window_l {
                let idx = self.idx.get();
                if idx > 0 {
                    // cleanup waiter
                    let mut waiters = self.inner.waiters.borrow_mut();
                    waiters.remove(idx - 1);
                    waiters.truncate();
                    self.idx.set(0);
                }
                return Poll::Ready(());
            }

            // register waiter
            if let Some(spawn) = &*self.inner.spawn.borrow() {
                let idx = self.idx.get();
                let mut flags = self.inner.flags.get();
                let mut waiters = self.inner.waiters.borrow_mut();
                let new = if idx == 0 {
                    // first registration for this handle; store slot + 1
                    self.idx.set(waiters.append(ctx.waker().clone()) + 1);
                    true
                } else {
                    waiters.update(idx - 1, ctx.waker().clone())
                };
                // NOTE(review): after a size increase, or when this waiter
                // was already parked, it is counted against the current
                // window; otherwise the oldest waiter is woken instead —
                // confirm this fairness rule against the driver logic.
                if flags.contains(Flags::INCREASED) || !new {
                    self.inner
                        .window_waiters
                        .set(self.inner.window_waiters.get() + 1);
                } else if let Some(waker) = waiters.consume() {
                    waker.wake();
                }

                // start driver task
                if !flags.contains(Flags::SPAWNED) {
                    flags.insert(Flags::SPAWNED);
                    self.inner.flags.set(flags);
                    spawn(Box::pin(Driver { pool: self.inner }))
                }
                return Poll::Pending;
            }
        }
        Poll::Ready(())
    }
}
/// Background future that tracks the allocated size against the
/// readiness windows and wakes parked pool waiters as memory is freed.
struct Driver {
    pool: &'static MemoryPool,
}
impl Driver {
    // Wake a bounded batch of waiters: 5% of all currently parked
    // entries, plus half of `waiters_num` (the count carried over from
    // the previous window) when it is large, or all of it when small.
    fn release(&self, waiters_num: usize) {
        let mut waiters = self.pool.waiters.borrow_mut();
        let mut to_release = waiters.occupied_len / 100 * 5;
        if waiters_num > to_release {
            to_release += waiters_num >> 1;
        } else {
            to_release += waiters_num;
        }
        while to_release > 0 {
            if let Some(waker) = waiters.consume() {
                waker.wake();
                to_release -= 1;
            } else {
                break;
            }
        }
    }

    // Wake every parked waiter.
    fn release_all(&self) {
        let mut waiters = self.pool.waiters.borrow_mut();
        while let Some(waker) = waiters.consume() {
            waker.wake();
        }
    }
}
impl Future for Driver {
    type Output = ();

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pool = self.as_ref().pool;
        let allocated = pool.size.load(Relaxed);
        let win_l = pool.window_l.get();
        let win_h = pool.window_h.get();

        // allocated size is decreased, release waiters
        if allocated < win_l {
            let mut idx = pool.window_idx.get() + 1;
            let mut waiters = pool.window_waiters.get();
            let windows = pool.windows.get();
            loop {
                // allocated size decreased more than 10%, release all
                if idx == 10 {
                    self.release_all();
                    pool.window_l.set(windows[0].0);
                    pool.window_h.set(windows[0].1);
                    pool.window_idx.set(0);
                    pool.window_waiters.set(0);
                    pool.flags.set(Flags::INCREASED);
                    // driver exits; poll_ready re-spawns it when needed
                    return Poll::Ready(());
                } else {
                    // release 5% of pending waiters
                    self.release(waiters);
                    if allocated > windows[idx].0 {
                        // settle into the band containing `allocated`
                        pool.window_l.set(windows[idx].0);
                        pool.window_h.set(windows[idx].1);
                        pool.window_idx.set(idx);
                        pool.window_waiters.set(0);
                        pool.flags.set(Flags::SPAWNED);
                        break;
                    }
                    idx += 1;
                    waiters = 0;
                }
            }
        }
        // allocated size is increased
        else if allocated > win_h {
            // reset window one band up
            // NOTE(review): assumes window_idx > 0 here; presumably safe
            // because band 0 has window_h == usize::MAX, so `allocated >
            // win_h` cannot fire at idx 0 — confirm.
            let idx = pool.window_idx.get() - 1;
            let windows = pool.windows.get();
            pool.window_l.set(windows[idx].0);
            pool.window_h.set(windows[idx].1);
            pool.window_idx.set(idx);
            pool.window_waiters.set(0);
            pool.flags.set(Flags::SPAWNED | Flags::INCREASED);
        }

        // register waker so allocators can wake us via `wake_driver`
        pool.waker.register(cx.waker());
        pool.waker_alive.store(true, Release);
        Poll::Pending
    }
}
/// Slab of parked wakers threaded onto an intrusive doubly-linked list
/// (FIFO), with a free list of vacant slots.
struct Waiters {
    entries: Vec<Entry>,
    // list head (oldest waiter); usize::MAX means "none"
    root: usize,
    // list tail; usize::MAX means "none"
    tail: usize,
    // head of the vacant-slot free list
    free: usize,
    // number of non-vacant slots (Occupied + Consumed)
    len: usize,
    // number of Occupied (linked) slots
    occupied_len: usize,
}
/// State of a slot in the waiters slab.
#[derive(Debug)]
enum Entry {
    // vacant; payload is the next slot in the free list
    Vacant(usize),
    // waker was consumed (woken) but the owning `Pool` still holds the slot
    Consumed,
    // parked waiter, linked into the wait list
    Occupied(Node),
}
/// Intrusive list node; `prev`/`next` are slab indices and
/// `usize::MAX` marks the end of the list.
#[derive(Debug)]
struct Node {
    item: Waker,
    prev: usize,
    next: usize,
}
impl Waiters {
    fn new() -> Waiters {
        Waiters {
            entries: Vec::new(),
            root: usize::MAX,
            tail: usize::MAX,
            free: 0,
            len: 0,
            occupied_len: 0,
        }
    }

    // Free the backing storage once every slot has been released.
    fn truncate(&mut self) {
        if self.len == 0 {
            self.entries.truncate(0);
            self.root = usize::MAX;
            self.tail = usize::MAX;
            self.free = 0;
        }
    }

    // Get the occupied node at `key`; callers guarantee occupancy.
    fn get_node(&mut self, key: usize) -> &mut Node {
        if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) {
            return node;
        }
        unreachable!()
    }

    // consume root item: unlink the oldest waiter and return its waker,
    // leaving the slot `Consumed` (the owning `Pool` keeps the slot
    // until it is removed or re-armed via `update`)
    fn consume(&mut self) -> Option<Waker> {
        if self.root != usize::MAX {
            self.occupied_len -= 1;
            let entry = self.entries.get_mut(self.root).unwrap();
            let prev = mem::replace(entry, Entry::Consumed);
            match prev {
                Entry::Occupied(node) => {
                    debug_assert!(node.prev == usize::MAX);
                    // last item
                    if self.tail == self.root {
                        self.tail = usize::MAX;
                        self.root = usize::MAX;
                    } else {
                        // remove from root
                        self.root = node.next;
                        self.get_node(self.root).prev = usize::MAX;
                    }
                    Some(node.item)
                }
                _ => {
                    unreachable!()
                }
            }
        } else {
            None
        }
    }

    // Re-arm the waiter at `key`. Returns `false` when the slot was
    // still occupied (waker replaced in place) and `true` when a
    // consumed slot was re-linked at the tail of the list.
    fn update(&mut self, key: usize, val: Waker) -> bool {
        if let Some(entry) = self.entries.get_mut(key) {
            match entry {
                Entry::Occupied(ref mut node) => {
                    node.item = val;
                    return false;
                }
                Entry::Consumed => {
                    *entry = Entry::Occupied(Node {
                        item: val,
                        prev: self.tail,
                        next: usize::MAX,
                    });
                }
                _ => unreachable!(),
            }
        }
        // NOTE(review): this also runs when `key` is out of range;
        // callers presumably always pass a live slot — confirm.
        self.occupied_len += 1;
        if self.root == usize::MAX {
            self.root = key;
        }
        if self.tail != usize::MAX {
            self.get_node(self.tail).next = key;
        }
        self.tail = key;
        true
    }

    // Vacate the slot at `key`, unlinking it from the list when it was
    // still occupied, and push it onto the free list.
    fn remove(&mut self, key: usize) {
        if let Some(entry) = self.entries.get_mut(key) {
            // Swap the entry at the provided value
            let prev = mem::replace(entry, Entry::Vacant(self.free));
            match prev {
                Entry::Occupied(node) => {
                    self.len -= 1;
                    self.occupied_len -= 1;
                    self.free = key;
                    // remove from root
                    if self.root == key {
                        self.root = node.next;
                        if self.root != usize::MAX {
                            self.get_node(self.root).prev = usize::MAX;
                        }
                    }
                    // remove from tail
                    if self.tail == key {
                        self.tail = node.prev;
                        if self.tail != usize::MAX {
                            self.get_node(self.tail).next = usize::MAX;
                        }
                    }
                }
                Entry::Consumed => {
                    self.len -= 1;
                    self.free = key;
                }
                _ => {
                    unreachable!()
                }
            }
        }
    }

    // Park a new waker at the tail of the list, reusing a vacant slot
    // when one is available. Returns the slot index.
    fn append(&mut self, val: Waker) -> usize {
        self.len += 1;
        self.occupied_len += 1;
        let key = self.free;
        if key == self.entries.len() {
            // no vacant slot; grow the slab
            if self.root == usize::MAX {
                self.root = key;
            }
            if self.tail != usize::MAX {
                self.get_node(self.tail).next = key;
            }
            self.entries.push(Entry::Occupied(Node {
                item: val,
                prev: self.tail,
                next: usize::MAX,
            }));
            self.tail = key;
            self.free = key + 1;
        } else {
            // reuse the vacant slot at the head of the free list
            self.free = match self.entries.get(key) {
                Some(&Entry::Vacant(next)) => next,
                _ => unreachable!(),
            };
            // BUGFIX: the original reuse branch never set `root` when the
            // list was empty (root == usize::MAX), so a waiter parked in a
            // recycled slot could never be reached by `consume()`. Mirror
            // the push branch (and `update`) by setting root here.
            if self.root == usize::MAX {
                self.root = key;
            }
            if self.tail != usize::MAX {
                self.get_node(self.tail).next = key;
            }
            self.entries[key] = Entry::Occupied(Node {
                item: val,
                prev: self.tail,
                next: usize::MAX,
            });
            self.tail = key;
        }
        key
    }
}

View file

@ -1,7 +1,7 @@
//! A UTF-8 encoded read-only string using Bytes as storage.
use std::{borrow, convert::TryFrom, fmt, hash, ops, slice, str};
use crate::Bytes;
use crate::{Bytes, BytesMut};
/// An immutable UTF-8 encoded string with [`Bytes`] as a storage.
#[derive(Clone, Default, Eq, PartialOrd, Ord)]
@ -242,7 +242,7 @@ impl TryFrom<Bytes> for ByteString {
}
}
impl TryFrom<crate::BytesMut> for ByteString {
impl TryFrom<BytesMut> for ByteString {
type Error = str::Utf8Error;
#[inline]

View file

@ -1,6 +1,7 @@
#![deny(warnings, rust_2018_idioms)]
//#![deny(warnings, rust_2018_idioms)]
use std::task::Poll;
use ntex_bytes::{Buf, BufMut, Bytes, BytesMut};
use ntex_bytes::{Buf, BufMut, Bytes, BytesMut, PoolId};
const LONG: &'static [u8] = b"mary had a little lamb, little lamb, little lamb";
const SHORT: &'static [u8] = b"hello world";
@ -10,6 +11,11 @@ fn inline_cap() -> usize {
4 * mem::size_of::<usize>() - 1
}
// Accounting overhead of the shared Vec header: three machine words.
const fn shared_vec() -> usize {
    std::mem::size_of::<usize>() * 3
}
fn is_sync<T: Sync>() {}
fn is_send<T: Send>() {}
@ -108,6 +114,37 @@ fn len() {
assert!(a.is_empty());
}
#[test]
fn inline() {
    // short values are stored inline, without a heap allocation
    let a = Bytes::from("abcdefg".to_string());
    assert!(a.is_inline());
    let a = BytesMut::from(&b"abcdefg"[..]).freeze();
    assert!(a.is_inline());
    let a = Bytes::from("".to_string());
    assert!(a.is_empty());
    let a = BytesMut::from(&b""[..]).freeze();
    assert!(a.is_inline());

    // split_to: both halves become inline once short enough
    let mut a = BytesMut::from(vec![b'*'; 35]).freeze();
    assert!(!a.is_inline());
    let b = a.split_to(8);
    assert!(b.is_inline());
    assert!(a.is_inline());

    // split_off behaves the same way
    let mut a = BytesMut::from(vec![b'*'; 35]).freeze();
    let b = a.split_off(8);
    assert!(b.is_inline());
    assert!(a.is_inline());

    // truncate converts to inline when the result fits
    let mut a = BytesMut::from(vec![b'*'; 35]).freeze();
    a.truncate(8);
    assert!(a.is_inline());
}
#[test]
fn index() {
let a = Bytes::from(&b"hello world"[..]);
@ -194,7 +231,7 @@ fn split_off_to_loop() {
let mut bytes = Bytes::from(&s[..]);
let off = bytes.split_off(i);
assert_eq!(i, bytes.len());
let mut sum = Vec::new();
let mut sum: Vec<u8> = Vec::new();
sum.extend(bytes.iter());
sum.extend(off.iter());
assert_eq!(&s[..], &sum[..]);
@ -203,7 +240,7 @@ fn split_off_to_loop() {
let mut bytes = BytesMut::from(&s[..]);
let off = bytes.split_off(i);
assert_eq!(i, bytes.len());
let mut sum = Vec::new();
let mut sum: Vec<u8> = Vec::new();
sum.extend(&bytes);
sum.extend(&off);
assert_eq!(&s[..], &sum[..]);
@ -212,7 +249,7 @@ fn split_off_to_loop() {
let mut bytes = Bytes::from(&s[..]);
let off = bytes.split_to(i);
assert_eq!(i, off.len());
let mut sum = Vec::new();
let mut sum: Vec<u8> = Vec::new();
sum.extend(off.iter());
sum.extend(bytes.iter());
assert_eq!(&s[..], &sum[..]);
@ -221,7 +258,7 @@ fn split_off_to_loop() {
let mut bytes = BytesMut::from(&s[..]);
let off = bytes.split_to(i);
assert_eq!(i, off.len());
let mut sum = Vec::new();
let mut sum: Vec<u8> = Vec::new();
sum.extend(&off);
sum.extend(&bytes);
assert_eq!(&s[..], &sum[..]);
@ -232,20 +269,20 @@ fn split_off_to_loop() {
#[test]
fn split_to_1() {
// Inline
let mut a = Bytes::from(SHORT);
let mut a = Bytes::from(&SHORT[..]);
let b = a.split_to(4);
assert_eq!(SHORT[4..], a);
assert_eq!(SHORT[..4], b);
// Allocated
let mut a = Bytes::from(LONG);
let mut a = Bytes::from(Vec::from(LONG));
let b = a.split_to(4);
assert_eq!(LONG[4..], a);
assert_eq!(LONG[..4], b);
let mut a = Bytes::from(LONG);
let mut a = Bytes::from(Vec::from(LONG));
let b = a.split_to(30);
assert_eq!(LONG[30..], a);
@ -322,31 +359,13 @@ fn fns_defined_for_bytes_mut() {
#[test]
fn reserve_convert() {
// Inline -> Vec
let mut bytes = BytesMut::with_capacity(8);
bytes.put("hello".as_bytes());
bytes.reserve(40);
assert_eq!(bytes.capacity(), 45);
assert_eq!(bytes, "hello");
// Inline -> Inline
let mut bytes = BytesMut::with_capacity(inline_cap());
bytes.put("abcdefghijkl".as_bytes());
let a = bytes.split_to(10);
bytes.reserve(inline_cap() - 3);
assert_eq!(inline_cap(), bytes.capacity());
assert_eq!(bytes, "kl");
assert_eq!(a, "abcdefghij");
// Vec -> Vec
let mut bytes = BytesMut::from(LONG);
bytes.reserve(64);
assert_eq!(bytes.capacity(), LONG.len() + 64);
// Arc -> Vec
let mut bytes = BytesMut::from(LONG);
let mut bytes = BytesMut::from(Vec::from(LONG));
let a = bytes.split_to(30);
bytes.reserve(128);
@ -355,46 +374,6 @@ fn reserve_convert() {
drop(a);
}
#[test]
fn reserve_growth() {
let mut bytes = BytesMut::with_capacity(64);
bytes.put("hello world".as_bytes());
let _ = bytes.split();
bytes.reserve(65);
assert_eq!(bytes.capacity(), 128);
}
#[test]
fn reserve_allocates_at_least_original_capacity() {
let mut bytes = BytesMut::with_capacity(1024);
for i in 0..1020 {
bytes.put_u8(i as u8);
}
let _other = bytes.split();
bytes.reserve(16);
assert_eq!(bytes.capacity(), 1024);
}
#[test]
fn reserve_max_original_capacity_value() {
const SIZE: usize = 128 * 1024;
let mut bytes = BytesMut::with_capacity(SIZE);
for _ in 0..SIZE {
bytes.put_u8(0u8);
}
let _other = bytes.split();
bytes.reserve(16);
assert_eq!(bytes.capacity(), 64 * 1024);
}
// Without either looking at the internals of the BytesMut or doing weird stuff
// with the memory allocator, there's no good way to automatically verify from
// within the program that this actually recycles memory. Instead, just exercise
@ -431,7 +410,7 @@ fn reserve_in_arc_unique_doubles() {
assert_eq!(1000, bytes.capacity());
bytes.reserve(1001);
assert_eq!(2000, bytes.capacity());
assert_eq!(1001, bytes.capacity());
}
#[test]
@ -462,13 +441,6 @@ fn extend_mut() {
assert_eq!(*bytes, LONG[..]);
}
#[test]
fn extend_shr() {
let mut bytes = Bytes::new();
bytes.extend(LONG);
assert_eq!(*bytes, LONG[..]);
}
#[test]
fn extend_from_slice_mut() {
for &i in &[3, 34] {
@ -479,16 +451,6 @@ fn extend_from_slice_mut() {
}
}
#[test]
fn extend_from_slice_shr() {
for &i in &[3, 34] {
let mut bytes = Bytes::new();
bytes.extend_from_slice(&LONG[..i]);
bytes.extend_from_slice(&LONG[i..]);
assert_eq!(LONG[..], *bytes);
}
}
#[test]
fn from_static() {
let mut a = Bytes::from_static(b"ab");
@ -585,258 +547,6 @@ fn partial_eq_bytesmut() {
assert!(bytesmut != bytes2);
}
#[test]
fn bytes_unsplit_basic() {
let mut buf = Bytes::with_capacity(64);
buf.extend_from_slice(b"aaabbbcccddd");
let splitted = buf.split_off(6);
assert_eq!(b"aaabbb", &buf[..]);
assert_eq!(b"cccddd", &splitted[..]);
buf.unsplit(splitted);
assert_eq!(b"aaabbbcccddd", &buf[..]);
}
#[test]
fn bytes_unsplit_empty_other() {
let mut buf = Bytes::with_capacity(64);
buf.extend_from_slice(b"aaabbbcccddd");
// empty other
let other = Bytes::new();
buf.unsplit(other);
assert_eq!(b"aaabbbcccddd", &buf[..]);
}
#[test]
fn bytes_unsplit_empty_self() {
// empty self
let mut buf = Bytes::new();
let mut other = Bytes::with_capacity(64);
other.extend_from_slice(b"aaabbbcccddd");
buf.unsplit(other);
assert_eq!(b"aaabbbcccddd", &buf[..]);
}
#[test]
fn bytes_unsplit_inline_arc() {
let mut buf = Bytes::with_capacity(8); //inline
buf.extend_from_slice(b"aaaabbbb");
let mut buf2 = Bytes::with_capacity(64);
buf2.extend_from_slice(b"ccccddddeeee");
buf2.split_off(8); //arc
buf.unsplit(buf2);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_unsplit_arc_inline() {
let mut buf = Bytes::with_capacity(64);
buf.extend_from_slice(b"aaaabbbbeeee");
buf.split_off(8); //arc
let mut buf2 = Bytes::with_capacity(8); //inline
buf2.extend_from_slice(b"ccccdddd");
buf.unsplit(buf2);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_unsplit_both_inline() {
let mut buf = Bytes::with_capacity(16); //inline
buf.extend_from_slice(b"aaaabbbbccccdddd");
let splitted = buf.split_off(8); // both inline
assert_eq!(b"aaaabbbb", &buf[..]);
assert_eq!(b"ccccdddd", &splitted[..]);
buf.unsplit(splitted);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_unsplit_arc_different() {
let mut buf = Bytes::with_capacity(64);
buf.extend_from_slice(b"aaaabbbbeeee");
buf.split_off(8); //arc
let mut buf2 = Bytes::with_capacity(64);
buf2.extend_from_slice(b"ccccddddeeee");
buf2.split_off(8); //arc
buf.unsplit(buf2);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_unsplit_arc_non_contiguous() {
let mut buf = Bytes::with_capacity(64);
buf.extend_from_slice(b"aaaabbbbeeeeccccdddd");
let mut buf2 = buf.split_off(8); //arc
let buf3 = buf2.split_off(4); //arc
buf.unsplit(buf3);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_unsplit_two_split_offs() {
let mut buf = Bytes::with_capacity(64);
buf.extend_from_slice(b"aaaabbbbccccdddd");
let mut buf2 = buf.split_off(8); //arc
let buf3 = buf2.split_off(4); //arc
buf2.unsplit(buf3);
buf.unsplit(buf2);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_unsplit_overlapping_references() {
let mut buf = Bytes::with_capacity(64);
buf.extend_from_slice(b"abcdefghijklmnopqrstuvwxyz");
let mut buf0010 = buf.slice(0..10);
let buf1020 = buf.slice(10..20);
let buf0515 = buf.slice(5..15);
buf0010.unsplit(buf1020);
assert_eq!(b"abcdefghijklmnopqrst", &buf0010[..]);
assert_eq!(b"fghijklmno", &buf0515[..]);
}
#[test]
fn bytes_mut_unsplit_basic() {
let mut buf = BytesMut::with_capacity(64);
buf.extend_from_slice(b"aaabbbcccddd");
let splitted = buf.split_off(6);
assert_eq!(b"aaabbb", &buf[..]);
assert_eq!(b"cccddd", &splitted[..]);
buf.unsplit(splitted);
assert_eq!(b"aaabbbcccddd", &buf[..]);
}
#[test]
fn bytes_mut_unsplit_empty_other() {
let mut buf = BytesMut::with_capacity(64);
buf.extend_from_slice(b"aaabbbcccddd");
// empty other
let other = BytesMut::new();
buf.unsplit(other);
assert_eq!(b"aaabbbcccddd", &buf[..]);
}
#[test]
fn bytes_mut_unsplit_empty_self() {
// empty self
let mut buf = BytesMut::new();
let mut other = BytesMut::with_capacity(64);
other.extend_from_slice(b"aaabbbcccddd");
buf.unsplit(other);
assert_eq!(b"aaabbbcccddd", &buf[..]);
}
#[test]
fn bytes_mut_unsplit_inline_arc() {
let mut buf = BytesMut::with_capacity(8); //inline
buf.extend_from_slice(b"aaaabbbb");
let mut buf2 = BytesMut::with_capacity(64);
buf2.extend_from_slice(b"ccccddddeeee");
buf2.split_off(8); //arc
buf.unsplit(buf2);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_mut_unsplit_arc_inline() {
let mut buf = BytesMut::with_capacity(64);
buf.extend_from_slice(b"aaaabbbbeeee");
buf.split_off(8); //arc
let mut buf2 = BytesMut::with_capacity(8); //inline
buf2.extend_from_slice(b"ccccdddd");
buf.unsplit(buf2);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_mut_unsplit_both_inline() {
let mut buf = BytesMut::with_capacity(16); //inline
buf.extend_from_slice(b"aaaabbbbccccdddd");
let splitted = buf.split_off(8); // both inline
assert_eq!(b"aaaabbbb", &buf[..]);
assert_eq!(b"ccccdddd", &splitted[..]);
buf.unsplit(splitted);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_mut_unsplit_arc_different() {
let mut buf = BytesMut::with_capacity(64);
buf.extend_from_slice(b"aaaabbbbeeee");
buf.split_off(8); //arc
let mut buf2 = BytesMut::with_capacity(64);
buf2.extend_from_slice(b"ccccddddeeee");
buf2.split_off(8); //arc
buf.unsplit(buf2);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_mut_unsplit_arc_non_contiguous() {
let mut buf = BytesMut::with_capacity(64);
buf.extend_from_slice(b"aaaabbbbeeeeccccdddd");
let mut buf2 = buf.split_off(8); //arc
let buf3 = buf2.split_off(4); //arc
buf.unsplit(buf3);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn bytes_mut_unsplit_two_split_offs() {
let mut buf = BytesMut::with_capacity(64);
buf.extend_from_slice(b"aaaabbbbccccdddd");
let mut buf2 = buf.split_off(8); //arc
let buf3 = buf2.split_off(4); //arc
buf2.unsplit(buf3);
buf.unsplit(buf2);
assert_eq!(b"aaaabbbbccccdddd", &buf[..]);
}
#[test]
fn from_iter_no_size_hint() {
use std::iter;
@ -910,3 +620,82 @@ fn empty_slice_ref_catches_not_an_empty_subset() {
bytes.slice_ref(slice);
}
#[test]
fn pool() {
    // Pool: allocations are charged to the owning pool, including the
    // shared Vec header overhead
    let p1 = PoolId::P1.pool_ref();
    assert_eq!(p1.allocated(), 0);
    let mut buf = BytesMut::with_capacity_in(1024, p1);
    assert_eq!(p1.allocated(), 1024 + shared_vec());
    buf.reserve(2048);
    assert_eq!(p1.allocated(), 2048 + shared_vec());
    drop(buf);
    assert_eq!(p1.allocated(), 0);

    // Default pool is used when no pool is specified
    let p = PoolId::DEFAULT.pool_ref();
    assert_eq!(p.allocated(), 0);
    let mut buf = BytesMut::with_capacity(1024);
    assert_eq!(p.allocated(), 1024 + shared_vec());
    buf.reserve(2048);
    assert_eq!(p.allocated(), 2048 + shared_vec());
    drop(buf);
    assert_eq!(p.allocated(), 0);

    // move_in transfers accounting from one pool to another
    let mut buf = BytesMut::with_capacity(1024);
    assert_eq!(p.allocated(), 1024 + shared_vec());
    assert_eq!(p1.allocated(), 0);
    p1.move_in(&mut buf);
    assert_eq!(p.allocated(), 0);
    assert_eq!(p1.allocated(), 1024 + shared_vec());
}
#[ntex::test]
async fn pool_usage() {
    use ntex::{time, util};

    // driver futures are executed on the ntex runtime
    PoolId::set_spawn_fn(|f| {
        let _ = ntex::rt::spawn(f);
    });

    let p_ref = PoolId::P1.pool_ref().set_pool_size(10 * 1024);
    let p1 = p_ref.pool();
    let p2 = p_ref.pool();
    assert_eq!(Poll::Ready(()), util::lazy(|cx| p1.poll_ready(cx)).await);

    // over-allocating parks both handles
    let buf = BytesMut::with_capacity_in(11 * 1024, p_ref);
    assert_eq!(Poll::Pending, util::lazy(|cx| p1.poll_ready(cx)).await);
    assert!(p1.is_pending());
    assert_eq!(Poll::Pending, util::lazy(|cx| p2.poll_ready(cx)).await);
    assert!(p2.is_pending());
    time::sleep(time::Millis(50)).await;
    // releasing memory lets the driver wake the parked waiters
    drop(buf);
    time::sleep(time::Millis(50)).await;
    assert!(!p1.is_pending());
    assert!(!p2.is_pending());
    assert_eq!(Poll::Ready(()), util::lazy(|cx| p1.poll_ready(cx)).await);
    assert_eq!(Poll::Ready(()), util::lazy(|cx| p2.poll_ready(cx)).await);

    // pool is full
    let buf = BytesMut::with_capacity_in(11 * 1024, p_ref);
    assert_eq!(Poll::Pending, util::lazy(|cx| p1.poll_ready(cx)).await);
    assert_eq!(Poll::Pending, util::lazy(|cx| p2.poll_ready(cx)).await);
    drop(buf);

    // pool has some space; only part of the waiters get released
    let buf = BytesMut::with_capacity_in(10100, p_ref);
    time::sleep(time::Millis(50)).await;
    assert!(!p1.is_pending());
    assert!(p2.is_pending());
    // new pools should wait for next update
    assert_eq!(Poll::Pending, util::lazy(|cx| p1.poll_ready(cx)).await);
    drop(buf);
    time::sleep(time::Millis(50)).await;
    assert!(!p1.is_pending());
    assert!(!p2.is_pending());
}

View file

@ -18,8 +18,8 @@ path = "src/lib.rs"
[dependencies]
bitflags = "1.2"
slab = "0.4"
futures-core = { version = "0.3.13", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.13", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3.18", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.18", default-features = false, features = ["alloc"] }
pin-project-lite = "0.2.6"
[dev-dependencies]

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.11] - 2021-12-xx
* framed: Use memory pool
## [0.4.10] - 2021-11-29
* Fix potential overflow sub in timer wheel
@ -10,7 +14,6 @@
* Update webpki to 0.22
* Update webpki-roots to 0.22
* Update tokio-rustls to 0.23
* Update tokio-ssl to 0.6.3
* Adapt code for rustls breaking changes
## [0.4.8] - 2021-11-08

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.4.10"
version = "0.4.11"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -49,14 +49,14 @@ ntex-router = "0.5.1"
ntex-service = "0.2.1"
ntex-macros = "0.1.3"
ntex-util = "0.1.1"
ntex-bytes = "0.1.4"
ntex-bytes = "0.1.5"
base64 = "0.13"
bitflags = "1.3"
derive_more = "0.99.14"
fxhash = "0.2.1"
futures-core = { version = "0.3.16", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.16", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3.18", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.18", default-features = false, features = ["alloc"] }
log = "0.4"
mio = "0.7.11"
num_cpus = "1.13"
@ -86,7 +86,7 @@ coo-kie = { version = "0.15", package = "cookie", optional = true }
# openssl
open-ssl = { version="0.10", package = "openssl", optional = true }
tokio-openssl = { version = "0.6.3", optional = true }
tokio-openssl = { version = "0.6", optional = true }
# rustls
rust-tls = { version = "0.20", package = "rustls", optional = true }
@ -97,7 +97,7 @@ webpki-roots = { version = "0.22", optional = true }
# compression
brotli2 = { version="0.3.2", optional = true }
flate2 = { version = "1.0.20", optional = true }
flate2 = { version = "1.0.22", optional = true }
[dev-dependencies]
env_logger = "0.9"
@ -106,4 +106,4 @@ time = "0.2"
open-ssl = { version="0.10", package = "openssl" }
rust-tls = { version = "0.20", package="rustls", features = ["dangerous_configuration"] }
webpki = "0.21"
futures = "0.3.16"
futures = "0.3"

View file

@ -8,7 +8,7 @@ use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use crate::framed::{DispatchItem, Read, ReadTask, State, Timer, Write, WriteTask};
use crate::service::{IntoService, Service};
use crate::time::Seconds;
use crate::util::Either;
use crate::util::{Either, Pool};
type Response<U> = <U as Encoder>::Item;
@ -43,6 +43,7 @@ where
ka_updated: Cell<Instant>,
error: Cell<Option<S::Error>>,
shared: Rc<DispatcherShared<S, U>>,
pool: Pool,
}
struct DispatcherShared<S, U>
@ -128,9 +129,7 @@ where
service: service.into_service(),
fut: None,
inner: DispatcherInner {
state,
timer,
ka_timeout,
pool: state.memory_pool().pool(),
ka_updated: Cell::new(updated),
error: Cell::new(None),
st: Cell::new(DispatcherState::Processing),
@ -139,6 +138,9 @@ where
error: Cell::new(None),
inflight: Cell::new(0),
}),
state,
timer,
ka_timeout,
},
}
}
@ -222,6 +224,12 @@ where
}
}
// handle memory pool pressure
if slf.pool.poll_ready(cx).is_pending() {
read.pause(cx.waker());
return Poll::Pending;
}
loop {
match slf.st.get() {
DispatcherState::Processing => {
@ -517,7 +525,7 @@ mod tests {
use crate::codec::BytesCodec;
use crate::testing::Io;
use crate::time::{sleep, Millis};
use crate::util::{Bytes, Ready};
use crate::util::{Bytes, PoolRef, Ready};
use super::*;
@ -567,6 +575,7 @@ mod tests {
state: state.clone(),
error: Cell::new(None),
st: Cell::new(DispatcherState::Processing),
pool: state.memory_pool().pool(),
},
},
state,
@ -597,11 +606,10 @@ mod tests {
});
sleep(Millis(25)).await;
client.write("GET /test HTTP/1\r\n\r\n");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
client.write("GET /test HTTP/1\r\n\r\n");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
@ -774,7 +782,9 @@ mod tests {
}
}),
);
state.set_buffer_params(8 * 1024, 16 * 1024, 1024);
let pool = PoolRef::default();
pool.set_read_params(8 * 1024, 1024);
pool.set_write_params(16 * 1024, 1024);
crate::rt::spawn(async move {
let _ = disp.await;
});

View file

@ -7,7 +7,7 @@ use slab::Slab;
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
use crate::task::LocalWaker;
use crate::time::Seconds;
use crate::util::{poll_fn, Buf, BytesMut, Either};
use crate::util::{poll_fn, Buf, BytesMut, Either, PoolId, PoolRef};
bitflags::bitflags! {
pub struct Flags: u16 {
@ -43,9 +43,7 @@ pub struct State(Rc<IoStateInner>);
pub(crate) struct IoStateInner {
flags: Cell<Flags>,
lw: Cell<u16>,
read_hw: Cell<u16>,
write_hw: Cell<u16>,
pool: Cell<PoolRef>,
disconnect_timeout: Cell<Seconds>,
error: Cell<Option<io::Error>>,
read_task: LocalWaker,
@ -56,29 +54,6 @@ pub(crate) struct IoStateInner {
on_disconnect: RefCell<Slab<Option<LocalWaker>>>,
}
thread_local!(static R_BYTES_POOL: RefCell<Vec<BytesMut>> = RefCell::new(Vec::with_capacity(16)));
thread_local!(static W_BYTES_POOL: RefCell<Vec<BytesMut>> = RefCell::new(Vec::with_capacity(16)));
fn release_to_r_pool(mut buf: BytesMut) {
R_BYTES_POOL.with(|pool| {
let v = &mut pool.borrow_mut();
if v.len() < 16 {
buf.clear();
v.push(buf);
}
})
}
fn release_to_w_pool(mut buf: BytesMut) {
W_BYTES_POOL.with(|pool| {
let v = &mut pool.borrow_mut();
if v.len() < 16 {
buf.clear();
v.push(buf);
}
})
}
impl IoStateInner {
fn insert_flags(&self, f: Flags) {
let mut flags = self.flags.get();
@ -96,13 +71,7 @@ impl IoStateInner {
if let Some(buf) = self.read_buf.take() {
buf
} else {
R_BYTES_POOL.with(|pool| {
if let Some(buf) = pool.borrow_mut().pop() {
buf
} else {
BytesMut::with_capacity(self.read_hw.get() as usize)
}
})
self.pool.get().get_read_buf()
}
}
@ -110,21 +79,13 @@ impl IoStateInner {
if let Some(buf) = self.write_buf.take() {
buf
} else {
W_BYTES_POOL.with(|pool| {
if let Some(buf) = pool.borrow_mut().pop() {
buf
} else {
BytesMut::with_capacity(self.write_hw.get() as usize)
}
})
self.pool.get().get_write_buf()
}
}
fn release_read_buf(&self, buf: BytesMut) {
if buf.is_empty() {
if buf.capacity() > (self.lw.get() as usize) {
release_to_r_pool(buf);
}
self.pool.get().release_read_buf(buf);
} else {
self.read_buf.set(Some(buf));
}
@ -132,10 +93,7 @@ impl IoStateInner {
fn release_write_buf(&self, buf: BytesMut) {
if buf.is_empty() {
let cap = buf.capacity();
if cap > (self.lw.get() as usize) && cap <= self.write_hw.get() as usize {
release_to_w_pool(buf);
}
self.pool.get().release_write_buf(buf);
} else {
self.write_buf.set(Some(buf));
}
@ -145,16 +103,10 @@ impl IoStateInner {
impl Drop for IoStateInner {
fn drop(&mut self) {
if let Some(buf) = self.read_buf.take() {
let cap = buf.capacity();
if cap > (self.lw.get() as usize) && cap <= self.read_hw.get() as usize {
release_to_r_pool(buf);
}
self.pool.get().release_read_buf(buf);
}
if let Some(buf) = self.write_buf.take() {
let cap = buf.capacity();
if cap > (self.lw.get() as usize) && cap <= self.write_hw.get() as usize {
release_to_w_pool(buf);
}
self.pool.get().release_write_buf(buf);
}
}
}
@ -183,12 +135,16 @@ impl State {
#[inline]
/// Create `State` instance
pub fn new() -> Self {
Self::with_memory_pool(PoolId::DEFAULT.pool_ref())
}
#[inline]
/// Create `State` instance with specific memory pool.
pub fn with_memory_pool(pool: PoolRef) -> Self {
State(Rc::new(IoStateInner {
pool: Cell::new(pool),
flags: Cell::new(Flags::empty()),
error: Cell::new(None),
lw: Cell::new(1024),
read_hw: Cell::new(8 * 1024),
write_hw: Cell::new(8 * 1024),
disconnect_timeout: Cell::new(Seconds(1)),
dispatch_task: LocalWaker::new(),
read_task: LocalWaker::new(),
@ -202,13 +158,16 @@ impl State {
#[inline]
/// Create `State` from Framed
pub fn from_framed<Io, U>(framed: Framed<Io, U>) -> (Io, U, Self) {
let parts = framed.into_parts();
let pool = PoolId::DEFAULT.pool_ref();
let mut parts = framed.into_parts();
let read_buf = if !parts.read_buf.is_empty() {
pool.move_in(&mut parts.read_buf);
Cell::new(Some(parts.read_buf))
} else {
Cell::new(None)
};
let write_buf = if !parts.write_buf.is_empty() {
pool.move_in(&mut parts.write_buf);
Cell::new(Some(parts.write_buf))
} else {
Cell::new(None)
@ -217,11 +176,9 @@ impl State {
let state = State(Rc::new(IoStateInner {
read_buf,
write_buf,
pool: Cell::new(pool),
flags: Cell::new(Flags::empty()),
error: Cell::new(None),
lw: Cell::new(1024),
read_hw: Cell::new(8 * 1024),
write_hw: Cell::new(8 * 1024),
disconnect_timeout: Cell::new(Seconds(1)),
dispatch_task: LocalWaker::new(),
read_task: LocalWaker::new(),
@ -231,20 +188,19 @@ impl State {
(parts.io, parts.codec, state)
}
#[doc(hidden)]
#[inline]
/// Create `State` instance with custom params
pub fn with_params(
max_read_buf_size: u16,
max_write_buf_size: u16,
min_buf_size: u16,
_max_read_buf_size: u16,
_max_write_buf_size: u16,
_min_buf_size: u16,
disconnect_timeout: Seconds,
) -> Self {
State(Rc::new(IoStateInner {
pool: Cell::new(PoolId::DEFAULT.pool_ref()),
flags: Cell::new(Flags::empty()),
error: Cell::new(None),
lw: Cell::new(min_buf_size),
read_hw: Cell::new(max_read_buf_size),
write_hw: Cell::new(max_write_buf_size),
disconnect_timeout: Cell::new(disconnect_timeout),
dispatch_task: LocalWaker::new(),
read_buf: Cell::new(None),
@ -275,10 +231,8 @@ impl State {
pub(crate) fn keepalive_timeout(&self) {
let state = self.0.as_ref();
let mut flags = state.flags.get();
flags.insert(Flags::DSP_KEEPALIVE);
state.flags.set(flags);
state.dispatch_task.wake();
state.insert_flags(Flags::DSP_KEEPALIVE);
}
pub(super) fn get_disconnect_timeout(&self) -> Seconds {
@ -304,19 +258,38 @@ impl State {
self.0.flags.get()
}
#[inline]
/// Get memory pool
pub fn memory_pool(&self) -> PoolRef {
self.0.pool.get()
}
#[inline]
/// Set memory pool
pub fn set_memory_pool(&self, pool: PoolRef) {
if let Some(mut buf) = self.0.read_buf.take() {
pool.move_in(&mut buf);
self.0.read_buf.set(Some(buf));
}
if let Some(mut buf) = self.0.write_buf.take() {
pool.move_in(&mut buf);
self.0.write_buf.set(Some(buf));
}
self.0.pool.set(pool)
}
#[doc(hidden)]
#[deprecated(since = "0.4.11", note = "Use memory pool config")]
#[inline]
/// Set read/write buffer sizes
///
/// By default read max buf size is 8kb, write max buf size is 8kb
pub fn set_buffer_params(
&self,
max_read_buf_size: u16,
max_write_buf_size: u16,
min_buf_size: u16,
_max_read_buf_size: u16,
_max_write_buf_size: u16,
_min_buf_size: u16,
) {
self.0.read_hw.set(max_read_buf_size);
self.0.write_hw.set(max_write_buf_size);
self.0.lw.set(min_buf_size);
}
#[inline]
@ -622,7 +595,7 @@ impl State {
T: AsyncRead + AsyncWrite + Unpin,
{
let inner = self.0.as_ref();
let lw = inner.lw.get() as usize;
let (hw, lw) = inner.pool.get().read_params().unpack();
let mut buf = inner.get_read_buf();
// read data from socket
@ -631,7 +604,7 @@ impl State {
// make sure we've got room
let remaining = buf.capacity() - buf.len();
if remaining < lw {
buf.reserve((inner.read_hw.get() as usize) - remaining);
buf.reserve(hw - remaining);
}
match crate::codec::poll_read_buf(Pin::new(&mut *io), cx, &mut buf) {
@ -643,7 +616,7 @@ impl State {
self.set_io_error(None);
return false;
} else {
if buf.len() > inner.read_hw.get() as usize {
if buf.len() > hw {
log::trace!(
"buffer is too large {}, enable read back-pressure",
buf.len()
@ -736,7 +709,7 @@ impl State {
}
// if write buffer is smaller than high watermark value, turn off back-pressure
if buf.len() < self.0.write_hw.get() as usize {
if buf.len() < self.0.pool.get().write_params_high() {
let mut flags = self.0.flags.get();
if flags.contains(Flags::WR_BACKPRESSURE) {
flags.remove(Flags::WR_BACKPRESSURE);
@ -783,7 +756,8 @@ impl<'a> Write<'a> {
/// Check if write buffer is full
pub fn is_full(&self) -> bool {
if let Some(buf) = self.0.read_buf.take() {
let result = buf.len() >= self.0.write_hw.get() as usize;
let hw = self.0.pool.get().write_params_high();
let result = buf.len() >= hw;
self.0.write_buf.set(Some(buf));
result
} else {
@ -841,11 +815,12 @@ impl<'a> Write<'a> {
if !flags.intersects(Flags::IO_ERR | Flags::IO_SHUTDOWN) {
let mut buf = self.0.get_write_buf();
let is_write_sleep = buf.is_empty();
let (hw, lw) = self.0.pool.get().write_params().unpack();
// make sure we've got room
let remaining = buf.capacity() - buf.len();
if remaining < self.0.lw.get() as usize {
buf.reserve((self.0.write_hw.get() as usize) - remaining);
if remaining < lw {
buf.reserve(hw - remaining);
}
// encode item and wake write task
@ -853,7 +828,7 @@ impl<'a> Write<'a> {
if is_write_sleep {
self.0.write_task.wake();
}
buf.len() < self.0.write_hw.get() as usize
buf.len() < hw
});
self.0.write_buf.set(Some(buf));
result
@ -879,11 +854,12 @@ impl<'a> Write<'a> {
Ok(Some(item)) => {
let mut buf = self.0.get_write_buf();
let is_write_sleep = buf.is_empty();
let (hw, lw) = self.0.pool.get().write_params().unpack();
// make sure we've got room
let remaining = buf.capacity() - buf.len();
if remaining < self.0.lw.get() as usize {
buf.reserve((self.0.write_hw.get() as usize) - remaining);
if remaining < lw {
buf.reserve(hw - remaining);
}
// encode item
@ -896,7 +872,7 @@ impl<'a> Write<'a> {
} else if is_write_sleep {
self.0.write_task.wake();
}
let result = Ok(buf.len() < self.0.write_hw.get() as usize);
let result = Ok(buf.len() < hw);
self.0.write_buf.set(Some(buf));
result
}
@ -927,7 +903,7 @@ impl<'a> Read<'a> {
/// Check if read buffer is full
pub fn is_full(&self) -> bool {
if let Some(buf) = self.0.read_buf.take() {
let result = buf.len() >= self.0.read_hw.get() as usize;
let result = buf.len() >= self.0.pool.get().read_params_high();
self.0.read_buf.set(Some(buf));
result
} else {
@ -983,13 +959,10 @@ impl<'a> Read<'a> {
where
U: Decoder,
{
if let Some(mut buf) = self.0.read_buf.take() {
let result = codec.decode(&mut buf);
self.0.release_read_buf(buf);
result
} else {
codec.decode(&mut BytesMut::new())
}
let mut buf = self.0.get_read_buf();
let result = codec.decode(&mut buf);
self.0.release_read_buf(buf);
result
}
/// Get mut access to read buffer
@ -997,13 +970,10 @@ impl<'a> Read<'a> {
where
F: FnOnce(&mut BytesMut) -> R,
{
if let Some(mut buf) = self.0.read_buf.take() {
let res = f(&mut buf);
self.0.release_read_buf(buf);
res
} else {
f(&mut BytesMut::new())
}
let mut buf = self.0.get_read_buf();
let res = f(&mut buf);
self.0.release_read_buf(buf);
res
}
}

View file

@ -4,7 +4,7 @@ use crate::framed::Timer;
use crate::http::{Request, Response};
use crate::service::boxed::BoxService;
use crate::time::{sleep, Millis, Seconds, Sleep};
use crate::util::BytesMut;
use crate::util::{BytesMut, PoolRef};
#[derive(Debug, PartialEq, Clone, Copy)]
/// Server keep-alive setting
@ -87,6 +87,10 @@ impl ServiceConfig {
};
let keep_alive = if ka_enabled { keep_alive } else { Millis::ZERO };
PoolRef::default()
.set_read_params(read_hw, lw)
.set_write_params(write_hw, lw);
ServiceConfig(Rc::new(Inner {
keep_alive,
ka_enabled,

View file

@ -992,7 +992,9 @@ mod tests {
let mut h1 = h1(server, |_| {
Box::pin(async { Ok::<_, io::Error>(Response::Ok().finish()) })
});
h1.inner.state.set_buffer_params(15 * 1024, 15 * 1024, 1024);
crate::util::PoolRef::default()
.set_read_params(15 * 1024, 1024)
.set_write_params(15 * 1024, 1024);
let mut decoder = ClientCodec::default();

View file

@ -10,7 +10,7 @@ pub mod variant;
pub use self::extensions::Extensions;
pub use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
pub use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut, Pool, PoolId, PoolRef};
pub use ntex_util::future::*;
pub type HashMap<K, V> = std::collections::HashMap<K, V, fxhash::FxBuildHasher>;

View file

@ -242,9 +242,7 @@ mod tests {
Ok(Some((finished, opcode, payload))) => F {
finished,
opcode,
payload: payload
.map(|b| b.freeze())
.unwrap_or_else(|| Bytes::from("")),
payload: payload.map(|b| b.freeze()).unwrap_or_else(Bytes::new),
},
_ => unreachable!("error"),
}

View file

@ -64,7 +64,7 @@ where
loop {
if !this.buf.is_empty() {
match this.codec.decode(&mut this.buf) {
match this.codec.decode(this.buf) {
Ok(Some(item)) => return Poll::Ready(Some(Ok(item))),
Ok(None) => (),
Err(err) => return Poll::Ready(Some(Err(err.into()))),

View file

@ -49,7 +49,7 @@ mod danger {
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: SystemTime,
) -> Result<rust_tls::client::ServerCertVerified, rust_tls::TLSError> {
) -> Result<rust_tls::client::ServerCertVerified, rust_tls::Error> {
Ok(rust_tls::client::ServerCertVerified::assertion())
}
}