diff --git a/ntex-bytes/CHANGELOG.md b/ntex-bytes/CHANGELOG.md index d838b8c9..d7b3b277 100644 --- a/ntex-bytes/CHANGELOG.md +++ b/ntex-bytes/CHANGELOG.md @@ -1,5 +1,9 @@ # Changes +## [0.1.25] (2024-04-02) + +* Fix pool waiters management + ## [0.1.24] (2024-02-01) * Add `checked` api diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index 5a91e288..b406fba2 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "ntex-bytes" -version = "0.1.24" +version = "0.1.25" authors = ["Nikolay Kim ", "Carl Lerche "] description = "Types and traits for working with bytes (bytes crate fork)" documentation = "https://docs.rs/ntex-bytes" -repository = "https://github.com/ntex-rs/ntex.git" +repository = "https://github.com/ntex-rs/ntex" readme = "README.md" keywords = ["buffers", "zero-copy", "io"] categories = ["network-programming", "data-structures"] @@ -18,13 +18,15 @@ default = [] simd = ["simdutf8"] [dependencies] -bitflags = "2.4" +bitflags = "2" bytes = "1" serde = "1" -futures-core = { version = "0.3", default-features = false, features = ["alloc"] } +futures-core = { version = "0.3", default-features = false } simdutf8 = { version = "0.1.4", optional = true } +backtrace = "*" + [dev-dependencies] -serde_test = "1.0" -serde_json = "1.0" +serde_test = "1" +serde_json = "1" ntex = { version = "1", features = ["tokio"] } diff --git a/ntex-bytes/src/pool.rs b/ntex-bytes/src/pool.rs index c09ff137..3b9e46b4 100644 --- a/ntex-bytes/src/pool.rs +++ b/ntex-bytes/src/pool.rs @@ -81,7 +81,7 @@ impl PoolId { #[inline] pub fn pool(self) -> Pool { POOLS.with(|pools| Pool { - idx: Cell::new(0), + idx: Cell::new(usize::MAX), inner: pools[self.0 as usize], }) } @@ -462,7 +462,7 @@ impl Clone for Pool { #[inline] fn clone(&self) -> Pool { Pool { - idx: Cell::new(0), + idx: Cell::new(usize::MAX), inner: self.inner, } } @@ -484,12 +484,10 @@ impl From for Pool { impl Drop for Pool { fn drop(&mut self) { + // cleanup waiter let idx = self.idx.get(); - if idx > 0 { - // cleanup waiter - let mut waiters = self.inner.waiters.borrow_mut(); - waiters.remove(idx - 1); - waiters.truncate(); + if idx != usize::MAX { + self.inner.waiters.borrow_mut().remove(idx); } } } @@ -515,10 +513,8 @@ impl Pool { /// Check if pool is ready pub fn is_ready(&self) -> bool { let idx = self.idx.get(); - if idx > 0 { - if let Some(Entry::Occupied(_)) = - self.inner.waiters.borrow().entries.get(idx - 1) - { + if idx != usize::MAX { + if let Some(Entry::Occupied(_)) = self.inner.waiters.borrow().entries.get(idx) { return false; } } @@ -543,26 +539,26 @@ impl Pool { let allocated = self.inner.size.load(Relaxed); if allocated < window_l { let idx = self.idx.get(); - if idx > 0 { + if idx != usize::MAX { // cleanup waiter - let mut waiters = self.inner.waiters.borrow_mut(); - waiters.remove(idx - 1); - waiters.truncate(); - self.idx.set(0); + self.inner.waiters.borrow_mut().remove(idx); + self.idx.set(usize::MAX); } return Poll::Ready(()); } // register waiter only if spawn fn is provided if let Some(spawn) = &*self.inner.spawn.borrow() { - let idx = self.idx.get(); let mut flags = self.inner.flags.get(); let mut waiters = self.inner.waiters.borrow_mut(); - let new = if idx == 0 { - self.idx.set(waiters.append(ctx.waker().clone()) + 1); - true - } else { - waiters.update(idx - 1, ctx.waker().clone()) + let new = { + let idx = self.idx.get(); + if idx == usize::MAX { + self.idx.set(waiters.append(ctx.waker().clone())); + true + } else { + waiters.update(idx, ctx.waker().clone()) + } }; // if memory usage has increased since last window change, @@ -600,7 +596,7 @@ impl Driver { fn release(&self, waiters_num: usize) { let mut waiters = self.pool.waiters.borrow_mut(); - let mut to_release = waiters.occupied_len / 100 * 5; + let mut to_release = waiters.occupied_len >> 4; if waiters_num > to_release { to_release += waiters_num >> 1; } else { @@ -654,7 +650,7 @@ impl Future for Driver { pool.flags.set(Flags::INCREASED); return Poll::Ready(()); } else { - // release 5% of pending waiters + // release 6% of pending waiters self.release(waiters); if allocated > windows[idx].0 { @@ -725,15 +721,6 @@ impl Waiters { } } - fn truncate(&mut self) { - if self.len == 0 { - self.entries.truncate(0); - self.root = usize::MAX; - self.tail = usize::MAX; - self.free = 0; - } - } - fn get_node(&mut self, key: usize) -> &mut Node { if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) { return node; @@ -745,11 +732,10 @@ impl Waiters { fn consume(&mut self) -> Option { if self.root != usize::MAX { self.occupied_len -= 1; + let entry = + mem::replace(self.entries.get_mut(self.root).unwrap(), Entry::Consumed); - let entry = self.entries.get_mut(self.root).unwrap(); - let prev = mem::replace(entry, Entry::Consumed); - - match prev { + match entry { Entry::Occupied(node) => { debug_assert!(node.prev == usize::MAX); @@ -760,57 +746,63 @@ impl Waiters { } else { // remove from root self.root = node.next; - self.get_node(self.root).prev = usize::MAX; + if self.root != usize::MAX { + self.get_node(self.root).prev = usize::MAX; + } } Some(node.item) } - _ => { - unreachable!() - } + _ => unreachable!(), } } else { None } } - fn update(&mut self, key: usize, val: Waker) -> bool { - if let Some(entry) = self.entries.get_mut(key) { - match entry { - Entry::Occupied(ref mut node) => { - node.item = val; - return false; - } - Entry::Consumed => { - *entry = Entry::Occupied(Node { - item: val, - prev: self.tail, - next: usize::MAX, - }); - } - _ => unreachable!(), + fn update(&mut self, idx: usize, val: Waker) -> bool { + let entry = self + .entries + .get_mut(idx) + .expect("Entry is expected to exist"); + match entry { + Entry::Occupied(ref mut node) => { + node.item = val; + false } + Entry::Consumed => { + // append to the tail + *entry = Entry::Occupied(Node { + item: val, + prev: self.tail, + next: usize::MAX, + }); + + self.occupied_len += 1; + if self.root == usize::MAX { + self.root = idx; + } + if self.tail != usize::MAX { + self.get_node(self.tail).next = idx; + } + self.tail = idx; + true + } + Entry::Vacant(_) => unreachable!(), } - self.occupied_len += 1; - if self.root == usize::MAX { - self.root = key; - } - if self.tail != usize::MAX { - self.get_node(self.tail).next = key; - } - self.tail = key; - true } fn remove(&mut self, key: usize) { if let Some(entry) = self.entries.get_mut(key) { // Swap the entry at the provided value - let prev = mem::replace(entry, Entry::Vacant(self.free)); + let entry = mem::replace(entry, Entry::Vacant(self.free)); - match prev { + self.len -= 1; + self.free = key; + + match entry { Entry::Occupied(node) => { - self.len -= 1; self.occupied_len -= 1; - self.free = key; + // remove from root if self.root == key { self.root = node.next; @@ -826,52 +818,54 @@ impl Waiters { } } } - Entry::Consumed => { - self.len -= 1; - self.free = key; - } - _ => { - unreachable!() - } + Entry::Consumed => {} + Entry::Vacant(_) => unreachable!(), + } + + if self.len == 0 { + self.entries.truncate(128); } } } fn append(&mut self, val: Waker) -> usize { + let idx = self.free; + self.len += 1; self.occupied_len += 1; - let key = self.free; - if key == self.entries.len() { - if self.root == usize::MAX { - self.root = key; - } - if self.tail != usize::MAX { - self.get_node(self.tail).next = key; - } + // root points to first entry, append to empty list + if self.root == usize::MAX { + self.root = idx; + } + // tail points to last entry + if self.tail != usize::MAX { + self.get_node(self.tail).next = idx; + } + // append item to entries, first free item is not allocated yet + if idx == self.entries.len() { self.entries.push(Entry::Occupied(Node { item: val, prev: self.tail, next: usize::MAX, })); - self.tail = key; - self.free = key + 1; + self.tail = idx; + self.free = idx + 1; } else { - self.free = match self.entries.get(key) { + // entries has enough capacity + self.free = match self.entries.get(idx) { Some(&Entry::Vacant(next)) => next, _ => unreachable!(), }; - if self.tail != usize::MAX { - self.get_node(self.tail).next = key; - } - self.entries[key] = Entry::Occupied(Node { + self.entries[idx] = Entry::Occupied(Node { item: val, prev: self.tail, next: usize::MAX, }); - self.tail = key; + self.tail = idx; } - key + + idx } } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index fc6c340a..1b31d63a 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -5,7 +5,7 @@ authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" keywords = ["ntex", "networking", "framework", "async", "futures"] -repository = "https://github.com/ntex-rs/ntex.git" +repository = "https://github.com/ntex-rs/ntex" documentation = "https://docs.rs/ntex/" categories = [ "network-programming", @@ -63,7 +63,7 @@ ntex-router = "0.5.3" ntex-service = "2.0.1" ntex-macros = "0.1.3" ntex-util = "1.0.1" -ntex-bytes = "0.1.24" +ntex-bytes = "0.1.25" ntex-server = "1.0.5" ntex-h2 = "0.5.2" ntex-rt = "0.4.12"