Fix memory pool waiters management (#334)

This commit is contained in:
Nikolay Kim 2024-04-02 18:06:49 +05:00 committed by GitHub
parent 351f69919a
commit 975f64cc44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 101 additions and 101 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.1.25] (2024-04-02)
* Fix pool waiters management
## [0.1.24] (2024-02-01) ## [0.1.24] (2024-02-01)
* Add `checked` api * Add `checked` api

View file

@ -1,10 +1,10 @@
[package] [package]
name = "ntex-bytes" name = "ntex-bytes"
version = "0.1.24" version = "0.1.25"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Carl Lerche <me@carllerche.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Carl Lerche <me@carllerche.com>"]
description = "Types and traits for working with bytes (bytes crate fork)" description = "Types and traits for working with bytes (bytes crate fork)"
documentation = "https://docs.rs/ntex-bytes" documentation = "https://docs.rs/ntex-bytes"
repository = "https://github.com/ntex-rs/ntex.git" repository = "https://github.com/ntex-rs/ntex"
readme = "README.md" readme = "README.md"
keywords = ["buffers", "zero-copy", "io"] keywords = ["buffers", "zero-copy", "io"]
categories = ["network-programming", "data-structures"] categories = ["network-programming", "data-structures"]
@ -18,13 +18,15 @@ default = []
simd = ["simdutf8"] simd = ["simdutf8"]
[dependencies] [dependencies]
bitflags = "2.4" bitflags = "2"
bytes = "1" bytes = "1"
serde = "1" serde = "1"
futures-core = { version = "0.3", default-features = false, features = ["alloc"] } futures-core = { version = "0.3", default-features = false }
simdutf8 = { version = "0.1.4", optional = true } simdutf8 = { version = "0.1.4", optional = true }
backtrace = "*"
[dev-dependencies] [dev-dependencies]
serde_test = "1.0" serde_test = "1"
serde_json = "1.0" serde_json = "1"
ntex = { version = "1", features = ["tokio"] } ntex = { version = "1", features = ["tokio"] }

View file

