Make memory pools optional (#340)

This commit is contained in:
Nikolay Kim 2024-04-04 22:04:37 +05:00 committed by GitHub
parent 9bd05487de
commit 0b73ae8fa8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 294 additions and 265 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.26] (2024-04-04)
* Make memory pools optional
## [0.1.25] (2024-04-02)
* Fix pool waiters management

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-bytes"
version = "0.1.25"
version = "0.1.26"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Carl Lerche <me@carllerche.com>"]
description = "Types and traits for working with bytes (bytes crate fork)"
documentation = "https://docs.rs/ntex-bytes"
@ -17,6 +17,8 @@ default = []
# simd utf8 check support
simd = ["simdutf8"]
mpool = []
[dependencies]
bitflags = "2"
bytes = "1"
@ -28,3 +30,4 @@ simdutf8 = { version = "0.1.4", optional = true }
serde_test = "1"
serde_json = "1"
ntex = { version = "1", features = ["tokio"] }
ntex-bytes = { version = "*", features = ["mpool"] }

View file

@ -1,14 +1,14 @@
#![allow(clippy::type_complexity)]
use std::sync::atomic::Ordering::{Relaxed, Release};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::task::{Context, Poll, Waker};
use std::{cell::Cell, cell::RefCell, fmt, future::Future, mem, pin::Pin, ptr, rc::Rc};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed};
use std::task::{Context, Poll};
use std::{cell::Cell, cell::RefCell, fmt, future::Future, pin::Pin, ptr, rc::Rc};
use futures_core::task::__internal::AtomicWaker;
use crate::{BufMut, BytesMut, BytesVec};
/// Per-caller handle to a memory pool, used to await capacity
/// via `poll_ready` (see `Pool::poll_ready` below).
pub struct Pool {
#[cfg(feature = "mpool")]
// Index of this handle's entry in the pool's waiters slab;
// `usize::MAX` means "not currently registered as a waiter".
idx: Cell<usize>,
// Backing pool; presumably a 'static singleton looked up via POOLS — see `PoolId::pool`.
inner: &'static MemoryPool,
}
@ -37,7 +37,8 @@ struct MemoryPool {
id: PoolId,
waker: AtomicWaker,
waker_alive: AtomicBool,
waiters: RefCell<Waiters>,
#[cfg(feature = "mpool")]
waiters: RefCell<mpool::Waiters>,
flags: Cell<Flags>,
size: AtomicUsize,
@ -81,6 +82,7 @@ impl PoolId {
#[inline]
pub fn pool(self) -> Pool {
POOLS.with(|pools| Pool {
#[cfg(feature = "mpool")]
idx: Cell::new(usize::MAX),
inner: pools[self.0 as usize],
})
@ -173,6 +175,7 @@ impl PoolRef {
/// Get `Pool` instance for this pool ref.
pub fn pool(self) -> Pool {
Pool {
#[cfg(feature = "mpool")]
idx: Cell::new(0),
inner: self.0,
}
@ -234,9 +237,12 @@ impl PoolRef {
self.0.windows.set(windows);
// release old waiters
let mut waiters = self.0.waiters.borrow_mut();
while let Some(waker) = waiters.consume() {
waker.wake();
#[cfg(feature = "mpool")]
{
let mut waiters = self.0.waiters.borrow_mut();
while let Some(waker) = waiters.consume() {
waker.wake();
}
}
self
@ -424,7 +430,8 @@ impl MemoryPool {
id,
waker: AtomicWaker::new(),
waker_alive: AtomicBool::new(false),
waiters: RefCell::new(Waiters::new()),
#[cfg(feature = "mpool")]
waiters: RefCell::new(mpool::Waiters::new()),
flags: Cell::new(Flags::empty()),
size: AtomicUsize::new(0),
@ -462,6 +469,7 @@ impl Clone for Pool {
#[inline]
fn clone(&self) -> Pool {
Pool {
#[cfg(feature = "mpool")]
idx: Cell::new(usize::MAX),
inner: self.inner,
}
@ -482,6 +490,7 @@ impl From<PoolRef> for Pool {
}
}
#[cfg(feature = "mpool")]
impl Drop for Pool {
fn drop(&mut self) {
// cleanup waiter
@ -512,10 +521,15 @@ impl Pool {
#[inline]
/// Check if pool is ready
pub fn is_ready(&self) -> bool {
let idx = self.idx.get();
if idx != usize::MAX {
if let Some(Entry::Occupied(_)) = self.inner.waiters.borrow().entries.get(idx) {
return false;
#[cfg(feature = "mpool")]
{
let idx = self.idx.get();
if idx != usize::MAX {
if let Some(mpool::Entry::Occupied(_)) =
self.inner.waiters.borrow().entries.get(idx)
{
return false;
}
}
}
true
@ -528,7 +542,8 @@ impl Pool {
}
#[inline]
pub fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
pub fn poll_ready(&self, _ctx: &mut Context<'_>) -> Poll<()> {
#[cfg(feature = "mpool")]
if self.inner.max_size.get() > 0 {
let window_l = self.inner.window_l.get();
if window_l == 0 {
@ -554,10 +569,10 @@ impl Pool {
let new = {
let idx = self.idx.get();
if idx == usize::MAX {
self.idx.set(waiters.append(ctx.waker().clone()));
self.idx.set(waiters.append(_ctx.waker().clone()));
true
} else {
waiters.update(idx, ctx.waker().clone())
waiters.update(idx, _ctx.waker().clone())
}
};
@ -579,7 +594,7 @@ impl Pool {
if !flags.contains(Flags::SPAWNED) {
flags.insert(Flags::SPAWNED);
self.inner.flags.set(flags);
spawn(Box::pin(Driver { pool: self.inner }))
spawn(Box::pin(mpool::Driver { pool: self.inner }))
}
return Poll::Pending;
}
@ -588,284 +603,291 @@ impl Pool {
}
}
struct Driver {
pool: &'static MemoryPool,
}
#[cfg(feature = "mpool")]
mod mpool {
use std::{mem, sync::atomic::Ordering::Release, task::Waker};
impl Driver {
fn release(&self, waiters_num: usize) {
let mut waiters = self.pool.waiters.borrow_mut();
use super::*;
let mut to_release = waiters.occupied_len >> 4;
if waiters_num > to_release {
to_release += waiters_num >> 1;
} else {
to_release += waiters_num;
}
// Background future that monitors the pool's allocated size and wakes
// pending waiters; spawned by `Pool::poll_ready` when the pool is over capacity.
pub(super) struct Driver {
pub(super) pool: &'static MemoryPool,
}
while to_release > 0 {
if let Some(waker) = waiters.consume() {
waker.wake();
to_release -= 1;
impl Driver {
pub(super) fn release(&self, waiters_num: usize) {
let mut waiters = self.pool.waiters.borrow_mut();
let mut to_release = waiters.occupied_len >> 4;
if waiters_num > to_release {
to_release += waiters_num >> 1;
} else {
break;
to_release += waiters_num;
}
}
}
fn release_all(&self) {
let mut waiters = self.pool.waiters.borrow_mut();
while let Some(waker) = waiters.consume() {
waker.wake();
}
}
}
impl Future for Driver {
type Output = ();
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pool = self.as_ref().pool;
let allocated = pool.size.load(Relaxed);
let win_l = pool.window_l.get();
let win_h = pool.window_h.get();
// allocated size is decreased, release waiters
if allocated < win_l {
let mut idx = pool.window_idx.get() + 1;
let mut waiters = pool.window_waiters.get();
let windows = pool.windows.get();
loop {
// allocated size decreased more than 10%, release all
if idx == 10 {
self.release_all();
pool.window_l.set(windows[0].0);
pool.window_h.set(windows[0].1);
pool.window_idx.set(0);
pool.window_waiters.set(0);
pool.flags.set(Flags::INCREASED);
return Poll::Ready(());
while to_release > 0 {
if let Some(waker) = waiters.consume() {
waker.wake();
to_release -= 1;
} else {
// release 6% of pending waiters
self.release(waiters);
if allocated > windows[idx].0 {
pool.window_l.set(windows[idx].0);
pool.window_h.set(windows[idx].1);
pool.window_idx.set(idx);
pool.window_waiters.set(0);
pool.flags.set(Flags::SPAWNED);
break;
}
idx += 1;
waiters = 0;
break;
}
}
}
// allocated size is increased
else if allocated > win_h {
// reset window
let idx = pool.window_idx.get() - 1;
let windows = pool.windows.get();
pool.window_l.set(windows[idx].0);
pool.window_h.set(windows[idx].1);
pool.window_idx.set(idx);
pool.window_waiters.set(0);
pool.flags.set(Flags::SPAWNED | Flags::INCREASED);
}
// register waker
pool.waker.register(cx.waker());
pool.waker_alive.store(true, Release);
Poll::Pending
}
}
struct Waiters {
entries: Vec<Entry>,
root: usize,
tail: usize,
free: usize,
len: usize,
occupied_len: usize,
}
#[derive(Debug)]
enum Entry {
Vacant(usize),
Consumed,
Occupied(Node),
}
#[derive(Debug)]
struct Node {
item: Waker,
prev: usize,
next: usize,
}
impl Waiters {
fn new() -> Waiters {
Waiters {
entries: Vec::new(),
root: usize::MAX,
tail: usize::MAX,
free: 0,
len: 0,
occupied_len: 0,
/// Wake every waiter currently queued on this pool.
pub(super) fn release_all(&self) {
    let mut waiters = self.pool.waiters.borrow_mut();
    loop {
        // Drain the queue from the head until no occupied entry remains.
        match waiters.consume() {
            Some(waker) => waker.wake(),
            None => break,
        }
    }
}
}
fn get_node(&mut self, key: usize) -> &mut Node {
if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) {
return node;
}
unreachable!()
}
impl Future for Driver {
type Output = ();
// consume root item
fn consume(&mut self) -> Option<Waker> {
if self.root != usize::MAX {
self.occupied_len -= 1;
let entry =
mem::replace(self.entries.get_mut(self.root).unwrap(), Entry::Consumed);
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pool = self.as_ref().pool;
let allocated = pool.size.load(Relaxed);
match entry {
Entry::Occupied(node) => {
debug_assert!(node.prev == usize::MAX);
let win_l = pool.window_l.get();
let win_h = pool.window_h.get();
// last item
if self.tail == self.root {
self.tail = usize::MAX;
self.root = usize::MAX;
// allocated size is decreased, release waiters
if allocated < win_l {
let mut idx = pool.window_idx.get() + 1;
let mut waiters = pool.window_waiters.get();
let windows = pool.windows.get();
loop {
// allocated size decreased more than 10%, release all
if idx == 10 {
self.release_all();
pool.window_l.set(windows[0].0);
pool.window_h.set(windows[0].1);
pool.window_idx.set(0);
pool.window_waiters.set(0);
pool.flags.set(Flags::INCREASED);
return Poll::Ready(());
} else {
// release 6% of pending waiters
self.release(waiters);
if allocated > windows[idx].0 {
pool.window_l.set(windows[idx].0);
pool.window_h.set(windows[idx].1);
pool.window_idx.set(idx);
pool.window_waiters.set(0);
pool.flags.set(Flags::SPAWNED);
break;
}
idx += 1;
waiters = 0;
}
}
}
// allocated size is increased
else if allocated > win_h {
// reset window
let idx = pool.window_idx.get() - 1;
let windows = pool.windows.get();
pool.window_l.set(windows[idx].0);
pool.window_h.set(windows[idx].1);
pool.window_idx.set(idx);
pool.window_waiters.set(0);
pool.flags.set(Flags::SPAWNED | Flags::INCREASED);
}
// register waker
pool.waker.register(cx.waker());
pool.waker_alive.store(true, Release);
Poll::Pending
}
}
// Slab-style storage for pending wakers: entries live in a `Vec`, with a
// doubly-linked queue threaded through the `Occupied` entries and a
// free-list threaded through the `Vacant` ones (see `append`).
pub(super) struct Waiters {
pub(super) entries: Vec<Entry>,
// Index of the queue head; `usize::MAX` when the queue is empty.
root: usize,
// Index of the queue tail; `usize::MAX` when the queue is empty.
tail: usize,
// Head of the free-list / next slot index to allocate in `append`.
free: usize,
// Number of allocated (non-`Vacant`) slots, i.e. `Occupied` plus `Consumed`.
len: usize,
// Number of slots currently in the `Occupied` state (queued waiters).
occupied_len: usize,
}
#[derive(Debug)]
pub(super) enum Entry {
// Free slot; the payload is the index of the next vacant entry (free-list link).
Vacant(usize),
// Waker was taken by `consume`; the slot stays allocated so the owning
// handle can re-arm it later through `update`.
Consumed,
// Active waiter, linked into the wait queue.
Occupied(Node),
}
#[derive(Debug)]
pub(super) struct Node {
// Waker to invoke when this waiter is released.
item: Waker,
// Index of the previous queue node; `usize::MAX` at the head.
prev: usize,
// Index of the next queue node; `usize::MAX` at the tail.
next: usize,
}
impl Waiters {
/// Create an empty waiter slab with no allocated entries.
pub(super) fn new() -> Waiters {
    // `usize::MAX` is the sentinel index for "no entry".
    const NONE: usize = usize::MAX;
    Waiters {
        free: 0,
        len: 0,
        occupied_len: 0,
        root: NONE,
        tail: NONE,
        entries: Vec::new(),
    }
}
// Fetch the occupied node at `key`; callers guarantee the entry exists
// and is in the `Occupied` state, so anything else is a broken invariant.
fn get_node(&mut self, key: usize) -> &mut Node {
    match self.entries.get_mut(key) {
        Some(Entry::Occupied(node)) => node,
        _ => unreachable!(),
    }
}
// consume root item
//
// Pop the waker at the head of the queue, if any. The slot is left in the
// `Consumed` state (not freed) so the owning handle can re-register through
// `update`; `occupied_len` is decremented but `len` is not.
pub(super) fn consume(&mut self) -> Option<Waker> {
if self.root != usize::MAX {
self.occupied_len -= 1;
// Swap the head entry out, marking its slot `Consumed`.
let entry =
mem::replace(self.entries.get_mut(self.root).unwrap(), Entry::Consumed);
match entry {
Entry::Occupied(node) => {
// The head node must have no predecessor.
debug_assert!(node.prev == usize::MAX);
// last item
if self.tail == self.root {
self.tail = usize::MAX;
self.root = usize::MAX;
} else {
// remove from root
self.root = node.next;
if self.root != usize::MAX {
self.get_node(self.root).prev = usize::MAX;
}
}
Some(node.item)
}
// `root` only ever points at an `Occupied` entry.
_ => unreachable!(),
}
} else {
// Empty queue.
None
}
}
// Refresh or re-arm the waker stored at `idx`.
//
// Returns `true` if the slot was `Consumed` and has been re-queued at the
// tail, `false` if an already-queued registration merely had its waker
// replaced. `idx` must refer to an allocated (non-`Vacant`) slot.
pub(super) fn update(&mut self, idx: usize, val: Waker) -> bool {
let entry = self
.entries
.get_mut(idx)
.expect("Entry is expected to exist");
match entry {
Entry::Occupied(ref mut node) => {
// Still queued: just swap in the latest waker.
node.item = val;
false
}
Entry::Consumed => {
// append to the tail
*entry = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});
self.occupied_len += 1;
// Queue was empty: this entry becomes the head.
if self.root == usize::MAX {
self.root = idx;
}
// Link the previous tail to this entry.
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}
self.tail = idx;
true
}
// Callers never hold an index to a freed slot.
Entry::Vacant(_) => unreachable!(),
}
}
pub(super) fn remove(&mut self, key: usize) {
if let Some(entry) = self.entries.get_mut(key) {
// Swap the entry at the provided value
let entry = mem::replace(entry, Entry::Vacant(self.free));
self.len -= 1;
self.free = key;
match entry {
Entry::Occupied(node) => {
self.occupied_len -= 1;
// remove from root
self.root = node.next;
if self.root != usize::MAX {
self.get_node(self.root).prev = usize::MAX;
if self.root == key {
self.root = node.next;
if self.root != usize::MAX {
self.get_node(self.root).prev = usize::MAX;
}
}
// remove from tail
if self.tail == key {
self.tail = node.prev;
if self.tail != usize::MAX {
self.get_node(self.tail).next = usize::MAX;
}
}
}
Some(node.item)
Entry::Consumed => {}
Entry::Vacant(_) => unreachable!(),
}
_ => unreachable!(),
}
} else {
None
}
}
fn update(&mut self, idx: usize, val: Waker) -> bool {
let entry = self
.entries
.get_mut(idx)
.expect("Entry is expected to exist");
match entry {
Entry::Occupied(ref mut node) => {
node.item = val;
false
if self.len == 0 {
self.entries.truncate(128);
}
}
Entry::Consumed => {
// append to the tail
*entry = Entry::Occupied(Node {
}
pub(super) fn append(&mut self, val: Waker) -> usize {
let idx = self.free;
self.len += 1;
self.occupied_len += 1;
// root points to first entry, append to empty list
if self.root == usize::MAX {
self.root = idx;
}
// tail points to last entry
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}
// append item to entries, first free item is not allocated yet
if idx == self.entries.len() {
self.entries.push(Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
}));
self.tail = idx;
self.free = idx + 1;
} else {
// entries has enough capacity
self.free = match self.entries.get(idx) {
Some(&Entry::Vacant(next)) => next,
_ => unreachable!(),
};
self.entries[idx] = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});
self.occupied_len += 1;
if self.root == usize::MAX {
self.root = idx;
}
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}
self.tail = idx;
true
}
Entry::Vacant(_) => unreachable!(),
}
}
fn remove(&mut self, key: usize) {
if let Some(entry) = self.entries.get_mut(key) {
// Swap the entry at the provided value
let entry = mem::replace(entry, Entry::Vacant(self.free));
self.len -= 1;
self.free = key;
match entry {
Entry::Occupied(node) => {
self.occupied_len -= 1;
// remove from root
if self.root == key {
self.root = node.next;
if self.root != usize::MAX {
self.get_node(self.root).prev = usize::MAX;
}
}
// remove from tail
if self.tail == key {
self.tail = node.prev;
if self.tail != usize::MAX {
self.get_node(self.tail).next = usize::MAX;
}
}
}
Entry::Consumed => {}
Entry::Vacant(_) => unreachable!(),
}
if self.len == 0 {
self.entries.truncate(128);
}
idx
}
}
fn append(&mut self, val: Waker) -> usize {
let idx = self.free;
self.len += 1;
self.occupied_len += 1;
// root points to first entry, append to empty list
if self.root == usize::MAX {
self.root = idx;
}
// tail points to last entry
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}
// append item to entries, first free item is not allocated yet
if idx == self.entries.len() {
self.entries.push(Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
}));
self.tail = idx;
self.free = idx + 1;
} else {
// entries has enough capacity
self.free = match self.entries.get(idx) {
Some(&Entry::Vacant(next)) => next,
_ => unreachable!(),
};
self.entries[idx] = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});
self.tail = idx;
}
idx
}
}