From 0b73ae8fa8cd8a4e04e18ddfe0a2b653cf897587 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 4 Apr 2024 22:04:37 +0500 Subject: [PATCH] Make memory pools optional (#340) --- ntex-bytes/CHANGELOG.md | 4 + ntex-bytes/Cargo.toml | 5 +- ntex-bytes/src/pool.rs | 550 +++++++++++++++++++++------------------- 3 files changed, 294 insertions(+), 265 deletions(-) diff --git a/ntex-bytes/CHANGELOG.md b/ntex-bytes/CHANGELOG.md index d7b3b277..4f8ebe89 100644 --- a/ntex-bytes/CHANGELOG.md +++ b/ntex-bytes/CHANGELOG.md @@ -1,5 +1,9 @@ # Changes +## [0.1.26] (2024-04-04) + +* Make memory pools optional + ## [0.1.25] (2024-04-02) * Fix pool waiters management diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index d2a06080..fc11acbf 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-bytes" -version = "0.1.25" +version = "0.1.26" authors = ["Nikolay Kim ", "Carl Lerche "] description = "Types and traits for working with bytes (bytes crate fork)" documentation = "https://docs.rs/ntex-bytes" @@ -17,6 +17,8 @@ default = [] # simd utf8 check support simd = ["simdutf8"] +mpool = [] + [dependencies] bitflags = "2" bytes = "1" @@ -28,3 +30,4 @@ simdutf8 = { version = "0.1.4", optional = true } serde_test = "1" serde_json = "1" ntex = { version = "1", features = ["tokio"] } +ntex-bytes = { version = "*", features = ["mpool"] } diff --git a/ntex-bytes/src/pool.rs b/ntex-bytes/src/pool.rs index 3b9e46b4..dc07147d 100644 --- a/ntex-bytes/src/pool.rs +++ b/ntex-bytes/src/pool.rs @@ -1,14 +1,14 @@ #![allow(clippy::type_complexity)] -use std::sync::atomic::Ordering::{Relaxed, Release}; -use std::sync::atomic::{AtomicBool, AtomicUsize}; -use std::task::{Context, Poll, Waker}; -use std::{cell::Cell, cell::RefCell, fmt, future::Future, mem, pin::Pin, ptr, rc::Rc}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed}; +use std::task::{Context, Poll}; +use std::{cell::Cell, cell::RefCell, fmt, future::Future, pin::Pin, ptr, rc::Rc}; use futures_core::task::__internal::AtomicWaker; use crate::{BufMut, BytesMut, BytesVec}; pub struct Pool { + #[cfg(feature = "mpool")] idx: Cell, inner: &'static MemoryPool, } @@ -37,7 +37,8 @@ struct MemoryPool { id: PoolId, waker: AtomicWaker, waker_alive: AtomicBool, - waiters: RefCell, + #[cfg(feature = "mpool")] + waiters: RefCell, flags: Cell, size: AtomicUsize, @@ -81,6 +82,7 @@ impl PoolId { #[inline] pub fn pool(self) -> Pool { POOLS.with(|pools| Pool { + #[cfg(feature = "mpool")] idx: Cell::new(usize::MAX), inner: pools[self.0 as usize], }) @@ -173,6 +175,7 @@ impl PoolRef { /// Get `Pool` instance for this pool ref. pub fn pool(self) -> Pool { Pool { + #[cfg(feature = "mpool")] idx: Cell::new(0), inner: self.0, } @@ -234,9 +237,12 @@ impl PoolRef { self.0.windows.set(windows); // release old waiters - let mut waiters = self.0.waiters.borrow_mut(); - while let Some(waker) = waiters.consume() { - waker.wake(); + #[cfg(feature = "mpool")] + { + let mut waiters = self.0.waiters.borrow_mut(); + while let Some(waker) = waiters.consume() { + waker.wake(); + } } self @@ -424,7 +430,8 @@ impl MemoryPool { id, waker: AtomicWaker::new(), waker_alive: AtomicBool::new(false), - waiters: RefCell::new(Waiters::new()), + #[cfg(feature = "mpool")] + waiters: RefCell::new(mpool::Waiters::new()), flags: Cell::new(Flags::empty()), size: AtomicUsize::new(0), @@ -462,6 +469,7 @@ impl Clone for Pool { #[inline] fn clone(&self) -> Pool { Pool { + #[cfg(feature = "mpool")] idx: Cell::new(usize::MAX), inner: self.inner, } @@ -482,6 +490,7 @@ impl From for Pool { } } +#[cfg(feature = "mpool")] impl Drop for Pool { fn drop(&mut self) { // cleanup waiter @@ -512,10 +521,15 @@ impl Pool { #[inline] /// Check if pool is ready pub fn is_ready(&self) -> bool { - let idx = self.idx.get(); - if idx != usize::MAX { - if let Some(Entry::Occupied(_)) = self.inner.waiters.borrow().entries.get(idx) { - return false; + #[cfg(feature = "mpool")] + { + let idx = self.idx.get(); + if idx != usize::MAX { + if let Some(mpool::Entry::Occupied(_)) = + self.inner.waiters.borrow().entries.get(idx) + { + return false; + } } } true @@ -528,7 +542,8 @@ impl Pool { } #[inline] - pub fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> { + pub fn poll_ready(&self, _ctx: &mut Context<'_>) -> Poll<()> { + #[cfg(feature = "mpool")] if self.inner.max_size.get() > 0 { let window_l = self.inner.window_l.get(); if window_l == 0 { @@ -554,10 +569,10 @@ impl Pool { let new = { let idx = self.idx.get(); if idx == usize::MAX { - self.idx.set(waiters.append(ctx.waker().clone())); + self.idx.set(waiters.append(_ctx.waker().clone())); true } else { - waiters.update(idx, ctx.waker().clone()) + waiters.update(idx, _ctx.waker().clone()) } }; @@ -579,7 +594,7 @@ impl Pool { if !flags.contains(Flags::SPAWNED) { flags.insert(Flags::SPAWNED); self.inner.flags.set(flags); - spawn(Box::pin(Driver { pool: self.inner })) + spawn(Box::pin(mpool::Driver { pool: self.inner })) } return Poll::Pending; } @@ -588,284 +603,291 @@ impl Pool { } } -struct Driver { - pool: &'static MemoryPool, -} +#[cfg(feature = "mpool")] +mod mpool { + use std::{mem, sync::atomic::Ordering::Release, task::Waker}; -impl Driver { - fn release(&self, waiters_num: usize) { - let mut waiters = self.pool.waiters.borrow_mut(); + use super::*; - let mut to_release = waiters.occupied_len >> 4; - if waiters_num > to_release { - to_release += waiters_num >> 1; - } else { - to_release += waiters_num; - } + pub(super) struct Driver { + pub(super) pool: &'static MemoryPool, + } - while to_release > 0 { - if let Some(waker) = waiters.consume() { - waker.wake(); - to_release -= 1; + impl Driver { + pub(super) fn release(&self, waiters_num: usize) { + let mut waiters = self.pool.waiters.borrow_mut(); + + let mut to_release = waiters.occupied_len >> 4; + if waiters_num > to_release { + to_release += waiters_num >> 1; } else { - break; + to_release += waiters_num; } - } - } - fn release_all(&self) { - let mut waiters = self.pool.waiters.borrow_mut(); - while let Some(waker) = waiters.consume() { - waker.wake(); - } - } -} - -impl Future for Driver { - type Output = (); - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let pool = self.as_ref().pool; - let allocated = pool.size.load(Relaxed); - - let win_l = pool.window_l.get(); - let win_h = pool.window_h.get(); - - // allocated size is decreased, release waiters - if allocated < win_l { - let mut idx = pool.window_idx.get() + 1; - let mut waiters = pool.window_waiters.get(); - let windows = pool.windows.get(); - - loop { - // allocated size decreased more than 10%, release all - if idx == 10 { - self.release_all(); - - pool.window_l.set(windows[0].0); - pool.window_h.set(windows[0].1); - pool.window_idx.set(0); - pool.window_waiters.set(0); - pool.flags.set(Flags::INCREASED); - return Poll::Ready(()); + while to_release > 0 { + if let Some(waker) = waiters.consume() { + waker.wake(); + to_release -= 1; } else { - // release 6% of pending waiters - self.release(waiters); - - if allocated > windows[idx].0 { - pool.window_l.set(windows[idx].0); - pool.window_h.set(windows[idx].1); - pool.window_idx.set(idx); - pool.window_waiters.set(0); - pool.flags.set(Flags::SPAWNED); - break; - } - idx += 1; - waiters = 0; + break; } } } - // allocated size is increased - else if allocated > win_h { - // reset window - let idx = pool.window_idx.get() - 1; - let windows = pool.windows.get(); - pool.window_l.set(windows[idx].0); - pool.window_h.set(windows[idx].1); - pool.window_idx.set(idx); - pool.window_waiters.set(0); - pool.flags.set(Flags::SPAWNED | Flags::INCREASED); - } - // register waker - pool.waker.register(cx.waker()); - pool.waker_alive.store(true, Release); - - Poll::Pending - } -} - -struct Waiters { - entries: Vec, - root: usize, - tail: usize, - free: usize, - len: usize, - occupied_len: usize, -} - -#[derive(Debug)] -enum Entry { - Vacant(usize), - Consumed, - Occupied(Node), -} - -#[derive(Debug)] -struct Node { - item: Waker, - prev: usize, - next: usize, -} - -impl Waiters { - fn new() -> Waiters { - Waiters { - entries: Vec::new(), - root: usize::MAX, - tail: usize::MAX, - free: 0, - len: 0, - occupied_len: 0, + pub(super) fn release_all(&self) { + let mut waiters = self.pool.waiters.borrow_mut(); + while let Some(waker) = waiters.consume() { + waker.wake(); + } } } - fn get_node(&mut self, key: usize) -> &mut Node { - if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) { - return node; - } - unreachable!() - } + impl Future for Driver { + type Output = (); - // consume root item - fn consume(&mut self) -> Option { - if self.root != usize::MAX { - self.occupied_len -= 1; - let entry = - mem::replace(self.entries.get_mut(self.root).unwrap(), Entry::Consumed); + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pool = self.as_ref().pool; + let allocated = pool.size.load(Relaxed); - match entry { - Entry::Occupied(node) => { - debug_assert!(node.prev == usize::MAX); + let win_l = pool.window_l.get(); + let win_h = pool.window_h.get(); - // last item - if self.tail == self.root { - self.tail = usize::MAX; - self.root = usize::MAX; + // allocated size is decreased, release waiters + if allocated < win_l { + let mut idx = pool.window_idx.get() + 1; + let mut waiters = pool.window_waiters.get(); + let windows = pool.windows.get(); + + loop { + // allocated size decreased more than 10%, release all + if idx == 10 { + self.release_all(); + + pool.window_l.set(windows[0].0); + pool.window_h.set(windows[0].1); + pool.window_idx.set(0); + pool.window_waiters.set(0); + pool.flags.set(Flags::INCREASED); + return Poll::Ready(()); } else { + // release 6% of pending waiters + self.release(waiters); + + if allocated > windows[idx].0 { + pool.window_l.set(windows[idx].0); + pool.window_h.set(windows[idx].1); + pool.window_idx.set(idx); + pool.window_waiters.set(0); + pool.flags.set(Flags::SPAWNED); + break; + } + idx += 1; + waiters = 0; + } + } + } + // allocated size is increased + else if allocated > win_h { + // reset window + let idx = pool.window_idx.get() - 1; + let windows = pool.windows.get(); + pool.window_l.set(windows[idx].0); + pool.window_h.set(windows[idx].1); + pool.window_idx.set(idx); + pool.window_waiters.set(0); + pool.flags.set(Flags::SPAWNED | Flags::INCREASED); + } + + // register waker + pool.waker.register(cx.waker()); + pool.waker_alive.store(true, Release); + + Poll::Pending + } + } + + pub(super) struct Waiters { + pub(super) entries: Vec, + root: usize, + tail: usize, + free: usize, + len: usize, + occupied_len: usize, + } + + #[derive(Debug)] + pub(super) enum Entry { + Vacant(usize), + Consumed, + Occupied(Node), + } + + #[derive(Debug)] + pub(super) struct Node { + item: Waker, + prev: usize, + next: usize, + } + + impl Waiters { + pub(super) fn new() -> Waiters { + Waiters { + entries: Vec::new(), + root: usize::MAX, + tail: usize::MAX, + free: 0, + len: 0, + occupied_len: 0, + } + } + + fn get_node(&mut self, key: usize) -> &mut Node { + if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) { + return node; + } + unreachable!() + } + + // consume root item + pub(super) fn consume(&mut self) -> Option { + if self.root != usize::MAX { + self.occupied_len -= 1; + let entry = + mem::replace(self.entries.get_mut(self.root).unwrap(), Entry::Consumed); + + match entry { + Entry::Occupied(node) => { + debug_assert!(node.prev == usize::MAX); + + // last item + if self.tail == self.root { + self.tail = usize::MAX; + self.root = usize::MAX; + } else { + // remove from root + self.root = node.next; + if self.root != usize::MAX { + self.get_node(self.root).prev = usize::MAX; + } + } + Some(node.item) + } + _ => unreachable!(), + } + } else { + None + } + } + + pub(super) fn update(&mut self, idx: usize, val: Waker) -> bool { + let entry = self + .entries + .get_mut(idx) + .expect("Entry is expected to exist"); + match entry { + Entry::Occupied(ref mut node) => { + node.item = val; + false + } + Entry::Consumed => { + // append to the tail + *entry = Entry::Occupied(Node { + item: val, + prev: self.tail, + next: usize::MAX, + }); + + self.occupied_len += 1; + if self.root == usize::MAX { + self.root = idx; + } + if self.tail != usize::MAX { + self.get_node(self.tail).next = idx; + } + self.tail = idx; + true + } + Entry::Vacant(_) => unreachable!(), + } + } + + pub(super) fn remove(&mut self, key: usize) { + if let Some(entry) = self.entries.get_mut(key) { + // Swap the entry at the provided value + let entry = mem::replace(entry, Entry::Vacant(self.free)); + + self.len -= 1; + self.free = key; + + match entry { + Entry::Occupied(node) => { + self.occupied_len -= 1; + // remove from root - self.root = node.next; - if self.root != usize::MAX { - self.get_node(self.root).prev = usize::MAX; + if self.root == key { + self.root = node.next; + if self.root != usize::MAX { + self.get_node(self.root).prev = usize::MAX; + } + } + // remove from tail + if self.tail == key { + self.tail = node.prev; + if self.tail != usize::MAX { + self.get_node(self.tail).next = usize::MAX; + } } } - Some(node.item) + Entry::Consumed => {} + Entry::Vacant(_) => unreachable!(), } - _ => unreachable!(), - } - } else { - None - } - } - fn update(&mut self, idx: usize, val: Waker) -> bool { - let entry = self - .entries - .get_mut(idx) - .expect("Entry is expected to exist"); - match entry { - Entry::Occupied(ref mut node) => { - node.item = val; - false + if self.len == 0 { + self.entries.truncate(128); + } } - Entry::Consumed => { - // append to the tail - *entry = Entry::Occupied(Node { + } + + pub(super) fn append(&mut self, val: Waker) -> usize { + let idx = self.free; + + self.len += 1; + self.occupied_len += 1; + + // root points to first entry, append to empty list + if self.root == usize::MAX { + self.root = idx; + } + // tail points to last entry + if self.tail != usize::MAX { + self.get_node(self.tail).next = idx; + } + + // append item to entries, first free item is not allocated yet + if idx == self.entries.len() { + self.entries.push(Entry::Occupied(Node { + item: val, + prev: self.tail, + next: usize::MAX, + })); + self.tail = idx; + self.free = idx + 1; + } else { + // entries has enough capacity + self.free = match self.entries.get(idx) { + Some(&Entry::Vacant(next)) => next, + _ => unreachable!(), + }; + self.entries[idx] = Entry::Occupied(Node { item: val, prev: self.tail, next: usize::MAX, }); - - self.occupied_len += 1; - if self.root == usize::MAX { - self.root = idx; - } - if self.tail != usize::MAX { - self.get_node(self.tail).next = idx; - } self.tail = idx; - true - } - Entry::Vacant(_) => unreachable!(), - } - } - - fn remove(&mut self, key: usize) { - if let Some(entry) = self.entries.get_mut(key) { - // Swap the entry at the provided value - let entry = mem::replace(entry, Entry::Vacant(self.free)); - - self.len -= 1; - self.free = key; - - match entry { - Entry::Occupied(node) => { - self.occupied_len -= 1; - - // remove from root - if self.root == key { - self.root = node.next; - if self.root != usize::MAX { - self.get_node(self.root).prev = usize::MAX; - } - } - // remove from tail - if self.tail == key { - self.tail = node.prev; - if self.tail != usize::MAX { - self.get_node(self.tail).next = usize::MAX; - } - } - } - Entry::Consumed => {} - Entry::Vacant(_) => unreachable!(), } - if self.len == 0 { - self.entries.truncate(128); - } + idx } } - - fn append(&mut self, val: Waker) -> usize { - let idx = self.free; - - self.len += 1; - self.occupied_len += 1; - - // root points to first entry, append to empty list - if self.root == usize::MAX { - self.root = idx; - } - // tail points to last entry - if self.tail != usize::MAX { - self.get_node(self.tail).next = idx; - } - - // append item to entries, first free item is not allocated yet - if idx == self.entries.len() { - self.entries.push(Entry::Occupied(Node { - item: val, - prev: self.tail, - next: usize::MAX, - })); - self.tail = idx; - self.free = idx + 1; - } else { - // entries has enough capacity - self.free = match self.entries.get(idx) { - Some(&Entry::Vacant(next)) => next, - _ => unreachable!(), - }; - self.entries[idx] = Entry::Occupied(Node { - item: val, - prev: self.tail, - next: usize::MAX, - }); - self.tail = idx; - } - - idx - } }