@ -81,7 +81,7 @@ impl PoolId {
#[inline] #[inline]
pub fn pool(self) -> Pool { pub fn pool(self) -> Pool {
POOLS.with(|pools| Pool { POOLS.with(|pools| Pool {
idx: Cell::new(0), idx: Cell::new(usize::MAX),
inner: pools[self.0 as usize], inner: pools[self.0 as usize],
}) })
} }
@ -462,7 +462,7 @@ impl Clone for Pool {
#[inline] #[inline]
fn clone(&self) -> Pool { fn clone(&self) -> Pool {
Pool { Pool {
idx: Cell::new(0), idx: Cell::new(usize::MAX),
inner: self.inner, inner: self.inner,
} }
} }
@ -484,12 +484,10 @@ impl From<PoolRef> for Pool {
impl Drop for Pool { impl Drop for Pool {
fn drop(&mut self) { fn drop(&mut self) {
// cleanup waiter
let idx = self.idx.get(); let idx = self.idx.get();
if idx > 0 { if idx != usize::MAX {
// cleanup waiter self.inner.waiters.borrow_mut().remove(idx);
let mut waiters = self.inner.waiters.borrow_mut();
waiters.remove(idx - 1);
waiters.truncate();
} }
} }
} }
@ -515,10 +513,8 @@ impl Pool {
/// Check if pool is ready /// Check if pool is ready
pub fn is_ready(&self) -> bool { pub fn is_ready(&self) -> bool {
let idx = self.idx.get(); let idx = self.idx.get();
if idx > 0 { if idx != usize::MAX {
if let Some(Entry::Occupied(_)) = if let Some(Entry::Occupied(_)) = self.inner.waiters.borrow().entries.get(idx) {
self.inner.waiters.borrow().entries.get(idx - 1)
{
return false; return false;
} }
} }
@ -543,26 +539,26 @@ impl Pool {
let allocated = self.inner.size.load(Relaxed); let allocated = self.inner.size.load(Relaxed);
if allocated < window_l { if allocated < window_l {
let idx = self.idx.get(); let idx = self.idx.get();
if idx > 0 { if idx != usize::MAX {
// cleanup waiter // cleanup waiter
let mut waiters = self.inner.waiters.borrow_mut(); self.inner.waiters.borrow_mut().remove(idx);
waiters.remove(idx - 1); self.idx.set(usize::MAX);
waiters.truncate();
self.idx.set(0);
} }
return Poll::Ready(()); return Poll::Ready(());
} }
// register waiter only if spawn fn is provided // register waiter only if spawn fn is provided
if let Some(spawn) = &*self.inner.spawn.borrow() { if let Some(spawn) = &*self.inner.spawn.borrow() {
let idx = self.idx.get();
let mut flags = self.inner.flags.get(); let mut flags = self.inner.flags.get();
let mut waiters = self.inner.waiters.borrow_mut(); let mut waiters = self.inner.waiters.borrow_mut();
let new = if idx == 0 { let new = {
self.idx.set(waiters.append(ctx.waker().clone()) + 1); let idx = self.idx.get();
true if idx == usize::MAX {
} else { self.idx.set(waiters.append(ctx.waker().clone()));
waiters.update(idx - 1, ctx.waker().clone()) true
} else {
waiters.update(idx, ctx.waker().clone())
}
}; };
// if memory usage has increased since last window change, // if memory usage has increased since last window change,
@ -600,7 +596,7 @@ impl Driver {
fn release(&self, waiters_num: usize) { fn release(&self, waiters_num: usize) {
let mut waiters = self.pool.waiters.borrow_mut(); let mut waiters = self.pool.waiters.borrow_mut();
let mut to_release = waiters.occupied_len / 100 * 5; let mut to_release = waiters.occupied_len >> 4;
if waiters_num > to_release { if waiters_num > to_release {
to_release += waiters_num >> 1; to_release += waiters_num >> 1;
} else { } else {
@ -654,7 +650,7 @@ impl Future for Driver {
pool.flags.set(Flags::INCREASED); pool.flags.set(Flags::INCREASED);
return Poll::Ready(()); return Poll::Ready(());
} else { } else {
// release 5% of pending waiters // release 6% of pending waiters
self.release(waiters); self.release(waiters);
if allocated > windows[idx].0 { if allocated > windows[idx].0 {
@ -725,15 +721,6 @@ impl Waiters {
} }
} }
fn truncate(&mut self) {
if self.len == 0 {
self.entries.truncate(0);
self.root = usize::MAX;
self.tail = usize::MAX;
self.free = 0;
}
}
fn get_node(&mut self, key: usize) -> &mut Node { fn get_node(&mut self, key: usize) -> &mut Node {
if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) { if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) {
return node; return node;
@ -745,11 +732,10 @@ impl Waiters {
fn consume(&mut self) -> Option<Waker> { fn consume(&mut self) -> Option<Waker> {
if self.root != usize::MAX { if self.root != usize::MAX {
self.occupied_len -= 1; self.occupied_len -= 1;
let entry =
mem::replace(self.entries.get_mut(self.root).unwrap(), Entry::Consumed);
let entry = self.entries.get_mut(self.root).unwrap(); match entry {
let prev = mem::replace(entry, Entry::Consumed);
match prev {
Entry::Occupied(node) => { Entry::Occupied(node) => {
debug_assert!(node.prev == usize::MAX); debug_assert!(node.prev == usize::MAX);
@ -760,57 +746,63 @@ impl Waiters {
} else { } else {
// remove from root // remove from root
self.root = node.next; self.root = node.next;
self.get_node(self.root).prev = usize::MAX; if self.root != usize::MAX {
self.get_node(self.root).prev = usize::MAX;
}
} }
Some(node.item) Some(node.item)
} }
_ => { _ => unreachable!(),
unreachable!()
}
} }
} else { } else {
None None
} }
} }
fn update(&mut self, key: usize, val: Waker) -> bool { fn update(&mut self, idx: usize, val: Waker) -> bool {
if let Some(entry) = self.entries.get_mut(key) { let entry = self
match entry { .entries
Entry::Occupied(ref mut node) => { .get_mut(idx)
node.item = val; .expect("Entry is expected to exist");
return false; match entry {
} Entry::Occupied(ref mut node) => {
Entry::Consumed => { node.item = val;
*entry = Entry::Occupied(Node { false
item: val,
prev: self.tail,
next: usize::MAX,
});
}
_ => unreachable!(),
} }
Entry::Consumed => {
// append to the tail
*entry = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});
self.occupied_len += 1;
if self.root == usize::MAX {
self.root = idx;
}
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}
self.tail = idx;
true
}
Entry::Vacant(_) => unreachable!(),
} }
self.occupied_len += 1;
if self.root == usize::MAX {
self.root = key;
}
if self.tail != usize::MAX {
self.get_node(self.tail).next = key;
}
self.tail = key;
true
} }
fn remove(&mut self, key: usize) { fn remove(&mut self, key: usize) {
if let Some(entry) = self.entries.get_mut(key) { if let Some(entry) = self.entries.get_mut(key) {
// Swap the entry at the provided value // Swap the entry at the provided value
let prev = mem::replace(entry, Entry::Vacant(self.free)); let entry = mem::replace(entry, Entry::Vacant(self.free));
match prev { self.len -= 1;
self.free = key;
match entry {
Entry::Occupied(node) => { Entry::Occupied(node) => {
self.len -= 1;
self.occupied_len -= 1; self.occupied_len -= 1;
self.free = key;
// remove from root // remove from root
if self.root == key { if self.root == key {
self.root = node.next; self.root = node.next;
@ -826,52 +818,54 @@ impl Waiters {
} }
} }
} }
Entry::Consumed => { Entry::Consumed => {}
self.len -= 1; Entry::Vacant(_) => unreachable!(),
self.free = key; }
}
_ => { if self.len == 0 {
unreachable!() self.entries.truncate(128);
}
} }
} }
} }
fn append(&mut self, val: Waker) -> usize { fn append(&mut self, val: Waker) -> usize {
let idx = self.free;
self.len += 1; self.len += 1;
self.occupied_len += 1; self.occupied_len += 1;
let key = self.free;
if key == self.entries.len() { // root points to first entry, append to empty list
if self.root == usize::MAX { if self.root == usize::MAX {
self.root = key; self.root = idx;
} }
if self.tail != usize::MAX { // tail points to last entry
self.get_node(self.tail).next = key; if self.tail != usize::MAX {
} self.get_node(self.tail).next = idx;
}
// append item to entries, first free item is not allocated yet
if idx == self.entries.len() {
self.entries.push(Entry::Occupied(Node { self.entries.push(Entry::Occupied(Node {
item: val, item: val,
prev: self.tail, prev: self.tail,
next: usize::MAX, next: usize::MAX,
})); }));
self.tail = key; self.tail = idx;
self.free = key + 1; self.free = idx + 1;
} else { } else {
self.free = match self.entries.get(key) { // entries has enough capacity
self.free = match self.entries.get(idx) {
Some(&Entry::Vacant(next)) => next, Some(&Entry::Vacant(next)) => next,
_ => unreachable!(), _ => unreachable!(),
}; };
if self.tail != usize::MAX { self.entries[idx] = Entry::Occupied(Node {
self.get_node(self.tail).next = key;
}
self.entries[key] = Entry::Occupied(Node {
item: val, item: val,
prev: self.tail, prev: self.tail,
next: usize::MAX, next: usize::MAX,
}); });
self.tail = key; self.tail = idx;
} }
key
idx
} }
} }

View file

@ -5,7 +5,7 @@ authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
keywords = ["ntex", "networking", "framework", "async", "futures"] keywords = ["ntex", "networking", "framework", "async", "futures"]
repository = "https://github.com/ntex-rs/ntex.git" repository = "https://github.com/ntex-rs/ntex"
documentation = "https://docs.rs/ntex/" documentation = "https://docs.rs/ntex/"
categories = [ categories = [
"network-programming", "network-programming",
@ -63,7 +63,7 @@ ntex-router = "0.5.3"
ntex-service = "2.0.1" ntex-service = "2.0.1"
ntex-macros = "0.1.3" ntex-macros = "0.1.3"
ntex-util = "1.0.1" ntex-util = "1.0.1"
ntex-bytes = "0.1.24" ntex-bytes = "0.1.25"
ntex-server = "1.0.5" ntex-server = "1.0.5"
ntex-h2 = "0.5.2" ntex-h2 = "0.5.2"
ntex-rt = "0.4.12" ntex-rt = "0.4.12"